| results = {} | results = {} | ||||
| with ThreadPoolExecutor() as executor: | with ThreadPoolExecutor() as executor: | ||||
| futures = {} | futures = {} | ||||
| for tools in grouped_tools.values(): | |||||
| # Only query the first tool in each group | |||||
| first_tool = tools[0] | |||||
| for tool in external_data_tools: | |||||
| if not tool.get("enabled"): | |||||
| continue | |||||
| future = executor.submit( | future = executor.submit( | ||||
| cls.query_external_data_tool, current_app._get_current_object(), tenant_id, app_id, first_tool, | |||||
| cls.query_external_data_tool, current_app._get_current_object(), tenant_id, app_id, tool, | |||||
| inputs, query | inputs, query | ||||
| ) | ) | ||||
| for tool in tools: | |||||
| futures[future] = tool | |||||
| futures[future] = tool | |||||
| for future in concurrent.futures.as_completed(futures): | for future in concurrent.futures.as_completed(futures): | ||||
| tool_key, result = future.result() | |||||
| if tool_key in grouped_tools: | |||||
| for tool in grouped_tools[tool_key]: | |||||
| results[tool['variable']] = result | |||||
| tool_variable, result = future.result() | |||||
| results[tool_variable] = result | |||||
| inputs.update(results) | inputs.update(results) | ||||
| return inputs | return inputs | ||||
| query=query | query=query | ||||
| ) | ) | ||||
| tool_key = (external_data_tool.get("type"), json.dumps(external_data_tool.get("config"), sort_keys=True)) | |||||
| return tool_key, result | |||||
| return tool_variable, result | |||||
| @classmethod | @classmethod | ||||
| def get_query_for_agent(cls, app: App, app_model_config: AppModelConfig, query: str, inputs: dict) -> str: | def get_query_for_agent(cls, app: App, app_model_config: AppModelConfig, query: str, inputs: dict) -> str: |