| @@ -7,4 +7,5 @@ def handle(sender, **kwargs): | |||
| document_id = sender | |||
| dataset_id = kwargs.get('dataset_id') | |||
| doc_form = kwargs.get('doc_form') | |||
| clean_document_task.delay(document_id, dataset_id, doc_form) | |||
| file_id = kwargs.get('file_id') | |||
| clean_document_task.delay(document_id, dataset_id, doc_form, file_id) | |||
| @@ -524,7 +524,14 @@ class DocumentService: | |||
| @staticmethod | |||
| def delete_document(document): | |||
| # Emit document_was_deleted for `document`, then delete it through db.session and commit. | |||
| # trigger document_was_deleted signal | |||
| # NOTE(review): the send on the next row (no file_id) appears to be the pre-change line kept by this diff view; the send further down, which also passes file_id, looks like its replacement - confirm only one is live. | |||
| document_was_deleted.send(document.id, dataset_id=document.dataset_id, doc_form=document.doc_form) | |||
| # file_id stays None unless the document's data_source_type is 'upload_file' and its data_source_info_dict contains 'upload_file_id'. | |||
| file_id = None | |||
| if document.data_source_type == 'upload_file': | |||
| if document.data_source_info: | |||
| data_source_info = document.data_source_info_dict | |||
| if data_source_info and 'upload_file_id' in data_source_info: | |||
| file_id = data_source_info['upload_file_id'] | |||
| # Send the signal with file_id as an extra keyword argument (None when no upload file id was found above). | |||
| document_was_deleted.send(document.id, dataset_id=document.dataset_id, | |||
| doc_form=document.doc_form, file_id=file_id) | |||
| # The signal is sent before the document row is deleted and the session committed. | |||
| db.session.delete(document) | |||
| db.session.commit() | |||
| @@ -6,6 +6,7 @@ from celery import shared_task | |||
| from core.rag.index_processor.index_processor_factory import IndexProcessorFactory | |||
| from extensions.ext_database import db | |||
| from extensions.ext_storage import storage | |||
| from models.dataset import ( | |||
| AppDatasetJoin, | |||
| Dataset, | |||
| @@ -14,6 +15,7 @@ from models.dataset import ( | |||
| Document, | |||
| DocumentSegment, | |||
| ) | |||
| from models.model import UploadFile | |||
| # NOTE: ValueError is a Python builtin, so no import statement is needed for it | |||
| @@ -65,8 +67,27 @@ def clean_dataset_task(dataset_id: str, tenant_id: str, indexing_technique: str, | |||
| db.session.query(DatasetQuery).filter(DatasetQuery.dataset_id == dataset_id).delete() | |||
| db.session.query(AppDatasetJoin).filter(AppDatasetJoin.dataset_id == dataset_id).delete() | |||
| db.session.commit() | |||
| # delete files | |||
| if documents: | |||
| for document in documents: | |||
| try: | |||
| if document.data_source_type == 'upload_file': | |||
| if document.data_source_info: | |||
| data_source_info = document.data_source_info_dict | |||
| if data_source_info and 'upload_file_id' in data_source_info: | |||
| file_id = data_source_info['upload_file_id'] | |||
| file = db.session.query(UploadFile).filter( | |||
| UploadFile.tenant_id == document.tenant_id, | |||
| UploadFile.id == file_id | |||
| ).first() | |||
| if not file: | |||
| continue | |||
| storage.delete(file.key) | |||
| db.session.delete(file) | |||
| except Exception: | |||
| continue | |||
| db.session.commit() | |||
| end_at = time.perf_counter() | |||
| logging.info( | |||
| click.style('Cleaned dataset when dataset deleted: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green')) | |||
| @@ -1,21 +1,25 @@ | |||
| import logging | |||
| import time | |||
| from typing import Optional | |||
| import click | |||
| from celery import shared_task | |||
| from core.rag.index_processor.index_processor_factory import IndexProcessorFactory | |||
| from extensions.ext_database import db | |||
| from extensions.ext_storage import storage | |||
| from models.dataset import Dataset, DocumentSegment | |||
| from models.model import UploadFile | |||
| @shared_task(queue='dataset') | |||
| def clean_document_task(document_id: str, dataset_id: str, doc_form: str): | |||
| def clean_document_task(document_id: str, dataset_id: str, doc_form: str, file_id: Optional[str]): | |||
| """ | |||
| Clean document when document deleted. | |||
| :param document_id: document id | |||
| :param dataset_id: dataset id | |||
| :param doc_form: doc_form | |||
| :param file_id: file id | |||
| Usage: clean_document_task.delay(document_id, dataset_id, doc_form, file_id) | |||
| """ | |||
| @@ -39,8 +43,20 @@ def clean_document_task(document_id: str, dataset_id: str, doc_form: str): | |||
| db.session.delete(segment) | |||
| db.session.commit() | |||
| end_at = time.perf_counter() | |||
| logging.info( | |||
| click.style('Cleaned document when document deleted: {} latency: {}'.format(document_id, end_at - start_at), fg='green')) | |||
| if file_id: | |||
| file = db.session.query(UploadFile).filter( | |||
| UploadFile.id == file_id | |||
| ).first() | |||
| if file: | |||
| try: | |||
| storage.delete(file.key) | |||
| except Exception: | |||
| logging.exception("Delete file failed when document deleted, file_id: {}".format(file_id)) | |||
| db.session.delete(file) | |||
| db.session.commit() | |||
| end_at = time.perf_counter() | |||
| logging.info( | |||
| click.style('Cleaned document when document deleted: {} latency: {}'.format(document_id, end_at - start_at), fg='green')) | |||
| except Exception: | |||
| logging.exception("Cleaned document when document deleted failed") | |||