在Python中使用多進程快速處理數據


轉自:https://blog.csdn.net/bryan__/article/details/78786648

 

數據分片:可以將數據分片處理的任務適合用多進程代碼處理,核心思路是將data分片,對每一片數據處理返回結果(可能是無序的),然后合並。應用場景:多進程爬蟲,類mapreduce任務。缺點是子進程會拷貝父進程所有狀態,內存浪費嚴重。

import math
from multiprocessing import Pool

def run(data, index, size):  # data 傳入數據,index 數據分片索引,size進程數
    size = math.ceil(len(data) / size)
    start = size * index
    end = (index + 1) * size if (index + 1) * size < len(data) else len(data)
    temp_data = data[start:end]
    # do something
    return data  # 可以返回數據,在后面收集起來

processor = 40
res = []
p = Pool(processor)
for i in range(processor):
    res.append(p.apply_async(run, args=(data, i, processor,)))
    print(str(i) + ' processor started !')
p.close()
p.join()
for i in res:
    print(i.get())  # 使用get獲得多進程處理的結果

 

 

分文件處理:當內存受限時,不能再繼續使用數據分片,因為子進程會拷貝父進程的所有狀態,導致內存的浪費。這時候可以考慮先把大文件分片保存到磁盤,然后del 釋放掉數據,接着在多進程處理的函數里面分別讀取,這樣子進程就會分別讀取需要處理的數據,而不會占用大量內存。

 

from multiprocessing import Pool
import pandas as pd
import math
data=pd.DataFrame({'user_id':[1,2,3,4],'item_id':[6,7,8,9]})
users=pd.DataFrame(data['user_id'].unique(),columns=['user_id'])
processor=4
p=Pool(processor)
l_data = len(users)
size = math.ceil(l_data / processor)
res = []
def run(i):
    data=pd.read_csv('../data/user_'+str(i)+'.csv')
    #todo
return data

for i in range(processor):
    start = size * i
    end = (i + 1) * size if (i + 1) * size < l_data else l_data
    user = users[start:end]
    t_data = pd.merge(data, user, on='user_id').reset_index(drop=True)
    t_data.to_csv('../data/user_'+str(i)+'.csv',index=False)
    print(len(t_data))

del data,l_data,users
for i in range(processor):
    res.append(p.apply_async(run, args=(i,)))
    print(str(i) + ' processor started !')
p.close()
p.join()
data = pd.concat([i.get() for i in res])

  

 

多進程數據共享:當需要修改共享的數據時,那么這個時候可以使用數據共享:

from multiprocessing import Process, Manager
# 每個子進程執行的函數
# 參數中,傳遞了一個用於多進程之間數據共享的特殊字典
def func(i, d):
    d[i] = i + 100
    print(d.values())
# 在主進程中創建特殊字典
m = Manager()
d = m.dict()
for i in range(5):
    # 讓子進程去修改主進程的特殊字典
    p = Process(target=func, args=(i, d))
    p.start()
p.join()
------------
[100]
[100, 101]
[100, 101, 102, 103]
[100, 101, 102, 103]
[100, 101, 102, 103, 104]

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM