爬蟲里的多線程基本使用


最近拜讀瑞安·米切爾的書關於並行抓取問題有很通俗的介紹:

  “網頁抓去的速度很快,起碼通常比雇佣幾十個實習生手動網上復制數據要快很多。當然隨着技術的不斷進步和享樂適應,人們還是在某個時刻覺得‘不夠快’,於是把目光轉向分布式計算。

 和其他領域不同的是,網頁抓取不能單純依靠‘給問題增加更多進程’來提升速度,雖然運行一個process很快,但是兩個進程未必能讓速度提升一倍,而當運行三個乃更多時,可能你的所有請求都會被遠程服務器封殺,因為他認為你是在惡意攻擊。”

然而,某些場景里使用網頁並行抓取或者並行線程(thread)/進程仍然有些好處:

 

1.從多個數據源(多個遠程服務器),而不只是在一個數據源收集數據。

2.在已經收集到的數據上執行更加復雜/執行時間更長的操作(例如圖像分析或者OCR處理)。

3.從大型web服務器收集數據,如果你已經付費,或者創建多個連接是使用協議允許的行為。

 

一.pythom3.x 使用的是_thread,而thread模塊已經廢棄

1.1下面用_thread實現一個小例子

 

 1 import time
 2 import  _thread
 3 def print. time(threadName, delay, iterations):
 4 
 5     start = int(time . time())
 6 
 7     for i in range(0 ,iterations):
 8 
 9         time .sleep(delay)
10 
11         seconds_ elapsed = str(int(time.time()) - start)
12 
13         print ("[] []". format(seconds_ elapsed, threadName))
14     
15 try:
16     _thread.start_new_thread(print_time, ('Fizz', 3, 33))
17     _thread.start_new_thread(print_time, ('Buzz',5,20))
18     _thread.start_new_thread(print_time,('Counter',1,100))
19 except:
20 
21     print ('Error:unable to start_thread')
22 
23 while 1:
24 
25 pass                

上面的例子開啟了三個線程,分別3,5,1秒打印不同次數的名字

 

1.2 threading 模塊

 

 _thread是python相當底層的模塊,雖然可以有詳細的管理操作,但是苦於沒有高級函數,使用起來不太方便。

threadign模塊是一個高級接口,更加便捷使用線程,與此同時也體現了_thread模塊的特性。

在threading與os模塊里,分別調用了current_thread()和getpid() 該函數來獲取當前的線程/進程號,能有更好的執行效果。

 1 import os
 2 import threading
 3 from threading import Thread
 4 
 5 import time
 6 
 7 userlist = ["黎明", "張學友", "劉德華"]
 8 
 9 
10 def work(n):
11     print(f"開始給{n}打電話,進程號:{os.getpid()},線程號是:{threading.current_thread()}")  # 這里加f 是字符串格式化,作用類似於format
12     time.sleep(3)
13     print(f"給{n}打電話結束。進程號:{os.getpid()}線程號是:{threading.current_thread()}")
14 
15 
16 if __name__ == '__main__':
17     # 單線程
18     # for d in userlist:
19     #     work(d)
20     plist = []
21     # # 循環創建多進程部門,類似增加
22     # for d in userlist:
23     #     # 進程對象
24     #     p = Process(target=work, args=(d,))
25     #     # 生成進程
26     #     p.start()
27     #     # 生成的進程加入列表
28     #     plist.append(p)
29     #
30     # # 阻塞終止進程的執行
31     # [i.join() for i in plist]
32 
33     # 多線程類似部門增加人手
34     for d in userlist: 
35         # 利用for來生成線程對象,然后再為它們傳入你想要傳入的數據(可以不一樣)
36         p = Thread(target=work, args=(d,))
37         # 生成線程
38         p.start()
39         # 生成的線程加入列表
40         plist.append(p)
41 
42     # 阻塞終止線程的執行(join():創建的都是子線程。如果不加join().主線程運行完,程序就結束了,作用就是等待子線程運行完畢。)
43     [i.join() for i in plist]

你也可以單個的創建,threading的優勢是創建的線程其他線程無法訪問的線程局部數據(local thread data),這樣的優勢對於爬蟲尤其明顯,它們抓取不同的網站,那么每個線程都可以

專注於自己要抓取的目標。

import threading

def  crawler(url):
       data = threading.local()
       data.visited = []
       #爬去目標網站

threading.Thread(target=crawler, args=('http://www.douban.com')).start()

這樣就解決了線程之間的共享對象導致競爭條件問題。目標很明顯:不需要共享就不要共享(使用local data),為了安全的共享,就需要用到Queue模塊(后面會有示例)

threading的保姆職責甚至可以達到高度定制:isAlive函數的默認行為查看是否有線程仍處於活躍狀態。當一個線程崩潰或者重啟后才會返回True:

threading.Thread(target=crawler) t.start() while True: time.sleep(1) if not t.isAlive:

t = Thread(target=crawler, args=(d,))

t.start()

這樣可以達到簡單的監控方法。

 

2021-04-01

19:36:13

1.3import threading

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os import time user_list = ['樹花', '歐陽娜娜', '迪麗熱巴', '鞠婧禕', '宋祖兒'] user_list1 = ['樹花', '歐陽娜娜', '迪麗熱巴', '鞠婧禕', '宋祖兒'] def work(n): print(f"開始給{n}打電話,進程號是{os.getpid()},線程號是:{threading.current_thread()}"+'\n') time.sleep(2) print(f"給{n}打電話結束,進程號是{os.getpid()},線程號是:{threading.current_thread()}"+'\n') def work1(n): print(f"開始給{n}打電話,進程號是{os.getpid()},線程號是:{threading.current_thread()}"+'\n') time.sleep(2) print(f"給{n}打電話結束,進程號是{os.getpid()},線程號是:{threading.current_thread()}"+'\n') if __name__ == '__main__': '''  創建線程池的時候規定三個線程,根據指派任務為三種情況: 1.= 三個任務 理論上同時執行三個任務 2.> 三個任務 先執行前三個任務,在這其中根據完成速度先后繼續分配線程給后面的任務 3.< 三個任務 同時執行分配的任務,無任務線程等待阻塞 '''  # 創建線程池(進程池) with ThreadPoolExecutor(max_workers=3) as pool: # pool = ProcessPoolExecutor(max_workers=3) # 循環指派任務和參數 [pool.submit(work,user) for user in user_list] # [pool.submit(work1,[user]) for user in user_list1] # 關閉線程池(進程池) pool.shutdown(wait=True)
print('主線程完成')


 
        

1.3.1

      如果程序不希望直接調用 result() 方法阻塞線程,則可通過 Future 的add_done_callback() 方法來添加回調函數,該回調函數形如 fn(future)。當線程任務完成后,程序會自動觸發該回調函數,並將對應的 Future 對象作為參數傳給該回調函數,直接調用result函數結果。

 

def test(value1, value2=None):
    print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
    time.sleep(2)
    return 'finished'

def test_result(future):
    print(future.result())

if __name__ == "__main__":

    threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")
    for i in range(0,10):
        future = threadPool.submit(test, i,i+1)
        future.add_done_callback(test_result) # 回調函數
        print(future.result())

    threadPool.shutdown(wait=True)
    print('main finished')

 

1.3.2 map()

from concurrent.futures import ThreadPoolExecutor

def test(value1, value2=None):
    print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
#     time.sleep(2)
    return '*******'

if __name__ == "__main__":

    args1 = [1,2]  #假設1,2分別代表一個url
    args2 = [3,4]

    with ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_") as threadPool:

        threadPool.map(test, args1,args2) # 這是運行一次test的參數,眾所周知map可以讓test執行多次,一個[]代表參數,這里會請求1,3和2,4,執行兩次。

        threadPool.shutdown(wait=True)

執行結果:

test__0 threading is printed 1, 3
test__1 threading is printed 2, 4


    上面程序使用 map() 方法來啟動 4個線程(該程序的線程池包含 4 個線程,如果繼續使用只包含兩個線程的線程池,此時將有一個任務處於等待狀態,必須等其中一個任務完成,線程空閑出來
才會獲得執行的機會),map() 方法的返回值將會收集每個線程任務的返回結果。
    通過上面程序可以看出,使用 map() 方法來啟動線程,並收集線程的執行結果,不僅具有代碼簡單的優點,而且雖然程序會以並發方式來執行 test() 函數,
但最后收集的 test() 函數的執行結果,依然與傳入參數的結果保持一致。

2021-04-03

00:01:27

 

1.4 利用隊列達到線程之間安全通信

 

這一個面向對象的簡單爬蟲,使用了上面提到的線程池和Queue。

import os
import threading
import time
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
import requests
from lxml import etree


class quibai():
    def __init__(self):
        self.base_url = "https://www.qiushibaike.com/imgrank/page/{}/"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36", }
        self.queue_list_url = Queue()
        self.queue_parse_url = Queue()
        self.queue_content = Queue()

    def list_url(self):
        # return [self.base_url.format(i) for i in range(1,14)]
        for i in range(1, 14):
            url_s = self.base_url.format(i)
            self.queue_list_url.put(url_s)
            # print(self.queue_list_url.get())

    def get_imformations(self):
        while True:
            url = self.queue_list_url.get()

            response = requests.get(url=url, headers=self.headers)
            time.sleep(1)
            # return response.content.decode('utf-8')
            self.queue_parse_url.put(response.content.decode('utf-8'))
            self.queue_list_url.task_done()

    def analysis(self):
        while True:
            resp = self.queue_parse_url.get()
            res = etree.HTML(resp)
            item = {}
            picture = res.xpath('//div[@class="author clearfix"]//img/@src')
            item["picture"] = ["https:" + i for i in picture]
            name = res.xpath('//div[@class="author clearfix"]//h2/text()')
            item["name"] = [n.replace("\n", "") for n in name]
            # for n in name:

            title = res.xpath('//div[@class="content"]/span/text()')
            item["title"] = [t.replace("\n", "") for t in title]
            photo = res.xpath('//div[@class="thumb"]/a/img/@src')
            item["photo"] = ["https:" + i for i in photo]
            item["Fabulous"] = res.xpath('//span[@class="stats-vote"]/i/text()')
            item["comment"] = res.xpath('//span[@class="stats-comments"]//i/text()')

            gender = res.xpath('//div[@class="author clearfix"]/div/@class')
            for i in range(len(gender)):
                re = gender[i].split(" ")[1]
                if re == "womenIcon":
                    gender[i] = ""
                else:
                    gender[i] = ""
            item["gender"] = gender
            info = {}
            # # print(item['picture'])
            for name, gender, picture, title, photo, Fabulous, comment in zip(item['name'], item['gender'],
                                                                              item['picture'], item['title'],
                                                                              item['photo'], item['Fabulous'],
                                                                              item['comment']):
                info['name'] = name
                info['gender'] = gender
                info['picture'] = picture
                info['title'] = title
                info['photo'] = photo
                info['Fabulous'] = Fabulous
                info['comment'] = comment
            # return name, gender, picture, title, photo, Fabulous, comment
            self.queue_content.put(info)
            self.queue_parse_url.task_done()

    def save_data(self):
        if not os.path.exists("story_info"):
            os.mkdir("story_info")
        s = 1
        while True:
            item_list = self.queue_content.get()
            with open("story_info/test_{}.txt".format(s), 'w', encoding='utf-8') as f:
                f.writelines(str(item_list))
            s += 1
            self.queue_content.task_done()

    def go(self):
        """
        1.設置基礎信息,鏈接數據庫
        2.獲取url列表
        3。獲取請求
        4。get信息
        5.save data from 'def get' to mysql
        6.改造成線程
        :return:
        """
        # li_url = self.list_url()
        # print(li_url)
        # for j in li_url:
        # resp = self.get_imformations(j)
        # print(resp)
        # name, gender, picture, title, photo, Fabulous, comment =self.analysis(resp)
        # print(self.analysis(resp))
        thread_list = []
        # 在下面同時循環建立線程對象
        
         for j in range(3):
             t1 = threading.Thread(target=self.list_url)
             thread_list.append(t1)
        
             t2 = threading.Thread(target=self.get_imformations)
        
             thread_list.append(t2)
             t3 = threading.Thread(target=self.analysis)
        
             thread_list.append(t3)
             t4 = threading.Thread(target=self.save_data)
        
             thread_list.append(t4)
         for t in thread_list:
             t.setDaemon(True)  # 把子線程設置成守護線程,該線程不signifisant,當主線程結束,子線程結束
             t.start()
             print(t)
         for q in [self.queue_list_url, self.queue_parse_url, self.queue_content]:
             q.join()  # 讓主線程隊列先阻塞等待,當所有子線程完成后再完成。
             print(q)
         print("主線程結束")

        if __name__ == '__main__':
    start_time=time.time()
    star = quibai()
    star.go()
    end_time=time.time()
    print('執行完畢,花了:{}的時間'.format(end_time-start_time))

 

1.4.1  鎖與線程同步

 

我上面的例子使用的了隊列來實現了線程的同步和安全,而鎖也可以實現共享資源的同步訪問,特別在一些場景下有十分重要的應用,例如io操作、數據庫查詢,在博客總能看到大佬們對我這種的小白的調侃告誡 “畢加索”。 這里就不寫🔒來充字數了,偷個懶放兩篇文章建議食用,哈哈。

1. 菜鳥教程基礎講解

2.線程通信

3.線程鎖的進一步理解

 

水平有限,求指正勿噴。 

溜了溜了,吃飯去。

2021-04-04

12:01:13

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM