python/進程同步鎖


python/進程同步鎖

python/同步鎖

同步鎖:通常被用來實現共享資源的同步訪問,為每一個共享資源創建一個Lock對象當你需要訪問該資源時,調用qcuqire方法來獲取鎖對象(如果其他線程已經獲得該鎖,則當前線程需等待期被釋放),待資源訪問完后,在調用release方法釋放鎖

實例如下:

 1 #同步鎖
 2 import time   #導入時間模塊
 3 import threading  #導入threading模塊
 4 num=100    #設置一個全局變量
 5 lock=threading.Lock()
 6 def sudnum():   #定一個函數sudnum'
 7     global num    #聲明全局變量
 8     lock.acquire()
 9     temp=num     #讀取全局變量num
10     time.sleep(0)   #增加一個休眠功能
11     num=temp-1     #把從全局拿來的變量進行減一的操作
12     lock.release()
13 l=[]    #在全局創建一個空了表
14 for i in range(100):   #從0到100進行循環
15     t=threading.Thread(target=sudnum)   #在循環中創建子線程,共創建100個
16     t.start()   #循環啟動子線程
17     l.append(t)   #把循環創建的實例化添加到列表中
18 
19 for f in l:  #從列表里遍歷內容給f:
20     f.join()  #循環設置列表的內容結束
21 
22 print('Result:',num)   #打印通過多次子線程更改過的變量內容
23 運行結果
24 Result: 0
25 
26 Process finished with exit code 0

死鎖:

所謂死鎖,就是指倆個或倆個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法說推進下去

實例如下:

 1 #死鎖
 2 import threading    #導入模塊
 3 import time         #導入模塊
 4 
 5 mutexA = threading.Lock()   #把threading下Lock類賦值給mutexA
 6 mutexB = threading.Lock()   #把threading下Lock類賦值給mutexB
 7 
 8 class MyThread(threading.Thread):   #定義MyThread類 並繼承threading下的Thread類功能
 9 
10     def __init__(self):     #初始化實例化
11         threading.Thread.__init__(self)   #初始父類實例化
12 
13     def run(self):     #定義run函數 (此函數是固定函數)
14         self.fun1()    #實例化對象引用執行fun1函數
15         self.fun2()    #實例化對象引用執行fun2函數
16 
17     def fun1(self):    #定義fun1函數
18 
19         mutexA.acquire()  # 獲取公共鎖,如果鎖被占用,則阻塞在這里,等待鎖的釋放
20 
21         print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
22 
23         mutexB.acquire()  # 獲取公共鎖,如果鎖被占用,則阻塞在這里,等待鎖的釋放
24         print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
25         mutexB.release()  # 釋放公共鎖
26 
27         mutexA.release()  # 釋放公共鎖
28 
29 
30     def fun2(self):   #定義fun2函數
31 
32         mutexB.acquire()  # 獲取公共鎖,如果鎖被占用,則阻塞在這里,等待鎖的釋放
33         print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
34         time.sleep(0.2)
35 
36         mutexA.acquire()  # 獲取公共鎖,如果鎖被占用,則阻塞在這里,等待鎖的釋放
37         print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
38         mutexA.release()  # 釋放公共鎖
39 
40         mutexB.release()  # 釋放公共鎖
41 
42 if __name__ == "__main__":
43 
44     print("start---------------------------%s"%time.time())
45 
46     for i in range(0, 10):
47         my_thread = MyThread()
48         my_thread.start()
49 
50 運行結果
51 start---------------------------1494320240.1851542
52 I am Thread-1 , get res: ResA---1494320240.1856549
53 I am Thread-1 , get res: ResB---1494320240.1861556
54 I am Thread-1 , get res: ResB---1494320240.1861556
55 I am Thread-2 , get res: ResA---1494320240.186656

實際for循環10次,就是創建10個子線程,但是執行結果就運行到第二個子線程和第一子線程就出現了死鎖的現象,第一個子線程把A鎖釋放掉時第二個子線程獲取到A鎖。第一個子線程釋放了B鎖,然后又獲取了B鎖,現在第二個子線程獲得了A鎖,第一個子線程獲得了B鎖,第二個子線程想要獲取B鎖,但是第一個子線程沒有釋放掉。第一個子線程想要獲取到A鎖 第二個子線程沒有釋放。就出現倆個子線程都相互等對方釋放獲取的鎖。

遞歸鎖:

 1 #遞歸鎖
 2 import threading    #導入模塊
 3 import time         #導入模塊
 4 
 5 RLock = threading.RLock()   #把threading下RLock類賦值給RLock
 6 
 7 
 8 class MyThread(threading.Thread):   #定義MyThread類 並繼承threading下的Thread類功能
 9 
10     def __init__(self):     #初始化實例化
11         threading.Thread.__init__(self)   #初始父類實例化
12 
13     def run(self):     #定義run函數 (此函數是固定函數)
14         self.fun1()    #實例化對象引用執行fun1函數
15         self.fun2()    #實例化對象引用執行fun2函數
16 
17     def fun1(self):    #定義fun1函數
18 
19         RLock.acquire()  # 獲取公共鎖,如果鎖被占用,則阻塞在這里,等待鎖的釋放
20 
21         print ("I am %s , na res: %s---%s" %(self.name, "ResA",time.time()))
22 
23         RLock.acquire()  # 獲取公共鎖,如果鎖被占用,則阻塞在這里,等待鎖的釋放
24         print ("I am %s , na res: %s---%s" %(self.name, "ResB",time.time()))
25         RLock.release()  # 釋放公共鎖
26 
27         RLock.release()  # 釋放公共鎖
28 
29 
30     def fun2(self):   #定義fun2函數
31 
32         RLock.acquire()  # 獲取公共鎖,如果鎖被占用,則阻塞在這里,等待鎖的釋放
33         print ("I am %s , na res: %s---%s" %(self.name, "ResB",time.time()))
34         time.sleep(0.2)
35 
36         RLock.acquire()  # 獲取公共鎖,如果鎖被占用,則阻塞在這里,等待鎖的釋放
37         print ("I am %s , na res: %s---%s" %(self.name, "ResA",time.time()))
38         RLock.release()  # 釋放公共鎖
39 
40         RLock.release()  # 釋放公共鎖
41 
42 if __name__ == "__main__":
43 
44     print("start---------------------------%s"%time.time())
45 
46     for i in range(0, 10):
47         my_thread = MyThread()
48         my_thread.start()
49 
50 運行結果
51 start---------------------------1494324391.4339159
52 I am Thread-1 , na res: ResA---1494324391.4344165
53 I am Thread-1 , na res: ResB---1494324391.4344165
54 I am Thread-1 , na res: ResB---1494324391.4344165
55 I am Thread-1 , na res: ResA---1494324391.63575
56 I am Thread-2 , na res: ResA---1494324391.63575
57 I am Thread-2 , na res: ResB---1494324391.63575
58 I am Thread-2 , na res: ResB---1494324391.63575
59 I am Thread-2 , na res: ResA---1494324391.836299
60 I am Thread-4 , na res: ResA---1494324391.836299
61 I am Thread-4 , na res: ResB---1494324391.8367958
62 I am Thread-4 , na res: ResB---1494324391.8367958
63 I am Thread-4 , na res: ResA---1494324392.040432
64 I am Thread-6 , na res: ResA---1494324392.040432
65 I am Thread-6 , na res: ResB---1494324392.040432
66 I am Thread-7 , na res: ResA---1494324392.040432
67 I am Thread-7 , na res: ResB---1494324392.040432
68 I am Thread-7 , na res: ResB---1494324392.040432
69 I am Thread-7 , na res: ResA---1494324392.2415655
70 I am Thread-9 , na res: ResA---1494324392.2415655
71 I am Thread-9 , na res: ResB---1494324392.2420657
72 I am Thread-9 , na res: ResB---1494324392.2420657
73 I am Thread-9 , na res: ResA---1494324392.4427023
74 I am Thread-3 , na res: ResA---1494324392.4427023
75 I am Thread-3 , na res: ResB---1494324392.4427023
76 I am Thread-3 , na res: ResB---1494324392.4427023
77 I am Thread-3 , na res: ResA---1494324392.643367
78 I am Thread-6 , na res: ResB---1494324392.643367
79 I am Thread-6 , na res: ResA---1494324392.8445525
80 I am Thread-8 , na res: ResA---1494324392.8445525
81 I am Thread-8 , na res: ResB---1494324392.8445525
82 I am Thread-8 , na res: ResB---1494324392.8445525
83 I am Thread-8 , na res: ResA---1494324393.0449915
84 I am Thread-5 , na res: ResA---1494324393.0449915
85 I am Thread-5 , na res: ResB---1494324393.0449915
86 I am Thread-5 , na res: ResB---1494324393.0449915
87 I am Thread-5 , na res: ResA---1494324393.2456653
88 I am Thread-10 , na res: ResA---1494324393.2456653
89 I am Thread-10 , na res: ResB---1494324393.2456653
90 I am Thread-10 , na res: ResB---1494324393.2456653
91 I am Thread-10 , na res: ResA---1494324393.446061
92 
93 Process finished with exit code 0

遞歸鎖就是調用threading下的RLock類功能實現的,RLock它自帶有計數功能,每讓線程獲取到以后就會就進行自加一的功能(RLock默認數值是0,只要RLock不是0線程就不能進行獲取),只要進行一進行釋放功能RLock就會進行自減一的功能直到為0時。

Event對象:

線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。對象包含一個可有線程設置的信號標志,它允許線程等待某些事情的發生。在初始情況下,Event對象的標志為假,name這個線程將會被一直阻塞至該標志為真。一個線程如果講義個Event對象的信號標志設置為真,他將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那么它將忽略這個事情,繼續執行

1 1 event.isSet()返回event的狀態值
2 2 
3 3 event.wait()如果event.isSet()==False將阻塞線程
4 4 
5 5 event.set()設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態,等待操作系統調度
6 6 
7 7 event.clear() 恢復event的狀態值為Flase

 

可以考慮一種應用場景,例如,我們有多少個線程從Redis隊列中讀取數據來處理,這些線程都要嘗試去連接Redis的服務,一般情況下,如果Redis連接不成功,在各個線程的代碼中,都會去嘗試重新連接。如果我們想要再啟動是確保Redis服務正常,才讓那些工作線程去連接Redis服務器,那么我們就可以采用threading.Even機制來協調各個工作線程的連接操作:主線程中回去嘗試連接Redis服務,如果正常的話,觸發事件,各工作線程會嘗試連接Redis服務。

實例如下:

 1 import threading
 2 import time
 3 import logging
 4 
 5 logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)
 6 
 7 def worker(event):
 8     logging.debug('Waiting for redis ready...')
 9     event.wait()
10     logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
11     time.sleep(1)
12 
13 def main():
14     readis_ready = threading.Event()
15     t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
16     t1.start()
17 
18     t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
19     t2.start()
20 
21     logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
22     time.sleep(3)
23     readis_ready.set()
24 
25 if __name__=="__main__":
26     main()
27 運行結果
28 (t1        ) Waiting for redis ready...
29 (t2        ) Waiting for redis ready...
30 (MainThread) first of all, check redis server, make sure it is OK, and then trigger the redis ready event
31 (t1        ) redis ready, and connect to redis server and do some work [Tue May  9 19:10:09 2017]
32 (t2        ) redis ready, and connect to redis server and do some work [Tue May  9 19:10:09 2017]
33 
34 Process finished with exit code 0

 threading.Event的wait方法還接受一個超時參數,默認情況下如果事情一致沒有發生,wait方法會一直阻塞下去,而加入這個超時參數之后,如果阻塞時間超過這個參數設定的值之后,wait方法會返回。對應於上面的應用場景,如果Redis服務器一致沒有啟動,我們希望子線程能夠打印一些日志來不斷地提醒我們當前沒有一個可以連接的Redis服務,我們就可以通過設置這個超時參數來表達成這樣的目的:

 

semaphore(信號量)

Semaphore管理一個內置的計算器

每當調用acquire()時內置計數器-1

調用release()時內置計算器-1

計算器不能小於0,檔計數器為0時,acquire()將阻塞線程直到其他線程調用release()

實例:(同時只有5個線程可以獲得semaphore,即可以限制最大連接數為5)

 1 import threading
 2 import time
 3 
 4 semaphore=threading.Semaphore(5)  #最大一次性進行次數
 5 
 6 def func():
 7     semaphore.acquire()
 8     print(threading.currentThread().getName()+'grt semaphore')
 9     time.sleep(2)
10     semaphore.release()
11 for i in range(20):
12     t1=threading.Thread(target=func)
13     t1.start()
14 
15 運行結果
16 Thread-1grt semaphore
17 Thread-2grt semaphore
18 Thread-3grt semaphore
19 Thread-4grt semaphore
20 Thread-5grt semaphore
21 Thread-6grt semaphore
22 Thread-7grt semaphore
23 Thread-8grt semaphore
24 Thread-9grt semaphore
25 Thread-10grt semaphore
26 Thread-12grt semaphore
27 Thread-13grt semaphore
28 Thread-14grt semaphore
29 Thread-15grt semaphore
30 Thread-11grt semaphore
31 Thread-17grt semaphore
32 Thread-18grt semaphore
33 Thread-19grt semaphore
34 Thread-20grt semaphore
35 Thread-16grt semaphore
36 
37 Process finished with exit code 0

multiprocessing模塊:

multiprocessing包是Python中多進程管包。與threading.Thread類似,他可以利用multiprocessing.Procsess對象來創建一個進程。該進程可以運行在python程序內部編寫的函數。該Process對象與Thread的用法相同,也有start()run()join()的方法。此外multiorcessing包中也有Lock/Event/Semaphore/Condition類(這些對象可以像多線程那樣,通過參數傳遞給各個進程),用以同步進程,器用法與threading包中的同名類一致。所以,

multiprocessing的很大一部分與threading使用同一套API(接口),只不過換到了多進程的情境。

python的進程調用

方法一:

 1 ##Process類調用
 2 from multiprocessing import Process
 3 import time
 4 def f(name):
 5 
 6     print('hello',name,time.ctime())
 7     time.sleep(1)
 8 
 9 if __name__ == '__main__':
10     l=[]
11     for i in range(3):
12         p=Process(target=('alvin:%s'%i))
13         l.append(p)
14         p.start()
15     for i in l:
16         i.join()
17     print('ending')

方法二:

 1 ##繼承Peocess類調用
 2 from multiprocessing import Process
 3 import  time
 4 class MyProcess(Process):
 5     def __init__(self):
 6         super(MyProcess, self).__init__()
 7 
 8     def run(self):
 9         print('hello',self.name,time.ctime())
10         time.sleep(1)
11 
12 if __name__ == '__main__':
13     l=[]
14     for i in range(3):
15         p=MyProcess()
16         p.start()
17         l.append(p)
18     for i in l:
19         i.join()
20 
21     print('engding')

process類

構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

group:線程組,目前還沒有實現,庫引用中提示必須是None

target:要執行的方法

name:進程名

args/kwarges:要傳入方法的參數。

實例方法:

is_aive()返回進程是否在運行

join([timeout])阻塞當期那上下文環境的進程,直到調用此方法的進程終止或到達指定的timeout(可選參數)

start()進程准備就緒,等待CPU調度

run()stat()調用run方法,如果實力進程時未制定傳入target,這star執行t默認run()方法

terminate()不管任務是否完成,立即停止工作進程

屬性:

daemon 和線程的setDeanon功能一樣

name 進程名字

pid 進程號

 

 
        
 1 from multiprocessing import Process
 2 import os
 3 import time
 4 def info(name):
 5 
 6 
 7     print("name:",name)
 8     print('parent process:', os.getppid())
 9     print('process id:', os.getpid())
10     print("------------------")
11     time.sleep(1)
12 
13 def foo(name):
14 
15     info(name)
16 
17 if __name__ == '__main__':
18 
19     info('main process line')
20 
21 
22     p1 = Process(target=info, args=('alvin',))
23     p2 = Process(target=foo, args=('egon',))
24     p1.start()
25     p2.start()
26 
27     p1.join()
28     p2.join()
29 
30     print("ending")
通過tasklist(Win)或者ps-elf|grep(linux)命令檢測每一個進程號(PID)對應的進程名
協程:
yiel與協程
import time
def consumer():
    r=''
    while True:
        n=yield r
        if not n:
            return
        print('[CONSUMER]---Consuming %s...'%n)
        time.sleep(1)
        r='200 OK'

def prduce(c):
    next(c)
    n=0
    while n<5:
        n+=1
        print('[CONSUMER]---Consuming %s...' % n)
        cr=c.send(n)
        print('[CONSUMER]---Consuming %s...'%n)
    c.close()
if __name__ == '__main__':
    c=consumer()
    prduce(c)

運行結果
[CONSUMER]---Consuming 1...
[CONSUMER]---Consuming 1...
[CONSUMER]---Consuming 1...
[CONSUMER]---Consuming 2...
[CONSUMER]---Consuming 2...
[CONSUMER]---Consuming 2...
[CONSUMER]---Consuming 3...
[CONSUMER]---Consuming 3...
[CONSUMER]---Consuming 3...
[CONSUMER]---Consuming 4...
[CONSUMER]---Consuming 4...
[CONSUMER]---Consuming 4...
[CONSUMER]---Consuming 5...
[CONSUMER]---Consuming 5...
[CONSUMER]---Consuming 5...

Process finished with exit code 0
 
        

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM