python多線程:控制線程數量


背景

前段時間學習了python的多線程爬蟲,當時爬取一個圖片網站,開啟多線程后,並沒有限制線程的數量,也就是說,如果下載1000張圖片,會一次性開啟1000個子線程同時進行下載

 

現在希望控制線程數量:例如每次只下載5張,當下載完成后再下載另外5張,直至全部完成

查了一些資料,發現在python中,threading 模塊有提供 Semaphore類 和 BoundedSemaphore類來限制線程數

詳細說明可以看看下面幾篇文章,寫的很棒:

https://docs.python.org/3.5/library/threading.html?highlight=threading#semaphore-objects

https://www.liujiangblog.com/course/python/79

https://my.oschina.net/u/3524921/blog/920303

https://zhuanlan.zhihu.com/p/34159519

 

官網給出例子如下:

信號量通常用於保護容量有限的資源,例如數據庫服務器。在資源大小固定的任何情況下,都應使用有界信號量。在產生任何工作線程之前,您的主線程將初始化信號量:

maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)

產生后,工作線程在需要連接到服務器時會調用信號量的獲取和釋放方法:

with pool_sema:
    conn = connectdb()
    try:
        # ... use connection ...
    finally:
        conn.close()

改造之前的多線程爬蟲

首先貼出原來的代碼

# -*- coding:utf-8 -*-
import requests
from requests.exceptions import RequestException
import os, time
import re
from lxml import etree
import threading

lock = threading.Lock()
def get_html(url):
    """
    定義一個方法,用於獲取一個url頁面的響應內容
    :param url: 要訪問的url
    :return: 響應內容
    """
    response = requests.get(url, timeout=10)
    # print(response.status_code)
    try:
        if response.status_code == 200:

            # print(response.text)
            return response.text
        else:
             return None
    except RequestException:
        print("請求失敗")
        # return None


def parse_html(html_text):
    """
    定義一個方法,用於解析頁面內容,提取圖片url
    :param html_text:
    :return:一個頁面的圖片url集合
    """
    html = etree.HTML(html_text)

    if len(html) > 0:
        img_src = html.xpath("//img[@class='photothumb lazy']/@data-original")  # 元素提取方法
        # print(img_src)
        return img_src

    else:
        print("解析頁面元素失敗")

def get_image_pages(url):
    """
    獲取所查詢圖片結果的所有頁碼
    :param url: 查詢圖片url
    :return: 總頁碼數
    """

    html_text = get_html(url)  # 獲取搜索url響應內容
    # print(html_text)
    if html_text is not None:
        html = etree.HTML(html_text)  # 生成XPath解析對象
        last_page = html.xpath("//div[@class='pages']//a[last()]/@href")  # 提取最后一頁所在href鏈接
        print(last_page)
        if last_page:
            max_page = re.compile(r'(\d+)', re.S).search(last_page[0]).group()  # 使用正則表達式提取鏈接中的頁碼數字
            print(max_page)
            print(type(max_page))
            return int(max_page)  # 將字符串頁碼轉為整數並返回
        else:
            print("暫無數據")
            return None
    else:
        print("查詢結果失敗")


def get_all_image_url(page_number):
    """
    獲取所有圖片的下載url
    :param page_number: 爬取頁碼
    :return: 所有圖片url的集合
    """

    base_url = 'https://imgbin.com/free-png/naruto/'
    image_urls = []

    x = 1  # 定義一個標識,用於給每個圖片url編號,從1遞增
    for i in range(1, page_number):
        url = base_url + str(i)  # 根據頁碼遍歷請求url
        try:
            html = get_html(url)  # 解析每個頁面的內容
            if html:
                data = parse_html(html)  # 提取頁面中的圖片url
                # print(data)
                # time.sleep(3)
                if data:
                    for j in data:
                        image_urls.append({
                            'name': x,
                            'value': j
                        })
                        x += 1  # 每提取一個圖片url,標識x增加1
        except RequestException as f:
            print("遇到錯誤:", f)
            continue
    # print(image_urls)
    return image_urls

def get_image_content(url):
    """請求圖片url,返回二進制內容"""
    # print("正在下載", url)
    try:
        r = requests.get(url, timeout=15)
        if r.status_code == 200:
            return r.content
        return None
    except RequestException:
        return None

def main(url, image_name):
    """
    主函數:實現下載圖片功能
    :param url: 圖片url
    :param image_name: 圖片名稱
    :return:
    """
    semaphore.acquire()  # 加鎖,限制線程數
    print('當前子線程: {}'.format(threading.current_thread().name))
    save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'
    try:
        file_path = '{0}/{1}.jpg'.format(save_path, image_name)
        if not os.path.exists(file_path):  # 判斷是否存在文件,不存在則爬取
            with open(file_path, 'wb') as f:
                f.write(get_image_content(url))
                f.close()

                print('第{}個文件保存成功'.format(image_name))

        else:
            print("第{}個文件已存在".format(image_name))

        semaphore.release()  # 解鎖imgbin-多線程-重寫run方法.py

    except FileNotFoundError as f:
        print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url))
        print("報錯:", f)
        raise

    except TypeError as e:
        print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url))
        print("報錯:", e)

class MyThread(threading.Thread):
    """繼承Thread類重寫run方法創建新進程"""
    def __init__(self, func, args):
        """

        :param func: run方法中要調用的函數名
        :param args: func函數所需的參數
        """
        threading.Thread.__init__(self)
        self.func = func
        self.args = args

    def run(self):
        print('當前子線程: {}'.format(threading.current_thread().name))
        self.func(self.args[0], self.args[1])
        # 調用func函數
        # 因為這里的func函數其實是上述的main()函數,它需要2個參數;args傳入的是個參數元組,拆解開來傳入


if __name__ == '__main__':
    start = time.time()
    print('這是主線程:{}'.format(threading.current_thread().name))

    urls = get_all_image_url(5)  # 獲取所有圖片url列表
    thread_list = []  # 定義一個列表,向里面追加線程
    semaphore = threading.BoundedSemaphore(5) # 或使用Semaphore方法
    for t in urls:
        # print(i)

        m = MyThread(main, (t["value"], t["name"]))  # 調用MyThread類,得到一個實例

        thread_list.append(m)

    for m in thread_list:

        m.start()  # 調用start()方法,開始執行

    for m in thread_list:
        m.join()  # 子線程調用join()方法,使主線程等待子線程運行完畢之后才退出


    end = time.time()
    print(end-start)
    # get_image_pages("https://imgbin.com/free-png/Naruto")
View Code

 

將代碼進行改造

1、下面的第8、9行表示調用 threading 的 BoundedSemaphore類,初始化信號量為5,把結果賦給變量 pool_sema

 1 if __name__ == '__main__':
 2     start = time.time()
 3     print('這是主線程:{}'.format(threading.current_thread().name))
 4 
 5     urls = get_all_image_url(5)  # 獲取所有圖片url列表
 6     thread_list = []  # 定義一個列表,向里面追加線程
 7 
 8     max_connections = 5  # 定義最大線程數
 9     pool_sema = threading.BoundedSemaphore(max_connections) # 或使用Semaphore方法
10     for t in urls:
11         # print(i)
12 
13         m = MyThread(main, (t["value"], t["name"]))  # 調用MyThread類,得到一個實例
14 
15         thread_list.append(m)
16 
17     for m in thread_list:
18 
19         m.start()  # 調用start()方法,開始執行
20 
21     for m in thread_list:
22         m.join()  # 子線程調用join()方法,使主線程等待子線程運行完畢之后才退出
23 
24 
25     end = time.time()
26     print(end-start)

2、修改main()函數

(1)方法一:通過with語句實現,第9行添加 with pool_sema

使用 with 語句來獲得一個鎖、條件變量或信號量,相當於調用 acquire();離開 with 塊后,會自動調用 release()

 1 def main(url, image_name):
 2     """
 3     主函數:實現下載圖片功能
 4     :param url: 圖片url
 5     :param image_name: 圖片名稱
 6     :return:
 7     """
 8     
 9     with pool_sema:
10         print('當前子線程: {}'.format(threading.current_thread().name))
11         save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'
12         try:
13             file_path = '{0}/{1}.jpg'.format(save_path, image_name)
14             if not os.path.exists(file_path):  # 判斷是否存在文件,不存在則爬取
15                 with open(file_path, 'wb') as f:
16                     f.write(get_image_content(url))
17                     f.close()
18 
19                     print('第{}個文件保存成功'.format(image_name))
20 
21             else:
22                 print("第{}個文件已存在".format(image_name))
23 
24             
25 
26         except FileNotFoundError as f:
27             print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url))
28             print("報錯:", f)
29             raise
30 
31         except TypeError as e:
32             print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url))
33             print("報錯:", e)

(2)方法二:直接使用 acquire()和 release()

下面的第8行調用  acquire(),第24行調用release()

 1 def main(url, image_name):
 2     """
 3     主函數:實現下載圖片功能
 4     :param url: 圖片url
 5     :param image_name: 圖片名稱
 6     :return:
 7     """
 8     pool_sema.acquire()  # 加鎖,限制線程數
 9     # with pool_sema:
10     print('當前子線程: {}'.format(threading.current_thread().name))
11     save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'
12     try:
13         file_path = '{0}/{1}.jpg'.format(save_path, image_name)
14         if not os.path.exists(file_path):  # 判斷是否存在文件,不存在則爬取
15             with open(file_path, 'wb') as f:
16                 f.write(get_image_content(url))
17                 f.close()
18 
19                 print('第{}個文件保存成功'.format(image_name))
20 
21         else:
22             print("第{}個文件已存在".format(image_name))
23 
24         pool_sema.release()  # 解鎖imgbin-多線程-重寫run方法.py
25 
26     except FileNotFoundError as f:
27         print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url))
28         print("報錯:", f)
29         raise
30 
31     except TypeError as e:
32         print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url))
33         print("報錯:", e)

最終效果是一樣的,每次啟用5個線程,完成后再啟動下一批

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM