背景:
(多線程執行同一個函數任務)某個應用場景需要從數據庫中取出幾十萬的數據時,需要對每個數據進行相應的操作。逐個數據處理過慢,於是考慮對數據進行分段線程處理:
-
方法一:使用threading模塊
代碼:
1 # -*- coding: utf-8 -*- 2 import math 3 import random 4 import time 5 from threading import Thread 6 7 _result_list = [] 8 9 10 def split_df(): 11 # 線程列表 12 thread_list = [] 13 # 需要處理的數據 14 _l = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 15 # 每個線程處理的數據大小 16 split_count = 2 17 # 需要的線程個數 18 times = math.ceil(len(_l) / split_count) 19 count = 0 20 for item in range(times): 21 _list = _l[count: count + split_count] 22 # 線程相關處理 23 thread = Thread(target=work, args=(item, _list,)) 24 thread_list.append(thread) 25 # 在子線程中運行任務 26 thread.start() 27 count += split_count 28 29 # 線程同步,等待子線程結束任務,主線程再結束 30 for _item in thread_list: 31 _item.join() 32 33 34 def work(df, _list): 35 """ 36 每個線程執行的任務,讓程序隨機sleep幾秒 37 :param df: 38 :param _list: 39 :return: 40 """ 41 sleep_time = random.randint(1, 5) 42 print(f'count is {df},sleep {sleep_time},list is {_list}') 43 time.sleep(sleep_time) 44 _result_list.append(df) 45 46 47 if __name__ == '__main__': 48 split_df() 49 print(len(_result_list), _result_list)
測試結果:
-
方法二:使用ThreadPoolExecutor.map
代碼:
1 # -*- coding: utf-8 -*- 2 import math 3 import random 4 import time 5 from concurrent.futures import ThreadPoolExecutor 6 7 8 def split_list(): 9 # 線程列表 10 new_list = [] 11 count_list = [] 12 # 需要處理的數據 13 _l = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 14 # 每個線程處理的數據大小 15 split_count = 2 16 # 需要的線程個數 17 times = math.ceil(len(_l) / split_count) 18 count = 0 19 for item in range(times): 20 _list = _l[count: count + split_count] 21 new_list.append(_list) 22 count_list.append(count) 23 count += split_count 24 return new_list, count_list 25 26 27 def work(df, _list): 28 """ 線程執行的任務,讓程序隨機sleep幾秒 29 :param df: 30 :param _list: 31 :return: 32 """ 33 sleep_time = random.randint(1, 5) 34 print(f'count is {df},sleep {sleep_time},list is {_list}') 35 time.sleep(sleep_time) 36 return sleep_time, df, _list 37 38 39 def use(): 40 new_list, count_list = split_list() 41 with ThreadPoolExecutor(max_workers=len(count_list)) as t: 42 results = t.map(work, new_list, count_list) 43 44 # 或執行如下兩行代碼 45 # pool = ThreadPoolExecutor(max_workers=5) 46 # 使用map的優點是 每次調用回調函數的結果不用手動的放入結果list中 47 # results = pool.map(work, new_list, count_list) 48 49 # map返回一個迭代器,其中的回調函數的參數 最好是可以迭代的數據類型,如list;如果有 多個參數 則 多個參數的 數據長度相同; 50 # 如: pool.map(work,[[1,2],[3,4]],[0,1]]) 中 [1,2]對應0 ;[3,4]對應1 ;其實內部執行的函數為 work([1,2],0) ; work([3,4],1) 51 # map返回的結果 是 有序結果;是根據迭代函數執行順序返回的結果 52 print(type(results)) 53 # 如下2行 會等待線程任務執行結束后 再執行其他代碼 54 for ret in results: 55 print(ret) 56 print('thread execute end!') 57 58 59 if __name__ == '__main__': 60 use()
測試結果:
參考鏈接:https://www.cnblogs.com/rgcLOVEyaya/p/RGC_LOVE_YAYA_1103_3days.html