Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.

retry_document_indexing_task.py 4.1KB

10 месяцев назад
7 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. import datetime
  2. import logging
  3. import time
  4. import click
  5. from celery import shared_task # type: ignore
  6. from core.indexing_runner import IndexingRunner
  7. from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
  8. from extensions.ext_database import db
  9. from extensions.ext_redis import redis_client
  10. from models.dataset import Dataset, Document, DocumentSegment
  11. from services.feature_service import FeatureService
  12. @shared_task(queue="dataset")
  13. def retry_document_indexing_task(dataset_id: str, document_ids: list[str]):
  14. """
  15. Async process document
  16. :param dataset_id:
  17. :param document_ids:
  18. Usage: retry_document_indexing_task.delay(dataset_id, document_ids)
  19. """
  20. documents: list[Document] = []
  21. start_at = time.perf_counter()
  22. dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
  23. if not dataset:
  24. logging.info(click.style("Dataset not found: {}".format(dataset_id), fg="red"))
  25. db.session.close()
  26. return
  27. for document_id in document_ids:
  28. retry_indexing_cache_key = "document_{}_is_retried".format(document_id)
  29. # check document limit
  30. features = FeatureService.get_features(dataset.tenant_id)
  31. try:
  32. if features.billing.enabled:
  33. vector_space = features.vector_space
  34. if 0 < vector_space.limit <= vector_space.size:
  35. raise ValueError(
  36. "Your total number of documents plus the number of uploads have over the limit of "
  37. "your subscription."
  38. )
  39. except Exception as e:
  40. document = (
  41. db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first()
  42. )
  43. if document:
  44. document.indexing_status = "error"
  45. document.error = str(e)
  46. document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
  47. db.session.add(document)
  48. db.session.commit()
  49. redis_client.delete(retry_indexing_cache_key)
  50. db.session.close()
  51. return
  52. logging.info(click.style("Start retry document: {}".format(document_id), fg="green"))
  53. document = (
  54. db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first()
  55. )
  56. if not document:
  57. logging.info(click.style("Document not found: {}".format(document_id), fg="yellow"))
  58. db.session.close()
  59. return
  60. try:
  61. # clean old data
  62. index_processor = IndexProcessorFactory(document.doc_form).init_index_processor()
  63. segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all()
  64. if segments:
  65. index_node_ids = [segment.index_node_id for segment in segments]
  66. # delete from vector index
  67. index_processor.clean(dataset, index_node_ids, with_keywords=True, delete_child_chunks=True)
  68. for segment in segments:
  69. db.session.delete(segment)
  70. db.session.commit()
  71. document.indexing_status = "parsing"
  72. document.processing_started_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
  73. db.session.add(document)
  74. db.session.commit()
  75. indexing_runner = IndexingRunner()
  76. indexing_runner.run([document])
  77. redis_client.delete(retry_indexing_cache_key)
  78. except Exception as ex:
  79. document.indexing_status = "error"
  80. document.error = str(ex)
  81. document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
  82. db.session.add(document)
  83. db.session.commit()
  84. logging.info(click.style(str(ex), fg="yellow"))
  85. redis_client.delete(retry_indexing_cache_key)
  86. pass
  87. finally:
  88. db.session.close()
  89. end_at = time.perf_counter()
  90. logging.info(click.style("Retry dataset: {} latency: {}".format(dataset_id, end_at - start_at), fg="green"))