多進程
import multiprocessing
import time


def demo1():
    """Print ``demo1`` once per second, forever."""
    while True:
        print("demo1")
        time.sleep(1)


def demo2():
    """Print ``demo2`` once per second, forever."""
    while True:
        print("demo2")
        time.sleep(1)


def main():
    """Run ``demo1`` and ``demo2`` side by side, each in its own process."""
    # Create both Process objects first, then start them in the same order.
    workers = [
        multiprocessing.Process(target=task)
        for task in (demo1, demo2)
    ]
    for worker in workers:
        worker.start()


if __name__ == '__main__':
    main()
互斥鎖
import threading
import time

g_num = 0

# Mutex guarding every update of g_num; a new Lock starts out unlocked.
mutex = threading.Lock()


def demo1(num):
    """Increment the shared counter ``g_num`` ``num`` times under the mutex.

    Args:
        num: Number of increments to perform.

    Prints the value of ``g_num`` read after the lock has been released.
    """
    global g_num
    # Entering the ``with`` block acquires the lock, blocking while another
    # thread holds it; leaving the block releases it even if the loop raises.
    with mutex:
        for _ in range(num):
            g_num += 1
    print("%s" % g_num)


def demo2(num):
    """Increment the shared counter ``g_num`` ``num`` times under the mutex.

    Args:
        num: Number of increments to perform.

    Prints the value of ``g_num`` read after the lock has been released.
    """
    global g_num
    with mutex:
        for _ in range(num):
            g_num += 1
    print("%s" % g_num)


def main(num=1000000):
    """Run ``demo1`` and ``demo2`` in two threads and print the final total.

    Args:
        num: Increments performed by each thread (default 1000000).
    """
    threads = [
        threading.Thread(target=demo1, args=(num,)),
        threading.Thread(target=demo2, args=(num,)),
    ]
    for thread in threads:
        thread.start()
    # Join rather than sleep for a fixed time: the total printed below is
    # only meaningful once both workers have really finished.
    for thread in threads:
        thread.join()
    print("%s" % g_num)


if __name__ == '__main__':
    main()
隊列
import multiprocessing

# Marker the producer puts on the queue last so the consumer knows the
# stream is complete. Queue.empty() cannot be used for that purpose: the
# queue is also empty whenever the consumer is merely faster than the
# producer.
_END_OF_DATA = None


def download_from_web(q):
    """Put the simulated download results on ``q``, then an end marker.

    Args:
        q: Queue-like object with a ``put`` method.
    """
    # Simulated data downloaded from the web.
    data = [1, 2, 3, 4, 5, 6]
    for item in data:
        q.put(item)
    q.put(_END_OF_DATA)


def analysis_data(q):
    """Collect every item from ``q`` up to the end marker and print them.

    Args:
        q: Queue-like object with a blocking ``get`` method, fed by
            ``download_from_web``.

    Returns:
        The list of collected items (the same list that is printed).
    """
    # Data processing: iter(callable, sentinel) keeps calling q.get() and
    # stops as soon as it returns the end marker.
    waiting = list(iter(q.get, _END_OF_DATA))
    print(waiting)
    return waiting


def main():
    """Run the producer and the consumer in separate processes."""
    # Create the queue shared by both processes.
    q = multiprocessing.Queue()
    producer = multiprocessing.Process(target=download_from_web, args=(q,))
    consumer = multiprocessing.Process(target=analysis_data, args=(q,))

    producer.start()
    consumer.start()

    # Join the consumer first so the queue is drained before waiting on the
    # process that filled it.
    consumer.join()
    producer.join()


if __name__ == '__main__':
    main()
進程池
from multiprocessing import Pool
import time
import random
import os


def worker(msg):
    """Simulate one task: report start, sleep up to 2 seconds, report elapsed time.

    Args:
        msg: Label identifying the task in the printed output.
    """
    started = time.time()
    print("%s 開始執行,進程號是 %d" % (msg, os.getpid()))
    time.sleep(random.random() * 2)
    elapsed = time.time() - started
    print(msg, "執行完畢,耗時%.2f秒" % elapsed)


if __name__ == '__main__':
    # Process pool with at most 3 worker processes.
    pool = Pool(3)
    for task_id in range(8):
        # Schedule worker(task_id); the arguments are passed as a tuple.
        pool.apply_async(worker, args=(task_id,))
    print("-- start --")
    # Close the pool: no further tasks are accepted.
    pool.close()
    # Wait for every child process to finish; must come after close().
    pool.join()
    print("-- end --")
協程
from gevent import monkey

# Patch the standard library before any other module is imported, as gevent
# recommends, so that blocking calls such as time.sleep() cooperate with
# greenlets.
monkey.patch_all()

import time  # noqa: E402

import gevent  # noqa: E402


def _print_steps(n):
    """Print the current greenlet and a counter ``n`` times, pausing 0.5 s each."""
    for i in range(n):
        print(gevent.getcurrent(), i)
        time.sleep(0.5)


def f1(n):
    """Print ``n`` numbered steps from the greenlet running this function."""
    _print_steps(n)


def f2(n):
    """Print ``n`` numbered steps from the greenlet running this function."""
    _print_steps(n)


def f3(n):
    """Print ``n`` numbered steps from the greenlet running this function."""
    _print_steps(n)


if __name__ == '__main__':
    # Spawn the three greenlets and wait until all of them have finished.
    gevent.joinall([
        gevent.spawn(f1, 3),
        gevent.spawn(f2, 3),
        gevent.spawn(f3, 3),
    ])
使用協程下載圖片
from gevent import monkey

# Patch before urllib is imported: urllib.request pulls in socket and ssl,
# and gevent warns that patching ssl after it has been imported may lead to
# errors.
monkey.patch_all()

import time  # noqa: E402
from urllib import request  # noqa: E402

import gevent  # noqa: E402


def cbk(a, b, c):
    """Progress callback for ``urlretrieve``: print the percentage downloaded.

    Args:
        a: Number of data blocks transferred so far.
        b: Size of one data block.
        c: Size of the remote file; not positive when the size is unknown.
    """
    if c <= 0:
        # Without a usable total size no percentage can be computed (and a
        # zero size would raise ZeroDivisionError), so print nothing.
        return
    per = min(100.0, 100.0 * a * b / c)
    print('\r下載已經完成 %.2f%%' % per, end="")


imgurl = ["https://ww3.sinaimg.cn/mw600/006XNEY7gy1fwj4w6ttb6j30hs0k4tc9.jpg",
          "https://ww3.sinaimg.cn/mw600/0073tLPGgy1fx6pxmpwmuj30qo0zkn2x.jpg"]

headers = [
    ("User-agent",
     "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36")]


def downloader(jpg_name, url):
    """Download ``url`` into the file ``jpg_name``, reporting progress via ``cbk``.

    Args:
        jpg_name: Path of the local file to write.
        url: Address of the image to fetch.
    """
    # Install an opener carrying the custom User-agent header; urlretrieve
    # uses the globally installed opener.
    opener = request.build_opener()
    opener.addheaders = headers
    request.install_opener(opener)
    request.urlretrieve(url, jpg_name, cbk)
    time.sleep(0.1)


def main():
    """Download both images concurrently and print the elapsed seconds."""
    time1 = time.time()
    gevent.joinall([
        gevent.spawn(downloader, "1.jpg", imgurl[0]),
        gevent.spawn(downloader, "2.jpg", imgurl[1]),
    ])
    time2 = time.time()
    print(time2 - time1)


if __name__ == '__main__':
    main()