You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

retry_document_indexing_task.py 4.1KB

преди 10 месеца
преди 10 месеца
преди 10 месеца
преди 10 месеца
преди 10 месеца
преди 10 месеца
преди 10 месеца
преди 10 месеца
преди 10 месеца
преди 10 месеца
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. import datetime
  2. import logging
  3. import time
  4. import click
  5. from celery import shared_task # type: ignore
  6. from core.indexing_runner import IndexingRunner
  7. from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
  8. from extensions.ext_database import db
  9. from extensions.ext_redis import redis_client
  10. from models.dataset import Dataset, Document, DocumentSegment
  11. from services.feature_service import FeatureService
  12. @shared_task(queue="dataset")
  13. def retry_document_indexing_task(dataset_id: str, document_ids: list[str]):
  14. """
  15. Async process document
  16. :param dataset_id:
  17. :param document_ids:
  18. Usage: retry_document_indexing_task.delay(dataset_id, document_ids)
  19. """
  20. documents: list[Document] = []
  21. start_at = time.perf_counter()
  22. dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
  23. if not dataset:
  24. logging.info(click.style("Dataset not found: {}".format(dataset_id), fg="red"))
  25. db.session.close()
  26. return
  27. for document_id in document_ids:
  28. retry_indexing_cache_key = "document_{}_is_retried".format(document_id)
  29. # check document limit
  30. features = FeatureService.get_features(dataset.tenant_id)
  31. try:
  32. if features.billing.enabled:
  33. vector_space = features.vector_space
  34. if 0 < vector_space.limit <= vector_space.size:
  35. raise ValueError(
  36. "Your total number of documents plus the number of uploads have over the limit of "
  37. "your subscription."
  38. )
  39. except Exception as e:
  40. document = (
  41. db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first()
  42. )
  43. if document:
  44. document.indexing_status = "error"
  45. document.error = str(e)
  46. document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
  47. db.session.add(document)
  48. db.session.commit()
  49. redis_client.delete(retry_indexing_cache_key)
  50. db.session.close()
  51. return
  52. logging.info(click.style("Start retry document: {}".format(document_id), fg="green"))
  53. document = (
  54. db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first()
  55. )
  56. if not document:
  57. logging.info(click.style("Document not found: {}".format(document_id), fg="yellow"))
  58. db.session.close()
  59. return
  60. try:
  61. # clean old data
  62. index_processor = IndexProcessorFactory(document.doc_form).init_index_processor()
  63. segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all()
  64. if segments:
  65. index_node_ids = [segment.index_node_id for segment in segments]
  66. # delete from vector index
  67. index_processor.clean(dataset, index_node_ids, with_keywords=True, delete_child_chunks=True)
  68. for segment in segments:
  69. db.session.delete(segment)
  70. db.session.commit()
  71. document.indexing_status = "parsing"
  72. document.processing_started_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
  73. db.session.add(document)
  74. db.session.commit()
  75. indexing_runner = IndexingRunner()
  76. indexing_runner.run([document])
  77. redis_client.delete(retry_indexing_cache_key)
  78. except Exception as ex:
  79. document.indexing_status = "error"
  80. document.error = str(ex)
  81. document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
  82. db.session.add(document)
  83. db.session.commit()
  84. logging.info(click.style(str(ex), fg="yellow"))
  85. redis_client.delete(retry_indexing_cache_key)
  86. pass
  87. finally:
  88. db.session.close()
  89. end_at = time.perf_counter()
  90. logging.info(click.style("Retry dataset: {} latency: {}".format(dataset_id, end_at - start_at), fg="green"))