python/進程同步鎖
python/同步鎖
同步鎖:通常被用來實現共享資源的同步訪問,為每一個共享資源創建一個Lock對象當你需要訪問該資源時,調用qcuqire方法來獲取鎖對象(如果其他線程已經獲得該鎖,則當前線程需等待期被釋放),待資源訪問完后,在調用release方法釋放鎖
實例如下:
1 #同步鎖 2 import time #導入時間模塊 3 import threading #導入threading模塊 4 num=100 #設置一個全局變量 5 lock=threading.Lock() 6 def sudnum(): #定一個函數sudnum' 7 global num #聲明全局變量 8 lock.acquire() 9 temp=num #讀取全局變量num 10 time.sleep(0) #增加一個休眠功能 11 num=temp-1 #把從全局拿來的變量進行減一的操作 12 lock.release() 13 l=[] #在全局創建一個空了表 14 for i in range(100): #從0到100進行循環 15 t=threading.Thread(target=sudnum) #在循環中創建子線程,共創建100個 16 t.start() #循環啟動子線程 17 l.append(t) #把循環創建的實例化添加到列表中 18 19 for f in l: #從列表里遍歷內容給f: 20 f.join() #循環設置列表的內容結束 21 22 print('Result:',num) #打印通過多次子線程更改過的變量內容 23 運行結果 24 Result: 0 25 26 Process finished with exit code 0
死鎖:
所謂死鎖,就是指倆個或倆個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法說推進下去
實例如下:
1 #死鎖 2 import threading #導入模塊 3 import time #導入模塊 4 5 mutexA = threading.Lock() #把threading下Lock類賦值給mutexA 6 mutexB = threading.Lock() #把threading下Lock類賦值給mutexB 7 8 class MyThread(threading.Thread): #定義MyThread類 並繼承threading下的Thread類功能 9 10 def __init__(self): #初始化實例化 11 threading.Thread.__init__(self) #初始父類實例化 12 13 def run(self): #定義run函數 (此函數是固定函數) 14 self.fun1() #實例化對象引用執行fun1函數 15 self.fun2() #實例化對象引用執行fun2函數 16 17 def fun1(self): #定義fun1函數 18 19 mutexA.acquire() # 獲取公共鎖,如果鎖被占用,則阻塞在這里,等待鎖的釋放 20 21 print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) 22 23 mutexB.acquire() # 獲取公共鎖,如果鎖被占用,則阻塞在這里,等待鎖的釋放 24 print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) 25 mutexB.release() # 釋放公共鎖 26 27 mutexA.release() # 釋放公共鎖 28 29 30 def fun2(self): #定義fun2函數 31 32 mutexB.acquire() # 獲取公共鎖,如果鎖被占用,則阻塞在這里,等待鎖的釋放 33 print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) 34 time.sleep(0.2) 35 36 mutexA.acquire() # 獲取公共鎖,如果鎖被占用,則阻塞在這里,等待鎖的釋放 37 print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) 38 mutexA.release() # 釋放公共鎖 39 40 mutexB.release() # 釋放公共鎖 41 42 if __name__ == "__main__": 43 44 print("start---------------------------%s"%time.time()) 45 46 for i in range(0, 10): 47 my_thread = MyThread() 48 my_thread.start() 49 50 運行結果 51 start---------------------------1494320240.1851542 52 I am Thread-1 , get res: ResA---1494320240.1856549 53 I am Thread-1 , get res: ResB---1494320240.1861556 54 I am Thread-1 , get res: ResB---1494320240.1861556 55 I am Thread-2 , get res: ResA---1494320240.186656
實際for循環10次,就是創建10個子線程,但是執行結果就運行到第二個子線程和第一子線程就出現了死鎖的現象,第一個子線程把A鎖釋放掉時第二個子線程獲取到A鎖。第一個子線程釋放了B鎖,然后又獲取了B鎖,現在第二個子線程獲得了A鎖,第一個子線程獲得了B鎖,第二個子線程想要獲取B鎖,但是第一個子線程沒有釋放掉。第一個子線程想要獲取到A鎖 第二個子線程沒有釋放。就出現倆個子線程都相互等對方釋放獲取的鎖。
遞歸鎖:
1 #遞歸鎖 2 import threading #導入模塊 3 import time #導入模塊 4 5 RLock = threading.RLock() #把threading下RLock類賦值給RLock 6 7 8 class MyThread(threading.Thread): #定義MyThread類 並繼承threading下的Thread類功能 9 10 def __init__(self): #初始化實例化 11 threading.Thread.__init__(self) #初始父類實例化 12 13 def run(self): #定義run函數 (此函數是固定函數) 14 self.fun1() #實例化對象引用執行fun1函數 15 self.fun2() #實例化對象引用執行fun2函數 16 17 def fun1(self): #定義fun1函數 18 19 RLock.acquire() # 獲取公共鎖,如果鎖被占用,則阻塞在這里,等待鎖的釋放 20 21 print ("I am %s , na res: %s---%s" %(self.name, "ResA",time.time())) 22 23 RLock.acquire() # 獲取公共鎖,如果鎖被占用,則阻塞在這里,等待鎖的釋放 24 print ("I am %s , na res: %s---%s" %(self.name, "ResB",time.time())) 25 RLock.release() # 釋放公共鎖 26 27 RLock.release() # 釋放公共鎖 28 29 30 def fun2(self): #定義fun2函數 31 32 RLock.acquire() # 獲取公共鎖,如果鎖被占用,則阻塞在這里,等待鎖的釋放 33 print ("I am %s , na res: %s---%s" %(self.name, "ResB",time.time())) 34 time.sleep(0.2) 35 36 RLock.acquire() # 獲取公共鎖,如果鎖被占用,則阻塞在這里,等待鎖的釋放 37 print ("I am %s , na res: %s---%s" %(self.name, "ResA",time.time())) 38 RLock.release() # 釋放公共鎖 39 40 RLock.release() # 釋放公共鎖 41 42 if __name__ == "__main__": 43 44 print("start---------------------------%s"%time.time()) 45 46 for i in range(0, 10): 47 my_thread = MyThread() 48 my_thread.start() 49 50 運行結果 51 start---------------------------1494324391.4339159 52 I am Thread-1 , na res: ResA---1494324391.4344165 53 I am Thread-1 , na res: ResB---1494324391.4344165 54 I am Thread-1 , na res: ResB---1494324391.4344165 55 I am Thread-1 , na res: ResA---1494324391.63575 56 I am Thread-2 , na res: ResA---1494324391.63575 57 I am Thread-2 , na res: ResB---1494324391.63575 58 I am Thread-2 , na res: ResB---1494324391.63575 59 I am Thread-2 , na res: ResA---1494324391.836299 60 I am Thread-4 , na res: ResA---1494324391.836299 61 I am Thread-4 , na res: ResB---1494324391.8367958 62 I am Thread-4 , na res: ResB---1494324391.8367958 63 I am Thread-4 , na res: ResA---1494324392.040432 64 I am Thread-6 , na res: ResA---1494324392.040432 65 I am Thread-6 , na res: ResB---1494324392.040432 66 I am Thread-7 , na res: ResA---1494324392.040432 67 I am Thread-7 , na res: ResB---1494324392.040432 68 I am Thread-7 , na res: ResB---1494324392.040432 69 I am Thread-7 , na res: ResA---1494324392.2415655 70 I am Thread-9 , na res: ResA---1494324392.2415655 71 I am Thread-9 , na res: ResB---1494324392.2420657 72 I am Thread-9 , na res: ResB---1494324392.2420657 73 I am Thread-9 , na res: ResA---1494324392.4427023 74 I am Thread-3 , na res: ResA---1494324392.4427023 75 I am Thread-3 , na res: ResB---1494324392.4427023 76 I am Thread-3 , na res: ResB---1494324392.4427023 77 I am Thread-3 , na res: ResA---1494324392.643367 78 I am Thread-6 , na res: ResB---1494324392.643367 79 I am Thread-6 , na res: ResA---1494324392.8445525 80 I am Thread-8 , na res: ResA---1494324392.8445525 81 I am Thread-8 , na res: ResB---1494324392.8445525 82 I am Thread-8 , na res: ResB---1494324392.8445525 83 I am Thread-8 , na res: ResA---1494324393.0449915 84 I am Thread-5 , na res: ResA---1494324393.0449915 85 I am Thread-5 , na res: ResB---1494324393.0449915 86 I am Thread-5 , na res: ResB---1494324393.0449915 87 I am Thread-5 , na res: ResA---1494324393.2456653 88 I am Thread-10 , na res: ResA---1494324393.2456653 89 I am Thread-10 , na res: ResB---1494324393.2456653 90 I am Thread-10 , na res: ResB---1494324393.2456653 91 I am Thread-10 , na res: ResA---1494324393.446061 92 93 Process finished with exit code 0
遞歸鎖就是調用threading下的RLock類功能實現的,RLock它自帶有計數功能,每讓線程獲取到以后就會就進行自加一的功能(RLock默認數值是0,只要RLock不是0線程就不能進行獲取),只要進行一進行釋放功能RLock就會進行自減一的功能直到為0時。
Event對象:
線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。對象包含一個可有線程設置的信號標志,它允許線程等待某些事情的發生。在初始情況下,Event對象的標志為假,name這個線程將會被一直阻塞至該標志為真。一個線程如果講義個Event對象的信號標志設置為真,他將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那么它將忽略這個事情,繼續執行
1 1 event.isSet()返回event的狀態值 2 2 3 3 event.wait()如果event.isSet()==False將阻塞線程 4 4 5 5 event.set()設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態,等待操作系統調度 6 6 7 7 event.clear() 恢復event的狀態值為Flase
可以考慮一種應用場景,例如,我們有多少個線程從Redis隊列中讀取數據來處理,這些線程都要嘗試去連接Redis的服務,一般情況下,如果Redis連接不成功,在各個線程的代碼中,都會去嘗試重新連接。如果我們想要再啟動是確保Redis服務正常,才讓那些工作線程去連接Redis服務器,那么我們就可以采用threading.Even機制來協調各個工作線程的連接操作:主線程中回去嘗試連接Redis服務,如果正常的話,觸發事件,各工作線程會嘗試連接Redis服務。
實例如下:
1 import threading 2 import time 3 import logging 4 5 logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',) 6 7 def worker(event): 8 logging.debug('Waiting for redis ready...') 9 event.wait() 10 logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime()) 11 time.sleep(1) 12 13 def main(): 14 readis_ready = threading.Event() 15 t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1') 16 t1.start() 17 18 t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2') 19 t2.start() 20 21 logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event') 22 time.sleep(3) 23 readis_ready.set() 24 25 if __name__=="__main__": 26 main() 27 運行結果 28 (t1 ) Waiting for redis ready... 29 (t2 ) Waiting for redis ready... 30 (MainThread) first of all, check redis server, make sure it is OK, and then trigger the redis ready event 31 (t1 ) redis ready, and connect to redis server and do some work [Tue May 9 19:10:09 2017] 32 (t2 ) redis ready, and connect to redis server and do some work [Tue May 9 19:10:09 2017] 33 34 Process finished with exit code 0
threading.Event的wait方法還接受一個超時參數,默認情況下如果事情一致沒有發生,wait方法會一直阻塞下去,而加入這個超時參數之后,如果阻塞時間超過這個參數設定的值之后,wait方法會返回。對應於上面的應用場景,如果Redis服務器一致沒有啟動,我們希望子線程能夠打印一些日志來不斷地提醒我們當前沒有一個可以連接的Redis服務,我們就可以通過設置這個超時參數來表達成這樣的目的:
semaphore(信號量)
Semaphore管理一個內置的計算器
每當調用acquire()時內置計數器-1
調用release()時內置計算器-1
計算器不能小於0,檔計數器為0時,acquire()將阻塞線程直到其他線程調用release()
實例:(同時只有5個線程可以獲得semaphore,即可以限制最大連接數為5)
1 import threading 2 import time 3 4 semaphore=threading.Semaphore(5) #最大一次性進行次數 5 6 def func(): 7 semaphore.acquire() 8 print(threading.currentThread().getName()+'grt semaphore') 9 time.sleep(2) 10 semaphore.release() 11 for i in range(20): 12 t1=threading.Thread(target=func) 13 t1.start() 14 15 運行結果 16 Thread-1grt semaphore 17 Thread-2grt semaphore 18 Thread-3grt semaphore 19 Thread-4grt semaphore 20 Thread-5grt semaphore 21 Thread-6grt semaphore 22 Thread-7grt semaphore 23 Thread-8grt semaphore 24 Thread-9grt semaphore 25 Thread-10grt semaphore 26 Thread-12grt semaphore 27 Thread-13grt semaphore 28 Thread-14grt semaphore 29 Thread-15grt semaphore 30 Thread-11grt semaphore 31 Thread-17grt semaphore 32 Thread-18grt semaphore 33 Thread-19grt semaphore 34 Thread-20grt semaphore 35 Thread-16grt semaphore 36 37 Process finished with exit code 0
multiprocessing模塊:
multiprocessing包是Python中多進程管包。與threading.Thread類似,他可以利用multiprocessing.Procsess對象來創建一個進程。該進程可以運行在python程序內部編寫的函數。該Process對象與Thread的用法相同,也有start()run()join()的方法。此外multiorcessing包中也有Lock/Event/Semaphore/Condition類(這些對象可以像多線程那樣,通過參數傳遞給各個進程),用以同步進程,器用法與threading包中的同名類一致。所以,
multiprocessing的很大一部分與threading使用同一套API(接口),只不過換到了多進程的情境。
python的進程調用
方法一:
1 ##Process類調用 2 from multiprocessing import Process 3 import time 4 def f(name): 5 6 print('hello',name,time.ctime()) 7 time.sleep(1) 8 9 if __name__ == '__main__': 10 l=[] 11 for i in range(3): 12 p=Process(target=('alvin:%s'%i)) 13 l.append(p) 14 p.start() 15 for i in l: 16 i.join() 17 print('ending')
方法二:
1 ##繼承Peocess類調用 2 from multiprocessing import Process 3 import time 4 class MyProcess(Process): 5 def __init__(self): 6 super(MyProcess, self).__init__() 7 8 def run(self): 9 print('hello',self.name,time.ctime()) 10 time.sleep(1) 11 12 if __name__ == '__main__': 13 l=[] 14 for i in range(3): 15 p=MyProcess() 16 p.start() 17 l.append(p) 18 for i in l: 19 i.join() 20 21 print('engding')
process類
構造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group:線程組,目前還沒有實現,庫引用中提示必須是None
target:要執行的方法
name:進程名
args/kwarges:要傳入方法的參數。
實例方法:
is_aive()返回進程是否在運行
join([timeout])阻塞當期那上下文環境的進程,直到調用此方法的進程終止或到達指定的timeout(可選參數)
start()進程准備就緒,等待CPU調度
run()stat()調用run方法,如果實力進程時未制定傳入target,這star執行t默認run()方法
terminate()不管任務是否完成,立即停止工作進程
屬性:
daemon 和線程的setDeanon功能一樣
name 進程名字
pid 進程號
1 from multiprocessing import Process 2 import os 3 import time 4 def info(name): 5 6 7 print("name:",name) 8 print('parent process:', os.getppid()) 9 print('process id:', os.getpid()) 10 print("------------------") 11 time.sleep(1) 12 13 def foo(name): 14 15 info(name) 16 17 if __name__ == '__main__': 18 19 info('main process line') 20 21 22 p1 = Process(target=info, args=('alvin',)) 23 p2 = Process(target=foo, args=('egon',)) 24 p1.start() 25 p2.start() 26 27 p1.join() 28 p2.join() 29 30 print("ending")
通過tasklist(Win)或者ps-elf|grep(linux)命令檢測每一個進程號(PID)對應的進程名
協程:
yiel與協程
import time def consumer(): r='' while True: n=yield r if not n: return print('[CONSUMER]---Consuming %s...'%n) time.sleep(1) r='200 OK' def prduce(c): next(c) n=0 while n<5: n+=1 print('[CONSUMER]---Consuming %s...' % n) cr=c.send(n) print('[CONSUMER]---Consuming %s...'%n) c.close() if __name__ == '__main__': c=consumer() prduce(c) 運行結果 [CONSUMER]---Consuming 1... [CONSUMER]---Consuming 1... [CONSUMER]---Consuming 1... [CONSUMER]---Consuming 2... [CONSUMER]---Consuming 2... [CONSUMER]---Consuming 2... [CONSUMER]---Consuming 3... [CONSUMER]---Consuming 3... [CONSUMER]---Consuming 3... [CONSUMER]---Consuming 4... [CONSUMER]---Consuming 4... [CONSUMER]---Consuming 4... [CONSUMER]---Consuming 5... [CONSUMER]---Consuming 5... [CONSUMER]---Consuming 5... Process finished with exit code 0