線程、進程、協程和隊列


一.線程、進程

1.簡述

  • 進程是一個具有一定獨立功能的程序關於某個數據集合的一次運行活動。它是操作系統動態執行的基本單元,通俗講就是自定義一段程序的執行過程,即一個正在運行的程序。線程是進程的基本單位,又稱為輕量級進程。 * 不同的進程在內存中會開辟獨立的地址空間,默認進程之間的數據是不共享,線程是由進程創建,所以處在同一個進程中的所有線程都可以訪問該進程所包含的地址空間,當然也包含存儲在該空間中的所有資源。
  • 應用場景:

    IO密集型操作由於不占用CPU資源,所以一般使用線程來完成
    計算密集型操作靠cpu,所以一般使用進程來完成

  • 為什么使用多線程或多進程?
    多線程和多進程可以提供程序的並發處理能力。看下面需求:
    現在有10台主機,現在需要監控主機的存過狀態,默認使用單線程,如下:

import time


st = time.time()           #程序開始時間
def f1(arg):
    time.sleep(2)            #假設ping一次需要2s
    print("ping %s主機中..." % arg)
host_List = [0,1,2,3,4,5,6,7,8,9,]   #假設列表中1233。。表示10個主機
for i in host_List:
    f1(i)

cost_time = time.time() - st
print('程序耗時:%s' % cost_time)

 

程序運行結果:

ping 0主機中...
ping 1主機中...
ping 2主機中...
ping 3主機中...
ping 4主機中...
ping 5主機中...
ping 6主機中...
ping 7主機中...
ping 8主機中...
ping 9主機中...
程序耗時:20.002294063568115

 

發現耗時20s,這僅僅是10台機器,如果100台呢,效率會非常低。假如用了多線程呢?

import threading
import time


st = time.time()           #程序開始時間
def f1(arg):
    time.sleep(2)            #假設ping一次需要2s
    print("ping %s主機中..." % arg)
host_List = [0,1,2,3,4,5,6,7,8,9,]   #假設列表中1233。。表示10個主機
for i in host_List:
    t = threading.Thread(target=f1, args=(i,))
    t.start()
t.join()
cost_time = time.time() - st
print('程序耗時:%s' % cost_time)

 

運行結果:
ping 0主機中...
ping 1主機中...
ping 5主機中...
ping 4主機中...
ping 2主機中...
ping 3主機中...
ping 7主機中...
ping 6主機中...
ping 8主機中...
ping 9主機中...
程序耗時:2.002915382385254

 

從結果中看出,10個機器啟用10個線程並發去獨立ping,這樣耗時僅僅是單線程的耗時,效率大大提供。所以多進程多線程一般用來提高並發

2.線程進程的基本操作

創建

  • 線程

    • 創建方法
    import threading
    import time
    
    def f1(args):
      time.sleep(2)
      print(args)
    
    #方式1 直接使用thread模塊進行創建
    for i in range(10):
      t = threading.Thread(target=f1,args=(123,))   #target是要執行的任務(函數),args是任務(函數)的參數
      t.start()
    
    #方式2 使用自定義類創建
    
    class Mythread(threading.Thread):
      def __init__(self,func,args):
          self.func = func
          self.args = args
          super(Mythread,self).__init__()
      def run(self):
          self.func(self.args)
    
    obj = Mythread(f1,123)
    obj.start()

     

    上述代碼創建了10個“前台”線程,然后控制器就交給了CPU,CPU根據指定算法進行調度,分片執行指令

    • 線程的其他方法

      • start 線程准備就緒
      • setName 為線程設置名稱
      • getName 獲取線程名稱
      • setDaemon 設置為后台線程或前台線程(默認),注意需要設置在start前 如果是后台線程,主線程執行過程中,后台線程也在進行,主線程執行完畢后,后台線程不論成功與否,均停止 如果是前台線程,主線程執行過程中,前台線程也在進行,主線程執行完畢后,等待前台線程也執行完成后,程序停止
      • join 放在for循環內表示逐個執行每個線程,執行完畢后繼續往下執行,該方法使得多線程變得無意義,放在for循環外,會阻塞主進程,這樣主進程會等待線程執行完之后,再去繼續執行下面的代碼
      • run 線程被cpu調度后自動執行線程對象的run方法,這也是線程第二種創建方法的原理
    • 方法具體使用

      • 使用setDaemon

        默認不使用setDaemon 情況

      import threading
      import time
      
      def f1(args):
          time.sleep(2)
          print(args)
      
      print('-----start------')
      for i in range(10):
          t = threading.Thread(target=f1,args=(123,))
          t.setDaemon(False)        #默認為false 主線程等待子線程執行完之后再退出程序
          t.start()
      
      print('-------end------')
      執行效果:
      
      -----start------   
      -------end------        end主線程執行完畢,等待子線程執行
      123     子線程執行結果
      123
      123
      123
      123
      123
      123
      123
      123
      123
      前台進程

       

      使用setDaemon 情況:

      import threading
      import time
      #方式1
      
      def f1(args):
          time.sleep(2)
          print(args)
      
      print('-----start------')
      for i in range(10):
          t = threading.Thread(target=f1,args=(123,))
          t.setDaemon(True)        #設置為True, 主線程不管子線程是否執行完成,直接退出程序
          t.start()
      
      print('-------end------')
      執行結果:
      
      -----start------
      -------end------  主線程不等待子線程,直接執行完之后退出
      后台進程

       

      • 使用join

        默認不使用join的話,子線程會並發執行

      import threading
      import time
      #方式1
      
      def f1(args):
          time.sleep(2)
          print("子線程:",args)
      
      print('-----start------')
      for i in range(10):
          t = threading.Thread(target=f1,args=(i,))
          t.start()
      
      print('-------end------')
      
      執行效果:
      
      
      -----start------
      -------end------
      子線程: 0
      子線程: 8
      子線程: 3
      子線程: 4
      子線程: 7
      子線程: 1
      子線程: 2
      子線程: 5
      子線程: 9
      子線程: 6
      不使用join

       

      阻塞子線程

      import threading
      import time
      
      def f1(args):
          time.sleep(2)
          print("子線程:",args)
      
      print('-----start------')
      for i in range(10):
          t = threading.Thread(target=f1,args=(i,))
          t.start()
          t.join()        #join會阻塞剩下的所有線程
          print(time.time())    #每個線程執行時的時間
      
      print('-------end------')
      
      執行效果:發現線程是單個依次執行
      
      -----start------
      子線程: 0
      1468933921.031646
      子線程: 1
      1468933923.037936
      子線程: 2
      1468933925.040694
      子線程: 3
      1468933927.044142
      子線程: 4
      1468933929.047844
      子線程: 5
      1468933931.049584
      子線程: 6
      1468933933.053723
      子線程: 7
      1468933935.056232
      子線程: 8
      1468933937.061142
      子線程: 9
      1468933939.065674
      -------end------
      在for循環內

       

      阻塞主線程

      import threading
      import time
      
      def f1(args):
          time.sleep(2)
          print("子線程:",args)
      
      print('-----start------')
      for i in range(10):
          t = threading.Thread(target=f1,args=(i,))
          t.start()
      
      t.join()  # join會阻塞剩下的所有線程
      print('-------end------')
      執行效果:
      
      -----start------
      子線程: 1
      子線程: 0
      子線程: 4
      子線程: 5
      子線程: 6
      子線程: 2
      子線程: 3
      子線程: 7
      子線程: 9
      -------end------
      子線程: 8    #到最后一個線程之后,不再阻塞主線程,而子線程運行需要2s,所以會先打印主線程end,在打印子線程
      在for循環外

       

  • 進程

    • 創建方法

    進程的創建方法和線程類似

    import multiprocessing
    import time
    
    def f1(args):
        time.sleep(2)
        print("進程:",args)
    
    #方法1
    
    for i in range(10):
        t = multiprocessing.Process(target=f1,args=(i,))
        t.start()
    
    #方法2:
    
    class Myprocess(multiprocessing.Process):
        def __init__(self,func,args):
            self.func = func
            self.args = args
            super(Myprocess,self).__init__()
        def run(self):
            self.func(self.args)

     

    • 其它方法 進程同樣支持 join(),setDaemon(),run(),setName(),getName()等方法,和線程的使用一樣,不再贅述
    • 方法使用

      參考線程使用

線程鎖(Lock、RLock)

由於線程之間是進行隨機調度,並且每個線程可能只執行n條執行之后,當多個線程同時修改同一條數據時可能會出現臟數據,所以,出現了線程鎖 - 同一時刻允許一個線程執行操作。

  • 未使用線程鎖:
import threading
import time

NUM = 10


def f1(arg):
    global NUM

    NUM -= 1            #讓每個線程執行時,將NUM的值減去1
    time.sleep(2)
    print(NUM)


for i in range(10):
    t = threading.Thread(target=f1,args=(123,))
    t.start()

執行效果:發現每個線程同時都在操作NUM,最后打印的結果都是0

0
0
0
0
0
0
0
0
0
0

 

  • 使用線程鎖

使用線程鎖,當一個線程開始處理事務A時,先在事務A上把鎖,然后開始
處理事務A,處理完程之后,再解鎖。其他進程遇到線程鎖,則處於等待中
直到有線程解鎖了該事務

import threading
import time

NUM = 10


def f1(arg):
    global NUM
    arg.acquire()   #阻塞后面的線程

    NUM -= 1
    time.sleep(2)
    print(NUM)
    arg.release()       #放開后面的線程


lock = threading.Lock()
for i in range(10):
    t = threading.Thread(target=f1,args=(lock,))
    t.start()

 

執行效果:發現線程是逐步操作NUM的

9
8
7
6
5
4
3
2
1
0

 

此有別於join()方法,join是在線程從開始執行的時候,按照單線程依次執行,也就意味着所有的任務都是單線程執行,而線程鎖是針對執行的任務進行上鎖,解鎖

  • Rlock和lock的區別

Rlock支持遞歸上鎖,解鎖,lock只支持單個上鎖解鎖

import threading
import time

NUM = 10


def f1(arg,lock):
    global NUM
    print('線程:',arg,'執行1')
    lock.acquire()   #阻塞后面的線程
    NUM -= 1

    lock.acquire()    #繼續上鎖
    time.sleep(2)       #sleep 2秒
    print('線程:',arg,time.time())      #打印當前時間戳
    lock.release()          #解鎖
    print('線程執行結果:',arg,NUM)

    lock.release()       #放開后面的線程
    print(123)


lock = threading.RLock()
for i in range(10):
    t = threading.Thread(target=f1,args=(i,lock,))
    t.start()

 

執行結果:

print('線程:',arg,'執行1')操作沒有加鎖,所以所有線程運行至此
線程: 0 執行1
線程: 1 執行1
線程: 2 執行1
線程: 3 執行1
線程: 4 執行1
線程: 5 執行1
線程: 6 執行1
線程: 7 執行1
線程: 8 執行1
線程: 9 執行1
#print('線程:',arg,time.time()) 和    print('線程執行結果:',arg,NUM) 在鎖中,所以所有線程會先依次執行
線程: 0 1468937172.877005
線程執行結果: 0 9
123
線程: 1 1468937174.878281
線程執行結果: 1 8
123
線程: 2 1468937176.883292
線程執行結果: 2 7
123
線程: 3 1468937178.886487
線程執行結果: 3 6
123
線程: 4 1468937180.888464
線程執行結果: 4 5
123
線程: 5 1468937182.892491
線程執行結果: 5 4
123
線程: 6 1468937184.893167
線程執行結果: 6 3
123
線程: 7 1468937186.894769
線程執行結果: 7 2
123
線程: 8 1468937188.89809
線程執行結果: 8 1
123
線程: 9 1468937190.899442
線程執行結果: 9 0
123
View Code

 

信號量(Semaphore)

線程鎖 同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 ,比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進去。

import threading
import time
def f1(arg,lock):
    lock.acquire()
    print('線程:',arg)
    print(time.time())
    lock.release()
lock = threading.BoundedSemaphore(5)    #5表示最多同時運行5個線程

for i in range(10):
    t = threading.Thread(target=f1,args=(i,lock,))
    t.start()

 

執行結果:發現每5個線程的執行時間戳是一樣的。小數點后微秒可忽略

線程: 0
1468937937.129999
線程: 1
1468937937.130123
線程: 2
1468937937.130278
線程: 3
1468937937.13044
線程: 4
1468937937.130581
線程: 5
1468937937.130719
線程: 6
1468937937.13086
線程: 7
1468937937.130994
線程: 8
1468937937.131154
線程: 9
1468937937.131289
View Code

 

事件(event)

python線程的事件用於主線程控制其他線程的執行,一個線程發送/傳遞事件,另外的線程等待事件的觸發事件。主要提供了三個方法 set、wait、clear

事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么阻塞線程執行,如果“Flag”值為True,那么線程繼續執行。
wait()方法:當事件標志為False時將阻塞線程,當事件標志為True時,什么也不做
set()方法:它設置事件標志為True,並且喚醒其他線程。條件鎖對象保護程序修改事件標志狀態的關鍵部分
clear()方法正好相反,它設置時間標志為False

import threading
import time

def f1(arg,e):
    print('線程:',arg)
    e.wait()            #阻塞線程
    print('線程繼續執行:',arg)

event = threading.Event()

for i in range(3):
    t = threading.Thread(target=f1,args=(i,event))
    t.start()

event.clear()

res = input('>>')
if res == '1':
    event.set()   #放開 線程

 

執行結果:

線程: 0
線程: 1
線程: 2
>>1     #輸入1,觸發線程繼續執行的信號
線程繼續執行: 0
線程繼續執行: 1
線程繼續執行: 2

 

條件(Condition)

使得線程等待,條件是針對單個線程的,條件成立,則不再阻塞線程,
條件不成立,一直阻塞

import threading
import time
def condition_func():

    ret = False
    inp = input('>>>')
    if inp == '1':
        ret = True

    return ret


def run(n):
    con.acquire()
    con.wait_for(condition_func)   #當condition_func返回值為真時,觸發線程繼續運行
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(3):
        t = threading.Thread(target=run, args=(i,))
        t.start()

 

運行效果:

>>>1    #每次手動輸入一個1,觸發一個線程運行
run the thread: 0
>>>1
run the thread: 1
>>>1
run the thread: 2

 

Timer

定時器,指定n秒后執行某操作

from threading import Timer
 
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

 

進程和線程一樣,同樣支持進程鎖、信號量、事件、條件、timer用法一摸一樣,可參考線程使用方法,不在贅述

3.進程之間數據共享

由於不同的進程會有各自的內存地址空間,所以進程之間的數據默認是不能共享的

  • 運行結果
from multiprocessing import Process
import time

li = []

def foo(i):
    li.append(i)
    print('say hi',li)

for i in range(10):
    p = Process(target=foo,args=(i,))
    p.start()

time.sleep(1)
print('ending',li)

 

運行結果:發現每個子進程都獨立操作li列表

say hi [0]
say hi [1]
say hi [2]
say hi [3]
say hi [4]
say hi [5]
say hi [6]
say hi [7]
say hi [8]
say hi [9]
ending []

 

  • 實現進程之間數據共享

    • 方法1:
    from multiprocessing import Process
    from multiprocessing import Array
    import time
    
    li = Array('i',10)
    
    #i 的類型  
    '''   
    'c': ctypes.c_char, 'u': ctypes.c_wchar,
    'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int, 'I': ctypes.c_uint,
    'l': ctypes.c_long, 'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double
    '''
    def foo(i,q):
        q[i] = i + 100
        print('say hi',i)
    
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
        p.join()
    
    time.sleep(4)
    for i in li:
        print(i)
    Array方法

     

    • 方法2
    from multiprocessing import Process
    from multiprocessing import Manager
    import multiprocessing
    import time
    
    obj = Manager()
    li = obj.dict()
    
    def foo(i,q):
        q[i] = i + 100
        print('say hi',i)
    
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
        p.join()
    
    time.sleep(2)
    print(li)
    Dict方法

     

    • 方法3:
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
import time

li = queues.Queue(ctx=multiprocessing)

def foo(i,q):
    q.put(i)
    print('say hi',i)

for i in range(10):
    p = Process(target=foo,args=(i,li,))
    p.start()
    p.join()



time.sleep(4)
print(li.qsize())
queue方法

 

4.python內部隊列Queue

隊列(queue)是一種具有先進先出特征的線性數據結構,元素的增加只能在一端進行,元素的刪除只能在另一端進行。能夠增加元素的隊列一端稱為隊尾,可以刪除元素的隊列一端則稱為隊首。python內部支持一套輕量級queue隊列

  • queue隊列的方法:
    • Queue(maxsize=0) 先進先出隊列,maxsize表示隊列元素數量,0表示無限
    • LifoQueue(maxsize=0) 后進先出隊列
    • PriorityQueue(maxsize=0) 優先級隊列,優先級值越小,優先級越高
    • deque(maxsize=0) 雙向隊列
    • empty() 判斷隊列是否為空,為空時返回True,否則為False
    • full() 判斷隊列是否已滿,滿時返回True,否則為False
    • put(item,[block[,timeout]] 在隊尾插入一個項目。參數item為必需的,為插入項目的值;第二個block為可選參數,默認為True,表示當前隊列滿時,put()方法就使調用線程暫停,直到空出一個數據單元。如果block為False,put方法將引發Full異常
    • get() 從隊頭刪除並返回一個項目。可選參數為block,默認為True。如果隊列為空且block為True,get()就使調用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列將引發Empty異常。
    • qsize() 返回隊列長度
    • clear() 清空隊列
    • join() 等到隊列為空(即隊列中所有的項都被取走,處理完畢),再執行別的操作
    • task_done() 在完成一項工作之后,Queue.task_done()函數向任務已經完成的隊列發送一個信號. 注意:在多線程下,注意task_done的位置,每次執行task_done(),unfinished_tasks就減1,應該在一切搞定后,再執行task_done.

隊列支持下面的四種情況:

  • 先進先出隊列
import queue

q = queue.Queue(4)  #創建隊列  容量為4

q.put(123)      #往隊列中插值
q.put(431)


print(q.maxsize)    #隊列容量
print(q.qsize())    #隊列目前元素的容量

print(q.get())    #隊列取值
print(q.get())

 

執行效果:

4
2
123
431

 

先進先出原則第一次存放的是123,第二次存放的是431,那么我們在獲取值得時候,第一次獲取的就是123,第二次就是431
如果隊列滿之后,再put 或者隊列為空時,再get,進程就就掛在哪里,put會等待,直到隊列中有空間之后才能put成功,get會等待,直到隊列中有元素之后,才能獲取到值,如果不需要等待,可以通過設置block=False來拋出異常,使用try捕捉異常

import queue

q = queue.Queue(5)

for i in range(8):
    try:
        q.put(i,block=False)
        print(i,'已提交隊列')
    except:
        print('隊列已滿')

for i in range(8):
    try:
        res = q.get(block=False)
        print('從隊列取出:',res)
    except:
        print('隊列已空')

 

效果:

0 已提交隊列
1 已提交隊列
2 已提交隊列
3 已提交隊列
4 已提交隊列
隊列已滿
隊列已滿
隊列已滿
從隊列取出: 0
從隊列取出: 1
從隊列取出: 2
從隊列取出: 3
從隊列取出: 4
隊列已空
隊列已空
隊列已空
View Code

 

  • 后進先出
import queue
q = queue.LifoQueue()
q.put(123)
q.put(456)
print(q.get())
print(q.get())
輸出結果:

456
123

 

  • 根據優先級處理
import queue
q = queue.PriorityQueue()   #根據優先級處理
q.put((2,"alex1"))
q.put((1,"alex2"))
q.put((3,"alex3"))
print(q.get())
print(q.get())
print(q.get())
輸出結果:

(1, 'alex2')
(2, 'alex1')
(3, 'alex3')

 

  • 雙向隊列
q = queue.deque()          #雙向隊列
q.append((123))         #右邊進
q.append(234)
print(q.pop())          #右邊出,后進先出
print(q.pop())


q.appendleft(555)       #左邊進
q.appendleft(666)
print(q.popleft())         #左邊出,后進先出
print(q.popleft())
輸出效果:

234
123
666
555

 

5.支持並發的兩種模型

生產者消費者模型

生產者消費者模型是通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度

  • 為什么要使用生產者和消費者模式

在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

  • 什么是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

這個阻塞隊列就是用來給生產者和消費者解耦的

  • 實現:
import queue
import threading
import time



q = queue.Queue(30)   #創建一個隊列,用戶生產者和消費者通訊

#模擬訂單創建處理


def product(arg):        #生產者  創建訂單
    while True:
        q.put("訂單" + str(arg))
        print(arg,"創建訂單")


def cost(arg):          #消費者,處理訂單
    while True:
        print(arg , "處理:" ,q.get())
        time.sleep(2)    #sleep 2秒表示 消費者處理需要2s

#創建生產者線程
for i in range(3):
    t = threading.Thread(target=product,args=(i,))
    t.start()

#創建消費者線程
for c in range(10):
    t = threading.Thread(target=cost,args=(c,))
    t.start()

 

效果:

0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
0 創建訂單
1 創建訂單
0 處理: 訂單0
1 創建訂單
1 處理: 訂單0
0 創建訂單
2 處理: 訂單0
2 創建訂單
3 處理: 訂單0
1 創建訂單
4 處理: 訂單0
0 創建訂單
5 處理: 訂單0
2 創建訂單
6 處理: 訂單0
1 創建訂單
7 處理: 訂單0
0 創建訂單
8 處理: 訂單0
2 創建訂單
9 處理: 訂單0
1 創建訂單
1 處理: 訂單0
2 處理: 訂單0
2 創建訂單
2 創建訂單
6 處理: 訂單0
1 創建訂單
3 處理: 訂單0
4 處理: 訂單0
5 處理: 訂單0
8 處理: 訂單0
2 創建訂單
2 創建訂單
2 創建訂單
2 創建訂單
0 處理: 訂單0
7 處理: 訂單0
2 創建訂單
2 創建訂單
9 處理: 訂單0
1 創建訂單
1 處理: 訂單0
3 處理: 訂單0
5 處理: 訂單0
8 處理: 訂單0
0 處理: 訂單0
2 創建訂單
6 處理: 訂單0
4 處理: 訂單0
2 處理: 訂單0
2 創建訂單
2 創建訂單
2 創建訂單
2 創建訂單
2 創建訂單
9 處理: 訂單0
0 創建訂單
0 創建訂單
0 創建訂單
7 處理: 訂單1
0 創建訂單
3 處理: 訂單1
1 處理: 訂單0
0 處理: 訂單2
4 處理: 訂單1
5 處理: 訂單0
7 處理: 訂單2
1 創建訂單
1 創建訂單
1 創建訂單
1 創建訂單
1 創建訂單
1 創建訂單
8 處理: 訂單1
6 處理: 訂單0
9 處理: 訂單2
1 創建訂單
1 創建訂單
1 創建訂單
2 處理: 訂單1
2 創建訂單
0 處理: 訂單2
4 處理: 訂單2
5 處理: 訂單1
8 處理: 訂單2
2 創建訂單
2 創建訂單
2 創建訂單
2 創建訂單
7 處理: 訂單2
2 創建訂單
6 處理: 訂單2
2 創建訂單
1 處理: 訂單2
0 創建訂單
9 處理: 訂單2
2 處理: 訂單2
3 處理: 訂單1
2 創建訂單
2 創建訂單
2 創建訂單
View Code

 

訂閱者模型

訂閱者模型即多個訂閱者訂閱一個頻道,發布者只需要將消息發布到該頻道中,則訂閱該頻道的訂閱者都會收到消息

redis實現消息訂閱模型,請點擊:http://www.cnblogs.com/pycode/p/cache.html

rabbitMQ實現消息訂閱模型,請點擊:http://www.cnblogs.com/pycode/p/rabbitmq.html

6.線程池和進程池

線程池

提高並發並不是線程越多越好,每個系統對於線程的數量都有一個臨界值,線程數量超過該臨界值后,反而會降低系統性能。線程的上下文切換,遇到大量線程,也就很耗時,所以線程池的定義就是定義一組線程,用於處理當前的事務,線程處理完當前事務后,在繼續處理其它事務。當事務超過線程池的處理能力,事務則等待出現空閑線程。線程池的線程數量也是可以根據系統性能調節額
python中沒有線程池的機制,即使是python3中提供了該機制,也很low,所以進程池一般需要自己定義
* 簡單實現線程池

利用隊列,事先將創建的線程放在隊列中,有事務需要執行時,從隊列中取出一個線程進行執行,執行完之后自動再往隊列中添加一個線程,實現隊列中的線程 終止一個,創建一個

import queue
import threading
import time


class Threadpoll:

    def __init__(self,maxsize=5):
        self.maxsize = maxsize         #線程池的線程最大數量
        self._q = queue.Queue(self.maxsize)     #線程隊列
        for i in range(self.maxsize):       #根據隊列容量,添加滿線程
            self.put_thread()               #調用線程put到隊列的方法

    def get_thread(self):           #獲取隊列中的線程
        return self._q.get()

    def put_thread(self):           #往隊列中put線程.put的值為線程的對象
        self._q.put(threading.Thread)


pool = Threadpoll(10)     #創建一個線程池,最大線程數量為10

def task(arg):          #執行的任務函數
    time.sleep(3)
    print(arg)
    pool.put_thread()    #執行完畢,線程已經停止,需要重新往隊列中put一個線程

for i in range(1000):
    t = pool.get_thread()   #獲取一個線程
    obj = t(target=task,args=(i,))      #指定線程執行的任務
    obj.start()     #開啟線程
Low版

 

  • 升級版線程池 簡單的線程池,只能實現可控的線程數量,實現處理多個事物,但是其中還是存在很多問題,如1.線程不重用,線程執行完之后,線程就死掉了,最終被垃圾回收機制處理,我們需要重新創建線程數量來填補隊列。2.線程數量是固定的,當事務數量小於線程數量時,多數線程處於等待狀態,造成線程浪費。下面將完美實現線程池
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#pyversion:python3.5
#owner:fuzj

import queue
import threading
import contextlib
import time

StopEvent = object()          #創建一個空的事務對象


class ThreadPool(object):
    '''線程池類'''

    def __init__(self, max_num, max_task_num = None):
        if max_task_num:                        #隊列中事務個數容量
            self.q = queue.Queue(max_task_num)
        else:                               #默認為不限制容量
            self.q = queue.Queue()
        self.max_num = max_num              #線程池線程的最大數量
        self.cancel = False             #線程執行完后是否終止線程
        self.terminal = False           #是否立即終止線程
        self.generate_list = []     #存放正在運行的線程
        self.free_list = []         #存放空閑的線程

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務
        :param func: 任務函數
        :param args: 任務函數所需參數
        :param callback: 任務執行失敗或成功后執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值(默認為None,即:不執行回調函數)
        :return: 如果線程池已經終止,則返回True否則None
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:    #判斷是否創建新的線程:當沒有空閑線程和當前正在運行的線程總數量小於線程池的容量
            self.generate_thread()      #調用創建線程的方法
        w = (func, args, callback,)     #將執行事務(函數).參數和回調函數 組合為元組
        self.q.put(w)               #將事務放到隊列中,等待線程處理

    def generate_thread(self):
        """
        創建線程的方法
        """
        t = threading.Thread(target=self.call)     #創建線程,並調用call方法
        t.start()

    def call(self):
        """
        循環去獲取任務函數並執行任務函數
        """
        current_thread = threading.currentThread            #當前運行的線程
        self.generate_list.append(current_thread)           #將當前運行的線程放入運行列表中

        event = self.q.get()            #從隊列中獲取事務
        while event != StopEvent:       #循環從隊列中取事務,並判斷每次取的是否為空事務

            func, arguments, callback = event    #func 為事務名.arguments為參數,callback為回調函數
            try:                        #捕捉異常
                result = func(*arguments)           #執行事務(函數)
                success = True          #事務執行成功
            except Exception as e:          #事務執行出現異常
                success = False             #事務執行失敗
                result = None           #結果為None

            if callback is not None:        #判斷回調函數是否有實際內容
                try:
                    callback(success, result)       #將線程的執行結果和成功與否傳參,執行回調函數
                except Exception as e:              #捕捉異常
                    pass

            with self.worker_state(self.free_list, current_thread):     #執行worker_state方法,記錄等待線程
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:

            self.generate_list.remove(current_thread)

    def close(self):
        """
        執行完所有的任務后,所有線程停止
        """
        self.cancel = True          #表示線程執行完則終止
        full_size = len(self.generate_list)         #取出正在運行的線程數量n
        while full_size:
            self.q.put(StopEvent)               #在隊列中put n個空事務
            full_size -= 1

    def terminate(self):
        """
        無論是否還有任務,終止線程
        """
        self.terminal = True        #強制執行線程終止

        while self.generate_list:       #根據現在正在運行的線程列表,往隊列中put空事務,有多少個線程put多少個空事務
            self.q.put(StopEvent)

        self.q.empty()      #判斷隊列是否為空

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用於記錄線程中正在等待的線程數
        """
        state_list.append(worker_thread)            #將空閑的線程放到空閑的列表中
        try:
            yield state_list
        finally:
            state_list.remove(worker_thread)        #當線程被調用之后,從空閑的線程列表中移除



# How to use


pool = ThreadPool(5)        #實例化線程池,線程最大數量為5

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass

#執行的事務
def action(i):
    time.sleep(1)
    print(i)

for i in range(30):             #創建30個事務,然后調用線程池的線程執行
    ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
#pool.close()
pool.terminate()
終極版

 

PS 小知識:with的用法

with 語句是對try…expect…finally語法的一種簡化,並且提供了對於異常非常好的處理方式,適用於對資源進行訪問的場合,確保不管使用過程中是否發生異常都會執行必要的“清理”操作,釋放資源,比如文件使用后自動關閉、線程中鎖的自動獲取和釋放等。

 

```python

 

from contextlib import contextmanager

 

 

@contextmanager

def make_context():

    print('enter')  #with調用之一,會首先之行此代碼

    try:

        yield 113   #with 調用后返回的結果,通常是as定義的變量值

    except RuntimeError as err:

        print('error' , err)


    finally:

        print('exit') #with之行完成之后瞬間之行的語句


with make_context() as value :

    print(value)


輸出效果:

enter

113

exit

```

 

進程池

python中提哦那個了進程池的概念,可以直接使用

  • apply
from  multiprocessing import Process,Pool
import time

def f1(arg):
    time.sleep(2)
    print(arg+100,time.time())

pool = Pool(5)    #創建進程池
for i in range(8):
    pool.apply(func=f1, args=(i,))     #給進程指定任務
print('end')

 

輸出效果:發現進程是依次執行,沒有並發的效果

100 1469023658.397264
101 1469023660.400479
102 1469023662.40297
103 1469023664.408063
104 1469023666.410736
105 1469023668.413439
106 1469023670.414941
107 1469023672.417192
end

 

  • apply_sync
from  multiprocessing import Process,Pool
import time

def f1(arg):
    time.sleep(2)
    print(arg+100,time.time())

pool = Pool(5)    #創建進程池
for i in range(8):
    pool.apply_async(func=f1, args=(i,))     #給進程指定任務

print('end')
time.sleep(2)      #主進程等代2s

 

執行效果:發現事務沒有執行完,主進程終止了子進程

end
100 1469023994.189458
102 1469023994.189459
101 1469023994.189458
103 1469023994.189688
104 1469023994.190085

 

改進;

from  multiprocessing import Process,Pool
import time


def f1(arg):
    time.sleep(2)
    print(arg+100,time.time())



pool = Pool(5)    #創建進程池


for i in range(8):
    pool.apply_async(func=f1, args=(i,))     #給進程指定任務


print('end')
pool.close()        #所有任務執行完畢后基礎

time.sleep(2)
#pool.terminate()    #子進程立即終止
pool.join()  #進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。

print(123)

 

執行效果:

end
100 1469024133.797895
102 1469024133.797907
101 1469024133.797895
104 1469024133.7983
103 1469024133.797907
107 1469024135.800982
106 1469024135.800983
105 1469024135.801028
123

 

二.協程

1.簡介

線程和進程的操作是由程序觸發系統接口,最后的執行者是系統;協程的操作則是程序員。

協程存在的意義:對於多線程應用,CPU通過切片的方式來切換線程間的執行,線程切換時需要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。
協成的原理:利用一個線程,分解一個線程成為多個微線程,注意此是從程序級別來分解的
協程的適用場景:當程序中存在大量不需要CPU的操作時(IO),適用於協程;

2.如何實現

greenlet和gevent需要手動安裝模塊。直接安裝gevent默認會把greenlet裝上
* 基於底層greenlet實現

from greenlet import greenlet


def test1():
    print(12)
    gr2.switch()  
    print(34)
    gr2.switch()


def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

 

執行過程:解釋器從上倒下讀代碼后,讀到gr1.switch()時,開始執行gr1對應的test1函數,test1函數執行完print(12)后遇到gr2.swith(),會自動執行gr2的test2,test2函數中執行完print(56)遇到gr1.switch(),會繼續執行test1的 print(34),最后遇到gr2.switch(),會執行test2的print(78)
輸出效果:

12
56
34
78

 

  • 使用gevent實現
import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

 

效果:

Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

 

  • 舉例
from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
    print('GET: %s' % url)
    resp = requests.get(url)
    data = resp.text
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
])

 

效果:

GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
431218 bytes received from https://www.yahoo.com/.
25529 bytes received from https://github.com/.
47394 bytes received from https://www.python.org/.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM