### What problem does this PR solve? ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue)tags/v0.5.0
| @@ -328,12 +328,12 @@ def rename(): | |||
| # @login_required | |||
| def get(file_id): | |||
| try: | |||
| e, doc = FileService.get_by_id(file_id) | |||
| e, file = FileService.get_by_id(file_id) | |||
| if not e: | |||
| return get_data_error_result(retmsg="Document not found!") | |||
| response = flask.make_response(MINIO.get(doc.parent_id, doc.location)) | |||
| ext = re.search(r"\.([^.]+)$", doc.name) | |||
| response = flask.make_response(MINIO.get(file.parent_id, file.location)) | |||
| ext = re.search(r"\.([^.]+)$", file.name) | |||
| if ext: | |||
| if doc.type == FileType.VISUAL.value: | |||
| response.headers.set('Content-Type', 'image/%s' % ext.group(1)) | |||
| @@ -18,6 +18,8 @@ from datetime import datetime | |||
| from api.db.db_models import DB | |||
| from api.db.db_models import File, Document, File2Document | |||
| from api.db.services.common_service import CommonService | |||
| from api.db.services.document_service import DocumentService | |||
| from api.db.services.file_service import FileService | |||
| from api.utils import current_timestamp, datetime_format | |||
| @@ -64,3 +66,18 @@ class File2DocumentService(CommonService): | |||
| num = cls.model.update(obj).where(cls.model.id == file_id).execute() | |||
| e, obj = cls.get_by_id(cls.model.id) | |||
| return obj | |||
| @classmethod | |||
| @DB.connection_context() | |||
| def get_minio_address(cls, doc_id=None, file_id=None): | |||
| if doc_id: | |||
| ids = File2DocumentService.get_by_document_id(doc_id) | |||
| else: | |||
| ids = File2DocumentService.get_by_file_id(file_id) | |||
| if ids: | |||
| e, file = FileService.get_by_id(ids[0].file_id) | |||
| return file.parent_id, file.location | |||
| else: | |||
| assert doc_id, "please specify doc_id" | |||
| e, doc = DocumentService.get_by_id(doc_id) | |||
| return doc.kb_id, doc.location | |||
| @@ -21,7 +21,6 @@ from api.db.db_models import DB, File2Document, Knowledgebase | |||
| from api.db.db_models import File, Document | |||
| from api.db.services.common_service import CommonService | |||
| from api.utils import get_uuid | |||
| from rag.utils import MINIO | |||
| class FileService(CommonService): | |||
| @@ -241,3 +240,4 @@ class FileService(CommonService): | |||
| dfs(folder_id) | |||
| return size | |||
| @@ -15,8 +15,8 @@ | |||
| # | |||
| import random | |||
| from peewee import Expression | |||
| from api.db.db_models import DB | |||
| from peewee import Expression, JOIN | |||
| from api.db.db_models import DB, File2Document, File | |||
| from api.db import StatusEnum, FileType, TaskStatus | |||
| from api.db.db_models import Task, Document, Knowledgebase, Tenant | |||
| from api.db.services.common_service import CommonService | |||
| @@ -75,8 +75,10 @@ class TaskService(CommonService): | |||
| @DB.connection_context() | |||
| def get_ongoing_doc_name(cls): | |||
| with DB.lock("get_task", -1): | |||
| docs = cls.model.select(*[Document.kb_id, Document.location]) \ | |||
| docs = cls.model.select(*[Document.id, Document.kb_id, Document.location, File.parent_id]) \ | |||
| .join(Document, on=(cls.model.doc_id == Document.id)) \ | |||
| .join(File2Document, on=(File2Document.document_id == Document.id), join_type=JOIN.LEFT_OUTER) \ | |||
| .join(File, on=(File2Document.file_id == File.id)) \ | |||
| .where( | |||
| Document.status == StatusEnum.VALID.value, | |||
| Document.run == TaskStatus.RUNNING.value, | |||
| @@ -88,7 +90,7 @@ class TaskService(CommonService): | |||
| docs = list(docs.dicts()) | |||
| if not docs: return [] | |||
| return list(set([(d["kb_id"], d["location"]) for d in docs])) | |||
| return list(set([(d["parent_id"] if d["parent_id"] else d["kb_id"], d["location"]) for d in docs])) | |||
| @classmethod | |||
| @DB.connection_context() | |||
| @@ -20,6 +20,8 @@ import random | |||
| from datetime import datetime | |||
| from api.db.db_models import Task | |||
| from api.db.db_utils import bulk_insert_into_db | |||
| from api.db.services.file2document_service import File2DocumentService | |||
| from api.db.services.file_service import FileService | |||
| from api.db.services.task_service import TaskService | |||
| from deepdoc.parser import PdfParser | |||
| from deepdoc.parser.excel_parser import HuExcelParser | |||
| @@ -87,10 +89,11 @@ def dispatch(): | |||
| tsks = [] | |||
| try: | |||
| file_bin = MINIO.get(r["kb_id"], r["location"]) | |||
| bucket, name = File2DocumentService.get_minio_address(doc_id=r["id"]) | |||
| file_bin = MINIO.get(bucket, name) | |||
| if REDIS_CONN.is_alive(): | |||
| try: | |||
| REDIS_CONN.set("{}/{}".format(r["kb_id"], r["location"]), file_bin, 12*60) | |||
| REDIS_CONN.set("{}/{}".format(bucket, name), file_bin, 12*60) | |||
| except Exception as e: | |||
| cron_logger.warning("Put into redis[EXCEPTION]:" + str(e)) | |||
| @@ -24,6 +24,8 @@ import sys | |||
| import time | |||
| import traceback | |||
| from functools import partial | |||
| from api.db.services.file2document_service import File2DocumentService | |||
| from rag.utils import MINIO | |||
| from api.db.db_models import close_connection | |||
| from rag.settings import database_logger | |||
| @@ -135,7 +137,8 @@ def build(row): | |||
| pool = Pool(processes=1) | |||
| try: | |||
| st = timer() | |||
| thr = pool.apply_async(get_minio_binary, args=(row["kb_id"], row["location"])) | |||
| bucket, name = File2DocumentService.get_minio_address(doc_id=row["doc_id"]) | |||
| thr = pool.apply_async(get_minio_binary, args=(bucket, name)) | |||
| binary = thr.get(timeout=90) | |||
| pool.terminate() | |||
| cron_logger.info( | |||