|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- #
- # Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import os
- import typing
- import traceback
- import logging
- import inspect
- from logging.handlers import TimedRotatingFileHandler
- from threading import RLock
-
- from api.utils import file_utils
-
-
class LoggerFactory(object):
    """Class-level registry that builds, caches and re-targets loggers.

    All state lives in class attributes and every method is static, so the
    class is used as a namespace rather than instantiated.  Until
    ``set_directory`` has assigned ``LOG_DIR`` the handlers produced here are
    plain ``logging.StreamHandler`` objects; afterwards they are daily
    rotating file handlers located under ``LOG_DIR``.
    """

    TYPE = "FILE"
    LOG_FORMAT = "[%(levelname)s] [%(asctime)s] [%(module)s.%(funcName)s] [line:%(lineno)d]: %(message)s"
    # Runs once, when the class body is executed at import time.
    logging.basicConfig(format=LOG_FORMAT)
    LEVEL = logging.DEBUG
    # class name (or "default") -> (logger, dedicated handler or None)
    logger_dict = {}
    # "<level name>_<directory>" -> handler shared by every registered logger
    global_handler_dict = {}

    LOG_DIR = None
    PARENT_LOG_DIR = None
    # When true, the umask is cleared while log directories are created and
    # ROpenHandler is used instead of TimedRotatingFileHandler.
    log_share = True

    append_to_parent_log = None

    lock = RLock()
    # Numeric values of logging.DEBUG, INFO, WARNING and ERROR.
    levels = (10, 20, 30, 40)
    # "<job_id><log_type>" -> logger
    schedule_logger_dict = {}

    @staticmethod
    def set_directory(directory=None, parent_log_dir=None,
                      append_to_parent_log=None, force=False):
        """Choose the log directory and rebuild every registered handler.

        Args:
            directory: Target directory.  When falsy,
                ``file_utils.get_project_base_directory("logs")`` is used.
            parent_log_dir: Stored in ``PARENT_LOG_DIR`` when truthy.
            append_to_parent_log: Stored in ``append_to_parent_log`` when
                truthy.
            force: Replace an already configured ``LOG_DIR``.

        Raises:
            OSError: If the directory cannot be created.
        """
        if parent_log_dir:
            LoggerFactory.PARENT_LOG_DIR = parent_log_dir
        if append_to_parent_log:
            LoggerFactory.append_to_parent_log = append_to_parent_log
        with LoggerFactory.lock:
            if not directory:
                directory = file_utils.get_project_base_directory("logs")
            if not LoggerFactory.LOG_DIR or force:
                LoggerFactory.LOG_DIR = directory
            if LoggerFactory.log_share:
                oldmask = os.umask(0)
                try:
                    os.makedirs(LoggerFactory.LOG_DIR, exist_ok=True)
                finally:
                    # The umask is process-wide; restore it even when
                    # makedirs fails so the failure does not leak umask 0.
                    os.umask(oldmask)
            else:
                os.makedirs(LoggerFactory.LOG_DIR, exist_ok=True)

            # Detach and close every shared per-level handler.
            for ghandler in LoggerFactory.global_handler_dict.values():
                for logger, _ in LoggerFactory.logger_dict.values():
                    logger.removeHandler(ghandler)
                ghandler.close()
            LoggerFactory.global_handler_dict = {}

            # Replace each logger's dedicated handler.  Iterate over a
            # snapshot because the registry entries are reassigned below.
            for class_name, (logger, handler) in list(
                    LoggerFactory.logger_dict.items()):
                logger.removeHandler(handler)
                new_handler = None
                if handler:
                    handler.close()
                if class_name != "default":
                    new_handler = LoggerFactory.get_handler(class_name)
                    logger.addHandler(new_handler)
                LoggerFactory.assemble_global_handler(logger)
                LoggerFactory.logger_dict[class_name] = logger, new_handler

    @staticmethod
    def new_logger(name):
        """Return ``logging.getLogger(name)`` with propagation disabled and
        its level set to ``LoggerFactory.LEVEL``."""
        logger = logging.getLogger(name)
        logger.propagate = False
        logger.setLevel(LoggerFactory.LEVEL)
        return logger

    @staticmethod
    def get_logger(class_name=None):
        """Return the logger registered for ``class_name``, creating it on
        first use.

        NOTE(review): a falsy ``class_name`` is registered by ``init_logger``
        under the key "default" but looked up here under its own value, so it
        is re-initialised on every call -- confirm whether that is intended.
        """
        with LoggerFactory.lock:
            if class_name in LoggerFactory.logger_dict:
                logger, handler = LoggerFactory.logger_dict[class_name]
                if not logger:
                    logger, handler = LoggerFactory.init_logger(class_name)
            else:
                logger, handler = LoggerFactory.init_logger(class_name)
            return logger

    @staticmethod
    def get_global_handler(logger_name, level=None, log_dir=None):
        """Return the shared handler for ``logger_name`` in ``log_dir``.

        Handlers are cached by ``"<logger_name>_<directory>"`` where the
        directory is ``log_dir`` or, when that is falsy, ``LOG_DIR``.  While
        ``LOG_DIR`` is unset a new, uncached ``StreamHandler`` is returned.
        """
        if not LoggerFactory.LOG_DIR:
            return logging.StreamHandler()
        if log_dir:
            logger_name_key = logger_name + "_" + log_dir
        else:
            logger_name_key = logger_name + "_" + LoggerFactory.LOG_DIR
        if logger_name_key not in LoggerFactory.global_handler_dict:
            with LoggerFactory.lock:
                # Re-check now that the lock is held.
                if logger_name_key not in LoggerFactory.global_handler_dict:
                    handler = LoggerFactory.get_handler(
                        logger_name, level, log_dir)
                    LoggerFactory.global_handler_dict[logger_name_key] = handler
        return LoggerFactory.global_handler_dict[logger_name_key]

    @staticmethod
    def get_handler(class_name, level=None, log_dir=None,
                    log_type=None, job_id=None):
        """Build a handler for ``class_name`` or for a ``log_type`` file.

        Args:
            class_name: Base name of the log file when ``log_type`` is falsy.
            level: Optional handler level.  With ``log_type`` set it also
                selects the file name: ``rag_flow_<log_type>.log`` when equal
                to ``LoggerFactory.LEVEL``, otherwise
                ``rag_flow_<log_type>_error.log``.
            log_dir: Directory for the file; ``LOG_DIR`` is used when falsy
                and ``log_type`` is falsy.
            log_type: Switches to the ``rag_flow_*`` file naming.
            job_id: Accepted but not used by this method.

        Returns:
            A ``StreamHandler`` when ``log_type`` is falsy and either
            ``LOG_DIR`` or ``class_name`` is falsy; otherwise a daily rotating
            file handler keeping 14 backups and opening its file lazily.
        """
        if not log_type:
            if not LoggerFactory.LOG_DIR or not class_name:
                return logging.StreamHandler()
            log_file = os.path.join(
                log_dir or LoggerFactory.LOG_DIR,
                "{}.log".format(class_name))
        else:
            if level == LoggerFactory.LEVEL:
                file_name = "rag_flow_{}.log".format(log_type)
            else:
                file_name = 'rag_flow_{}_error.log'.format(log_type)
            log_file = os.path.join(log_dir, file_name)

        os.makedirs(os.path.dirname(log_file), exist_ok=True)
        # ROpenHandler is only looked up when log_share is enabled.
        handler_cls = (ROpenHandler if LoggerFactory.log_share
                       else TimedRotatingFileHandler)
        handler = handler_cls(log_file,
                              when='D',
                              interval=1,
                              backupCount=14,
                              delay=True)
        if level:
            # setLevel validates the value instead of assigning it blindly.
            handler.setLevel(level)

        return handler

    @staticmethod
    def init_logger(class_name):
        """Create and register the logger for ``class_name``.

        A truthy ``class_name`` gets a dedicated handler from
        ``get_handler``; a falsy one is registered under "default" with no
        dedicated handler.  Shared per-level handlers are attached in both
        cases.

        Returns:
            Tuple ``(logger, handler)`` where ``handler`` may be ``None``.
        """
        with LoggerFactory.lock:
            logger = LoggerFactory.new_logger(class_name)
            handler = None
            if class_name:
                handler = LoggerFactory.get_handler(class_name)
                logger.addHandler(handler)
                LoggerFactory.logger_dict[class_name] = logger, handler
            else:
                LoggerFactory.logger_dict["default"] = logger, handler

            LoggerFactory.assemble_global_handler(logger)
            return logger, handler

    @staticmethod
    def assemble_global_handler(logger):
        """Attach the shared per-level handlers to ``logger``.

        One handler per entry of ``levels`` that is at least ``LEVEL`` is
        added for ``LOG_DIR`` (when set) and, if ``append_to_parent_log`` and
        ``PARENT_LOG_DIR`` are both truthy, for ``PARENT_LOG_DIR`` as well.
        """
        active_levels = [level for level in LoggerFactory.levels
                         if level >= LoggerFactory.LEVEL]
        if LoggerFactory.LOG_DIR:
            for level in active_levels:
                logger.addHandler(
                    LoggerFactory.get_global_handler(
                        logging.getLevelName(level), level))
        if LoggerFactory.append_to_parent_log and LoggerFactory.PARENT_LOG_DIR:
            for level in active_levels:
                logger.addHandler(
                    LoggerFactory.get_global_handler(
                        logging.getLevelName(level), level,
                        LoggerFactory.PARENT_LOG_DIR))
-
-
def setDirectory(directory=None):
    """Module-level shortcut that forwards to ``LoggerFactory.set_directory``.

    Only ``directory`` is passed; the remaining options keep their defaults.
    """
    LoggerFactory.set_directory(directory=directory)
-
-
def setLevel(level):
    """Store ``level`` as ``LoggerFactory.LEVEL``.

    Only the class attribute is assigned; loggers and handlers that already
    exist are not modified by this call.
    """
    LoggerFactory.LEVEL = level
-
-
def getLogger(className=None, useLevelFile=False):
    """Return the logger registered under ``className``.

    Args:
        className: Logger name.  ``None`` selects the logger named 'stat'.
        useLevelFile: Accepted for backward compatibility; not used.

    Returns:
        The logger produced by ``LoggerFactory.get_logger``.
    """
    # The previous implementation walked the call stack with inspect.stack()
    # and discarded the result; that costly no-op has been removed.
    if className is None:
        className = 'stat'
    return LoggerFactory.get_logger(className)
-
-
def exception_to_trace_string(ex):
    """Render ``ex`` (including its traceback, if any) as a single string."""
    trace = traceback.TracebackException.from_exception(ex)
    return "".join(trace.format())
-
-
class ROpenHandler(TimedRotatingFileHandler):
    """``TimedRotatingFileHandler`` that opens its file with the umask
    cleared, so the file mode is not narrowed by the process umask."""

    def _open(self):
        """Open the log stream under umask 0 and return it.

        The previous umask is restored even when opening fails, so an
        unwritable path cannot leave the process running with umask 0.
        """
        prevumask = os.umask(0)
        try:
            return super()._open()
        finally:
            os.umask(prevumask)
-
-
def sql_logger(job_id='', log_type='sql'):
    """Return the cached logger for ``job_id`` + ``log_type``.

    When ``LoggerFactory.schedule_logger_dict`` has no entry for the
    concatenated key, the logger is built by ``get_job_logger``.
    """
    cache_key = job_id + log_type
    registry = LoggerFactory.schedule_logger_dict
    if cache_key not in registry:
        return get_job_logger(job_id=job_id, log_type=log_type)
    return registry[cache_key]
-
-
def ready_log(msg, job=None, task=None, role=None, party_id=None, detail=None):
    """Format "<msg> ready" wrapped in the context strings from ``base_msg``."""
    head, tail = base_msg(job=job, task=task, role=role,
                          party_id=party_id, detail=detail)
    return f"{head}{msg} ready{tail}"
-
-
def start_log(msg, job=None, task=None, role=None, party_id=None, detail=None):
    """Format "start to <msg>" wrapped in the context strings from
    ``base_msg``."""
    head, tail = base_msg(job=job, task=task, role=role,
                          party_id=party_id, detail=detail)
    return f"{head}start to {msg}{tail}"
-
-
def successful_log(msg, job=None, task=None, role=None,
                   party_id=None, detail=None):
    """Format "<msg> successfully" wrapped in the context strings from
    ``base_msg``."""
    head, tail = base_msg(job=job, task=task, role=role,
                          party_id=party_id, detail=detail)
    return f"{head}{msg} successfully{tail}"
-
-
def warning_log(msg, job=None, task=None, role=None,
                party_id=None, detail=None):
    """Format "<msg> is not effective" wrapped in the context strings from
    ``base_msg``."""
    head, tail = base_msg(job=job, task=task, role=role,
                          party_id=party_id, detail=detail)
    return f"{head}{msg} is not effective{tail}"
-
-
def failed_log(msg, job=None, task=None, role=None,
               party_id=None, detail=None):
    """Format "failed to <msg>" wrapped in the context strings from
    ``base_msg``."""
    head, tail = base_msg(job=job, task=task, role=role,
                          party_id=party_id, detail=detail)
    return f"{head}failed to {msg}{tail}"
-
-
def base_msg(job=None, task=None, role: str = None,
             party_id: typing.Union[str, int] = None, detail=None):
    """Build the ``(prefix, suffix)`` pair used by the ``*_log`` helpers.

    The context is taken from the first available source, in this order:
    ``task`` (reads ``f_task_id``, ``f_task_version``, ``f_role``,
    ``f_party_id``), ``job`` (reads ``f_role``, ``f_party_id``), then
    ``role`` together with ``party_id`` when both are truthy.  A truthy
    ``detail`` is appended to the suffix in every case.

    Returns:
        Tuple of two strings; the prefix is empty unless ``task`` is given.
    """
    extra = f" detail: \n{detail}" if detail else ""

    if task is not None:
        head = f"task {task.f_task_id} {task.f_task_version} "
        return head, f" on {task.f_role} {task.f_party_id}{extra}"
    if job is not None:
        return "", f" on {job.f_role} {job.f_party_id}{extra}"
    if role and party_id:
        return "", f" on {role} {party_id}{extra}"
    return "", f"{extra}"
-
-
def exception_to_trace_string(ex):
    """Return the formatted traceback of ``ex`` joined into one string."""
    # NOTE: this repeats an identical definition earlier in the module; being
    # later, this is the one bound to the name after import.
    formatted = traceback.TracebackException.from_exception(ex).format()
    return "".join(formatted)
-
-
def get_logger_base_dir():
    """Return ``file_utils.get_rag_flow_directory('logs')``."""
    return file_utils.get_rag_flow_directory('logs')
-
-
def get_job_logger(job_id, log_type):
    """Build, register and return the logger for ``job_id`` / ``log_type``.

    The logger is named ``"<job_id>_<log_type>"`` and receives two handlers
    per target directory: one at ``LoggerFactory.LEVEL`` and one at
    ``logging.ERROR``.  Target directories are:

    * the shared ``rag_flow`` log directory when ``job_id`` is falsy;
    * the job directory plus the shared directory when ``log_type`` is
      'audit';
    * the job directory alone otherwise.

    The result is stored in ``LoggerFactory.schedule_logger_dict`` under
    ``job_id + log_type``.

    NOTE(review): handlers are added on every call, so calling this twice for
    the same job appears to attach duplicates to the same logger -- confirm
    callers always go through ``sql_logger``.

    Raises:
        OSError: If a log directory cannot be created.
    """
    rag_flow_log_dir = file_utils.get_rag_flow_directory('logs', 'rag_flow')
    job_log_dir = file_utils.get_rag_flow_directory('logs', job_id)
    if not job_id:
        log_dirs = [rag_flow_log_dir]
    elif log_type == 'audit':
        log_dirs = [job_log_dir, rag_flow_log_dir]
    else:
        log_dirs = [job_log_dir]

    if LoggerFactory.log_share:
        oldmask = os.umask(0)
        try:
            os.makedirs(job_log_dir, exist_ok=True)
            os.makedirs(rag_flow_log_dir, exist_ok=True)
        finally:
            # The umask is process-wide; always put it back, even when
            # directory creation fails.
            os.umask(oldmask)
    else:
        os.makedirs(job_log_dir, exist_ok=True)
        os.makedirs(rag_flow_log_dir, exist_ok=True)

    logger = LoggerFactory.new_logger(f"{job_id}_{log_type}")
    # A separate loop name keeps job_log_dir from being rebound mid-function.
    for target_dir in log_dirs:
        handler = LoggerFactory.get_handler(
            class_name=None,
            level=LoggerFactory.LEVEL,
            log_dir=target_dir,
            log_type=log_type,
            job_id=job_id)
        error_handler = LoggerFactory.get_handler(
            class_name=None,
            level=logging.ERROR,
            log_dir=target_dir,
            log_type=log_type,
            job_id=job_id)
        logger.addHandler(handler)
        logger.addHandler(error_handler)
    with LoggerFactory.lock:
        LoggerFactory.schedule_logger_dict[job_id + log_type] = logger
    return logger
|