多線程和多進程爬蟲
一.線程
1.什么是線程。
線程是操作系統能夠進行運算調度的最小單位。它被包含在進程中,是進城中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個線程可以並發多個線程,每條線程執行不同的任務。
2.線程常用的方法
方法 |
說明 |
start() |
線程准備就緒,等待CPU調度 |
setName() |
為線程設置名稱 |
getName() |
獲取線程名稱 |
setDaemon() |
設置為守護線程 |
Join() |
逐個執行每個線程,執行完畢后繼續往下執行 |
run() |
線程被調度后會執行該方法,如果想自定義線程類,需要重寫run()方法 |
3.Threading類
3.1 線程的普通創建方式
Threadding用於提供線程相關的操作,線程是應用程序中工作的最小單元。
import threading import time def show(arg): time.sleep(1) print('thread' + str(arg)) for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start() print('主線程結束') 結果: 主線程結束 thread0 thread1 thread2 thread5 thread4 thread3 thread7 thread6 thread8 thread9
上述代碼創建了10個“前台”線程,然后控制器就交給了CPU,CPU根據指定算法進行調度,分片執行指令。
3.2 自定義線程類
繼承自threading.Thread類來自定義線程類,但是其本質卻是重構thread類中的run()方法。
import threading
class myThread(threading.Thread): def __init__(self, sum): threading.Thread.__init__(self) self.sum = sum
def run(self): print('對象數是:',self.sum) if __name__ == '__main__':
t1 = myThread(1) t2 = myThread(2) t1.start() t2.start()
3.3 計算子線程執行的時間
PS:sleep的時候是不會占用CPU的,操作系統會把線程掛起。
import threading import time def show(n): time.sleep(1) print('thread' + str(n)) start_time = time.time() obj_list = []
for i in range(5): t = threading.Thread(target=show,args=(i,)) t.start() obj_list.append(t) for obj in obj_list: obj.join() print('花費的時間為:',time.time() - start_time)
3.4 守護線程
線程的setDaemon(True)將線程變成主線程的守護線程,意思是當主進程結束后,子線程也會隨之退出。意味着當主線程結束后,程序就結束了。
1 import threading 2 import time 3 4 def show(n): 5 time.sleep(1) 6 print('thread' + str(n)) 7 8 start_time = time.time() 9 obj_list = [] 10 11 for i in range(5): 12 t = threading.Thread(target=show,args=(i,)) 13 t.setDaemon(True) 14 t.start() 15 obj_list.append(t) 16 17 print('花費的時間為:',time.time() - start_time)
3.5 GIL(全局解釋器鎖)
在Python的運行環境中,無論電腦是單核還是雙核,操作系統同時只會執行一個線程。究其原因,是因為GIL(全局解釋器鎖)。
在Python中,一個線程要想要執行,必須要先拿到GIL。可以吧GIL想象成一個“通行證”,並且在一個進程中,GIL只有一個。沒有通行證的線程就不會被執行。
Python多線程的工作過程:
- 拿到公共數據
- 申請GIL
- Python解釋器調用os的原生線程
- os操作CPU執行運算
- 當該線程的執行時間到了之后,無論是否執行完,GIL被釋放
- 其他線程重復上面的操作
- 其他進程執行完成后,切換到原來的線程(從記錄的上下文繼續執行)
3.6 線程鎖(Lock,RLock)
由於線程之間是進行隨機調度,並且每個線程可能只執行n條執行之后,當多個線程同時修改同一條數據時可能會出現臟數據,所以,出現了線程鎖 - 同一時刻允許一個線程執行操作。
import threading,time def run(n): global num num += 1 num = 0 obj_list = [] for i in range(20000): t = threading.Thread(target=run,args=(i,)) t.start() obj_list.append(t) for obj in obj_list: obj.join() print('num:',num) 臟數據:19999
3.6.1 互斥鎖(Lock)
為了防止上面情況的發生,我們可以使用互斥鎖(Lock)來解決。
import threading,time lock = threading.Lock() # 實例化一個鎖對象 def run(n): lock.acquire() # 獲取鎖 global num num += 1 lock.release() # 釋放鎖 num = 0 obj_list = [] for i in range(20000): t = threading.Thread(target=run,args=(i,)) t.start() obj_list.append(t) for obj in obj_list: obj.join() print('num:',num)
3.6.2 遞歸鎖(RLock)
RLock的用法和Lock一樣,只是他支持嵌套。在多個鎖沒有釋放的時候一般會使用Rlock類。
import threading,time lock = threading.RLock() # 實例化一個鎖對象 num = 0 obj_list = [] def run(n): lock.acquire() # 獲取鎖 global num num += 1 lock.release() # 釋放鎖 for i in range(20000): t = threading.Thread(target=run,args=(i,)) t.start() obj_list.append(t) for obj in obj_list: obj.join() print('num:',num)
3.7 信號量(Semaphore)
互斥鎖 同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 ,比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進去。
import threading,time lock = threading.BoundedSemaphore(6) # 實例化一個鎖對象 def run(n): lock.acquire() # 獲取鎖 time.sleep(1) print('run the thread: %s' % n) lock.release() # 釋放鎖 num = 0 for i in range(200): t = threading.Thread(target=run,args=(i,)) t.start()
3.8事件(Event)
python線程的事件用於主線程控制其他線程的執行,事件是一個簡單的線程同步對象,主要提供了以下幾種方法:
方法 |
說明 |
clear() |
將flag設置為“false” |
set() |
將flag設置為“true” |
is_set() |
判斷是否設置了flag |
wait() |
一直監聽flag,沒有檢測到會一直處於阻塞狀態 |
事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。
import threading,time event = threading.Event() # 創建事件對象 def lighter(): count = 0 event.set() #初始值為綠燈 while 1: if 5 < count <= 10: event.clear() #紅燈,清楚標志位 print('\33[41;1mred light is on...\033[0m') elif count > 10: event.set() # 綠燈,設置標志位 count = 0 else: print('\33[41;1mred light is on...\033[0m') time.sleep(1) count += 1 def car(name): while True: if event.is_set(): # 判斷是否設置了標志位 print("[%s] 綠燈亮,請行駛..." % name) time.sleep(1) else: print("[%s] 紅燈亮,請等待..." % name) event.wait() print("[%s] 綠燈亮,開始行駛..." % name) light = threading.Thread(target=lighter,)
car = threading.Thread(target=car, args=('test',))
light.start()
car.start()
3.9條件(Condition)
使得線程等待,只有滿足某條件時,才釋放n個線程。
互斥鎖是最簡單的線程同步機制,Python提供的Condition對象提供了對復雜線程同步問題的支持。Condition被稱為條件變量,除了提供與Lock類似的acquire和release方法外,還提供了wait和notify方法。線程首先acquire一個條件變量,然后判斷一些條件。如果條件不滿足則wait;如果條件滿足,進行一些處理改變條件后,通過notify方法通知其他線程,其他處於wait狀態的線程接到通知后會重新判斷條件。不斷的重復這一過程,從而解決復雜的同步問題。
可以認為Condition對象維護了一個鎖(Lock/RLock)和一個waiting池。線程通過acquire獲得Condition對象,當調用wait方法時,線程會釋放Condition內部的鎖並進入blocked狀態,同時在waiting池中記錄這個線程。當調用notify方法時,Condition對象會從waiting池中挑選一個線程,通知其調用acquire方法嘗試取到鎖。
Condition對象的構造函數可以接受一個Lock/RLock對象作為參數,如果沒有指定,則Condition對象會在內部自行創建一個RLock。
除了notify方法外,Condition對象還提供了notifyAll方法,可以通知waiting池中的所有線程嘗試acquire內部鎖。由於上述機制,處於waiting狀態的線程只能通過notify方法喚醒,所以notifyAll的作用在於防止有線程永遠處於沉默狀態。
演示條件變量同步的經典問題是生產者與消費者問題:假設有一群生產者(Producer)和一群消費者(Consumer)通過一個市場來交互產品。生產者的”策略“是如果市場上剩余的產品少於1000個,那么就生產100個產品放到市場上;而消費者的”策略“是如果市場上剩余產品的數量多余100個,那么就消費3個產品。用Condition解決生產者與消費者問題的代碼如下:
import threading import time class Producer(threading.Thread): def run(self): global count while True: if con.acquire(): if count > 1000: con.wait() else: count = count+100 msg = self.name+' produce 100, count=' + str(count) print msg con.notify() con.release() time.sleep(1) class Consumer(threading.Thread): def run(self): global count while True: if con.acquire(): if count < 100: con.wait() else: count = count-3 msg = self.name+' consume 3, count='+str(count) print msg con.notify() con.release() time.sleep(1) count = 500 con = threading.Condition() def test(): for i in range(2): p = Producer() p.start() for i in range(5): c = Consumer() c.start() if __name__ == '__main__': test()
4.多線程的爬蟲實例
'''使用多線程爬取1500個url中的圖片'''
import threading,requests,time link_list = [] with open('url.txt','r') as f: for u in f.readlines(): url = u.split()[0].replace('\n','') link_list.append(url) start_time = time.time() class myThread(threading.Thread): def __init__(self,name,link_range): threading.Thread.__init__(self) self.name = name self.link_range = link_range self.userName = 0 def run(self): print('開始:',self.name) self.craw(self.name,self.link_range) print('結束:',self.name) def writeImages(self, ThreadName, url): print("正在存儲文件 %s ..." % ThreadName+str(self.userName)) path = r'D:\zhappian' + '\\' + ThreadName+str(self.userName) + '.png' file = open(path, 'wb') images = requests.get(url,timeout = 20).content file.write(images) file.close() self.userName += 1 def craw(self,name,link_range): for i in range(link_range[0],link_range[len(link_range)-1]): self.writeImages(name,link_list[i]) obj_list = [] url_list = [(0,300),(301,600),(601,900),(901,1200),(1201,1500)] # 創建新線程 for i in range(5): t = myThread('Thread-'+str(i+1), url_list[i]) t.start() obj_list.append(t) # 等待所有線程執行完成 for url in url_list: url.join() end_time = time.time() print('爬蟲的運行時間為:',end_time - start_time)
5.隊列(queue)
queue是python中的標准庫,俗稱隊列。在python中,多個線程之間的數據是共享的,多個線程進行數據交換的時候,不能夠保證數據的安全性和一致性,所以當多個線程需要進行數據交換的時候,隊列就出現了,隊列可以完美解決線程間的數據交換,保證線程間數據的安全性和一致性。
PS: 在python2.x中,模塊名為Queue
queue模塊有三種隊列及構造函數
l Python queue模塊的FIFO隊列先進先出。 queue.Queue(maxsize)
l LIFO類似於堆,即先進后出。 queue.LifoQueue(maxsize)
l 還有一種是優先級隊列級別越低越先出來。 queue.PriorityQueue(maxsize)
queue模塊中的常用方法:
方法 |
說明 |
queue.qsize() |
返回隊列的大小 |
queue.empty() |
如果隊列為空,返回True,反之False |
queue.full() |
如果隊列滿了,返回True,反之False (queue.full 與 maxsize 大小對應) |
queue.get([block[, timeout]]) |
獲取隊列,立即取出一個元素, timeout超時時間 |
queue.put(item[, timeout]]) |
寫入隊列,立即放入一個元素, timeout超時時間 |
queue.join() |
阻塞調用線程,直到隊列中的所有任務被處理掉, 實際上意味着等到隊列為空,再執行別的操作 |
queue.task_done() |
在完成一項工作之后,queue.task_done()函數向任務已經完成的隊列發送一個信號
|
5.1代碼實例
l 創建隊列
import queue q = queue.Queue()
l empty方法(如果隊列為空,返回True)
import queue q = queue.Queue() print(q.empty()) #輸出:True
l full方法(如果隊列滿了,返回True)
import queue q = queue.Queue(1) #指定隊列大小 q.put('a') print(q.full()) #輸出:True
l put方法和get方法
import queue q = queue.Queue() q.put('a') q.put('b') print(q.get()) #輸出:a print(q.get()) #輸出:b
l qsize方法(返回隊列里元素個數)
import queue q = queue.Queue() q.put('a') q.put('b') print(q.qsize()) #輸出:2
5.2 生產者消費者
import threading,time import queue q = queue.Queue(maxsize=10) # 生產者 def Producer(name): count = 1 while True: q.put("冠軍%s" % count) print("冠軍",count) count +=1 time.sleep(0.1) #消費者 def Consumer(name): #while q.qsize()>0: while True: print("[%s] 取到[%s]..." %(name, q.get())) time.sleep(1) p = threading.Thread(target=Producer,args=("IG",)) c = threading.Thread(target=Consumer,args=("LPL",)) c1 = threading.Thread(target=Consumer,args=("LCK",)) p.start() c.start() c1.start()
6.多線程和爬蟲實例。
import threading,requests,time import queue start_time = time.time() obj_list = [] work_queue = queue.Queue(1500) link_list = [] with open('url.txt','r') as f: for u in f.readlines(): url = u.split()[0].replace('\n','') link_list.append(url) # 填充隊列 for url in link_list: work_queue.put(url) class myThread(threading.Thread): def __init__(self,name,q): threading.Thread.__init__(self) self.name = name self.userName = 0 self.q = q def run(self): print('開始:',self.name) while True: try: self.craw(self.name,self.q) except Exception as e: break print('結束:',self.name) def writeImages(self, ThreadName, url): print("正在存儲文件 %s ..." % ThreadName+str(self.userName)) path = r'D:\zhappian' + '\\' + ThreadName+str(self.userName) + '.png' file = open(path, 'wb') images = requests.get(url,timeout = 20).content file.write(images) file.close() self.userName += 1 def craw(self,name,q): url = q.get(timeout = 2) try: self.writeImages(name, url) except Exception as e: print(q.qsize(),url,e) # 創建新線程 for i in range(5): t = myThread('Thread-'+str(i+1), work_queue) t.start() obj_list.append(t) # 等待所有線程執行完成 for url in obj_list: url.join() end_time = time.time() print('爬蟲的運行時間為:',end_time - start_time)