A parallel summation example in Python


Let's start with an example. This code was written to evaluate a prediction model; a detailed description of the evaluation metric is at

https://www.kaggle.com/c/how-much-did-it-rain/details/evaluation

At its core it has to compute the Continuous Ranked Probability Score (CRPS):

C = 1/(70N) * Σ_{m=1}^{N} Σ_{n=0}^{69} (P_m(y <= n) - H(n - z_m))^2

where N is the number of rows, P_m(y <= n) is the predicted cumulative probability that the rainfall of row m is at most n mm, z_m is the actual rainfall of row m, and H(x) is the Heaviside step function (1 when x >= 0, 0 otherwise).
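
As a quick illustration of the formula, here is a minimal single-row sketch in NumPy (crps_row is a hypothetical helper written for this post, not part of the script below; pred holds the 70 predicted cumulative probabilities of one row and z is its actual rainfall):

import numpy as np

def crps_row(pred, z):
    '''squared-error contribution of one row: sum over n = 0..69 of (P(y <= n) - H(n - z))^2'''
    n = np.arange(70)
    heaviside = (n >= z).astype(float)  # H(n - z): 1 when n >= z, else 0
    return np.sum((pred - heaviside) ** 2)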

In the actual computation N is large (1,126,694 rows, about 79 million squared-error terms in total), so a direct single-process run takes a long time (14 minutes 10 seconds).

So, borrowing the idea of MapReduce, I tried computing it with multiple processes: each process handles a slice of the rows, and at the end the partial sums are added together to compute C.

The code is as follows:

import csv
import sys
import logging
import argparse
import numpy as np
import multiprocessing
import time

# configure logging
logger = logging.getLogger("example")

handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(logging.Formatter(
    '%(asctime)s %(levelname)s %(name)s: %(message)s'))

logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

def H(n, z):
    '''Heaviside step: True (counts as 1) when n >= z, else False (0)'''
    return (n-z) >= 0

def evaluate(args, start, end):
    '''handle rows in range [start, end) of both files'''
    logger.info("Started %d to %d" %(start, end))
    expReader = open('train_exp.csv','r')
    expReader.readline()              # skip header
    for i in range(start):            # seek to the first row of this block
        _ = expReader.readline()
    predFile = open(args.predict)
    for i in range(start+1):          # skip header plus the first `start` rows
        _ = predFile.readline()
    predReader = csv.reader(predFile, delimiter=',')
    squareErrorSum = 0
    totalLines = end - start
    for i, row in enumerate(predReader):
        if i == totalLines:
            logger.info("Completed %d to %d" %(start, end))
            break
        expId, exp = expReader.readline().strip().split(',')
        exp = float(exp)
        predId = row[0]
        row = np.array(row, dtype='float')
        #assert expId == predId
        #lineSum = 0
        for j in xrange(1,71):
            n = j - 1
            # row[j] is the predicted P(y <= n); (n >= exp) inlines H(n, exp)
            squareErrorSum += (row[j]-(n>=exp))**2
            #squareErrorSum += (row[j]-H(n,exp))**2
            #lineSum += (row[j]-H(n,exp))**2
    logger.info('SquareErrorSum %d to %d: %f' %(start, end, squareErrorSum))
    return squareErrorSum

def fileCmp(args):
    '''check number of lines in two files are same'''
    for count, line in enumerate(open('train_exp.csv')):
        pass
    expLines = count + 1 - 1 #discard header
    for count, line in enumerate(open(args.predict)):
        pass
    predictLines = count + 1 - 1
    print 'Lines(exp, predict):', expLines, predictLines
    assert expLines == predictLines
    evaluate.Lines = expLines
    
if __name__ == "__main__":
    # parse command line arguments
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--predict',
                        help=("path to a predicted probability file, "
                              "e.g. predict_changeTimePeriod.csv"))
    args = parser.parse_args()
    fileCmp(args)
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    result = []
    blocks = multiprocessing.cpu_count()
    linesABlock = evaluate.Lines / blocks    # integer division: lines per block
    for i in xrange(blocks-1):
        result.append(pool.apply_async(evaluate, (args, i*linesABlock, (i+1)*linesABlock)))
    # the last block takes whatever lines remain (Lines may not divide evenly)
    result.append(pool.apply_async(evaluate, (args, (blocks-1)*linesABlock, evaluate.Lines)))
    pool.close()
    pool.join()
    result = [res.get() for res in result]
    print result
    print 'evaluate.Lines', evaluate.Lines
    score = sum(result) / (70*evaluate.Lines)
    print "score:", score

 

The work is split into as many processes as there are CPU cores, in the hope of squeezing as much computation out of the CPU as possible. During the run, CPU utilization did indeed stay at 100%.

After testing, the result matches the single-process version, and the running time drops to 6 minutes 27 seconds, only about twice as fast.

The improvement is smaller than expected.

I also tried having each process load its own copy of the input file into memory with StringIO before processing, but that brought no further speedup. Combined with the constant 100% CPU utilization, it looks like the bottleneck is raw compute rather than I/O.
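
A minimal sketch of that in-memory variant, assuming Python 2 as in the script above (each worker reads the whole file once into a cStringIO buffer and then iterates over that instead of the file object):

import cStringIO

def loadIntoMemory(path):
    '''read a file once and return an in-memory file-like object over its contents'''
    with open(path) as f:
        return cStringIO.StringIO(f.read())

# inside evaluate(), the two open() calls would then become:
#   expReader = loadIntoMemory('train_exp.csv')
#   predFile = loadIntoMemory(args.predict)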

It seems that compute-intensive work still needs to be written in C :)

The C implementation is far faster than the Python one; a single thread finishes in just 50 seconds. For details, see:

http://www.cnblogs.com/instant7/p/4313649.html

