Python Learning Path: Processes, Threads, and Coroutines


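  1. Differences between processes and threads
  2. How the CPU runs programs
  3. Python's GIL (Global Interpreter Lock)
  4. Threads
    1. Syntax
    2. join
    3. Thread locks: Lock, RLock, and Semaphore
    4. Turning a thread into a daemon thread
    5. Event
    6. queue
    7. Producer-consumer model
    8. Queue
    9. Developing a thread pool
  5. Processes
    1. Syntax
    2. Inter-process communication
    3. Process pools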

 

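Processes and Threads

What is a thread?

A thread is the smallest unit of execution that the operating system can schedule. It is contained in a process and is the unit that actually does the process's work. A thread is a single sequential flow of control within a process; one process can run multiple threads concurrently, each performing a different task.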

A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.

Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.

If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.

Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.

On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.

Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.

Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).

 

What is a process?

An executing instance of a program is called a process.

Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.

What is the difference between a process and a thread?

    • Threads share the address space of the process that created them; processes have their own address space.
    • Threads have direct access to the data segment of their process; processes have their own copy of the data segment of the parent process.
    • Threads can communicate directly with other threads of their process; processes must use interprocess communication to communicate with sibling processes.
    • New threads are easily created; new processes require duplication of the parent process.
    • Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
    • Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process do not affect child processes.
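
To make the first bullet concrete, here is a minimal sketch in which three threads append to one list object. The names `shared` and `worker` are invented for this illustration and do not come from the article.

```python
import threading

shared = []  # one list object, visible to every thread in this process


def worker(n):
    # Each thread appends to the very same list; no copy is made.
    shared.append(n)


threads = [threading.Thread(target=worker, args=(n,)) for n in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(shared))
```

All three values end up in the list that the main thread sees. A child process running the same function would append to its own copy instead, which is why processes need interprocess communication.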

 

The Python threading module

A thread can be invoked in two ways, as follows:

Direct invocation

    import threading
    import time

    def sayhi(num):  # the function each thread will run
        print("running on number:%s" % num)
        time.sleep(3)

    if __name__ == '__main__':
        t1 = threading.Thread(target=sayhi, args=(1,))  # create a thread instance
        t2 = threading.Thread(target=sayhi, args=(2,))  # create another thread instance

        t1.start()  # start the thread
        t2.start()  # start the other thread

        print(t1.name)  # get the thread name
        print(t2.name)

 

Invocation by subclassing

    import threading
    import time

    class MyThread(threading.Thread):
        def __init__(self, num):
            super().__init__()
            self.num = num

        def run(self):  # the function each thread will run
            print("running on number:%s" % self.num)
            time.sleep(3)

    if __name__ == '__main__':
        t1 = MyThread(1)
        t2 = MyThread(2)
        t1.start()
        t2.start()

The threading module provides thread-related operations; a thread is the smallest unit of work in an application.

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import threading
    import time

    def show(arg):
        time.sleep(1)
        print('thread' + str(arg))

    for i in range(10):
        t = threading.Thread(target=show, args=(i,))
        t.start()

    print('main thread stop')

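The code above creates 10 "foreground" (non-daemon) threads and then hands control to the operating system's scheduler, which decides according to its own algorithm when each thread runs and executes their instructions in time slices.

More methods:

    • start            Marks the thread as ready to run; it then waits to be scheduled.
    • setName      Sets the thread's name (in current Python 3 the name attribute is preferred).
    • getName      Returns the thread's name (likewise, prefer the name attribute).
    • setDaemon   Makes the thread a background (daemon) thread or a foreground thread (the default); in current Python 3 the daemon attribute is preferred.
                         Background thread: it runs alongside the main thread, and once the main thread (and every other foreground thread) has finished, it is stopped whether or not it has completed.
                         Foreground thread: it runs alongside the main thread, and once the main thread has finished, the program waits for the foreground thread to complete before exiting.
    • join              Blocks the calling thread until the joined thread finishes. Calling join immediately after each start runs the threads one at a time, which defeats the purpose of multithreading.
    • run              Executed automatically in the new thread once the thread has been started and scheduled.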
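
The remark about join is easiest to see by timing it. The sketch below is only an illustration (the helpers `run_one_by_one` and `run_together` are made up here): it joins each thread right after starting it, then compares that with starting every thread first and joining afterwards.

```python
import threading
import time


def work():
    time.sleep(0.2)  # stand-in for a blocking task


def run_one_by_one(count):
    """Join each thread right after starting it: the threads run serially."""
    begin = time.perf_counter()
    for _ in range(count):
        t = threading.Thread(target=work)
        t.start()
        t.join()
    return time.perf_counter() - begin


def run_together(count):
    """Start every thread first, then join them all: the sleeps overlap."""
    begin = time.perf_counter()
    threads = [threading.Thread(target=work) for _ in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - begin


serial = run_one_by_one(4)
overlapped = run_together(4)
print("one by one: %.2fs, together: %.2fs" % (serial, overlapped))
```

With four tasks of 0.2 seconds each, the first variant needs roughly the sum of the sleeps, while the second needs roughly one sleep. join itself is not the problem; where it is called is what matters.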

 

Join & Daemon

Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it's okay to kill them off once the other, non-daemon, threads have exited.

Without daemon threads, you'd have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.

    import time
    import threading

    def run(n):
        print('[%s]------running----\n' % n)
        time.sleep(2)
        print('--done--')

    def main():
        for i in range(5):
            t = threading.Thread(target=run, args=(i,))
            t.start()
            t.join(1)  # wait at most 1 second for this thread
            print('starting thread', t.name)

    m = threading.Thread(target=main)
    # Mark m as a daemon thread. Threads started by m inherit the daemon flag,
    # so when the main thread exits, m and its child threads are all stopped,
    # whether or not they have finished their work.
    m.daemon = True
    m.start()
    # m.join(timeout=2)
    print("---main thread done----")

 

Note: Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event.
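
One way to follow that advice is sketched below, assuming a background task that can check a flag between units of work. The names `stop_requested`, `heartbeat`, and `log` are invented for this illustration.

```python
import threading
import time

stop_requested = threading.Event()
log = []


def heartbeat():
    # wait() returns False on timeout and True once the flag is set,
    # so the loop ticks every 0.05 s until a stop is requested.
    while not stop_requested.wait(timeout=0.05):
        log.append("tick")
    log.append("cleaned up")  # release files, connections, etc. here


worker = threading.Thread(target=heartbeat)  # deliberately non-daemon
worker.start()

time.sleep(0.2)        # the main program does its own work
stop_requested.set()   # ask the worker to finish
worker.join()          # wait until its cleanup has run
print(log[-1], worker.is_alive())
```

Because the thread is not a daemon, the interpreter waits for it, and because it watches the Event, it gets the chance to run its cleanup code before exiting.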

  

 

 

Thread lock (mutex)

A process can start multiple threads, and those threads share the memory space of their parent process, which means every thread can access the same data. What happens, then, if two threads try to modify the same data at the same time?

    import time
    import threading

    def addNum():
        global num  # every thread uses this global variable
        print('--get num:', num)
        time.sleep(1)
        num -= 1  # decrement the shared variable by 1

    num = 100  # a shared variable
    thread_list = []
    for i in range(100):
        t = threading.Thread(target=addNum)
        t.start()
        thread_list.append(t)

    for t in thread_list:  # wait for all threads to finish
        t.join()

    print('final num:', num)

 

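Normally the final value of num should be 0, but if you run this several times on Python 2.7 you will find that the printed result is not always 0. Why does the result differ between runs? Suppose threads A and B both want to decrement num. Because the two threads run concurrently, both may well read the same value, say num=100, and hand it to the CPU. Thread A computes 99, but thread B also computes 99; after both assign their result back to num, the value is 99 rather than 98. What to do? Whenever a thread is about to modify shared data, it can lock that data so that nobody else modifies it before the thread has finished; any other thread that wants to modify the data must wait until the lock has been released.

*Note: on Python 3.x the result usually comes out correct. This is not because a lock is added automatically. Since Python 3.2 the interpreter switches threads on a time interval (5 ms by default, see sys.setswitchinterval) rather than after a fixed number of bytecode instructions, so a switch in the middle of num -= 1 is much rarer. The operation is still not atomic, and the lock is still needed.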

Version with a lock

    import time
    import threading

    def addNum():
        global num  # every thread uses this global variable
        print('--get num:', num)
        time.sleep(1)
        with lock:  # acquire the lock before modifying, release it afterwards
            num -= 1  # decrement the shared variable by 1

    num = 100  # a shared variable
    thread_list = []
    lock = threading.Lock()  # create a global lock
    for i in range(100):
        t = threading.Thread(target=addNum)
        t.start()
        thread_list.append(t)

    for t in thread_list:  # wait for all threads to finish
        t.join()

    print('final num:', num)
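
Since the lost update rarely shows up by chance on recent interpreters, the sketch below forces it. It splits the decrement into an explicit read and write and uses a threading.Barrier so that every thread reads before any thread writes. This is an illustration only; `run`, `unsafe_decrement`, and `safe_decrement` are hypothetical names, and the barrier is not part of the article's code.

```python
import threading

THREADS = 10


def run(decrement):
    """Run `decrement` in THREADS threads and return the final counter."""
    state = {"num": THREADS}
    threads = [threading.Thread(target=decrement, args=(state,))
               for _ in range(THREADS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["num"]


all_have_read = threading.Barrier(THREADS)


def unsafe_decrement(state):
    value = state["num"]        # read
    all_have_read.wait()        # every thread has now read the same value
    state["num"] = value - 1    # write back a stale result


lock = threading.Lock()


def safe_decrement(state):
    with lock:                  # read-modify-write happens as one unit
        state["num"] -= 1


unsafe_result = run(unsafe_decrement)
safe_result = run(safe_decrement)
print("without lock:", unsafe_result)  # without lock: 9
print("with lock:", safe_result)       # with lock: 0
```

Ten threads each subtract 1 from 10. Without the lock, all of them compute 9 from the same stale read, so nine updates are lost; with the lock, the counter reaches 0.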

 

  

RLock (re-entrant lock)

Put simply, an RLock lets a thread that already holds the lock acquire it again, which is what happens when a function holding the big lock calls other functions that take the same lock.

    import threading

    def run1():
        print("grab the first part data")
        global num
        with lock:
            num += 1
        return num

    def run2():
        print("grab the second part data")
        global num2
        with lock:
            num2 += 1
        return num2

    def run3():
        with lock:  # outer acquire
            res = run1()  # acquires the same lock again
            print('--------between run1 and run2-----')
            res2 = run2()
        print(res, res2)

    if __name__ == '__main__':
        num, num2 = 0, 0
        lock = threading.RLock()
        threads = [threading.Thread(target=run3) for _ in range(10)]
        for t in threads:
            t.start()
        for t in threads:  # wait for every thread instead of busy-polling active_count()
            t.join()
        print('----all threads done---')
        print(num, num2)
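
The difference from a plain Lock can be checked directly. This small sketch (variable names are made up for the illustration) tries to acquire each kind of lock a second time from the same thread, using a non-blocking acquire so that nothing can hang.

```python
import threading

plain = threading.Lock()
plain.acquire()
# A second acquire from the same thread cannot succeed; with the default
# blocking=True it would deadlock, so we ask without blocking.
second_plain = plain.acquire(blocking=False)
plain.release()

reentrant = threading.RLock()
reentrant.acquire()
second_reentrant = reentrant.acquire(blocking=False)  # same owner: allowed
reentrant.release()  # one release per acquire
reentrant.release()

print("Lock re-acquired:", bool(second_plain))
print("RLock re-acquired:", bool(second_reentrant))
```

If the lock in the example above were a plain Lock, run3 would block forever inside run1, because run1 tries to take a lock its own thread already holds.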

 

  

Semaphore

A mutex allows only one thread at a time to modify the data, whereas a Semaphore allows a fixed number of threads to do so at once. For example, if a restroom has 3 stalls, at most 3 people can use it at the same time; everyone else has to wait until someone comes out.

    import threading
    import time

    def run(n):
        with semaphore:  # take a slot, give it back on exit
            time.sleep(1)
            print("run the thread: %s\n" % n)

    if __name__ == '__main__':
        semaphore = threading.BoundedSemaphore(5)  # allow at most 5 threads to run at once
        threads = [threading.Thread(target=run, args=(i,)) for i in range(20)]
        for t in threads:
            t.start()
        for t in threads:  # wait for every thread instead of busy-polling active_count()
            t.join()
        print('----all threads done---')
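
To check that the limit really holds, the following sketch counts how many threads are inside the guarded section at once and records the peak. The bookkeeping (`active`, `peak`, `counter_lock`) is an assumption added for this illustration and is not needed in normal use.

```python
import threading
import time

LIMIT = 3
slots = threading.BoundedSemaphore(LIMIT)
counter_lock = threading.Lock()
active = 0
peak = 0


def use_resource():
    global active, peak
    with slots:                      # at most LIMIT threads get past here
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)             # pretend to use the limited resource
        with counter_lock:
            active -= 1


threads = [threading.Thread(target=use_resource) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("peak concurrency:", peak)
```

Ten threads compete for three slots, and the recorded peak never exceeds the limit.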

 

 

Events

An event is a simple synchronization object; the event represents an internal flag, and threads can wait for the flag to be set, or set or clear the flag themselves.

    event = threading.Event()

    # a client thread can wait for the flag to be set
    event.wait()

    # a server thread can set or reset it
    event.set()
    event.clear()

If the flag is set, the wait method doesn't do anything. If the flag is cleared, wait will block until it becomes set again. Any number of threads may wait for the same event.

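Python thread events let the main thread control the execution of other threads. An event provides three main methods: set, wait, and clear.

How events work: a global "Flag" is defined. If the Flag is False, a call to event.wait blocks; if the Flag is True, event.wait no longer blocks.

clear: sets the Flag to False
set: sets the Flag to True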
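
The flag behaviour described above can be observed from a single thread. In this minimal sketch the timeout argument is used only so that the first wait gives up instead of blocking forever.

```python
import threading

event = threading.Event()

timed_out = event.wait(timeout=0.1)  # Flag is False: blocks, then gives up
event.set()
passed = event.wait(timeout=0.1)     # Flag is True: returns immediately
event.clear()
after_clear = event.is_set()

print(timed_out, passed, after_clear)  # False True False
```

wait returns True when the flag is set and False when the timeout expires first, which makes it convenient as a loop condition.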

Events can be used for interaction between two or more threads. Below is a traffic light example: one thread acts as the traffic light and several threads act as cars, which follow the rule of stopping on red and going on green.

    import threading
    import time
    import random

    def light():
        if not event.is_set():
            event.set()  # wait() will not block: green light
        count = 0
        while True:
            if count < 10:
                print('\033[42;1m--green light on---\033[0m')
            elif count < 13:
                print('\033[43;1m--yellow light on---\033[0m')
            elif count < 20:
                if event.is_set():
                    event.clear()
                print('\033[41;1m--red light on---\033[0m')
            else:
                count = 0
                event.set()  # turn the green light on
            time.sleep(1)
            count += 1

    def car(n):
        while True:
            time.sleep(random.randrange(10))
            if event.is_set():  # green light
                print("car [%s] is running.." % n)
            else:
                print("car [%s] is waiting for the red light.." % n)

    if __name__ == '__main__':
        event = threading.Event()
        Light = threading.Thread(target=light)
        Light.start()
        for i in range(3):
            t = threading.Thread(target=car, args=(i,))
            t.start()

 

  

queue

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0)  # first in, first out
class queue.LifoQueue(maxsize=0)  # last in, first out
class queue.PriorityQueue(maxsize=0)  # a queue whose items can be stored with a priority

Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).
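
The three classes differ only in retrieval order, which a short sketch can show. The helper `drain` and the sample `jobs` are invented for this illustration; the entries follow the (priority_number, data) pattern mentioned above.

```python
import queue


def drain(q, items):
    """Put every item into q, then return them in retrieval order."""
    for item in items:
        q.put(item)
    return [q.get() for _ in items]


jobs = [(2, "write report"), (1, "fix outage"), (3, "tidy desk")]

fifo_order = drain(queue.Queue(), jobs)
lifo_order = drain(queue.LifoQueue(), jobs)
priority_order = drain(queue.PriorityQueue(), jobs)

print(fifo_order)      # same order as inserted
print(lifo_order)      # reverse of insertion order
print(priority_order)  # lowest priority number first
```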

exception queue.Empty

Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full

Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()
Queue.empty()  # return True if empty
Queue.full()  # return True if full
Queue.put(item, block=True, timeout=None)

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)

Equivalent to put(item, False).

Queue.get(block=True, timeout=None)

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()

Equivalent to get(False).
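
A small sketch of the blocking rules, using a queue of size 1 so that both exceptions are easy to trigger. The variable names are made up for the illustration.

```python
import queue

q = queue.Queue(maxsize=1)
q.put("first")                 # fits: the queue now holds one item

try:
    q.put_nowait("second")     # no free slot and no waiting allowed
    put_result = "stored"
except queue.Full:
    put_result = "Full"

item = q.get()                 # returns "first" immediately

try:
    q.get(timeout=0.1)         # waits up to 0.1 s, then gives up
    get_result = "got an item"
except queue.Empty:
    get_result = "Empty"

print(put_result, item, get_result)  # Full first Empty
```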

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join()  # block until every item in the queue has been retrieved and processed
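
The interplay of task_done() and join() might look like the sketch below, with one daemon consumer thread squaring numbers. The names `tasks`, `results`, and `consumer` are assumptions for this illustration.

```python
import queue
import threading

tasks = queue.Queue()
results = []


def consumer():
    while True:
        n = tasks.get()          # blocks until a task is available
        results.append(n * n)
        tasks.task_done()        # one task_done() per get()


# Daemon consumer: it loops forever and is discarded at interpreter exit.
threading.Thread(target=consumer, daemon=True).start()

for n in range(5):
    tasks.put(n)

tasks.join()                     # returns once every task is marked done
print(results)                   # [0, 1, 4, 9, 16]
```

join() here waits on the work, not on the thread: it returns when the count of unfinished tasks drops to zero, even though the consumer thread itself keeps running.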

Producer-consumer model

    import time, random
    import queue, threading

    q = queue.Queue()

    def Producer(name):
        count = 0
        while count < 20:
            time.sleep(random.randrange(3))
            q.put(count)
            print('Producer %s has produced %s baozi..' % (name, count))
            count += 1

    def Consumer(name):
        count = 0
        while count < 20:
            time.sleep(random.randrange(4))
            if not q.empty():
                data = q.get()
                print(data)
                print('\033[32;1mConsumer %s has eaten %s baozi...\033[0m' % (name, data))
            else:
                print("-----no baozi anymore----")
            count += 1

    p1 = threading.Thread(target=Producer, args=('A',))
    c1 = threading.Thread(target=Consumer, args=('B',))
    p1.start()
    c1.start()
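
The consumer above polls q.empty() and may give up its 20 attempts before everything has been produced. A common alternative, sketched here under the assumption that the producer knows when it is finished, is to let get() block and to send a sentinel object as the end marker. `DONE` and `eaten` are names invented for this example.

```python
import queue
import threading

DONE = object()              # sentinel meaning "no more items" (our own convention)
q = queue.Queue(maxsize=3)   # bounded, so a fast producer has to wait
eaten = []


def producer(count):
    for n in range(count):
        q.put(n)             # blocks while the queue is full
    q.put(DONE)


def consumer():
    while True:
        item = q.get()       # blocks until something arrives; no polling
        if item is DONE:
            break
        eaten.append(item)


p = threading.Thread(target=producer, args=(10,))
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
c.join()
print(eaten)
```

With several consumers, one sentinel per consumer would be needed; that variation is left out to keep the sketch short.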

 

  

 

Multiprocessing

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

    from multiprocessing import Process
    import time

    def f(name):
        time.sleep(2)
        print('hello', name)

    if __name__ == '__main__':
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()

Note: because each process has to hold its own copy of the data, creating a process carries considerable overhead.

To show the individual process IDs involved, here is an expanded example:

    from multiprocessing import Process
    import os

    def info(title):
        print(title)
        print('module name:', __name__)
        print('parent process:', os.getppid())
        print('process id:', os.getpid())
        print("\n\n")

    def f(name):
        info('\033[31;1mfunction f\033[0m')
        print('hello', name)

    if __name__ == '__main__':
        info('\033[32;1mmain process line\033[0m')
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()

 

 

Inter-process communication

Memory is not shared between processes. To exchange data between two processes, you can use the following methods:

Queues

Usage is much the same as the queue module used with threading.

    from multiprocessing import Process, Queue

    def f(q):
        q.put([42, None, 'hello'])

    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        print(q.get())    # prints "[42, None, 'hello']"
        p.join()

 

Pipes

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

    from multiprocessing import Process, Pipe

    def f(conn):
        conn.send([42, None, 'hello'])
        conn.close()

    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())   # prints "[42, None, 'hello']"
        p.join()

 

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

 

Managers

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,

    # No shared data: each child process appends to its own copy of li
    from multiprocessing import Process

    li = []

    def foo(i):
        li.append(i)
        print("say hi", li)

    if __name__ == '__main__':
        procs = [Process(target=foo, args=(i,)) for i in range(10)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print('ending', li)  # the parent's list is still empty

    # Method 1: Array
    from multiprocessing import Process, Array

    def Foo(temp, i):
        temp[i] = 100 + i
        for item in temp:
            print(i, '----->', item)

    if __name__ == '__main__':
        temp = Array('i', [11, 22, 33, 44])
        for i in range(2):
            p = Process(target=Foo, args=(temp, i))
            p.start()

    # Method 2: sharing data with manager.dict()
    from multiprocessing import Process, Manager

    def f(d, l):
        d[1] = '1'
        d['2'] = 2
        d[0.25] = None
        l.append(1)
        print(l)

    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
            l = manager.list(range(5))
            p_list = []
            for i in range(10):
                p = Process(target=f, args=(d, l))
                p.start()
                p_list.append(p)
            for res in p_list:
                res.join()
            print(d)

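The shared object is handed to each child process when the process is created, not when it is first used. Changes the child makes through it (shared memory for an Array, a proxy to the manager's server process for Manager objects) are then visible in the parent.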

Process lock example

    from multiprocessing import Process, Array, RLock

    def Foo(lock, temp, i):
        """
        Set element 0 to 100 + i
        """
        with lock:
            temp[0] = 100 + i
            for item in temp:
                print(i, '----->', item)

    if __name__ == '__main__':
        lock = RLock()
        temp = Array('i', [11, 22, 33, 44])
        for i in range(20):
            p = Process(target=Foo, args=(lock, temp, i))
            p.start()

 

 

Process synchronization

Without using the lock, output from the different processes is liable to get all mixed up.

    from multiprocessing import Process, Lock

    def f(l, i):
        l.acquire()
        try:
            print('hello world', i)
        finally:
            l.release()

    if __name__ == '__main__':
        lock = Lock()

        for num in range(10):
            Process(target=f, args=(lock, num)).start()

 

  

 

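Process pools

A process pool maintains a set of worker processes internally. When a task is submitted, a process is taken from the pool; if no process in the pool is available, the program waits until one becomes free.

Two of the pool's methods for submitting work are:

  • apply            Runs the task and blocks until its result is ready.
  • apply_async   Submits the task and returns immediately with a result object.

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from multiprocessing import Pool
    import time

    def Foo(i):
        time.sleep(5)
        print('run number %s' % i)
        return i + 100

    def Bar(arg):
        print('what-->', arg)

    # print(pool.apply(Foo, (1,)))
    # print(pool.apply_async(func=Foo, args=(1,)).get())
    if __name__ == '__main__':
        pool = Pool(5)  # create a pool with 5 worker processes
        for i in range(10):
            pool.apply_async(func=Foo, args=(i,), callback=Bar)  # callback receives Foo's return value
        print('end')
        pool.close()  # call close() first, then join()
        pool.join()  # wait for the pool's workers to finish; if this line is commented out, the program exits immediately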

 

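Coroutines

Thread and process operations are triggered by the program through system interfaces and are ultimately carried out by the operating system; coroutine operations are controlled by the programmer.

Why coroutines exist: in a multithreaded application, the CPU switches between threads by time slicing, and every switch costs time (state has to be saved and later restored). A coroutine uses only a single thread and specifies, within that thread, the order in which blocks of code execute.

When to use coroutines: they suit programs that contain a large number of operations that do not need the CPU (I/O).

 

A coroutine is also known as a micro-thread or fiber. In one sentence: a coroutine is a lightweight thread that lives in user space.

A coroutine has its own register context and stack. When the scheduler switches coroutines, the register context and stack are saved elsewhere, and when switching back, the previously saved register context and stack are restored. Therefore:

A coroutine retains the state of its previous invocation (that is, a particular combination of all its local state). Each time it is re-entered, it resumes in the state of the previous invocation; put another way, it resumes at the point in the logical flow where it last left off.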
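
A Python generator shows this state retention on a small scale. The sketch below is only an illustration, and `running_average` is a made-up name; it keeps a running total in local variables that survive between calls to send().

```python
def running_average():
    """Generator-based coroutine: yields the average of all values sent so far."""
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average   # pause here; resume when send() is called
        total += value
        count += 1
        average = total / count


avg = running_average()
next(avg)                       # run to the first yield ("priming")
first = avg.send(10)
second = avg.send(20)
third = avg.send(60)
print(first, second, third)     # 10.0 15.0 30.0
```

Each send() resumes the function exactly at the yield where it stopped, with total and count unchanged since the previous call.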

 

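Benefits of coroutines:

    • No thread context-switch overhead
    • No overhead from locking atomic operations or from synchronization
    • Easy switching of control flow, which simplifies the programming model
    • High concurrency, high scalability, and low cost: a single CPU can support tens of thousands of coroutines without trouble, so they are well suited to high-concurrency workloads.

Drawbacks:

    • Cannot use multiple cores: a coroutine is essentially single-threaded and cannot use several cores of a CPU at once; coroutines have to be combined with processes to run on multiple CPUs. Most everyday applications have no need for this, unless they are CPU-bound.
    • A blocking operation (such as blocking I/O) blocks the entire program.

Example: implementing coroutines with yield

    def consumer(name):
        print("--->starting eating baozi...")
        while True:
            new_baozi = yield
            print("[%s] is eating baozi %s" % (name, new_baozi))

    def producer():
        next(con)  # advance each consumer to its first yield
        next(con2)
        n = 0
        while n < 5:
            n += 1
            con.send(n)
            con2.send(n)
            print("\033[32;1m[producer]\033[0m is making baozi %s" % n)

    if __name__ == '__main__':
        con = consumer("c1")
        con2 = consumer("c2")
        p = producer()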

Greenlet

    from greenlet import greenlet

    def test1():
        print(12)
        gr2.switch()
        print(34)
        gr2.switch()

    def test2():
        print(56)
        gr1.switch()
        print(78)

    gr1 = greenlet(test1)
    gr2 = greenlet(test2)
    gr1.switch()

Gevent 

Gevent is a third-party library that makes it easy to write concurrent synchronous or asynchronous code. The main pattern used in gevent is the Greenlet, a lightweight coroutine provided to Python as a C extension module. Greenlets all run inside the operating-system process of the main program, but they are scheduled cooperatively.

    import gevent

    def foo():
        print('Running in foo')
        gevent.sleep(0)
        print('Explicit context switch to foo again')

    def bar():
        print('Explicit context to bar')
        gevent.sleep(0)
        print('Implicit context switch back to bar')

    gevent.joinall([
        gevent.spawn(foo),
        gevent.spawn(bar),
    ])

Performance difference between synchronous and asynchronous execution

    import gevent

    def task(pid):
        """
        Some non-deterministic task
        """
        gevent.sleep(0.5)
        print('Task %s done' % pid)

    def synchronous():
        for i in range(10):
            task(i)

    def asynchronous():
        threads = [gevent.spawn(task, i) for i in range(10)]
        gevent.joinall(threads)

    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    asynchronous()

The important part of the program above is gevent.spawn, which wraps the task function in a Greenlet. The initialized greenlets are stored in the list threads, which is passed to gevent.joinall; that call blocks the current flow and runs all the given greenlets. Execution continues only after every greenlet has finished.

Automatic task switching on blocking I/O

    from gevent import monkey; monkey.patch_all()
    import gevent
    from urllib.request import urlopen

    def f(url):
        print('GET: %s' % url)
        resp = urlopen(url)
        data = resp.read()
        print('%d bytes received from %s.' % (len(data), url))

    gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
    ])

Handling multiple sockets concurrently in a single thread with gevent

server side

    import gevent
    from gevent import socket, monkey
    monkey.patch_all()

    def server(port):
        s = socket.socket()
        s.bind(('0.0.0.0', port))
        s.listen(500)
        while True:
            cli, addr = s.accept()
            gevent.spawn(handle_request, cli)

    def handle_request(conn):
        try:
            while True:
                data = conn.recv(1024)
                if not data:  # the client closed the connection
                    break
                print("recv:", data)
                conn.sendall(data)
        except Exception as ex:
            print(ex)
        finally:
            conn.close()

    if __name__ == '__main__':
        server(8001)

client side

    import socket

    HOST = 'localhost'    # The remote host
    PORT = 8001           # The same port as used by the server
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((HOST, PORT))
    while True:
        msg = input(">>:").encode("utf8")
        if not msg:  # an empty line ends the session
            break
        s.sendall(msg)
        data = s.recv(1024)
        print('Received', repr(data))
    s.close()

 

 

 

 

 

 

Reprinted from: http://www.cnblogs.com/wupeiqi/articles/5040827.html

 

