需求:在從銀行數據庫中取出 幾十萬數據時,需要對 每行數據進行相關操作,通過pandas的dataframe發現數據處理過慢,於是 對數據進行 分段后 通過 線程進行處理;
如下給出 測試版代碼,通過 list 分段模擬 pandas 的 dataframe ;
1.使用 threading模塊
1 # -*- coding: utf-8 -*- 2 # (C) Guangcai Ren <renguangcai@jiaaocap.com> 3 # All rights reserved 4 # create time '2019/6/26 14:41' 5 import math 6 import random 7 import time 8 from threading import Thread 9 10 _result_list = [] 11 12 13 def split_df(): 14 # 線程列表 15 thread_list = [] 16 # 需要處理的數據 17 _l = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 18 # 每個線程處理的數據大小 19 split_count = 2 20 # 需要的線程個數 21 times = math.ceil(len(_l) / split_count) 22 count = 0 23 for item in range(times): 24 _list = _l[count: count + split_count] 25 # 線程相關處理 26 thread = Thread(target=work, args=(item, _list,)) 27 thread_list.append(thread) 28 # 在子線程中運行任務 29 thread.start() 30 count += split_count 31 32 # 線程同步,等待子線程結束任務,主線程再結束 33 for _item in thread_list: 34 _item.join() 35 36 37 def work(df, _list): 38 """ 線程執行的任務,讓程序隨機sleep幾秒 39 40 :param df: 41 :param _list: 42 :return: 43 """ 44 sleep_time = random.randint(1, 5) 45 print(f'count is {df},sleep {sleep_time},list is {_list}') 46 time.sleep(sleep_time) 47 _result_list.append(df) 48 49 50 def use(): 51 split_df() 52 53 54 if __name__ == '__main__': 55 y = use() 56 print(len(_result_list), _result_list)
響應結果如下:
注意點:
腳本中的 _result_list 在項目中 要 放在 函數中,不能直接放在 路由類中,否則會造成 多次請求 數據 污染;
定義線程任務時 thread = Thread(target=work, args=(item, _list,)) 代碼中的 work函數 和 參數 要分開,否則 多線程無效
注意線程數不能過多
2.使用ThreadPoolExecutor.map
# -*- coding: utf-8 -*- # (C) Guangcai Ren <renguangcai@jiaaocap.com> # All rights reserved # create time '2019/6/26 14:41' import math import random import time from concurrent.futures import ThreadPoolExecutor def split_list(): # 線程列表 new_list = [] count_list = [] # 需要處理的數據 _l = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] # 每個線程處理的數據大小 split_count = 2 # 需要的線程個數 times = math.ceil(len(_l) / split_count) count = 0 for item in range(times): _list = _l[count: count + split_count] new_list.append(_list) count_list.append(count) count += split_count return new_list, count_list def work(df, _list): """ 線程執行的任務,讓程序隨機sleep幾秒 :param df: :param _list: :return: """ sleep_time = random.randint(1, 5) print(f'count is {df},sleep {sleep_time},list is {_list}') time.sleep(sleep_time) return sleep_time, df, _list def use(): pool = ThreadPoolExecutor(max_workers=5) new_list, count_list = split_list() # map返回一個迭代器,其中的回調函數的參數 最好是可以迭代的數據類型,如list;如果有 多個參數 則 多個參數的 數據長度相同; # 如: pool.map(work,[[1,2],[3,4]],[0,1]]) 中 [1,2]對應0 ;[3,4]對應1 ;其實內部執行的函數為 work([1,2],0) ; work([3,4],1) # map返回的結果 是 有序結果;是根據迭代函數執行順序返回的結果 # 使用map的優點是 每次調用回調函數的結果不用手動的放入結果list中 results = pool.map(work, new_list, count_list) print(type(results)) # 如下2行 會等待線程任務執行結束后 再執行其他代碼 for ret in results: print(ret) print('thread execute end!') if __name__ == '__main__': use()
響應為:
3.使用 ThreadPoolExecutor.submit
1 # -*- coding: utf-8 -*- 2 # (C) Guangcai Ren <renguangcai@jiaaocap.com> 3 # All rights reserved 4 # create time '2019/6/26 14:41' 5 import math 6 import random 7 import time 8 from concurrent.futures import ThreadPoolExecutor 9 10 # 線程池list 11 pool_list = [] 12 13 14 def split_df(pool): 15 # 需要處理的數據 16 _l = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 17 # 每個線程處理的數據大小 18 split_count = 2 19 # 需要的線程個數 20 times = math.ceil(len(_l) / split_count) 21 count = 0 22 for item in range(times): 23 _list = _l[count: count + split_count] 24 # 線程相關處理 25 # submit方法提交可回調的函數,並返回一個future實例;future對象包含相關屬性 26 # 如: done(函數是否執行完成),result(函數執行結果),running(函數是否正在運行) 27 # 從而 可以在submit 后的代碼中 查看 相關任務運行情況 28 # 此方法 執行數據的結果是無序的,如果需要得到有序的結果,需要 for循環 每個future實例(線程池),如 此腳本代碼 29 f = pool.submit(work, item, _list) 30 pool_list.append(f) 31 count += split_count 32 33 34 def work(df, _list): 35 """ 線程執行的任務,讓程序隨機sleep幾秒 36 37 :param df: 38 :param _list: 39 :return: 40 """ 41 sleep_time = random.randint(1, 5) 42 print(f'count is {df},sleep {sleep_time},list is {_list}') 43 time.sleep(sleep_time) 44 return sleep_time, df, _list 45 46 47 def use(): 48 pool = ThreadPoolExecutor(max_workers=5) 49 split_df(pool) 50 _result_list = [] 51 for item in pool_list: 52 result_tuple = item.result() 53 _result_list.append(result_tuple[1]) 54 return _result_list 55 56 57 if __name__ == '__main__': 58 _result_list = use() 59 print(len(_result_list), _result_list)
結果如下:
個人比較喜歡使用 第二中方法,代碼寫的少,返回的是有序結果,回調函數結果自動管理在generator中,直接for循環 map的結果即可;不用擔心在 項目中多次請求數據污染問題
相關連接: