| import requests | import requests | ||||
| from configs import dify_config | from configs import dify_config | ||||
| from services.rag_pipeline.pipeline_template.database.database_retrieval import DatabasePipelineTemplateRetrieval | |||||
| from services.rag_pipeline.pipeline_template.pipeline_template_base import PipelineTemplateRetrievalBase | from services.rag_pipeline.pipeline_template.pipeline_template_base import PipelineTemplateRetrievalBase | ||||
| from services.rag_pipeline.pipeline_template.pipeline_template_type import PipelineTemplateType | from services.rag_pipeline.pipeline_template.pipeline_template_type import PipelineTemplateType | ||||
| from services.recommend_app.buildin.buildin_retrieval import BuildInRecommendAppRetrieval | from services.recommend_app.buildin.buildin_retrieval import BuildInRecommendAppRetrieval | ||||
| try: | try: | ||||
| result = self.fetch_pipeline_template_detail_from_dify_official(template_id) | result = self.fetch_pipeline_template_detail_from_dify_official(template_id) | ||||
| except Exception as e: | except Exception as e: | ||||
| logger.warning("fetch recommended app detail from dify official failed: %r, switch to built-in.", e) | |||||
| result = BuildInRecommendAppRetrieval.fetch_recommended_app_detail_from_builtin(template_id) | |||||
| logger.warning("fetch recommended app detail from dify official failed: %r, switch to database.", e) | |||||
| result = [DatabasePipelineTemplateRetrieval.fetch_pipeline_template_detail_from_db(template_id)] | |||||
| return result | return result | ||||
| def get_pipeline_templates(self, language: str) -> dict: | def get_pipeline_templates(self, language: str) -> dict: | ||||
| try: | try: | ||||
| result = self.fetch_pipeline_templates_from_dify_official(language) | result = self.fetch_pipeline_templates_from_dify_official(language) | ||||
| except Exception as e: | except Exception as e: | ||||
| logger.warning("fetch pipeline templates from dify official failed: %r, switch to built-in.", e) | |||||
| result = BuildInRecommendAppRetrieval.fetch_recommended_apps_from_builtin(language) | |||||
| logger.warning("fetch pipeline templates from dify official failed: %r, switch to database.", e) | |||||
| result = DatabasePipelineTemplateRetrieval.fetch_pipeline_templates_from_db(language) | |||||
| return result | return result | ||||
| def get_type(self) -> str: | def get_type(self) -> str: |