Browse Source

fix: database lock timeout by separating external MCP calls from transactions (#22821)

tags/1.7.0
Novice 3 months ago
parent
commit
e6913744ae
No account linked to committer's email address
1 changed files with 48 additions and 25 deletions
  1. 48
    25
      api/services/tools/mcp_tools_manage_service.py

+ 48
- 25
api/services/tools/mcp_tools_manage_service.py View File

@classmethod @classmethod
def list_mcp_tool_from_remote_server(cls, tenant_id: str, provider_id: str) -> ToolProviderApiEntity: def list_mcp_tool_from_remote_server(cls, tenant_id: str, provider_id: str) -> ToolProviderApiEntity:
mcp_provider = cls.get_mcp_provider_by_provider_id(provider_id, tenant_id) mcp_provider = cls.get_mcp_provider_by_provider_id(provider_id, tenant_id)
server_url = mcp_provider.decrypted_server_url
authed = mcp_provider.authed

try: try:
with MCPClient(
mcp_provider.decrypted_server_url, provider_id, tenant_id, authed=mcp_provider.authed, for_list=True
) as mcp_client:
with MCPClient(server_url, provider_id, tenant_id, authed=authed, for_list=True) as mcp_client:
tools = mcp_client.list_tools() tools = mcp_client.list_tools()
except MCPAuthError: except MCPAuthError:
raise ValueError("Please auth the tool first") raise ValueError("Please auth the tool first")
except MCPError as e: except MCPError as e:
raise ValueError(f"Failed to connect to MCP server: {e}") raise ValueError(f"Failed to connect to MCP server: {e}")
mcp_provider.tools = json.dumps([tool.model_dump() for tool in tools])
mcp_provider.authed = True
mcp_provider.updated_at = datetime.now()
db.session.commit()

try:
mcp_provider = cls.get_mcp_provider_by_provider_id(provider_id, tenant_id)
mcp_provider.tools = json.dumps([tool.model_dump() for tool in tools])
mcp_provider.authed = True
mcp_provider.updated_at = datetime.now()
db.session.commit()
except Exception:
db.session.rollback()
raise

user = mcp_provider.load_user() user = mcp_provider.load_user()
return ToolProviderApiEntity( return ToolProviderApiEntity(
id=mcp_provider.id, id=mcp_provider.id,
server_identifier: str, server_identifier: str,
): ):
mcp_provider = cls.get_mcp_provider_by_provider_id(provider_id, tenant_id) mcp_provider = cls.get_mcp_provider_by_provider_id(provider_id, tenant_id)
mcp_provider.updated_at = datetime.now()
mcp_provider.name = name
mcp_provider.icon = (
json.dumps({"content": icon, "background": icon_background}) if icon_type == "emoji" else icon
)
mcp_provider.server_identifier = server_identifier

reconnect_result = None
encrypted_server_url = None
server_url_hash = None


if UNCHANGED_SERVER_URL_PLACEHOLDER not in server_url: if UNCHANGED_SERVER_URL_PLACEHOLDER not in server_url:
encrypted_server_url = encrypter.encrypt_token(tenant_id, server_url) encrypted_server_url = encrypter.encrypt_token(tenant_id, server_url)
mcp_provider.server_url = encrypted_server_url
server_url_hash = hashlib.sha256(server_url.encode()).hexdigest() server_url_hash = hashlib.sha256(server_url.encode()).hexdigest()


if server_url_hash != mcp_provider.server_url_hash: if server_url_hash != mcp_provider.server_url_hash:
cls._re_connect_mcp_provider(mcp_provider, provider_id, tenant_id)
mcp_provider.server_url_hash = server_url_hash
reconnect_result = cls._re_connect_mcp_provider(server_url, provider_id, tenant_id)
try: try:
mcp_provider.updated_at = datetime.now()
mcp_provider.name = name
mcp_provider.icon = (
json.dumps({"content": icon, "background": icon_background}) if icon_type == "emoji" else icon
)
mcp_provider.server_identifier = server_identifier

if encrypted_server_url is not None and server_url_hash is not None:
mcp_provider.server_url = encrypted_server_url
mcp_provider.server_url_hash = server_url_hash

if reconnect_result:
mcp_provider.authed = reconnect_result["authed"]
mcp_provider.tools = reconnect_result["tools"]
mcp_provider.encrypted_credentials = reconnect_result["encrypted_credentials"]

db.session.commit() db.session.commit()
except IntegrityError as e: except IntegrityError as e:
db.session.rollback() db.session.rollback()
if "unique_mcp_provider_server_identifier" in error_msg: if "unique_mcp_provider_server_identifier" in error_msg:
raise ValueError(f"MCP tool {server_identifier} already exists") raise ValueError(f"MCP tool {server_identifier} already exists")
raise raise
except Exception:
db.session.rollback()
raise


@classmethod @classmethod
def update_mcp_provider_credentials( def update_mcp_provider_credentials(
db.session.commit() db.session.commit()


@classmethod @classmethod
def _re_connect_mcp_provider(cls, mcp_provider: MCPToolProvider, provider_id: str, tenant_id: str):
"""re-connect mcp provider"""
def _re_connect_mcp_provider(cls, server_url: str, provider_id: str, tenant_id: str):
try: try:
with MCPClient( with MCPClient(
mcp_provider.decrypted_server_url,
server_url,
provider_id, provider_id,
tenant_id, tenant_id,
authed=False, authed=False,
for_list=True, for_list=True,
) as mcp_client: ) as mcp_client:
tools = mcp_client.list_tools() tools = mcp_client.list_tools()
mcp_provider.authed = True
mcp_provider.tools = json.dumps([tool.model_dump() for tool in tools])
return {
"authed": True,
"tools": json.dumps([tool.model_dump() for tool in tools]),
"encrypted_credentials": "{}",
}
except MCPAuthError: except MCPAuthError:
mcp_provider.authed = False
mcp_provider.tools = "[]"
return {"authed": False, "tools": "[]", "encrypted_credentials": "{}"}
except MCPError as e: except MCPError as e:
raise ValueError(f"Failed to re-connect MCP server: {e}") from e raise ValueError(f"Failed to re-connect MCP server: {e}") from e
# reset credentials
mcp_provider.encrypted_credentials = "{}"

Loading…
Cancel
Save