目錄:
一、queue 二、線程 基本使用 線程鎖 自定義線程池 生產者消費者模型(隊列) 三、進程 基本使用 進程鎖 進程數據共享 默認數據不共享 queues array Manager.dict 進程池 PS: IO密集型-多線程 計算密集型 - 多進程 四、協程 原理:利用一個線程,分解一個線程成為多個“微線程”==》程序級別 greenlet gevent pip3 install gevent
一、queue
1.1 queue用法
# 先進先出隊列
# put放數據,是否阻塞,阻塞時的超時事件
# get取數據(默認阻塞),是否阻塞,阻塞時的超時事件
# 隊列的最大長度:queue.Queue(2) 里面的數字
# qsize()真實個數
# maxsize 最大支持的個數
# join,task_done,阻塞進程,當隊列中任務執行完畢之后,不再阻塞
import queue q = queue.Queue(2) # q = queue.Queue()如果沒有參數的話,就是可以放無限多的數據。 print(q.empty()) # 返回隊列是否為空,空則為True,此處為True q.put(11) q.put(22) print(q.empty()) # 此處為False print(q.qsize()) # 返回隊列中現在有多少元素 # q.put(22) # q.put(33,block=False) # 如果隊列最大能放2個元素,這時候放了第三個,默認是阻塞的,block=False,如果就會報錯:queue.Full # q.put(33,block=True,timeout=2) # 設置為阻塞,如果timeout設置的時間之內,還沒有人來取,則就會報錯:queue.Full print(q.get()) print(q.get()) print(q.get(timeout=2)) # 隊列里的數據已經取完了,如果再取就會阻塞,這里timeout時間2秒,就是等待2秒,隊列里還沒有數據就報錯:queue.Empty
1.2 queue.join
# join:實際上意味着等到隊列為空,再執行別的操作,否則就一直阻塞,不是說get取完了,就不阻塞了,而是每次get之后,
# 要執行:task_done 告訴一聲已經取過了,等隊列為空,join才不阻塞。
下面的程序是阻塞的
q = queue.Queue(5) q.put(123) q.put(456) q.get() # q.task_done() q.get() # q.task_done() # 在完成一項工作之后,Queue.task_done()函數向任務已經完成的隊列發送一個信號 q.join()
下面的程序是不阻塞的:
q = queue.Queue(5) q.put(123) q.put(456) q.get() q.task_done() q.get() q.task_done() # 在完成一項工作之后,Queue.task_done()函數向任務已經完成的隊列發送一個信號 q.join()
1.3 其他隊列
import queue
# queue.Queue,先進先出隊列
# queue.LifoQueue,后進先出隊列
# queue.PriorityQueue,優先級隊列
# queue.deque,雙向對隊
# queue.Queue(2) 先進先出隊列
# put放數據,是否阻塞,阻塞時的超時事件
# get取數據(默認阻塞),是否阻塞,阻塞時的超時事件
# qsize()真實個數
# maxsize 最大支持的個數
# join,task_done,阻塞進程,當隊列中任務執行完畢之后,不再阻塞
import queue # q = queue.Queue(2) 先進先出隊列 # q = queue.LifoQueue() 后進先出隊列 # q = queue.PriorityQueue() 優先級隊列 # q = queue.deque() 雙向隊列 q = queue.LifoQueue() q.put(123) q.put(456) # 打印;456 print(q.get()) # 優先級最小的拿出來 # 如果優先級一樣,則是誰先放,就先取出誰 q = queue.PriorityQueue() q.put((1,'alex1')) q.put((1,'alex2')) q.put((1,'alex3')) q.put((3,'alex3')) # (1, 'alex1') print(q.get()) q = queue.deque() q.append(123) q.append(333) q.appendleft(456) # deque([456, 123, 333]) print(q) # 打印:456 print(q[0]) q.pop() # 從右邊刪除 # deque([456, 123]) print(q) q.popleft() # 從左邊刪除
python的隊列是在內存里創建的,python的進程退出了,則隊列也清空了。
二、生產者消費者模型(隊列)
1)生產者消費者模型的作用:
1、解決阻塞
2、就是解耦,修改生產者,不會影響消費者,反之亦然。
2)在生產環境,用生產者消費者模型,就可以解決:
1、處理瞬時並發的請求問題。瞬時的連接數就不會占滿。所以服務器就不會掛了。
2、客戶端提交一個請求,不用等待處理完畢,可以在頁面上做別的事情。
2.1)如果不用隊列存數據,服務端通過多線程來處理數據:
用戶往隊列存數據,服務器從隊列里取數據。
沒有隊列的話,就跟最大連接數有關系,每個服務器就有最大連接數。
客戶端要獲取服務器放回,服務器要查、修改數據庫或修改文件,要2分鍾,那客戶端就要掛起鏈接2分鍾,2萬個連接一半都要掛起,服務器就崩潰了。
如果沒有隊列,第一個用戶發來請求,連上服務器,占用連接,等待2分鍾。
第二個人來也要占用2分鍾。
web服務器
如果要處理並發,有10萬並發,如果:一台機器接收一個連接,需要10萬個機器,等待2分鍾就處理完了。
2.2)把請求放在隊列的好處
用戶發來請求,把請求放到隊列里,可以讓連接馬上斷開,不會阻塞,就不占用服務器的連接數了。如果看到訂單處理了沒,就要打開另外一個頁面,查看請求是否處理。
服務器查詢處理任務的時候,每個才花2分鍾,服務器耗時是沒有減少的。
但是這樣做,客戶端就不會持續的占用連接了。那瞬時的連接數就不會占滿。所以服務器就不會掛了。
但是后台要處理10萬個請求,也需要50台服務器。並不會減少服務器數量。
這樣就能處理瞬時並發的請求問題。
服務器只是處理請求,是修改數據庫的值,不是告訴客戶端。而是客戶端再發來請求,查詢數據庫已經修改的內容。
提交訂單之后,把這個訂單扔給隊列,程序返回“正在處理”,就不等待了,然后斷開這個連接,你可以在頁面里做別的事情,不用一直等待訂單處理完。這樣就不影響服務器的最大連接數。在頁面幫你發起一個alax請求,url,不停的請求(可能是定時器),我的訂單成功沒有,我的訂單成功沒有,如果訂單成功了,就自動返回頁面:訂單成功
如果不用隊列的話,一個請求就占用一個服務器,等待的人特別多,等待連接的個數太多了。服務器就掛掉了。
隊列就沒有最大個數限制,把請求發給隊列了,然后http鏈接就斷開了,就不用等待了。
12306買票的時候,下次再來請求的時候,就會告訴你,前面排了幾個人。
3)python queue的特點:
python的queue是內存級別的。rabbitmq可以把隊列發到別的服務器上處理。
所以python里的queue不能持久化,但是rabbitmq可以持久化。
queue.Queue()這樣寫,隊列就沒有最大個數限制。queue.Queue(5)就是說隊列里最多能放5個值
4)生產者消費者代碼示例:
import time,random import queue,threading q = queue.Queue() def Producer(name): count =0 while True: time.sleep(random.randrange(3)) if q.qsize()<3: # 只要盤子里小於3個包子,廚師就開始做包子 q.put(count) print("Producer %s has produced %s baozi.." %(name,count)) count += 1 def Consumer(name): count =0 while True: time.sleep(random.randrange(4)) if not q.empty(): # 只要盤子里有包子,顧客就要吃。 data = q.get() print(data) print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' % (name,data)) else: # 盤子里沒有包子 print("---no baozi anymore----") count+=1 p1 = threading.Thread(target=Producer,args=('A',)) c1 = threading.Thread(target=Consumer,args=('B',)) c2 = threading.Thread(target=Consumer,args=('C',)) p1.start() c1.start() c2.start() ''' 當你設計復雜程序的時候,就可以用生產者消費者模型,來松耦合你的代碼,也可以減少阻塞。 '''
三、線程鎖
3.1 Lock,RLock
Lock只能鎖一次,RLock可以遞歸多層,Lock不支持多層鎖嵌套,我們一般用RLOCK
import threading import time NUM = 10 def func(lock): global NUM # 上鎖 lock.acquire() lock.acquire() NUM -= 1 time.sleep(2) print(NUM) # 開鎖 lock.release() lock.release() # Lock = threading.Lock() # 不支持嵌套鎖,一般不用 RLock = threading.RLock() # 一般用RLock,支持嵌套鎖。 for i in range(10): t = threading.Thread(target=func,args=(RLock,)) t.start() ''' 死鎖: 就是你也搶資源,我也搶資源,誰也搶不走就是死鎖。 如果是python,就是Lock,弄成嵌套鎖,不支持,則變成死鎖。 解決辦法: 用RLock,支持嵌套鎖 '''
3.2 信號量 BoundedSemaphore
如果用線程鎖,一次只允許一個進入,如果用信號量可以允許同時多少個一起進入。
每次5個線程同時執行,可能就會同時修改一個值。
import threading import time NUM = 10 def func(i,lock): global NUM # 上鎖 lock.acquire() # 總共30個 一次執行5個 25個,依次類推:20,15。。。 NUM -= 1 time.sleep(2) print('NUM:',str(NUM),'i:',i) # 開鎖 lock.release() # Lock = threading.Lock() # 不支持嵌套鎖,一般不用 # RLock = threading.RLock() # 一般用RLock,支持嵌套鎖。 lock = threading.BoundedSemaphore(5) # 參數是每次執行幾個線程 for i in range(30): t = threading.Thread(target=func,args=(i,lock,)) t.start()

