loguru多進程切分日志異常分析


測試代碼

# main.py
from mylog import log
import multiprocessing
import time, os, sys, datetime


def worker():
    while True:
        time.sleep(0.1)
        log.info(str(os.getpid()) + " --- " + "Logging...")
        #print(str(id(log)) + "\n")


if __name__ == "__main__":
    print(os.getpid())
    log.info("davy test.!")
    #print(str(id(log)))
    #fname = time.strftime('%Y%m%d%H%M', time.localtime())
    #print(fname)
    #sys.exit(0)

    multiprocessing.set_start_method("spawn")
    processes = [multiprocessing.Process(target=worker) for _ in range(4)]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

mylog.py

import logging.handlers

from loguru import logger

import concurrenthandler


class mylog:
    def __new__(self):
        logger.remove(handler_id=None)
        log = logger
        # log.add(concurrenthandler.TimedRotatingFileHandler_MP("davy.log", 'D', 1, 0) )

        log.add("./logs/davy.log",
                rotation="10 KB", enqueue=False
                )

        return log

log = mylog()

主要調整main.py里的multiprocessing.set_start_method = fork / spawn (這倆其實差不多,不過 當Loguru的equeue=True的時候,不能顯式設置這個值了,不然會報錯。

loguru日志庫的_file_sink.py里面 Class FileSink中的write方法

if self._rotation_function is not None and self._rotation_function(message, self._file):
            print(os.getpid(), id(self._file),str(self._file.tell() + len(message)), self._file.name)
            self._terminate_file(is_rotating=True)
            print(self, self._file.fileno(), self._file.tell(), " rotate file end....\n")

測試 equeue=False時候, 每個子進程在log.info的時候,都會執行這個方法寫入文件,由於子進程的文件描述符是從父進程拷貝過來的,所以,都會寫入davy.log,是沒有問題的。

當寫滿10KB的時候, self._rotation_function方法校驗,就會返回True,_rotation_function = Rotation.rotation_size這個方法(因為loguru.add的時候,rotation設置的是 10KB,在初始化的時候,會選取這個方法 ,見 _make_rotation_function 函數的邏輯, 也在_file_sink.py中),然后執行 _terminate_file,(這里面會調用 os.rename davy.log --> davy.2022.01.01.11.12.333.log這中格式的備份文件,然后重新open davy.log,復制給self._file。

這里就會出現問題了,即便是假設日志寫入不頻繁,每個子進程都可以完美的不同時間執行log.info,我們看看會發生什么。(這里很容易測試, 在 write方法首尾加上文件鎖,就可以保證進程不會並發執行了)

# set_start_method = spawn

3878
3884 139935998440944 10072 /home/davy/PycharmProjects/logurudemo/logs/davy.log
qiefen le wenjian -- 0
<loguru._file_sink.FileSink object at 0x7f4563f4c340> 4 0 rotate file end....

3886 140586464327152 10072 /home/davy/PycharmProjects/logurudemo/logs/davy.log
qiefen le wenjian -- 0
<loguru._file_sink.FileSink object at 0x7fdcd6be8340> 4 0 rotate file end....

3887 139707968435696 10072 /home/davy/PycharmProjects/logurudemo/logs/davy.log
qiefen le wenjian -- 0
<loguru._file_sink.FileSink object at 0x7f104c4f0340> 4 0 rotate file end....

3885 140430168053232 10072 /home/davy/PycharmProjects/logurudemo/logs/davy.log
qiefen le wenjian -- 0
<loguru._file_sink.FileSink object at 0x7fb872c2a340> 4 0 rotate file end....

# set_start_menthod = fork

3927
3928 140395167977184 10038 /home/davy/PycharmProjects/logurudemo/logs/davy.log
qiefen le wenjian -- 0
<loguru._file_sink.FileSink object at 0x7fb04c14aeb0> 3 0 rotate file end....

3929 140395167977184 10038 /home/davy/PycharmProjects/logurudemo/logs/davy.log
qiefen le wenjian -- 0
<loguru._file_sink.FileSink object at 0x7fb04c14aeb0> 3 0 rotate file end....

3931 140395167977184 10038 /home/davy/PycharmProjects/logurudemo/logs/davy.log
qiefen le wenjian -- 0
<loguru._file_sink.FileSink object at 0x7fb04c14aeb0> 3 0 rotate file end....

3930 140395167977184 10038 /home/davy/PycharmProjects/logurudemo/logs/davy.log
qiefen le wenjian -- 0
<loguru._file_sink.FileSink object at 0x7fb04c14aeb0> 3 0 rotate file end....

上面是設置 spawn和fork 后,輸出的日志,就明顯可以看出fork和 spawn的區別了,fork明顯file和 filesink實例的地址各個子進程都一樣(這里的一樣沒什么關系,反正不同進程映射的物理地址也不一樣,只不過明顯能看出來,完全從父進程COPY過來而已,這里fork和spawn都會導致切分混亂,生成4個日志文件。),這里不管地址是否一樣,有一點是相同的,就是每個子進程打開的文件描述符,都對應着davy.log,是不會變的。

從代碼里,就能理解了,當第一個進程檢查size超標,然后切分日志,這里  執行os.rename(str,str),這里就會出現問題了,因為 rename只是改了名,i_node是不會變的,也就是說,只要打開過這個文件的文件描述符,依然是指向了改名后的文件。然后, Open新的davy.log,復制給 self._file,這樣子,這個進程的_file,就可以繼續寫入空白的davy.log了。

而其他進程的self._file,跟進程1沒有任何關系(即便是虛擬地址一樣,都沒有任何關系,fork由 copy_on_write功能,只要有修改,其實都會跟父進程割離了)

假設這里進程1完成了最后的self._file.write(message)后,第二個進程進來了,由於進程2的 self._file的文件描述符還是指向了原來的davy.log,所以,檢查文件大於10K,依然會執行一邊切分文件的動作(但是 os.rename卻是把 進程1生成的davy.log重命名成新的備份文件了),進程3/4,也是一樣的道理,所以這種情況,一定會發生切分混亂的。

同理,即便是用gunicorn之類的啟動flask或者fastapi之類的程序,肯定也是一樣的效果。

再說  enqueue=True的時候,代碼里,其實是靠multiprocessing.SimpleQueue 來實現的日志的讀取寫入的。

設置了enqueue=True,就不能設置set_start_method了,不然報錯。(有說multiprocessing模式fork)

調試觀察,就能知道,其實只是父進程提前啟動了一個線程去監聽這個queue,而子進程在log.info的時候,只是向這個queue里發送數據,然后由主進程做實際的write動作,那肯定不會導致切分異常的。

但是這種情況,如果我啟動兩個python進程去執行測試用例,那跟第一種情況,就沒什么區別了,而測試一下,結果的確如此,畢竟 multiprocessing的queue是通過內核queue或者pipe來實現的,所以只有父子進程可以通信,(命名管道之類的除外,那是別的進程間通訊方式了,loguru的實現,保證不了無關系進程之間的進程安全的)。

以上的理解

這時候,去理解uvicorn之類的,在啟動fastapi的時候, 在 loguru _handler.py里的初始化那里

# _hanlder.py 大概 85行左右
if self._enqueue:
            print(os.getpid(), " init equeue = true")
            self._queue = multiprocessing.SimpleQueue()
            self._confirmation_event = multiprocessing.Event()
            self._confirmation_lock = multiprocessing.Lock()
            self._owner_process_pid = os.getpid()
            self._thread = Thread(
                target=self._queued_writer, daemon=True, name="loguru-writer-%d" % self._id
            )
            self._thread.start()

執行 uvicorn fastdemo:app --workers=4 ,就能看到  print 打印了4次,跟真實的4個不同的進程初始化了loguru一樣,multiprocess.simplequeue是保證不了安全性的,都是個干個的,所以日志文件,在切分的時候,依然是執行了4此切分。

 

方法一:

如果要一定要保證切分的安全性,可以采用別人寫好的 logging.handlers,(應該有一個比較常用的並行concurrent_log的庫好像),不過loguru如果通過這種添加handlers的方式去執行的話,就沒啥意義了,切分動作,都委托給了handler了,自己也不能設置rotation,只能在引用的別人的handlers里設置,多此一舉的感覺,nb_log里有提到那幾個庫。

 

方法二:

不考慮性能,直接在 _file_sink.py的write里,首尾不但加文件鎖, 而且還要  close --》 open 一下 log文件,保證每次檢查的文件,都是重命名后的那個新文件,而不是自己一直保持的文件描述符對應的那個log文件。不過這種性能的確是效率頗低,不但有文件鎖,還會頻繁執行 close和open操作,針對不頻繁的日志場景還湊合。

 

方法三:

   可以看看nb_log如何實現的,不太想研究這個了,有時間再看吧。

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM