1、線程理論知識
概念:指的是一條流水線的工作過程的總稱,是一個抽象的概念,是CPU基本執行單位。
進程和線程之間的區別:
1. 進程僅僅是一個資源單位,其中包含程序運行所需的資源,而線程就相當於車間的流水線,負責執行具代碼。
2. 每個進程至少包含一個線程,由操作系統自動創建,稱之為主線程
3. 每個進程可以有任意數量的線程
4.創建進程的開銷要比創建進程小得多
5. 同一進程的線程間數據是共享的
6.線程之間是平等的,沒有子父級關系,同一進程下的各線程的PID相同
7. 創建線程的代碼可以寫在任意位置,不一定非要在main函數下。
為什么使用線程:
提高程序執行效率
2、開啟線程的兩種方式
和進程類似,但是開啟方式不一定非要建在main函數下。

# 第一種方式,實例化 Thread # from threading import Thread # # def task(): # print("subthread is running....") # # t = Thread(target=task) # t.start() # print('main is over....') # 第二種方式,繼承Thread類 from threading import Thread class MyThread(Thread): def run(self): print("subthread is running....")
3、主線程和子線程之間的關系
1. 主線程任務執行完畢后,主線程會等待所有子線程全部執行完畢后結束
2. 在同一進程中,所有線程都是平等的,沒有子父級關系

# 驗證主線程代碼執行完后會不會立即結束, import random import time import threading from threading import Thread def task(name): print("%s is running..." % name) time.sleep(random.randint(1, 3)) print(threading.enumerate()) print("%s is over....." % name) t = Thread(target=task, args=('aaa',)) t.start() print('main over....')
4、驗證線程和進程之間的區別

from threading import Thread import time def task(): global num time.sleep(1) num -= 1 num = 10 t = Thread(target=task,) t.start() t.join() print(num)

from multiprocessing import Process from threading import Thread import time def task(): pass def expense(cls): """用來測試線程或進程創建開銷""" lis = [] start = time.time() for i in range(50): p = cls(target=task,) p.start() lis.append(p) for p in lis: p.join() return time.time()-start
5、線程的安全問題
1.互斥鎖
數據共享必然會造成競爭,競爭就會造成數據錯亂問題。
解決辦法:和進程一樣,加互斥鎖。

from threading import Thread, Lock import time num = 10 def task(lock): global num lock.acquire() a = num time.sleep(0.5) num = a-1 lock.release() ts = [] lock = Lock() for i in range(10): t = Thread(target=task,args=(lock,)) t.start() ts.append(t) for t in ts: t.join() print(num)
2.死鎖
死鎖不是一種鎖,而是一種鎖的狀態,
一般出現死鎖的情況有兩種:
1. 對同一把鎖多次acquire.(使用RLOCK鎖,代替LOCK)
2. 兩個或兩個以上的進程或線程在執行過程中,因爭奪資源造成的相互等待現象。(解決辦法:能不加最好不加,要加就只加一把)

from threading import Thread, Lock import time def task1(name, locka, lockb): locka.acquire() print("%s拿到a鎖"%name) time.sleep(0.3) lockb.acquire() print('%s拿到b鎖'%name) lockb.release() locka.release() def task2(name, locka, lockb): lockb.acquire() print("%s拿到b鎖"%name) time.sleep(0.3) locka.acquire() print('%s拿到a鎖'%name) locka.release() lockb.release() locka = Lock() lockb = Lock() t1 = Thread(target=task1, args=('t1', locka, lockb)) t2 = Thread(target=task2, args=('t2', locka, lockb)) t1.start() t2.start()
3.可重入鎖
只能解決同一線程多次執行acquire情況。
只有一個線程所有的acquire都被釋放,其他線程才能拿到這個鎖。
也會發生死鎖現象。

from threading import Thread, RLock lock = RLock() lock.acquire() lock.acquire() lock.acquire() lock.acquire() print("over") lock = RLock() def task1(): lock.acquire() print('task1') def task2(): lock.acquire() print('task2') Thread(target=task1).start() Thread(target=task2).start()
4. 信號量
也是一種鎖,用來控制同一時間,有多少線程可以提供並發訪問,不是用來處理線程安全問題

from threading import Semaphore, Thread import time s_lock = Semaphore(3) def task(): s_lock.acquire() time.sleep(1) print("run.....") s_lock.release() for i in range(20): t = Thread(target=task) t.start()
6、守護線程
守護線程在所有非守護線程結束后結束。

import threading from threading import Thread import time def task1(): print('thread-1 is running...') time.sleep(3) print('thread-1 over....') def task2(): print('thread-2 is running...') time.sleep(1) print('thread-2 over....') if __name__ == '__main__': t1 = Thread(target=task1,) t2 = Thread(target=task2,) t1.setDaemon(True) t1.start() t2.start() print(t1.ident) print(threading.enumerate()) print("main over...")
7、GIL
全局解釋器鎖,是一互斥鎖,只有在Cpython解釋器存在。
為什么需要:因為一個python.exe進行運行只有一份解釋器,如果這個進程開啟的多個線程都要執行代碼,多線程之間就要競爭解釋器,一旦競爭就有可能出現問題。
帶來的好處:保證了多線程同時訪問解釋器時的數據安全問題。
帶來的問題:同一時間只有一個線程訪問解釋器,使得多線程無法真正的並發
出現的原因:默認情況下,一個進程只有一個線程不會是不會出問題,但不要忘了還有GC線程,一旦出現多個線程就可能出現問題,所以當初就簡單粗暴的加上了GIL鎖
GIL加鎖和解鎖時機:
加鎖:在調用解釋器時立即加鎖
解鎖:當前線程遇到IO時釋放,或者當前線程執行超過設定值釋放(py2計算的是執行代碼的行數,py3中計算的是時間)
解決辦法:使用多進程或使用其他的python解釋器
8、線程池和進程池
一種容器,本質十一存儲線程或進程的列表
為什么使用? 因為服務器不能無限開啟線程或進程,所以需要對線程數量加以控制,線程池就是幫我們完成線程/進程的創建、銷毀以及任務分配
特點:
線程池在創建時不會開啟線程,
等到任務提交時,如果沒有空閑線程,並且已存在的線程數量小於最大值,開啟新線程,
線程開啟后不會關閉,直到進程全部結束為止
(線程池的建立也要建在main函數下)

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor pool= ProcessPoolExecutor(maxsize),創建進程池,maxsize為最大進程個數 res = pool.submit(task, 'a'), 提交任務 res.result(timeout),接收調用的返回值,timeout為超時時間,超時報錯 該函數是阻塞函數,會一直等待任務執行完畢 pool.shutdown(wait),所有任務執行完畢,阻塞函數 wait=True, 等待池內所有任務執行完畢后回收資源才繼續 wait=False,立即返回,並不會等待池內的任務執行完畢

from concurrent.futures import ThreadPoolExecutor import time def task(num): time.sleep(0.5) print("%s is running....."%num) return num**2 pool = ThreadPoolExecutor() ress = [] for i in range(10): res = pool.submit(task, i) ress.append(res) pool.shutdown(wait=False) for i in ress: print(i.result()) print('over')
9、同步異步阻塞非阻塞
阻塞和非阻塞都是指程序的運行狀態
阻塞:當程序執行遇到IO操作,無法繼續執行代碼
非阻塞:程序執行沒有遇到IO操作,或通過某種方式,使程序遇到了也不會停在原地,還可以繼續執行
同步異步指的是提交任務的方式
同步:發起任務后必須原地等待任務執行完成,才可以繼續執行
異步:發起任務后不用等待任務執行,可以立即執行其他操作
異步效率高於同步,發起異步任務方式:就是多線程和多進程
同步和阻塞的不同:阻塞一定使CPU已經切換,同步雖然在等待,但CPU沒有切走,還在當前進程中執行其他任務
10、異步回調
其實說的是回調函數,給異步任務綁定一個函數,當任務完成時會自動調用該函數。
優點:不用原地等待,任務結果立即獲取
線程池或進程池內內的調用回調函數方法add_done_back(), 且回調函數必須有且只有一個參數,就是調用對象本身。
線程池的回調函數是在子線程內執行,
進程池的回調函數是在主進程下執行

import requests from concurrent.futures import ThreadPoolExecutor import threading def product_data(url): data = requests.get(url) return data.text,url def parser_data(f): res = f.result() print(len(res[0]), res[1], "當前線程", threading.current_thread()) if __name__ == '__main__': urls = ['http://www.baidu.com','https://www.cnblogs.com/ywsun/', 'https://www.processon.com/'] pool = ThreadPoolExecutor() for url in urls: f = pool.submit(product_data, url) f.add_done_callback(parser_data)

import requests from concurrent.futures import ProcessPoolExecutor import os def product_data(url): data = requests.get(url) return data.text,url def parser_data(f): res = f.result() print(len(res[0]), res[1], ", callback pid", os.getpid() ) if __name__ == '__main__': urls = ['http://www.baidu.com','https://www.cnblogs.com/ywsun/', 'https://www.processon.com/'] pool = ProcessPoolExecutor() print('main process', os.getpid()) for url in urls: f = pool.submit(product_data, url) f.add_done_callback(parser_data)
11、線程隊列
queue 該模塊下提供了一些常見的數據容器,僅僅是容器,沒有數據共享特點
Queue,先進先出
LifoQueue,后進先出
PriorityQueue,可設置優先級的隊列。插入元組,第一個元素是優先級,可是數字、字母,對應的數值越小優先級越高

import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高 q.put((2,'a')) q.put((1,'b')) q.put((3,'c')) print(q.get()) print(q.get()) print(q.get()) # 如果是字符,按照ASCII表來排序 q.put(('a', "sfsja")) q.put(('b', "sdfsdf")) q.put(('A', "sdfsdf")) q.put(('ae', "sdfsdf")) q.put(('ab', "sdfsdf")) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get())
12、事件
用於線程間通訊,線程間本就是數據共享,也就是即使沒有事件,也沒有問題

from threading import Thread, Event import time import random boot= Event() def server(): print('啟動服務器。。。。') time.sleep(random.randint(1,3)) print('服務器運行。。。。。') boot.set() def connect(): print('開始嘗試連接') boot.wait() print('連接成功') t1 = Thread(target=server) t1.start() t2 = Thread(target=connect) t2.start()