Python: a multi-process-safe, day-rotating log handler (ConcurrentDayRotatingFileHandler)


This handler rotates log files by time and is safe to use from multiple processes.
The official TimedRotatingFileHandler raises errors constantly when several processes write through it.
If you doubt that, try the official TimedRotatingFileHandler yourself with multiple processes writing to the same log file, configured to roll over to a new file every second (rotating by day would take far too long to observe the errors).
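A minimal reproduction sketch of that failure (an illustration, not part of the handler below; it assumes a writable /pythonlogs directory, the path used throughout this post). Each worker attaches its own handler so every process really does rotate the same file:

import logging
from concurrent.futures import ProcessPoolExecutor
from logging.handlers import TimedRotatingFileHandler


def worker(n):
    # Every process opens its own TimedRotatingFileHandler on the same file.
    logger = logging.getLogger('repro')
    logger.addHandler(TimedRotatingFileHandler('/pythonlogs/repro.log', when='S', backupCount=5))
    for i in range(100000):
        # On each per-second rollover the processes race to rename repro.log;
        # expect PermissionError on Windows and lost or garbled lines on Linux.
        logger.warning(f'{n} {i}')


if __name__ == '__main__':
    pool = ProcessPoolExecutor(5)
    for j in range(5):
        pool.submit(worker, j)
    pool.shutdown()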




This handler aggregates records in memory and flushes them to disk in one batch every second; under very high write rates its throughput far exceeds the official handlers.
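The core idea, distilled into a minimal sketch (the full handler below adds day-based file naming, rotation and cleanup on top of this): callers only enqueue, and a single daemon thread drains the queue once per second, turning one file write per record into one append per flush.

import queue
import time
from threading import Thread

buffer_q = queue.Queue()


def _flusher(path):
    while True:
        chunk = ''
        while True:
            try:
                chunk += buffer_q.get(block=False) + '\n'
            except queue.Empty:
                break
        if chunk:
            with open(path, 'a') as f:
                f.write(chunk)  # one append per second instead of one write per record
        time.sleep(1)


Thread(target=_flusher, args=('app.log',), daemon=True).start()
buffer_q.put('hello')  # the hot path does no file I/O at all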

import os
import time
from pathlib import Path
import queue
import re
import atexit
from threading import Lock, Thread
from nb_filelock import FileLock  # third-party cross-process file lock (pip install nb_filelock)
import logging
# noinspection PyUnresolvedReferences
from logging import LogRecord, FileHandler


# noinspection PyPep8Naming
class ConcurrentDayRotatingFileHandler(logging.Handler):
    """
    這個多進程切片安全的。
    官方的 TimedRotatingFileHandler 在多進程下瘋狂報錯,
    不信的話可以試試官方 TimedRotatingFileHandler 多進程寫入文件日志,設置成每秒換一個新的文件寫(主要是按天來切割要耽誤很長的時間才能觀察錯誤)
    """
    file_handler_list = []
    has_start_emit_all_file_handler_process_id_set = set()  # Works on both Linux and Windows: under Windows (spawn) each process has its own independent copy of this set; under Linux (fork) the child inherits the parent's copy.
    __lock_for_rotate = Lock()

    @classmethod
    def _emit_all_file_handler(cls):
        while True:
            for hr in cls.file_handler_list:
                # very_nb_print(hr.buffer_msgs_queue.qsize())
                # noinspection PyProtectedMember
                hr._write_to_file()
            time.sleep(1)  # flush in one batch per second; this improves performance a lot.

    @classmethod
    def _start_emit_all_file_handler(cls):
        Thread(target=cls._emit_all_file_handler, daemon=True).start()

    def __init__(self, file_name: str, file_path: str, back_count=10):
        super().__init__()
        self.file_name = file_name
        self.file_path = file_path
        self.backupCount = back_count
        self.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}(\.\w+)?$", re.ASCII)
        self.extMatch2 = re.compile(r"^\d{2}-\d{2}-\d{2}(\.\w+)?$", re.ASCII)

        self.buffer_msgs_queue = queue.Queue()
        atexit.register(self._write_to_file)  # For programs that exit almost immediately, this hook flushes on exit so records from the final, not-yet-flushed second are not lost.
        self.file_handler_list.append(self)
        if os.getpid() not in self.has_start_emit_all_file_handler_process_id_set:
            self._start_emit_all_file_handler()
            self.__class__.has_start_emit_all_file_handler_process_id_set.add(os.getpid())

    def emit(self, record: LogRecord):
        """
        Logger.handle() already wraps emit() in the handler lock, so resetting the last-write time and clearing buffer_msgs needs no extra locking here.
        :param record:
        :return:
        """
        # noinspection PyBroadException
        try:
            msg = self.format(record)
            self.buffer_msgs_queue.put(msg)
        except Exception:
            self.handleError(record)

    def _write_to_file(self):
        buffer_msgs = ''
        while True:
            try:
                msg = self.buffer_msgs_queue.get(block=False)
                buffer_msgs += msg + '\n'
            except queue.Empty:
                break
        if buffer_msgs:
            time_str = time.strftime('%Y-%m-%d')
            # time_str = time.strftime('%H-%M-%S')  # for testing: rotate every second so rollover is easy to observe.
            new_file_name = self.file_name + '.' + time_str
            path_obj = Path(self.file_path) / Path(new_file_name)
            path_obj.touch(exist_ok=True)
            with path_obj.open(mode='a') as f:
                f.write(buffer_msgs)
            # FileLock is a cross-process lock, so only one process scans for
            # and deletes expired log files at a time.
            with FileLock(Path(self.file_path) / f'_delete_{self.file_name}.lock'):
                self._find_and_delete_files()

    def _find_and_delete_files(self):
        """
        The non-PEP8 naming below is kept from the stdlib code this method was copied from.
        Determine the files to delete when rolling over.

        More specific than the earlier method, which just used glob.glob().
        """
        dirName = self.file_path
        baseName = self.file_name
        fileNames = os.listdir(dirName)
        result = []
        prefix = baseName + "."
        plen = len(prefix)
        for fileName in fileNames:
            if fileName[:plen] == prefix:
                suffix = fileName[plen:]
                # print(fileName, prefix,suffix)
                if self.extMatch.match(suffix) or self.extMatch2.match(suffix):
                    result.append(os.path.join(dirName, fileName))
        if len(result) < self.backupCount:
            result = []
        else:
            result.sort()
            result = result[:len(result) - self.backupCount]
        # print(result)
        for r in result:
            Path(r).unlink()


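The demo below doubles as a benchmark: 10 processes each write 100,000 records through the handler. Swap in the commented-out FileHandler or TimedRotatingFileHandler lines to compare against the stdlib handlers.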
from concurrent.futures import ProcessPoolExecutor
from logging.handlers import TimedRotatingFileHandler

logger = logging.getLogger('lala')
file_handler = ConcurrentDayRotatingFileHandler('test_my_cd.log', file_path='/pythonlogs')
# file_handler = FileHandler('/pythonlogs/test_fhh.log')  # compare against the stdlib FileHandler
# file_handler = TimedRotatingFileHandler('/pythonlogs/test_fhh.log')  # compare against the stdlib TimedRotatingFileHandler
file_handler.setFormatter(logging.Formatter('[%(asctime)s] - [%(filename)s] - %(levelname)s: %(message)s'))
logger.addHandler(file_handler)

pool = ProcessPoolExecutor(10)


def fun(x):
    for i in range(100000):
        # time.sleep(0.2)
        logger.warning(f"{x} {i}")


if __name__ == '__main__':
    print('start', time.strftime('%H_%M_%S'))
    for j in range(10):
        pool.submit(fun, j)
    pool.shutdown()
    print('end', time.strftime('%H_%M_%S'))
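With back_count=10 (the default used here), /pythonlogs afterwards contains the cleanup lock file plus one dated file per day, and only the 10 newest dated files survive each cleanup pass. Illustratively (hypothetical dates):

_delete_test_my_cd.log.lock
test_my_cd.log.2024-01-04
test_my_cd.log.2024-01-05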

 

