如果要獲取數據並分析,例如用for循環,那只能按順序讀取,這樣就會造成效率低下:
循環讀取多文件過慢,本文分別使用多線程、多進程方法對文件進行讀取
多線程
由於處理完文件往往需要獲取返回值,可以使用以下兩種方法:
import queue
q = queue.Queue()
def read_file(file):
with open(os.path.join(path,file),'r') as f:
q.put()
1 自定義get_result()方法,取返回值
import threading
class ReadThread(threading.Thread):
def __init__(self,file):
threading.Thread.__init__(self) #super(ReadThread, self).__init__()
self.file = file
def run(self):
self.res = read_file(self.file)
def get_result(self):
#注:此方法特別慢
return self.res
threads = []
for file in os.listdir(path):
t = ReadThread(file)
threads.append(t)
[t.start() for t in threads]
[t.join() for t in threads]
for t in threads:
t.get_result()
2 使用隊列
#改用多線程讀取
import threading
start = time.time()
df = pd.DataFrame()
threads = []
for file in os.listdir(path):
t = threading.Thread(target=read_file,args=(file,))
threads.append(t)
[t.start() for t in threads]
[t.join() for t in threads]
while not q.empty():
q.get()
q.task_done()
print("read time:",time.time()-start)
關於task_done
如果線程里每從隊列里取一次,但沒有執行task_done(),則join無法判斷隊列到底有沒有結束,在最后執行個join()是等不到結果的,會一直掛起。
理解Queue隊列中join()與task_done()的關系 - andyzhang- - 博客園 (cnblogs.com)
python偽多線程,適合IO密集型任務
如果是一個計算為主的程序(專業一點稱為CPU密集型程序),這一點確實是比較吃虧的,每個線程運行一遍,就相當於單線程再跑,甚至比單線程還要慢——CPU切換線程的上下文也是要有開銷的。但是,如果是一個磁盤或網絡為主的程序(IO密集型)就不同了。一個線程處在IO等待的時候,另一個線程還可以在CPU里面跑,有時候CPU閑着沒事干,所有的線程都在等着IO,這時候他們就是同時的了,而單線程的話此時還是在一個一個等待的。我們都知道IO的速度比起CPU來是慢到令人發指的,python的多線程就在這時候發揮作用了。比方說多線程網絡傳輸,多線程往不同的目錄寫文件,等等
python 實現多線程並返回函數返回值的三種方法_zxc的博客-CSDN博客_python多線程函數返回值
線程池
最佳線程數目 = ((線程等待時間+線程CPU時間)/線程CPU時間 )* CPU數目
多線程線程數設置多少合適_東東的專欄-CSDN博客_線程數設置多少合適
本文阻塞、非阻塞主要指取結果時是否使用回調函數,回調函數可以避免阻塞
線程池 +阻塞
def read_file(file):
with open(os.path.join(path,file),'r') as f:
data = json.load(f)
return data
#線程池,取結果時會阻塞
from concurrent.futures import ThreadPoolExecutor,as_completed
df = pd.DataFrame()
start_time = time.time()
with ThreadPoolExecutor(20) as executor: # 創建 ThreadPoolExecutor
future_list = [executor.submit(read_file, file) for file in os.listdir(path)] # 提交任務
for future in as_completed(future_list):
result = future.result() # 獲取任務結果
df = df.append(result,ignore_index=True)
print(time.time()-start_time)
線程池 +非阻塞
#線程池,非阻塞
from concurrent.futures import ThreadPoolExecutor,as_completed
df = pd.DataFrame()
start_time = time.time()
with ThreadPoolExecutor(20) as executor: # 創建 ThreadPoolExecutor
future_list = [executor.submit(read_file, file) for file in os.listdir(path)] # 提交任務
def get_result(future):
global df
df = df.append(future.result(),ignore_index=True)
for future in as_completed(future_list):
future.add_done_callback(get_result)
print(time.time()-start_time)
進程池
進程數一般設置為 核數-1
進程池 +阻塞
from concurrent.futures import ThreadPoolExecutor,as_completed
net_df = pd.DataFrame()
start_time = time.time()
# 提交任務
with ThreadPoolExecutor(20) as executor: # 創建 ThreadPoolExecutor
future_list = [executor.submit(read_file, file) for file in os.listdir(path)]
for future in as_completed(future_list):
result = future.result() # 獲取任務結果
net_df = net_df.append(result,ignore_index=True)
print(time.time()-start_time)
進程池 +非阻塞
#進程池,非阻塞獲取結果
from concurrent.futures import ProcessPoolExecutor,as_completed
df = pd.DataFrame()
start_time = time.time()
# 提交任務
with ProcessPoolExecutor(7) as executor: # 創建 ThreadPoolExecutor
future_list = [executor.submit(read_file, file) for file in os.listdir(path)]
def get_result(future):
global df
df = df.append(future.result(),ignore_index=True)
for future in as_completed(future_list):
future.add_done_callback(get_result)
print(time.time()-start_time)
Python線程池及其原理和使用(超級詳細) (biancheng.net)
[第53天: Python 線程池 - 純潔的微笑 - 博客園 (cnblogs.com)](https://www.cnblogs.com/ityouknow/p/12993166.html#:~:text=在 python 中使用線程池有兩種方式,一種是基於第三方庫 threadpool,,另一種是基於 python3 新引入的庫 concurrent.futures.ThreadPoolExecutor 。)