python爬蟲之多線程爬取


一、什么是多進程?

  像電腦上同時運行多個軟件,比如在打開微信的同時,也打開了QQ與釘釘,這就是多進程。

二、什么是多線程?

  一個進程中可以進行多種操作,即在QQ上既可以發送消息也可視頻/語音,這就是多線程。

三、主進程/子進程

  主進程下面可能會有好多子進程,即不一定一個運行的軟件就是一個進程,他下面可能會有很多個子進程。

 

四、主線程/子線程

  一個主線程下面可能會有多個子線程。

五、如何創建線程(Thread)

1、面向過程的創建方式

t = threading.Thread(target=s, name='xxx'xxx argxs=(x,y))

target:線程啟動之后需要執行的函數

name:線程的名字

threading.current_thread().name:獲取線程的name

args:主線程向子線程傳遞的參數

t.start():啟動線程

t.join():讓主線程等待子線程結束

#!/usr/local/bin/python3.7

import threading
import time

"""
    一個主線程,兩個子線程。
"""

def send_message(x):
    for i in range(1,5):
        print('%s在發%s'%(x,threading.current_thread().name))
        time.sleep(1)

def video(x):
    for i in range(1,5):
        print('%s在%s'%(x, threading.current_thread().name))
        time.sleep(1)

def main():
    # 主線程傳遞到子線程的參數
    x = '墨子李'
    # 創建發消息的線程
    sthread = threading.Thread(target=send_message, name='發消息', args=(x,))
    # 創建視頻的線程
    vthread = threading.Thread(target=video, name='視頻', args=(x,))
    # 啟動線程
    sthread.start()
    vthread.start()
    # 主線程等待子線程結束之后再結束
    sthread.join()
    vthread.join()
    print('我是主線程')
if __name__ == "__main__":
    main()

 

2、面向對象的創建方式

#!/usr/local/bin/python3.7

"""
    定義一個類,繼承自threading.Thread
"""
import threading
import time

class SendMessage(threading.Thread):
    def __init__(self, name, x):
        super().__init__()
        self.name = name
        self.x = x

    def run(self):
        for i in range(1, self.x):
            print('%s在發送第%s條消息'%(self.name, i))
            time.sleep(1)

class Video(threading.Thread):
    def __init__(self, name, x):
        super().__init__()
        self.name = name
        self.x = x

    def run(self):
        for i in range(1, self.x):
            print('%s視頻了%s分鍾'%(self.name, i))
            time.sleep(1)



def main():
    # 創建線程
    sthread = SendMessage('墨子李', 5)
    vthread = Video('墨子李', 5)

    # 啟動線程
    sthread.start()
    vthread.start()

    # 主線程等待子線程結束
    sthread.join()
    vthread.join()

if __name__ == "__main__":
    main()

 3、線程同步

  線程之間共享全局變量(進程之間不可以),會出現數據混亂的現象,這個時候要使用線程鎖來處理這種情況。

  創建鎖:s = threading.Lock()

  上鎖:s.acquire()

  釋放鎖:s.release()

4、隊列(queue)

  先進先出原則

  創建隊列:q = Queue(5)

  給隊列添加數據:q.put('xxx')

  q.put('xxx', False) 如果隊列滿,程序直接報錯

  q.put('xxx', True, 3) 如果隊列滿,程序等待3s再報錯

  q.get() 獲取數據,如果隊列為空卡在這里等待

  q.get(False) 如果隊列為空,程序直接報錯

  q.get(True, 3) 如果隊列為空,程序等待3s報錯

  q.empty() 判斷隊列是否為空

  q.full() 判斷隊列是否已滿

  q.qsize() 獲取隊列長度

#!/usr/local/bin/python3.7

from queue import Queue

# 創建隊列,規定隊列長度為5
q = Queue(5)

# 添加數據
q.put('1')
q.put('2')
q.put('3')
q.put('4')
q.put('5')
# 判斷隊列是否已滿,返回true
print(q.full())

# 獲取數據
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())


# 隊列長度 ,數據取完之后長度為0
print(q.qsize())
# 判斷隊列是否為空,此時返回True
print(q.empty())

 

5、多線程爬取 

1、分析

  兩類線程:下載、解析

  內容隊列:下載線程往隊列中put數據,解析線程從隊列get數據。

  url隊列:下載線程從url隊列get數據

  寫數據:上鎖

 

2、實例

 

#!/usr/local/bin/python3.7

"""
@File    :   duoxiancheng.py
@Time    :   2020/06/19
@Author  :   Mozili

"""

import requests
import threading
import queue
import time
from lxml import etree
import json



# 采集線程列表
crawl_thread_list = []
# 解析線程列表
parse_thread_list = []

class CrawlThread(threading.Thread):
    def __init__(self, name, page_queue, data_queue):
        super().__init__()
        self.name = name
        self.page_queue = page_queue
        self.data_queue = data_queue
        self.headers = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1 Safari/605.1.15'
    } 

    def run(self):
        print('%s開始采集數據....'%self.name)
        # 這里需要加循環讓線程繼續下去,不然執行一次之后就會停止
        while True:
            # 獲取頁碼隊列中的數據,拼接url
            url = 'http://www.ifanjian.net/jianwen-{}'
            # 判斷隊列是否為空
            if not self.page_queue.empty():
                url = url.format(self.page_queue.get())
                # print(url) 
            else:
                # print('數據已取完')
                break
            # 發送請求
            r = requests.get(url, headers=self.headers)
            # print('請求url----------------------------------------', r.url)
            # self.fp1.write(r.text)
            # 將數據添加到數據隊列
            self.data_queue.put(r.text)
            # print(self.data_queue.qsize())
            # print(self.data_queue.get())
        print('%s結束采集數據....'%self.name)
              

class ParseThread(threading.Thread):
    def __init__(self, name, data_queue, lock, fp):
        super().__init__()
        self.name = name
        self.data_queue = data_queue
        self.lock = lock
        self.fp = fp
    
    def run(self):
        
        items = []
        print('{}開始解析數據...'.format(self.name))
        while self.data_queue.qsize()!= 0:
            # 從隊列中獲取數據
            content = self.data_queue.get()
            # 解析數據
            tree = etree.HTML(content)
            time.sleep(1)
            # 獲取標題
            title_list = tree.xpath("//ul[@class='cont-list']/li/h2/a/text()")
            # print(title_list)
            # 獲取圖片鏈接
            src_list = tree.xpath("//ul[@class='cont-list']/li//p/img/@data-src")
            # print(src_list)
            for title in title_list:
                for src in src_list:
                    data = {
                        '標題':title,
                        '鏈接':src
                        }
                    items.append(data) 
                    # 加鎖
                    self.lock.acquire()
                    self.fp.write(json.dumps(data, ensure_ascii=False)+'\n')
                    # 釋放鎖
                    self.lock.release()
                    break 
        # print(self.data_queue.qsize())
        print('{}結束解析數據...'.format(self.name))
        
def creat_crawl_thread(page_queue, data_queue):

    crawl_name_list = ['采集線程1', '采集線程2', '采集線程3']
    for name in crawl_name_list:
        crawl_thread = CrawlThread(name, page_queue, data_queue)
        crawl_thread_list.append(crawl_thread) 

def creat_parse_thread(data_queue, lock, fp):

    parse_name_list = ['解析線程1', '解析線程2', '解析線程3']
    for name in parse_name_list:
        parse_thread = ParseThread(name, data_queue, lock, fp) 
        parse_thread_list.append(parse_thread)

def main():
    # 創建頁碼隊列
    page_queue = queue.Queue()
    # 給隊列添加頁碼
    for page in range(1, 51):
        page_queue.put(page)
    # 創建數據隊列
    data_queue = queue.Queue()
    # 創建鎖
    lock = threading.Lock()
    # 創建文件
    fp = open('Reptile/fanjian.json', 'a', encoding='utf8')
    # fp1 = open('Reptile/fanjian.txt', 'a', encoding='utf8')
    # 創建采集數據線程
    creat_crawl_thread(page_queue, data_queue)
    # 創建解析數據線程
    creat_parse_thread(data_queue, lock, fp)
    
    # 啟動采集數據線程
    for cthread in crawl_thread_list:
        cthread.start()
     
    time.sleep(3)
    # 啟動解析數據線程
    for pthread in parse_thread_list:
        pthread.start()
    
    # 主線程等待子線程結束
    cthread.join()
    pthread.join()
    
    print('主線程結束....')
    # 關閉文件
    # fp.close()

if __name__ == "__main__":
    main()

 

 

 

 

 

  

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM