使用 joblib 對 Pandas 數據進行並行處理


使用 joblib 對 Pandas 數據進行並行處理

如果需要對一個很大的數據集進行操作,而基於一列數據生成新的一列數據可能都需要耗費很長時間。

於是可以使用 joblib 進行並行處理。

假設我們有一個 dataframe 變量 data,要基於它的 source 列生成新的一列 double,其實就是把原來的 source 列做了個平方運算。感覺就這個簡單的運算,應該有更簡單的方法,在這里只是舉個例子,我們使用 apply 方法並行實現。

如果直接使用 apply 那么直接如下實現

import pandas as pd

def double_func(data):
    return pow(data,2)

data["double"] = data["source"].apply(double_func)

使用並行實現如下

import pandas as pd
from joblib import Parallel, delayed

def double_func(data):
    return pow(data,2)

def key_func(subset):
    subset["double"] = subset["source"].apply(double_func)

data_grouped = data.groupby(data.index)
results = Parallel(n_jobs=8)(delayed(key_func)(group) for name, group in data_grouped)
data = pd.concat(results)

基本原理就是把整個 dataframe 根據 index,每行生成了一個子數據集,而把每個子數據集作為子任務使用多進程運行,最終生成 results 是多進程運行生成的結果的 list,使用 concat 重新組合就是我們最終想要的結果了。

n_jobs 參數就是需要使用幾個進程池來運行程序。貌似一般 CPU 是幾核的用幾個進程會比較好?

其實速度並不是成倍減少的,具體原因我也……不太好講清,但是還是可以很大幅度提升運行速度的。


順便一提,如果數據集很大,程序一跑起來,根本不知道它跑得怎么樣了,還是說卡死了。

注意到,我們生成的 data_grouped 是一個可迭代的對象,那么就可以使用 tqdm 來可視化進度條

如果在 jupyter 里面使用的話,代碼可以是下面這樣

import pandas as pd
from joblib import Parallel, delayed
from tqdm import tqdm, tqdm_notebook

tqdm_notebook().pandas()

def double_func(data):
    return pow(data,2)

def key_func(subset):
    subset["double"] = subset["source"].apply(double_func)

data_grouped = data.groupby(data.index)
results = Parallel(n_jobs=8)(delayed(key_func)(group) for name, group in tqdm(data_grouped))
data = pd.concat(results)

友情提示,在我自己使用的時候遇到 bug ,提示無法從 Pandas 導入 PanelGroupby 的錯誤。查了許久才發現,是新版 Pandas 刪除了PanelGroupby 這個模塊。解決辦法其實就是……升級 tqdm,在最新版已經修復了這個 bug 了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM