### What problem does this PR solve?

### Type of change

- [x] Refactoring

tags/v0.5.0
| @@ -16,6 +16,8 @@ | |||
| from peewee import Expression | |||
| from elasticsearch_dsl import Q | |||
| from api.utils import current_timestamp | |||
| from rag.utils.es_conn import ELASTICSEARCH | |||
| from rag.utils.minio_conn import MINIO | |||
| from rag.nlp import search | |||
| @@ -90,7 +92,7 @@ class DocumentService(CommonService): | |||
| @classmethod | |||
| @DB.connection_context() | |||
| def get_newly_uploaded(cls, tm, mod=0, comm=1, items_per_page=64): | |||
| def get_newly_uploaded(cls, tm): | |||
| fields = [ | |||
| cls.model.id, | |||
| cls.model.kb_id, | |||
| @@ -112,11 +114,9 @@ class DocumentService(CommonService): | |||
| cls.model.status == StatusEnum.VALID.value, | |||
| ~(cls.model.type == FileType.VIRTUAL.value), | |||
| cls.model.progress == 0, | |||
| cls.model.update_time >= tm, | |||
| cls.model.run == TaskStatus.RUNNING.value, | |||
| (Expression(cls.model.create_time, "%%", comm) == mod))\ | |||
| .order_by(cls.model.update_time.asc())\ | |||
| .paginate(1, items_per_page) | |||
| cls.model.update_time >= current_timestamp() - 1000 * 600, | |||
| cls.model.run == TaskStatus.RUNNING.value)\ | |||
| .order_by(cls.model.update_time.asc()) | |||
| return list(docs.dicts()) | |||
| @classmethod | |||
| @@ -90,12 +90,6 @@ def dispatch(): | |||
| try: | |||
| bucket, name = File2DocumentService.get_minio_address(doc_id=r["id"]) | |||
| file_bin = MINIO.get(bucket, name) | |||
| if REDIS_CONN.is_alive(): | |||
| try: | |||
| REDIS_CONN.set("{}/{}".format(bucket, name), file_bin, 12*60) | |||
| except Exception as e: | |||
| cron_logger.warning("Put into redis[EXCEPTION]:" + str(e)) | |||
| if r["type"] == FileType.PDF.value: | |||
| do_layout = r["parser_config"].get("layout_recognize", True) | |||
| pages = PdfParser.total_page_number(r["name"], file_bin) | |||
| @@ -107,18 +107,6 @@ def collect(comm, mod, tm): | |||
| def get_minio_binary(bucket, name): | |||
| global MINIO | |||
| if REDIS_CONN.is_alive(): | |||
| try: | |||
| for _ in range(30): | |||
| if REDIS_CONN.exist("{}/{}".format(bucket, name)): | |||
| time.sleep(1) | |||
| break | |||
| time.sleep(1) | |||
| r = REDIS_CONN.get("{}/{}".format(bucket, name)) | |||
| if r: return r | |||
| cron_logger.warning("Cache missing: {}".format(name)) | |||
| except Exception as e: | |||
| cron_logger.warning("Get redis[EXCEPTION]:" + str(e)) | |||
| return MINIO.get(bucket, name) | |||