python中的線程是假線程,不同線程之間的切換是需要耗費資源的,因為需要存儲線程的上下文,不斷的切換就會耗費資源。。
python多線程適合io操作密集型的任務(如socket server 網絡並發這一類的);
python多線程不適合cpu密集操作型的任務,主要使用cpu來計算,如大量的數學計算。
那么如果有cpu密集型的任務怎么辦,可以通過多進程來操作(不是多線程)。
假如CPU有8核,每核CPU都可以用1個進程,每個進程可以用1個線程來進行計算。
進程之間不需要使用gil鎖,因為進程是獨立的,不會共享數據。
進程可以起很多個,但是8核CPU同時只能對8個任務進行操作。
多進程
#測試多進程 import multiprocessing import time def run(name): time.sleep(2) print ('hello',name) if __name__ == '__main__': for i in range(10): #起了10個進程 p = multiprocessing.Process(target=run,args=('msc%s' %i,)) p.start() 執行結果: hello msc1 hello msc0 hello msc2 hello msc3 hello msc5 hello msc4 hello msc6 hello msc7 hello msc8 hello msc9
import multiprocessing import time,threading def thread_run(): print (threading.get_ident()) #get_ident獲取當前線程id def run(name): time.sleep(2) print ('hello',name) t = threading.Thread(target=thread_run,) #在每個進程中又起了1個線程 t.start() if __name__ == '__main__': for i in range(10): #起了10個進程 p = multiprocessing.Process(target=run,args=('msc%s' %i,)) p.start() 執行結果: hello msc0 13996 hello msc2 14208 hello msc1 13964 hello msc3 14012 hello msc6 15192 hello msc7 15136 hello msc8 7036 hello msc4 12344 hello msc9 15332 hello msc5 13616
from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) #獲取父進程的id print('process id:', os.getpid()) #獲取自身的id print("\n\n") def f(name): info('\033[31;1mfunction f\033[0m') print('hello', name) if __name__ == '__main__': info('\033[32;1mmain process line\033[0m') ##直接調用函數 # p = Process(target=f, args=('bob',)) # p.start() # p.join() 執行結果: main process line module name: __main__ parent process: 7172 #父進程就是python
process id: 14880 #這個子進程就是python的代碼程序
##
每個進程都會有一個父進程。
from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) #獲取父進程的id print('process id:', os.getpid()) #獲取自身的id print("\n\n") def f(name): info('\033[31;1mcalled from child process function f\033[0m') print('hello', name) if __name__ == '__main__': info('\033[32;1mmain process line\033[0m') p = Process(target=f, args=('msc',)) #設置子進程 p.start() #啟動子進程 # p.join() 執行結果: main process line module name: __main__ parent process: 1136 #主進程pycharm process id: 14684 #子進程python代碼 called from child process function f module name: __mp_main__ parent process: 14684 #主進程python代碼(1136的子進程) process id: 15884 #python代碼(主進程14684)中的子進程的子15884 ## 每個進程都有主進程(父進程) hello msc
進程間通訊
默認進程之間數據是不共享的,如果一定要實現互訪可以通過Queue來實現,這個Queue和線程中的Queue使用方法一樣,不過線程中的Queue只能在線程之間使用。
線程 import queue import threading def f(): q.put([42,None,'heelo']) if __name__ == '__main__': q = queue.Queue() p = threading.Thread(target=f,) p.start() print (q.get()) p.join() 執行結果: [42, None, 'heelo'] ## 通過子線程put進去數據,然后在主線程get出內容,表明線程之間數據是可以共享的。
import queue from multiprocessing import Process def f(): q.put([66,None,'hello']) #這里的q屬於主進程 if __name__ == '__main__': q = queue.Queue() #主進程起的q p = Process(target=f,) ## 在主進程中來定義子進程;如果在主進程中啟動了子進程,那么主進程和子進程之間內存是獨立的。 ## 因為內存獨立,子進程p是無法訪問主進程def f()中的q的。 p.start() print (q.get()) p.join() 執行結果: Process Process-1: Traceback (most recent call last): File "C:\Python35\lib\multiprocessing\process.py", line 249, in _bootstrap self.run() File "C:\Python35\lib\multiprocessing\process.py", line 93, in run self._target(*self._args, **self._kwargs) File "D:\Program Files (x86)\python\day31\test.py", line 7, in f q.put([66,None,'hello']) #這里的q屬於主進程 NameError: name 'q' is not defined ##可以看到已經報錯,這是因為子進程不能訪問主進程的q
import queue from multiprocessing import Process def f(qq): qq.put([66,None,'hello']) if __name__ == '__main__': q = queue.Queue() p = Process(target=f,args=(q,)) #將父進程q傳給子進程 p.start() print (q.get()) p.join() 執行結果: Traceback (most recent call last): File "D:/Program Files (x86)/python/day31/test.py", line 13, in <module> p.start() File "C:\Python35\lib\multiprocessing\process.py", line 105, in start self._popen = self._Popen(self) File "C:\Python35\lib\multiprocessing\context.py", line 212, in _Popen return _default_context.get_context().Process._Popen(process_obj) File "C:\Python35\lib\multiprocessing\context.py", line 313, in _Popen return Popen(process_obj) File "C:\Python35\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__ reduction.dump(process_obj, to_child) File "C:\Python35\lib\multiprocessing\reduction.py", line 59, in dump ForkingPickler(file, protocol).dump(obj) TypeError: can't pickle _thread.lock objects ## 這是因為我們將線程的q傳給另一個進程,這是不可以的,線程只屬於當前進程,不能傳給其他進程。 ## 如果想將q傳給子進程,那么必須將進程q傳進去,而不是線程q。
from multiprocessing import Process,Queue ##大寫的Queue是進程隊列; queue是線程隊列 ##大寫的Queue需要從multiprocessing導入 def f(qq): qq.put([66,None,'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f,args=(q,)) #將父進程q傳給子進程 p.start() print (q.get()) #父進程去get子進程的內容 p.join() 執行結果: [66, None, 'hello'] ##父進程可以get子進程put進去的內容了;從表面上看感覺是兩個進程共享了數據,其實不然。 '''
現在已經實現了進程間的通訊。父進程將q傳給子進程,其實是克隆了一份q給子進程,此時子進程就多了一個q進程隊列;
但是父進程又為什么能夠get子進程put進去的數據呢,這是因為當前兩個進程在內存空間依然是獨立的,只不過子進程put的數據 通過pickle序列化放到內存中一個中間的位置,然后父進程從這個中間的位置取到數據(而不是從子進程中取的數據)。
所以進程間的通訊不是共享數據,而是一個數據的傳遞。
'''
進程之間的數據還可以通過管道的方式來通訊 from multiprocessing import Process, Pipe def f(conn): conn.send([66, None, 'hello from child1']) #發送數據給parent_conn conn.close() #發完數據需要關閉 if __name__ == '__main__': parent_conn, child_conn = Pipe() ## 生成管道。 生成時會產生兩個返回對象,這兩個對象相當於兩端的電話,通過管道線路連接。 ## 兩個對象分別交給兩個變量。 p = Process(target=f, args=(child_conn,)) #child_conn需要傳給對端,用於send數據給parent_conn p.start() print(parent_conn.recv()) #parent_conn在這端,用於recv數據 p.join() 執行結果: [66, None, 'hello from child1']
from multiprocessing import Process, Pipe def f(conn): conn.send([66, None, 'hello from child1']) conn.send([66, None, 'hello from child2']) #發送兩次數據 conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) p.join() 執行結果: [66, None, 'hello from child1'] ## 可以看到這端只接收到了一次數據
from multiprocessing import Process, Pipe def f(conn): conn.send([66, None, 'hello from child1']) conn.send([66, None, 'hello from child2']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) print(parent_conn.recv()) #第二次接收數據 p.join() 執行結果: [66, None, 'hello from child1'] [66, None, 'hello from child2'] ##對端發送幾次,這端就需要接收幾次
from multiprocessing import Process, Pipe def f(conn): conn.send([66, None, 'hello from child1']) conn.send([66, None, 'hello from child2']) #發送兩次數據 conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) print(parent_conn.recv()) print(parent_conn.recv()) #對端發送兩次,本段接收三次 p.join() 執行結果: [66, None, 'hello from child1'] [66, None, 'hello from child2'] ## 程序卡主了,除非對端在發送一次數據。
from multiprocessing import Process, Pipe def f(conn): conn.send([66, None, 'hello from child1']) conn.send([66, None, 'hello from child2']) #發送兩次數據 print (conn.recv()) #接收數據 conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) print(parent_conn.recv()) parent_conn.send("data from parent_conn") #發送數據 p.join() 執行結果: [66, None, 'hello from child1'] [66, None, 'hello from child2'] data from parent_conn ##通過管道實現了相互發送接收數據(實現了數據傳遞)
進程間數據交互及共享
from multiprocessing import Process, Manager import os def f(d, l, n): d[1] = '1' #放入key和value到空字典中 d['2'] = 2 d[0.25] = None l.append(n) #將每個進程的n值放入列表中;每個進程的n值都不同。 print(l) if __name__ == '__main__': with Manager() as manager: #做一個別名,此時manager就相當於Manager() d = manager.dict() #生成一個可在多個進程之間傳遞和共享的字典 l = manager.list(range(5)) #生成一個可在多個進程之間傳遞和共享的列表;通過range(5)給列表中生成5個數據 p_list = [] for i in range(10): #生成10個進程 p = Process(target=f, args=(d, l, i)) #將字典和列表傳給每個進程,每個進程可以進行修改 p.start() p_list.append(p) # 將每個進程放入空列表中 for res in p_list: res.join() print(d) #所有進程都執行完畢后打印字典 print(l) #所有進程都執行完畢后打印列表 執行結果: #列表生成的時候自動加入了0-4這5個數;然后每個進程又把各自的n值加入列表 [0, 1, 2, 3, 4, 2] [0, 1, 2, 3, 4, 2, 3] [0, 1, 2, 3, 4, 2, 3, 4] [0, 1, 2, 3, 4, 2, 3, 4, 1] [0, 1, 2, 3, 4, 2, 3, 4, 1, 0] [0, 1, 2, 3, 4, 2, 3, 4, 1, 0, 5] [0, 1, 2, 3, 4, 2, 3, 4, 1, 0, 5, 6] [0, 1, 2, 3, 4, 2, 3, 4, 1, 0, 5, 6, 7] [0, 1, 2, 3, 4, 2, 3, 4, 1, 0, 5, 6, 7, 8] [0, 1, 2, 3, 4, 2, 3, 4, 1, 0, 5, 6, 7, 8, 9] #第十個進程把每個進程添加的n值都加入到列表 {0.25: None, 1: '1', '2': 2} #最后打印的字典 [0, 1, 2, 3, 4, 2, 3, 4, 1, 0, 5, 6, 7, 8, 9] #最后打印的列表 Process finished with exit code 0
from multiprocessing import Process, Manager import os def f(d, l): d[os.getpid()] = os.getpid() l.append(os.getpid()) print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() #對字典做個調整,也將pid加入到字典中 l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l) 執行結果: [0, 1, 2, 3, 4, 2240] [0, 1, 2, 3, 4, 2240, 10152] [0, 1, 2, 3, 4, 2240, 10152, 10408] [0, 1, 2, 3, 4, 2240, 10152, 10408, 6312] [0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156] [0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184] [0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168] [0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384] [0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384, 15976] [0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384, 15976, 16532] {2240: 2240, 10152: 10152, 10408: 10408, 6312: 6312, 17156: 17156, 6184: 6184, 16168: 16168, 11384: 11384, 15976: 15976, 16532: 16532} [0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384, 15976, 16532] ##現在我們看到可以實現進程間的數據共享、修改和傳遞。 ##Manager()自帶鎖,會控制進程之間同一時間修改數據; ##字典和列表的數據不是一份,而是因為10個進程,所以有10個字典和10個列表。每個進程修改后,都會copy給其他進程,其他進程可以對最新的數據進行修改,所以數據不會被修改亂。
進程同步
在進程里面也有鎖
from multiprocessing import Process, Lock #從multiprocessing導入Lock這個鎖 def f(l, i): l.acquire() #獲取修改數據的鎖 print('hello world', i) l.release() #釋放鎖 if __name__ == '__main__': lock = Lock() #實例鎖 for num in range(10): #生成10個進程 Process(target=f, args=(lock, num)).start() #執行子進程並傳入參數給子進程 執行結果: hello world 1 hello world 4 hello world 0 hello world 3 hello world 2 hello world 5 hello world 6 hello world 8 hello world 7 hello world 9 ## 可以看到一共10個進程,並不是連續的,說明執行進程的時候說不准先執行哪個進程。 '''
進程之間數據是獨立的,這里我們為什么又要加鎖呢,這是因為所有進程使用同一個屏幕來輸出數據;
比如 我們現在輸出的數據是 hello world x,在輸出的過程中很有可能其中一個進程還沒輸出完(比如只輸出了hello wo),另一個進程就執行輸出了(可能會在屏幕上看到hello wohello world0201的現象)。
所以需要通過鎖來控制同一時間只能有一個進程輸出數據到屏幕。
'''
進程池
執行多進程,子進程會從主進程復制一份完整數據,1個、10個進程可能還沒什么感覺,但是如果有100或1000,甚至更多個進程的時候開銷就會特別大,就會明顯感覺到多進程執行有卡頓現象。
進程池可以設定同一時間有多少個進程可以在CPU上運行。
from multiprocessing import Process, Pool #從multiprocessing導入pool import time,os def Foo(i): time.sleep(2) print("in process",os.getpid()) #打印進程id return i + 100 def Bar(arg): print('-->exec done:', arg) if __name__ == '__main__': ##這行代碼用途是如果主動執行該代碼的.py文件,則該代碼下面的代碼可以被執行;如果該.py模塊被導入到其他模塊中,從其他模塊執行該.py模塊,則該行下面的代碼不會被執行。 有些時候可以用這種方式用於測試,在該行代碼下面寫一些測試代碼。。 pool = Pool(5) #同時只能放入5個進程 for i in range(10): #創建10個進程,但是因為pool的限制,只有放入進程池中的5個進程才會被執行(),其他的被掛起了,如果進程池中其中有兩個進程執行完了,就會補進2個進程進去。 # pool.apply_async(func=Foo, args=(i,), callback=Bar) pool.apply(func=Foo, args=(i,)) #pool.apply用來將進程放入pool print('end') #執行完畢 pool.close() #允許pool中的進程關閉(close必須在join前面,可以理解close相當於一個開關吧) pool.join() # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。 執行結果: in process 13616 in process 10912 in process 12472 in process 15180 in process 12404 in process 13616 in process 10912 in process 12472 in process 15180 in process 12404 end ##可以看到通過串行的方式將結果打印出來,這是因為我們使用的是pool.apply。 pool.apply就是通過串行的方式來執行。
from multiprocessing import Process, Pool import time,os def Foo(i): time.sleep(2) print("in process",os.getpid()) return i + 100 def Bar(arg): print('-->exec done:', arg) if __name__ == '__main__': pool = Pool(5) for i in range(10): pool.apply_async(func=Foo, args=(i,)) ## 使用pool.apply_async就可以並行了 print('end') pool.close() # pool.join() 注釋掉 執行結果: end ## 只執行了print('end')代碼,其他進程的結果沒有看到,這是因為其他進程還沒有執行完成,主進程pool.close()就執行完了,close以后所有其他進程也不會在執行了。 ## 要想其他進程執行完成后在關閉,必須使用pool.join()
from multiprocessing import Process, Pool import time,os def Foo(i): time.sleep(2) print("in process",os.getpid()) return i + 100 def Bar(arg): print('-->exec done:', arg) if __name__ == '__main__': pool = Pool(5) for i in range(10): pool.apply_async(func=Foo, args=(i,)) print('end') pool.close() pool.join() 執行結果: end in process 14756 in process 14596 in process 10836 in process 12536 in process 12904 in process 14756 in process 14596 in process 10836 in process 12536 in process 12904 ##從執行結果來看,5個 5個的被打印出來。
回調
from multiprocessing import Process, Pool import time,os def Foo(i): time.sleep(2) print("in process",os.getpid()) return i + 100 def Bar(arg): print('-->exec done:', arg,os.getpid()) if __name__ == '__main__': pool = Pool(5) print ("主進程:",os.getpid()) #打印主進程id for i in range(10): pool.apply_async(func=Foo, args=(i,),callback=Bar) ##callback叫做回調,就是當執行完了func=Foo后,才會執行callback=Bar(每個進程執行完了后都會執行回調)。 ## 回調可以用於當執行完代碼后做一些后續操作,比如查看完命令后,通過回調進行備份;或者執行完什么動作后,做個日志等。 ## 備份、寫日志等在子進程中也可以執行,但是為什么要用回調呢! 這是因為如果用子進程,有10個子進程就得連接數據庫十次,而使用回調的話是用主進程連接數據庫,所以只連接一次就可以了,這樣寫能大大提高運行效率。 ##通過主進程建立數據庫的連接的話,因為在同一個進程中只能在數據庫建立一次連接,所以即使是多次被子進程回調,也不會重復建立連接的,因為數據庫會限制同一個進程最大連接數,這都是有數據庫設置的。 print('end') pool.close() pool.join() 執行結果: 主進程: 14340 #主進程是 14340 end in process 13936 -->exec done: 100 14340 #可以看出回調是通過主線程調用的 in process 15348 -->exec done: 101 14340 in process 10160 -->exec done: 102 14340 in process 11612 -->exec done: 103 14340 in process 14836 -->exec done: 104 14340 in process 13936 -->exec done: 105 14340 in process 15348 -->exec done: 106 14340 in process 10160 -->exec done: 107 14340 in process 11612 -->exec done: 108 14340 in process 14836 -->exec done: 109 14340
文章根據 代碼老兵 的分享博客,一點點搞出來的,多線程和進程讓我頭疼了三天,感謝大神們的分享的經驗,讓我少走彎路。