python多進程並發


由於Python下調用Linux的Shell命令都需要等待返回,所以常常我們設置的多線程都達不到效果,
因此在調用shell命令不需要返回時,使用threading模塊並不是最好的方法。
   http://www.coder4.com/archives/3352

Python提供了非常好用的多進程包multiprocessing,你只需要定義一個函數,Python會替你完成其他所有事情。
借助這個包,可以輕松完成從單進程到並發執行的轉換。

1、新建單一進程
如果我們新建少量進程,可以如下:
import multiprocessing
import time

def func(msg):
    for i in xrange(3):
        print msg
        time.sleep(1)

if __name__ == "__main__":
    p = multiprocessing.Process(target=func, args=("hello", ))
    p.start()
    p.join()
    print "Sub-process done."

2、使用進程池
是的,你沒有看錯,不是線程池。它可以讓你跑滿多核CPU,而且使用方法非常簡單。
注意要用apply_async,如果落下async,就變成阻塞版本了。

processes=4是最多並發進程數量。

import multiprocessing
import time

def func(msg):
    for i in xrange(3):
        print msg
        time.sleep(1)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    for i in xrange(10):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))
    pool.close()
    pool.join()
    print "Sub-process(es) done."

3、使用Pool,並需要關注結果

更多的時候,我們不僅需要多進程執行,還需要關注每個進程的執行結果,如下:

import multiprocessing
import time

def func(msg):
    for i in xrange(3):
        print msg
        time.sleep(1)
    return "done " + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in xrange(10):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print res.get()
    print "Sub-process(es) done."

根據網友評論中的反饋,在Windows下運行有可能崩潰(開啟了一大堆新窗口、進程),可以通過如下調用來解決:

multiprocessing.freeze_support()

附錄(自己的腳本):

#!/usr/bin/python
import threading
import subprocess
import datetime
import multiprocessing

def dd_test(round, th):
    test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)
    command = "seq 100 | xargs -i dd if=/dev/zero %s  bs=1M count=1"  %test_file_arg 
    print command
    subprocess.call(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)


def mds_stat(round):
    p = subprocess.Popen("zbkc mds stat", shell = True, stdout = subprocess.PIPE)
    out = p.stdout.readlines()
    if out[0].find('active') != -1:
        command = "echo '0205pm %s round mds status OK, %s' >> /round_record" %(round, datetime.datetime.now())
        command_2 = "time (ls /zbkc/test_mds_crash/) 2>>/round_record"
        command_3 = "ls /zbkc/test_mds_crash | wc -l >> /round_record"
        subprocess.call(command,shell=True)
        subprocess.call(command_2,shell=True)
        subprocess.call(command_3,shell=True)
        return 1
    else:
        command = "echo '0205 %s round mds status abnormal, %s, %s' >> /round_record" %(round, out[0], datetime.datetime.now())
        subprocess.call(command,shell=True)
        return 0


#threads = []
for round in range(1, 1600):
    pool = multiprocessing.Pool(processes = 10) #使用進程池
    for th in range(10):
#        th_name = "thread-" + str(th)
#        threads.append(th_name)   #添加線程到線程列表
#        threading.Thread(target = dd_test, args = (round, th), name = th_name).start()  #創建多線程任務
        pool.apply_async(dd_test, (round, th))
    pool.close()
    pool.join()
    #等待線程完成
#    for t in threads:
#        t.join()    

    if mds_stat(round) == 0: 
        subprocess.call("zbkc -s",shell=True)
        break
原文: http://blog.csdn.net/werm520/article/details/43730593


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM