選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

mcp_server_app.py 16KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. from flask import Response, request
  2. from flask_login import current_user, login_required
  3. from api.db import VALID_MCP_SERVER_TYPES
  4. from api.db.db_models import MCPServer
  5. from api.db.services.mcp_server_service import MCPServerService
  6. from api.db.services.user_service import TenantService
  7. from api.settings import RetCode
  8. from api.utils import get_uuid
  9. from api.utils.api_utils import get_data_error_result, get_json_result, server_error_response, validate_request
  10. from api.utils.mcp_server import get_mcp_tools
  11. from api.utils.web_utils import get_float, safe_json_parse
  12. from rag.utils.mcp_tool_call_conn import MCPToolCallSession, close_multiple_mcp_toolcall_sessions
  13. @manager.route("/list", methods=["POST"]) # noqa: F821
  14. @login_required
  15. def list_mcp() -> Response:
  16. keywords = request.args.get("keywords", "")
  17. page_number = int(request.args.get("page", 0))
  18. items_per_page = int(request.args.get("page_size", 0))
  19. orderby = request.args.get("orderby", "create_time")
  20. if request.args.get("desc", "true").lower() == "false":
  21. desc = False
  22. else:
  23. desc = True
  24. req = request.get_json()
  25. mcp_ids = req.get("mcp_ids", [])
  26. try:
  27. servers = MCPServerService.get_servers(current_user.id, mcp_ids, 0, 0, orderby, desc, keywords) or []
  28. total = len(servers)
  29. if page_number and items_per_page:
  30. servers = servers[(page_number - 1) * items_per_page : page_number * items_per_page]
  31. return get_json_result(data={"mcp_servers": servers, "total": total})
  32. except Exception as e:
  33. return server_error_response(e)
  34. @manager.route("/detail", methods=["GET"]) # noqa: F821
  35. @login_required
  36. def detail() -> Response:
  37. mcp_id = request.args["mcp_id"]
  38. try:
  39. mcp_server = MCPServerService.get_or_none(id=mcp_id, tenant_id=current_user.id)
  40. if mcp_server is None:
  41. return get_json_result(code=RetCode.NOT_FOUND, data=None)
  42. return get_json_result(data=mcp_server.to_dict())
  43. except Exception as e:
  44. return server_error_response(e)
  45. @manager.route("/create", methods=["POST"]) # noqa: F821
  46. @login_required
  47. @validate_request("name", "url", "server_type")
  48. def create() -> Response:
  49. req = request.get_json()
  50. server_type = req.get("server_type", "")
  51. if server_type not in VALID_MCP_SERVER_TYPES:
  52. return get_data_error_result(message="Unsupported MCP server type.")
  53. server_name = req.get("name", "")
  54. if not server_name or len(server_name.encode("utf-8")) > 255:
  55. return get_data_error_result(message=f"Invaild MCP name or length is {len(server_name)} which is large than 255.")
  56. e, _ = MCPServerService.get_by_name_and_tenant(name=server_name, tenant_id=current_user.id)
  57. if e:
  58. return get_data_error_result(message="Duplicated MCP server name.")
  59. url = req.get("url", "")
  60. if not url:
  61. return get_data_error_result(message="Invaild url.")
  62. headers = safe_json_parse(req.get("headers", {}))
  63. req["headers"] = headers
  64. variables = safe_json_parse(req.get("variables", {}))
  65. variables.pop("tools", None)
  66. timeout = get_float(req, "timeout", 10)
  67. try:
  68. req["id"] = get_uuid()
  69. req["tenant_id"] = current_user.id
  70. e, _ = TenantService.get_by_id(current_user.id)
  71. if not e:
  72. return get_data_error_result(message="Tenant not found.")
  73. mcp_server = MCPServer(id=server_name, name=server_name, url=url, server_type=server_type, variables=variables, headers=headers)
  74. server_tools, err_message = get_mcp_tools([mcp_server], timeout)
  75. if err_message:
  76. return get_data_error_result(err_message)
  77. tools = server_tools[server_name]
  78. tools = {tool["name"]: tool for tool in tools if isinstance(tool, dict) and "name" in tool}
  79. variables["tools"] = tools
  80. req["variables"] = variables
  81. if not MCPServerService.insert(**req):
  82. return get_data_error_result("Failed to create MCP server.")
  83. return get_json_result(data=req)
  84. except Exception as e:
  85. return server_error_response(e)
  86. @manager.route("/update", methods=["POST"]) # noqa: F821
  87. @login_required
  88. @validate_request("mcp_id")
  89. def update() -> Response:
  90. req = request.get_json()
  91. mcp_id = req.get("mcp_id", "")
  92. e, mcp_server = MCPServerService.get_by_id(mcp_id)
  93. if not e or mcp_server.tenant_id != current_user.id:
  94. return get_data_error_result(message=f"Cannot find MCP server {mcp_id} for user {current_user.id}")
  95. server_type = req.get("server_type", mcp_server.server_type)
  96. if server_type and server_type not in VALID_MCP_SERVER_TYPES:
  97. return get_data_error_result(message="Unsupported MCP server type.")
  98. server_name = req.get("name", mcp_server.name)
  99. if server_name and len(server_name.encode("utf-8")) > 255:
  100. return get_data_error_result(message=f"Invaild MCP name or length is {len(server_name)} which is large than 255.")
  101. url = req.get("url", mcp_server.url)
  102. if not url:
  103. return get_data_error_result(message="Invaild url.")
  104. headers = safe_json_parse(req.get("headers", mcp_server.headers))
  105. req["headers"] = headers
  106. variables = safe_json_parse(req.get("variables", mcp_server.variables))
  107. variables.pop("tools", None)
  108. timeout = get_float(req, "timeout", 10)
  109. try:
  110. req["tenant_id"] = current_user.id
  111. req.pop("mcp_id", None)
  112. req["id"] = mcp_id
  113. mcp_server = MCPServer(id=server_name, name=server_name, url=url, server_type=server_type, variables=variables, headers=headers)
  114. server_tools, err_message = get_mcp_tools([mcp_server], timeout)
  115. if err_message:
  116. return get_data_error_result(err_message)
  117. tools = server_tools[server_name]
  118. tools = {tool["name"]: tool for tool in tools if isinstance(tool, dict) and "name" in tool}
  119. variables["tools"] = tools
  120. req["variables"] = variables
  121. if not MCPServerService.filter_update([MCPServer.id == mcp_id, MCPServer.tenant_id == current_user.id], req):
  122. return get_data_error_result(message="Failed to updated MCP server.")
  123. e, updated_mcp = MCPServerService.get_by_id(req["id"])
  124. if not e:
  125. return get_data_error_result(message="Failed to fetch updated MCP server.")
  126. return get_json_result(data=updated_mcp.to_dict())
  127. except Exception as e:
  128. return server_error_response(e)
  129. @manager.route("/rm", methods=["POST"]) # noqa: F821
  130. @login_required
  131. @validate_request("mcp_ids")
  132. def rm() -> Response:
  133. req = request.get_json()
  134. mcp_ids = req.get("mcp_ids", [])
  135. try:
  136. req["tenant_id"] = current_user.id
  137. if not MCPServerService.delete_by_ids(mcp_ids):
  138. return get_data_error_result(message=f"Failed to delete MCP servers {mcp_ids}")
  139. return get_json_result(data=True)
  140. except Exception as e:
  141. return server_error_response(e)
  142. @manager.route("/import", methods=["POST"]) # noqa: F821
  143. @login_required
  144. @validate_request("mcpServers")
  145. def import_multiple() -> Response:
  146. req = request.get_json()
  147. servers = req.get("mcpServers", {})
  148. if not servers:
  149. return get_data_error_result(message="No MCP servers provided.")
  150. timeout = get_float(req, "timeout", 10)
  151. results = []
  152. try:
  153. for server_name, config in servers.items():
  154. if not all(key in config for key in {"type", "url"}):
  155. results.append({"server": server_name, "success": False, "message": "Missing required fields (type or url)"})
  156. continue
  157. if not server_name or len(server_name.encode("utf-8")) > 255:
  158. results.append({"server": server_name, "success": False, "message": f"Invaild MCP name or length is {len(server_name)} which is large than 255."})
  159. continue
  160. base_name = server_name
  161. new_name = base_name
  162. counter = 0
  163. while True:
  164. e, _ = MCPServerService.get_by_name_and_tenant(name=new_name, tenant_id=current_user.id)
  165. if not e:
  166. break
  167. new_name = f"{base_name}_{counter}"
  168. counter += 1
  169. create_data = {
  170. "id": get_uuid(),
  171. "tenant_id": current_user.id,
  172. "name": new_name,
  173. "url": config["url"],
  174. "server_type": config["type"],
  175. "variables": {"authorization_token": config.get("authorization_token", "")},
  176. }
  177. headers = {"authorization_token": config["authorization_token"]} if "authorization_token" in config else {}
  178. variables = {k: v for k, v in config.items() if k not in {"type", "url", "headers"}}
  179. mcp_server = MCPServer(id=new_name, name=new_name, url=config["url"], server_type=config["type"], variables=variables, headers=headers)
  180. server_tools, err_message = get_mcp_tools([mcp_server], timeout)
  181. if err_message:
  182. results.append({"server": base_name, "success": False, "message": err_message})
  183. continue
  184. tools = server_tools[new_name]
  185. tools = {tool["name"]: tool for tool in tools if isinstance(tool, dict) and "name" in tool}
  186. create_data["variables"]["tools"] = tools
  187. if MCPServerService.insert(**create_data):
  188. result = {"server": server_name, "success": True, "action": "created", "id": create_data["id"], "new_name": new_name}
  189. if new_name != base_name:
  190. result["message"] = f"Renamed from '{base_name}' to '{new_name}' avoid duplication"
  191. results.append(result)
  192. else:
  193. results.append({"server": server_name, "success": False, "message": "Failed to create MCP server."})
  194. return get_json_result(data={"results": results})
  195. except Exception as e:
  196. return server_error_response(e)
  197. @manager.route("/export", methods=["POST"]) # noqa: F821
  198. @login_required
  199. @validate_request("mcp_ids")
  200. def export_multiple() -> Response:
  201. req = request.get_json()
  202. mcp_ids = req.get("mcp_ids", [])
  203. if not mcp_ids:
  204. return get_data_error_result(message="No MCP server IDs provided.")
  205. try:
  206. exported_servers = {}
  207. for mcp_id in mcp_ids:
  208. e, mcp_server = MCPServerService.get_by_id(mcp_id)
  209. if e and mcp_server.tenant_id == current_user.id:
  210. server_key = mcp_server.name
  211. exported_servers[server_key] = {
  212. "type": mcp_server.server_type,
  213. "url": mcp_server.url,
  214. "name": mcp_server.name,
  215. "authorization_token": mcp_server.variables.get("authorization_token", ""),
  216. "tools": mcp_server.variables.get("tools", {}),
  217. }
  218. return get_json_result(data={"mcpServers": exported_servers})
  219. except Exception as e:
  220. return server_error_response(e)
  221. @manager.route("/list_tools", methods=["POST"]) # noqa: F821
  222. @login_required
  223. @validate_request("mcp_ids")
  224. def list_tools() -> Response:
  225. req = request.get_json()
  226. mcp_ids = req.get("mcp_ids", [])
  227. if not mcp_ids:
  228. return get_data_error_result(message="No MCP server IDs provided.")
  229. timeout = get_float(req, "timeout", 10)
  230. results = {}
  231. tool_call_sessions = []
  232. try:
  233. for mcp_id in mcp_ids:
  234. e, mcp_server = MCPServerService.get_by_id(mcp_id)
  235. if e and mcp_server.tenant_id == current_user.id:
  236. server_key = mcp_server.id
  237. cached_tools = mcp_server.variables.get("tools", {})
  238. tool_call_session = MCPToolCallSession(mcp_server, mcp_server.variables)
  239. tool_call_sessions.append(tool_call_session)
  240. try:
  241. tools = tool_call_session.get_tools(timeout)
  242. except Exception as e:
  243. tools = []
  244. return get_data_error_result(message=f"MCP list tools error: {e}")
  245. results[server_key] = []
  246. for tool in tools:
  247. tool_dict = tool.model_dump()
  248. cached_tool = cached_tools.get(tool_dict["name"], {})
  249. tool_dict["enabled"] = cached_tool.get("enabled", True)
  250. results[server_key].append(tool_dict)
  251. return get_json_result(data=results)
  252. except Exception as e:
  253. return server_error_response(e)
  254. finally:
  255. # PERF: blocking call to close sessions — consider moving to background thread or task queue
  256. close_multiple_mcp_toolcall_sessions(tool_call_sessions)
  257. @manager.route("/test_tool", methods=["POST"]) # noqa: F821
  258. @login_required
  259. @validate_request("mcp_id", "tool_name", "arguments")
  260. def test_tool() -> Response:
  261. req = request.get_json()
  262. mcp_id = req.get("mcp_id", "")
  263. if not mcp_id:
  264. return get_data_error_result(message="No MCP server ID provided.")
  265. timeout = get_float(req, "timeout", 10)
  266. tool_name = req.get("tool_name", "")
  267. arguments = req.get("arguments", {})
  268. if not all([tool_name, arguments]):
  269. return get_data_error_result(message="Require provide tool name and arguments.")
  270. tool_call_sessions = []
  271. try:
  272. e, mcp_server = MCPServerService.get_by_id(mcp_id)
  273. if not e or mcp_server.tenant_id != current_user.id:
  274. return get_data_error_result(message=f"Cannot find MCP server {mcp_id} for user {current_user.id}")
  275. tool_call_session = MCPToolCallSession(mcp_server, mcp_server.variables)
  276. tool_call_sessions.append(tool_call_session)
  277. result = tool_call_session.tool_call(tool_name, arguments, timeout)
  278. # PERF: blocking call to close sessions — consider moving to background thread or task queue
  279. close_multiple_mcp_toolcall_sessions(tool_call_sessions)
  280. return get_json_result(data=result)
  281. except Exception as e:
  282. return server_error_response(e)
  283. @manager.route("/cache_tools", methods=["POST"]) # noqa: F821
  284. @login_required
  285. @validate_request("mcp_id", "tools")
  286. def cache_tool() -> Response:
  287. req = request.get_json()
  288. mcp_id = req.get("mcp_id", "")
  289. if not mcp_id:
  290. return get_data_error_result(message="No MCP server ID provided.")
  291. tools = req.get("tools", [])
  292. e, mcp_server = MCPServerService.get_by_id(mcp_id)
  293. if not e or mcp_server.tenant_id != current_user.id:
  294. return get_data_error_result(message=f"Cannot find MCP server {mcp_id} for user {current_user.id}")
  295. variables = mcp_server.variables
  296. tools = {tool["name"]: tool for tool in tools if isinstance(tool, dict) and "name" in tool}
  297. variables["tools"] = tools
  298. if not MCPServerService.filter_update([MCPServer.id == mcp_id, MCPServer.tenant_id == current_user.id], {"variables": variables}):
  299. return get_data_error_result(message="Failed to updated MCP server.")
  300. return get_json_result(data=tools)
  301. @manager.route("/test_mcp", methods=["POST"]) # noqa: F821
  302. @validate_request("url", "server_type")
  303. def test_mcp() -> Response:
  304. req = request.get_json()
  305. url = req.get("url", "")
  306. if not url:
  307. return get_data_error_result(message="Invaild MCP url.")
  308. server_type = req.get("server_type", "")
  309. if server_type not in VALID_MCP_SERVER_TYPES:
  310. return get_data_error_result(message="Unsupported MCP server type.")
  311. timeout = get_float(req, "timeout", 10)
  312. headers = safe_json_parse(req.get("headers", {}))
  313. variables = safe_json_parse(req.get("variables", {}))
  314. mcp_server = MCPServer(id=f"{server_type}: {url}", server_type=server_type, url=url, headers=headers, variables=variables)
  315. result = []
  316. try:
  317. tool_call_session = MCPToolCallSession(mcp_server, mcp_server.variables)
  318. try:
  319. tools = tool_call_session.get_tools(timeout)
  320. except Exception as e:
  321. tools = []
  322. return get_data_error_result(message=f"Test MCP error: {e}")
  323. finally:
  324. # PERF: blocking call to close sessions — consider moving to background thread or task queue
  325. close_multiple_mcp_toolcall_sessions([tool_call_session])
  326. for tool in tools:
  327. tool_dict = tool.model_dump()
  328. tool_dict["enabled"] = True
  329. result.append(tool_dict)
  330. return get_json_result(data=result)
  331. except Exception as e:
  332. return server_error_response(e)