''' 打印: NUM: 5 i: 2 NUM: 4 i: 0 NUM: 4 i: 4 NUM: 2 i: 3 NUM: 1 i: 1 NUM: 0 i: 6 NUM: 0 i: 5 NUM: -2 i: 7 NUM: -2 i: 8 NUM: -4 i: 9 NUM: -5 i: 10 NUM: -6 i: 11 NUM: -7 i: 12 NUM: -8 i: 13 NUM: -9 i: 14 NUM: -10 i: 15 NUM: -10 i: 16 NUM: -10 i: 18 NUM: -10 i: 17 NUM: -10 i: 19 '''
3.3 event紅綠燈
要么全部阻塞(紅燈),要么全部放開(綠燈)
import threading def func(i,e): print(i) # 10個線程並發打印:0-9 ,然后到wait的時候,就開始檢測是什么燈 e.wait() # 檢測是什么燈,如果是紅燈,停;綠燈,行 print(i+100) event = threading.Event() for i in range(10): t = threading.Thread(target=func,args=(i,event)) t.start() event.clear() # 設置成紅燈 inp = input('>>>') if inp == "1": event.set() #設置成綠燈

打印: 默認是紅燈 0 1 2 3 4 5 6 7 8 9 >>>1 # 輸入1,表示綠燈,就繼續執行 102 103 106 107 100 101 104 105 108 109
3.4 線程鎖條件-condition1
Lock,RLock:線程鎖使用場景:
Lock,RLock是多個用戶同時修改一份數據,可能會出現臟數據,數據就會亂,就加互斥鎖,一次只能讓一個人修改數據,就能解決。
condition,event,BoundedSemaphore 使用場景:
如果寫了個爬蟲,在建立數據庫連接,線程就等着,什么能數據庫能用了,就開通線程,再爬蟲。
event是kua一下,全走了。
notify維護一個隊列,傳幾個,就只能出去幾次。
import threading def func(i, con): print(i) con.acquire() con.wait() print(i + 100) con.release() c = threading.Condition() for i in range(10): t = threading.Thread(target=func, args=(i, c,)) t.start() while True: inp = input(">>>") if inp == 'q': break c.acquire() c.notify(int(inp)) c.release()

打印 0 1 2 3 4 5 6 7 8 9 >>>1 # 只讓1個線程運行 >>>100 2 # 再放出去2個線程 >>>102 101 3 # 再放出去3個線程 >>>103 104 105 4 # 再放出去4個線程,此時10個已經執行了 >>>108 109 106 107 5 # 再輸入5,又進入while循環 提示輸入:>>> >>>q # 輸入q就退出循環了。 Process finished with exit code 0
3.5 線程鎖條件-condition2
con.wait_for里傳一個函數名當參數,返回布爾值,是True,就執行下面的代碼。反之,就不執行。
無論是否返回True,都是用了一個線程。
import threading def condition(): ret = False r = input('>>>') if r == 'true': ret = True else: ret = False return ret def func(i,con): print(i) con.acquire() con.wait_for(condition) # 只能一個一個過 print(i+100) con.release() c = threading.Condition() for i in range(10): t = threading.Thread(target=func, args=(i,c,)) t.start()

''' 打印: 0 >>>1 2 3 4 5 6 7 8 9 s # 第0個線程, 雖然沒返回True,沒有答應101,但是還是使用了一個線程了。 >>>w # 第1個線程 >>>e # 第2個線程 >>>true # 第3個線程 103 >>>true 104 >>>true 105 >>>true 106 >>>true 107 >>>true 108 >>>true # 第10個線程 109 true # 線程執行完畢,一直等待,就一直阻塞 true w '''
3.6 線程鎖定時器
from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) # 等1秒,執行hello t.start() # after 1 seconds, "hello, world" will be printed
四、自定義線程池
4.1 自定義線程池基礎版
import queue import threading import time class TheadPool: def __init__(self,maxsize = 5): self.maxsize = maxsize self._q = queue.Queue(maxsize) for i in range(maxsize): # 1、初始化的時候,先往隊列里放5個線程 self._q.put(threading.Thread) # 【threading.Thread, threading.Thread, threading.Thread, threading.Thread】 def get_thread(self): return self._q.get() def add_thread(self): self._q.put(threading.Thread) pool = TheadPool(5) def task(arg,p): # 2、線程並發執行,5個線程在瞬間(1秒鍾之內)從隊列里取出5個(執行get_thread()方法) # 5個線程在瞬間打印0-4,就是i print(arg) time.sleep(1) # 3、停了5秒 p.add_thread() # 4、5個線程執行:隊列添加線程(因為是5個線程執行,一個線程添加一個,隊列總共是5個線程) for i in range(20): # 5、然后這樣先取走5個,再put5個,然后打印i,就會出現:第一次打印:0-4,然后是:5-9,10-14,15-19 # threading.Thread類 t = pool.get_thread() obj = t(target=task,args=(i,pool,)) obj.start()
這個程序的問題:
線程沒有被重用,線程一下開到最大(浪費)

打印: 第一次打印:0-4,然后是:5-9,10-14,15-19 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
4.2 自定義線程池
4.2.1 自定義線程池思路
不要把隊列里放線程,而是放任務,開三個線程來從隊列里取任務,如果都取完了,就會阻塞
方法1:
設置超時時間
方法2:
往隊列尾部,加三個空值,如果取得是空值,則終止線程。
沒有空閑線程,並且已經創建的線程小於最大的線程數,這樣才會創建線程。
4.2.2 出現的問題
注意;之前的自定義線程池,如果定義queue的最多能放值的個數,pool = ThreadPool(5,5)
terminate就不好使了。
有的時候會一直阻塞,因為隊列里已經有5個了,再往里面put一個,就超出queue里最大的個數。
解決辦法是:
加上這一行
4.2.3自定義線程池代碼
import queue import threading import contextlib import time StopEvent = object() RUN = 0 # 定義線程池的三種狀態 CLOSE = 1 TERMINATE = 2 iNum=0 ''' 開啟最大個數為5個的隊列, ''' class ThreadPool(object): def __init__(self, max_num, max_task_num = None): if max_task_num: # 如果傳了最大隊列數,就設置,否則就是無限大。 self.q = queue.Queue(max_task_num) else: self.q = queue.Queue() self.max_num = max_num # 設置最大線程數 self.cancel = False # 假如已經執行close了,就不再執行任務,生成線程處理了 self.generate_list = [] # 已經生成的線程數列表 self.free_list = [] # 空閑的線程數列表 self._state = RUN def run(self, func, args, callback=None): """ 線程池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: 任務執行失敗或成功后執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值(默認為None,即:不執行回調函數) :return: 如果線程池已經終止,則返回True否則None """ if self.cancel: # 假如已經執行close了,就不再執行任務,生成線程處理了 return if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: # 假如空閑的線程列表為空,並且已經生成的線程數小於最大線程數 self.generate_thread() # 創建線程 w = (func, args, callback,) # 把當前任務放入隊列,也就是run循環了300次,就有300個任務放入隊列 self.q.put(w) # 注意:隊列數是多少個,就要開啟幾個線程,因為當要關閉的線程池時, # 要把空對象加到隊列。線程判斷獲取到是空對象(此時已經把queue里的任務都取完了)就關閉線程。 global iNum iNum+=1 # print('qsize:',str(self.q.qsize())) def generate_thread(self): """ 創建一個線程 """ t = threading.Thread(target=self.call) # 執行call函數 t.start() def call(self): """ 循環去獲取任務函數並執行任務函數 """ current_thread = threading.currentThread # 獲取當前線程 self.generate_list.append(current_thread) # 把當前線程加入到已經生成線程列表 event = self.q.get() # 從隊列里取一個任務 while event != StopEvent: # 假如 這個任務不是空對象 func, arguments, callback = event # 傳進去的任務是個元組,由函數,參數,回調函數組成。 try: result = func(*arguments) # 執行任務,返回result success = True # 執行成功,返回狀態為True except Exception as e: success = False result = None else: if callback is not None: # 假如有回調函數 try: callback(success, result) # 把狀態和返回值傳給回調函數執行 except Exception as e: pass # 執行worker_state函數,空閑線程列表里是否加入個線程。在yield處執行with下的代碼 with self.worker_state(self.free_list, current_thread): if self._state == TERMINATE: # 假如線程池狀態是TERMINATE print(11111111111111111111111) event = StopEvent # 就把當前任務賦值為空對象,while循環不滿足,這樣就走else的內容 else: event = self.q.get() # 如果不是TERMINATE狀態,則把當前任務賦給event對象 else: # 如果while循環不滿足,或者while循環完了,沒有break,就執行else內容。 self.generate_list.remove(current_thread) # 隊列獲取到了空對象,就關閉線程(從列表中移除當前的線程) print(len(self.generate_list)) def close(self): # 先執行close(),再執行join() """ 執行完所有的任務后,所有線程停止 """ if self._state == RUN: self._state = CLOSE self.cancel = True full_size = len(self.generate_list) # 查看已經生成的線程數個數 while full_size: self.q.put(StopEvent) # 往隊列尾部加上一個空對象,由於隊列是先進先出的,所以空對象是最后獲取的,通過空對象就能關閉線程。 full_size -= 1 # 循環的次數為生成的線程的總個數 def terminate(self): # 直接執行terminate() """ 無論是否還有任務,終止線程 """ self._state = TERMINATE print("len:",str(len(self.generate_list))) while self.generate_list: # 假如線程列表不為空,就往隊列里加上空對象 print('q.qsize():',str(self.q.qsize())) self.q.get() self.q.put(StopEvent) # self.q = queue.Queue() print(self.q.empty()) # 查看隊列是否為空,相當於q.size==0 print('------------'+str(self.q.qsize())) def join(self): # CLOSE和join結合用 """Waits until all outstanding tasks have been completed.""" assert self._state in (CLOSE,) delay = 0.0005 if self._state==CLOSE: while self.q.qsize() > 0: delay = min(delay * 2, .05) @contextlib.contextmanager # 上下文管理器 def worker_state(self, state_list, worker_thread): # 傳入的是空閑線程列表和當前線程 """ 用於記錄線程中正在等待的線程數 """ state_list.append(worker_thread) # 把當前線程加到空閑線程里,yield前面的代碼相當於執行__enter__, try: yield # yield是執行with worker_state下的代碼, finally: # yield后面的代碼相當於執行__exit__ state_list.remove(worker_thread) # 執行完一個queue的所有任務了,就移除這個線程了。因為一個隊列對應着一個線程。 pool = ThreadPool(5,5) def callback(status, result): # status, execute action status # result, execute action return value pass def action(i): print(i) for i in range(200): ret = pool.run(action, (i,), callback) pool.terminate() # pool.close() # pool.join() print(1234234523452345234523452345234523452345234523455) # time.sleep(1) print(pool.q.qsize()) print(len(pool.generate_list), len(pool.free_list)) print('iNum:',iNum) # print(len(pool.generate_list), len(pool.free_list))
五、進程之間的數據共享
5.1 多進程
在windows里加main才能執行,如果在linux不加main可以執行。
在windows下,如果在程序里,不方便加main,只能放棄了。
from multiprocessing import Process def foo(i): print('say hi',i) if __name__ == '__main__': for i in range(10): p = Process(target=foo,args=(i,)) p.start()
5.2 daemon加join 主線程是否等子線程
主線程執行完,子線程是否終止掉?
5.2.1 默認是不等的
from multiprocessing import Process def foo(i): print('say hi',i) if __name__ == '__main__': for i in range(10): p = Process(target=foo,args=(i,)) # p.daemon = True p.start() # p.join() print(123456)

''' 打印: 123456 say hi 3 say hi 1 say hi 6 say hi 2 say hi 9 say hi 7 say hi 0 say hi 5 say hi 4 say hi 8 '''
5.2.2 daemon+join等子線程
from multiprocessing import Process def foo(i): print('say hi',i) if __name__ == '__main__': for i in range(10): p = Process(target=foo,args=(i,)) p.daemon = True p.start() p.join() print(123456)

''' 打印: say hi 0 say hi 1 say hi 2 say hi 3 say hi 4 say hi 5 say hi 6 say hi 7 say hi 8 say hi 9 123456 '''
5.3 進程默認數據不共享
5.3.1 線程中數據共享
這個例子是單線程,數據是共享的,無論單線程還是多線程都是共享的。
from multiprocessing import Process import multiprocessing def foo(i,li): li.append(i) print('say hi',i,li) if __name__ == '__main__': li = [] for i in range(10): # p = Process(target=foo,args=(i,li)) foo(i,li) # p.daemon = True # p.start() # p.join()

''' 正常的執行結果打印:最后是數組里有10個數 say hi 0 [0] say hi 1 [0, 1] say hi 2 [0, 1, 2] say hi 3 [0, 1, 2, 3] say hi 4 [0, 1, 2, 3, 4] say hi 5 [0, 1, 2, 3, 4, 5] say hi 6 [0, 1, 2, 3, 4, 5, 6] say hi 7 [0, 1, 2, 3, 4, 5, 6, 7] say hi 8 [0, 1, 2, 3, 4, 5, 6, 7, 8] say hi 9 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] '''
5.3.2 進程默認數據不共享
#!/usr/bin/env python # -*- coding: utf-8 -*- __author__ = 'WangQiaomei' from multiprocessing import Process import multiprocessing def foo(i,li): li.append(i) print('say hi',i,li) if __name__ == '__main__': li = [] for i in range(10): p = Process(target=foo,args=(i,li)) # foo(i,li) # p.daemon = True p.start() # p.join()

''' 正常的執行結果打印:最后是數組里有10個數,但是多進程最后數組里只有9 say hi 0 [0] say hi 1 [1] say hi 2 [2] say hi 3 [3] say hi 4 [4] say hi 5 [5] say hi 6 [6] say hi 7 [7] say hi 8 [8] say hi 9 [9] '''
5.4 queues實現:進程之間數據共享
from multiprocessing import Process from multiprocessing import queues import multiprocessing def foo(i,arg): arg.put(i) print('say hi',i,arg.qsize()) if __name__ == '__main__': # li =[] li = queues.Queue(20,ctx=multiprocessing) for i in range(10): p = Process(target=foo,args=(i,li,)) # p.daemon = True p.start() # p.join()

''' 打印: say hi 1 1 say hi 5 3 say hi 0 5 say hi 3 6 say hi 7 7 say hi 6 7 say hi 2 7 say hi 9 9 say hi 4 9 say hi 8 10 '''
5.5數組和列表的區別:
數組和列表的特點比較:
1、數組類型一定:
數組只要定義好了,類型必須是一致的
python里列表里,可以放字符串也可以放數字。
2、數組個數一定:
創建數組的時候,就要指定數組多大,比如數組是10,再添加11個,就會報錯
列表是動態的,個數不一定。
數組和列表的相鄰元素的內存位置比較:
python的列表是基於c來實現。
python的列表相鄰的兩個元素在內存里,不一定挨着。是用鏈表實現的。
因為個數不限制,開始是10個長度,所以可能第11個被占用了。
每個元素,記錄上一個和下一個的位置在哪里。可以找到位置。
字符串和int類型的占用內存的位置大小肯定不一樣。所以數組,不只是長度一樣,類型也要一樣。
對於數組的話,相鄰的元素是挨着的。
數組是int類型,並且長度是確定的。所以是相鄰的。
數組的內存地址是連續的,列表不是,就是鏈表,鏈表是每個元素記錄上一個位置和下一個位置在哪里。
如圖,內存中:數組是黑框位置,列表是分散的分布:
5.6 數組實現:進程之間數據共享
from multiprocessing import Process from multiprocessing import queues import multiprocessing from multiprocessing import Array def foo(i,arg): # arg.put(i) # print('say hi',i,arg.qszie()) arg[i] = i + 100 for item in arg: print(item) print("====================") if __name__ == '__main__': # li =[] # li = queues.Queue(20,ctx=multiprocessing) li = Array('i',5) for i in range(5): p = Process(target=foo,args=(i,li,)) # p.daemon = True p.start() # p.join()

''' 打印: 0 0 102 0 0 ==================== 0 101 102 0 0 ==================== 0 101 102 103 0 ==================== 100 101 102 103 0 ==================== 100 101 102 103 104 ==================== '''
注意:Array的參數,寫了i,就只能放數字:
5.7 dict實現:進程之間數據共享
用Manager()對象創建一個特殊的字典。
For循環創建了多個進程,每個進程都可以利用dict。
dict.values()就是獲取它所有的值,
如果字典獲取的值是遞增的,說明數據是共享的。
5.7.1 AttributeError:'ForkAwareLocal' object has no attribute 'connection'
如果把join注釋就會報錯:
報錯:
conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
from multiprocessing import Process import multiprocessing from multiprocessing import Manager def foo(i,arg): arg[i] = i + 100 print(arg.values()) if __name__ == '__main__': obj = Manager() li = obj.dict() for i in range(10): p = Process(target=foo,args=(i,li,)) p.start() # p.join() # 方式1 # 方式2 # import time # time.sleep(10)
原因是:
li = obj.dict()是在主進程創建的
for循環里創建的是子進程,子進程是修改主進程:arg[i] = i + 100(arg就是li)
主進程和子進程都在執行,主進程里有個字典,子進程要修改這個字典。
進程和進程之間要通信的話,需要創建連接的。相當於兩邊都寫上一個socket,進程之間通過連接進行操作。
主進程執行到底部,說明執行完了,會把它里面的連接斷開了。
主進程把連接斷開了,子進程就連接不上主進程。
如果在底部寫停10秒,主進程就停止下來,並沒有執行完。主進程沒有執行完,連接還沒有斷開,那子進程就可以連接它了。
5.7.2 解決方法1:停10秒(不建議)
from multiprocessing import Process import multiprocessing from multiprocessing import Manager def foo(i,arg): arg[i] = i + 100 print(arg.values()) if __name__ == '__main__': obj = Manager() li = obj.dict() for i in range(10): p = Process(target=foo,args=(i,li,)) p.start() # p.join() # 方式1 # 方式2 import time time.sleep(10)

打印: [102] [102, 105] [102, 103, 105] [102, 103, 105, 107] [109, 102, 103, 105, 107] [101, 102, 103, 105, 107, 109] [101, 102, 103, 105, 106, 107, 109] [101, 102, 103, 105, 106, 107, 108, 109] [100, 101, 102, 103, 105, 106, 107, 108, 109] [100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
5.7.3測試:
如果把sleep(10)改成sleep(0.1),那么只會打印前面幾行,然后又報之前的錯誤
打印:
[102]
[102, 105]
[102, 103, 105]
[102, 103, 105, 107]
接着報錯:
conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
5.7.4 解決辦法2:用join
p.join()
所以使用多進程的常規方法是,先調用start啟動進程,再調用join要求主進程等待當前子進程的結束。
join是用來阻塞當前線程的,每次循環:p.start()之后,p就提示主線程,需要等待p結束才向下執行,那主線程就乖乖的等着啦。
from multiprocessing import Process import multiprocessing from multiprocessing import Manager def foo(i,arg): arg[i] = i + 100 print(arg.values()) if __name__ == '__main__': obj = Manager() li = obj.dict() for i in range(10): p = Process(target=foo,args=(i,li,)) p.start() p.join() # 方式1: # 方式2 # import time # time.sleep(10)

打印: [102] [102, 105] [102, 103, 105] [102, 103, 105, 107] [109, 102, 103, 105, 107] [101, 102, 103, 105, 107, 109] [101, 102, 103, 105, 106, 107, 109] [101, 102, 103, 105, 106, 107, 108, 109] [100, 101, 102, 103, 105, 106, 107, 108, 109] [100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
5.7.5 總結:進程之間共享的方式:
queues,數組和字典的方式
dict對類型沒有限制,跟使用字典是一模一樣的。用數組則限制了數據類型。
進程和進程之間要通信,是要連接的。
主進程執行到底部了,就執行完了,就把連接斷開了。子進程就連不上主進程了。
六、進程
6.1 進程鎖
沒有鎖,多個進程就會一起修改數據:

from multiprocessing import Process from multiprocessing import queues from multiprocessing import Array from multiprocessing import RLock, Lock, Event, Condition, Semaphore import multiprocessing import time def foo(i,lis): lis[0] = lis[0] - 1 # 因為停1秒,在1秒之內,10個進程都已經修改了數據。 time.sleep(1) # 停1秒前全都修改完 print('say hi',lis[0]) # 打印的全是0 if __name__ == "__main__": # li = [] li = Array('i', 1) li[0] = 10 for i in range(10): p = Process(target=foo,args=(i,li)) p.start()

''' 打印: say hi 0 say hi 0 say hi 0 say hi 0 say hi 0 say hi 0 say hi 0 say hi 0 say hi 0 say hi 0 '''
加把鎖就把進程鎖住了,同一時間只有一個進程可以運行,其他都等着。
RLock, Lock, Event, Condition, Semaphore # 這些方法跟線程的使用方法是一樣的
from multiprocessing import Process from multiprocessing import queues from multiprocessing import Array from multiprocessing import RLock, Lock, Event, Condition, Semaphore # 這些方法跟線程的使用方法是一樣的 import multiprocessing import time def foo(i,lis,lc): lc.acquire() # 加鎖 lis[0] = lis[0] - 1 time.sleep(1) print('say hi',lis[0]) lc.release() # 釋放鎖 if __name__ == "__main__": # li = [] li = Array('i', 1) li[0] = 10 lock = RLock() for i in range(10): p = Process(target=foo,args=(i,li,lock)) p.start()

''' 打印: say hi 9 say hi 8 say hi 7 say hi 6 say hi 5 say hi 4 say hi 3 say hi 2 say hi 1 say hi 0 '''
6.2 進程池
6.2.1 進程池串行-apply
apply從進程池里取進程,然后一個一個執行,第一個進程執行完,第二個進程才執行,進程之間是串行的操作。這樣就不是並發操作,沒有太大意義。
from multiprocessing import Pool def f1(arg): print(arg) if __name__ == "__main__": pool = Pool(5) for i in range(10): pool.apply(func=f1,args=(i,)) # apply執行函數,傳入參數 print('end')

''' 打印: 0 1 2 3 4 5 6 7 8 9 end '''
6.2.1 進程池異步-apply_rsync
from multiprocessing import Pool def f1(arg): print(arg) if __name__ == "__main__": pool = Pool(5) for i in range(10): # pool.apply(func=f1,args=(i,)) # apply執行函數,傳入參數 pool.apply_async(func=f1,args=(i,)) print('end')
''' 打印: end '''
這10個任務kua一下全執行了,主進程執行到end了。
主進程執行完了,子進程就被終止掉了。
主進程執行完了,就不再等子線程了,如果要等就要設置參數。
多線程線程默認也是,主進程不等子進程,多線程是:daemon=True加join來讓他等。
6.3 主線程等子線程
6.3.1 close等子線程全部執行完
join是終止進程,必須要前面執行close或者terminate方法。
執行close,等所有任務(10個)全部執行完,再終止
執行terminate,表示立即終止,不管你當前的任務是否執行完,都立即終止。
from multiprocessing import Pool import time def f1(arg): time.sleep(1) # 加這句是為了看出5個5個執行的效果。 print(arg) if __name__ == "__main__": pool = Pool(5) for i in range(10): # pool.apply(func=f1,args=(i,)) # apply執行函數,傳入參數 pool.apply_async(func=f1,args=(i,)) pool.close() pool.join() # join表示:主進程執行到這里的時候,夯住了,等子進程結束的時候,再往下執行。 print('end')
光執行join,會觸發下面的斷言錯誤:
assert self._state in (CLOSE, TERMINATE)
join源代碼有這句,只有符合這個條件的,才不會報錯。
這個條件就是:執行join之前,必須執行close或者terminate方法。
close+join:是等子線程全部執行完了,才繼續往下執行。
這是5個5個執行。因為是5個線程同時執行,總共要完成10個任務。

打印: 0 1 2 3 4 5 6 7 8 9 end
6.3.2 terminate立即終止
from multiprocessing import Pool import time def f1(arg): time.sleep(1) print(arg) if __name__ == "__main__": pool = Pool(5) for i in range(10): # pool.apply(func=f1,args=(i,)) # apply執行函數,傳入參數 pool.apply_async(func=f1,args=(i,)) time.sleep(1.5) pool.terminate() # 立即終止 pool.join() print('end')
光執行join,會觸發下面的斷言錯誤:
assert self._state in (CLOSE, TERMINATE)
join源代碼有這句,只有符合這個條件的,才不會報錯。
這個條件就是:執行join之前,必須執行close或者terminate方法。
terminate+join:是表示立即終止,不管你當前的任務是否執行完,都立即終止。

''' 打印: 0 1 2 3 4 end '''
七、協程
7.1 協程及gevent原理
IO密集型:用多線程+gevent(更好),多線程
計算密集型:用多進程
協程原理:利用一個線程,分解一個線程成為多個“微線程”==>程序級別
如果寫爬蟲,就訪問別的網站,拿別人源碼。http請求叫IO請求,用多線程。
假設要訪問3個url,創建3個線程,都在等待着,第一個有數據返回就繼續執行,以此類推。
在等待過程中,就什么事也沒干。
協程的方式。
計算機幫你創建進程、線程。線程是人為創建出來的。用一個線程,一會兒執行這個操作,一會兒執行那個操作。
協程是只用一個線程。程序員利用io多路復用的方式,讓協程:
先訪問一個url,不等待返回,就再訪問第二個url,訪問第三個url,然后也在等待。
greenlet本質是實現協程的。
注意:協程本身不高效,協程的本質只是程序員調用的,那為啥gevent這么高效率呢,是因為用了協程(greenlet)+IO多路復用的方式。
是IO多路復用的用法才能高效。所以用的時候就用gevent就好了。
用多線程:假設每爬一個網址需要2秒,3個url,就是3個請求,等待2秒,就可以繼續往下走。
如果用gevent,用單線程,單線程應該從上到下執行,用for循環讀取3個url,往地址發送url請求,就是IO請求,線程是不等待的。
for循環再拿第二個url,再發第三個url。在這過程中,誰先回來,就處理誰。
資源占用上,多線程占用了3個線程,2秒鍾,多線程啥也沒干,在等待。gevent在2秒鍾,只要發送請求了,接着就想干什么干什么。
7.2 greenlet協程
greenlet切換協程:
from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()

''' 打印: 12 56 34 78 '''
7.3 gevent
greenlet切換協程:
import gevent def foo(): print('Running in foo') # 第1步 gevent.sleep(0) print('Explicit context switch to foo again') # 第3步 def bar(): print('Explicit context to bar') # 第2步 gevent.sleep(0) print('Implicit context switch back to bar') # 第4步 gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ])

''' 打印: Running in foo Explicit context to bar Explicit context switch to foo again Implicit context switch back to bar '''
7.3 gevent 切換執行
greenlet切換執行協程的本質是執行如下代碼:
import gevent def foo(): print('Running in foo') # 第1步 gevent.sleep(0) print('Explicit context switch to foo again') # 第3步 def bar(): print('Explicit context to bar') # 第2步 gevent.sleep(0) print('Implicit context switch back to bar') # 第4步 gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ])

''' 打印: Running in foo Explicit context to bar Explicit context switch to foo again Implicit context switch back to bar '''
但是平常我們用gevent,不用這么麻煩,而是使用下面的代碼就好了。
7.4 gevent使用方法
遇到IO操作自動切換:
from gevent import monkey; monkey.patch_all() import gevent import requests # 這個函數是發http請求的 def f(url): print('GET: %s' % url) resp = requests.get(url) data = resp.text # 獲取內容 print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), # 創建了一個協程 gevent.spawn(f, 'https://www.yahoo.com/'), # 創建了一個協程 gevent.spawn(f, 'https://github.com/'), # 創建了一個協程 ])
創建了三個協程。總共就一個線程,通過for循環發送三個url請求。然后等待結果,誰先回來,就處理誰。
通過requests.get(url)發送url請求,誰先回來,就拿到數據(data),拿到數據就可以處理數據了。
這都是在一個線程里執行的。

瞬間打印: GET: https://www.python.org/ GET: https://www.yahoo.com/ GET: https://github.com/ 然后等待哪個url先返回,就先打印 47394 bytes received from https://www.python.org/. 25528 bytes received from https://github.com/. 450811 bytes received from https://www.yahoo.com/. '''
gevent的使用場景舉例:
1、scrapy框架內部用的gevent。發請求性能比線程高很多。
2、做api(url)監控,把代碼發布到哪個url,得自動檢測下返回值是不是200,或是指定的狀態碼。
發布完成之后,就要發送http請求過去檢測一下返回的狀態碼。如果有20個url請求,就用gevent一下全給發了,就沒必要創建多個線程,一個線程就足以了,然后配合多進程+gevent,又可以利用多顆cpu的優勢了。
monkey.patch_all()是什么?
發送http請求,是request本質上調用socket來發。原來執行http請求,就會通知我一下,執行完了,默認socket是沒有這個功能的。這相當於把原來的socket修改了,修改成特殊功能的socket,發送請求如果完事了,會告訴你完事了。
其實內部就是把io請求做了個封裝而已。