|
|
|
@@ -27,73 +27,73 @@ def duplicate_document_indexing_task(dataset_id: str, document_ids: list): |
|
|
|
    documents = []
    start_at = time.perf_counter()

    dataset = db.session.query(Dataset).where(Dataset.id == dataset_id).first()
    if dataset is None:
        logger.info(click.style(f"Dataset not found: {dataset_id}", fg="red"))
        db.session.close()
        return

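    # Billing limits are enforced before any indexing work starts: sandbox plans
    # cannot batch-upload, batches are capped at BATCH_UPLOAD_LIMIT, and the
    # tenant's vector space must not already be full.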
    # check document limit
    features = FeatureService.get_features(dataset.tenant_id)
    try:
        if features.billing.enabled:
            vector_space = features.vector_space
            count = len(document_ids)
            if features.billing.subscription.plan == "sandbox" and count > 1:
                raise ValueError("Your current plan does not support batch upload, please upgrade your plan.")
            batch_upload_limit = int(dify_config.BATCH_UPLOAD_LIMIT)
            if count > batch_upload_limit:
                raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.")
            if 0 < vector_space.limit <= vector_space.size:
                raise ValueError(
                    "Your total number of documents plus the number of uploads have over the limit of "
                    "your subscription."
                )
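    # Any limit violation above is recorded on each requested document so the
    # failure is visible (indexing_status = "error"), and the task stops here.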
    except Exception as e:
        for document_id in document_ids:
            document = (
                db.session.query(Document)
                .where(Document.id == document_id, Document.dataset_id == dataset_id)
                .first()
            )
            if document:
                document.indexing_status = "error"
                document.error = str(e)
                document.stopped_at = naive_utc_now()
                db.session.add(document)
                db.session.commit()
        return
    finally:
        db.session.close()

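    # For each document, drop previously indexed data (segments plus their
    # vector/keyword index entries), then reset the document to "parsing" so it
    # can be re-indexed from scratch.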
    for document_id in document_ids:
        logger.info(click.style(f"Start process document: {document_id}", fg="green"))

        document = (
            db.session.query(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).first()
        )

        if document:
            # clean old data
            index_type = document.doc_form
            index_processor = IndexProcessorFactory(index_type).init_index_processor()

            segments = db.session.query(DocumentSegment).where(DocumentSegment.document_id == document_id).all()
            if segments:
                index_node_ids = [segment.index_node_id for segment in segments]

                # delete from vector index
                index_processor.clean(dataset, index_node_ids, with_keywords=True, delete_child_chunks=True)

                for segment in segments:
                    db.session.delete(segment)
                db.session.commit()

            document.indexing_status = "parsing"
            document.processing_started_at = naive_utc_now()
            documents.append(document)
            db.session.add(document)
            db.session.commit()

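    # All prepared documents are handed to the IndexingRunner in a single run.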
    try:
        indexing_runner = IndexingRunner()
        indexing_runner.run(documents)
        end_at = time.perf_counter()