Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

disable_segments_from_index_task.py 2.9KB

10 месяцев назад
10 месяцев назад
10 месяцев назад
7 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
10 месяцев назад
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. import logging
  2. import time
  3. import click
  4. from celery import shared_task
  5. from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
  6. from extensions.ext_database import db
  7. from extensions.ext_redis import redis_client
  8. from models.dataset import Dataset, DocumentSegment
  9. from models.dataset import Document as DatasetDocument
  10. logger = logging.getLogger(__name__)
  11. @shared_task(queue="dataset")
  12. def disable_segments_from_index_task(segment_ids: list, dataset_id: str, document_id: str):
  13. """
  14. Async disable segments from index
  15. :param segment_ids: list of segment ids
  16. :param dataset_id: dataset id
  17. :param document_id: document id
  18. Usage: disable_segments_from_index_task.delay(segment_ids, dataset_id, document_id)
  19. """
  20. start_at = time.perf_counter()
  21. dataset = db.session.query(Dataset).where(Dataset.id == dataset_id).first()
  22. if not dataset:
  23. logger.info(click.style(f"Dataset {dataset_id} not found, pass.", fg="cyan"))
  24. db.session.close()
  25. return
  26. dataset_document = db.session.query(DatasetDocument).where(DatasetDocument.id == document_id).first()
  27. if not dataset_document:
  28. logger.info(click.style(f"Document {document_id} not found, pass.", fg="cyan"))
  29. db.session.close()
  30. return
  31. if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != "completed":
  32. logger.info(click.style(f"Document {document_id} status is invalid, pass.", fg="cyan"))
  33. db.session.close()
  34. return
  35. # sync index processor
  36. index_processor = IndexProcessorFactory(dataset_document.doc_form).init_index_processor()
  37. segments = (
  38. db.session.query(DocumentSegment)
  39. .where(
  40. DocumentSegment.id.in_(segment_ids),
  41. DocumentSegment.dataset_id == dataset_id,
  42. DocumentSegment.document_id == document_id,
  43. )
  44. .all()
  45. )
  46. if not segments:
  47. db.session.close()
  48. return
  49. try:
  50. index_node_ids = [segment.index_node_id for segment in segments]
  51. index_processor.clean(dataset, index_node_ids, with_keywords=True, delete_child_chunks=False)
  52. end_at = time.perf_counter()
  53. logger.info(click.style(f"Segments removed from index latency: {end_at - start_at}", fg="green"))
  54. except Exception:
  55. # update segment error msg
  56. db.session.query(DocumentSegment).where(
  57. DocumentSegment.id.in_(segment_ids),
  58. DocumentSegment.dataset_id == dataset_id,
  59. DocumentSegment.document_id == document_id,
  60. ).update(
  61. {
  62. "disabled_at": None,
  63. "disabled_by": None,
  64. "enabled": True,
  65. }
  66. )
  67. db.session.commit()
  68. finally:
  69. for segment in segments:
  70. indexing_cache_key = f"segment_{segment.id}_indexing"
  71. redis_client.delete(indexing_cache_key)
  72. db.session.close()