| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 | 
							- import concurrent
 - import json
 - import logging
 - from concurrent.futures import ThreadPoolExecutor
 - from typing import Optional
 - 
 - from flask import Flask, current_app
 - 
 - from core.entities.application_entities import ExternalDataVariableEntity
 - from core.external_data_tool.factory import ExternalDataToolFactory
 - 
 - logger = logging.getLogger(__name__)
 - 
 - 
 - class ExternalDataFetchFeature:
 -     def fetch(self, tenant_id: str,
 -               app_id: str,
 -               external_data_tools: list[ExternalDataVariableEntity],
 -               inputs: dict,
 -               query: str) -> dict:
 -         """
 -         Fill in variable inputs from external data tools if exists.
 - 
 -         :param tenant_id: workspace id
 -         :param app_id: app id
 -         :param external_data_tools: external data tools configs
 -         :param inputs: the inputs
 -         :param query: the query
 -         :return: the filled inputs
 -         """
 -         # Group tools by type and config
 -         grouped_tools = {}
 -         for tool in external_data_tools:
 -             tool_key = (tool.type, json.dumps(tool.config, sort_keys=True))
 -             grouped_tools.setdefault(tool_key, []).append(tool)
 - 
 -         results = {}
 -         with ThreadPoolExecutor() as executor:
 -             futures = {}
 -             for tool in external_data_tools:
 -                 future = executor.submit(
 -                     self._query_external_data_tool,
 -                     current_app._get_current_object(),
 -                     tenant_id,
 -                     app_id,
 -                     tool,
 -                     inputs,
 -                     query
 -                 )
 - 
 -                 futures[future] = tool
 - 
 -             for future in concurrent.futures.as_completed(futures):
 -                 tool_variable, result = future.result()
 -                 results[tool_variable] = result
 - 
 -         inputs.update(results)
 -         return inputs
 - 
 -     def _query_external_data_tool(self, flask_app: Flask,
 -                                   tenant_id: str,
 -                                   app_id: str,
 -                                   external_data_tool: ExternalDataVariableEntity,
 -                                   inputs: dict,
 -                                   query: str) -> tuple[Optional[str], Optional[str]]:
 -         """
 -         Query external data tool.
 -         :param flask_app: flask app
 -         :param tenant_id: tenant id
 -         :param app_id: app id
 -         :param external_data_tool: external data tool
 -         :param inputs: inputs
 -         :param query: query
 -         :return:
 -         """
 -         with flask_app.app_context():
 -             tool_variable = external_data_tool.variable
 -             tool_type = external_data_tool.type
 -             tool_config = external_data_tool.config
 - 
 -             external_data_tool_factory = ExternalDataToolFactory(
 -                 name=tool_type,
 -                 tenant_id=tenant_id,
 -                 app_id=app_id,
 -                 variable=tool_variable,
 -                 config=tool_config
 -             )
 - 
 -             # query external data tool
 -             result = external_data_tool_factory.query(
 -                 inputs=inputs,
 -                 query=query
 -             )
 - 
 -             return tool_variable, result
 
 
  |