You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

remove_document_from_index_task.py 2.8KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. import logging
  2. import time
  3. import click
  4. from celery import shared_task
  5. from sqlalchemy import select
  6. from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
  7. from extensions.ext_database import db
  8. from extensions.ext_redis import redis_client
  9. from libs.datetime_utils import naive_utc_now
  10. from models.dataset import Document, DocumentSegment
  11. logger = logging.getLogger(__name__)
  @shared_task(queue="dataset")
  def remove_document_from_index_task(document_id: str):
      """
      Async Remove document from index

      Looks up the document, asks the index processor to clean the index nodes
      of all its segments, and then marks every segment of the document as
      disabled. Nothing is done when the document does not exist or its
      ``indexing_status`` is not ``"completed"``.

      If the removal fails, the session is rolled back and the document is
      switched back to ``enabled = True`` (unless it is archived). The
      ``document_<id>_indexing`` Redis key is deleted and the session closed
      once the removal attempt finishes, whether it succeeded or not.

      :param document_id: document id

      Usage: remove_document_from_index_task.delay(document_id)
      """
      logger.info(click.style(f"Start remove document segments from index: {document_id}", fg="green"))
      start_at = time.perf_counter()

      document = db.session.query(Document).where(Document.id == document_id).first()
      # NOTE(review): the two early exits below close the session but do not
      # delete the "document_<id>_indexing" Redis key -- confirm that is intended.
      if not document:
          logger.info(click.style(f"Document not found: {document_id}", fg="red"))
          db.session.close()
          return

      if document.indexing_status != "completed":
          logger.info(click.style(f"Document is not completed, remove is not allowed: {document_id}", fg="red"))
          db.session.close()
          return

      indexing_cache_key = f"document_{document.id}_indexing"

      try:
          dataset = document.dataset
          if not dataset:
              raise Exception("Document has no dataset")

          index_processor = IndexProcessorFactory(document.doc_form).init_index_processor()

          segments = db.session.scalars(select(DocumentSegment).where(DocumentSegment.document_id == document.id)).all()
          index_node_ids = [segment.index_node_id for segment in segments]
          if index_node_ids:
              # Best effort: a failure to clean the index is logged, and the
              # segments are still marked as disabled below.
              try:
                  index_processor.clean(dataset, index_node_ids, with_keywords=True, delete_child_chunks=False)
              except Exception:
                  logger.exception("clean dataset %s from index failed", dataset.id)

          # update segment to disable
          # Use a single timestamp so disabled_at and updated_at are identical.
          now = naive_utc_now()
          db.session.query(DocumentSegment).where(DocumentSegment.document_id == document.id).update(
              {
                  DocumentSegment.enabled: False,
                  DocumentSegment.disabled_at: now,
                  DocumentSegment.disabled_by: document.disabled_by,
                  DocumentSegment.updated_at: now,
              }
          )
          db.session.commit()

          end_at = time.perf_counter()
          logger.info(click.style(f"Document removed from index: {document.id} latency: {end_at - start_at}", fg="green"))
      except Exception:
          logger.exception("remove document from index failed")
          # The failure may have left the session's transaction in a failed
          # state (e.g. the UPDATE or COMMIT above raised); roll back first so
          # the recovery write below runs in a clean transaction.
          db.session.rollback()
          if not document.archived:
              document.enabled = True
              db.session.commit()
      finally:
          redis_client.delete(indexing_cache_key)
          db.session.close()