python利用(threading,ThreadPoolExecutor.map,ThreadPoolExecutor.submit) 三種多線程方式處理 list數據


 需求:在從銀行數據庫中取出 幾十萬數據時,需要對 每行數據進行相關操作,通過pandas的dataframe發現數據處理過慢,於是 對數據進行 分段后 通過 線程進行處理;

如下給出 測試版代碼,通過 list 分段模擬 pandas 的 dataframe ;

 1.使用 threading模塊

 1 # -*- coding: utf-8 -*-
 2 # (C) Guangcai Ren <renguangcai@jiaaocap.com>
 3 # All rights reserved
 4 # create time '2019/6/26 14:41'
 5 import math
 6 import random
 7 import time
 8 from threading import Thread
 9 
10 _result_list = []
11 
12 
13 def split_df():
14     # 線程列表
15     thread_list = []
16     # 需要處理的數據
17     _l = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
18     # 每個線程處理的數據大小
19     split_count = 2
20     # 需要的線程個數
21     times = math.ceil(len(_l) / split_count)
22     count = 0
23     for item in range(times):
24         _list = _l[count: count + split_count]
25         # 線程相關處理
26         thread = Thread(target=work, args=(item, _list,))
27         thread_list.append(thread)
28         # 在子線程中運行任務
29         thread.start()
30         count += split_count
31 
32     # 線程同步,等待子線程結束任務,主線程再結束
33     for _item in thread_list:
34         _item.join()
35 
36 
37 def work(df, _list):
38     """ 線程執行的任務,讓程序隨機sleep幾秒
39 
40     :param df:
41     :param _list:
42     :return:
43     """
44     sleep_time = random.randint(1, 5)
45     print(f'count is {df},sleep {sleep_time},list is {_list}')
46     time.sleep(sleep_time)
47     _result_list.append(df)
48 
49 
50 def use():
51     split_df()
52 
53 
54 if __name__ == '__main__':
55     y = use()
56     print(len(_result_list), _result_list)

 

響應結果如下:

 

注意點:

腳本中的  _result_list 在項目中 要 放在 函數中,不能直接放在 路由類中,否則會造成 多次請求 數據 污染;

定義線程任務時    thread = Thread(target=work, args=(item, _list,))     代碼中的 work函數 和 參數 要分開,否則 多線程無效

注意線程數不能過多

 

2.使用ThreadPoolExecutor.map 

 

# -*- coding: utf-8 -*-
# (C) Guangcai Ren <renguangcai@jiaaocap.com>
# All rights reserved
# create time '2019/6/26 14:41'
import math
import random
import time
from concurrent.futures import ThreadPoolExecutor


def split_list():
    # 線程列表
    new_list = []
    count_list = []
    # 需要處理的數據
    _l = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    # 每個線程處理的數據大小
    split_count = 2
    # 需要的線程個數
    times = math.ceil(len(_l) / split_count)
    count = 0
    for item in range(times):
        _list = _l[count: count + split_count]
        new_list.append(_list)
        count_list.append(count)
        count += split_count
    return new_list, count_list


def work(df, _list):
    """ 線程執行的任務,讓程序隨機sleep幾秒

    :param df:
    :param _list:
    :return:
    """
    sleep_time = random.randint(1, 5)
    print(f'count is {df},sleep {sleep_time},list is {_list}')
    time.sleep(sleep_time)
    return sleep_time, df, _list


def use():
    pool = ThreadPoolExecutor(max_workers=5)
    new_list, count_list = split_list()
    # map返回一個迭代器,其中的回調函數的參數 最好是可以迭代的數據類型,如list;如果有 多個參數 則 多個參數的 數據長度相同;
    # 如: pool.map(work,[[1,2],[3,4]],[0,1]]) 中 [1,2]對應0 ;[3,4]對應1 ;其實內部執行的函數為 work([1,2],0) ; work([3,4],1)
    # map返回的結果 是 有序結果;是根據迭代函數執行順序返回的結果

    # 使用map的優點是 每次調用回調函數的結果不用手動的放入結果list中
    results = pool.map(work, new_list, count_list)
    print(type(results))
    # 如下2行 會等待線程任務執行結束后 再執行其他代碼
    for ret in results:
        print(ret)
    print('thread execute end!')

if __name__ == '__main__':
    use()

 

 

響應為:

 

3.使用 ThreadPoolExecutor.submit 

 1 # -*- coding: utf-8 -*-
 2 # (C) Guangcai Ren <renguangcai@jiaaocap.com>
 3 # All rights reserved
 4 # create time '2019/6/26 14:41'
 5 import math
 6 import random
 7 import time
 8 from concurrent.futures import ThreadPoolExecutor
 9 
10 # 線程池list
11 pool_list = []
12 
13 
14 def split_df(pool):
15     # 需要處理的數據
16     _l = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
17     # 每個線程處理的數據大小
18     split_count = 2
19     # 需要的線程個數
20     times = math.ceil(len(_l) / split_count)
21     count = 0
22     for item in range(times):
23         _list = _l[count: count + split_count]
24         # 線程相關處理
25         # submit方法提交可回調的函數,並返回一個future實例;future對象包含相關屬性
26         # 如: done(函數是否執行完成),result(函數執行結果),running(函數是否正在運行)
27         # 從而 可以在submit 后的代碼中 查看 相關任務運行情況
28         # 此方法 執行數據的結果是無序的,如果需要得到有序的結果,需要 for循環 每個future實例(線程池),如 此腳本代碼
29         f = pool.submit(work, item, _list)
30         pool_list.append(f)
31         count += split_count
32 
33 
34 def work(df, _list):
35     """ 線程執行的任務,讓程序隨機sleep幾秒
36 
37     :param df:
38     :param _list:
39     :return:
40     """
41     sleep_time = random.randint(1, 5)
42     print(f'count is {df},sleep {sleep_time},list is {_list}')
43     time.sleep(sleep_time)
44     return sleep_time, df, _list
45 
46 
47 def use():
48     pool = ThreadPoolExecutor(max_workers=5)
49     split_df(pool)
50     _result_list = []
51     for item in pool_list:
52         result_tuple = item.result()
53         _result_list.append(result_tuple[1])
54     return _result_list
55 
56 
57 if __name__ == '__main__':
58     _result_list = use()
59     print(len(_result_list), _result_list)

結果如下:

 

 

個人比較喜歡使用 第二中方法,代碼寫的少,返回的是有序結果,回調函數結果自動管理在generator中,直接for循環 map的結果即可;不用擔心在 項目中多次請求數據污染問題

 

 

 相關連接:

https://blog.csdn.net/dutsoft/article/details/54728706


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM