python多線程執行同一個函數任務之threading、ThreadPoolExecutor.map


背景:

(多線程執行同一個函數任務)某個應用場景需要從數據庫中取出幾十萬的數據時,需要對每個數據進行相應的操作。逐個數據處理過慢,於是考慮對數據進行分段線程處理:

  • 方法一:使用threading模塊

代碼

 1 # -*- coding: utf-8 -*-
 2 import math
 3 import random
 4 import time
 5 from threading import Thread
 6 
 7 _result_list = []
 8 
 9 
10 def split_df():
11     # 線程列表
12     thread_list = []
13     # 需要處理的數據
14     _l = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
15     # 每個線程處理的數據大小
16     split_count = 2
17     # 需要的線程個數
18     times = math.ceil(len(_l) / split_count)
19     count = 0
20     for item in range(times):
21         _list = _l[count: count + split_count]
22         # 線程相關處理
23         thread = Thread(target=work, args=(item, _list,))
24         thread_list.append(thread)
25         # 在子線程中運行任務
26         thread.start()
27         count += split_count
28 
29     # 線程同步,等待子線程結束任務,主線程再結束
30     for _item in thread_list:
31         _item.join()
32 
33 
34 def work(df, _list):
35     """
36     每個線程執行的任務,讓程序隨機sleep幾秒
37     :param df:
38     :param _list:
39     :return:
40     """
41     sleep_time = random.randint(1, 5)
42     print(f'count is {df},sleep {sleep_time},list is {_list}')
43     time.sleep(sleep_time)
44     _result_list.append(df)
45 
46 
47 if __name__ == '__main__':
48     split_df()
49     print(len(_result_list), _result_list)

測試結果

 

  • 方法二:使用ThreadPoolExecutor.map

代碼

 1 # -*- coding: utf-8 -*-
 2 import math
 3 import random
 4 import time
 5 from concurrent.futures import ThreadPoolExecutor
 6 
 7 
 8 def split_list():
 9     # 線程列表
10     new_list = []
11     count_list = []
12     # 需要處理的數據
13     _l = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
14     # 每個線程處理的數據大小
15     split_count = 2
16     # 需要的線程個數
17     times = math.ceil(len(_l) / split_count)
18     count = 0
19     for item in range(times):
20         _list = _l[count: count + split_count]
21         new_list.append(_list)
22         count_list.append(count)
23         count += split_count
24     return new_list, count_list
25 
26 
27 def work(df, _list):
28     """ 線程執行的任務,讓程序隨機sleep幾秒
29     :param df:
30     :param _list:
31     :return:
32     """
33     sleep_time = random.randint(1, 5)
34     print(f'count is {df},sleep {sleep_time},list is {_list}')
35     time.sleep(sleep_time)
36     return sleep_time, df, _list
37 
38 
39 def use():
40     new_list, count_list = split_list()
41     with ThreadPoolExecutor(max_workers=len(count_list)) as t:
42         results = t.map(work, new_list, count_list)
43 
44     # 或執行如下兩行代碼
45     # pool = ThreadPoolExecutor(max_workers=5)
46     # 使用map的優點是 每次調用回調函數的結果不用手動的放入結果list中
47     # results = pool.map(work, new_list, count_list)
48 
49     # map返回一個迭代器,其中的回調函數的參數 最好是可以迭代的數據類型,如list;如果有 多個參數 則 多個參數的 數據長度相同;
50     # 如: pool.map(work,[[1,2],[3,4]],[0,1]]) 中 [1,2]對應0 ;[3,4]對應1 ;其實內部執行的函數為 work([1,2],0) ; work([3,4],1)
51     # map返回的結果 是 有序結果;是根據迭代函數執行順序返回的結果
52     print(type(results))
53     # 如下2行 會等待線程任務執行結束后 再執行其他代碼
54     for ret in results:
55         print(ret)
56     print('thread execute end!')
57 
58 
59 if __name__ == '__main__':
60     use()

測試結果

 

參考鏈接:https://www.cnblogs.com/rgcLOVEyaya/p/RGC_LOVE_YAYA_1103_3days.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM