keras使用多進程


最近在工作中有一個需求:用訓練好的模型將數據庫中所有數據得出預測結果,並保存到另一張表上。數據庫中的數據是一篇篇文章,我訓練好的模型是對其中的四個段落分別分類,即我有四個模型,拿到文本后需要提取出這四個段落,並用對應模型分別預測這四個段落的類別,然后存入數據庫中。我是用keras訓練的模型,backend為tensorflow,因為數據量比較大,自然想到用多進程。在Windows上運行一點問題沒有,但是在Linux服務器上運行時發現每次都停在model.predict上不動了。

模型使用時大致如下:

# -*- coding: utf-8 -*-
import jieba
import numpy as np
import keras
import tensorflow as tf
from keras.preprocessing import sequence
from keras.models import load_model
from config import Config
import json


config_file = 'data/config.ini'
model_path = Config(config_file).get_value_str('cnn', 'model_path')
graph = tf.Graph()
with graph.as_default():
    session = tf.Session()
    with session.as_default():
        model = load_model(model_path)

graph_var = graph
session_var = session


def sentence_process(sentence):
    with open('data/words.json', encoding='utf-8') as f:
        words_json = json.load(f)
    words = words_json['words']
    word_to_id = words_json['word_to_id']
    max_length = words_json['max_length']
    segs = jieba.lcut(sentence)
    segs = filter(lambda x: len(x) >= 1, segs)
    segs = [x for x in segs if x]
    vector = []
    for seg in segs:
        if seg in words:
            vector.append(word_to_id[seg])
        else:
            vector.append(4999)
    return vector, max_length


def predict(sentence):
    vector, max_length = sentence_process(sentence)
    vector_np = np.array([vector])
    x_vector = sequence.pad_sequences(vector_np, max_length)
    with graph_var.as_default():
        with session_var.as_default():
            y = model.predict_proba(x_vector)
            if y[0][1] > 0.5:
                predict = 1
            else:
                predict = 0
    return predict
View Code

多進程使用大致如下: 

from multiprocessing import Pool
from classifaction.classify1 import predict1
from classifaction.classify2 import predict2
from classifaction.classify3 import predict3
from classifaction.classify4 import predict4


def main():
    '''
    get texts
    '''
    pool = Pool(processes=4, maxtasksperchild=1)
    pool.map(save_to_database, texts)
    pool.close()
    pool.join()


def save_to_database(texts):
    text1, text2, text3, text4 = texts[0], texts[1], texts[2], texts[3]
    label1 = predict1(text1)
    label2 = predict2(text2)
    label3 = predict3(text3)
    label4 = predict4(text4)


if __name__ == '__main__':
    main()
View Code

 

問題 1

在Linux服務器上運行時發現所有進程都停在model.predict上不動了。而在Windows下運行良好

方法

Google后發現很多遇到這個問題,也終於找到一個方法。可以看一下鏈接:

https://github.com/keras-team/keras/issues/9964

有一個方法是

As of TF 1.10, the library seems to be somewhat forkable. So you will have to test what you can do.

Also, something you can try is:
multiprocessing.set_start_method('spawn', force=True) if you're on UNIX and using Python3.

即在使用multiprocessing之前先設置一下。

python多進程內存復制

python對於多進程中使用的是copy on write機制,python 使用multiprocessing來創建多進程時,無論數據是否不會被更改,子進程都會復制父進程的狀態(內存空間數據等)。所以如果主進程耗的資源較多時,不小心就會造成不必要的大量的內存復制,從而可能導致內存爆滿的情況。

進程的啟動有spawn、fork、forkserver三種方式

spawn:調用該方法,父進程會啟動一個新的python進程,子進程只會繼承運行進程對象run()方法所需的那些資源。特別地,子進程不會繼承父進程中不必要的文件描述符和句柄。與使用forkforkserver相比,使用此方法啟動進程相當慢。

           Available on Unix and Windows. The default on Windows.

fork:父進程使用os.fork()來fork Python解釋器。子進程在開始時實際上與父進程相同,父進程的所有資源都由子進程繼承。請注意,安全創建多線程進程尚存在一定的問題。

          Available on Unix only. The default on Unix.

forkserver:當程序啟動並選擇forkserverstart方法時,將啟動服務器進程。從那時起,每當需要一個新進程時,父進程就會連接到服務器並請求它fork一個新進程。 fork服務器進程是單線程的,因此使用os.fork()是安全的。沒有不必要的資源被繼承。

         Available on Unix platforms which support passing file descriptors over Unix pipes.

要選擇以上某一種start方法,請在主模塊中使用multiprocessing.set_start_method()。並且multiprocessing.set_start_method()在一個程序中僅僅能使用一次。

 由上可見,Windows默認使用spawn方法,和Unix類系統如Linux和Mac默認使用的是fork方法,這就解析了為什么在Windows上可以運行,而在Linux上不能運行的原因。

在Linux服務器上運行時更改代碼如下:

import multiprocessing
from multiprocessing import Pool
from classifaction.classify1 import predict1
from classifaction.classify2 import predict2
from classifaction.classify3 import predict3
from classifaction.classify4 import predict4


def main():
    '''
    get texts
    '''
    pool = Pool(processes=4, maxtasksperchild=1)
    multiprocessing.set_start_method('spawn', force=True)
    pool.map(save_to_database, texts)
    pool.close()
    pool.join()


def save_to_database(texts):
    text1, text2, text3, text4 = texts[0], texts[1], texts[2], texts[3]
    label1 = predict1(text1)
    label2 = predict2(text2)
    label3 = predict3(text3)
    label4 = predict4(text4)


if __name__ == '__main__':
    main()
View Code

這樣就可以在Unix系統使用多進程了

 

問題 2

如果電腦上配置好了GPU,以tensorflow為backend,調用tensorflow時,默認會啟動GPU,這樣就沒法用多進程了。

方法

指定用CPU啟動

只需在程序首部添加以下代碼即可

 

import os
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM