| @@ -21,7 +21,7 @@ def add_document_to_index_task(dataset_document_id: str): | |||
| Async Add document to index | |||
| :param dataset_document_id: | |||
| Usage: add_document_to_index.delay(dataset_document_id) | |||
| Usage: add_document_to_index_task.delay(dataset_document_id) | |||
| """ | |||
| logging.info(click.style("Start add document to index: {}".format(dataset_document_id), fg="green")) | |||
| start_at = time.perf_counter() | |||
| @@ -21,7 +21,7 @@ def batch_clean_document_task(document_ids: list[str], dataset_id: str, doc_form | |||
| :param doc_form: doc_form | |||
| :param file_ids: file ids | |||
| Usage: clean_document_task.delay(document_id, dataset_id) | |||
| Usage: batch_clean_document_task.delay(document_ids, dataset_id) | |||
| """ | |||
| logging.info(click.style("Start batch clean documents when documents deleted", fg="green")) | |||
| start_at = time.perf_counter() | |||
| @@ -35,7 +35,7 @@ def batch_create_segment_to_index_task( | |||
| :param tenant_id: | |||
| :param user_id: | |||
| Usage: batch_create_segment_to_index_task.delay(segment_id) | |||
| Usage: batch_create_segment_to_index_task.delay(job_id, content, dataset_id, document_id, tenant_id, user_id) | |||
| """ | |||
| logging.info(click.style("Start batch create segment jobId: {}".format(job_id), fg="green")) | |||
| start_at = time.perf_counter() | |||
| @@ -17,7 +17,7 @@ def delete_segment_from_index_task(index_node_ids: list, dataset_id: str, docume | |||
| :param dataset_id: | |||
| :param document_id: | |||
| Usage: delete_segment_from_index_task.delay(segment_ids) | |||
| Usage: delete_segment_from_index_task.delay(index_node_ids, dataset_id, document_id) | |||
| """ | |||
| logging.info(click.style("Start delete segment from index", fg="green")) | |||
| start_at = time.perf_counter() | |||
| @@ -15,7 +15,9 @@ from models.dataset import Document as DatasetDocument | |||
| def disable_segments_from_index_task(segment_ids: list, dataset_id: str, document_id: str): | |||
| """ | |||
| Async disable segments from index | |||
| :param segment_ids: | |||
| :param segment_ids: list of segment ids | |||
| :param dataset_id: dataset id | |||
| :param document_id: document id | |||
| Usage: disable_segments_from_index_task.delay(segment_ids, dataset_id, document_id) | |||
| """ | |||
| @@ -19,7 +19,7 @@ def document_indexing_task(dataset_id: str, document_ids: list): | |||
| :param dataset_id: | |||
| :param document_ids: | |||
| Usage: document_indexing_task.delay(dataset_id, document_id) | |||
| Usage: document_indexing_task.delay(dataset_id, document_ids) | |||
| """ | |||
| documents = [] | |||
| start_at = time.perf_counter() | |||
| @@ -20,7 +20,7 @@ def duplicate_document_indexing_task(dataset_id: str, document_ids: list): | |||
| :param dataset_id: | |||
| :param document_ids: | |||
| Usage: duplicate_document_indexing_task.delay(dataset_id, document_id) | |||
| Usage: duplicate_document_indexing_task.delay(dataset_id, document_ids) | |||
| """ | |||
| documents = [] | |||
| start_at = time.perf_counter() | |||
| @@ -18,9 +18,11 @@ from models.dataset import Document as DatasetDocument | |||
| def enable_segments_to_index_task(segment_ids: list, dataset_id: str, document_id: str): | |||
| """ | |||
| Async enable segments to index | |||
| :param segment_ids: | |||
| :param segment_ids: list of segment ids | |||
| :param dataset_id: dataset id | |||
| :param document_id: document id | |||
| Usage: enable_segments_to_index_task.delay(segment_ids) | |||
| Usage: enable_segments_to_index_task.delay(segment_ids, dataset_id, document_id) | |||
| """ | |||
| start_at = time.perf_counter() | |||
| dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() | |||
| @@ -1,91 +0,0 @@ | |||
| import json | |||
| import logging | |||
| import time | |||
| import click | |||
| from celery import shared_task # type: ignore | |||
| from core.indexing_runner import DocumentIsPausedError | |||
| from extensions.ext_database import db | |||
| from extensions.ext_storage import storage | |||
| from models.dataset import Dataset, ExternalKnowledgeApis | |||
| from models.model import UploadFile | |||
| from services.external_knowledge_service import ExternalDatasetService | |||
| @shared_task(queue="dataset") | |||
| def external_document_indexing_task( | |||
| dataset_id: str, external_knowledge_api_id: str, data_source: dict, process_parameter: dict | |||
| ): | |||
| """ | |||
| Async process document | |||
| :param dataset_id: | |||
| :param external_knowledge_api_id: | |||
| :param data_source: | |||
| :param process_parameter: | |||
| Usage: external_document_indexing_task.delay(dataset_id, document_id) | |||
| """ | |||
| start_at = time.perf_counter() | |||
| dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() | |||
| if not dataset: | |||
| logging.info( | |||
| click.style("Processed external dataset: {} failed, dataset not exit.".format(dataset_id), fg="red") | |||
| ) | |||
| return | |||
| # get external api template | |||
| external_knowledge_api = ( | |||
| db.session.query(ExternalKnowledgeApis) | |||
| .filter( | |||
| ExternalKnowledgeApis.id == external_knowledge_api_id, ExternalKnowledgeApis.tenant_id == dataset.tenant_id | |||
| ) | |||
| .first() | |||
| ) | |||
| if not external_knowledge_api: | |||
| logging.info( | |||
| click.style( | |||
| "Processed external dataset: {} failed, api template: {} not exit.".format( | |||
| dataset_id, external_knowledge_api_id | |||
| ), | |||
| fg="red", | |||
| ) | |||
| ) | |||
| return | |||
| files = {} | |||
| if data_source["type"] == "upload_file": | |||
| upload_file_list = data_source["info_list"]["file_info_list"]["file_ids"] | |||
| for file_id in upload_file_list: | |||
| file = ( | |||
| db.session.query(UploadFile) | |||
| .filter(UploadFile.tenant_id == dataset.tenant_id, UploadFile.id == file_id) | |||
| .first() | |||
| ) | |||
| if file: | |||
| files[file.id] = (file.name, storage.load_once(file.key), file.mime_type) | |||
| try: | |||
| settings = ExternalDatasetService.get_external_knowledge_api_settings( | |||
| json.loads(external_knowledge_api.settings) | |||
| ) | |||
| # do http request | |||
| response = ExternalDatasetService.process_external_api(settings, files) | |||
| job_id = response.json().get("job_id") | |||
| if job_id: | |||
| # save job_id to dataset | |||
| dataset.job_id = job_id | |||
| db.session.commit() | |||
| end_at = time.perf_counter() | |||
| logging.info( | |||
| click.style( | |||
| "Processed external dataset: {} successful, latency: {}".format(dataset.id, end_at - start_at), | |||
| fg="green", | |||
| ) | |||
| ) | |||
| except DocumentIsPausedError as ex: | |||
| logging.info(click.style(str(ex), fg="yellow")) | |||
| except Exception: | |||
| pass | |||
| @@ -20,7 +20,7 @@ def retry_document_indexing_task(dataset_id: str, document_ids: list[str]): | |||
| :param dataset_id: | |||
| :param document_ids: | |||
| Usage: retry_document_indexing_task.delay(dataset_id, document_id) | |||
| Usage: retry_document_indexing_task.delay(dataset_id, document_ids) | |||
| """ | |||
| documents: list[Document] = [] | |||
| start_at = time.perf_counter() | |||