This handler rotates log files by time and is safe to use across multiple processes.
The stock TimedRotatingFileHandler raises errors constantly under multiprocessing;
if you doubt that, try the official TimedRotatingFileHandler yourself with several processes writing file logs, set to roll over to a new file every second (daily rollover would take far too long to observe the errors).
This handler aggregates records and flushes them in one batch per second; at very high write rates, its write speed far exceeds the official handler's.
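Here is a minimal repro sketch of that failure, not part of the handler itself. It assumes a writable /pythonlogs directory (the same path the demo at the bottom uses); the names repro.log and hammer are made up for the example. The per-second rollover (when='S') exists only to surface the problem within seconds instead of a day.

import logging
from concurrent.futures import ProcessPoolExecutor
from logging.handlers import TimedRotatingFileHandler


def hammer(n):
    # Build the handler inside the worker so the repro also works on
    # spawn-based platforms such as Windows.
    lg = logging.getLogger('repro')
    if not lg.handlers:
        lg.addHandler(TimedRotatingFileHandler('/pythonlogs/repro.log', when='S', backupCount=3))
    for i in range(100000):
        lg.warning('%s %s', n, i)


if __name__ == '__main__':
    with ProcessPoolExecutor(10) as pool:
        for j in range(10):
            pool.submit(hammer, j)

With ten processes racing to rename the same file at every rollover, the handler's error path starts printing tracebacks to stderr almost immediately (on Windows typically PermissionError; on Linux the renames tend to clobber each other silently, losing records instead). The handler implementation follows.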
import os
import time
from pathlib import Path
import queue
import re
import atexit
from threading import Lock, Thread

from nb_filelock import FileLock
import logging
# noinspection PyUnresolvedReferences
from logging import LogRecord, FileHandler


# noinspection PyPep8Naming
class ConcurrentDayRotatingFileHandler(logging.Handler):
    """
    A multiprocess-safe handler that rotates log files by day.
    The stock TimedRotatingFileHandler raises errors constantly under multiprocessing;
    if you doubt that, try it with several processes writing and a per-second rollover
    (daily rollover would take far too long to observe the errors).
    """
    file_handler_list = []
    # Compatible with both Linux and Windows: on Windows (spawn) every process has its own
    # copy of this set, while on Linux (fork) it is inherited, so membership is tracked per pid.
    has_start_emit_all_file_handler_process_id_set = set()
    __lock_for_rotate = Lock()

    @classmethod
    def _emit_all_file_handler(cls):
        while True:
            for hr in cls.file_handler_list:
                # very_nb_print(hr.buffer_msgs_queue.qsize())
                # noinspection PyProtectedMember
                hr._write_to_file()
            time.sleep(1)  # Flush in batches once per second; this improves performance a lot.

    @classmethod
    def _start_emit_all_file_handler(cls):
        Thread(target=cls._emit_all_file_handler, daemon=True).start()

    def __init__(self, file_name: str, file_path: str, back_count=10):
        super().__init__()
        self.file_name = file_name
        self.file_path = file_path
        self.backupCount = back_count
        self.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}(\.\w+)?$", re.ASCII)
        self.extMatch2 = re.compile(r"^\d{2}-\d{2}-\d{2}(\.\w+)?$", re.ASCII)

        self.buffer_msgs_queue = queue.Queue()
        # For programs that finish quickly, this exit hook flushes the buffer before the
        # process ends, so the final (not yet flushed) second of log records is not lost.
        atexit.register(self._write_to_file)
        self.file_handler_list.append(self)
        if os.getpid() not in self.has_start_emit_all_file_handler_process_id_set:
            self._start_emit_all_file_handler()
            self.__class__.has_start_emit_all_file_handler_process_id_set.add(os.getpid())

    def emit(self, record: LogRecord):
        """
        Logger.handle() already holds a lock around emit, so resetting the last write
        time and clearing the message buffer needs no extra locking here.
        :param record:
        :return:
        """
        # noinspection PyBroadException
        try:
            msg = self.format(record)
            self.buffer_msgs_queue.put(msg)
        except Exception:
            self.handleError(record)

    def _write_to_file(self):
        buffer_msgs = ''
        while True:
            try:
                msg = self.buffer_msgs_queue.get(block=False)
                buffer_msgs += msg + '\n'
            except queue.Empty:
                break
        if buffer_msgs:
            time_str = time.strftime('%Y-%m-%d')
            # time_str = time.strftime('%H-%M-%S')  # Handy for testing: rotate per second so results are easy to observe.
            new_file_name = self.file_name + '.' + time_str
            path_obj = Path(self.file_path) / Path(new_file_name)
            path_obj.touch(exist_ok=True)
            with path_obj.open(mode='a') as f:
                f.write(buffer_msgs)
            with FileLock(Path(self.file_path) / f'_delete_{self.file_name}.lock'):
                self._find_and_delete_files()

    def _find_and_delete_files(self):
        """
        The non-PEP8 naming below is kept from the stock library code this was copied from.
        Determine the files to delete when rolling over.

        More specific than the earlier method, which just used glob.glob().
        """
        dirName = self.file_path
        baseName = self.file_name
        fileNames = os.listdir(dirName)
        result = []
        prefix = baseName + "."
        plen = len(prefix)
        for fileName in fileNames:
            if fileName[:plen] == prefix:
                suffix = fileName[plen:]
                # print(fileName, prefix, suffix)
                if self.extMatch.match(suffix) or self.extMatch2.match(suffix):
                    result.append(os.path.join(dirName, fileName))
        if len(result) < self.backupCount:
            result = []
        else:
            result.sort()
            result = result[:len(result) - self.backupCount]
        # print(result)
        for r in result:
            Path(r).unlink()


from concurrent.futures import ProcessPoolExecutor
from logging.handlers import TimedRotatingFileHandler

logger = logging.getLogger('lala')

file_handler = ConcurrentDayRotatingFileHandler('test_my_cd.log', file_path='/pythonlogs')
# file_handler = FileHandler('/pythonlogs/test_fhh.log')  # To benchmark against the stock FileHandler.
# file_handler = TimedRotatingFileHandler('/pythonlogs/test_fhh.log')  # To benchmark against the stock TimedRotatingFileHandler.
file_handler.setFormatter(logging.Formatter('[%(asctime)s] - [%(filename)s] - %(levelname)s: %(message)s'))
logger.addHandler(file_handler)

pool = ProcessPoolExecutor(10)


def fun(x):
    for i in range(100000):
        # time.sleep(0.2)
        logger.warning(f"{x} {i}")


if __name__ == '__main__':
    print('start', time.strftime('%H_%M_%S'))
    for j in range(10):
        pool.submit(fun, j)
    pool.shutdown()
    print('end', time.strftime('%H_%M_%S'))
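Two design choices in the code above are worth calling out. Appends never take the cross-process lock: each process drains its own in-memory queue once per second and writes the whole batch to the day's file with a single write call in append mode, which the author evidently treats as safe enough for concurrent appenders. Only the deletion pass is serialized, through nb_filelock's FileLock, so two processes cannot race to unlink the same dated file. After a run, /pythonlogs holds one file per day per handler name (for example test_my_cd.log.2025-01-01, a made-up date for illustration), and once more than back_count dated files accumulate, the oldest are removed.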