eventlet詳解


正真工作才發現很懶,沒這么多時間寫文,畢竟小白,參照大神寫的,不喜勿噴

1.eventlet是什么
eventlet - 具有WSGI支持的異步框架
eventlet是python庫函數,一個是處理和網絡相關的,另一個可以通過協程實現並發
可以實現'並發'(綠色線程),非阻塞
對Python庫函數改寫,支持協程

綠色線程和普通線程區別
1. 綠色線程幾乎沒有開銷,不用像保留普通線程一樣保留“綠色線程”,每一個網絡連接對應至少一個“綠色線程”;

  2. 綠色線程需要人為的設置使其互相讓渡CPU控制權,而不是搶占。綠色線程既能夠共享數據結構,又不需要顯式的互斥控制,因為只有當一個綠色線程讓出了控制權后其他的綠色線程才能訪問彼此共享的數據結構。


下圖是eventlet中協程、hub、線程、進程之間的關系:

 

2.eventlet依賴的庫
1.greenlet
greenlet庫是其並發的基礎,eventlet庫簡單的對其進行封裝之后,就構成了GreenTread。

2.select.epoll
select庫中的epoll則是默認的網絡通信模型。正式由於這兩個庫的相對獨立性,可以從兩方面學習eventlet庫。

3.協程
協程看上去也是子程序,但執行過程中,在子程序內部可中斷,然后轉而執行別的子程序,在適當的時候再返回來接着執行。多核CPU使用多進程+協程

4.迭代器
4.1可迭代對象(iterable)實現了__iter__()方法的對象,通過調用iter()方法可以獲得一個迭代器(Iterator)。

4.2迭代器(iterator)是實現了iterator.__iter__()和iterator.__next__()方法的對象

4.3iterator是消耗型的,即每一個值被使用過后,就消失了

4.4for ... in ...語句的工作原理--即是一個生成器把所有數全推出來

4.5.yield用法,先返回了yield表達式的值,然后中斷,等待next()調用再執行.

4.6.generator其實有第2種調用方法(恢復執行),即通過send(value)方法將value作為yield表達式的當前值,要確保,generator是在yield處被暫停了,才能傳值,可以用next()方法


5.GreenLet

6.幾個主要API的理解
6.1 greenlet
1.每個協程都有自己的私有stack及局部變量;
2.同一時間內只有一個協程在運行,故無須對某些共享變量加鎖;
3.協程之間的執行順序,完成由程序來控制;

6.2 GreenThread
eventlet中對greenlet進行了簡單的封裝,GreenThread的調度通過hub來實現

6.3 GreenPool
該模塊提供對 greenthread 池的支持。
class eventlet.greenpool.GreenPool(size=1000)
類主要方法:
  1. free()
  2. imap(function, *iterables)
  3. resize(new_size)
  4. running()
  5. spawn(function, *args, **kwargs)
  6. spawn_n(function, *args, **kwargs)
  7. starmap(function, iterable)
  8. waitall()
  9. waiting()
1.
free()
  返回當前對象中可用的greenthreads。
  如果為 0 或更少,那么 spawn() 和 spawn_n() 將會阻塞調用 greenthread 直到有新的可用的 greenthread 為止。
  至於為什么此處可能返回負值,請查看3. resize()
2.
imap(function, *iterables)
  效果等同於 itertools.imap() ,在並發和內存使用上等同於 starmap() 。
  例如,可以非常方便地對文件做一些操作:
def worker(line):
return do_something(line)
pool = GreenPool()
for result in pool.imap(worker, open("filename", 'r')):
print(result)
3.
resize(new_size)
  改變當前允許同時工作的 greenthreads 最大數量
  如果當前有多於 new_size 的 greenthreads 處於工作中,它們可以完成自己的執行,只不過此時不許任何的新 greenthreads 被分配。只有當足夠數量的 greenthreads 完成自己的工作,然后工作中的 greenthreads 總數低於 new_size 時,新的 greenthreads 才能被分配。在此之前,free() 的返回值將會使負的。
4.
running()
  返回當前池子中正在執行任務的 greenthreads 。
5.
spawn(function, *args, **kwargs)
  從當前的池子中孵化一個可用的greenthread,在這個 greenthread 中執行 function ,參數 *args, **kwargs 為傳給 function 的參數。返回一個 GreenThread 對象,這個對象執行着 function ,可以通過該 GreenThread 對象獲取 function 的返回值。
  如果當前池子中沒有空余的 greenthread ,那么該方法阻塞直到有新的可用的 greenthreads 被釋放。
  該函數可以重用, function 可以調用同一個 GreenPool 對象的 spawn 方法,不用擔心死鎖。
6.
spawn_n(function, *args, **kwargs)
  創建一個 greenthread 來運行 function,效果等同於 spawn()。 只不過這個函數返回 None,即丟棄 function 的返回值。
7.
starmap(function, iterable)
  等同於 itertools.starmap(),除了對於可迭代對象中的每一個元素,都會在一個 greenthread 里面執行 func 。 並發的上限由池子的容量限制。在實際的操作中, starmap() 消耗的內存與池子的容量成比例,從而格外適合遍歷特別長的輸入列表。
8.
waitall()
  等待池子中的所有 greenthreads 完成工作。 
9.
waiting() 
  返回當前等待孵化的 greenthreads 數。

 


6.4 GreenPile(綠色線程池,可以有效的控制並發)
pile = eventlet.GreenPile(pool)
# 執行函數,把值放進去,函數可以重用, function 可以調用同一個 GreenPool 對象的 spawn 方法,不用擔心死鎖。
pile.spawn(func,value)

一般
1. next()
等待下一個結果,掛起當前的 greenthread 直到結果可用為止。 當沒有更多的結果時,拋出 StopIteration 異常。

2. spawn(func, *args, **kw)
在它自己的 greenthread 中運行 func,結果儲存在 GreenPile 對象中,可以迭代該對象獲取這些結果。

 


7.例子
7.1 爬蟲
import eventlet
from eventlet.green import urllib2

urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
"https://wiki.secondlife.com/w/images/secondlife.jpg",
"http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]

def fetch(url):
return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()
for body in pool.imap(fetch, urls):
print("got body", len(body))

第2行引入綠化后的 urllib2,除了使用綠化后的套接字外,與原有的標准庫完全相同。

第11行創建一個綠色線程池,此處缺省容量為1000,線程池可以控制並發,限制內存消耗的上限;

第12行遍歷並行調用函數 fetch 后的結果,imap 可以並行調用函數 fetch ,返回結果的先后順序和執行的先后順序相同。

這個例子的關鍵就在於客戶端起了若干的綠色線程,並行收集網絡爬取的結果,同時由於綠色線程池加了內存帽,也不會因為url列表過大而消耗過多的內存。


7.2 簡單socket服務器
import eventlet

def handle(client):
while True:
c = client.recv(1)
if not c: break
client.sendall(c)

server = eventlet.listen(('0.0.0.0', 6000))
pool = eventlet.GreenPool(10000)
while True:
new_sock, address = server.accept()
pool.spawn_n(handle, new_sock)
  server = eventlet.listen(('0.0.0.0', 6000)) 一句創建一個監聽套接字;

  pool = eventlet.GreenPool(10000) 一句創建一個綠色線程池,最多可以容納10000個客戶端連接;

  new_sock, address = server.accept() 一句很特殊,由於這里創建的服務器套接字是經過綠化的,所以當多個連接到來時在accept()這里不會阻塞,而是並行接收

  pool.spawn_n(handle, new_sock) 一句為每一個客戶端創建一個綠色線程,該綠色線程不在乎回調函數 handle 的執行結果,也就是完全將客戶端套接字交給回調 handle 處理。


7.3
Feed 挖掘機

  該用例下,一個服務端同時也是另一個服務的客戶端,比如代理等,這里 GreenPile 就發揮作用了。

  下面的例子中,服務端從客戶端接收 POST 請求,請求中包括含有 RSS feed 的URL,服務端並發地到 feed 服務器那里取回所有的 feed 然后將他們的標題返回給客戶端:

import eventlet
feedparser = eventlet.import_patched('feedparser')

pool = eventlet.GreenPool()

def fetch_title(url):
d = feedparser.parse(url)
return d.feed.get('title', '')

def app(environ, start_response):
pile = eventlet.GreenPile(pool)
for url in environ['wsgi.input'].readlines():
pile.spawn(fetch_title, url)
titles = '\n'.join(pile)
start_response('200 OK', [('Content-type', 'text/plain')])
return [titles]
  

使用綠色線程池的好處是控制並發,如果沒有這個並發控制的話,客戶端可能會讓服務端在 feed 服務器那里起很多的連接,導致服務端被feed服務器給 ban 掉。


總結:
1. Greenthread Spawn(產生greenthread綠色線程)
spawn(func, *args, **kwargs)

2. Greenthread Control(控制greenthread)
eventlet.GreenPool;eventlet.GreenPile;eventlet.Queue

3. Network Convenience Functions(和網絡相關的函數)
綠化socket


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM