python-並發編程


python-並發編程

鑒於本篇文章較長,如需快速查找,使用右側紫色目錄或者ctrl+F直接搜索可以節省您的時間。

1,背景知識

  顧名思義,進程即正在執行的一個過程。進程是對正在運行程序的一個抽象。

  進程的概念起源於操作系統,是操作系統最核心的概念,也是操作系統提供的最古老也是最重要的抽象概念之一。操作系統的其他所有內容都是圍繞進程的概念展開的。

  所以想要真正了解進程,必須事先了解操作系統。點擊進入

    PS:即使可以利用的cpu只有一個(早期的計算機確實如此),也能保證支持(偽)並發的能力。將一個單獨的cpu變成多個虛擬的cpu(多道技術:時間多路復用和空間多路復用+硬件上支持隔離),沒有進程的抽象,現代計算機將不復存在

 1,操作系統的作用:

   1:隱藏丑陋復雜的硬件接口,提供良好的抽象接口

   2:管理、調度進程,並且將多個進程對硬件的競爭變得有序

 2, 多道技術:

   1.產生背景:針對單核,實現並發

     ps: 現在的主機一般是多核,那么每個核都會利用多道技術 有4個cpu,運行於cpu1的某個程序遇到io阻塞,會等到io結束再重新調度,會被調度到4個 cpu中的任意一個,具體由操作系統調度算法決定。

   2.空間上的復用:如內存中同時有多道程序

   3.時間上的復用:復用一個cpu的時間片 強調:遇到io切,占用cpu時間過長也切,核心在於切之前將進程的狀態保存下來,這樣 才能保證下次切換回來時,能基於上次切走的位置繼續運行

2,多進程

1,何為進程

進程:正在進行的一個過程或者說一個任務。而負責執行任務則是cpu。

2,進程與程序的區別

程序僅僅只是一堆代碼而已,而進程指的是程序的運行過程。

想象一位有一手好廚藝的計算機科學家qiuma正在為他的女兒元昊烘制生日蛋糕。

他有做生日蛋糕的食譜,

廚房里有所需的原料:面粉、雞蛋、韭菜,蒜泥等。

在這個比喻中:

做蛋糕的食譜就是程序(即用適當形式描述的算法)

計算機科學家就是處理器(cpu)

而做蛋糕的各種原料就是輸入數據。

進程就是廚師閱讀食譜、取來各種原料以及烘制蛋糕等一系列動作的總和。

現在假設計算機科學家qiuma的兒子ql哭着跑了進來,說:Hey, Dad, my head got stung by a bee.

科學家qiuma想了想,處理兒子ql蟄傷的任務比給女兒元昊做蛋糕的任務更重要,於是

計算機科學家就記錄下他照着食譜做到哪兒了(保存進程的當前狀態),然后拿出一本急救手冊,按照其中的指示處理蟄傷。這里,我們看到處理機從一個進程(做蛋糕)切換到另一個高優先級的進程(實施醫療救治),每個進程擁有各自的程序(食譜和急救手冊)。當蜜蜂蟄傷處理完之后,這位計算機科學家又回來做蛋糕,從他離開時的那一步繼續做下去
For example

需要強調的是:同一個程序執行兩次,那也是兩個進程,比如打開暴風影音,雖然都是同一個軟件,但是一個可以播放蒼井空,一個可以播放飯島愛。

3,並發與並行

無論是並行還是並發,在用戶看來都是'同時'運行的,不管是進程還是線程,都只是一個任務而已,真是干活的是cpu,cpu來做這些任務,而一個cpu同一時刻只能執行一個任務

1, 並發:是偽並行,即看起來是同時運行。單個cpu+多道技術就可以實現並發

2,並行:同時運行,只有具備多個cpu才能實現並行

  單核下,可以利用多道技術,多個核,每個核也都可以利用多道技術(多道技術是針對單核而言的)

有四個核,六個任務,這樣同一時間有四個任務被執行,假設分別被分配給了cpu1,cpu2,cpu3,cpu4,

一旦任務1遇到I/O就被迫中斷執行,此時任務5就拿到cpu1的時間片去執行,這就是單核下的多道技術

而一旦任務1的I/O結束了,操作系統會重新調用它(需知進程的調度、分配給哪個cpu運行,由操作系統說了算)

可能被分 配給四個cpu中的任意一個去執行

4,進程的創建(了解)

  但凡是硬件,都需要有操作系統去管理,只要有操作系統,就有進程的概念,就需要有創建進程的方式,一些操作系統只為一個應用程序設計,比如微波爐中的控制器,一旦啟動微波爐,所有的進程都已經存在。

  而對於通用系統(跑很多應用程序),需要有系統運行過程中創建或撤銷進程的能力,主要分為4種形式創建新的進程

  1. 系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前台進程負責與用戶交互,后台運行的進程與用戶無關,運行在后台並且只在需要時才喚醒的進程,稱為守護進程,如電子郵件、web頁面、新聞、打印)

  2. 一個進程在運行過程中開啟了子進程(如nginx開啟多進程,os.fork,subprocess.Popen等)

  3. 用戶的交互式請求,而創建一個新進程(如用戶雙擊暴風影音)

  4. 一個批處理作業的初始化(只在大型機的批處理系統中應用)

  無論哪一種,新進程的創建都是由一個已經存在的進程執行了一個用於創建進程的系統調用而創建的:

  1. 在UNIX中該系統調用是:fork,fork會創建一個與父進程一模一樣的副本,二者有相同的存儲映像、同樣的環境字符串和同樣的打開文件(在shell解釋器進程中,執行一個命令就會創建一個子進程)

  2. 在windows中該系統調用是:CreateProcess,CreateProcess既處理進程的創建,也負責把正確的程序裝入新進程。

  關於創建的子進程,UNIX和windows

  1.相同的是:進程創建后,父進程和子進程有各自不同的地址空間(多道技術要求物理層面實現進程之間內存的隔離),任何一個進程的在其地址空間中的修改都不會影響到另外一個進程。

  2.不同的是:在UNIX中,子進程的初始地址空間是父進程的一個副本,提示:子進程和父進程是可以有只讀的共享內存區的。但是對於windows系統來說,從一開始父進程與子進程的地址空間就是不同的。

5,進程的終止(了解)

  1,正常退出(自願,如用戶點擊交互式頁面的叉號,或程序執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)

  2,出錯退出(自願,python a.py中a.py不存在)

  3,嚴重錯誤(非自願,執行非法指令,如引用不存在的內存,1/0等,可以捕捉異常,try...except...)

  4,被其他進程殺死(非自願,如kill -9)

6,進程的層次結構

  無論UNIX還是windows,進程只有一個父進程,不同的是:

  1. 在UNIX中所有的進程,都是以init進程為根,組成樹形結構。父子進程共同組成一個進程組,這樣,當從鍵盤發出一個信號時,該信號被送給當前與鍵盤相關的進程組中的所有成員。

  2. 在windows中,沒有進程層次的概念,所有的進程都是地位相同的,唯一類似於進程層次的暗示,是在創建進程時,父進程得到一個特別的令牌(稱為句柄),該句柄可以用來控制子進程,但是父進程有權把該句柄傳給其他子進程,這樣就沒有層次了。

7,進程的狀態

tail -f access.log |grep '404'

  執行程序tail,開啟一個子進程,執行程序grep,開啟另外一個子進程,兩個進程之間基於管道'|'通訊,將tail的結果作為grep的輸入。

  進程grep在等待輸入(即I/O)時的狀態稱為阻塞,此時grep命令都無法運行

其實在兩種情況下會導致一個進程在邏輯上不能運行,

  1. 進程掛起是自身原因,遇到I/O阻塞,便要讓出CPU讓其他進程去執行,這樣保證CPU一直在工作

  2. 與進程無關,是操作系統層面,可能會因為一個進程占用時間過多,或者優先級等原因,而調用其他的進程去使用CPU。

因而一個進程由三種狀態

 

8,進程並發的實現(了解)

  進程並發的實現在於,硬件中斷一個正在運行的進程,把此時進程運行的所有狀態保存下來,為此,操作系統維護一張表格,即進程表(process table),每個進程占用一個進程表項(這些表項也稱為進程控制塊)

 

  該表存放了進程狀態的重要信息:程序計數器、堆棧指針、內存分配狀況、所有打開文件的狀態、帳號和調度信息,以及其他在進程由運行態轉為就緒態或阻塞態時,必須保存的信息,從而保證該進程在再次啟動時,就像從未被中斷過一樣。

3,開啟子進程的兩種方式

1,multiprocessing模塊

  python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python提供了multiprocessing。
    multiprocessing模塊用來開啟子進程,並在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。

  multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

    需要再次強調的一點是:與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。

2,Process類的介紹

 創建進程的類

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動) 強調: 1. 需要使用關鍵字的方式來指定參數 2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號

 參數介紹:

group參數未使用,值始終為None

target表示調用對象,即子進程要執行的任務

args表示調用對象的位置參數元組,args=(1,2,'egon',)

kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}

name為子進程的名稱

 方法介紹:

p.start():啟動進程,並調用該子進程中的p.run()

p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法

p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖

p.is_alive():如果p仍然運行,返回True

p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程

 屬性介紹:

p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置

p.name:進程的名稱

p.pid:進程的pid

3,Process類的使用

  注意:在windows中Process()必須放到# if __name__ == '__main__':下

Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. 
If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). 
This is the reason for hiding calls to Process() inside

if __name__ == "__main__"
since statements inside this if-statement will not get called upon import.
由於Windows沒有fork,多處理模塊啟動一個新的Python進程並導入調用模塊。 
如果在導入時調用Process(),那么這將啟動無限繼承的新進程(或直到機器耗盡資源)。 
這是隱藏對Process()內部調用的原,使用if __name__ == “__main __”,這個if語句中的語句將不會在導入時被調用。
詳細解釋

創建並開啟子進程的兩種方式

方式一

# 方式一:
from multiprocessing import Process  # 可以開啟發起子進程調用
import time


def task(name):
    print('%s is running' % name)
    time.sleep(1)
    print('%s is done' % name)


if __name__ == '__main__':
    # Process(target=task,kwargs={'name':'子進程1'})  # kwargs可以按照字典的方式傳參數,args按照位置的方式傳參數
    p = Process(target=task, args=('子進程1',))  # task后不加括號,加括號立馬就會執行。args括號里面必須加逗號,組成一個元組
    p.start()  # 僅僅只是給操作系統發送了一個信號,由操作系統將父進程地址空間中的數據拷貝給子進程,作為子進程運行的初始狀態,開啟后再運行task

    print('主')

# 執行結果因為執行進程會有一段時間,所以先做打印操作
import time
import random
from multiprocessing import Process

def piao(name):
    print('%s piaoing' %name)
    time.sleep(random.randrange(1,5))
    print('%s piao end' %name)

if __name__ == '__main__':
    #實例化得到四個對象
    p1=Process(target=piao,args=('egon',)) #必須加,號
    p2=Process(target=piao,args=('alex',))
    p3=Process(target=piao,args=('wupeqi',))
    p4=Process(target=piao,args=('yuanhao',))

    #調用對象下的方法,開啟四個進程
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print('')
方式一 例二

方式二

# 方式二
from multiprocessing import Process
import time
class MyProcess(Process):
    def __init__(self, name):
        super().__init__()  # 將父類的功能進行重用
        self.name = name

    def run(self):  # 這里一定要用run,下面start將調用這個run
        print('%s is running' % self.name)
        time.sleep(3)
        print('%s is done' % self.name)

if __name__ == '__main__':
    p = MyProcess('子進程1')
    p.start()  # 觸發上面的run方法
    print('主')
import time
import random
from multiprocessing import Process

class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print('%s piaoing' %self.name)

        time.sleep(random.randrange(1,5))
        print('%s piao end' %self.name)

if __name__ == '__main__':
    #實例化得到四個對象
    p1=Piao('egon')
    p2=Piao('alex')
    p3=Piao('wupeiqi')
    p4=Piao('yuanhao')

    #調用對象下的方法,開啟四個進程
    p1.start() #start會自動調用run
    p2.start()
    p3.start()
    p4.start()
    print('')
方式二 例二

4,查看進程的pid與ppid

使用pid和ppid可以分別查看子進程和父進程

from multiprocessing import Process
import time, os

def task():
    print('%s is running,parent id is <%s>' % (os.getpid(), os.getppid()))  # 查看子進程和父進程
    time.sleep(3)
    print('%s is done,parent id is <%s>' % (os.getpid(), os.getppid()))

if __name__ == '__main__':
    p = Process(target=task,)
    p.start()

    print('主', os.getpid(), os.getppid())  # 查看子進程和父進程,此時父進程為pycharm

###
主 7032 8916
14468 is running,parent id is <7032>
14468 is done,parent id is <7032>

5,Process對象的其他屬性

 join方法

  在主進程運行過程中如果想並發地執行其他的任務,我們可以開啟子進程,此時主進程的任務與子進程的任務分兩種情況

情況一:在主進程的任務與子進程的任務彼此獨立的情況下,主進程的任務先執行完畢后,主進程還需要等待子進程執行完畢,然后統一回收資源。

情況二:如果主進程的任務在執行到某一個階段時,需要等待子進程執行完畢后才能繼續執行,就需要有一種機制能夠讓主進程檢測子進程是否運行完畢,在子進程執行完畢后才繼續執行,否則一直在原地阻塞,這就是join方法的作用

# join方法
from multiprocessing import Process
import time,os

def task():
    print('%s is running,parent id is <%s>' % (os.getpid(), os.getppid()))
    time.sleep(3)
    print('%s is done,parent id is <%s>' % (os.getpid(), os.getppid()))

if __name__ == '__main__':
    p = Process(target=task,)
    p.start()

    p.join()  # 加入join方法后一定會等到子進程結束以后才會執行主進程
    print('主', os.getpid(), os.getppid())
    print(p.pid)  # 驗證了存在僵屍進程

###
5064 is running,parent id is <5796>
5064 is done,parent id is <5796>
主 5796 8916
5064

 join並發

from multiprocessing import Process
import time,os

def task(name,n):
    print('%s is running' % name)
    time.sleep(n)


if __name__ == '__main__':
    start = time.time()
    p1 = Process(target=task, args=('子進程1', 3))  # args根據位置傳參
    p2 = Process(target=task, args=('子進程2', 1))
    p3 = Process(target=task, args=('子進程3', 2))
    p_l = [p1, p2, p3]

    p1.start()
    p2.start()
    p3.start()

    p1.join()  # 這三個仍然是並發執行,只是等待最長的程序執行完才結束
    p2.join()
    p3.join()

    print('', (time.time()-start))

###
子進程1 is running
子進程2 is running
子進程3 is running
主 3.125457525253296
join 並發

 join串行

from multiprocessing import Process
import time,os

def task(name,n):
    print('%s is running' % name)
    time.sleep(n)


if __name__ == '__main__':
    start = time.time()
    p1 = Process(target=task, args=('子進程1', 3))  # args根據位置傳參
    p2 = Process(target=task, args=('子進程2', 1))
    p3 = Process(target=task, args=('子進程3', 2))
    p_l = [p1, p2, p3]

from multiprocessing import Process
import time,os

def task(name,n):
    print('%s is running' % name)
    time.sleep(n)


if __name__ == '__main__':
    start = time.time()
    p1 = Process(target=task, args=('子進程1', 3))  # args根據位置傳參
    p2 = Process(target=task, args=('子進程2', 1))
    p3 = Process(target=task, args=('子進程3', 2))
    p_l = [p1, p2, p3]

    p1.start()  # 每個都是執行完以后再進行下一步
    p1.join()  
    p2.start()
    p2.join()
    p3.start()
    p3.join()

    print('', (time.time()-start))

###
子進程1 is running
子進程2 is running
子進程3 is running
主 6.322813510894775
join 串行

 產看子進程存活狀態alive及取名name

from multiprocessing import Process
import time, os

def task():
    print('%s is running,parent id is <%s>' % (os.getpid(), os.getppid()))
    time.sleep(3)
    print('%s is done,parent id is <%s>' % (os.getpid(), os.getppid()))


if __name__ == '__main__':
    p = Process(target=task,)
    p.start()
    # print(p.is_alive())  # True
    p.join()
    print('', os.getpid(), os.getppid())
    print(p.pid)
    # print(p.is_alive())  # Flase


    p = Process(target=task,name='sub——Precsss')
    p.start()
    p.terminate()  # 這里僅僅是給系統發要求讓p死掉,但是是需要時間的,所以立即執行is_alive則進程還是活的
    time.sleep(3)
    print(p.is_alive())
    print('')
    print(p.name)  # 如果沒有,則默認位p取名,這里已經取名為sub——Precsss

###
7600 is running,parent id is <16108>
7600 is done,parent id is <16108>16108 11268
7600
False
主
sub——Precsss
查看子進程存活狀態及取名

 進程之間的內存空間是隔離的

from multiprocessing import Process

n = 100  # 在windows系統中應該把全局變量定義在if __name__ == '__main__'之上就可以了

def work():
    global n
    n = 0
    print('子進程內: ', n)


if __name__ == '__main__':
    p = Process(target=work)
    p.start()
    print('主進程內: ', n)

###
主進程內:  100
子進程內:  0
進程之間的內存空間是隔離的

6,守護進程

主進程創建子進程,然后將該進程設置成守護自己的進程,守護進程就好比崇禎皇帝身邊的老太監,崇禎皇帝已死老太監就跟着殉葬了。

關於守護進程需要強調兩點:

其一:守護進程會在主進程代碼執行結束后就終止

其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

如果我們有兩個任務需要並發執行,那么開一個主進程和一個子進程分別去執行就ok了,如果子進程的任務在主進程任務結束后就沒有存在的必要了,那么該子進程應該在開啟前就被設置成守護進程。主進程代碼運行結束,守護進程隨即終止

from multiprocessing import Process
import time

def task(name):
    print('%s is running' %name)
    time.sleep(2)
    # p = Process(target=time.sleep,args=(3,))  # 子進程不允許開啟子進程,否則報錯
    # p.start()


if __name__ == '__main__':
    p = Process(target=task, args=('子進程1',))
    p.daemon = True  #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程代碼執行結束,p即終止運行
    p.start()
    # p.join()  # 此處如果沒有join,則還沒等子進程開啟,就隨着主進程的結束而結束了
    print('主')  #只要終端打印出這一行內容,那么守護進程p也就跟着結束掉了

###
主

 主進程結束,只會讓守護進程跟着結束,但是其他子進程還是會繼續運行

from multiprocessing import Process

import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    p1 = Process(target=foo)
    p2 = Process(target=bar)

    p1.daemon = True
    p1.start()
    p2.start()
    print("main-------")

###
main-------
456
end456
example

7,互斥鎖

 保證數據輸出不會錯亂

未加互斥鎖

from multiprocessing import Process
import time


def task(name):
    print('%s 1' % name)
    time.sleep(1)
    print('%s 2' % name)
    time.sleep(1)
    print('%s 3' % name)


if __name__ == '__main__':
    for i in range(3):
        p = Process(target=task, args=('進程%s' % i,))
        p.start()

###由於大家爭相搶輸出這個資源,導致數據輸出錯亂,但是卻輸出很快,效率高
進程0 1
進程1 1
進程2 1
進程0 2
進程1 2
進程2 2
進程0 3
進程1 3
進程2 3

 添加互斥鎖

from multiprocessing import Process, Lock
import time


def task(name, mutex):
    mutex.acquire()
    print('%s 1' % name)
    time.sleep(1)
    print('%s 2' % name)
    time.sleep(1)
    print('%s 3' % name)
    mutex.release()


if __name__ == '__main__':
    mutex = Lock()
    for i in range(3):
        p = Process(target=task, args=('進程%s' % i, mutex))  # 在這添加互斥鎖,使得子進程使用的是父進程傳承下來的那把鎖,而不是copy過來的
        p.start()

### 此時加上了互斥鎖,雖然執行效率沒有之前那么高,但是卻確保了執行順序
進程0 1
進程0 2
進程0 3
進程1 1
進程1 2
進程1 3
進程2 1
進程2 2
進程2 3

 模擬搶票過程

模擬搶票過程,需要通過互斥鎖Lock達到串行執行,確保只能有一個人才能購票成功,保證了數據安全

json模塊

from multiprocessing import Process, Lock
import json
import time


def search(name):
    time.sleep(1)
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    print('<%s> 查看到剩余票數【%s】' % (name, dic['count']))


def get(name):
    time.sleep(1)
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(3)
        json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
        print('<%s> 購票成功' % name)


def task(name, mutex):
    search(name)  # 查票操作每個人都可以執行,並且是並發執行的
    mutex.acquire()  # 在這里可以添加互斥鎖,達到串行執行,使得只能有一個人購票成功
    get(name)
    mutex.release()


if __name__ == '__main__':
    mutex = Lock()
    for i in range(10):
        p = Process(target=task,args=('路人%s' % i, mutex))
        p.start()

###  此時db.txt數據為{"count": 2}
<路人0> 查看到剩余票數【2】
<路人4> 查看到剩余票數【2】
<路人3> 查看到剩余票數【2】
<路人1> 查看到剩余票數【2】
<路人2> 查看到剩余票數【2】
<路人6> 查看到剩余票數【2】
<路人5> 查看到剩余票數【2】
<路人7> 查看到剩余票數【2】
<路人9> 查看到剩余票數【2】
<路人8> 查看到剩余票數【2】
<路人0> 購票成功
<路人4> 購票成功

8,互斥鎖與join

利用互斥鎖,可以通過添加互斥鎖的位置,實現部分程序執行達到串行的效果,其他程序仍然可以並行執行,而添加了join只能執行完了之后才能執行下一個,若作為每項都添加一個join,則都要串行執行。從而大大降低了效率。

from multiprocessing import Process, Lock
import json
import time


def search(name):
    time.sleep(1)
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    print('<%s> 查看到剩余票數【%s】' % (name, dic['count']))


def get(name):
    time.sleep(1)
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(3)
        json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
        print('<%s> 購票成功' % name)
    else:
        print('<%s> 購票失敗' % name)


def task(name,):
    search(name)
    # mutex.acquire()
    get(name)
    # mutex.release()


if __name__ == '__main__':
    # mutex=Lock()
    for i in range(10):
        p = Process(target=task, args=('路人%s' % i,))
        p.start()
        p.join()
<路人0> 查看到剩余票數【3<路人0> 購票成功
<路人1> 查看到剩余票數【2<路人1> 購票成功
<路人2> 查看到剩余票數【1<路人2> 購票成功
<路人3> 查看到剩余票數【0】
<路人3> 購票失敗
<路人4> 查看到剩余票數【0】
<路人4> 購票失敗
<路人5> 查看到剩余票數【0】
<路人5> 購票失敗
<路人6> 查看到剩余票數【0】
<路人6> 購票失敗
<路人7> 查看到剩余票數【0】
<路人7> 購票失敗
<路人8> 查看到剩余票數【0】
<路人8> 購票失敗
<路人9> 查看到剩余票數【0】
<路人9> 購票失敗
執行結果

9,隊列的使用

 解決大家共享硬盤文件,效率低以及使用內存解決加鎖這個繁雜的步驟

 multiprocessing模塊提供了IPC(internet process communicate)進程之間的通信,隊列以及管道,這兩種方式都是使用消息傳遞的,隊列就是管道加鎖的實現

 創建隊列的類(底層就是以管道和鎖定的方式實現):

 Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。

 參數介紹:

 maxsize是隊列中允許最大項數,可以放置任意類型的數據,省略則無大小限制。

 但需要明確:

  1、隊列內存放的是消息而非大數據

  2、隊列占用的是內存空間,因而maxsize即便是無大小限制也受限於內存大小

 主要方法介紹:

  q.put方法用以插入數據到隊列中。數據不宜過大。

  q.get方法可以從隊列讀取並且刪除一個元素。

隊列的使用

from multiprocessing import Process,Queue

q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(1)
q.put(2)
q.put(3)
print(q.full())  # 判斷是否滿了
# q.put(4) #再放就阻塞住了

print(q.get())
print(q.get())
print(q.get())
print(q.empty())  # 判斷是否空了
# print(q.get()) #再取就阻塞住了

10,生產者消費者模型

 1,兩者的介紹

為什么要使用生產者消費者模型

生產者指的是生產數據的任務,消費者指的是處理數據的任務,在並發編程中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什么是生產者和消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

這個阻塞隊列就是用來給生產者和消費者解耦的

 2,兩者的實現

from multiprocessing import Process, Queue
import time

def producer(q, name):
    for i in range(3):
        res = '包子%s' % i
        time.sleep(0.5)
        print('%s 生產了%s' % (name, res))

        q.put(res)

def consumer(q, name):
    while True:
        res = q.get()
        if res is None: break
        time.sleep(1)
        print('%s 吃了%s' % (name, res))


if __name__ == '__main__':
    # 容器
    q = Queue()

    # 生產者們
    p1 = Process(target=producer, args=(q, '生產者1'))
    p2 = Process(target=producer, args=(q, '生產者2'))
    p3 = Process(target=producer, args=(q, '生產者3'))

    # 消費者們
    c1 = Process(target=consumer, args=(q, '消費者1'))
    c2 = Process(target=consumer, args=(q, '消費者2'))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    print('主')
###
生產者1 生產了包子0
生產者2 生產了包子0
生產者3 生產了包子0
生產者1 生產了包子1
生產者2 生產了包子1
生產者3 生產了包子1
生產者1 生產了包子2
消費者1 吃了包子0
生產者2 生產了包子2
消費者2 吃了包子0
生產者3 生產了包子2
主
消費者1 吃了包子0
消費者2 吃了包子1
消費者1 吃了包子1
消費者2 吃了包子1
消費者1 吃了包子2
消費者2 吃了包子2
消費者1 吃了包子2
運行結果

 

 3,消費者生產者總結

  1、程序中有兩類角色

    一類負責生產數據(生產者)
    一類負責處理數據(消費者)
  2、引入生產者消費者模型為了解決的問題是

    平衡生產者與消費者之間的速度差
    程序解開耦合
  3、如何實現生產者消費者模型

    生產者<--->隊列<--->消費者

3,JoinableQueue 的使用

這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

參數介紹

maxsize是隊列中允許最大項數,省略則無大小限制。

方法介紹

JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:
q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常
q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止

基於JoinableQueue實現生產者消費者模型

from multiprocessing import Process, JoinableQueue
import time


def producer(q):
    for i in range(2):
        res = '包子%s' % i
        time.sleep(0.5)
        print('生產者生產了%s' % res)

        q.put(res)
    q.join()


def consumer(q):
    while True:
        res = q.get()
        if res is None: break
        time.sleep(1)
        print('消費者吃了%s' % res)
        q.task_done()  # 發出信號項目已處理完成,省去了后面需另外添加的q.put(None)


if __name__ == '__main__':
    # 容器
    q = JoinableQueue()

    # 生產者們
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))

    # 消費者們
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon = True  # 若不添加守護進程,則會卡在消費者
    c2.daemon = True

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    print('主')
生產者生產了包子0
生產者生產了包子0
生產者生產了包子0
生產者生產了包子1
生產者生產了包子1
生產者生產了包子1
消費者吃了包子0
消費者吃了包子0
消費者吃了包子0
消費者吃了包子1
消費者吃了包子1
消費者吃了包子1
主
result

 

 

4,多線程

1,什么是線程

在傳統操作系統中,每個進程有一個地址空間,而且默認就有一個控制線程

線程顧名思義,就是一條流水線工作的過程(流水線的工作需要電源,電源就相當於cpu),而一條流水線必須屬於一個車間,一個車間的工作過程是一個進程,車間負責把資源整合到一起,是一個資源單位,而一個車間內至少有一條流水線。

所以,進程只是用來把資源集中到一起(進程只是一個資源單位,或者說資源集合),而線程才是cpu上的執行單位。

 

多線程(即多個控制線程)的概念是,在一個進程中存在多個線程,多個線程共享該進程的地址空間,相當於一個車間內有多條流水線,都共用一個車間的資源。例如,北京地鐵與上海地鐵是不同的進程,而北京地鐵里的13號線是一個線程,北京地鐵所有的線路共享北京地鐵所有的資源,比如所有的乘客可以被所有線路拉。

2,開啟線程的兩種方式

方式一:

import time
import random
from threading import Thread


def piao(name):
    print('%s running' % name)
    time.sleep(random.randrange(1, 3))
    print('%s run end' % name)


if __name__ == '__main__':
    t1 = Thread(target=piao, args=('qiuma',))
    t1.start()
    print('主線程')

方式二:

import time
import random
from threading import Thread


class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print('%s running' % self.name)

        time.sleep(random.randrange(1, 3))
        print('%s run end' % self.name)


if __name__ == '__main__':
    t1 = MyThread('quima')
    t1.start()
    print('主線程')

3,多線程與多進程的區別

 ①開啟速度

   在主進程下開啟線程

import time
from threading import Thread
from multiprocessing import Process


def run(name):
    print('%s running' % name)
    time.sleep(2)
    print('%s run end' % name)


if __name__ == '__main__':
    t1 = Thread(target=run, args=('qiuma',))
    t1.start()
    print('主線程')

  執行結果表明,幾乎是t.start ()的同時就將線程開啟了,然后先打印出了qiuma running,證明線程的創建開銷極小

qiuma running
主線程
qiuma run end

  在主進程下開啟子進程

import time
from threading import Thread
from multiprocessing import Process


def piao(name):
    print('%s running' % name)
    time.sleep(1)
    print('%s run end' % name)


if __name__ == '__main__':
    p1 = Process(target=piao, args=('qiuma',))
    p1.start()

    # t1 = Thread(target=piao, args=('qiuma',))
    # t1.start()
    print('主進程')

  執行結果表明p.start ()將開啟進程的信號發給操作系統后,操作系統要申請內存空間,讓好拷貝父進程地址空間到子進程,開銷遠大於線程

主進程
qiuma running
qiuma run end

 ②pid的不同

   1、在主進程下開啟多個線程,每個線程都跟主進程的pid一樣【pid(process id:進程的id號)】

from threading import Thread
import os


def task():  # 此處一定為task
    print('子線程:%s' % (os.getpid()))  # 一個進程內多個線程是平級的


if __name__ == '__main__':
    t1 = Thread(target=task,)
    t2 = Thread(target=task,)
    t1.start()
    t2.start()
    
    print('主進程:', os.getpid())

# 執行結果,主線程和子線程id都是一個值

執行結果

子線程:14912
子線程:14912
主線程:14912

  2,開多個進程,每個進程都有不同的pid

from threading import Thread
from multiprocessing import Process
import os


def task():
    print('子進程PID:%s  父進程的PID:%s' % (os.getpid(), os.getppid()))  # 查看父進程和子進程的id


if __name__ == '__main__':
    p1 = Process(target=task,)
    p1.start()

    print('主進程', os.getpid())

  執行結果

主進程 13580
子進程PID:3084  父進程的PID:13580

  3,同一進程內的多個線程共享該進程的地址空間,父進程與子進程不共享地址空間

     進程之間的地址空間是隔離的

from threading import Thread
from multiprocessing import Process

n = 100
def task():
    global n
    n = 0


if __name__ == '__main__':
    p1 = Process(target=task,)
    p1.start()
    p1.join()

    print('主進程', n)

###
主進程 100

  根據執行結果,毫無疑問子進程p已經將自己的全局的n改成了0,但改的僅僅是它自己的,查看父進程的n仍然為100

     同一進程內開啟的多個線程是共享該進程地址空間的

from threading import Thread
from multiprocessing import Process

n = 100
def task():
    global n
    n = 0


if __name__ == '__main__':
    t1 = Thread(target=task,)
    t1.start()
    t1.join()

    print('主線程', n)

###
主線程 0

  根據執行結果,查看結果為0,因為同一進程內的線程之間共享進程內的數據

 ③總結兩者的區別

    1,啟動線程的速度要比啟動進程的速度快很多,啟動進程的開銷更大

     2,在主進程下面開啟的多個線程,每個線程都和主進程的pid(進程的id)一致

     3,在主進程下開啟多個子進程,每個進程都有不一樣的pid

     4,同一進程內的多個線程共享該進程的地址空間

    5,父進程與子進程不共享地址空間,表明進程之間的地址空間是隔離的

4,Thread對象的其他屬性

Thread實例對象的方法

  isAlive(): 返回線程是否活動的。
  getName(): 返回線程名。
  setName(): 設置線程名。

threading模塊提供的一些方法:

  threading.currentThread(): 返回當前的線程變量。
  threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
  threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

from threading import Thread, currentThread, active_count, enumerate
import time


def task():
    print('%s is ruuning' % currentThread().getName())  # 獲取當前線程的名字
    time.sleep(2)
    print('%s is done' % currentThread().getName())


if __name__ == '__main__':
    t = Thread(target=task, name='子線程1')
    t.start()
    t.setName('兒子線程1')  # 改子線程名字
    t.join()
    print(t.getName())  # 獲取子線程名字
    currentThread().setName('主線程')  # 修改主線程名字
    print(t.isAlive())  # 查看線程是否存活


    print('主線程',currentThread().getName())  # 查看主線程名字

    t.join()
    print(active_count())  # 查看活躍的線程數
    print(enumerate())  # 將當前活躍的線程顯示出來

###
子線程1 is ruuning
兒子線程1 is done
兒子線程1
False
主線程 主線程
1
[<_MainThread(主線程, started 14480)>]

5,守護線程

無論是進程還是線程,都遵循:守護xxx會等待主xxx運行完畢后被銷毀

需要強調的是:運行完畢並非終止運行

  1、對主進程來說,運行完畢指的是主進程代碼運行完畢

  2、對主線程來說,運行完畢指的是主線程所在的進程內所有非守護線程統統運行完畢,主線程才算運行完畢

 詳細解釋

  1、主進程在其代碼結束后就已經算運行完畢了(守護進程在此時就被回收),然后主進程會一直等非守護的子進程都運行完畢后回收子進程的資源(否則會產生僵屍進程),才會結束,

  2、主線程在其他非守護線程運行完畢后才算運行完畢(守護線程在此時就被回收)。因為主線程的結束意味着進程的結束,進程整體的資源都將被回收,而進程必須保證非守護線程都運行完畢后才能結束。

 驗證一:當主線程結束的時候,守護線程也跟着結束了

from threading import Thread
import time


def sayhi(name):
    time.sleep(2)
    print('%s say hello' % name)  # 因為主進程結束以后,主線程也跟着結束,然后守護線程還沒有打印,也跟着死了


if __name__ == '__main__':
    t = Thread(target=sayhi, args=('qiuma',))
    # t.setDaemon(True) #必須在t.start()之前設置
    t.daemon = True  # 這種方式和上面代碼方式設置守護線程效果一樣
    t.start()

    print('主線程')
    print(t.is_alive())

###結果不會打印say hello字樣
主線程
True

 驗證二:只要是有其他非守護線程還沒有運行完畢,守護線程就不會被回收,進程只有當非守護線程都運行完畢才會結束

from threading import Thread
import time


def foo():
    print(123)
    time.sleep(0.5)
    print("end123")


def bar():
    print(456)
    time.sleep(1)
    print("end456")


if __name__ == '__main__':
    t1 = Thread(target=foo)
    t2 = Thread(target=bar)

    t1.daemon = True
    t1.start()
    t2.start()
    print("main-------")

###
123
456
main-------
end123
end456

6,互斥鎖

對於進程的互斥鎖來說,將並行變為了串行,犧牲了效率,保證了安全

對於線程來說,一個進程內的多個線程是共享彼此的地址空間的,因此彼此之間數據也是共享的,由此代來的競爭可能將數據改亂。

from threading import Thread, Lock
import time

n = 100


def task():
    global n
    mutex.acquire()  # 開始加鎖
    temp = n
    time.sleep(0.1)  # 未加鎖之前,100個線程都停留在這並且temp都等於100
    n = temp-1
    mutex.release()  # 解鎖


if __name__ == '__main__':
    mutex = Lock()
    t_l = []
    for i in range(100):
        t = Thread(target=task)
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()

    print('主', n)

###
主 0

7,GIL全局解釋器鎖

 GIL(Global Interpreter Lock)在Cpython解釋器中,同一個進程下開啟的多線程,同一時刻只能有一個線程執行,無法利用多核優勢

首先需要明確的一點是GIL並不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標准,但是可以用不同的編譯器來編譯成可執行代碼。>有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行。像其中的JPython就沒有GIL。然而因為CPython是大部分環境下默認的Python執行環境。所以在很多人的概念里CPython就是Python,也就想當然的把GIL歸結為Python語言的缺陷。所以這里要先明確一點:GIL並不是Python的特性,Python完全可以不依賴於GIL
進一步解釋

 

 1,GIL介紹

GIL本質就是一把互斥鎖,既然是互斥鎖,所有互斥鎖的本質都一樣,都是將並發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。

可以肯定的一點是:保護不同的數據的安全,就應該加不同的鎖。

首先確定一點:每次執行python程序,都會產生一個獨立的進程。

驗證python test.py只會產生一個進程

#test.py內容
import os,time
print(os.getpid())
time.sleep(1000)

#打開終端執行
python3 test.py

#在windows下查看
tasklist |findstr python

#在linux下下查看
ps aux |grep python

  在一個python的進程內,不僅有test.py的主線程或者由該主線程開啟的其他線程,還有解釋器開啟的垃圾回收等解釋器級別的線程,總之,所有線程都運行在這一個進程內,毫無疑問

 

  1、所有數據都是共享的,這其中,代碼作為一種數據也是被所有線程共享的(test.py的所有代碼以及Cpython解釋器的所有代碼) 例如:test.py定義一個函數work(代碼內容如下圖),在進程內所有線程都能訪問到work的代碼,於是我們可以開啟三個線程然后target都指向該代碼,能訪問到意味着就是可以執行。

  2、所有線程的任務,都需要將任務的代碼當做參數傳給解釋器的代碼去執行,即所有的線程要想運行自己的任務,首先需要解決的是能夠訪問到解釋器的代碼。 

綜上:

如果多個線程的target=work,那么執行流程是

多個線程先訪問到解釋器的代碼,即拿到執行權限,然后將target的代碼交給解釋器的代碼去執行

   解釋器的代碼是所有線程共享的,所以垃圾回收線程也可能訪問到解釋器的代碼而去執行,這就導致了一個問題:對於同一個數據100,可能線程1執行x=100的同時,而垃圾回收執行的是回收100的操作,解決這種問題沒有什么高明的方法,就是加鎖處理,如下圖的GIL,保證python解釋器同一時間只能執行一個任務的代碼

 

 在Cpython解釋器當中要利用多核優勢,只能是開多進程,每個進程開一個線程去執行

 2,GIL與Lock

  鎖的目的是為了保護共享的數據,同一時間只能有一個線程來修改共享的數據

  然后,我們可以得出結論:保護不同的數據就應該加不同的鎖。

 GIL 與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),后者是保護用戶自己開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock,如下圖

  總結:GIL保證一個進程內的多個線程同一時間只能有一個執行,從而保證了python垃圾回收的線程安全,多個線程只能有一個運行。不同的數據,應該加上不同的鎖,解釋器級別的GIL 鎖只能保護解釋器級別的數據,用戶自己開發的應用程序的數據需要自己加上另外一把鎖來去保護。工作流程是線程們首先搶的不是mutex鎖,而是GIL鎖,將GIL當成是一種執行權限。

 3,GIL與多線程

  有了GIL的存在,同一時刻同一進程中只有一個線程被執行,進程可以利用多核,但是開銷大,而python的多線程開銷小,但卻無法利用多核優勢。

  那么如何解決這個問題呢?

首先: 1、cpu到底是用來做計算的,還是用來做I/O的?cpu是用來做計算的

    2、多cpu,意味着可以有多個核並行完成計算,所以多核提升的是計算性能

    3、每個cpu一旦遇到I/O阻塞,仍然需要等待,所以多核對I/O操作沒什么用處

1、對計算來說,cpu越多越好,但是對於I/O來說,再多的cpu也沒用
2、當然對運行一個程序來說,隨着cpu的增多執行效率肯定會有所提高(不管提高幅度多大,總會有所提高),這是因為一個程序基本上不會是純計算或者純I/O,所以我們只能相對的去看一個程序到底是計算密集型還是I/O密集型,從而進一步分析python的多線程到底有無用武之地

 

現在的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提升,甚至不如串行(沒有大量切換),但是,對於IO密集型的任務效率還是有顯著提升的。

 4,多線程性能測試

  如果並發的多個任務是計算密集型:多進程效率高

from multiprocessing import Process
from threading import Thread
import os,time
def work():
    res=0
    for i in range(100000000):
        res*=i


if __name__ == '__main__':
    l=[]
    print(os.cpu_count())  # 打印本機的核心數
    start=time.time()
    for i in range(4):
        # p=Process(target=work) #耗時5s多  # 顯示多進程程序的執行時間
        p=Thread(target=work) #耗時18s多  # 顯示多線程程序執行的時間
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print('run time is %s' %(stop-start))

  如果並發的多個任務是I/O密集型:多線程效率高

from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
    time.sleep(2)
    print('===>')

if __name__ == '__main__':
    l=[]
    print(os.cpu_count()) #本機為4核
    start=time.time()
    for i in range(400):
        # p=Process(target=work) #耗時12s多,大部分時間耗費在創建進程上
        p=Thread(target=work) #耗時2s多
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print('run time is %s' %(stop-start))

應用:

多線程用於IO密集型,如socket,爬蟲,web
多進程用於計算密集型,如金融分析

 5,死鎖與遞歸鎖

 1,死鎖現象

   所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,如下就是死鎖:

# 死鎖
from threading import Thread, Lock
import time

mutexA = Lock()
mutexB = Lock()


class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('%s 拿到了A鎖' % self.name)

        mutexB.acquire()
        print('%s 拿到了B鎖' % self.name)
        mutexB.release()

        mutexA.release()


    def f2(self):
        mutexB.acquire()
        print('%s 拿到了B鎖' % self.name)
        time.sleep(0.1)

        mutexA.acquire()
        print('%s 拿到了A鎖' % self.name)
        mutexA.release()

        mutexB.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()

###
Thread-1 拿到了A鎖
Thread-1 拿到了B鎖
Thread-1 拿到了B鎖
Thread-2 拿到了A鎖
# 出現了死鎖,整個程序阻塞住

  由於線程的開啟消耗非常小,所以啟動速度很快,這時線程一無競爭者,拿到了A鎖,緊接着拿到了B鎖,當兩個鎖釋放以后,線程一繼續拿到了A鎖,這時其他進程還在搶f1中的A鎖,而進程一已經進入f2中拿到了B鎖,就在此時,經過休息0.1s的時候,線程二在f1中拿到了A鎖,需要繼續拿B鎖的時候,此時B鎖卻在f2中被線程一拿着,剛好線程一此時又需要A鎖,就這樣,線程一拿到了B鎖,需要A鎖,線程二拿着A鎖,需要B鎖,然后程序就在這里卡住不能執行下去了

 2,解決死鎖,遞歸鎖

 遞歸鎖出現的問題就是,不同進程或者線程因爭奪資源會出現相互等待的現象。

而遞歸鎖可以解決此類問題,這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖,二者的區別是:遞歸鎖可以連續acquire多次,而互斥鎖只能acquire一次

# 遞歸鎖:可以連續acquire多次,每acquire一次計數器+1,只有計數為0時,才能被搶到acquire
from threading import Thread, RLock
import time

mutexB = mutexA = RLock()

class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('%s 拿到了A鎖' % self.name)

        mutexB.acquire()
        print('%s 拿到了B鎖' % self.name)
        mutexB.release()

        mutexA.release()


    def f2(self):
        mutexB.acquire()
        print('%s 拿到了B鎖' % self.name)
        time.sleep(0.1)

        mutexA.acquire()
        print('%s 拿到了A鎖' % self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()

 執行結果:

Thread-1 拿到了A鎖
Thread-1 拿到了B鎖
Thread-1 拿到了B鎖
Thread-1 拿到了A鎖
Thread-2 拿到了A鎖
Thread-2 拿到了B鎖
Thread-2 拿到了B鎖
Thread-2 拿到了A鎖
Thread-4 拿到了A鎖
Thread-4 拿到了B鎖
Thread-4 拿到了B鎖
Thread-4 拿到了A鎖
Thread-6 拿到了A鎖
Thread-6 拿到了B鎖
Thread-6 拿到了B鎖
Thread-6 拿到了A鎖
Thread-8 拿到了A鎖
Thread-8 拿到了B鎖
Thread-8 拿到了B鎖
Thread-8 拿到了A鎖
Thread-10 拿到了A鎖
Thread-10 拿到了B鎖
Thread-10 拿到了B鎖
Thread-10 拿到了A鎖
Thread-5 拿到了A鎖
Thread-5 拿到了B鎖
Thread-5 拿到了B鎖
Thread-5 拿到了A鎖
Thread-9 拿到了A鎖
Thread-9 拿到了B鎖
Thread-9 拿到了B鎖
Thread-9 拿到了A鎖
Thread-7 拿到了A鎖
Thread-7 拿到了B鎖
Thread-7 拿到了B鎖
Thread-7 拿到了A鎖
Thread-3 拿到了A鎖
Thread-3 拿到了B鎖
Thread-3 拿到了B鎖
Thread-3 拿到了A鎖
result

8,信號量,Event,定時器

 1,信號量

   信號量也是一把鎖,可以指定信號量為5,對比互斥鎖同一時間只能有一個任務搶到鎖去執行,信號量同一時間可以有5個任務拿到鎖去執行,如果說互斥鎖是合租房屋的人去搶一個廁所,那么信號量就相當於一群路人爭搶公共廁所,公共廁所有多個坑位,這意味着同一時間可以有多個人上公共廁所,但公共廁所容納的人數是一定的,這便是信號量的大小

from threading import Thread, Semaphore, currentThread
import time, random

sm = Semaphore(3)  # 信號量大小


def task():
    # sm.acquire()
    # print('%s in' %currentThread().getName())
    # sm.release()
    with sm:  # 上面注釋代碼可簡寫成下列代碼
        print('%s in' % currentThread().getName())
        time.sleep(random.randint(1, 2))


if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=task)
        t.start()

Semaphore管理一個內置的計數器, 每當調用acquire()時內置計數器-1; 調用release() 時內置計數器+1; 計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。

 2,Event

 線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標志被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標志為假,那么這個線程將會被一直阻塞直至該標志為真。一個線程如果將一個Event對象的信號標志設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那么它將忽略這個事件, 繼續執行

from threading import Event

event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;

event.clear():恢復event的狀態值為False。

 

from threading import Thread,Event
import time

event = Event()
# event.wait()  # 開啟等待
# event.set()  # 結束等待


def student(name):
    print('學生%s 正在聽課' % name)
    event.wait(2)
    print('學生%s 課間活動' % name)


def teacher(name):
    print('老師%s 正在授課' % name)
    time.sleep(7)
    event.set()


if __name__ == '__main__':
    stu1 = Thread(target=student,args=('alex',))
    stu2 = Thread(target=student,args=('wxx',))
    stu3 = Thread(target=student,args=('yxx',))
    t1 = Thread(target=teacher,args=('egon',))

    stu1.start()
    stu2.start()
    stu3.start()
    t1.start()

###
學生wash 正在聽課
學生wxx 正在聽課
學生yxx 正在聽課
老師qiuma 正在授課
學生yxx 課間活動
學生wash 課間活動
學生wxx 課間活動
Event模擬高中上課過程

 

 wait和set都可以自行設置時間,wait可以等待自己設置的時間

例如,有多個工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務正常才讓那些工作線程去連接MySQL服務器,如果連接不成功,都會去嘗試重新連接。那么我們就可以采用threading.Event機制來協調各個工作線程的連接操作

from threading import Thread,Event,currentThread
import time

event = Event()

def conn():
    n = 0
    while not event.is_set():  # is_set檢查是否被設置
        if n == 3:
            print('%s try too many times' % currentThread().getName())
            return
        print('%s try %s' % (currentThread().getName(), n))
        event.wait(0.5)  # 可以設置等待時間
        n += 1

    print('%s is connected' % currentThread().getName())


def check():
    print('%s is checking' % currentThread().getName())
    time.sleep(2)  # 若檢查在等待0.5秒之后還未結束,則回返回conn函數中的檢測信息
    event.set()


if __name__ == '__main__':
    for i in range(3):
        t = Thread(target=conn)
        t.start()
    t = Thread(target=check)
    t.start()

  執行結果

###
Thread-1 try 0
Thread-2 try 0
Thread-3 try 0
Thread-4 is checking
Thread-2 try 1
Thread-3 try 1
Thread-1 try 1
Thread-2 try 2
Thread-1 try 2
Thread-3 try 2
Thread-2 try too many times
Thread-3 try too many times
Thread-1 try too many times
執行結果

 3,定時器

 定時器,指定n秒后執行某操作

from threading import Timer

def hello():
    print("hello, world")

t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

  實例:驗證碼的實現

from threading import Timer
import random

class Code:
    def __init__(self):
        self.make_cache()

    def make_cache(self, interval=60):  # interval 設置默認等待時間為5秒鍾
        self.cache=self.make_code()
        print(self.cache)
        self.t = Timer(interval, self.make_cache)
        self.t.start()

    def make_code(self, n=4):
        res = ''
        for i in range(n):
            s1 = str(random.randint(0, 9))  # 隨機取出ASCII表里面數字,並轉為字符,方便后面拼接
            s2 = chr(random.randint(65, 90))  # 隨機取出ASCII表中大小寫字母
            res += random.choice([s1, s2])
        return res

    def check(self):
        while True:
            code = input('請輸入你的驗證碼>>: ').strip()
            if code.upper() == self.cache:
                print('驗證碼輸入正確')
                self.t.cancel()
                break


obj = Code()
obj.check()

9,線程queue

 queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

 有三種不同的用法

 ①class queue.Queue(maxsize=0) #隊列:先進先出

import queue

q = queue.Queue(3)  # 先進先出->隊列

q.put('first')
q.put(2)
q.put('third')
# q.put(4)
# q.put(4,block=False) #q.put_nowait(4)
# q.put(4,block=True,timeout=3)  # block = True時,阻塞,timeout=3,等待三秒后如果還沒有從里面取出數據,則阻塞


print(q.get())
print(q.get())
print(q.get())
# print(q.get(block=False))  # block=False相等於q.get_nowait()
# print(q.get_nowait())

# print(q.get(block=True,timeout=3))

### 后果先進先出
first
2
third

 ②class queue.LifoQueue(maxsize=0) #堆棧:last in fisrt out

q=queue.LifoQueue(3) #后進先出->堆棧(list in First out Queue)
q.put('first')
q.put(2)
q.put('third')

print(q.get())
print(q.get())
print(q.get())

###結果 先進后出
third
2
first

 ③class queue.PriorityQueue(maxsize=0) #優先級隊列:存儲數據時可設置優先級的隊列

q=queue.PriorityQueue(3)  # 優先級隊列

q.put((10,'one'))
q.put((40,'two'))
q.put((30,'three'))

print(q.get())
print(q.get())
print(q.get())

### 結果根據優先級打印出
10
40
30

10,進程池與線程池

 基於多進程或多線程實現並發的套接字通信會使服務的開啟的進程數或線程數都會隨着並發的客戶端數目地增多而增多最后可能因不堪重負而癱瘓,而進程池或線程池的用途就是對服務端開啟的進程數或線程數加以控制,讓機器在一個自己可以承受的范圍內運行,例如進程池,就是用來存放進程的池子,本質還是基於多進程,只不過是對開啟進程的數目加上了限制

官網:https://docs.python.org/dev/library/concurrent.futures.html

concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用
Both implement the same interface, which is defined by the abstract Executor class.
介紹

 

 基本方法:

1、submit(fn, *args, **kwargs)
異步提交任務

2、map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操作

3、shutdown(wait=True) 
相當於進程池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源后才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait參數為何值,整個程序都會等到所有任務執行完畢
submit和map必須在shutdown之前

4、result(timeout=None)
取得結果

5、add_done_callback(fn)
回調函數

 

 兩者用法

分別導入concurrent.futures下面的ProcessPoolExecutor 進程模塊以及ThreadPoolExecutor線程模塊

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os, time, random


def task(name):
    print('name:%s pid:%s run' % (name, os.getpid()))
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)  # 設置最大同時運行的進程數
    # pool = ThreadPoolExecutor(5)  # 設置最大同時運行的線程數

    for i in range(10):
        pool.submit(task, 'qiuma %s' % i)  # 異步提交任務,提交后不用管進程是否執行

    pool.shutdown(wait=True)  # 將進程池的入口關閉,等待任務提交結束后才執行后面的任務
                              # 默認wait=True
    print('主')

###進程池運行結果
name:qiuma 0 pid:15028 run
name:qiuma 1 pid:5524 run
name:qiuma 2 pid:16012 run
name:qiuma 3 pid:8032 run
name:qiuma 4 pid:15028 run
name:qiuma 5 pid:15028 run
name:qiuma 6 pid:5524 run
name:qiuma 7 pid:8032 run
name:qiuma 8 pid:16012 run
name:qiuma 9 pid:8032 run
主

11,異步調用與回調機制

異步調用與回調機制是提交任務的兩種方式,

# 提交任務的兩種方式
# 1、同步調用:提交完任務后,就在原地等待任務執行完畢,拿到結果,再執行下一行代碼,導致程序是串行執行,同步是提交任務的一種方式,不是阻塞的結果

from concurrent.futures import ThreadPoolExecutor
import time
import random


def la(name):
    print('%s is laing' % name)
    time.sleep(random.randint(3, 5))
    res = random.randint(7, 13)*'#'  # 模擬隨機長度的字符串,代表每個人拉的量不一樣
    return {'name': name, 'res': res}


def weigh(shit):
    name = shit['name']
    size = len(shit['res'])
    print('%s 拉了 《%s》kg' % (name, size))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)

    shit1 = pool.submit(la, 'alex').result()
    weigh(shit1)

    shit2 = pool.submit(la, 'wupeiqi').result()  # result取得結果
    weigh(shit2)

    shit3 = pool.submit(la, 'yuanhao').result()
    weigh(shit3)

print("#############################")

# 2、異步調用:提交完任務后,不地等待任務執行完畢,

from concurrent.futures import ThreadPoolExecutor
import time
import random


def la(name):
    print('%s is laing' % name)
    time.sleep(random.randint(3, 5))
    res = random.randint(7, 13)*'#'
    return {'name': name, 'res': res}


def weigh(shit):
    shit = shit.result()
    name = shit['name']
    size = len(shit['res'])
    print('%s 拉了 %skg' % (name, size))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)

    pool.submit(la, 'alex').add_done_callback(weigh)

    pool.submit(la, 'wupeiqi').add_done_callback(weigh)

    pool.submit(la, 'yuanhao').add_done_callback(weigh)

# 自動觸發weigh這個功能就叫回調函數,前面調用了la函數,后面會自動觸發weigh這個功能


###
alex is laing
alex 拉了 《11》kg
wupeiqi is laing
wupeiqi 拉了 《9》kg
yuanhao is laing
yuanhao 拉了 《13》kg
#############################
alex is laing
wupeiqi is laing
yuanhao is laing
alex 拉了 9kg
yuanhao 拉了 10kg
wupeiqi 拉了 7kg

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM