大家一般都用logging日志打印,但logging是線程安全的,多進程也有很多介紹,引入一些文件鎖,對logging做好配置,能過支持。
但通過測試,發現多進程時還是容易出現重復寫入文件或者打印正常漏寫入文件的問題。
我的日志需求比較簡單,能夠區分文件,正確的寫入日志文件。
引入文件鎖;日志寫入函數封裝到一個操作_Logger類中; 日志名稱和寫入級別封裝到一個業務類Logger中。
本范例基於python3實現。本范例20個進程並發,分別寫入3個文件,每s每個文件寫入超過100行數據,日志文件中沒有數據冗余,也沒有數據遺漏。
# -*-coding:utf-8-*-
"""
Author:yinshunyao
Date:2017/3/5 0005下午 10:50
"""
# import logging
import os
import time
# 利用第三方系統鎖實現文件鎖定和解鎖
if os.name == 'nt':
import win32con, win32file, pywintypes
LOCK_EX = win32con.LOCKFILE_EXCLUSIVE_LOCK
LOCK_SH = 0 # The default value
LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY
__overlapped = pywintypes.OVERLAPPED()
def lock(file, flags):
hfile = win32file._get_osfhandle(file.fileno())
win32file.LockFileEx(hfile, flags, 0, 0xffff0000, __overlapped)
def unlock(file):
hfile = win32file._get_osfhandle(file.fileno())
win32file.UnlockFileEx(hfile, 0, 0xffff0000, __overlapped)
elif os.name == 'posix':
from fcntl import LOCK_EX
def lock(file, flags):
fcntl.flock(file.fileno(), flags)
def unlock(file):
fcntl.flock(file.fileno(), fcntl.LOCK_UN)
else:
raise RuntimeError("File Locker only support NT and Posix platforms!")
class _Logger:
file_path = ''
#初始化日志路徑
@staticmethod
def init():
if not _Logger.file_path:
_Logger.file_path = '%s/Log' % os.path.abspath(os.path.dirname(__file__))
return True
@staticmethod
def _write(messge, file_name):
if not messge:
return True
messge = messge.replace('\t', ',')
file = '{}/{}'.format(_Logger.file_path, file_name)
while True:
try:
f = open(file, 'a+')
lock(f, LOCK_EX)
break
except:
time.sleep(0.01)
continue
# 確保緩沖區內容寫入到文件
while True:
try:
f.write(messge + '\n')
f.flush()
break
except:
time.sleep(0.01)
continue
while True:
try:
unlock(f)
f.close()
return True
except:
time.sleep(0.01)
continue
@staticmethod
def write(message, file_name, only_print=False):
if not _Logger.init(): return
print(message)
if not only_print:
_Logger._write(message, file_name)
class Logger:
def __init__(self, logger_name, file_name=''):
self.logger_name = logger_name
self.file_name = file_name
# 根據消息級別,自定義格式,生成消息
def _build_message(self, message, level):
try:
return '[%s]\t[%5s]\t[%8s]\t%s' \
% (time.strftime('%Y-%m-%d %H:%M:%S'), level, self.logger_name, message)
except Exception as e:
print('解析日志消息異常:{}'.format(e))
return ''
def warning(self, message):
_Logger.write(self._build_message(message, 'WARN'), self.file_name)
def warn(self, message):
_Logger.write(self._build_message(message, 'WARN'), self.file_name)
def error(self, message):
_Logger.write(self._build_message(message, 'ERROR'), self.file_name)
def info(self, message):
_Logger.write(self._build_message(message, 'INFO'), self.file_name, True)
def debug(self, message):
_Logger.write(self._build_message(message, 'DEBUG'), self.file_name)
# 循環打印日志測試函數
def _print_test(count):
logger = Logger(logger_name='test{}'.format(count), file_name='test{}'.format(count % 3))
key = 0
while True:
key += 1
# print('{}-{}'.format(logger, key))
logger.debug('%d' % key)
logger.error('%d' % key)
if __name__ == '__main__':
from multiprocessing import Pool, freeze_support
freeze_support()
# 進程池進行測試
pool = Pool(processes=20)
count = 0
while count < 20:
count += 1
pool.apply_async(func=_print_test, args=(count,))
else:
pool.close()
pool.join()