python多線程/進程文件讀取


如果要獲取數據並分析,例如用for循環,那只能按順序讀取,這樣就會造成效率低下:

循環讀取多文件過慢,本文分別使用多線程、多進程方法對文件進行讀取

多線程

由於處理完文件往往需要獲取返回值,可以使用以下兩種方法:

import queue
q = queue.Queue()
def read_file(file):
    with open(os.path.join(path,file),'r') as f:
        q.put()

1 自定義get_result()方法,取返回值

import threading
class ReadThread(threading.Thread):
    def __init__(self,file):
        threading.Thread.__init__(self) #super(ReadThread, self).__init__()
        self.file = file
        
    def run(self):
        self.res = read_file(self.file)
        
    def get_result(self):
        #注:此方法特別慢
        return self.res

threads = []
for file in os.listdir(path):
	t = ReadThread(file)
    threads.append(t)
    
[t.start() for t in threads]
[t.join() for t in threads]
for t in threads:
	t.get_result()

2 使用隊列

#改用多線程讀取
import threading

start = time.time()
df = pd.DataFrame()

threads = []
for file in os.listdir(path):
    t = threading.Thread(target=read_file,args=(file,))
    threads.append(t)
    
[t.start() for t in threads]
[t.join() for t in threads]

while not q.empty():
    q.get()
    q.task_done()
    
print("read time:",time.time()-start)

關於task_done

如果線程里每從隊列里取一次,但沒有執行task_done(),則join無法判斷隊列到底有沒有結束,在最后執行個join()是等不到結果的,會一直掛起。

理解Queue隊列中join()與task_done()的關系 - andyzhang- - 博客園 (cnblogs.com)

python偽多線程,適合IO密集型任務

如果是一個計算為主的程序(專業一點稱為CPU密集型程序),這一點確實是比較吃虧的,每個線程運行一遍,就相當於單線程再跑,甚至比單線程還要慢——CPU切換線程的上下文也是要有開銷的。但是,如果是一個磁盤或網絡為主的程序(IO密集型)就不同了。一個線程處在IO等待的時候,另一個線程還可以在CPU里面跑,有時候CPU閑着沒事干,所有的線程都在等着IO,這時候他們就是同時的了,而單線程的話此時還是在一個一個等待的。我們都知道IO的速度比起CPU來是慢到令人發指的,python的多線程就在這時候發揮作用了。比方說多線程網絡傳輸,多線程往不同的目錄寫文件,等等

python 實現多線程並返回函數返回值的三種方法_zxc的博客-CSDN博客_python多線程函數返回值

線程池

最佳線程數目 = ((線程等待時間+線程CPU時間)/線程CPU時間 )* CPU數目

多線程線程數設置多少合適_東東的專欄-CSDN博客_線程數設置多少合適

本文阻塞、非阻塞主要指取結果時是否使用回調函數,回調函數可以避免阻塞

線程池 +阻塞

def read_file(file):
    with open(os.path.join(path,file),'r') as f:
    	data = json.load(f)
        return data

#線程池,取結果時會阻塞
from concurrent.futures import ThreadPoolExecutor,as_completed

df = pd.DataFrame()

start_time = time.time()
with ThreadPoolExecutor(20) as executor: # 創建 ThreadPoolExecutor 
    future_list = [executor.submit(read_file, file) for file in os.listdir(path)] # 提交任務

 
for future in as_completed(future_list):
    result = future.result() # 獲取任務結果
    df = df.append(result,ignore_index=True)

print(time.time()-start_time)

線程池 +非阻塞

#線程池,非阻塞
from concurrent.futures import ThreadPoolExecutor,as_completed

df = pd.DataFrame()

start_time = time.time()
with ThreadPoolExecutor(20) as executor: # 創建 ThreadPoolExecutor 
    future_list = [executor.submit(read_file, file) for file in os.listdir(path)] # 提交任務


def get_result(future):
    global df
    df = df.append(future.result(),ignore_index=True)

    
for future in as_completed(future_list):
    future.add_done_callback(get_result)

print(time.time()-start_time)

進程池

進程數一般設置為 核數-1

進程池 +阻塞

from concurrent.futures import ThreadPoolExecutor,as_completed

net_df = pd.DataFrame()

start_time = time.time()
# 提交任務
with ThreadPoolExecutor(20) as executor: # 創建 ThreadPoolExecutor 
    future_list = [executor.submit(read_file, file) for file in os.listdir(path)] 

    
for future in as_completed(future_list):
	result = future.result() # 獲取任務結果
	net_df = net_df.append(result,ignore_index=True)

print(time.time()-start_time)

進程池 +非阻塞

#進程池,非阻塞獲取結果
from concurrent.futures import ProcessPoolExecutor,as_completed

df = pd.DataFrame()

start_time = time.time()
# 提交任務
with ProcessPoolExecutor(7) as executor: # 創建 ThreadPoolExecutor 
    future_list = [executor.submit(read_file, file) for file in os.listdir(path)] 

def get_result(future):
    global df
    df = df.append(future.result(),ignore_index=True)

    
for future in as_completed(future_list):
    future.add_done_callback(get_result)

print(time.time()-start_time)

Python線程池及其原理和使用(超級詳細) (biancheng.net)

[第53天: Python 線程池 - 純潔的微笑 - 博客園 (cnblogs.com)](https://www.cnblogs.com/ityouknow/p/12993166.html#:~:text=在 python 中使用線程池有兩種方式,一種是基於第三方庫 threadpool,,另一種是基於 python3 新引入的庫 concurrent.futures.ThreadPoolExecutor 。)

Python concurrent.future線程池和進程池_Gordennizaicunzai的博客-CSDN博客


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM