最近拜讀瑞安·米切爾的書關於並行抓取問題有很通俗的介紹:
“網頁抓去的速度很快,起碼通常比雇佣幾十個實習生手動網上復制數據要快很多。當然隨着技術的不斷進步和享樂適應,人們還是在某個時刻覺得‘不夠快’,於是把目光轉向分布式計算。
和其他領域不同的是,網頁抓取不能單純依靠‘給問題增加更多進程’來提升速度,雖然運行一個process很快,但是兩個進程未必能讓速度提升一倍,而當運行三個乃更多時,可能你的所有請求都會被遠程服務器封殺,因為他認為你是在惡意攻擊。”
然而,某些場景里使用網頁並行抓取或者並行線程(thread)/進程仍然有些好處:
1.從多個數據源(多個遠程服務器),而不只是在一個數據源收集數據。
2.在已經收集到的數據上執行更加復雜/執行時間更長的操作(例如圖像分析或者OCR處理)。
3.從大型web服務器收集數據,如果你已經付費,或者創建多個連接是使用協議允許的行為。
一.pythom3.x 使用的是_thread,而thread模塊已經廢棄
1.1下面用_thread實現一個小例子
1 import time 2 import _thread 3 def print. time(threadName, delay, iterations): 4 5 start = int(time . time()) 6 7 for i in range(0 ,iterations): 8 9 time .sleep(delay) 10 11 seconds_ elapsed = str(int(time.time()) - start) 12 13 print ("[] []". format(seconds_ elapsed, threadName)) 14 15 try: 16 _thread.start_new_thread(print_time, ('Fizz', 3, 33)) 17 _thread.start_new_thread(print_time, ('Buzz',5,20)) 18 _thread.start_new_thread(print_time,('Counter',1,100)) 19 except: 20 21 print ('Error:unable to start_thread') 22 23 while 1: 24 25 pass
上面的例子開啟了三個線程,分別3,5,1秒打印不同次數的名字
1.2 threading 模塊
_thread是python相當底層的模塊,雖然可以有詳細的管理操作,但是苦於沒有高級函數,使用起來不太方便。
threadign模塊是一個高級接口,更加便捷使用線程,與此同時也體現了_thread模塊的特性。
在threading與os模塊里,分別調用了current_thread()和getpid() 該函數來獲取當前的線程/進程號,能有更好的執行效果。
1 import os 2 import threading 3 from threading import Thread 4 5 import time 6 7 userlist = ["黎明", "張學友", "劉德華"] 8 9 10 def work(n): 11 print(f"開始給{n}打電話,進程號:{os.getpid()},線程號是:{threading.current_thread()}") # 這里加f 是字符串格式化,作用類似於format 12 time.sleep(3) 13 print(f"給{n}打電話結束。進程號:{os.getpid()}線程號是:{threading.current_thread()}") 14 15 16 if __name__ == '__main__': 17 # 單線程 18 # for d in userlist: 19 # work(d) 20 plist = [] 21 # # 循環創建多進程部門,類似增加 22 # for d in userlist: 23 # # 進程對象 24 # p = Process(target=work, args=(d,)) 25 # # 生成進程 26 # p.start() 27 # # 生成的進程加入列表 28 # plist.append(p) 29 # 30 # # 阻塞終止進程的執行 31 # [i.join() for i in plist] 32 33 # 多線程類似部門增加人手 34 for d in userlist: 35 # 利用for來生成線程對象,然后再為它們傳入你想要傳入的數據(可以不一樣) 36 p = Thread(target=work, args=(d,)) 37 # 生成線程 38 p.start() 39 # 生成的線程加入列表 40 plist.append(p) 41 42 # 阻塞終止線程的執行(join():創建的都是子線程。如果不加join().主線程運行完,程序就結束了,作用就是等待子線程運行完畢。) 43 [i.join() for i in plist]
你也可以單個的創建,threading的優勢是創建的線程其他線程無法訪問的線程局部數據(local thread data),這樣的優勢對於爬蟲尤其明顯,它們抓取不同的網站,那么每個線程都可以
專注於自己要抓取的目標。
import threading def crawler(url): data = threading.local() data.visited = [] #爬去目標網站 threading.Thread(target=crawler, args=('http://www.douban.com')).start()
這樣就解決了線程之間的共享對象導致競爭條件問題。目標很明顯:不需要共享就不要共享(使用local data),為了安全的共享,就需要用到Queue模塊(后面會有示例)
threading的保姆職責甚至可以達到高度定制:isAlive函數的默認行為查看是否有線程仍處於活躍狀態。當一個線程崩潰或者重啟后才會返回True:
threading.Thread(target=crawler) t.start() while True: time.sleep(1) if not t.isAlive:
t = Thread(target=crawler, args=(d,))
t.start()
這樣可以達到簡單的監控方法。
2021-04-01
19:36:13
1.3import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os import time user_list = ['樹花', '歐陽娜娜', '迪麗熱巴', '鞠婧禕', '宋祖兒'] user_list1 = ['樹花', '歐陽娜娜', '迪麗熱巴', '鞠婧禕', '宋祖兒'] def work(n): print(f"開始給{n}打電話,進程號是{os.getpid()},線程號是:{threading.current_thread()}"+'\n') time.sleep(2) print(f"給{n}打電話結束,進程號是{os.getpid()},線程號是:{threading.current_thread()}"+'\n') def work1(n): print(f"開始給{n}打電話,進程號是{os.getpid()},線程號是:{threading.current_thread()}"+'\n') time.sleep(2) print(f"給{n}打電話結束,進程號是{os.getpid()},線程號是:{threading.current_thread()}"+'\n') if __name__ == '__main__': ''' 創建線程池的時候規定三個線程,根據指派任務為三種情況: 1.= 三個任務 理論上同時執行三個任務 2.> 三個任務 先執行前三個任務,在這其中根據完成速度先后繼續分配線程給后面的任務 3.< 三個任務 同時執行分配的任務,無任務線程等待阻塞 ''' # 創建線程池(進程池) with ThreadPoolExecutor(max_workers=3) as pool: # pool = ProcessPoolExecutor(max_workers=3) # 循環指派任務和參數 [pool.submit(work,user) for user in user_list] # [pool.submit(work1,[user]) for user in user_list1] # 關閉線程池(進程池) pool.shutdown(wait=True)
print('主線程完成')
1.3.1
如果程序不希望直接調用 result() 方法阻塞線程,則可通過 Future 的add_done_callback() 方法來添加回調函數,該回調函數形如 fn(future)。當線程任務完成后,程序會自動觸發該回調函數,並將對應的 Future 對象作為參數傳給該回調函數,直接調用result函數結果。
def test(value1, value2=None): print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2)) time.sleep(2) return 'finished' def test_result(future): print(future.result()) if __name__ == "__main__": threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_") for i in range(0,10): future = threadPool.submit(test, i,i+1) future.add_done_callback(test_result) # 回調函數 print(future.result()) threadPool.shutdown(wait=True) print('main finished')
1.3.2 map()
from concurrent.futures import ThreadPoolExecutor def test(value1, value2=None): print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2)) # time.sleep(2) return '*******' if __name__ == "__main__": args1 = [1,2] #假設1,2分別代表一個url args2 = [3,4] with ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_") as threadPool: threadPool.map(test, args1,args2) # 這是運行一次test的參數,眾所周知map可以讓test執行多次,一個[]代表參數,這里會請求1,3和2,4,執行兩次。 threadPool.shutdown(wait=True)
執行結果:
test__0 threading is printed 1, 3 test__1 threading is printed 2, 4
上面程序使用 map() 方法來啟動 4個線程(該程序的線程池包含 4 個線程,如果繼續使用只包含兩個線程的線程池,此時將有一個任務處於等待狀態,必須等其中一個任務完成,線程空閑出來
才會獲得執行的機會),map() 方法的返回值將會收集每個線程任務的返回結果。
通過上面程序可以看出,使用 map() 方法來啟動線程,並收集線程的執行結果,不僅具有代碼簡單的優點,而且雖然程序會以並發方式來執行 test() 函數,
但最后收集的 test() 函數的執行結果,依然與傳入參數的結果保持一致。
2021-04-03
00:01:27
1.4 利用隊列達到線程之間安全通信
這一個面向對象的簡單爬蟲,使用了上面提到的線程池和Queue。
import os import threading import time from queue import Queue from concurrent.futures import ThreadPoolExecutor import requests from lxml import etree class quibai(): def __init__(self): self.base_url = "https://www.qiushibaike.com/imgrank/page/{}/" self.headers = { "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36", } self.queue_list_url = Queue() self.queue_parse_url = Queue() self.queue_content = Queue() def list_url(self): # return [self.base_url.format(i) for i in range(1,14)] for i in range(1, 14): url_s = self.base_url.format(i) self.queue_list_url.put(url_s) # print(self.queue_list_url.get()) def get_imformations(self): while True: url = self.queue_list_url.get() response = requests.get(url=url, headers=self.headers) time.sleep(1) # return response.content.decode('utf-8') self.queue_parse_url.put(response.content.decode('utf-8')) self.queue_list_url.task_done() def analysis(self): while True: resp = self.queue_parse_url.get() res = etree.HTML(resp) item = {} picture = res.xpath('//div[@class="author clearfix"]//img/@src') item["picture"] = ["https:" + i for i in picture] name = res.xpath('//div[@class="author clearfix"]//h2/text()') item["name"] = [n.replace("\n", "") for n in name] # for n in name: title = res.xpath('//div[@class="content"]/span/text()') item["title"] = [t.replace("\n", "") for t in title] photo = res.xpath('//div[@class="thumb"]/a/img/@src') item["photo"] = ["https:" + i for i in photo] item["Fabulous"] = res.xpath('//span[@class="stats-vote"]/i/text()') item["comment"] = res.xpath('//span[@class="stats-comments"]//i/text()') gender = res.xpath('//div[@class="author clearfix"]/div/@class') for i in range(len(gender)): re = gender[i].split(" ")[1] if re == "womenIcon": gender[i] = "男" else: gender[i] = "女" item["gender"] = gender info = {} # # print(item['picture']) for name, gender, picture, title, photo, Fabulous, comment in zip(item['name'], item['gender'], item['picture'], item['title'], item['photo'], item['Fabulous'], item['comment']): info['name'] = name info['gender'] = gender info['picture'] = picture info['title'] = title info['photo'] = photo info['Fabulous'] = Fabulous info['comment'] = comment # return name, gender, picture, title, photo, Fabulous, comment self.queue_content.put(info) self.queue_parse_url.task_done() def save_data(self): if not os.path.exists("story_info"): os.mkdir("story_info") s = 1 while True: item_list = self.queue_content.get() with open("story_info/test_{}.txt".format(s), 'w', encoding='utf-8') as f: f.writelines(str(item_list)) s += 1 self.queue_content.task_done() def go(self): """ 1.設置基礎信息,鏈接數據庫 2.獲取url列表 3。獲取請求 4。get信息 5.save data from 'def get' to mysql 6.改造成線程 :return: """ # li_url = self.list_url() # print(li_url) # for j in li_url: # resp = self.get_imformations(j) # print(resp) # name, gender, picture, title, photo, Fabulous, comment =self.analysis(resp) # print(self.analysis(resp)) thread_list = [] # 在下面同時循環建立線程對象 for j in range(3): t1 = threading.Thread(target=self.list_url) thread_list.append(t1) t2 = threading.Thread(target=self.get_imformations) thread_list.append(t2) t3 = threading.Thread(target=self.analysis) thread_list.append(t3) t4 = threading.Thread(target=self.save_data) thread_list.append(t4) for t in thread_list: t.setDaemon(True) # 把子線程設置成守護線程,該線程不signifisant,當主線程結束,子線程結束 t.start() print(t) for q in [self.queue_list_url, self.queue_parse_url, self.queue_content]: q.join() # 讓主線程隊列先阻塞等待,當所有子線程完成后再完成。 print(q) print("主線程結束") if __name__ == '__main__': start_time=time.time() star = quibai() star.go() end_time=time.time() print('執行完畢,花了:{}的時間'.format(end_time-start_time))
1.4.1 鎖與線程同步
我上面的例子使用的了隊列來實現了線程的同步和安全,而鎖也可以實現共享資源的同步訪問,特別在一些場景下有十分重要的應用,例如io操作、數據庫查詢,在博客總能看到大佬們對我這種的小白的調侃告誡 “畢加索”。 這里就不寫🔒來充字數了,偷個懶放兩篇文章建議食用,哈哈。
1. 菜鳥教程基礎講解
2.線程通信
水平有限,求指正勿噴。
溜了溜了,吃飯去。
2021-04-04
12:01:13