| if db_model is not None: | if db_model is not None: | ||||
| offload_data = db_model.offload_data | offload_data = db_model.offload_data | ||||
| else: | else: | ||||
| db_model = self._to_db_model(domain_model) | db_model = self._to_db_model(domain_model) | ||||
| offload_data = [] | |||||
| offload_data = db_model.offload_data | |||||
| offload_data = db_model.offload_data | |||||
| if domain_model.inputs is not None: | if domain_model.inputs is not None: | ||||
| result = self._truncate_and_upload( | result = self._truncate_and_upload( | ||||
| domain_model.inputs, | domain_model.inputs, |
| self.logger.debug("Abort command sent to engine") | self.logger.debug("Abort command sent to engine") | ||||
| except Exception: | except Exception: | ||||
| self.logger.exception("Failed to send abort command: %s") | |||||
| self.logger.exception("Failed to send abort command") |
| """ | """ | ||||
| Retry error document | Retry error document | ||||
| """ | """ | ||||
| document_pipeline_excution_log = ( | |||||
| document_pipeline_execution_log = ( | |||||
| db.session.query(DocumentPipelineExecutionLog) | db.session.query(DocumentPipelineExecutionLog) | ||||
| .where(DocumentPipelineExecutionLog.document_id == document.id) | .where(DocumentPipelineExecutionLog.document_id == document.id) | ||||
| .first() | .first() | ||||
| ) | ) | ||||
| if not document_pipeline_excution_log: | |||||
| if not document_pipeline_execution_log: | |||||
| raise ValueError("Document pipeline execution log not found") | raise ValueError("Document pipeline execution log not found") | ||||
| pipeline = db.session.query(Pipeline).where(Pipeline.id == document_pipeline_excution_log.pipeline_id).first() | |||||
| pipeline = db.session.query(Pipeline).where(Pipeline.id == document_pipeline_execution_log.pipeline_id).first() | |||||
| if not pipeline: | if not pipeline: | ||||
| raise ValueError("Pipeline not found") | raise ValueError("Pipeline not found") | ||||
| # convert to app config | # convert to app config | ||||
| workflow=workflow, | workflow=workflow, | ||||
| user=user, | user=user, | ||||
| args={ | args={ | ||||
| "inputs": document_pipeline_excution_log.input_data, | |||||
| "start_node_id": document_pipeline_excution_log.datasource_node_id, | |||||
| "datasource_type": document_pipeline_excution_log.datasource_type, | |||||
| "datasource_info_list": [json.loads(document_pipeline_excution_log.datasource_info)], | |||||
| "inputs": document_pipeline_execution_log.input_data, | |||||
| "start_node_id": document_pipeline_execution_log.datasource_node_id, | |||||
| "datasource_type": document_pipeline_execution_log.datasource_type, | |||||
| "datasource_info_list": [json.loads(document_pipeline_execution_log.datasource_info)], | |||||
| "original_document_id": document.id, | "original_document_id": document.id, | ||||
| }, | }, | ||||
| invoke_from=InvokeFrom.PUBLISHED, | invoke_from=InvokeFrom.PUBLISHED, |
| indexing_technique = dataset.indexing_technique | indexing_technique = dataset.indexing_technique | ||||
| if not datasource_type and not indexing_technique: | if not datasource_type and not indexing_technique: | ||||
| return self._transfrom_to_empty_pipeline(dataset) | |||||
| return self._transform_to_empty_pipeline(dataset) | |||||
| doc_form = dataset.doc_form | doc_form = dataset.doc_form | ||||
| if not doc_form: | if not doc_form: | ||||
| return self._transfrom_to_empty_pipeline(dataset) | |||||
| return self._transform_to_empty_pipeline(dataset) | |||||
| retrieval_model = dataset.retrieval_model | retrieval_model = dataset.retrieval_model | ||||
| pipeline_yaml = self._get_transform_yaml(doc_form, datasource_type, indexing_technique) | pipeline_yaml = self._get_transform_yaml(doc_form, datasource_type, indexing_technique) | ||||
| # deal dependencies | # deal dependencies | ||||
| logger.debug("Installing missing pipeline plugins %s", need_install_plugin_unique_identifiers) | logger.debug("Installing missing pipeline plugins %s", need_install_plugin_unique_identifiers) | ||||
| PluginService.install_from_marketplace_pkg(tenant_id, need_install_plugin_unique_identifiers) | PluginService.install_from_marketplace_pkg(tenant_id, need_install_plugin_unique_identifiers) | ||||
| def _transfrom_to_empty_pipeline(self, dataset: Dataset): | |||||
| def _transform_to_empty_pipeline(self, dataset: Dataset): | |||||
| pipeline = Pipeline( | pipeline = Pipeline( | ||||
| tenant_id=dataset.tenant_id, | tenant_id=dataset.tenant_id, | ||||
| name=dataset.name, | name=dataset.name, |