Python 線程池原理及實現


Python 線程池原理及實現

 

 前言

從Python3.2開始,標准庫為我們提供了 concurrent.futures 模塊,它提供了 ThreadPoolExecutor (線程池)和ProcessPoolExecutor (進程池)兩個類。

相比 threading 等模塊,該模塊通過 submit 返回的是一個 future 對象,它是一個未來可期的對象,通過它可以獲悉線程的狀態主線程(或進程)中可以獲取某一個線程(進程)執行的狀態或者某一個任務執行的狀態及返回值:

  1. 主線程可以獲取某一個線程(或者任務的)的狀態,以及返回值。
  2. 當一個線程完成的時候,主線程能夠立即知道。
  3. 讓多線程和多進程的編碼接口一致。

✨ 線程池的基本使用

# coding: utf-8 from concurrent.futures import ThreadPoolExecutor import time def spider(page): time.sleep(page) print(f"crawl task{page} finished") return page with ThreadPoolExecutor(max_workers=5) as t: # 創建一個最大容納數量為5的線程池 task1 = t.submit(spider, 1) task2 = t.submit(spider, 2) # 通過submit提交執行的函數到線程池中 task3 = t.submit(spider, 3) print(f"task1: {task1.done()}") # 通過done來判斷線程是否完成 print(f"task2: {task2.done()}") print(f"task3: {task3.done()}") time.sleep(2.5) print(f"task1: {task1.done()}") print(f"task2: {task2.done()}") print(f"task3: {task3.done()}") print(task1.result()) # 通過result來獲取返回值 
執行結果如下:
task1: False task2: False task3: False crawl task1 finished crawl task2 finished task1: True task2: True task3: False 1 crawl task3 finished 
  1. 使用 with 語句 ,通過 ThreadPoolExecutor 構造實例,同時傳入 max_workers 參數來設置線程池中最多能同時運行的線程數目。

  2. 使用 submit 函數來提交線程需要執行的任務到線程池中,並返回該任務的句柄(類似於文件、畫圖),注意 submit() 不是阻塞的,而是立即返回。

  3. 通過使用 done() 方法判斷該任務是否結束。上面的例子可以看出,提交任務后立即判斷任務狀態,顯示四個任務都未完成。在延時2.5后,task1 和 task2 執行完畢,task3 仍在執行中。

  4. 使用 result() 方法可以獲取任務的返回值。

✨ 主要方法:

wait

 wait(fs, timeout=None, return_when=ALL_COMPLETED) 

wait 接受三個參數:
fs: 表示需要執行的序列
timeout: 等待的最大時間,如果超過這個時間即使線程未執行完成也將返回
return_when:表示wait返回結果的條件,默認為 ALL_COMPLETED 全部執行完成再返回

還是用上面那個例子來熟悉用法
示例:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED import time def spider(page): time.sleep(page) print(f"crawl task{page} finished") return page with ThreadPoolExecutor(max_workers=5) as t: all_task = [t.submit(spider, page) for page in range(1, 5)] wait(all_task, return_when=FIRST_COMPLETED) print('finished') print(wait(all_task, timeout=2.5)) # 運行結果 crawl task1 finished finished crawl task2 finished crawl task3 finished DoneAndNotDoneFutures(done={<Future at 0x28c8710 state=finished returned int>, <Future at 0x2c2bfd0 state=finished returned int>, <Future at 0x2c1b7f0 state=finished returned int>}, not_done={<Future at 0x2c3a240 state=running>}) crawl task4 finished 
  1. 代碼中返回的條件是:當完成第一個任務的時候,就停止等待,繼續主線程任務
  2. 由於設置了延時, 可以看到最后只有 task4 還在運行中

as_completed

上面雖然提供了判斷任務是否結束的方法,但是不能在主線程中一直判斷啊。最好的方法是當某個任務結束了,就給主線程返回結果,而不是一直判斷每個任務是否結束。

ThreadPoolExecutorThreadPoolExecutor 中 的 as_completed() 就是這樣一個方法,當子線程中的任務執行完后,直接用 result() 獲取返回結果

用法如下:

# coding: utf-8 from concurrent.futures import ThreadPoolExecutor, as_completed import time def spider(page): time.sleep(page) print(f"crawl task{page} finished") return page def main(): with ThreadPoolExecutor(max_workers=5) as t: obj_list = [] for page in range(1, 5): obj = t.submit(spider, page) obj_list.append(obj) for future in as_completed(obj_list): data = future.result() print(f"main: {data}") # 執行結果 crawl task1 finished main: 1 crawl task2 finished main: 2 crawl task3 finished main: 3 crawl task4 finished main: 4 

as_completed() 方法是一個生成器,在沒有任務完成的時候,會一直阻塞,除非設置了 timeout。

當有某個任務完成的時候,會 yield 這個任務,就能執行 for 循環下面的語句,然后繼續阻塞住,循環到所有的任務結束。同時,先完成的任務會先返回給主線程。

map

map(fn, *iterables, timeout=None) 

fn: 第一個參數 fn 是需要線程執行的函數;
iterables:第二個參數接受一個可迭代對象;
timeout: 第三個參數 timeout 跟 wait() 的 timeout 一樣,但由於 map 是返回線程執行的結果,如果 timeout小於線程執行時間會拋異常 TimeoutError。

用法如下:

import time from concurrent.futures import ThreadPoolExecutor def spider(page): time.sleep(page) return page start = time.time() executor = ThreadPoolExecutor(max_workers=4) i = 1 for result in executor.map(spider, [2, 3, 1, 4]): print("task{}:{}".format(i, result)) i += 1 # 運行結果 task1:2 task2:3 task3:1 task4:4 

使用 map 方法,無需提前使用 submit 方法,map 方法與 python 高階函數 map 的含義相同,都是將序列中的每個元素都執行同一個函數。

上面的代碼對列表中的每個元素都執行 spider() 函數,並分配各線程池。

可以看到執行結果與上面的 as_completed() 方法的結果不同,輸出順序和列表的順序相同,就算 1s 的任務先執行完成,也會先打印前面提交的任務返回的結果。

✨ 實戰

以某網站為例,演示線程池和單線程兩種方式爬取的差異

# coding: utf-8 import requests from concurrent.futures import ThreadPoolExecutor, as_completed import time import json from requests import adapters from proxy import get_proxies headers = { "Host": "splcgk.court.gov.cn", "Origin": "https://splcgk.court.gov.cn", "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36", "Referer": "https://splcgk.court.gov.cn/gzfwww/ktgg", } url = "https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1" def spider(page): data = { "bt": "", "fydw": "", "pageNum": page, } for _ in range(5): try: response = requests.post(url, headers=headers, data=data, proxies=get_proxies()) json_data = response.json() except (json.JSONDecodeError, adapters.SSLError): continue else: break else: return {} return json_data def main(): with ThreadPoolExecutor(max_workers=8) as t: obj_list = [] begin = time.time() for page in range(1, 15): obj = t.submit(spider, page) obj_list.append(obj) for future in as_completed(obj_list): data = future.result() print(data) print('*' * 50) times = time.time() - begin print(times) if __name__ == "__main__": main() 

運行結果如下:

 
多線程

可以看到,14 頁只花了 2 秒鍾就爬完了

下面我們可以使用單線程來爬取,代碼基本和上面的一樣,加個單線程函數
代碼如下:

def single(): begin = time.time() for page in range(1, 15): data = spider(page) print(data) print('*' * 50) times = time.time() - begin print(times) if __name__ == "__main__": single() 

運行結果:

 
單線程

可以看到,總共花了 19 秒。真是肉眼可見的差距啊!如果數據量大的話,運行時間差距會更大!



作者:GoPython
鏈接:https://www.jianshu.com/p/6d6e4f745c27
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
 
 
 

多種方法實現 python 線程池

最近在做一個爬蟲相關的項目,單線程的整站爬蟲,耗時真的不是一般的巨大,運行一次也是心累,,,所以,要想實現整站爬蟲,多線程是不可避免的,那么python多線程又應該怎樣實現呢?這里主要要幾個問題(關於python多線程的GIL問題就不再說了,網上太多了)。

一、 既然多線程可以縮短程序運行時間,那么,是不是線程數量越多越好呢?

