gunicorn geventworker 解析


  在前面的文章曾介紹過gunicorn的syncworker,本文介紹其中一種asyncworker:GeventWorker。類圖如下:

 
  可見GeventWorker重載了init_process(這個方法是Worker唯一暴露給外界的接口),源碼如下:
 1         def init_process(self):
 2             # monkey patch here
 3             self.patch()
 4 
 5             # reinit the hub
 6             from gevent import hub
 7             hub.reinit()
 8 
 9             # then initialize the process
10             super(GeventWorker, self).init_process()

  首先是monkey-patch,這個是使用gevent所必須的的;然后調用hub.reinit,這個在fork子進程之后必須調用;最后調用基類的init_process。關於gevent hub的reinit,在gevent源碼的doc里寫的很清楚:

  This should be called *immediately* after :func:`os.fork` in the child process. This is done automatically by :func:`gevent.os.fork` or if the :mod:`os` module has beenmonkey-patched. If this function is not called in a forked process, symptoms may include hanging of functions like :func:`socket.getaddrinfo`, and the hub's threadpool is unlikelyto work.

  前面也提到了init_process會帶用到run方法,進入循環,我們看看GeventWorker.run:

  

 1     def run(self):
 2         servers = []
 3         ssl_args = {}
 4 
 5         if self.cfg.is_ssl:
 6             ssl_args = dict(server_side=True, **self.cfg.ssl_options)
 7 
 8         for s in self.sockets:
 9             s.setblocking(1)
10             pool = Pool(self.worker_connections) # Pool是Gevent.Pool
11             if self.server_class is not None:
12                 environ = base_environ(self.cfg)
13                 environ.update({
14                     "wsgi.multithread": True,
15                     "SERVER_SOFTWARE": VERSION,
16                 })
17                 server = self.server_class(
18                     s, application=self.wsgi, spawn=pool, log=self.log,
19                     handler_class=self.wsgi_handler, environ=environ,
20                     **ssl_args)
21             else:
22                 hfun = partial(self.handle, s) # self.handle為請求到達時候的處理函數
23                 server = StreamServer(s, handle=hfun, spawn=pool, **ssl_args)
24 
25             server.start()
26             servers.append(server)
27 
28         while self.alive: # 每隔1s告知Arbiter自己還活着
29             self.notify() 
30             gevent.sleep(1.0)   # 切換回hub

   邏輯也比較簡單,首先是建立TCP Server(gevent.StreamServer),並且傳入tcp連接到達時得處理函數self.handle, 然后TCP Server開始工作(server.start), 關於gevent具體怎么在多個協程之間調度,可以參加之前的文章。最后的while循環(28行)用於向Arbiter發心跳,並通過gevent.sleep立即切回hub。再來看看上述代碼的第10行:pool = Pool(self.worker_connections) ,這個worker_connections屬性來自gunicorn的配置,默認為1000,表明單個worker可以維持的連接數目。然后gevent.Pool的作用可以再gevent tutorial查看,簡單來說,就是規定了可以在單個線程內可以並發的greenlet.greenlet的數目。當TCP server接受到請求之后,調用self.handle處理請求,源代碼在基類AsyncWorker,實現基本同SyncWorker。

   

references:

http://www.cnblogs.com/xybaby/p/6297147.html

http://www.cnblogs.com/xybaby/p/6370799.html

http://docs.gunicorn.org/en/latest/settings.html

http://sdiehl.github.io/gevent-tutorial/#groups-and-pools 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM