| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 | 
							- from typing import Optional
 - 
 - import yaml
 - 
 - from extensions.ext_database import db
 - from models.dataset import PipelineBuiltInTemplate
 - from services.rag_pipeline.pipeline_template.pipeline_template_base import PipelineTemplateRetrievalBase
 - from services.rag_pipeline.pipeline_template.pipeline_template_type import PipelineTemplateType
 - 
 - 
 - class DatabasePipelineTemplateRetrieval(PipelineTemplateRetrievalBase):
 -     """
 -     Retrieval pipeline   template from database
 -     """
 - 
 -     def get_pipeline_templates(self, language: str) -> dict:
 -         result = self.fetch_pipeline_templates_from_db(language)
 -         return result
 - 
 -     def get_pipeline_template_detail(self, pipeline_id: str):
 -         result = self.fetch_pipeline_template_detail_from_db(pipeline_id)
 -         return result
 - 
 -     def get_type(self) -> str:
 -         return PipelineTemplateType.DATABASE
 - 
 -     @classmethod
 -     def fetch_pipeline_templates_from_db(cls, language: str) -> dict:
 -         """
 -         Fetch pipeline templates from db.
 -         :param language: language
 -         :return:
 -         """
 - 
 -         pipeline_built_in_templates: list[PipelineBuiltInTemplate] = (
 -             db.session.query(PipelineBuiltInTemplate).filter(PipelineBuiltInTemplate.language == language).all()
 -         )
 - 
 -         recommended_pipelines_results = []
 -         for pipeline_built_in_template in pipeline_built_in_templates:
 - 
 -             recommended_pipeline_result = {
 -                 "id": pipeline_built_in_template.id,
 -                 "name": pipeline_built_in_template.name,
 -                 "description": pipeline_built_in_template.description,
 -                 "icon": pipeline_built_in_template.icon,
 -                 "copyright": pipeline_built_in_template.copyright,
 -                 "privacy_policy": pipeline_built_in_template.privacy_policy,
 -                 "position": pipeline_built_in_template.position,
 -                 "chunk_structure": pipeline_built_in_template.chunk_structure,
 -             }
 -             recommended_pipelines_results.append(recommended_pipeline_result)
 - 
 -         return {"pipeline_templates": recommended_pipelines_results}
 - 
 -     @classmethod
 -     def fetch_pipeline_template_detail_from_db(cls, pipeline_id: str) -> Optional[dict]:
 -         """
 -         Fetch pipeline template detail from db.
 -         :param pipeline_id: Pipeline ID
 -         :return:
 -         """
 -         # is in public recommended list
 -         pipeline_template = (
 -             db.session.query(PipelineBuiltInTemplate).filter(PipelineBuiltInTemplate.id == pipeline_id).first()
 -         )
 - 
 -         if not pipeline_template:
 -             return None
 - 
 -         return {
 -             "id": pipeline_template.id,
 -             "name": pipeline_template.name,
 -             "icon": pipeline_template.icon,
 -             "chunk_structure": pipeline_template.chunk_structure,
 -             "export_data": yaml.safe_load(pipeline_template.yaml_content),
 -         }
 
 
  |