顯然,並不是,每一個線程的從生成到消亡也是需要時間和資源的,太多的線程會占用過多的系統資源(內存開銷,cpu開銷),而且生成太多的線程時間也是可觀的,很可能會得不償失,這里給出一個最佳線程數量的計算方式:

最佳線程數的獲取:

1、通過用戶慢慢遞增來進行性能壓測,觀察QPS(即每秒的響應請求數,也即是最大吞吐能力。),響應時間

2、根據公式計算:服務器端最佳線程數量=((線程等待時間+線程cpu時間)/線程cpu時間) * cpu數量

3、單用戶壓測,查看CPU的消耗,然后直接乘以百分比,再進行壓測,一般這個值的附近應該就是最佳線程數量。

二、為什么要使用線程池?

對於任務數量不斷增加的程序,每有一個任務就生成一個線程,最終會導致線程數量的失控,例如,整站爬蟲,假設初始只有一個鏈接a,那么,這個時候只啟動一個線程,運行之后,得到這個鏈接對應頁面上的b,c,d,,,等等新的鏈接,作為新任務,這個時候,就要為這些新的鏈接生成新的線程,線程數量暴漲。在之后的運行中,線程數量還會不停的增加,完全無法控制。所以,對於任務數量不端增加的程序,固定線程數量的線程池是必要的。

三、如何實現線程池?

這里,我分別介紹三種實現方式:

1、過去:

使用threadpool模塊,這是個python的第三方模塊,支持python2和python3,具體使用方式如下:

復制代碼
#! /usr/bin/env python
# -*- coding: utf-8 -*-

import threadpool
import time

def sayhello (a):
    print("hello: "+a)
    time.sleep(2)

def main():
    global result
    seed=["a","b","c"]
    start=time.time()
    task_pool=threadpool.ThreadPool(5)
    requests=threadpool.makeRequests(sayhello,seed)
    for req in requests:
        task_pool.putRequest(req)
    task_pool.wait()
    end=time.time()
    time_m = end-start
    print("time: "+str(time_m))
    start1=time.time()
    for each in seed:
        sayhello(each)
    end1=time.time()
    print("time1: "+str(end1-start1))

if __name__ == '__main__':
    main()
復制代碼

運行結果如下:

threadpool是一個比較老的模塊了,現在雖然還有一些人在用,但已經不再是主流了,關於python多線程,現在已經開始步入未來(future模塊)了

2、未來:

使用concurrent.futures模塊,這個模塊是python3中自帶的模塊,但是,python2.7以上版本也可以安裝使用,具體使用方式如下:

復制代碼
#! /usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent.futures import ThreadPoolExecutor
import time

def sayhello(a):
    print("hello: "+a)
    time.sleep(2)

def main():
    seed=["a","b","c"]
    start1=time.time()
    for each in seed:
        sayhello(each)
    end1=time.time()
    print("time1: "+str(end1-start1))
    start2=time.time()
    with ThreadPoolExecutor(3) as executor:
        for each in seed:
            executor.submit(sayhello,each)
    end2=time.time()
    print("time2: "+str(end2-start2))
    start3=time.time()
    with ThreadPoolExecutor(3) as executor1:
        executor1.map(sayhello,seed)
    end3=time.time()
    print("time3: "+str(end3-start3))

if __name__ == '__main__':
    main()
復制代碼

運行結果如下:

注意到一點:

concurrent.futures.ThreadPoolExecutor,在提交任務的時候,有兩種方式,一種是submit()函數,另一種是map()函數,兩者的主要區別在於:

2.1、map可以保證輸出的順序, submit輸出的順序是亂的

2.2、如果你要提交的任務的函數是一樣的,就可以簡化成map。但是假如提交的任務函數是不一樣的,或者執行的過程之可能出現異常(使用map執行過程中發現問題會直接拋出錯誤)就要用到submit()

2.3、submit和map的參數是不同的,submit每次都需要提交一個目標函數和對應的參數,map只需要提交一次目標函數,目標函數的參數放在一個迭代器(列表,字典)里就可以。

3.現在?

這里要考慮一個問題,以上兩種線程池的實現都是封裝好的,任務只能在線程池初始化的時候添加一次,那么,假設我現在有這樣一個需求,需要在線程池運行時,再往里面添加新的任務(注意,是新任務,不是新線程),那么要怎么辦?

其實有兩種方式:

3.1、重寫threadpool或者future的函數:

這個方法需要閱讀源模塊的源碼,必須搞清楚源模塊線程池的實現機制才能正確的根據自己的需要重寫其中的方法。

3.2、自己構建一個線程池:

這個方法就需要對線程池的有一個清晰的了解了,附上我自己構建的一個線程池:

復制代碼
#! /usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import Queue
import hashlib
import logging
from utils.progress import PrintProgress
from utils.save import SaveToSqlite


class ThreadPool(object):
    def __init__(self, thread_num, args):

        self.args = args
        self.work_queue = Queue.Queue()
        self.save_queue = Queue.Queue()
        self.threads = []
        self.running = 0
        self.failure = 0
        self.success = 0
        self.tasks = {}
        self.thread_name = threading.current_thread().getName()
        self.__init_thread_pool(thread_num)

    # 線程池初始化
    def __init_thread_pool(self, thread_num):
        # 下載線程
        for i in range(thread_num):
            self.threads.append(WorkThread(self))
        # 打印進度信息線程
        self.threads.append(PrintProgress(self))
        # 保存線程
        self.threads.append(SaveToSqlite(self, self.args.dbfile))

    # 添加下載任務
    def add_task(self, func, url, deep):
        # 記錄任務,判斷是否已經下載過
        url_hash = hashlib.new('md5', url.encode("utf8")).hexdigest()
        if not url_hash in self.tasks:
            self.tasks[url_hash] = url
            self.work_queue.put((func, url, deep))
            logging.info("{0} add task {1}".format(self.thread_name, url.encode("utf8")))

    # 獲取下載任務
    def get_task(self):
        # 從隊列里取元素,如果block=True,則一直阻塞到有可用元素為止。
        task = self.work_queue.get(block=False)

        return task

    def task_done(self):
        # 表示隊列中的某個元素已經執行完畢。
        self.work_queue.task_done()

    # 開始任務
    def start_task(self):
        for item in self.threads:
            item.start()

        logging.debug("Work start")

    def increase_success(self):
        self.success += 1

    def increase_failure(self):
        self.failure += 1

    def increase_running(self):
        self.running += 1

    def decrease_running(self):
        self.running -= 1

    def get_running(self):
        return self.running

    # 打印執行信息
    def get_progress_info(self):
        progress_info = {}
        progress_info['work_queue_number'] = self.work_queue.qsize()
        progress_info['tasks_number'] = len(self.tasks)
        progress_info['save_queue_number'] = self.save_queue.qsize()
        progress_info['success'] = self.success
        progress_info['failure'] = self.failure

        return progress_info

    def add_save_task(self, url, html):
        self.save_queue.put((url, html))

    def get_save_task(self):
        save_task = self.save_queue.get(block=False)

        return save_task

    def wait_all_complete(self):
        for item in self.threads:
            if item.isAlive():
                # join函數的意義,只有當前執行join函數的線程結束,程序才能接着執行下去
                item.join()

# WorkThread 繼承自threading.Thread
class WorkThread(threading.Thread):
    # 這里的thread_pool就是上面的ThreadPool類
    def __init__(self, thread_pool):
        threading.Thread.__init__(self)
        self.thread_pool = thread_pool

    #定義線程功能方法,即,當thread_1,...,thread_n,調用start()之后,執行的操作。
    def run(self):
        print (threading.current_thread().getName())
        while True:
            try:
                # get_task()獲取從工作隊列里獲取當前正在下載的線程,格式為func,url,deep
                do, url, deep = self.thread_pool.get_task()
                self.thread_pool.increase_running()

                # 判斷deep,是否獲取新的鏈接
                flag_get_new_link = True
                if deep >= self.thread_pool.args.deep:
                    flag_get_new_link = False

                # 此處do為工作隊列傳過來的func,返回值為一個頁面內容和這個頁面上所有的新鏈接
                html, new_link = do(url, self.thread_pool.args, flag_get_new_link)

                if html == '':
                    self.thread_pool.increase_failure()
                else:
                    self.thread_pool.increase_success()
                    # html添加到待保存隊列
                    self.thread_pool.add_save_task(url, html)

                # 添加新任務,即,將新頁面上的不重復的鏈接加入工作隊列。
                if new_link:
                    for url in new_link:
                        self.thread_pool.add_task(do, url, deep + 1)

                self.thread_pool.decrease_running()
                # self.thread_pool.task_done()
            except Queue.Empty:
                if self.thread_pool.get_running() <= 0:
                    break
            except Exception, e:
                self.thread_pool.decrease_running()
                # print str(e)
                break
復制代碼

 

 
分類:  python
 
 

python線程池(threadpool)

 

一、安裝

pip install threadpool   

二、使用介紹

(1)引入threadpool模塊

(2)定義線程函數   

(3)創建線程 池threadpool.ThreadPool()   

(4)創建需要線程池處理的任務即threadpool.makeRequests()   

(5)將創建的多個任務put到線程池中,threadpool.putRequest   

(6)等到所有任務處理完畢theadpool.pool()

復制代碼
import threadpool  
def ThreadFun(arg1,arg2):  
    pass  
def main():  
    device_list=[object1,object2,object3......,objectn]#需要處理的設備個數  
    task_pool=threadpool.ThreadPool(8)#8是線程池中線程的個數  
    request_list=[]#存放任務列表  
    #首先構造任務列表  
    for device in device_list:  
        request_list.append(threadpool.makeRequests(ThreadFun,[((device, ), {})]))  
    #將每個任務放到線程池中,等待線程池中線程各自讀取任務,然后進行處理,使用了map函數,不了解的可以去了解一下。  
    map(task_pool.putRequest,request_list)  
    #等待所有任務處理完成,則返回,如果沒有處理完,則一直阻塞  
    task_pool.poll()  
if __name__=="__main__":  
    main()  
復制代碼

說明:makeRequests存放的是要開啟多線程的函數,以及函數相關參數和回調函數,其中回調函數可以不寫(默認是無),也就是說makeRequests只需要2個參數就可以運行。

 

二、代碼實例

復制代碼
 
import time
def sayhello(str): print "Hello ",str time.sleep(2) name_list =['xiaozi','aa','bb','cc']
start_time = time.time() for i in range(len(name_list)): sayhello(name_list[i]) print '%d second'% (time.time()-start_time)
 
復制代碼

改用線程池代碼,花費時間更少,更效率

復制代碼
 
import time
import threadpool  
def sayhello(str):
    print "Hello ",str
    time.sleep(2)

name_list =['xiaozi','aa','bb','cc']
start_time = time.time()
pool = threadpool.ThreadPool(10) 
requests = threadpool.makeRequests(sayhello, name_list) 
[pool.putRequest(req) for req in requests] 
pool.wait() 
print '%d second'% (time.time()-start_time)
 
復制代碼

 

 當函數有多個參數的情況,函數調用時第一個解包list,第二個解包dict,所以可以這樣:

復制代碼
 
def hello(m, n, o):
    """"""
    print "m = %s, n = %s, o = %s"%(m, n, o)
     
 
if __name__ == '__main__':
     
   # 方法1  
    lst_vars_1 = ['1', '2', '3']
    lst_vars_2 = ['4', '5', '6']
    func_var = [(lst_vars_1, None), (lst_vars_2, None)]
    # 方法2
    dict_vars_1 = {'m':'1', 'n':'2', 'o':'3'}
    dict_vars_2 = {'m':'4', 'n':'5', 'o':'6'}
    func_var = [(None, dict_vars_1), (None, dict_vars_2)]    
     
    pool = threadpool.ThreadPool(2)
    requests = threadpool.makeRequests(hello, func_var)
    [pool.putRequest(req) for req in requests]
    pool.wait()        
 
復制代碼

 需要把所傳入的參數進行轉換,然后帶人線程池。

復制代碼
 
def getuserdic():
    username_list=['xiaozi','administrator']
    password_list=['root','','abc123!','123456','password','root']
    userlist = []
    
    for username in username_list:
        
        user =username.rstrip()
        for password in password_list:
            pwd = password.rstrip()
            userdic ={}
            userdic['user']=user
            userdic['pwd'] = pwd
            tmp=(None,userdic)
            userlist.append(tmp)
    return userlist
 
復制代碼

 

擴展閱讀:

Python 多進程 multiprocessing.Pool類詳解

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM