攻克python3-協程


協程

協程是線程的更小切分,又稱為“微線程”,是一種用戶態的輕量級線程。

與進程的區別:

相同點:

相同點存在於,當我們掛起一個執行流的時,我們要保存的東西:

  • 棧, 其實在你切換前你的局部變量,以及要函數的調用都需要保存,否則都無法恢復
  • 寄存器狀態,這個其實用於當你的執行流恢復后要做什么

而寄存器和棧的結合就可以理解為上下文,上下文切換的理解:
CPU看上去像是在並發的執行多個進程,這是通過處理器在進程之間切換來實現的,操作系統實現這種交錯執行的機制稱為上下文切換

操作系統保持跟蹤進程運行所需的所有狀態信息。這種狀態,就是上下文。
在任何一個時刻,操作系統都只能執行一個進程代碼,當操作系統決定把控制權從當前進程轉移到某個新進程時,就會進行上下文切換,即保存當前進程的上下文,恢復新進程的上下文,然后將控制權傳遞到新進程,新進程就會從它上次停止的地方開始。

不同點:

  • 執行流的調度者不同,進程是內核調度,而協程是在用戶態調度,也就是說進程的上下文是在內核態保存恢復的,而協程是在用戶態保存恢復的,很顯然用戶態的代價更低
  • 進程會被強占,而協程不會,也就是說協程如果不主動讓出CPU,那么其他的協程,就沒有執行的機會。
  • 對內存的占用不同,實際上協程可以只需要4K的棧就足夠了,而進程占用的內存要大的多
  • 從操作系統的角度講,多協程的程序是單進程,單協程

與線程的區別:

既然我們上面也說了,協程也被稱為微線程,下面對比一下協程和線程:

  • 線程之間需要上下文切換成本相對協程來說是比較高的,尤其在開啟線程較多時,但協程的切換成本非常低。
  • 同樣的線程的切換更多的是靠操作系統來控制,而協程的執行由我們自己控制。

  協程只是在單一的線程里不同的協程之間切換,其實和線程很像,線程是在一個進程下,不同的線程之間做切換,這也可能是協程稱為微線程的原因吧。

協程的優缺點

協程的優點:

  (1)無需線程上下文切換的開銷,協程避免了無意義的調度,由此可以提高性能(但也因此,程序員必須自己承擔調度的責任,同時,協程也失去了標准線程使用多CPU的能力)

  (2)無需原子操作鎖定及同步的開銷

  (3)方便切換控制流,簡化編程模型

  (4)高並發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用於高並發處理。

協程的缺點:

  (1)無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。

  (2)進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序

在python中實現協程

yield實現協程

import time
def consumer(name):             #生成器
    print("%s要開始吃包子了!"%(name))
    while True:
        baozi=yield         #暫停,記錄位置,返回跳出
        print("包子%s,%s吃了"%(baozi,name))

def producer(name):
    c=consumer("a")#只是變成一個生成器
    c.__next__()                #next 只喚醒yiedl不傳遞值
    for i in range(4):
        time.sleep(1)
        print("%s做了1個包子"%(name))
        c.send(i)               #喚醒yiedl並傳遞值

producer("phk")
View Code

greenlet實現協程

Python的 greenlet就相當於手動切換(.switch),去執行別的子程序,在“別的子程序”中又主動切換回來。。。

from greenlet import greenlet
def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()
def test2():
    print(56)
    gr1.switch()#切換
    print(78)

gr1 = greenlet(test1) #啟動一個攜程
gr2 = greenlet(test2)
gr1.switch()
View Code

 gevent 實現協程

import gevent

def foo():
    print('Running in foo')
    gevent.sleep(2)
    print('Explicit context switch to foo again')
def bar():
    print('Explicit精確的 context內容 to bar')
    gevent.sleep(1)#假設遇到io 切換
    print('Implicit context switch back to bar')
def func3():
    print("running func3 ")
    gevent.sleep(3)
    print("running func3  again ")


gevent.joinall([
    gevent.spawn(foo), #生成,
    gevent.spawn(bar),
    gevent.spawn(func3),
])
View Code

同步與異步性能區別:

def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' % pid)
 
def synchronous():
    for i in range(1,10):
        task(i)
 
def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)
 
print('Synchronous:')
synchronous()
 
print('Asynchronous:')
asynchronous()
View Code

上面程序的重要部分是將task函數封裝到greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,后者阻塞當前流程,並執行所有給定的greenlet。執行流程只會在 所有greenlet執行完后才會繼續向下走。

猴子補丁 Monkey patching

這個補丁是Gevent模塊最需要注意的問題,有了它,才會讓Gevent模塊發揮它的作用。我們往往使用Gevent是為了實現網絡通信的高並發,但是,Gevent直接修改標准庫里面大部分的阻塞式系統調用,包括socket、ssl、threading和 select等模塊,而變為協作式運行。但是我們無法保證你在復雜的生產環境中有哪些地方使用這些標准庫會由於打了補丁而出現奇怪的問題。

一種方法是使用gevent下的socket模塊,我們可以通過”from gevent import socket”來導入。不過更常用的方法是使用猴子布丁(Monkey patching)。使用猴子補丁褒貶不一,但是官網上還是建議使用”patch_all()”,而且在程序的第一行就執行。 

from gevent import monkey; monkey.patch_socket()
import gevent
import socket
  
urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=5)
  
print (jobs)
View Code

上述代碼的第一行就是對socket標准庫打上猴子補丁,此后socket標准庫中的類和方法都會被替換成非阻塞式的,所有其他的代碼都不用修改,這樣協程的效率就真正體現出來了。Python中其它標准庫也存在阻塞的情況,gevent提供了”monkey.patch_all()”方法將所有標准庫都替換。

協程間的通信

事件(Event)對象 

greenlet協程間的異步通訊可以使用事件(Event)對象。該對象的”wait()”方法可以阻塞當前協程,而”set()”方法可以喚醒之前阻塞的協程。在下面的例子中,5個waiter協程都會等待事件evt,當setter協程在3秒后設置evt事件,所有的waiter協程即被喚醒。

import gevent
from gevent.event import Event

evt = Event()


def setter():
    print('Wait for me')
    gevent.sleep(3)  # 3秒后喚醒所有在evt上等待的協程
    print("Ok, I'm done")
    evt.set()  # 喚醒


def waiter():
    print("I'll wait for you")
    evt.wait()  # 等待
    print('Finish waiting')


gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
    gevent.spawn(waiter),
    gevent.spawn(waiter),
    gevent.spawn(waiter),
    gevent.spawn(waiter)
])
View Code

AsyncResult事件

除了Event事件外,gevent還提供了AsyncResult事件,它可以在喚醒時傳遞消息。讓我們將上例中的setter和waiter作如下改動:

from gevent.event import AsyncResult
aevt = AsyncResult()
  
def setter():
    print 'Wait for me'
    gevent.sleep(3)  # 3秒后喚醒所有在evt上等待的協程
    print "Ok, I'm done"
    aevt.set('Hello!')  # 喚醒,並傳遞消息
  
def waiter():
    print("I'll wait for you")
    message = aevt.get()  # 等待,並在喚醒時獲取消息
    print 'Got wake up message: %s' % message
View Code

 

隊列 Queue

隊列Queue的概念相信大家都知道,我們可以用它的put和get方法來存取隊列中的元素。gevent的隊列對象可以讓greenlet協程之間安全的訪問。運行下面的程序,你會看到3個消費者會分別消費隊列中的產品,且消費過的產品不會被另一個消費者再取到:

import gevent
from gevent.queue import Queue

products = Queue()


def consumer(name):
    # while not products.empty():
    while True:
        try:
            print('%s got product %s' % (name, products.get_nowait()))
            gevent.sleep(0)
        except gevent.queue.Empty:
            break

    print('Quit')


def producer():
    for i in range(1, 10):
        products.put(i)


gevent.joinall([
    gevent.spawn(producer),
    gevent.spawn(consumer, 'steve'),
    gevent.spawn(consumer, 'john'),
    gevent.spawn(consumer, 'nancy'),
])
View Code

 注意:協程隊列跟線程隊列是一樣的,put和get方法都是阻塞式的,它們都有非阻塞的版本:put_nowait和get_nowait。如果調用get方法時隊列為空,則是不會拋出”gevent.queue.Empty”異常。我們只能使用get_nowait()的方式讓氣拋出異常。

信號量

信號量可以用來限制協程並發的個數。它有兩個方法,acquire和release。顧名思義,acquire就是獲取信號量,而release就是釋放。當所有信號量都已被獲取,那剩余的協程就只能等待任一協程釋放信號量后才能得以運行:

import gevent
from gevent.lock import BoundedSemaphore

sem = BoundedSemaphore(2)


def worker(n):
    sem.acquire()
    print('Worker %i acquired semaphore' % n)
    gevent.sleep(1)
    sem.release()
    print('Worker %i released semaphore' % n)


gevent.joinall([gevent.spawn(worker, i) for i in range(0, 6)])
View Code

上面的例子中,我們初始化了”BoundedSemaphore”信號量,並將其個數定為2。所以同一個時間,只能有兩個worker協程被調度。程序運行后的結果如下:

 

多並發的socket

import gevent
from gevent import  monkey
from gevent import socket
# import socket #兩種import方法都可以

monkey.patch_all()

def server(port):
    s = socket.socket()
    s.bind(('localhost', port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)


def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as  ex:
        print(ex)
    finally:
        conn.close()


if __name__ == '__main__':
    server(9999)
服務器端
import socket

HOST = 'localhost'  # The remote host
PORT = 9999  # The same port as used by the server
s = socket.socket()
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    print('Received', data)
    s.send(b"200")
s.close()
用戶端

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM