Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.

retry_document_indexing_task.py 3.9KB

10 месяцев назад
7 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. import datetime
  2. import logging
  3. import time
  4. import click
  5. from celery import shared_task # type: ignore
  6. from core.indexing_runner import IndexingRunner
  7. from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
  8. from extensions.ext_database import db
  9. from extensions.ext_redis import redis_client
  10. from models.dataset import Dataset, Document, DocumentSegment
  11. from services.feature_service import FeatureService
  12. @shared_task(queue="dataset")
  13. def retry_document_indexing_task(dataset_id: str, document_ids: list[str]):
  14. """
  15. Async process document
  16. :param dataset_id:
  17. :param document_ids:
  18. Usage: retry_document_indexing_task.delay(dataset_id, document_ids)
  19. """
  20. documents: list[Document] = []
  21. start_at = time.perf_counter()
  22. dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
  23. if not dataset:
  24. raise ValueError("Dataset not found")
  25. for document_id in document_ids:
  26. retry_indexing_cache_key = "document_{}_is_retried".format(document_id)
  27. # check document limit
  28. features = FeatureService.get_features(dataset.tenant_id)
  29. try:
  30. if features.billing.enabled:
  31. vector_space = features.vector_space
  32. if 0 < vector_space.limit <= vector_space.size:
  33. raise ValueError(
  34. "Your total number of documents plus the number of uploads have over the limit of "
  35. "your subscription."
  36. )
  37. except Exception as e:
  38. document = (
  39. db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first()
  40. )
  41. if document:
  42. document.indexing_status = "error"
  43. document.error = str(e)
  44. document.stopped_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
  45. db.session.add(document)
  46. db.session.commit()
  47. redis_client.delete(retry_indexing_cache_key)
  48. return
  49. logging.info(click.style("Start retry document: {}".format(document_id), fg="green"))
  50. document = (
  51. db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first()
  52. )
  53. if not document:
  54. logging.info(click.style("Document not found: {}".format(document_id), fg="yellow"))
  55. return
  56. try:
  57. # clean old data
  58. index_processor = IndexProcessorFactory(document.doc_form).init_index_processor()
  59. segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all()
  60. if segments:
  61. index_node_ids = [segment.index_node_id for segment in segments]
  62. # delete from vector index
  63. index_processor.clean(dataset, index_node_ids, with_keywords=True, delete_child_chunks=True)
  64. for segment in segments:
  65. db.session.delete(segment)
  66. db.session.commit()
  67. document.indexing_status = "parsing"
  68. document.processing_started_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
  69. db.session.add(document)
  70. db.session.commit()
  71. indexing_runner = IndexingRunner()
  72. indexing_runner.run([document])
  73. redis_client.delete(retry_indexing_cache_key)
  74. except Exception as ex:
  75. document.indexing_status = "error"
  76. document.error = str(ex)
  77. document.stopped_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
  78. db.session.add(document)
  79. db.session.commit()
  80. logging.info(click.style(str(ex), fg="yellow"))
  81. redis_client.delete(retry_indexing_cache_key)
  82. pass
  83. end_at = time.perf_counter()
  84. logging.info(click.style("Retry dataset: {} latency: {}".format(dataset_id, end_at - start_at), fg="green"))