gunicorn Arbiter 源碼解析


  如 前文所述,Arbiter是gunicorn master進程的核心。Arbiter主要負責管理worker進程,包括啟動、監控、殺掉Worker進程;同時,Arbiter在某些信號發生的時候還可以熱更新(reload)App應用,或者在線升級gunicorn。Arbiter的核心代碼在一個文件里面,代碼量也不大,源碼在此: https://github.com/benoitc/gunicorn
  
Arbiter主要有以下方法:
setup:
    處理配置項,最重要的是worker數量和worker工作模型
 
init_signal
    注冊信號處理函數
 
handle_xxx:
    各個信號具體的處理函數
 
kill_worker,kill_workers:
    向worker進程發信號
 
spawn_worker, spawn_workers:
    fork出新的worker進程
 
murder_workers:
    殺掉一段時間內未響應的worker進程
 
manage_workers:
    根據配置文件的worker數量,以及當前active的worker數量,決定是要fork還是kill worker進程
 
reexec
    接收到信號 SIGUSR2調用,在線升級gunicorn
 
reload:
    接收到信號 SIGHUP調用,會根據新的配置新啟動worker進程,並殺掉之前的worker進程
 
sleep
    在沒有信號處理的時候,利用select的timeout進行sleep,可被喚醒
 
wakeup
    通過向管道寫消息,喚醒進程
 
run
    主循環
 
  Arbiter真正被其他代碼(Application)調用的函數只有__init__和run方法,在一句代碼里:
    Arbiter(self).run()
  上面代碼中的self即為Application實例,其中__init__調用setup進行配置項設置。下面是run方法偽代碼
def run()
    self.init_signal()
    self.LISTENERS = create_sockets(self.cfg, self.log)
    self.manage_workers()
    while True:
        if no signal in SIG_QUEUE
            self.sleep()
        else:
            handle_signal()

 

 關於fork子進程
  fork子進程的代碼在 spawn_worker, 源碼如下:
  
 1     def spawn_worker(self):
 2         self.worker_age += 1
 3         worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
 4                                    self.app, self.timeout / 2.0,
 5                                    self.cfg, self.log)
 6         self.cfg.pre_fork(self, worker)
 7         pid = os.fork()
 8         if pid != 0:
 9             self.WORKERS[pid] = worker
10             return pid
11 
12         # Process Child
13         worker_pid = os.getpid()
14         try:
15             util._setproctitle("worker [%s]" % self.proc_name)
16             self.log.info("Booting worker with pid: %s", worker_pid)
17             self.cfg.post_fork(self, worker)
18             worker.init_process()
19             sys.exit(0)
20         except SystemExit:
21             raise
22         except AppImportError as e:
23             self.log.debug("Exception while loading the application",
24                            exc_info=True)
25             print("%s" % e, file=sys.stderr)
26             sys.stderr.flush()
27             sys.exit(self.APP_LOAD_ERROR)
28         except:
29             self.log.exception("Exception in worker process"),
30             if not worker.booted:
31                 sys.exit(self.WORKER_BOOT_ERROR)
32             sys.exit(-1)
33         finally:
34             self.log.info("Worker exiting (pid: %s)", worker_pid)
35             try:
36                 worker.tmp.close()
37                 self.cfg.worker_exit(self, worker)
38             except:
39                 self.log.warning("Exception during worker exit:\n%s",
40                                   traceback.format_exc())
Arbiter.spawn_worker
  主要流程:
    (1)加載worker_class並實例化(默認為同步模型 SyncWorker)
    (2)父進程(master進程)fork之后return,之后的邏輯都在子進程中運行
    (3)調用worker.init_process 進入循環,worker的所有工作都在這個循環中
    (4)循環結束之后,調用sys.exit(0)
    (5)最后,在finally中,記錄worker進程的退出
    
    下面是我自己寫的一點代碼,把主要的fork流程簡化了一下
 1 # prefork.py
 2 import sys
 3 import socket
 4 import select
 5 import os
 6 import time
 7  
 8 def do_sub_process():
 9     pid = os.fork()
10     if pid < 0:
11         print 'fork error'
12         sys.exit(-1)
13     elif pid > 0:
14         print 'fork sub process %d'  % pid
15         return
16  
17     # must be child process
18     time.sleep(1)
19     print 'sub process will exit', os.getpid(), os.getppid()
20     sys.exit(0)
21  
22 def main():
23     sub_num = 2
24     for i in range(sub_num):
25         do_sub_process()
26     time.sleep(10)
27     print 'main process will exit', os.getpid()
28  
29 if __name__ == '__main__':
30     main()
在測試環境下輸出:
  fork sub process 9601
  fork sub process 9602
  sub process will exit 9601 9600
  sub process will exit 9602 9600
  main process will exit 9600
 
  需要注意的是第20行調用了 sys.exit, 保證子進程的結束,否則會繼續main函數中for循環,以及之后的邏輯。注釋掉第19行重新運行,看輸出就明白了。
 
關於kill子進程
  master進程要kill worker進程就很簡單了,直接發信號,源碼如下:
  
 1     def kill_worker(self, pid, sig):
 2         """\
 3         Kill a worker
 4 
 5         :attr pid: int, worker pid
 6         :attr sig: `signal.SIG*` value
 7          """
 8         try:
 9             os.kill(pid, sig)
10         except OSError as e:
11             if e.errno == errno.ESRCH:
12                 try:
13                     worker = self.WORKERS.pop(pid)
14                     worker.tmp.close()
15                     self.cfg.worker_exit(self, worker)
16                     return
17                 except (KeyError, OSError):
18                     return
19             raise

 

 
關於sleep與wakeup
  我們再來看看Arbiter的sleep和wakeup。Arbiter在沒有信號需要處理的時候會"sleep",當然,不是真正調用time.sleep,否則信號來了也不能第一時間處理。這里得實現比較巧妙,利用了管道和select的timeout。看代碼就知道了
        def sleep(self):
        """\
        Sleep until PIPE is readable or we timeout.
        A readable PIPE means a signal occurred.
        """
            ready = select.select([self.PIPE[0]], [], [], 1.0) # self.PIPE = os.pipe()
            if not ready[0]: 
                return
            while os.read(self.PIPE[0], 1):
                pass

  代碼里面的注釋寫得非常清楚,要么PIPE可讀立即返回,要么等待超時。管道可讀是因為有信號發生。這里看看pipe函數

  os. pipe()

Create a pipe. Return a pair of file descriptors (r,w) usable for reading and writing, respectively.

 
  那我們看一下什么時候管道可讀:肯定是往管道寫入的東西,這就是wakeup函數的功能
        def wakeup(self):
            """
            Wake up the arbiter by writing to the PIPE
            """
            os.write(self.PIPE[1], b'.')

 

最后附上Arbiter的信號處理

  • QUITINT: Quick shutdown
  • TERM: Graceful shutdown. Waits for workers to finish their current requests up to the graceful timeout.
  • HUP: Reload the configuration, start the new worker processes with a new configuration and gracefully shutdown older workers. If the application is not preloaded (using the --preloadoption), Gunicorn will also load the new version.
  • TTIN: Increment the number of processes by one
  • TTOU: Decrement the number of processes by one
  • USR1: Reopen the log files
  • USR2Upgrade the Gunicorn on the fly. A separate TERM signal should be used to kill the old process. This signal can also be used to use the new versions of pre-loaded applications.
  • WINCH: Gracefully shutdown the worker processes when Gunicorn is daemonized.
 
 
reference:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM