| "models/**/*.py", | "models/**/*.py", | ||||
| "migrations/**/*", | "migrations/**/*", | ||||
| "services/**/*.py", | "services/**/*.py", | ||||
| "tasks/**/*.py", | |||||
| ] | ] | ||||
| [tool.pytest_env] | [tool.pytest_env] |
from models.dataset import DocumentSegment


@shared_task(queue="dataset")
def add_document_to_index_task(dataset_document_id: str):
    """
    Async Add document to index
    Usage: add_document_to_index_task.delay(dataset_document_id)
| """ | """ | ||||
| logging.info(click.style('Start add document to index: {}'.format(dataset_document_id), fg='green')) | |||||
| logging.info(click.style("Start add document to index: {}".format(dataset_document_id), fg="green")) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| dataset_document = db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document_id).first() | dataset_document = db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document_id).first() | ||||
| if not dataset_document: | if not dataset_document: | ||||
| raise NotFound('Document not found') | |||||
| raise NotFound("Document not found") | |||||
| if dataset_document.indexing_status != 'completed': | |||||
| if dataset_document.indexing_status != "completed": | |||||
| return | return | ||||
| indexing_cache_key = 'document_{}_indexing'.format(dataset_document.id) | |||||
| indexing_cache_key = "document_{}_indexing".format(dataset_document.id) | |||||
| try: | try: | ||||
| segments = db.session.query(DocumentSegment).filter( | |||||
| DocumentSegment.document_id == dataset_document.id, | |||||
| DocumentSegment.enabled == True | |||||
| ) \ | |||||
| .order_by(DocumentSegment.position.asc()).all() | |||||
| segments = ( | |||||
| db.session.query(DocumentSegment) | |||||
| .filter(DocumentSegment.document_id == dataset_document.id, DocumentSegment.enabled == True) | |||||
| .order_by(DocumentSegment.position.asc()) | |||||
| .all() | |||||
| ) | |||||
| documents = [] | documents = [] | ||||
| for segment in segments: | for segment in segments: | ||||
| "doc_hash": segment.index_node_hash, | "doc_hash": segment.index_node_hash, | ||||
| "document_id": segment.document_id, | "document_id": segment.document_id, | ||||
| "dataset_id": segment.dataset_id, | "dataset_id": segment.dataset_id, | ||||
| } | |||||
| }, | |||||
| ) | ) | ||||
| documents.append(document) | documents.append(document) | ||||
| dataset = dataset_document.dataset | dataset = dataset_document.dataset | ||||
| if not dataset: | if not dataset: | ||||
| raise Exception('Document has no dataset') | |||||
| raise Exception("Document has no dataset") | |||||
| index_type = dataset.doc_form | index_type = dataset.doc_form | ||||
| index_processor = IndexProcessorFactory(index_type).init_index_processor() | index_processor = IndexProcessorFactory(index_type).init_index_processor() | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info( | logging.info( | ||||
| click.style('Document added to index: {} latency: {}'.format(dataset_document.id, end_at - start_at), fg='green')) | |||||
| click.style( | |||||
| "Document added to index: {} latency: {}".format(dataset_document.id, end_at - start_at), fg="green" | |||||
| ) | |||||
| ) | |||||
| except Exception as e: | except Exception as e: | ||||
| logging.exception("add document to index failed") | logging.exception("add document to index failed") | ||||
| dataset_document.enabled = False | dataset_document.enabled = False | ||||
| dataset_document.disabled_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) | dataset_document.disabled_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) | ||||
| dataset_document.status = 'error' | |||||
| dataset_document.status = "error" | |||||
| dataset_document.error = str(e) | dataset_document.error = str(e) | ||||
| db.session.commit() | db.session.commit() | ||||
| finally: | finally: |
from services.dataset_service import DatasetCollectionBindingService


@shared_task(queue="dataset")
def add_annotation_to_index_task(
    annotation_id: str, question: str, tenant_id: str, app_id: str, collection_binding_id: str
):
    """
    Add annotation to index.
    :param annotation_id: annotation id
    Usage: add_annotation_to_index_task.delay(annotation_id, question, tenant_id, app_id, collection_binding_id)
| """ | """ | ||||
| logging.info(click.style('Start build index for annotation: {}'.format(annotation_id), fg='green')) | |||||
| logging.info(click.style("Start build index for annotation: {}".format(annotation_id), fg="green")) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| try: | try: | ||||
| dataset_collection_binding = DatasetCollectionBindingService.get_dataset_collection_binding_by_id_and_type( | dataset_collection_binding = DatasetCollectionBindingService.get_dataset_collection_binding_by_id_and_type( | ||||
| collection_binding_id, | |||||
| 'annotation' | |||||
| collection_binding_id, "annotation" | |||||
| ) | ) | ||||
| dataset = Dataset( | dataset = Dataset( | ||||
| id=app_id, | id=app_id, | ||||
| tenant_id=tenant_id, | tenant_id=tenant_id, | ||||
| indexing_technique='high_quality', | |||||
| indexing_technique="high_quality", | |||||
| embedding_model_provider=dataset_collection_binding.provider_name, | embedding_model_provider=dataset_collection_binding.provider_name, | ||||
| embedding_model=dataset_collection_binding.model_name, | embedding_model=dataset_collection_binding.model_name, | ||||
| collection_binding_id=dataset_collection_binding.id | |||||
| collection_binding_id=dataset_collection_binding.id, | |||||
| ) | ) | ||||
| document = Document( | document = Document( | ||||
| page_content=question, | |||||
| metadata={ | |||||
| "annotation_id": annotation_id, | |||||
| "app_id": app_id, | |||||
| "doc_id": annotation_id | |||||
| } | |||||
| page_content=question, metadata={"annotation_id": annotation_id, "app_id": app_id, "doc_id": annotation_id} | |||||
| ) | ) | ||||
| vector = Vector(dataset, attributes=['doc_id', 'annotation_id', 'app_id']) | |||||
| vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"]) | |||||
| vector.create([document], duplicate_check=True) | vector.create([document], duplicate_check=True) | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info( | logging.info( | ||||
| click.style( | click.style( | ||||
| 'Build index successful for annotation: {} latency: {}'.format(annotation_id, end_at - start_at), | |||||
| fg='green')) | |||||
| "Build index successful for annotation: {} latency: {}".format(annotation_id, end_at - start_at), | |||||
| fg="green", | |||||
| ) | |||||
| ) | |||||
| except Exception: | except Exception: | ||||
| logging.exception("Build index for annotation failed") | logging.exception("Build index for annotation failed") |
from services.dataset_service import DatasetCollectionBindingService


@shared_task(queue="dataset")
def batch_import_annotations_task(job_id: str, content_list: list[dict], app_id: str, tenant_id: str, user_id: str):
    """
    Batch import annotations and add them to the index.
    :param job_id: job_id
    :param user_id: user_id
    """
    logging.info(click.style("Start batch import annotation: {}".format(job_id), fg="green"))
    start_at = time.perf_counter()
    indexing_cache_key = "app_annotation_batch_import_{}".format(str(job_id))
    # get app info
    app = db.session.query(App).filter(App.id == app_id, App.tenant_id == tenant_id, App.status == "normal").first()

    if app:
        try:
            documents = []
            for content in content_list:
                annotation = MessageAnnotation(
                    app_id=app.id, content=content["answer"], question=content["question"], account_id=user_id
                )
                db.session.add(annotation)
                db.session.flush()

                document = Document(
                    page_content=content["question"],
                    metadata={"annotation_id": annotation.id, "app_id": app_id, "doc_id": annotation.id},
                )
                documents.append(document)
            # if annotation reply is enabled, batch add annotations' index
            app_annotation_setting = (
                db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.app_id == app_id).first()
            )

            if app_annotation_setting:
                dataset_collection_binding = (
                    DatasetCollectionBindingService.get_dataset_collection_binding_by_id_and_type(
                        app_annotation_setting.collection_binding_id, "annotation"
                    )
                )
                if not dataset_collection_binding:
                    raise NotFound("App annotation setting not found")
                dataset = Dataset(
                    id=app_id,
                    tenant_id=tenant_id,
                    indexing_technique="high_quality",
                    embedding_model_provider=dataset_collection_binding.provider_name,
                    embedding_model=dataset_collection_binding.model_name,
                    collection_binding_id=dataset_collection_binding.id,
                )

                vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"])
                vector.create(documents, duplicate_check=True)
            db.session.commit()
            redis_client.setex(indexing_cache_key, 600, "completed")
            end_at = time.perf_counter()
            logging.info(
                click.style(
                    "Build index successful for batch import annotation: {} latency: {}".format(
                        job_id, end_at - start_at
                    ),
                    fg="green",
                )
            )
        except Exception as e:
            db.session.rollback()
            redis_client.setex(indexing_cache_key, 600, "error")
            indexing_error_msg_key = "app_annotation_batch_import_error_msg_{}".format(str(job_id))
            redis_client.setex(indexing_error_msg_key, 600, str(e))
            logging.exception("Build index for batch import annotations failed")
from services.dataset_service import DatasetCollectionBindingService


@shared_task(queue="dataset")
def delete_annotation_index_task(annotation_id: str, app_id: str, tenant_id: str, collection_binding_id: str):
    """
    Async delete annotation index task
    """
    logging.info(click.style("Start delete app annotation index: {}".format(app_id), fg="green"))
    start_at = time.perf_counter()
    try:
        dataset_collection_binding = DatasetCollectionBindingService.get_dataset_collection_binding_by_id_and_type(
            collection_binding_id, "annotation"
        )

        dataset = Dataset(
            id=app_id,
            tenant_id=tenant_id,
            indexing_technique="high_quality",
            collection_binding_id=dataset_collection_binding.id,
        )

        try:
            vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"])
            vector.delete_by_metadata_field("annotation_id", annotation_id)
        except Exception:
            logging.exception("Delete annotation index failed when annotation deleted.")
        end_at = time.perf_counter()
        logging.info(
            click.style("App annotations index deleted : {} latency: {}".format(app_id, end_at - start_at), fg="green")
        )
    except Exception as e:
        logging.exception("Annotation deleted index failed:{}".format(str(e)))
from models.model import App, AppAnnotationSetting, MessageAnnotation


@shared_task(queue="dataset")
def disable_annotation_reply_task(job_id: str, app_id: str, tenant_id: str):
    """
    Async disable annotation reply task
| """ | """ | ||||
| logging.info(click.style('Start delete app annotations index: {}'.format(app_id), fg='green')) | |||||
| logging.info(click.style("Start delete app annotations index: {}".format(app_id), fg="green")) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| # get app info | # get app info | ||||
| app = db.session.query(App).filter( | |||||
| App.id == app_id, | |||||
| App.tenant_id == tenant_id, | |||||
| App.status == 'normal' | |||||
| ).first() | |||||
| app = db.session.query(App).filter(App.id == app_id, App.tenant_id == tenant_id, App.status == "normal").first() | |||||
| annotations_count = db.session.query(MessageAnnotation).filter(MessageAnnotation.app_id == app_id).count() | annotations_count = db.session.query(MessageAnnotation).filter(MessageAnnotation.app_id == app_id).count() | ||||
| if not app: | if not app: | ||||
| raise NotFound("App not found") | raise NotFound("App not found") | ||||
| app_annotation_setting = db.session.query(AppAnnotationSetting).filter( | |||||
| AppAnnotationSetting.app_id == app_id | |||||
| ).first() | |||||
| app_annotation_setting = ( | |||||
| db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.app_id == app_id).first() | |||||
| ) | |||||
| if not app_annotation_setting: | if not app_annotation_setting: | ||||
| raise NotFound("App annotation setting not found") | raise NotFound("App annotation setting not found") | ||||
| disable_app_annotation_key = 'disable_app_annotation_{}'.format(str(app_id)) | |||||
| disable_app_annotation_job_key = 'disable_app_annotation_job_{}'.format(str(job_id)) | |||||
| disable_app_annotation_key = "disable_app_annotation_{}".format(str(app_id)) | |||||
| disable_app_annotation_job_key = "disable_app_annotation_job_{}".format(str(job_id)) | |||||
| try: | try: | ||||
| dataset = Dataset( | dataset = Dataset( | ||||
| id=app_id, | id=app_id, | ||||
| tenant_id=tenant_id, | tenant_id=tenant_id, | ||||
| indexing_technique='high_quality', | |||||
| collection_binding_id=app_annotation_setting.collection_binding_id | |||||
| indexing_technique="high_quality", | |||||
| collection_binding_id=app_annotation_setting.collection_binding_id, | |||||
| ) | ) | ||||
| try: | try: | ||||
| if annotations_count > 0: | if annotations_count > 0: | ||||
| vector = Vector(dataset, attributes=['doc_id', 'annotation_id', 'app_id']) | |||||
| vector.delete_by_metadata_field('app_id', app_id) | |||||
| vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"]) | |||||
| vector.delete_by_metadata_field("app_id", app_id) | |||||
| except Exception: | except Exception: | ||||
| logging.exception("Delete annotation index failed when annotation deleted.") | logging.exception("Delete annotation index failed when annotation deleted.") | ||||
| redis_client.setex(disable_app_annotation_job_key, 600, 'completed') | |||||
| redis_client.setex(disable_app_annotation_job_key, 600, "completed") | |||||
| # delete annotation setting | # delete annotation setting | ||||
| db.session.delete(app_annotation_setting) | db.session.delete(app_annotation_setting) | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info( | logging.info( | ||||
| click.style('App annotations index deleted : {} latency: {}'.format(app_id, end_at - start_at), | |||||
| fg='green')) | |||||
| click.style("App annotations index deleted : {} latency: {}".format(app_id, end_at - start_at), fg="green") | |||||
| ) | |||||
| except Exception as e: | except Exception as e: | ||||
| logging.exception("Annotation batch deleted index failed:{}".format(str(e))) | logging.exception("Annotation batch deleted index failed:{}".format(str(e))) | ||||
| redis_client.setex(disable_app_annotation_job_key, 600, 'error') | |||||
| disable_app_annotation_error_key = 'disable_app_annotation_error_{}'.format(str(job_id)) | |||||
| redis_client.setex(disable_app_annotation_job_key, 600, "error") | |||||
| disable_app_annotation_error_key = "disable_app_annotation_error_{}".format(str(job_id)) | |||||
| redis_client.setex(disable_app_annotation_error_key, 600, str(e)) | redis_client.setex(disable_app_annotation_error_key, 600, str(e)) | ||||
| finally: | finally: | ||||
| redis_client.delete(disable_app_annotation_key) | redis_client.delete(disable_app_annotation_key) |
from services.dataset_service import DatasetCollectionBindingService


@shared_task(queue="dataset")
def enable_annotation_reply_task(
    job_id: str,
    app_id: str,
    user_id: str,
    tenant_id: str,
    score_threshold: float,
    embedding_provider_name: str,
    embedding_model_name: str,
):
    """
    Async enable annotation reply task
    """
    logging.info(click.style("Start add app annotation to index: {}".format(app_id), fg="green"))
    start_at = time.perf_counter()
    # get app info
    app = db.session.query(App).filter(App.id == app_id, App.tenant_id == tenant_id, App.status == "normal").first()

    if not app:
        raise NotFound("App not found")

    annotations = db.session.query(MessageAnnotation).filter(MessageAnnotation.app_id == app_id).all()
    enable_app_annotation_key = "enable_app_annotation_{}".format(str(app_id))
    enable_app_annotation_job_key = "enable_app_annotation_job_{}".format(str(job_id))

    try:
        documents = []
        dataset_collection_binding = DatasetCollectionBindingService.get_dataset_collection_binding(
            embedding_provider_name, embedding_model_name, "annotation"
        )
        annotation_setting = (
            db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.app_id == app_id).first()
        )
        if annotation_setting:
            annotation_setting.score_threshold = score_threshold
            annotation_setting.collection_binding_id = dataset_collection_binding.id
                score_threshold=score_threshold,
                collection_binding_id=dataset_collection_binding.id,
                created_user_id=user_id,
                updated_user_id=user_id,
            )
            db.session.add(new_app_annotation_setting)
        dataset = Dataset(
            id=app_id,
            tenant_id=tenant_id,
            indexing_technique="high_quality",
            embedding_model_provider=embedding_provider_name,
            embedding_model=embedding_model_name,
            collection_binding_id=dataset_collection_binding.id,
        )
        if annotations:
            for annotation in annotations:
                document = Document(
                    page_content=annotation.question,
                    metadata={"annotation_id": annotation.id, "app_id": app_id, "doc_id": annotation.id},
                )
                documents.append(document)
            vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"])
            try:
                vector.delete_by_metadata_field("app_id", app_id)
            except Exception as e:
                logging.info(click.style("Delete annotation index error: {}".format(str(e)), fg="red"))
            vector.create(documents)
        db.session.commit()
        redis_client.setex(enable_app_annotation_job_key, 600, "completed")
        end_at = time.perf_counter()
        logging.info(
            click.style("App annotations added to index: {} latency: {}".format(app_id, end_at - start_at), fg="green")
        )
    except Exception as e:
        logging.exception("Annotation batch created index failed:{}".format(str(e)))
        redis_client.setex(enable_app_annotation_job_key, 600, "error")
        enable_app_annotation_error_key = "enable_app_annotation_error_{}".format(str(job_id))
        redis_client.setex(enable_app_annotation_error_key, 600, str(e))
        db.session.rollback()
    finally:
from services.dataset_service import DatasetCollectionBindingService


@shared_task(queue="dataset")
def update_annotation_to_index_task(
    annotation_id: str, question: str, tenant_id: str, app_id: str, collection_binding_id: str
):
    """
    Update annotation to index.
    :param annotation_id: annotation id
    Usage: update_annotation_to_index_task.delay(annotation_id, question, tenant_id, app_id, collection_binding_id)
| """ | """ | ||||
| logging.info(click.style('Start update index for annotation: {}'.format(annotation_id), fg='green')) | |||||
| logging.info(click.style("Start update index for annotation: {}".format(annotation_id), fg="green")) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| try: | try: | ||||
| dataset_collection_binding = DatasetCollectionBindingService.get_dataset_collection_binding_by_id_and_type( | dataset_collection_binding = DatasetCollectionBindingService.get_dataset_collection_binding_by_id_and_type( | ||||
| collection_binding_id, | |||||
| 'annotation' | |||||
| collection_binding_id, "annotation" | |||||
| ) | ) | ||||
| dataset = Dataset( | dataset = Dataset( | ||||
| id=app_id, | id=app_id, | ||||
| tenant_id=tenant_id, | tenant_id=tenant_id, | ||||
| indexing_technique='high_quality', | |||||
| indexing_technique="high_quality", | |||||
| embedding_model_provider=dataset_collection_binding.provider_name, | embedding_model_provider=dataset_collection_binding.provider_name, | ||||
| embedding_model=dataset_collection_binding.model_name, | embedding_model=dataset_collection_binding.model_name, | ||||
| collection_binding_id=dataset_collection_binding.id | |||||
| collection_binding_id=dataset_collection_binding.id, | |||||
| ) | ) | ||||
| document = Document( | document = Document( | ||||
| page_content=question, | |||||
| metadata={ | |||||
| "annotation_id": annotation_id, | |||||
| "app_id": app_id, | |||||
| "doc_id": annotation_id | |||||
| } | |||||
| page_content=question, metadata={"annotation_id": annotation_id, "app_id": app_id, "doc_id": annotation_id} | |||||
| ) | ) | ||||
| vector = Vector(dataset, attributes=['doc_id', 'annotation_id', 'app_id']) | |||||
| vector.delete_by_metadata_field('annotation_id', annotation_id) | |||||
| vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"]) | |||||
| vector.delete_by_metadata_field("annotation_id", annotation_id) | |||||
| vector.add_texts([document]) | vector.add_texts([document]) | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info( | logging.info( | ||||
| click.style( | click.style( | ||||
| 'Build index successful for annotation: {} latency: {}'.format(annotation_id, end_at - start_at), | |||||
| fg='green')) | |||||
| "Build index successful for annotation: {} latency: {}".format(annotation_id, end_at - start_at), | |||||
| fg="green", | |||||
| ) | |||||
| ) | |||||
| except Exception: | except Exception: | ||||
| logging.exception("Build index for annotation failed") | logging.exception("Build index for annotation failed") |
from models.dataset import Dataset, Document, DocumentSegment


@shared_task(queue="dataset")
def batch_create_segment_to_index_task(
    job_id: str, content: list, dataset_id: str, document_id: str, tenant_id: str, user_id: str
):
    """
    Async batch create segment to index
    :param job_id:
    Usage: batch_create_segment_to_index_task.delay(job_id, content, dataset_id, document_id, tenant_id, user_id)
| """ | """ | ||||
| logging.info(click.style('Start batch create segment jobId: {}'.format(job_id), fg='green')) | |||||
| logging.info(click.style("Start batch create segment jobId: {}".format(job_id), fg="green")) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| indexing_cache_key = 'segment_batch_import_{}'.format(job_id) | |||||
| indexing_cache_key = "segment_batch_import_{}".format(job_id) | |||||
| try: | try: | ||||
| dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() | dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() | ||||
| if not dataset: | if not dataset: | ||||
| raise ValueError('Dataset not exist.') | |||||
| raise ValueError("Dataset not exist.") | |||||
| dataset_document = db.session.query(Document).filter(Document.id == document_id).first() | dataset_document = db.session.query(Document).filter(Document.id == document_id).first() | ||||
| if not dataset_document: | if not dataset_document: | ||||
| raise ValueError('Document not exist.') | |||||
| raise ValueError("Document not exist.") | |||||
| if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != 'completed': | |||||
| raise ValueError('Document is not available.') | |||||
| if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != "completed": | |||||
| raise ValueError("Document is not available.") | |||||
| document_segments = [] | document_segments = [] | ||||
| embedding_model = None | embedding_model = None | ||||
| if dataset.indexing_technique == 'high_quality': | |||||
| if dataset.indexing_technique == "high_quality": | |||||
| model_manager = ModelManager() | model_manager = ModelManager() | ||||
| embedding_model = model_manager.get_model_instance( | embedding_model = model_manager.get_model_instance( | ||||
| tenant_id=dataset.tenant_id, | tenant_id=dataset.tenant_id, | ||||
| provider=dataset.embedding_model_provider, | provider=dataset.embedding_model_provider, | ||||
| model_type=ModelType.TEXT_EMBEDDING, | model_type=ModelType.TEXT_EMBEDDING, | ||||
| model=dataset.embedding_model | |||||
| model=dataset.embedding_model, | |||||
| ) | ) | ||||
| for segment in content: | for segment in content: | ||||
| content = segment['content'] | |||||
| content = segment["content"] | |||||
| doc_id = str(uuid.uuid4()) | doc_id = str(uuid.uuid4()) | ||||
| segment_hash = helper.generate_text_hash(content) | segment_hash = helper.generate_text_hash(content) | ||||
| # calc embedding use tokens | # calc embedding use tokens | ||||
| tokens = embedding_model.get_text_embedding_num_tokens( | |||||
| texts=[content] | |||||
| ) if embedding_model else 0 | |||||
| max_position = db.session.query(func.max(DocumentSegment.position)).filter( | |||||
| DocumentSegment.document_id == dataset_document.id | |||||
| ).scalar() | |||||
| tokens = embedding_model.get_text_embedding_num_tokens(texts=[content]) if embedding_model else 0 | |||||
| max_position = ( | |||||
| db.session.query(func.max(DocumentSegment.position)) | |||||
| .filter(DocumentSegment.document_id == dataset_document.id) | |||||
| .scalar() | |||||
| ) | |||||
| segment_document = DocumentSegment( | segment_document = DocumentSegment( | ||||
| tenant_id=tenant_id, | tenant_id=tenant_id, | ||||
| dataset_id=dataset_id, | dataset_id=dataset_id, | ||||
| tokens=tokens, | tokens=tokens, | ||||
| created_by=user_id, | created_by=user_id, | ||||
| indexing_at=datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None), | indexing_at=datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None), | ||||
| status='completed', | |||||
| completed_at=datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) | |||||
| status="completed", | |||||
| completed_at=datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None), | |||||
| ) | ) | ||||
| if dataset_document.doc_form == 'qa_model': | |||||
| segment_document.answer = segment['answer'] | |||||
| if dataset_document.doc_form == "qa_model": | |||||
| segment_document.answer = segment["answer"] | |||||
| db.session.add(segment_document) | db.session.add(segment_document) | ||||
| document_segments.append(segment_document) | document_segments.append(segment_document) | ||||
| # add index to db | # add index to db | ||||
| indexing_runner = IndexingRunner() | indexing_runner = IndexingRunner() | ||||
| indexing_runner.batch_add_segments(document_segments, dataset) | indexing_runner.batch_add_segments(document_segments, dataset) | ||||
| db.session.commit() | db.session.commit() | ||||
| redis_client.setex(indexing_cache_key, 600, 'completed') | |||||
| redis_client.setex(indexing_cache_key, 600, "completed") | |||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info(click.style('Segment batch created job: {} latency: {}'.format(job_id, end_at - start_at), fg='green')) | |||||
| logging.info( | |||||
| click.style("Segment batch created job: {} latency: {}".format(job_id, end_at - start_at), fg="green") | |||||
| ) | |||||
| except Exception as e: | except Exception as e: | ||||
| logging.exception("Segments batch created index failed:{}".format(str(e))) | logging.exception("Segments batch created index failed:{}".format(str(e))) | ||||
| redis_client.setex(indexing_cache_key, 600, 'error') | |||||
| redis_client.setex(indexing_cache_key, 600, "error") |
# Add import statement for ValueError


@shared_task(queue="dataset")
def clean_dataset_task(
    dataset_id: str,
    tenant_id: str,
    indexing_technique: str,
    index_struct: str,
    collection_binding_id: str,
    doc_form: str,
):
    """
    Clean dataset when dataset deleted.
    :param dataset_id: dataset id
    Usage: clean_dataset_task.delay(dataset_id, tenant_id, indexing_technique, index_struct, collection_binding_id, doc_form)
| """ | """ | ||||
| logging.info(click.style('Start clean dataset when dataset deleted: {}'.format(dataset_id), fg='green')) | |||||
| logging.info(click.style("Start clean dataset when dataset deleted: {}".format(dataset_id), fg="green")) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| try: | try: | ||||
| segments = db.session.query(DocumentSegment).filter(DocumentSegment.dataset_id == dataset_id).all() | segments = db.session.query(DocumentSegment).filter(DocumentSegment.dataset_id == dataset_id).all() | ||||
| if documents is None or len(documents) == 0: | if documents is None or len(documents) == 0: | ||||
| logging.info(click.style('No documents found for dataset: {}'.format(dataset_id), fg='green')) | |||||
| logging.info(click.style("No documents found for dataset: {}".format(dataset_id), fg="green")) | |||||
| else: | else: | ||||
| logging.info(click.style('Cleaning documents for dataset: {}'.format(dataset_id), fg='green')) | |||||
| logging.info(click.style("Cleaning documents for dataset: {}".format(dataset_id), fg="green")) | |||||
| # Specify the index type before initializing the index processor | # Specify the index type before initializing the index processor | ||||
| if doc_form is None: | if doc_form is None: | ||||
| raise ValueError("Index type must be specified.") | raise ValueError("Index type must be specified.") | ||||
| if documents: | if documents: | ||||
| for document in documents: | for document in documents: | ||||
| try: | try: | ||||
| if document.data_source_type == 'upload_file': | |||||
| if document.data_source_type == "upload_file": | |||||
| if document.data_source_info: | if document.data_source_info: | ||||
| data_source_info = document.data_source_info_dict | data_source_info = document.data_source_info_dict | ||||
| if data_source_info and 'upload_file_id' in data_source_info: | |||||
| file_id = data_source_info['upload_file_id'] | |||||
| file = db.session.query(UploadFile).filter( | |||||
| UploadFile.tenant_id == document.tenant_id, | |||||
| UploadFile.id == file_id | |||||
| ).first() | |||||
| if data_source_info and "upload_file_id" in data_source_info: | |||||
| file_id = data_source_info["upload_file_id"] | |||||
| file = ( | |||||
| db.session.query(UploadFile) | |||||
| .filter(UploadFile.tenant_id == document.tenant_id, UploadFile.id == file_id) | |||||
| .first() | |||||
| ) | |||||
| if not file: | if not file: | ||||
| continue | continue | ||||
| storage.delete(file.key) | storage.delete(file.key) | ||||
| db.session.commit() | db.session.commit() | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info( | logging.info( | ||||
| click.style('Cleaned dataset when dataset deleted: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green')) | |||||
| click.style( | |||||
| "Cleaned dataset when dataset deleted: {} latency: {}".format(dataset_id, end_at - start_at), fg="green" | |||||
| ) | |||||
| ) | |||||
| except Exception: | except Exception: | ||||
| logging.exception("Cleaned dataset when dataset deleted failed") | logging.exception("Cleaned dataset when dataset deleted failed") |
from models.model import UploadFile


@shared_task(queue="dataset")
def clean_document_task(document_id: str, dataset_id: str, doc_form: str, file_id: Optional[str]):
    """
    Clean document when document deleted.
    Usage: clean_document_task.delay(document_id, dataset_id, doc_form, file_id)
| """ | """ | ||||
| logging.info(click.style('Start clean document when document deleted: {}'.format(document_id), fg='green')) | |||||
| logging.info(click.style("Start clean document when document deleted: {}".format(document_id), fg="green")) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| try: | try: | ||||
| dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() | dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() | ||||
| if not dataset: | if not dataset: | ||||
| raise Exception('Document has no dataset') | |||||
| raise Exception("Document has no dataset") | |||||
| segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all() | segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all() | ||||
| # check segment is exist | # check segment is exist | ||||
| db.session.commit() | db.session.commit() | ||||
| if file_id: | if file_id: | ||||
| file = db.session.query(UploadFile).filter( | |||||
| UploadFile.id == file_id | |||||
| ).first() | |||||
| file = db.session.query(UploadFile).filter(UploadFile.id == file_id).first() | |||||
| if file: | if file: | ||||
| try: | try: | ||||
| storage.delete(file.key) | storage.delete(file.key) | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info( | logging.info( | ||||
| click.style('Cleaned document when document deleted: {} latency: {}'.format(document_id, end_at - start_at), fg='green')) | |||||
| click.style( | |||||
| "Cleaned document when document deleted: {} latency: {}".format(document_id, end_at - start_at), | |||||
| fg="green", | |||||
| ) | |||||
| ) | |||||
| except Exception: | except Exception: | ||||
| logging.exception("Cleaned document when document deleted failed") | logging.exception("Cleaned document when document deleted failed") |
from models.dataset import Dataset, Document, DocumentSegment


@shared_task(queue="dataset")
def clean_notion_document_task(document_ids: list[str], dataset_id: str):
    """
    Clean document when document deleted.
    Usage: clean_notion_document_task.delay(document_ids, dataset_id)
    """
    logging.info(
        click.style("Start clean document when import from notion document deleted: {}".format(dataset_id), fg="green")
    )
    start_at = time.perf_counter()

    try:
        dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()

        if not dataset:
            raise Exception("Document has no dataset")
        index_type = dataset.doc_form
        index_processor = IndexProcessorFactory(index_type).init_index_processor()
        for document_id in document_ids:
            document = db.session.query(Document).filter(Document.id == document_id).first()
            db.session.delete(document)

            segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all()
        db.session.commit()

        end_at = time.perf_counter()
        logging.info(
            click.style(
                "Clean document when import from notion document deleted end: {} latency: {}".format(
                    dataset_id, end_at - start_at
                ),
                fg="green",
            )
        )
    except Exception:
        logging.exception("Cleaned document when import from notion document deleted failed")
from models.dataset import DocumentSegment


@shared_task(queue="dataset")
def create_segment_to_index_task(segment_id: str, keywords: Optional[list[str]] = None):
    """
    Async create segment to index
    :param keywords:
    Usage: create_segment_to_index_task.delay(segment_id)
    """
    logging.info(click.style("Start create segment to index: {}".format(segment_id), fg="green"))
    start_at = time.perf_counter()

    segment = db.session.query(DocumentSegment).filter(DocumentSegment.id == segment_id).first()
    if not segment:
        raise NotFound("Segment not found")

    if segment.status != "waiting":
        return

    indexing_cache_key = "segment_{}_indexing".format(segment.id)

    try:
        # update segment status to indexing
        update_params = {
            DocumentSegment.status: "indexing",
            DocumentSegment.indexing_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
        }
        DocumentSegment.query.filter_by(id=segment.id).update(update_params)
        db.session.commit()
                "doc_hash": segment.index_node_hash,
                "document_id": segment.document_id,
                "dataset_id": segment.dataset_id,
            },
        )

        dataset = segment.dataset

        if not dataset:
            logging.info(click.style("Segment {} has no dataset, pass.".format(segment.id), fg="cyan"))
            return

        dataset_document = segment.document

        if not dataset_document:
            logging.info(click.style("Segment {} has no document, pass.".format(segment.id), fg="cyan"))
            return

        if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != "completed":
            logging.info(click.style("Segment {} document status is invalid, pass.".format(segment.id), fg="cyan"))
            return

        index_type = dataset.doc_form

        # update segment to completed
        update_params = {
            DocumentSegment.status: "completed",
            DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
        }
        DocumentSegment.query.filter_by(id=segment.id).update(update_params)
        db.session.commit()

        end_at = time.perf_counter()
        logging.info(
            click.style("Segment created to index: {} latency: {}".format(segment.id, end_at - start_at), fg="green")
        )
    except Exception as e:
        logging.exception("create segment to index failed")
        segment.enabled = False
        segment.disabled_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        segment.status = "error"
        segment.error = str(e)
        db.session.commit()
    finally:
from models.dataset import Document as DatasetDocument


@shared_task(queue="dataset")
def deal_dataset_vector_index_task(dataset_id: str, action: str):
    """
    Async deal dataset from index
    :param action: action
    Usage: deal_dataset_vector_index_task.delay(dataset_id, action)
    """
    logging.info(click.style("Start deal dataset vector index: {}".format(dataset_id), fg="green"))
    start_at = time.perf_counter()

    try:
        dataset = Dataset.query.filter_by(id=dataset_id).first()

        if not dataset:
            raise Exception("Dataset not found")
        index_type = dataset.doc_form
        index_processor = IndexProcessorFactory(index_type).init_index_processor()
        if action == "remove":
            index_processor.clean(dataset, None, with_keywords=False)
        elif action == "add":
            dataset_documents = (
                db.session.query(DatasetDocument)
                .filter(
                    DatasetDocument.dataset_id == dataset_id,
                    DatasetDocument.indexing_status == "completed",
                    DatasetDocument.enabled == True,
                    DatasetDocument.archived == False,
                )
                .all()
            )

            if dataset_documents:
                dataset_documents_ids = [doc.id for doc in dataset_documents]
                db.session.query(DatasetDocument).filter(DatasetDocument.id.in_(dataset_documents_ids)).update(
                    {"indexing_status": "indexing"}, synchronize_session=False
                )
                db.session.commit()

                for dataset_document in dataset_documents:
                    try:
                        # add from vector index
                        segments = (
                            db.session.query(DocumentSegment)
                            .filter(DocumentSegment.document_id == dataset_document.id, DocumentSegment.enabled == True)
                            .order_by(DocumentSegment.position.asc())
                            .all()
                        )
                        if segments:
                            documents = []
                            for segment in segments:
                                        "doc_hash": segment.index_node_hash,
                                        "document_id": segment.document_id,
                                        "dataset_id": segment.dataset_id,
                                    },
                                )

                                documents.append(document)
                            # save vector index
                            index_processor.load(dataset, documents, with_keywords=False)
                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id).update(
                            {"indexing_status": "completed"}, synchronize_session=False
                        )
                        db.session.commit()
                    except Exception as e:
                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id).update(
                            {"indexing_status": "error", "error": str(e)}, synchronize_session=False
                        )
                        db.session.commit()
        elif action == "update":
            dataset_documents = (
                db.session.query(DatasetDocument)
                .filter(
                    DatasetDocument.dataset_id == dataset_id,
                    DatasetDocument.indexing_status == "completed",
                    DatasetDocument.enabled == True,
                    DatasetDocument.archived == False,
                )
                .all()
            )
            # add new index
            if dataset_documents:
                # update document status
                dataset_documents_ids = [doc.id for doc in dataset_documents]
                db.session.query(DatasetDocument).filter(DatasetDocument.id.in_(dataset_documents_ids)).update(
                    {"indexing_status": "indexing"}, synchronize_session=False
                )
                db.session.commit()
                # clean index
                for dataset_document in dataset_documents:
                    # update from vector index
                    try:
                        segments = (
                            db.session.query(DocumentSegment)
                            .filter(DocumentSegment.document_id == dataset_document.id, DocumentSegment.enabled == True)
                            .order_by(DocumentSegment.position.asc())
                            .all()
                        )
                        if segments:
                            documents = []
                            for segment in segments:
                                        "doc_hash": segment.index_node_hash,
                                        "document_id": segment.document_id,
                                        "dataset_id": segment.dataset_id,
                                    },
                                )

                                documents.append(document)
                            # save vector index
                            index_processor.load(dataset, documents, with_keywords=False)
| db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \ | |||||
| .update({"indexing_status": "completed"}, synchronize_session=False) | |||||
| db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id).update( | |||||
| {"indexing_status": "completed"}, synchronize_session=False | |||||
| ) | |||||
| db.session.commit() | db.session.commit() | ||||
| except Exception as e: | except Exception as e: | ||||
| db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \ | |||||
| .update({"indexing_status": "error", "error": str(e)}, synchronize_session=False) | |||||
| db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id).update( | |||||
| {"indexing_status": "error", "error": str(e)}, synchronize_session=False | |||||
| ) | |||||
| db.session.commit() | db.session.commit() | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info( | logging.info( | ||||
| click.style('Deal dataset vector index: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green')) | |||||
| click.style("Deal dataset vector index: {} latency: {}".format(dataset_id, end_at - start_at), fg="green") | |||||
| ) | |||||
| except Exception: | except Exception: | ||||
| logging.exception("Deal dataset vector index failed") | logging.exception("Deal dataset vector index failed") |
| from models.dataset import Dataset, Document | from models.dataset import Dataset, Document | ||||
| @shared_task(queue='dataset') | |||||
| @shared_task(queue="dataset") | |||||
| def delete_segment_from_index_task(segment_id: str, index_node_id: str, dataset_id: str, document_id: str): | def delete_segment_from_index_task(segment_id: str, index_node_id: str, dataset_id: str, document_id: str): | ||||
| """ | """ | ||||
| Async Remove segment from index | Async Remove segment from index | ||||
| Usage: delete_segment_from_index_task.delay(segment_id) | Usage: delete_segment_from_index_task.delay(segment_id) | ||||
| """ | """ | ||||
| logging.info(click.style('Start delete segment from index: {}'.format(segment_id), fg='green')) | |||||
| logging.info(click.style("Start delete segment from index: {}".format(segment_id), fg="green")) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| indexing_cache_key = 'segment_{}_delete_indexing'.format(segment_id) | |||||
| indexing_cache_key = "segment_{}_delete_indexing".format(segment_id) | |||||
| try: | try: | ||||
| dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() | dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() | ||||
| if not dataset: | if not dataset: | ||||
| logging.info(click.style('Segment {} has no dataset, pass.'.format(segment_id), fg='cyan')) | |||||
| logging.info(click.style("Segment {} has no dataset, pass.".format(segment_id), fg="cyan")) | |||||
| return | return | ||||
| dataset_document = db.session.query(Document).filter(Document.id == document_id).first() | dataset_document = db.session.query(Document).filter(Document.id == document_id).first() | ||||
| if not dataset_document: | if not dataset_document: | ||||
| logging.info(click.style('Segment {} has no document, pass.'.format(segment_id), fg='cyan')) | |||||
| logging.info(click.style("Segment {} has no document, pass.".format(segment_id), fg="cyan")) | |||||
| return | return | ||||
| if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != 'completed': | |||||
| logging.info(click.style('Segment {} document status is invalid, pass.'.format(segment_id), fg='cyan')) | |||||
| if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != "completed": | |||||
| logging.info(click.style("Segment {} document status is invalid, pass.".format(segment_id), fg="cyan")) | |||||
| return | return | ||||
| index_type = dataset_document.doc_form | index_type = dataset_document.doc_form | ||||
| index_processor.clean(dataset, [index_node_id]) | index_processor.clean(dataset, [index_node_id]) | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info(click.style('Segment deleted from index: {} latency: {}'.format(segment_id, end_at - start_at), fg='green')) | |||||
| logging.info( | |||||
| click.style("Segment deleted from index: {} latency: {}".format(segment_id, end_at - start_at), fg="green") | |||||
| ) | |||||
| except Exception: | except Exception: | ||||
| logging.exception("delete segment from index failed") | logging.exception("delete segment from index failed") | ||||
| finally: | finally: |
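The segment-deletion task brackets its work with a flag keyed like `segment_{id}_delete_indexing`; the body of the `finally:` block sits outside this hunk, but presumably clears that flag in the cache. A dict-backed stand-in (FakeRedis below is purely illustrative, not the project's client) keeps the sketch runnable anywhere:

class FakeRedis:
    """Minimal stand-in for a Redis client: just enough for setex/get/delete."""

    def __init__(self):
        self._store = {}

    def setex(self, key, ttl, value):
        self._store[key] = value  # TTL ignored in this toy version

    def get(self, key):
        return self._store.get(key)

    def delete(self, key):
        self._store.pop(key, None)


redis_client = FakeRedis()
segment_id = "seg-1"
indexing_cache_key = "segment_{}_delete_indexing".format(segment_id)

redis_client.setex(indexing_cache_key, 600, 1)  # mark "delete in progress"
try:
    pass  # ... clean the segment's node from the vector index ...
finally:
    redis_client.delete(indexing_cache_key)  # always clear the flag, success or not

print(redis_client.get(indexing_cache_key))  # None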
| from models.dataset import DocumentSegment | from models.dataset import DocumentSegment | ||||
| @shared_task(queue='dataset') | |||||
| @shared_task(queue="dataset") | |||||
| def disable_segment_from_index_task(segment_id: str): | def disable_segment_from_index_task(segment_id: str): | ||||
| """ | """ | ||||
| Async disable segment from index | Async disable segment from index | ||||
| Usage: disable_segment_from_index_task.delay(segment_id) | Usage: disable_segment_from_index_task.delay(segment_id) | ||||
| """ | """ | ||||
| logging.info(click.style('Start disable segment from index: {}'.format(segment_id), fg='green')) | |||||
| logging.info(click.style("Start disable segment from index: {}".format(segment_id), fg="green")) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| segment = db.session.query(DocumentSegment).filter(DocumentSegment.id == segment_id).first() | segment = db.session.query(DocumentSegment).filter(DocumentSegment.id == segment_id).first() | ||||
| if not segment: | if not segment: | ||||
| raise NotFound('Segment not found') | |||||
| raise NotFound("Segment not found") | |||||
| if segment.status != 'completed': | |||||
| raise NotFound('Segment is not completed , disable action is not allowed.') | |||||
| if segment.status != "completed": | |||||
| raise NotFound("Segment is not completed , disable action is not allowed.") | |||||
| indexing_cache_key = 'segment_{}_indexing'.format(segment.id) | |||||
| indexing_cache_key = "segment_{}_indexing".format(segment.id) | |||||
| try: | try: | ||||
| dataset = segment.dataset | dataset = segment.dataset | ||||
| if not dataset: | if not dataset: | ||||
| logging.info(click.style('Segment {} has no dataset, pass.'.format(segment.id), fg='cyan')) | |||||
| logging.info(click.style("Segment {} has no dataset, pass.".format(segment.id), fg="cyan")) | |||||
| return | return | ||||
| dataset_document = segment.document | dataset_document = segment.document | ||||
| if not dataset_document: | if not dataset_document: | ||||
| logging.info(click.style('Segment {} has no document, pass.'.format(segment.id), fg='cyan')) | |||||
| logging.info(click.style("Segment {} has no document, pass.".format(segment.id), fg="cyan")) | |||||
| return | return | ||||
| if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != 'completed': | |||||
| logging.info(click.style('Segment {} document status is invalid, pass.'.format(segment.id), fg='cyan')) | |||||
| if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != "completed": | |||||
| logging.info(click.style("Segment {} document status is invalid, pass.".format(segment.id), fg="cyan")) | |||||
| return | return | ||||
| index_type = dataset_document.doc_form | index_type = dataset_document.doc_form | ||||
| index_processor.clean(dataset, [segment.index_node_id]) | index_processor.clean(dataset, [segment.index_node_id]) | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info(click.style('Segment removed from index: {} latency: {}'.format(segment.id, end_at - start_at), fg='green')) | |||||
| logging.info( | |||||
| click.style("Segment removed from index: {} latency: {}".format(segment.id, end_at - start_at), fg="green") | |||||
| ) | |||||
| except Exception: | except Exception: | ||||
| logging.exception("remove segment from index failed") | logging.exception("remove segment from index failed") | ||||
| segment.enabled = True | segment.enabled = True |
| from models.source import DataSourceOauthBinding | from models.source import DataSourceOauthBinding | ||||
| @shared_task(queue='dataset') | |||||
| @shared_task(queue="dataset") | |||||
| def document_indexing_sync_task(dataset_id: str, document_id: str): | def document_indexing_sync_task(dataset_id: str, document_id: str): | ||||
| """ | """ | ||||
| Async update document | Async update document | ||||
| Usage: document_indexing_sync_task.delay(dataset_id, document_id) | Usage: document_indexing_sync_task.delay(dataset_id, document_id) | ||||
| """ | """ | ||||
| logging.info(click.style('Start sync document: {}'.format(document_id), fg='green')) | |||||
| logging.info(click.style("Start sync document: {}".format(document_id), fg="green")) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| document = db.session.query(Document).filter( | |||||
| Document.id == document_id, | |||||
| Document.dataset_id == dataset_id | |||||
| ).first() | |||||
| document = db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() | |||||
| if not document: | if not document: | ||||
| raise NotFound('Document not found') | |||||
| raise NotFound("Document not found") | |||||
| data_source_info = document.data_source_info_dict | data_source_info = document.data_source_info_dict | ||||
| if document.data_source_type == 'notion_import': | |||||
| if not data_source_info or 'notion_page_id' not in data_source_info \ | |||||
| or 'notion_workspace_id' not in data_source_info: | |||||
| if document.data_source_type == "notion_import": | |||||
| if ( | |||||
| not data_source_info | |||||
| or "notion_page_id" not in data_source_info | |||||
| or "notion_workspace_id" not in data_source_info | |||||
| ): | |||||
| raise ValueError("no notion page found") | raise ValueError("no notion page found") | ||||
| workspace_id = data_source_info['notion_workspace_id'] | |||||
| page_id = data_source_info['notion_page_id'] | |||||
| page_type = data_source_info['type'] | |||||
| page_edited_time = data_source_info['last_edited_time'] | |||||
| workspace_id = data_source_info["notion_workspace_id"] | |||||
| page_id = data_source_info["notion_page_id"] | |||||
| page_type = data_source_info["type"] | |||||
| page_edited_time = data_source_info["last_edited_time"] | |||||
| data_source_binding = DataSourceOauthBinding.query.filter( | data_source_binding = DataSourceOauthBinding.query.filter( | ||||
| db.and_( | db.and_( | ||||
| DataSourceOauthBinding.tenant_id == document.tenant_id, | DataSourceOauthBinding.tenant_id == document.tenant_id, | ||||
| DataSourceOauthBinding.provider == 'notion', | |||||
| DataSourceOauthBinding.provider == "notion", | |||||
| DataSourceOauthBinding.disabled == False, | DataSourceOauthBinding.disabled == False, | ||||
| DataSourceOauthBinding.source_info['workspace_id'] == f'"{workspace_id}"' | |||||
| DataSourceOauthBinding.source_info["workspace_id"] == f'"{workspace_id}"', | |||||
| ) | ) | ||||
| ).first() | ).first() | ||||
| if not data_source_binding: | if not data_source_binding: | ||||
| raise ValueError('Data source binding not found.') | |||||
| raise ValueError("Data source binding not found.") | |||||
| loader = NotionExtractor( | loader = NotionExtractor( | ||||
| notion_workspace_id=workspace_id, | notion_workspace_id=workspace_id, | ||||
| notion_obj_id=page_id, | notion_obj_id=page_id, | ||||
| notion_page_type=page_type, | notion_page_type=page_type, | ||||
| notion_access_token=data_source_binding.access_token, | notion_access_token=data_source_binding.access_token, | ||||
| tenant_id=document.tenant_id | |||||
| tenant_id=document.tenant_id, | |||||
| ) | ) | ||||
| last_edited_time = loader.get_notion_last_edited_time() | last_edited_time = loader.get_notion_last_edited_time() | ||||
| # check the page is updated | # check the page is updated | ||||
| if last_edited_time != page_edited_time: | if last_edited_time != page_edited_time: | ||||
| document.indexing_status = 'parsing' | |||||
| document.indexing_status = "parsing" | |||||
| document.processing_started_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) | document.processing_started_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) | ||||
| db.session.commit() | db.session.commit() | ||||
| try: | try: | ||||
| dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() | dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() | ||||
| if not dataset: | if not dataset: | ||||
| raise Exception('Dataset not found') | |||||
| raise Exception("Dataset not found") | |||||
| index_type = document.doc_form | index_type = document.doc_form | ||||
| index_processor = IndexProcessorFactory(index_type).init_index_processor() | index_processor = IndexProcessorFactory(index_type).init_index_processor() | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info( | logging.info( | ||||
| click.style('Cleaned document when document update data source or process rule: {} latency: {}'.format(document_id, end_at - start_at), fg='green')) | |||||
| click.style( | |||||
| "Cleaned document when document update data source or process rule: {} latency: {}".format( | |||||
| document_id, end_at - start_at | |||||
| ), | |||||
| fg="green", | |||||
| ) | |||||
| ) | |||||
| except Exception: | except Exception: | ||||
| logging.exception("Cleaned document when document update data source or process rule failed") | logging.exception("Cleaned document when document update data source or process rule failed") | ||||
| indexing_runner = IndexingRunner() | indexing_runner = IndexingRunner() | ||||
| indexing_runner.run([document]) | indexing_runner.run([document]) | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info(click.style('update document: {} latency: {}'.format(document.id, end_at - start_at), fg='green')) | |||||
| logging.info( | |||||
| click.style("update document: {} latency: {}".format(document.id, end_at - start_at), fg="green") | |||||
| ) | |||||
| except DocumentIsPausedException as ex: | except DocumentIsPausedException as ex: | ||||
| logging.info(click.style(str(ex), fg='yellow')) | |||||
| logging.info(click.style(str(ex), fg="yellow")) | |||||
| except Exception: | except Exception: | ||||
| pass | pass |
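Before touching Notion, the sync task rejects a `data_source_info` dict that lacks the page or workspace id. Here is the same guard lifted into a small helper so it can be exercised on its own (`require_notion_source` is a hypothetical name, not part of the codebase):

from typing import Optional, Tuple


def require_notion_source(data_source_info: Optional[dict]) -> Tuple[str, str]:
    # Mirrors the check in document_indexing_sync_task: both ids must be present.
    if (
        not data_source_info
        or "notion_page_id" not in data_source_info
        or "notion_workspace_id" not in data_source_info
    ):
        raise ValueError("no notion page found")
    return data_source_info["notion_workspace_id"], data_source_info["notion_page_id"]


print(require_notion_source({"notion_workspace_id": "ws-1", "notion_page_id": "pg-9"}))
# ('ws-1', 'pg-9')

try:
    require_notion_source({"notion_page_id": "pg-9"})
except ValueError as exc:
    print(exc)  # no notion page found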
| from services.feature_service import FeatureService | from services.feature_service import FeatureService | ||||
| @shared_task(queue='dataset') | |||||
| @shared_task(queue="dataset") | |||||
| def document_indexing_task(dataset_id: str, document_ids: list): | def document_indexing_task(dataset_id: str, document_ids: list): | ||||
| """ | """ | ||||
| Async process document | Async process document | ||||
| if count > batch_upload_limit: | if count > batch_upload_limit: | ||||
| raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.") | raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.") | ||||
| if 0 < vector_space.limit <= vector_space.size: | if 0 < vector_space.limit <= vector_space.size: | ||||
| raise ValueError("Your total number of documents plus the number of uploads have over the limit of " | |||||
| "your subscription.") | |||||
| raise ValueError( | |||||
| "Your total number of documents plus the number of uploads have over the limit of " | |||||
| "your subscription." | |||||
| ) | |||||
| except Exception as e: | except Exception as e: | ||||
| for document_id in document_ids: | for document_id in document_ids: | ||||
| document = db.session.query(Document).filter( | |||||
| Document.id == document_id, | |||||
| Document.dataset_id == dataset_id | |||||
| ).first() | |||||
| document = ( | |||||
| db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() | |||||
| ) | |||||
| if document: | if document: | ||||
| document.indexing_status = 'error' | |||||
| document.indexing_status = "error" | |||||
| document.error = str(e) | document.error = str(e) | ||||
| document.stopped_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) | document.stopped_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) | ||||
| db.session.add(document) | db.session.add(document) | ||||
| return | return | ||||
| for document_id in document_ids: | for document_id in document_ids: | ||||
| logging.info(click.style('Start process document: {}'.format(document_id), fg='green')) | |||||
| logging.info(click.style("Start process document: {}".format(document_id), fg="green")) | |||||
| document = db.session.query(Document).filter( | |||||
| Document.id == document_id, | |||||
| Document.dataset_id == dataset_id | |||||
| ).first() | |||||
| document = ( | |||||
| db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() | |||||
| ) | |||||
| if document: | if document: | ||||
| document.indexing_status = 'parsing' | |||||
| document.indexing_status = "parsing" | |||||
| document.processing_started_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) | document.processing_started_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) | ||||
| documents.append(document) | documents.append(document) | ||||
| db.session.add(document) | db.session.add(document) | ||||
| indexing_runner = IndexingRunner() | indexing_runner = IndexingRunner() | ||||
| indexing_runner.run(documents) | indexing_runner.run(documents) | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info(click.style('Processed dataset: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green')) | |||||
| logging.info(click.style("Processed dataset: {} latency: {}".format(dataset_id, end_at - start_at), fg="green")) | |||||
| except DocumentIsPausedException as ex: | except DocumentIsPausedException as ex: | ||||
| logging.info(click.style(str(ex), fg='yellow')) | |||||
| logging.info(click.style(str(ex), fg="yellow")) | |||||
| except Exception: | except Exception: | ||||
| pass | pass |
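The subscription check relies on a chained comparison: `0 < vector_space.limit <= vector_space.size` only trips when a limit is actually configured and usage has reached it. A few illustrative cases (the dicts stand in for the real vector_space object):

cases = [
    {"limit": 0, "size": 10_000},  # a limit of 0 means "unlimited" -> never trips
    {"limit": 50, "size": 49},     # under the limit -> proceed
    {"limit": 50, "size": 50},     # at the limit -> raise
    {"limit": 50, "size": 51},     # over the limit -> raise
]

for c in cases:
    over_quota = 0 < c["limit"] <= c["size"]
    print(c, "-> raise" if over_quota else "-> proceed")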
| from models.dataset import Dataset, Document, DocumentSegment | from models.dataset import Dataset, Document, DocumentSegment | ||||
| @shared_task(queue='dataset') | |||||
| @shared_task(queue="dataset") | |||||
| def document_indexing_update_task(dataset_id: str, document_id: str): | def document_indexing_update_task(dataset_id: str, document_id: str): | ||||
| """ | """ | ||||
| Async update document | Async update document | ||||
| Usage: document_indexing_update_task.delay(dataset_id, document_id) | Usage: document_indexing_update_task.delay(dataset_id, document_id) | ||||
| """ | """ | ||||
| logging.info(click.style('Start update document: {}'.format(document_id), fg='green')) | |||||
| logging.info(click.style("Start update document: {}".format(document_id), fg="green")) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| document = db.session.query(Document).filter( | |||||
| Document.id == document_id, | |||||
| Document.dataset_id == dataset_id | |||||
| ).first() | |||||
| document = db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() | |||||
| if not document: | if not document: | ||||
| raise NotFound('Document not found') | |||||
| raise NotFound("Document not found") | |||||
| document.indexing_status = 'parsing' | |||||
| document.indexing_status = "parsing" | |||||
| document.processing_started_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) | document.processing_started_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) | ||||
| db.session.commit() | db.session.commit() | ||||
| try: | try: | ||||
| dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() | dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() | ||||
| if not dataset: | if not dataset: | ||||
| raise Exception('Dataset not found') | |||||
| raise Exception("Dataset not found") | |||||
| index_type = document.doc_form | index_type = document.doc_form | ||||
| index_processor = IndexProcessorFactory(index_type).init_index_processor() | index_processor = IndexProcessorFactory(index_type).init_index_processor() | ||||
| db.session.commit() | db.session.commit() | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info( | logging.info( | ||||
| click.style('Cleaned document when document update data source or process rule: {} latency: {}'.format(document_id, end_at - start_at), fg='green')) | |||||
| click.style( | |||||
| "Cleaned document when document update data source or process rule: {} latency: {}".format( | |||||
| document_id, end_at - start_at | |||||
| ), | |||||
| fg="green", | |||||
| ) | |||||
| ) | |||||
| except Exception: | except Exception: | ||||
| logging.exception("Cleaned document when document update data source or process rule failed") | logging.exception("Cleaned document when document update data source or process rule failed") | ||||
| indexing_runner = IndexingRunner() | indexing_runner = IndexingRunner() | ||||
| indexing_runner.run([document]) | indexing_runner.run([document]) | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info(click.style('update document: {} latency: {}'.format(document.id, end_at - start_at), fg='green')) | |||||
| logging.info(click.style("update document: {} latency: {}".format(document.id, end_at - start_at), fg="green")) | |||||
| except DocumentIsPausedException as ex: | except DocumentIsPausedException as ex: | ||||
| logging.info(click.style(str(ex), fg='yellow')) | |||||
| logging.info(click.style(str(ex), fg="yellow")) | |||||
| except Exception: | except Exception: | ||||
| pass | pass |
| from services.feature_service import FeatureService | from services.feature_service import FeatureService | ||||
| @shared_task(queue='dataset') | |||||
| @shared_task(queue="dataset") | |||||
| def duplicate_document_indexing_task(dataset_id: str, document_ids: list): | def duplicate_document_indexing_task(dataset_id: str, document_ids: list): | ||||
| """ | """ | ||||
| Async process document | Async process document | ||||
| if count > batch_upload_limit: | if count > batch_upload_limit: | ||||
| raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.") | raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.") | ||||
| if 0 < vector_space.limit <= vector_space.size: | if 0 < vector_space.limit <= vector_space.size: | ||||
| raise ValueError("Your total number of documents plus the number of uploads have over the limit of " | |||||
| "your subscription.") | |||||
| raise ValueError( | |||||
| "Your total number of documents plus the number of uploads have over the limit of " | |||||
| "your subscription." | |||||
| ) | |||||
| except Exception as e: | except Exception as e: | ||||
| for document_id in document_ids: | for document_id in document_ids: | ||||
| document = db.session.query(Document).filter( | |||||
| Document.id == document_id, | |||||
| Document.dataset_id == dataset_id | |||||
| ).first() | |||||
| document = ( | |||||
| db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() | |||||
| ) | |||||
| if document: | if document: | ||||
| document.indexing_status = 'error' | |||||
| document.indexing_status = "error" | |||||
| document.error = str(e) | document.error = str(e) | ||||
| document.stopped_at = datetime.datetime.utcnow() | document.stopped_at = datetime.datetime.utcnow() | ||||
| db.session.add(document) | db.session.add(document) | ||||
| return | return | ||||
| for document_id in document_ids: | for document_id in document_ids: | ||||
| logging.info(click.style('Start process document: {}'.format(document_id), fg='green')) | |||||
| logging.info(click.style("Start process document: {}".format(document_id), fg="green")) | |||||
| document = db.session.query(Document).filter( | |||||
| Document.id == document_id, | |||||
| Document.dataset_id == dataset_id | |||||
| ).first() | |||||
| document = ( | |||||
| db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() | |||||
| ) | |||||
| if document: | if document: | ||||
| # clean old data | # clean old data | ||||
| db.session.delete(segment) | db.session.delete(segment) | ||||
| db.session.commit() | db.session.commit() | ||||
| document.indexing_status = 'parsing' | |||||
| document.indexing_status = "parsing" | |||||
| document.processing_started_at = datetime.datetime.utcnow() | document.processing_started_at = datetime.datetime.utcnow() | ||||
| documents.append(document) | documents.append(document) | ||||
| db.session.add(document) | db.session.add(document) | ||||
| indexing_runner = IndexingRunner() | indexing_runner = IndexingRunner() | ||||
| indexing_runner.run(documents) | indexing_runner.run(documents) | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info(click.style('Processed dataset: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green')) | |||||
| logging.info(click.style("Processed dataset: {} latency: {}".format(dataset_id, end_at - start_at), fg="green")) | |||||
| except DocumentIsPausedException as ex: | except DocumentIsPausedException as ex: | ||||
| logging.info(click.style(str(ex), fg='yellow')) | |||||
| logging.info(click.style(str(ex), fg="yellow")) | |||||
| except Exception: | except Exception: | ||||
| pass | pass |
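Note that this task still stamps timestamps with `datetime.datetime.utcnow()`, while the neighbouring tasks in this diff use `datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)`. Both yield a naive UTC datetime, but `utcnow()` is deprecated as of Python 3.12, which is presumably why the other call sites were migrated:

import datetime

a = datetime.datetime.utcnow()  # deprecated since Python 3.12
b = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

print(a.tzinfo, b.tzinfo)                # None None - both are naive datetimes
print(abs((b - a).total_seconds()) < 1)  # True - same instant, within call overhead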
| from models.dataset import DocumentSegment | from models.dataset import DocumentSegment | ||||
| @shared_task(queue='dataset') | |||||
| @shared_task(queue="dataset") | |||||
| def enable_segment_to_index_task(segment_id: str): | def enable_segment_to_index_task(segment_id: str): | ||||
| """ | """ | ||||
| Async enable segment to index | Async enable segment to index | ||||
| Usage: enable_segment_to_index_task.delay(segment_id) | Usage: enable_segment_to_index_task.delay(segment_id) | ||||
| """ | """ | ||||
| logging.info(click.style('Start enable segment to index: {}'.format(segment_id), fg='green')) | |||||
| logging.info(click.style("Start enable segment to index: {}".format(segment_id), fg="green")) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| segment = db.session.query(DocumentSegment).filter(DocumentSegment.id == segment_id).first() | segment = db.session.query(DocumentSegment).filter(DocumentSegment.id == segment_id).first() | ||||
| if not segment: | if not segment: | ||||
| raise NotFound('Segment not found') | |||||
| raise NotFound("Segment not found") | |||||
| if segment.status != 'completed': | |||||
| raise NotFound('Segment is not completed, enable action is not allowed.') | |||||
| if segment.status != "completed": | |||||
| raise NotFound("Segment is not completed, enable action is not allowed.") | |||||
| indexing_cache_key = 'segment_{}_indexing'.format(segment.id) | |||||
| indexing_cache_key = "segment_{}_indexing".format(segment.id) | |||||
| try: | try: | ||||
| document = Document( | document = Document( | ||||
| "doc_hash": segment.index_node_hash, | "doc_hash": segment.index_node_hash, | ||||
| "document_id": segment.document_id, | "document_id": segment.document_id, | ||||
| "dataset_id": segment.dataset_id, | "dataset_id": segment.dataset_id, | ||||
| } | |||||
| }, | |||||
| ) | ) | ||||
| dataset = segment.dataset | dataset = segment.dataset | ||||
| if not dataset: | if not dataset: | ||||
| logging.info(click.style('Segment {} has no dataset, pass.'.format(segment.id), fg='cyan')) | |||||
| logging.info(click.style("Segment {} has no dataset, pass.".format(segment.id), fg="cyan")) | |||||
| return | return | ||||
| dataset_document = segment.document | dataset_document = segment.document | ||||
| if not dataset_document: | if not dataset_document: | ||||
| logging.info(click.style('Segment {} has no document, pass.'.format(segment.id), fg='cyan')) | |||||
| logging.info(click.style("Segment {} has no document, pass.".format(segment.id), fg="cyan")) | |||||
| return | return | ||||
| if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != 'completed': | |||||
| logging.info(click.style('Segment {} document status is invalid, pass.'.format(segment.id), fg='cyan')) | |||||
| if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != "completed": | |||||
| logging.info(click.style("Segment {} document status is invalid, pass.".format(segment.id), fg="cyan")) | |||||
| return | return | ||||
| index_processor = IndexProcessorFactory(dataset_document.doc_form).init_index_processor() | index_processor = IndexProcessorFactory(dataset_document.doc_form).init_index_processor() | ||||
| index_processor.load(dataset, [document]) | index_processor.load(dataset, [document]) | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info(click.style('Segment enabled to index: {} latency: {}'.format(segment.id, end_at - start_at), fg='green')) | |||||
| logging.info( | |||||
| click.style("Segment enabled to index: {} latency: {}".format(segment.id, end_at - start_at), fg="green") | |||||
| ) | |||||
| except Exception as e: | except Exception as e: | ||||
| logging.exception("enable segment to index failed") | logging.exception("enable segment to index failed") | ||||
| segment.enabled = False | segment.enabled = False | ||||
| segment.disabled_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) | segment.disabled_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) | ||||
| segment.status = 'error' | |||||
| segment.status = "error" | |||||
| segment.error = str(e) | segment.error = str(e) | ||||
| db.session.commit() | db.session.commit() | ||||
| finally: | finally: |
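On failure, enable_segment_to_index_task flips the segment back off, records the error, and its `finally:` block (elided in this hunk) presumably clears the `segment_{id}_indexing` flag. The shape of that recovery path, reduced to plain objects (the Segment class and the cache dict below are stand-ins, not the ORM models):

import datetime


class Segment:
    enabled = True
    disabled_at = None
    status = "completed"
    error = None


segment = Segment()
cache = {"segment_1_indexing": 1}

try:
    raise RuntimeError("vector store unavailable")  # stand-in for index_processor.load(...)
except Exception as e:
    segment.enabled = False
    segment.disabled_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    segment.status = "error"
    segment.error = str(e)
finally:
    cache.pop("segment_1_indexing", None)  # flag cleared whether indexing worked or not

print(segment.enabled, segment.status, segment.error)  # False error vector store unavailable
print(cache)  # {}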
| from extensions.ext_mail import mail | from extensions.ext_mail import mail | ||||
| @shared_task(queue='mail') | |||||
| @shared_task(queue="mail") | |||||
| def send_invite_member_mail_task(language: str, to: str, token: str, inviter_name: str, workspace_name: str): | def send_invite_member_mail_task(language: str, to: str, token: str, inviter_name: str, workspace_name: str): | ||||
| """ | """ | ||||
| Async Send invite member mail | Async Send invite member mail | ||||
| if not mail.is_inited(): | if not mail.is_inited(): | ||||
| return | return | ||||
| logging.info(click.style('Start send invite member mail to {} in workspace {}'.format(to, workspace_name), | |||||
| fg='green')) | |||||
| logging.info( | |||||
| click.style("Start send invite member mail to {} in workspace {}".format(to, workspace_name), fg="green") | |||||
| ) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| # send invite member mail using different languages | # send invite member mail using different languages | ||||
| try: | try: | ||||
| url = f'{dify_config.CONSOLE_WEB_URL}/activate?token={token}' | |||||
| if language == 'zh-Hans': | |||||
| html_content = render_template('invite_member_mail_template_zh-CN.html', | |||||
| to=to, | |||||
| inviter_name=inviter_name, | |||||
| workspace_name=workspace_name, | |||||
| url=url) | |||||
| url = f"{dify_config.CONSOLE_WEB_URL}/activate?token={token}" | |||||
| if language == "zh-Hans": | |||||
| html_content = render_template( | |||||
| "invite_member_mail_template_zh-CN.html", | |||||
| to=to, | |||||
| inviter_name=inviter_name, | |||||
| workspace_name=workspace_name, | |||||
| url=url, | |||||
| ) | |||||
| mail.send(to=to, subject="立即加入 Dify 工作空间", html=html_content) | mail.send(to=to, subject="立即加入 Dify 工作空间", html=html_content) | ||||
| else: | else: | ||||
| html_content = render_template('invite_member_mail_template_en-US.html', | |||||
| to=to, | |||||
| inviter_name=inviter_name, | |||||
| workspace_name=workspace_name, | |||||
| url=url) | |||||
| html_content = render_template( | |||||
| "invite_member_mail_template_en-US.html", | |||||
| to=to, | |||||
| inviter_name=inviter_name, | |||||
| workspace_name=workspace_name, | |||||
| url=url, | |||||
| ) | |||||
| mail.send(to=to, subject="Join Dify Workspace Now", html=html_content) | mail.send(to=to, subject="Join Dify Workspace Now", html=html_content) | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info( | logging.info( | ||||
| click.style('Send invite member mail to {} succeeded: latency: {}'.format(to, end_at - start_at), | |||||
| fg='green')) | |||||
| click.style( | |||||
| "Send invite member mail to {} succeeded: latency: {}".format(to, end_at - start_at), fg="green" | |||||
| ) | |||||
| ) | |||||
| except Exception: | except Exception: | ||||
| logging.exception("Send invite member mail to {} failed".format(to)) | |||||
| logging.exception("Send invite member mail to {} failed".format(to)) |
| from extensions.ext_mail import mail | from extensions.ext_mail import mail | ||||
| @shared_task(queue='mail') | |||||
| @shared_task(queue="mail") | |||||
| def send_reset_password_mail_task(language: str, to: str, token: str): | def send_reset_password_mail_task(language: str, to: str, token: str): | ||||
| """ | """ | ||||
| Async Send reset password mail | Async Send reset password mail | ||||
| if not mail.is_inited(): | if not mail.is_inited(): | ||||
| return | return | ||||
| logging.info(click.style('Start password reset mail to {}'.format(to), fg='green')) | |||||
| logging.info(click.style("Start password reset mail to {}".format(to), fg="green")) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| # send reset password mail using different languages | # send reset password mail using different languages | ||||
| try: | try: | ||||
| url = f'{dify_config.CONSOLE_WEB_URL}/forgot-password?token={token}' | |||||
| if language == 'zh-Hans': | |||||
| html_content = render_template('reset_password_mail_template_zh-CN.html', | |||||
| to=to, | |||||
| url=url) | |||||
| url = f"{dify_config.CONSOLE_WEB_URL}/forgot-password?token={token}" | |||||
| if language == "zh-Hans": | |||||
| html_content = render_template("reset_password_mail_template_zh-CN.html", to=to, url=url) | |||||
| mail.send(to=to, subject="重置您的 Dify 密码", html=html_content) | mail.send(to=to, subject="重置您的 Dify 密码", html=html_content) | ||||
| else: | else: | ||||
| html_content = render_template('reset_password_mail_template_en-US.html', | |||||
| to=to, | |||||
| url=url) | |||||
| html_content = render_template("reset_password_mail_template_en-US.html", to=to, url=url) | |||||
| mail.send(to=to, subject="Reset Your Dify Password", html=html_content) | mail.send(to=to, subject="Reset Your Dify Password", html=html_content) | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info( | logging.info( | ||||
| click.style('Send password reset mail to {} succeeded: latency: {}'.format(to, end_at - start_at), | |||||
| fg='green')) | |||||
| click.style( | |||||
| "Send password reset mail to {} succeeded: latency: {}".format(to, end_at - start_at), fg="green" | |||||
| ) | |||||
| ) | |||||
| except Exception: | except Exception: | ||||
| logging.exception("Send password reset mail to {} failed".format(to)) | logging.exception("Send password reset mail to {} failed".format(to)) |
| from models.workflow import WorkflowRun | from models.workflow import WorkflowRun | ||||
| @shared_task(queue='ops_trace') | |||||
| @shared_task(queue="ops_trace") | |||||
| def process_trace_tasks(tasks_data): | def process_trace_tasks(tasks_data): | ||||
| """ | """ | ||||
| Async process trace tasks | Async process trace tasks | ||||
| """ | """ | ||||
| from core.ops.ops_trace_manager import OpsTraceManager | from core.ops.ops_trace_manager import OpsTraceManager | ||||
| trace_info = tasks_data.get('trace_info') | |||||
| app_id = tasks_data.get('app_id') | |||||
| trace_info_type = tasks_data.get('trace_info_type') | |||||
| trace_info = tasks_data.get("trace_info") | |||||
| app_id = tasks_data.get("app_id") | |||||
| trace_info_type = tasks_data.get("trace_info_type") | |||||
| trace_instance = OpsTraceManager.get_ops_trace_instance(app_id) | trace_instance = OpsTraceManager.get_ops_trace_instance(app_id) | ||||
| if trace_info.get('message_data'): | |||||
| trace_info['message_data'] = Message.from_dict(data=trace_info['message_data']) | |||||
| if trace_info.get('workflow_data'): | |||||
| trace_info['workflow_data'] = WorkflowRun.from_dict(data=trace_info['workflow_data']) | |||||
| if trace_info.get('documents'): | |||||
| trace_info['documents'] = [Document(**doc) for doc in trace_info['documents']] | |||||
| if trace_info.get("message_data"): | |||||
| trace_info["message_data"] = Message.from_dict(data=trace_info["message_data"]) | |||||
| if trace_info.get("workflow_data"): | |||||
| trace_info["workflow_data"] = WorkflowRun.from_dict(data=trace_info["workflow_data"]) | |||||
| if trace_info.get("documents"): | |||||
| trace_info["documents"] = [Document(**doc) for doc in trace_info["documents"]] | |||||
| try: | try: | ||||
| if trace_instance: | if trace_instance: |
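process_trace_tasks receives plain dictionaries over the queue and rehydrates the nested payloads into model objects via `from_dict(...)` before tracing. A toy version of that rehydration step (the dataclass below is a stand-in, not the real Message/WorkflowRun models):

from dataclasses import dataclass


@dataclass
class Message:
    id: str
    query: str

    @classmethod
    def from_dict(cls, data: dict) -> "Message":
        return cls(id=data["id"], query=data["query"])


tasks_data = {
    "app_id": "app-1",
    "trace_info_type": "message",
    "trace_info": {"message_data": {"id": "msg-1", "query": "hello"}},
}

trace_info = tasks_data.get("trace_info")
if trace_info.get("message_data"):
    trace_info["message_data"] = Message.from_dict(data=trace_info["message_data"])

print(trace_info["message_data"])  # Message(id='msg-1', query='hello')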
| from models.dataset import Document | from models.dataset import Document | ||||
| @shared_task(queue='dataset') | |||||
| @shared_task(queue="dataset") | |||||
| def recover_document_indexing_task(dataset_id: str, document_id: str): | def recover_document_indexing_task(dataset_id: str, document_id: str): | ||||
| """ | """ | ||||
| Async recover document | Async recover document | ||||
| Usage: recover_document_indexing_task.delay(dataset_id, document_id) | Usage: recover_document_indexing_task.delay(dataset_id, document_id) | ||||
| """ | """ | ||||
| logging.info(click.style('Recover document: {}'.format(document_id), fg='green')) | |||||
| logging.info(click.style("Recover document: {}".format(document_id), fg="green")) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| document = db.session.query(Document).filter( | |||||
| Document.id == document_id, | |||||
| Document.dataset_id == dataset_id | |||||
| ).first() | |||||
| document = db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() | |||||
| if not document: | if not document: | ||||
| raise NotFound('Document not found') | |||||
| raise NotFound("Document not found") | |||||
| try: | try: | ||||
| indexing_runner = IndexingRunner() | indexing_runner = IndexingRunner() | ||||
| elif document.indexing_status == "indexing": | elif document.indexing_status == "indexing": | ||||
| indexing_runner.run_in_indexing_status(document) | indexing_runner.run_in_indexing_status(document) | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info(click.style('Processed document: {} latency: {}'.format(document.id, end_at - start_at), fg='green')) | |||||
| logging.info( | |||||
| click.style("Processed document: {} latency: {}".format(document.id, end_at - start_at), fg="green") | |||||
| ) | |||||
| except DocumentIsPausedException as ex: | except DocumentIsPausedException as ex: | ||||
| logging.info(click.style(str(ex), fg='yellow')) | |||||
| logging.info(click.style(str(ex), fg="yellow")) | |||||
| except Exception: | except Exception: | ||||
| pass | pass |
| from models.workflow import ConversationVariable, Workflow, WorkflowAppLog, WorkflowNodeExecution, WorkflowRun | from models.workflow import ConversationVariable, Workflow, WorkflowAppLog, WorkflowNodeExecution, WorkflowRun | ||||
| @shared_task(queue='app_deletion', bind=True, max_retries=3) | |||||
| @shared_task(queue="app_deletion", bind=True, max_retries=3) | |||||
| def remove_app_and_related_data_task(self, tenant_id: str, app_id: str): | def remove_app_and_related_data_task(self, tenant_id: str, app_id: str): | ||||
| logging.info(click.style(f'Start deleting app and related data: {tenant_id}:{app_id}', fg='green')) | |||||
| logging.info(click.style(f"Start deleting app and related data: {tenant_id}:{app_id}", fg="green")) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| try: | try: | ||||
| # Delete related data | # Delete related data | ||||
| _delete_conversation_variables(app_id=app_id) | _delete_conversation_variables(app_id=app_id) | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info(click.style(f'App and related data deleted: {app_id} latency: {end_at - start_at}', fg='green')) | |||||
| logging.info(click.style(f"App and related data deleted: {app_id} latency: {end_at - start_at}", fg="green")) | |||||
| except SQLAlchemyError as e: | except SQLAlchemyError as e: | ||||
| logging.exception( | logging.exception( | ||||
| click.style(f"Database error occurred while deleting app {app_id} and related data", fg='red')) | |||||
| click.style(f"Database error occurred while deleting app {app_id} and related data", fg="red") | |||||
| ) | |||||
| raise self.retry(exc=e, countdown=60) # Retry after 60 seconds | raise self.retry(exc=e, countdown=60) # Retry after 60 seconds | ||||
| except Exception as e: | except Exception as e: | ||||
| logging.exception(click.style(f"Error occurred while deleting app {app_id} and related data", fg='red')) | |||||
| logging.exception(click.style(f"Error occurred while deleting app {app_id} and related data", fg="red")) | |||||
| raise self.retry(exc=e, countdown=60) # Retry after 60 seconds | raise self.retry(exc=e, countdown=60) # Retry after 60 seconds | ||||
| """select id from app_model_configs where app_id=:app_id limit 1000""", | """select id from app_model_configs where app_id=:app_id limit 1000""", | ||||
| {"app_id": app_id}, | {"app_id": app_id}, | ||||
| del_model_config, | del_model_config, | ||||
| "app model config" | |||||
| "app model config", | |||||
| ) | ) | ||||
| def del_site(site_id: str): | def del_site(site_id: str): | ||||
| db.session.query(Site).filter(Site.id == site_id).delete(synchronize_session=False) | db.session.query(Site).filter(Site.id == site_id).delete(synchronize_session=False) | ||||
| _delete_records( | |||||
| """select id from sites where app_id=:app_id limit 1000""", | |||||
| {"app_id": app_id}, | |||||
| del_site, | |||||
| "site" | |||||
| ) | |||||
| _delete_records("""select id from sites where app_id=:app_id limit 1000""", {"app_id": app_id}, del_site, "site") | |||||
| def _delete_app_api_tokens(tenant_id: str, app_id: str): | def _delete_app_api_tokens(tenant_id: str, app_id: str): | ||||
| db.session.query(ApiToken).filter(ApiToken.id == api_token_id).delete(synchronize_session=False) | db.session.query(ApiToken).filter(ApiToken.id == api_token_id).delete(synchronize_session=False) | ||||
| _delete_records( | _delete_records( | ||||
| """select id from api_tokens where app_id=:app_id limit 1000""", | |||||
| {"app_id": app_id}, | |||||
| del_api_token, | |||||
| "api token" | |||||
| """select id from api_tokens where app_id=:app_id limit 1000""", {"app_id": app_id}, del_api_token, "api token" | |||||
| ) | ) | ||||
| """select id from installed_apps where tenant_id=:tenant_id and app_id=:app_id limit 1000""", | """select id from installed_apps where tenant_id=:tenant_id and app_id=:app_id limit 1000""", | ||||
| {"tenant_id": tenant_id, "app_id": app_id}, | {"tenant_id": tenant_id, "app_id": app_id}, | ||||
| del_installed_app, | del_installed_app, | ||||
| "installed app" | |||||
| "installed app", | |||||
| ) | ) | ||||
| def _delete_recommended_apps(tenant_id: str, app_id: str): | def _delete_recommended_apps(tenant_id: str, app_id: str): | ||||
| def del_recommended_app(recommended_app_id: str): | def del_recommended_app(recommended_app_id: str): | ||||
| db.session.query(RecommendedApp).filter(RecommendedApp.id == recommended_app_id).delete( | db.session.query(RecommendedApp).filter(RecommendedApp.id == recommended_app_id).delete( | ||||
| synchronize_session=False) | |||||
| synchronize_session=False | |||||
| ) | |||||
| _delete_records( | _delete_records( | ||||
| """select id from recommended_apps where app_id=:app_id limit 1000""", | """select id from recommended_apps where app_id=:app_id limit 1000""", | ||||
| {"app_id": app_id}, | {"app_id": app_id}, | ||||
| del_recommended_app, | del_recommended_app, | ||||
| "recommended app" | |||||
| "recommended app", | |||||
| ) | ) | ||||
| def _delete_app_annotation_data(tenant_id: str, app_id: str): | def _delete_app_annotation_data(tenant_id: str, app_id: str): | ||||
| def del_annotation_hit_history(annotation_hit_history_id: str): | def del_annotation_hit_history(annotation_hit_history_id: str): | ||||
| db.session.query(AppAnnotationHitHistory).filter( | db.session.query(AppAnnotationHitHistory).filter( | ||||
| AppAnnotationHitHistory.id == annotation_hit_history_id).delete(synchronize_session=False) | |||||
| AppAnnotationHitHistory.id == annotation_hit_history_id | |||||
| ).delete(synchronize_session=False) | |||||
| _delete_records( | _delete_records( | ||||
| """select id from app_annotation_hit_histories where app_id=:app_id limit 1000""", | """select id from app_annotation_hit_histories where app_id=:app_id limit 1000""", | ||||
| {"app_id": app_id}, | {"app_id": app_id}, | ||||
| del_annotation_hit_history, | del_annotation_hit_history, | ||||
| "annotation hit history" | |||||
| "annotation hit history", | |||||
| ) | ) | ||||
| def del_annotation_setting(annotation_setting_id: str): | def del_annotation_setting(annotation_setting_id: str): | ||||
| db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.id == annotation_setting_id).delete( | db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.id == annotation_setting_id).delete( | ||||
| synchronize_session=False) | |||||
| synchronize_session=False | |||||
| ) | |||||
| _delete_records( | _delete_records( | ||||
| """select id from app_annotation_settings where app_id=:app_id limit 1000""", | """select id from app_annotation_settings where app_id=:app_id limit 1000""", | ||||
| {"app_id": app_id}, | {"app_id": app_id}, | ||||
| del_annotation_setting, | del_annotation_setting, | ||||
| "annotation setting" | |||||
| "annotation setting", | |||||
| ) | ) | ||||
| """select id from app_dataset_joins where app_id=:app_id limit 1000""", | """select id from app_dataset_joins where app_id=:app_id limit 1000""", | ||||
| {"app_id": app_id}, | {"app_id": app_id}, | ||||
| del_dataset_join, | del_dataset_join, | ||||
| "dataset join" | |||||
| "dataset join", | |||||
| ) | ) | ||||
| """select id from workflows where tenant_id=:tenant_id and app_id=:app_id limit 1000""", | """select id from workflows where tenant_id=:tenant_id and app_id=:app_id limit 1000""", | ||||
| {"tenant_id": tenant_id, "app_id": app_id}, | {"tenant_id": tenant_id, "app_id": app_id}, | ||||
| del_workflow, | del_workflow, | ||||
| "workflow" | |||||
| "workflow", | |||||
| ) | ) | ||||
| """select id from workflow_runs where tenant_id=:tenant_id and app_id=:app_id limit 1000""", | """select id from workflow_runs where tenant_id=:tenant_id and app_id=:app_id limit 1000""", | ||||
| {"tenant_id": tenant_id, "app_id": app_id}, | {"tenant_id": tenant_id, "app_id": app_id}, | ||||
| del_workflow_run, | del_workflow_run, | ||||
| "workflow run" | |||||
| "workflow run", | |||||
| ) | ) | ||||
| def _delete_app_workflow_node_executions(tenant_id: str, app_id: str): | def _delete_app_workflow_node_executions(tenant_id: str, app_id: str): | ||||
| def del_workflow_node_execution(workflow_node_execution_id: str): | def del_workflow_node_execution(workflow_node_execution_id: str): | ||||
| db.session.query(WorkflowNodeExecution).filter( | |||||
| WorkflowNodeExecution.id == workflow_node_execution_id).delete(synchronize_session=False) | |||||
| db.session.query(WorkflowNodeExecution).filter(WorkflowNodeExecution.id == workflow_node_execution_id).delete( | |||||
| synchronize_session=False | |||||
| ) | |||||
| _delete_records( | _delete_records( | ||||
| """select id from workflow_node_executions where tenant_id=:tenant_id and app_id=:app_id limit 1000""", | """select id from workflow_node_executions where tenant_id=:tenant_id and app_id=:app_id limit 1000""", | ||||
| {"tenant_id": tenant_id, "app_id": app_id}, | {"tenant_id": tenant_id, "app_id": app_id}, | ||||
| del_workflow_node_execution, | del_workflow_node_execution, | ||||
| "workflow node execution" | |||||
| "workflow node execution", | |||||
| ) | ) | ||||
| def _delete_app_workflow_app_logs(tenant_id: str, app_id: str): | def _delete_app_workflow_app_logs(tenant_id: str, app_id: str): | ||||
| def del_workflow_app_log(workflow_app_log_id: str): | def del_workflow_app_log(workflow_app_log_id: str): | ||||
| db.session.query(WorkflowAppLog).filter(WorkflowAppLog.id == workflow_app_log_id).delete(synchronize_session=False) | |||||
| db.session.query(WorkflowAppLog).filter(WorkflowAppLog.id == workflow_app_log_id).delete( | |||||
| synchronize_session=False | |||||
| ) | |||||
| _delete_records( | _delete_records( | ||||
| """select id from workflow_app_logs where tenant_id=:tenant_id and app_id=:app_id limit 1000""", | """select id from workflow_app_logs where tenant_id=:tenant_id and app_id=:app_id limit 1000""", | ||||
| {"tenant_id": tenant_id, "app_id": app_id}, | {"tenant_id": tenant_id, "app_id": app_id}, | ||||
| del_workflow_app_log, | del_workflow_app_log, | ||||
| "workflow app log" | |||||
| "workflow app log", | |||||
| ) | ) | ||||
| def _delete_app_conversations(tenant_id: str, app_id: str): | def _delete_app_conversations(tenant_id: str, app_id: str): | ||||
| def del_conversation(conversation_id: str): | def del_conversation(conversation_id: str): | ||||
| db.session.query(PinnedConversation).filter(PinnedConversation.conversation_id == conversation_id).delete( | db.session.query(PinnedConversation).filter(PinnedConversation.conversation_id == conversation_id).delete( | ||||
| synchronize_session=False) | |||||
| synchronize_session=False | |||||
| ) | |||||
| db.session.query(Conversation).filter(Conversation.id == conversation_id).delete(synchronize_session=False) | db.session.query(Conversation).filter(Conversation.id == conversation_id).delete(synchronize_session=False) | ||||
| _delete_records( | _delete_records( | ||||
| """select id from conversations where app_id=:app_id limit 1000""", | """select id from conversations where app_id=:app_id limit 1000""", | ||||
| {"app_id": app_id}, | {"app_id": app_id}, | ||||
| del_conversation, | del_conversation, | ||||
| "conversation" | |||||
| "conversation", | |||||
| ) | ) | ||||
| def _delete_conversation_variables(*, app_id: str): | def _delete_conversation_variables(*, app_id: str): | ||||
| stmt = delete(ConversationVariable).where(ConversationVariable.app_id == app_id) | stmt = delete(ConversationVariable).where(ConversationVariable.app_id == app_id) | ||||
| with db.engine.connect() as conn: | with db.engine.connect() as conn: | ||||
| conn.execute(stmt) | conn.execute(stmt) | ||||
| conn.commit() | conn.commit() | ||||
| logging.info(click.style(f"Deleted conversation variables for app {app_id}", fg='green')) | |||||
| logging.info(click.style(f"Deleted conversation variables for app {app_id}", fg="green")) | |||||
| def _delete_app_messages(tenant_id: str, app_id: str): | def _delete_app_messages(tenant_id: str, app_id: str): | ||||
| def del_message(message_id: str): | def del_message(message_id: str): | ||||
| db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message_id).delete( | db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message_id).delete( | ||||
| synchronize_session=False) | |||||
| synchronize_session=False | |||||
| ) | |||||
| db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message_id).delete( | db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message_id).delete( | ||||
| synchronize_session=False) | |||||
| db.session.query(MessageChain).filter(MessageChain.message_id == message_id).delete( | |||||
| synchronize_session=False) | |||||
| synchronize_session=False | |||||
| ) | |||||
| db.session.query(MessageChain).filter(MessageChain.message_id == message_id).delete(synchronize_session=False) | |||||
| db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message_id).delete( | db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message_id).delete( | ||||
| synchronize_session=False) | |||||
| synchronize_session=False | |||||
| ) | |||||
| db.session.query(MessageFile).filter(MessageFile.message_id == message_id).delete(synchronize_session=False) | db.session.query(MessageFile).filter(MessageFile.message_id == message_id).delete(synchronize_session=False) | ||||
| db.session.query(SavedMessage).filter(SavedMessage.message_id == message_id).delete( | |||||
| synchronize_session=False) | |||||
| db.session.query(SavedMessage).filter(SavedMessage.message_id == message_id).delete(synchronize_session=False) | |||||
| db.session.query(Message).filter(Message.id == message_id).delete() | db.session.query(Message).filter(Message.id == message_id).delete() | ||||
| _delete_records( | _delete_records( | ||||
| """select id from messages where app_id=:app_id limit 1000""", | |||||
| {"app_id": app_id}, | |||||
| del_message, | |||||
| "message" | |||||
| """select id from messages where app_id=:app_id limit 1000""", {"app_id": app_id}, del_message, "message" | |||||
| ) | ) | ||||
| def _delete_workflow_tool_providers(tenant_id: str, app_id: str): | def _delete_workflow_tool_providers(tenant_id: str, app_id: str): | ||||
| def del_tool_provider(tool_provider_id: str): | def del_tool_provider(tool_provider_id: str): | ||||
| db.session.query(WorkflowToolProvider).filter(WorkflowToolProvider.id == tool_provider_id).delete( | db.session.query(WorkflowToolProvider).filter(WorkflowToolProvider.id == tool_provider_id).delete( | ||||
| synchronize_session=False) | |||||
| synchronize_session=False | |||||
| ) | |||||
| _delete_records( | _delete_records( | ||||
| """select id from tool_workflow_providers where tenant_id=:tenant_id and app_id=:app_id limit 1000""", | """select id from tool_workflow_providers where tenant_id=:tenant_id and app_id=:app_id limit 1000""", | ||||
| {"tenant_id": tenant_id, "app_id": app_id}, | {"tenant_id": tenant_id, "app_id": app_id}, | ||||
| del_tool_provider, | del_tool_provider, | ||||
| "tool workflow provider" | |||||
| "tool workflow provider", | |||||
| ) | ) | ||||
| """select id from tag_bindings where tenant_id=:tenant_id and target_id=:app_id limit 1000""", | """select id from tag_bindings where tenant_id=:tenant_id and target_id=:app_id limit 1000""", | ||||
| {"tenant_id": tenant_id, "app_id": app_id}, | {"tenant_id": tenant_id, "app_id": app_id}, | ||||
| del_tag_binding, | del_tag_binding, | ||||
| "tag binding" | |||||
| "tag binding", | |||||
| ) | ) | ||||
| """select id from end_users where tenant_id=:tenant_id and app_id=:app_id limit 1000""", | """select id from end_users where tenant_id=:tenant_id and app_id=:app_id limit 1000""", | ||||
| {"tenant_id": tenant_id, "app_id": app_id}, | {"tenant_id": tenant_id, "app_id": app_id}, | ||||
| del_end_user, | del_end_user, | ||||
| "end user" | |||||
| "end user", | |||||
| ) | ) | ||||
| def _delete_trace_app_configs(tenant_id: str, app_id: str): | def _delete_trace_app_configs(tenant_id: str, app_id: str): | ||||
| def del_trace_app_config(trace_app_config_id: str): | def del_trace_app_config(trace_app_config_id: str): | ||||
| db.session.query(TraceAppConfig).filter(TraceAppConfig.id == trace_app_config_id).delete( | db.session.query(TraceAppConfig).filter(TraceAppConfig.id == trace_app_config_id).delete( | ||||
| synchronize_session=False) | |||||
| synchronize_session=False | |||||
| ) | |||||
| _delete_records( | _delete_records( | ||||
| """select id from trace_app_config where app_id=:app_id limit 1000""", | """select id from trace_app_config where app_id=:app_id limit 1000""", | ||||
| {"app_id": app_id}, | {"app_id": app_id}, | ||||
| del_trace_app_config, | del_trace_app_config, | ||||
| "trace app config" | |||||
| "trace app config", | |||||
| ) | ) | ||||
| try: | try: | ||||
| delete_func(record_id) | delete_func(record_id) | ||||
| db.session.commit() | db.session.commit() | ||||
| logging.info(click.style(f"Deleted {name} {record_id}", fg='green')) | |||||
| logging.info(click.style(f"Deleted {name} {record_id}", fg="green")) | |||||
| except Exception: | except Exception: | ||||
| logging.exception(f"Error occurred while deleting {name} {record_id}") | logging.exception(f"Error occurred while deleting {name} {record_id}") | ||||
| continue | continue |
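All of the helpers above delegate to the same `_delete_records` batching routine, whose per-record try/except body closes this hunk. A minimal sketch of that routine, assuming the `(query, params, delete_func, name)` argument order implied by its callers and the project's `extensions.ext_database` import path (both assumptions; only the inner loop body is taken from the hunk):

    import logging
    from collections.abc import Callable

    import click
    from sqlalchemy import text

    from extensions.ext_database import db  # assumed import path


    def _delete_records(query: str, params: dict, delete_func: Callable[[str], None], name: str) -> None:
        # Fetch ids in batches (the callers cap each query at `limit 1000`) and delete them
        # one by one, committing per record so a single failure is skipped rather than fatal.
        # Records that fail to delete stay behind and will be re-selected on the next pass.
        while True:
            with db.engine.begin() as conn:
                rows = conn.execute(text(query), params).fetchall()
            if not rows:
                break
            for row in rows:
                record_id = str(row.id)
                try:
                    delete_func(record_id)
                    db.session.commit()
                    logging.info(click.style(f"Deleted {name} {record_id}", fg="green"))
                except Exception:
                    logging.exception(f"Error occurred while deleting {name} {record_id}")
                    continue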
| from models.dataset import Document, DocumentSegment | from models.dataset import Document, DocumentSegment | ||||
| @shared_task(queue='dataset') | |||||
| @shared_task(queue="dataset") | |||||
| def remove_document_from_index_task(document_id: str): | def remove_document_from_index_task(document_id: str): | ||||
| """ | """ | ||||
| Async Remove document from index | Async Remove document from index | ||||
| Usage: remove_document_from_index.delay(document_id) | Usage: remove_document_from_index.delay(document_id) | ||||
| """ | """ | ||||
| logging.info(click.style('Start remove document segments from index: {}'.format(document_id), fg='green')) | |||||
| logging.info(click.style("Start remove document segments from index: {}".format(document_id), fg="green")) | |||||
| start_at = time.perf_counter() | start_at = time.perf_counter() | ||||
| document = db.session.query(Document).filter(Document.id == document_id).first() | document = db.session.query(Document).filter(Document.id == document_id).first() | ||||
| if not document: | if not document: | ||||
| raise NotFound('Document not found') | |||||
| raise NotFound("Document not found") | |||||
| if document.indexing_status != 'completed': | |||||
| if document.indexing_status != "completed": | |||||
| return | return | ||||
| indexing_cache_key = 'document_{}_indexing'.format(document.id) | |||||
| indexing_cache_key = "document_{}_indexing".format(document.id) | |||||
| try: | try: | ||||
| dataset = document.dataset | dataset = document.dataset | ||||
| if not dataset: | if not dataset: | ||||
| raise Exception('Document has no dataset') | |||||
| raise Exception("Document has no dataset") | |||||
| index_processor = IndexProcessorFactory(document.doc_form).init_index_processor() | index_processor = IndexProcessorFactory(document.doc_form).init_index_processor() | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info( | logging.info( | ||||
| click.style('Document removed from index: {} latency: {}'.format(document.id, end_at - start_at), fg='green')) | |||||
| click.style( | |||||
| "Document removed from index: {} latency: {}".format(document.id, end_at - start_at), fg="green" | |||||
| ) | |||||
| ) | |||||
| except Exception: | except Exception: | ||||
| logging.exception("remove document from index failed") | logging.exception("remove document from index failed") | ||||
| if not document.archived: | if not document.archived: |
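The docstring gives the intended entry point (`remove_document_from_index.delay(document_id)`; the task registered here is `remove_document_from_index_task`). A hedged caller-side sketch, assuming the `tasks.remove_document_from_index_task` module path and the `extensions.ext_redis` client; the task only reads the `document_{id}_indexing` key, so setting it in the caller, the helper name, and the TTL are all illustrative assumptions:

    from extensions.ext_redis import redis_client  # assumed import path
    from tasks.remove_document_from_index_task import remove_document_from_index_task  # assumed module path


    def schedule_remove_from_index(document_id: str) -> None:
        # Hypothetical helper: flag the document as having an in-flight indexing operation,
        # then enqueue the Celery task on the "dataset" queue via .delay().
        redis_client.setex("document_{}_indexing".format(document_id), 600, 1)  # TTL chosen for illustration
        remove_document_from_index_task.delay(document_id)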
| from services.feature_service import FeatureService | from services.feature_service import FeatureService | ||||
| @shared_task(queue='dataset') | |||||
| @shared_task(queue="dataset") | |||||
| def retry_document_indexing_task(dataset_id: str, document_ids: list[str]): | def retry_document_indexing_task(dataset_id: str, document_ids: list[str]): | ||||
| """ | """ | ||||
| Async process document | Async process document | ||||
| dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() | dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() | ||||
| for document_id in document_ids: | for document_id in document_ids: | ||||
| retry_indexing_cache_key = 'document_{}_is_retried'.format(document_id) | |||||
| retry_indexing_cache_key = "document_{}_is_retried".format(document_id) | |||||
| # check document limit | # check document limit | ||||
| features = FeatureService.get_features(dataset.tenant_id) | features = FeatureService.get_features(dataset.tenant_id) | ||||
| try: | try: | ||||
| if features.billing.enabled: | if features.billing.enabled: | ||||
| vector_space = features.vector_space | vector_space = features.vector_space | ||||
| if 0 < vector_space.limit <= vector_space.size: | if 0 < vector_space.limit <= vector_space.size: | ||||
| raise ValueError("Your total number of documents plus the number of uploads have over the limit of " | |||||
| "your subscription.") | |||||
| raise ValueError( | |||||
| "Your total number of documents plus the number of uploads have over the limit of " | |||||
| "your subscription." | |||||
| ) | |||||
| except Exception as e: | except Exception as e: | ||||
| document = db.session.query(Document).filter( | |||||
| Document.id == document_id, | |||||
| Document.dataset_id == dataset_id | |||||
| ).first() | |||||
| document = ( | |||||
| db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() | |||||
| ) | |||||
| if document: | if document: | ||||
| document.indexing_status = 'error' | |||||
| document.indexing_status = "error" | |||||
| document.error = str(e) | document.error = str(e) | ||||
| document.stopped_at = datetime.datetime.utcnow() | document.stopped_at = datetime.datetime.utcnow() | ||||
| db.session.add(document) | db.session.add(document) | ||||
| redis_client.delete(retry_indexing_cache_key) | redis_client.delete(retry_indexing_cache_key) | ||||
| return | return | ||||
| logging.info(click.style('Start retry document: {}'.format(document_id), fg='green')) | |||||
| document = db.session.query(Document).filter( | |||||
| Document.id == document_id, | |||||
| Document.dataset_id == dataset_id | |||||
| ).first() | |||||
| logging.info(click.style("Start retry document: {}".format(document_id), fg="green")) | |||||
| document = ( | |||||
| db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() | |||||
| ) | |||||
| try: | try: | ||||
| if document: | if document: | ||||
| # clean old data | # clean old data | ||||
| db.session.delete(segment) | db.session.delete(segment) | ||||
| db.session.commit() | db.session.commit() | ||||
| document.indexing_status = 'parsing' | |||||
| document.indexing_status = "parsing" | |||||
| document.processing_started_at = datetime.datetime.utcnow() | document.processing_started_at = datetime.datetime.utcnow() | ||||
| db.session.add(document) | db.session.add(document) | ||||
| db.session.commit() | db.session.commit() | ||||
| indexing_runner.run([document]) | indexing_runner.run([document]) | ||||
| redis_client.delete(retry_indexing_cache_key) | redis_client.delete(retry_indexing_cache_key) | ||||
| except Exception as ex: | except Exception as ex: | ||||
| document.indexing_status = 'error' | |||||
| document.indexing_status = "error" | |||||
| document.error = str(ex) | document.error = str(ex) | ||||
| document.stopped_at = datetime.datetime.utcnow() | document.stopped_at = datetime.datetime.utcnow() | ||||
| db.session.add(document) | db.session.add(document) | ||||
| db.session.commit() | db.session.commit() | ||||
| logging.info(click.style(str(ex), fg='yellow')) | |||||
| logging.info(click.style(str(ex), fg="yellow")) | |||||
| redis_client.delete(retry_indexing_cache_key) | redis_client.delete(retry_indexing_cache_key) | ||||
| pass | pass | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info(click.style('Retry dataset: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green')) | |||||
| logging.info(click.style("Retry dataset: {} latency: {}".format(dataset_id, end_at - start_at), fg="green")) |
| from services.feature_service import FeatureService | from services.feature_service import FeatureService | ||||
| @shared_task(queue='dataset') | |||||
| @shared_task(queue="dataset") | |||||
| def sync_website_document_indexing_task(dataset_id: str, document_id: str): | def sync_website_document_indexing_task(dataset_id: str, document_id: str): | ||||
| """ | """ | ||||
| Async process document | Async process document | ||||
| dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() | dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() | ||||
| sync_indexing_cache_key = 'document_{}_is_sync'.format(document_id) | |||||
| sync_indexing_cache_key = "document_{}_is_sync".format(document_id) | |||||
| # check document limit | # check document limit | ||||
| features = FeatureService.get_features(dataset.tenant_id) | features = FeatureService.get_features(dataset.tenant_id) | ||||
| try: | try: | ||||
| if features.billing.enabled: | if features.billing.enabled: | ||||
| vector_space = features.vector_space | vector_space = features.vector_space | ||||
| if 0 < vector_space.limit <= vector_space.size: | if 0 < vector_space.limit <= vector_space.size: | ||||
| raise ValueError("Your total number of documents plus the number of uploads have over the limit of " | |||||
| "your subscription.") | |||||
| raise ValueError( | |||||
| "Your total number of documents plus the number of uploads have over the limit of " | |||||
| "your subscription." | |||||
| ) | |||||
| except Exception as e: | except Exception as e: | ||||
| document = db.session.query(Document).filter( | |||||
| Document.id == document_id, | |||||
| Document.dataset_id == dataset_id | |||||
| ).first() | |||||
| document = ( | |||||
| db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() | |||||
| ) | |||||
| if document: | if document: | ||||
| document.indexing_status = 'error' | |||||
| document.indexing_status = "error" | |||||
| document.error = str(e) | document.error = str(e) | ||||
| document.stopped_at = datetime.datetime.utcnow() | document.stopped_at = datetime.datetime.utcnow() | ||||
| db.session.add(document) | db.session.add(document) | ||||
| redis_client.delete(sync_indexing_cache_key) | redis_client.delete(sync_indexing_cache_key) | ||||
| return | return | ||||
| logging.info(click.style('Start sync website document: {}'.format(document_id), fg='green')) | |||||
| document = db.session.query(Document).filter( | |||||
| Document.id == document_id, | |||||
| Document.dataset_id == dataset_id | |||||
| ).first() | |||||
| logging.info(click.style("Start sync website document: {}".format(document_id), fg="green")) | |||||
| document = db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() | |||||
| try: | try: | ||||
| if document: | if document: | ||||
| # clean old data | # clean old data | ||||
| db.session.delete(segment) | db.session.delete(segment) | ||||
| db.session.commit() | db.session.commit() | ||||
| document.indexing_status = 'parsing' | |||||
| document.indexing_status = "parsing" | |||||
| document.processing_started_at = datetime.datetime.utcnow() | document.processing_started_at = datetime.datetime.utcnow() | ||||
| db.session.add(document) | db.session.add(document) | ||||
| db.session.commit() | db.session.commit() | ||||
| indexing_runner.run([document]) | indexing_runner.run([document]) | ||||
| redis_client.delete(sync_indexing_cache_key) | redis_client.delete(sync_indexing_cache_key) | ||||
| except Exception as ex: | except Exception as ex: | ||||
| document.indexing_status = 'error' | |||||
| document.indexing_status = "error" | |||||
| document.error = str(ex) | document.error = str(ex) | ||||
| document.stopped_at = datetime.datetime.utcnow() | document.stopped_at = datetime.datetime.utcnow() | ||||
| db.session.add(document) | db.session.add(document) | ||||
| db.session.commit() | db.session.commit() | ||||
| logging.info(click.style(str(ex), fg='yellow')) | |||||
| logging.info(click.style(str(ex), fg="yellow")) | |||||
| redis_client.delete(sync_indexing_cache_key) | redis_client.delete(sync_indexing_cache_key) | ||||
| pass | pass | ||||
| end_at = time.perf_counter() | end_at = time.perf_counter() | ||||
| logging.info(click.style('Sync document: {} latency: {}'.format(document_id, end_at - start_at), fg='green')) | |||||
| logging.info(click.style("Sync document: {} latency: {}".format(document_id, end_at - start_at), fg="green")) |