Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.

sync_website_document_indexing_task.py 3.6KB

10 месяцев назад
1 год назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. import datetime
  2. import logging
  3. import time
  4. import click
  5. from celery import shared_task # type: ignore
  6. from core.indexing_runner import IndexingRunner
  7. from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
  8. from extensions.ext_database import db
  9. from extensions.ext_redis import redis_client
  10. from models.dataset import Dataset, Document, DocumentSegment
  11. from services.feature_service import FeatureService
  def _utc_now_naive() -> datetime.datetime:
      """Return the current UTC time with tzinfo stripped (a naive UTC datetime)."""
      return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)


  def _mark_document_error(document: Document, error: Exception) -> None:
      """Set *document* to the ``error`` indexing status with *error*'s message and commit."""
      document.indexing_status = "error"
      document.error = str(error)
      document.stopped_at = _utc_now_naive()
      db.session.add(document)
      db.session.commit()


  @shared_task(queue="dataset")
  def sync_website_document_indexing_task(dataset_id: str, document_id: str):
      """
      Re-index a website document: drop its existing segments and run indexing again.

      The Redis key ``document_<document_id>_is_sync`` is deleted on every exit
      path: success, handled failure, early return, or raised exception.

      :param dataset_id: id of the dataset that owns the document
      :param document_id: id of the document to re-sync
      :raises ValueError: if the dataset does not exist

      Usage: sync_website_document_indexing_task.delay(dataset_id, document_id)
      """
      start_at = time.perf_counter()
      sync_indexing_cache_key = f"document_{document_id}_is_sync"

      # The finally clause clears the sync key no matter how the task exits, so a
      # missing dataset/document, or a failure while recording an error status,
      # cannot leave the key set.
      try:
          dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
          if dataset is None:
              raise ValueError("Dataset not found")

          # check document limit
          features = FeatureService.get_features(dataset.tenant_id)
          try:
              if features.billing.enabled:
                  vector_space = features.vector_space
                  # A limit of 0 means "no limit"; only a positive limit is enforced.
                  if 0 < vector_space.limit <= vector_space.size:
                      raise ValueError(
                          "Your total number of documents plus the number of uploads have over the limit of "
                          "your subscription."
                      )
          except Exception as e:
              document = (
                  db.session.query(Document)
                  .filter(Document.id == document_id, Document.dataset_id == dataset_id)
                  .first()
              )
              if document:
                  _mark_document_error(document, e)
              return

          logging.info(click.style(f"Start sync website document: {document_id}", fg="green"))

          document = (
              db.session.query(Document)
              .filter(Document.id == document_id, Document.dataset_id == dataset_id)
              .first()
          )
          if not document:
              logging.info(click.style(f"Document not found: {document_id}", fg="yellow"))
              return

          try:
              # clean old data
              index_processor = IndexProcessorFactory(document.doc_form).init_index_processor()
              segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all()
              if segments:
                  index_node_ids = [segment.index_node_id for segment in segments]
                  # delete from vector index
                  index_processor.clean(dataset, index_node_ids, with_keywords=True, delete_child_chunks=True)
                  for segment in segments:
                      db.session.delete(segment)
                  db.session.commit()

              document.indexing_status = "parsing"
              document.processing_started_at = _utc_now_naive()
              db.session.add(document)
              db.session.commit()

              IndexingRunner().run([document])
          except Exception as ex:
              # After a failed flush/commit the session stays in a "partial rollback"
              # state and rejects all further work until rollback() is called, so the
              # error status below could otherwise never be saved.
              if not db.session.is_active:
                  db.session.rollback()
              _mark_document_error(document, ex)
              logging.exception(click.style(str(ex), fg="yellow"))
      finally:
          redis_client.delete(sync_indexing_cache_key)

      end_at = time.perf_counter()
      logging.info(click.style(f"Sync document: {document_id} latency: {end_at - start_at}", fg="green"))