Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.

retry_document_indexing_task.py 4.2KB

10 месяцев назад
7 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. import datetime
  2. import logging
  3. import time
  4. import click
  5. from celery import shared_task # type: ignore
  6. from core.indexing_runner import IndexingRunner
  7. from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
  8. from extensions.ext_database import db
  9. from extensions.ext_redis import redis_client
  10. from models.dataset import Dataset, Document, DocumentSegment
  11. from services.feature_service import FeatureService
  @shared_task(queue="dataset")
  def retry_document_indexing_task(dataset_id: str, document_ids: list[str]):
      """
      Async task: re-run indexing for the given documents of a dataset.

      For every document id, in order:

      * if billing is enabled and the tenant's vector space limit (when > 0) is
        reached, the document is marked ``indexing_status = "error"`` and the
        task moves on to the next id;
      * otherwise the document's existing segments are removed from the index
        (``index_processor.clean`` with keywords and child chunks) and deleted
        from the database, the document is set to ``"parsing"`` and handed to
        ``IndexingRunner``; any failure is recorded on the document as
        ``indexing_status = "error"``.

      A missing or failing document does not abort the remaining ones, and each
      document's ``document_<id>_is_retried`` Redis key is deleted once that
      document has been handled, whatever the outcome. The database session is
      closed once, after the whole batch.

      :param dataset_id: id of the dataset the documents belong to.
      :param document_ids: ids of the documents to retry.

      Usage: retry_document_indexing_task.delay(dataset_id, document_ids)
      """
      start_at = time.perf_counter()
      try:
          dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
          if not dataset:
              logging.info(click.style(f"Dataset not found: {dataset_id}", fg="red"))
              return
          tenant_id = dataset.tenant_id
          for document_id in document_ids:
              _retry_document(dataset, dataset_id, tenant_id, document_id)
          end_at = time.perf_counter()
          logging.info(click.style(f"Retry dataset: {dataset_id} latency: {end_at - start_at}", fg="green"))
      finally:
          # Close once for the whole batch. Closing inside the loop expunged
          # ``dataset`` from the session, yet later iterations still pass it to
          # the index processor.
          db.session.close()


  def _retry_document(dataset: Dataset, dataset_id: str, tenant_id: str, document_id: str) -> None:
      """Retry indexing of one document; its retry cache key is deleted on every exit path."""
      retry_indexing_cache_key = f"document_{document_id}_is_retried"
      try:
          # check document limit (re-read for every document, as before)
          features = FeatureService.get_features(tenant_id)
          try:
              _check_vector_space_limit(features)
          except Exception as e:
              document = _find_document(dataset_id, document_id)
              if document:
                  _mark_document_error(document, e)
              return

          logging.info(click.style(f"Start retry document: {document_id}", fg="green"))
          document = _find_document(dataset_id, document_id)
          if not document:
              logging.info(click.style(f"Document not found: {document_id}", fg="yellow"))
              return

          try:
              _reindex_document(dataset, document)
          except Exception as ex:
              logging.exception("retry_document_indexing_task failed, document_id: %s", document_id)
              # The failure may have left the session inside a failed transaction;
              # roll it back so the error status below can still be committed.
              db.session.rollback()
              _mark_document_error(document, ex)
      finally:
          redis_client.delete(retry_indexing_cache_key)


  def _check_vector_space_limit(features) -> None:
      """Raise ``ValueError`` when billing is enabled and a positive vector-space limit is reached."""
      if features.billing.enabled:
          vector_space = features.vector_space
          if 0 < vector_space.limit <= vector_space.size:
              raise ValueError(
                  "Your total number of documents plus the number of uploads have over the limit of "
                  "your subscription."
              )


  def _find_document(dataset_id: str, document_id: str) -> Document | None:
      """Return the document with *document_id* belonging to dataset *dataset_id*, or ``None``."""
      return (
          db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first()
      )


  def _reindex_document(dataset: Dataset, document: Document) -> None:
      """Drop *document*'s old segments, mark it ``"parsing"`` and run the indexing pipeline on it."""
      # clean old data (the processor is built unconditionally so an unsupported
      # doc_form is reported as an indexing error even when there are no segments)
      index_processor = IndexProcessorFactory(document.doc_form).init_index_processor()
      segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document.id).all()
      if segments:
          index_node_ids = [segment.index_node_id for segment in segments]
          # delete from vector index
          index_processor.clean(dataset, index_node_ids, with_keywords=True, delete_child_chunks=True)
          for segment in segments:
              db.session.delete(segment)
          db.session.commit()

      document.indexing_status = "parsing"
      document.processing_started_at = _naive_utc_now()
      db.session.add(document)
      db.session.commit()

      IndexingRunner().run([document])


  def _mark_document_error(document: Document, error: Exception) -> None:
      """Record *error* on *document* with ``indexing_status = "error"`` and commit."""
      document.indexing_status = "error"
      document.error = str(error)
      document.stopped_at = _naive_utc_now()
      db.session.add(document)
      db.session.commit()


  def _naive_utc_now() -> datetime.datetime:
      """Return the current UTC time with its tzinfo stripped (naive UTC)."""
      return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)