上次說了很多Linux下進程相關知識,這邊不再復述,下面來說說Python的並發編程,如有錯誤歡迎提出~
如果遇到聽不懂的可以看上一次的文章:https://www.cnblogs.com/dotnetcrazy/p/9363810.html
官方文檔:https://docs.python.org/3/library/concurrency.html
在線預覽:http://github.lesschina.com/python/base/concurrency/2.並發編程-進程篇.html
1.進程篇¶
官方文檔:https://docs.python.org/3/library/multiprocessing.html
Code:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/PythonProcess
1.1.進程(Process)¶
Python的進程創建非常方便,看個案例:(這種方法通用,fork只適用於Linux系)
import os
# 注意一下,導入的是Process不是process(Class是大寫開頭)
from multiprocessing import Process
def test(name):
print("[子進程-%s]PID:%d,PPID:%d" % (name, os.getpid(), os.getppid()))
def main():
print("[父進程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
p = Process(target=test, args=("萌萌噠", )) # 單個元素的元組表達別忘了(x,)
p.start()
p.join() # 父進程回收子進程資源(內部調用了wait系列方法)
if __name__ == '__main__':
main()
運行結果:
[父進程]PID:25729,PPID:23434
[子進程-萌萌噠]PID:25730,PPID:25729
創建子進程時,傳入一個執行函數和參數,用start()方法來啟動進程即可
join()
方法是父進程回收子進程的封裝(主要是回收僵屍子進程(點我))
其他參數可以參考源碼 or 文檔,貼一下源碼的init
方法:
def __init__(self,group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)
擴展:name:為當前進程實例的別名
p.is_alive()
判斷進程實例p是否還在執行p.terminate()
終止進程(發SIGTERM
信號)
上面的案例如果用OOP來實現就是這樣:(如果不指定方法,默認調Run方法)
import os
from multiprocessing import Process
class My_Process(Process):
# 重寫了Proce類的Init方法
def __init__(self, name):
self.__name = name
Process.__init__(self) # 調用父類方法
# 重寫了Process類的run()方法
def run(self):
print("[子進程-%s]PID:%d,PPID:%d" % (self.__name, os.getpid(),
os.getppid()))
def main():
print("[父進程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
p = My_Process("萌萌噠") # 如果不指定方法,默認調Run方法
p.start()
p.join() # 父進程回收子進程資源(內部調用了wait系列方法)
if __name__ == '__main__':
main()
PS:multiprocessing.Process
自行處理僵死進程,不用像os.fork
那樣自己建立信號處理程序、安裝信號處理程序
1.1.源碼拓展¶
現在說說里面的一些門道(只想用的可以忽略)
新版本的封裝可能多層,這時候可以看看Python3.3.X系列(這個算是Python3早期版本了,很多代碼都暴露出來,比較明了直觀)
multiprocessing.process.py
# 3.4.x開始,Process有了一個BaseProcess
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/process.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/process.py
def join(self, timeout=None):
'''一直等到子進程over'''
self._check_closed()
# 斷言(False就觸發異常,提示就是后面的內容
# 開發中用的比較多,部署的時候可以python3 -O xxx 去除所以斷言
assert self._parent_pid == os.getpid(), "只能 join 一個子進程"
assert self._popen is not None, "只能加入一個已啟動的進程"
res = self._popen.wait(timeout) # 本質就是用了我們之前講的wait系列
if res is not None:
_children.discard(self) # 銷毀子進程
multiprocessing.popen_fork.py
# 3.4.x開始,在popen_fork文件中(以前是multiprocessing.forking.py)
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/popen_fork.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/popen_fork.py
def wait(self, timeout=None):
if self.returncode is None:
# 設置超時的一系列處理
if timeout is not None:
from multiprocessing.connection import wait
if not wait([self.sentinel], timeout):
return None
# 核心操作
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
return self.returncode
# 回顧一下上次說的:os.WNOHANG - 如果沒有子進程退出,則不阻塞waitpid()調用
def poll(self, flag=os.WNOHANG):
if self.returncode is None:
try:
# 他的內部調用了waitpid
pid, sts = os.waitpid(self.pid, flag)
except OSError as e:
# 子進程尚未創建
# e.errno == errno.ECHILD == 10
return None
if pid == self.pid:
if os.WIFSIGNALED(sts):
self.returncode = -os.WTERMSIG(sts)
else:
assert os.WIFEXITED(sts), "Status is {:n}".format(sts)
self.returncode = os.WEXITSTATUS(sts)
return self.returncode
關於斷言的簡單說明:(別泛濫)
如果條件為真,它什么都不做,反之它觸發一個帶可選錯誤信息的AssertionError
def test(a, b):
assert b != 0, "哥哥,分母不能為0啊"
return a / b
def main():
test(1, 0)
if __name__ == '__main__':
main()
結果:
Traceback (most recent call last):
File "0.assert.py", line 11, in <module>
main()
File "0.assert.py", line 7, in main
test(1, 0)
File "0.assert.py", line 2, in test
assert b != 0, "哥哥,分母不能為0啊"
AssertionError: 哥哥,分母不能為0啊
運行的時候可以指定-O參數
來忽略assert
,eg:
python3 -O 0.assert.py
Traceback (most recent call last):
File "0.assert.py", line 11, in <module>
main()
File "0.assert.py", line 7, in main
test(1, 0)
File "0.assert.py", line 3, in test
return a / b
ZeroDivisionError: division by zero
擴展:
https://docs.python.org/3/library/unittest.html
https://www.cnblogs.com/shangren/p/8038935.html
1.2.進程池¶
多個進程就不需要自己手動去管理了,有Pool來幫你完成,先看個案例:
import os
import time
from multiprocessing import Pool # 首字母大寫
def test(name):
print("[子進程-%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
time.sleep(1)
def main():
print("[父進程]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
p = Pool(5) # 設置最多5個進程(不設置就默認為CPU核數)
for i in range(10):
# 異步執行
p.apply_async(test, args=(i, )) # 同步用apply(如非必要不建議用)
p.close() # 關閉池,不再加入新任務
p.join() # 等待所有子進程執行完畢回收資源(join可以指定超時時間,eg:`p.join(1)`)
print("over")
if __name__ == '__main__':
main()
圖示:(join可以指定超時時間,eg:p.join(1)
)
調用join()
之前必須先調用close()
,調用close()
之后就不能繼續添加新的Process
了(下面會說為什么)
1.3.源碼拓展¶
驗證一下Pool的默認大小是CPU的核數,看源碼:
multiprocessing.pool.py
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/pool.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/pool.py
class Pool(object):
def __init__(self, processes=指定的進程數,...):
if processes is None:
processes = os.cpu_count() or 1 # os.cpu_count() ~ CPU的核數
源碼里面apply_async
方法,是有回調函數(callback)的
def apply_async(self,func,args=(),kwds={},callback=None,error_callback=None):
if self._state != RUN:
raise ValueError("Pool not running")
result = ApplyResult(self._cache, callback, error_callback)
self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
return result
來看個例子:(和JQ很像)
import os
import time
from multiprocessing import Pool # 首字母大寫
def test(name):
print("[子進程%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
time.sleep(1)
return name
def error_test(name):
print("[子進程%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
raise Exception("[子進程%s]啊,我掛了~" % name)
def callback(result):
"""成功之后的回調函數"""
print("[子進程%s]執行完畢" % result) # 沒有返回值就為None
def error_callback(msg):
"""錯誤之后的回調函數"""
print(msg)
def main():
print("[父進程]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
p = Pool() # CPU默認核數
for i in range(5):
# 搞2個出錯的看看
if i > 2:
p.apply_async(
error_test,
args=(i, ),
callback=callback,
error_callback=error_callback) # 異步執行
else:
# 異步執行,成功后執行callback函數(有點像jq)
p.apply_async(test, args=(i, ), callback=callback)
p.close() # 關閉池,不再加入新任務
p.join() # 等待所有子進程執行完畢回收資源
print("over")
if __name__ == '__main__':
main()
輸出:
[父進程]PID=12348,PPID=10999
[子進程0]PID=12349,PPID=12348
[子進程2]PID=12351,PPID=12348
[子進程1]PID=12350,PPID=12348
[子進程3]PID=12352,PPID=12348
[子進程4]PID=12352,PPID=12348
[子進程3]啊,我掛了~
[子進程4]啊,我掛了~
[子進程0]執行完畢
[子進程2]執行完畢
[子進程1]執行完畢
over
接着上面繼續拓展,補充說說獲取函數返回值。上面是通過成功后的回調函數來獲取返回值
,這次說說自帶的方法:
import time
from multiprocessing import Pool, TimeoutError
def test(x):
"""開平方"""
time.sleep(1)
return x * x
def main():
pool = Pool()
task = pool.apply_async(test, (10, ))
print(task)
try:
print(task.get(timeout=1))
except TimeoutError as ex:
print("超時了~", ex)
if __name__ == '__main__':
main()
輸出:(apply_async
返回一個ApplyResult
類,里面有個get方法可以獲取返回值)
<multiprocessing.pool.ApplyResult object at 0x7fbc354f50b8>
超時了~
再舉個例子,順便把Pool
里面的map
和imap
方法搞個案例(類比jq)
import time
from multiprocessing import Pool
def test(x):
return x * x
if __name__ == '__main__':
with Pool(processes=4) as pool:
task = pool.apply_async(test, (10, ))
print(task.get(timeout=1))
obj_list = pool.map(test, range(10))
print(obj_list)
# 返回一個可迭代類的實例對象
obj_iter = pool.imap(test, range(10))
print(obj_iter)
next(obj_iter)
for i in obj_iter:
print(i, end=" ")
輸出:
100
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
<multiprocessing.pool.IMapIterator object at 0x7ff7f9734198>
1 4 9 16 25 36 49 64 81
微微看一眼源碼:(基礎忘了可以查看==> 點我 )
class IMapIterator(object):
def __init__(self, cache):
self._cond = threading.Condition(threading.Lock())
self._job = next(job_counter)
self._cache = cache
self._items = collections.deque()
self._index = 0
self._length = None
self._unsorted = {}
cache[self._job] = self
def __iter__(self):
return self # 返回一個迭代器
# 實現next方法
def next(self, timeout=None):
with self._cond:
try:
item = self._items.popleft()
except IndexError:
if self._index == self._length:
raise StopIteration from None
self._cond.wait(timeout)
try:
item = self._items.popleft()
except IndexError:
if self._index == self._length:
raise StopIteration from None
raise TimeoutError from None
success, value = item
if success:
return value
raise value
......
擴展:優雅殺死子進程的探討 https://segmentfault.com/q/1010000005077517
1.4.拓展之subprocess¶
官方文檔:https://docs.python.org/3/library/subprocess.html
還記得之前李代桃僵的execlxxx
系列嗎?
這不,subprocess
就是它的一層封裝,當然了要強大的多,先看個例子:(以os.execlp
的例子為引)
import subprocess
def main():
# os.execlp("ls", "ls", "-al") # 執行Path環境變量可以搜索到的命令
result = subprocess.run(["ls", "-al"])
print(result)
if __name__ == '__main__':
main()
輸出
總用量 44
drwxrwxr-x 2 dnt dnt 4096 8月 7 17:32 .
drwxrwxr-x 4 dnt dnt 4096 8月 6 08:01 ..
-rw-rw-r-- 1 dnt dnt 151 8月 3 10:49 0.assert.py
-rw-rw-r-- 1 dnt dnt 723 8月 5 18:00 1.process2.py
-rw-rw-r-- 1 dnt dnt 501 8月 3 10:20 1.process.py
-rw-rw-r-- 1 dnt dnt 1286 8月 6 08:16 2.pool1.py
-rw-rw-r-- 1 dnt dnt 340 8月 7 16:38 2.pool2.py
-rw-rw-r-- 1 dnt dnt 481 8月 7 16:50 2.pool3.py
-rw-rw-r-- 1 dnt dnt 652 8月 5 17:01 2.pool.py
-rw-rw-r-- 1 dnt dnt 191 8月 7 17:33 3.subprocess.py
CompletedProcess(args=['ls', '-al'], returncode=0)
文檔¶
現在看下官方的文檔描述來理解一下:
r"""
具有可訪問I / O流的子進程
Subprocesses with accessible I/O streams
此模塊允許您生成進程,連接到它們輸入/輸出/錯誤管道,並獲取其返回代碼。
This module allows you to spawn processes, connect to their
input/output/error pipes, and obtain their return codes.
完整文檔可以查看:https://docs.python.org/3/library/subprocess.html
For a complete description of this module see the Python documentation.
Main API
========
run(...): 運行命令,等待它完成,然后返回`CompletedProcess`實例。
Runs a command, waits for it to complete,
then returns a CompletedProcess instance.
Popen(...): 用於在新進程中靈活執行命令的類
A class for flexibly executing a command in a new process
Constants(常量)
---------
DEVNULL: 特殊值,表示應該使用`os.devnull`
Special value that indicates that os.devnull should be used
PIPE: 表示應創建`PIPE`管道的特殊值
Special value that indicates a pipe should be created
STDOUT: 特殊值,表示`stderr`應該轉到`stdout`
Special value that indicates that stderr should go to stdout
Older API(盡量不用,說不定以后就淘汰了)
=========
call(...): 運行命令,等待它完成,然后返回返回碼。
Runs a command, waits for it to complete, then returns the return code.
check_call(...): Same as call() but raises CalledProcessError()
if return code is not 0(返回值不是0就引發異常)
check_output(...): 與check_call()相同,但返回`stdout`的內容,而不是返回代碼
Same as check_call but returns the contents of stdout instead of a return code
getoutput(...): 在shell中運行命令,等待它完成,然后返回輸出
Runs a command in the shell, waits for it to complete,then returns the output
getstatusoutput(...): 在shell中運行命令,等待它完成,然后返回一個(exitcode,output)元組
Runs a command in the shell, waits for it to complete,
then returns a (exitcode, output) tuple
"""
其實看看源碼很有意思:(內部其實就是調用的os.popen
【進程先導篇講進程守護的時候用過】)
def run(*popenargs, input=None, capture_output=False,
timeout=None, check=False, **kwargs):
if input is not None:
if 'stdin' in kwargs:
raise ValueError('stdin和輸入參數可能都不會被使用。')
kwargs['stdin'] = PIPE
if capture_output:
if ('stdout' in kwargs) or ('stderr' in kwargs):
raise ValueError('不能和capture_outpu一起使用stdout 或 stderr')
kwargs['stdout'] = PIPE
kwargs['stderr'] = PIPE
with Popen(*popenargs, **kwargs) as process:
try:
stdout, stderr = process.communicate(input, timeout=timeout)
except TimeoutExpired:
process.kill()
stdout, stderr = process.communicate()
raise TimeoutExpired(
process.args, timeout, output=stdout, stderr=stderr)
except: # 包括KeyboardInterrupt的通信處理。
process.kill()
# 不用使用process.wait(),.__ exit__為我們做了這件事。
raise
retcode = process.poll()
if check and retcode:
raise CalledProcessError(
retcode, process.args, output=stdout, stderr=stderr)
return CompletedProcess(process.args, retcode, stdout, stderr)
返回值類型:CompletedProcess
# https://github.com/lotapp/cpython3/blob/master/Lib/subprocess.py
class CompletedProcess(object):
def __init__(self, args, returncode, stdout=None, stderr=None):
self.args = args
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr
def __repr__(self):
"""對象按指定的格式顯示"""
args = [
'args={!r}'.format(self.args),
'returncode={!r}'.format(self.returncode)
]
if self.stdout is not None:
args.append('stdout={!r}'.format(self.stdout))
if self.stderr is not None:
args.append('stderr={!r}'.format(self.stderr))
return "{}({})".format(type(self).__name__, ', '.join(args))
def check_returncode(self):
"""如果退出代碼非零,則引發CalledProcessError"""
if self.returncode:
raise CalledProcessError(self.returncode, self.args, self.stdout,
self.stderr)
簡單demo¶
再來個案例體會一下方便之處:
import subprocess
def main():
result = subprocess.run(["ping", "www.baidu.com"])
print(result.stdout)
if __name__ == '__main__':
main()
圖示:
交互demo¶
再來個強大的案例(交互的程序都可以,比如 ftp
,nslookup
等等):popen1.communicate
import subprocess
def main():
process = subprocess.Popen(
["ipython3"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
# 對pstree進行交互
out, err = process.communicate(input=b'print("hello")', timeout=3)
print("Out:%s\nErr:%s" % (out.decode(), err.decode()))
except TimeoutError:
# 如果超時到期,則子進程不會被終止,需要自己處理一下
process.kill()
out, err = process.communicate()
print("Out:%s\nErr:%s" % (out.decode(), err.decode()))
if __name__ == '__main__':
main()
輸出:
IPython 6.4.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]: hello
In [2]: Do you really want to exit ([y]/n)?
Err:
注意點:如果超時到期,則子進程不會被終止,需要自己處理一下(官方提醒)
通信demo¶
這個等會說進程間通信還會說,所以簡單舉個例子,老規矩拿ps aux | grep bash
說事:
import subprocess
def main():
# ps aux | grep bash
# 進程1獲取結果
p1 = subprocess.Popen(["ps", "-aux"], stdout=subprocess.PIPE)
# 得到進程1的結果再進行篩選
p2 = subprocess.Popen(["grep", "bash"], stdin=p1.stdout, stdout=subprocess.PIPE)
# 關閉寫段(結果已經獲取到進程2中了,防止干擾顯示)
p1.stdout.close()
# 與流程交互:將數據發送到stdin並關閉它。
msg_tuple = p2.communicate()
# 輸出結果
print(msg_tuple[0].decode())
if __name__ == '__main__':
main()
輸出:(以前案例:進程間通信~PIPE匿名管道)
dnt 2470 0.0 0.1 24612 5236 pts/0 Ss 06:01 0:00 bash
dnt 2512 0.0 0.1 24744 5760 pts/1 Ss 06:02 0:00 bash
dnt 20784 0.0 0.1 24692 5588 pts/2 Ss+ 06:21 0:00 /bin/bash
dnt 22377 0.0 0.0 16180 1052 pts/1 S+ 06:30 0:00 grep bash
其他擴展可以看看這篇文章:subprocess與Popen()
1.5.進程間通信~PIPE管道通信¶
這個比較有意思,看個案例:
from multiprocessing import Process, Pipe
def test(w):
w.send("[子進程]老爸,老媽回來記得喊我一下~")
msg = w.recv()
print(msg)
def main():
r, w = Pipe()
p1 = Process(target=test, args=(w, ))
p1.start()
msg = r.recv()
print(msg)
r.send("[父進程]滾犢子,趕緊寫作業,不然我得跪方便面!")
p1.join()
if __name__ == '__main__':
main()
結果:
老爸,老媽回來記得喊我一下~
滾犢子,趕緊寫作業,不然我得跪方便面!
multiprocessing.Pipe源碼分析¶
按照道理應該子進程自己寫完自己讀了,和上次講得不一樣啊?不急,先看看源碼:
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/context.py
def Pipe(self, duplex=True):
'''返回由管道連接的兩個連接對象'''
from .connection import Pipe
return Pipe(duplex)
看看connection.Pipe
方法的定義部分,是不是雙向通信就看你是否設置duplex=True
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/connection.py
if sys.platform != 'win32':
def Pipe(duplex=True):
'''返回管道兩端的一對連接對象'''
if duplex:
# 雙工內部其實是socket系列(下次講)
s1, s2 = socket.socketpair()
s1.setblocking(True)
s2.setblocking(True)
c1 = Connection(s1.detach())
c2 = Connection(s2.detach())
else:
# 這部分就是我們上次講的pipe管道
fd1, fd2 = os.pipe()
c1 = Connection(fd1, writable=False)
c2 = Connection(fd2, readable=False)
return c1, c2
else:
def Pipe(duplex=True):
# win平台的一系列處理
......
c1 = PipeConnection(h1, writable=duplex)
c2 = PipeConnection(h2, readable=duplex)
return c1, c2
通過源碼知道了,原來雙工是通過socket搞的啊~
再看個和原來一樣效果的案例:(不用關來關去的了,方便!)
from multiprocessing import Process, Pipe
def test(w):
# 只能寫
w.send("[子進程]老爸,咱們完了,老媽一直在門口~")
def main():
r, w = Pipe(duplex=False)
p1 = Process(target=test, args=(w, ))
p1.start() # 你把這個放在join前面就直接死鎖了
msg = r.recv() # 只能讀
print(msg)
p1.join()
if __name__ == '__main__':
main()
輸出:(可以思考下為什么start換個位置就死鎖
,提示:阻塞讀寫
)
[子進程]老爸,咱們完了,老媽一直在門口~
再舉個Pool
的例子,咱們就進入今天的重點了:
from multiprocessing import Pipe, Pool
def proc_test1(conn):
conn.send("[小明]小張,今天哥們要見一女孩,你陪我唄,我24h等你回復哦~")
msg = conn.recv()
print(msg)
def proc_test2(conn):
msg = conn.recv()
print(msg)
conn.send("[小張]不去,萬一被我帥氣的外表迷倒就坑了~")
def main():
conn1, conn2 = Pipe()
p = Pool()
p.apply_async(proc_test1, (conn1, ))
p.apply_async(proc_test2, (conn2, ))
p.close() # 關閉池,不再接收新任務
p.join() # 等待回收,必須先關才能join,不然會異常
if __name__ == '__main__':
main()
輸出:
[小明]小張,今天哥們要見一女孩,你陪我唄,我24h等你回復哦~
[小張]不去,萬一被我帥氣的外表迷倒就坑了~
pool.join源碼分析¶
看看源碼就理解了:看看Pool的join是啥情況?看源碼:
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/pool.py
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/pool.py
def join(self):
util.debug('joining pool')
if self._state == RUN:
# 沒關閉就join,這邊就會拋出一個異常
raise ValueError("Pool is still running")
elif self._state not in (CLOSE, TERMINATE):
raise ValueError("In unknown state")
self._worker_handler.join()
self._task_handler.join()
self._result_handler.join()
for p in self._pool:
p.join() # 循環join回收
在pool的__init__
的方法中,這幾個屬性:
self._processes = processes # 指定的進程數
self._pool = [] # 列表
self._repopulate_pool() # 給列表append內容的方法
將池進程的數量增加到指定的數量,join的時候會使用這個列表
def _repopulate_pool(self):
# 指定進程數-當前進程數,差幾個補幾個
for i in range(self._processes - len(self._pool)):
w = self.Process(target=worker,
args=(self._inqueue, self._outqueue,
self._initializer,
self._initargs, self._maxtasksperchild,
self._wrap_exception)
)
self._pool.append(w) # 重點來了
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True # pool退出后,通過pool創建的進程都會退出
w.start()
util.debug('added worker')
注意:池的方法只能由創建它的進程使用
1.5.進程間通信~Queue管道通信(常用)¶
一步步的設局,從底層的的pipe()
->os.pipe
->PIPE
,現在終於到Queue
了,心酸啊,明知道上面兩個項目
里面基本上不會用,但為了你們能看懂源碼,說了這么久%>_<%
其實以后當我們從Queue
說到MQ
和RPC
之后,現在
講得這些進程間通信(IPC
)也基本上不會用了,但本質你得清楚,我盡量多分析點源碼,這樣你們以后看開源項目壓力會很小
歡迎批評指正~
引入案例¶
from multiprocessing import Process, Queue
def test(q):
q.put("[子進程]老爸,我出去嗨了")
print(q.get())
def main():
q = Queue()
p = Process(target=test, args=(q, ))
p.start()
msg = q.get()
print(msg)
q.put("[父進程]去吧比卡丘~")
p.join()
if __name__ == '__main__':
main()
輸出:(get
和put
默認是阻塞等待的)
[子進程]老爸,我出去嗨了
[父進程]去吧比卡丘~
源碼拓展¶
先看看Queue
的初始化方法:(不指定大小就是最大隊列數)
# 隊列類型,使用PIPE,緩存,線程
class Queue(object):
# ctx = multiprocessing.get_context("xxx")
# 上下文總共3種:spawn、fork、forkserver(擴展部分會提一下)
def __init__(self, maxsize=0, *, ctx):
# 默認使用最大容量
if maxsize <= 0:
from .synchronize import SEM_VALUE_MAX as maxsize
self._maxsize = maxsize # 指定隊列大小
# 創建了一個PIPE匿名管道(單向)
self._reader, self._writer = connection.Pipe(duplex=False)
# `multiprocessing/synchronize.py > Lock`
self._rlock = ctx.Lock() # 進程鎖(讀)【非遞歸】
self._opid = os.getpid() # 獲取PID
if sys.platform == 'win32':
self._wlock = None
else:
self._wlock = ctx.Lock() # 進程鎖(寫)【非遞歸】
# Semaphore信號量通常用於保護容量有限的資源
# 控制信號量,超了就異常
self._sem = ctx.BoundedSemaphore(maxsize)
# 不忽略PIPE管道破裂的錯誤
self._ignore_epipe = False
# 線程相關操作
self._after_fork()
# 向`_afterfork_registry`字典中注冊
if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)
關於get
和put
是阻塞的問題,看下源碼探探究竟:
q.get()
:收消息
def get(self, block=True, timeout=None):
# 默認情況是阻塞(lock加鎖)
if block and timeout is None:
with self._rlock:
res = self._recv_bytes()
self._sem.release() # 信號量+1
else:
if block:
deadline = time.monotonic() + timeout
# 超時拋異常
if not self._rlock.acquire(block, timeout):
raise Empty
try:
if block:
timeout = deadline - time.monotonic()
# 不管有沒有內容都去讀,超時就拋異常
if not self._poll(timeout):
raise Empty
elif not self._poll():
raise Empty
# 接收字節數據作為字節對象
res = self._recv_bytes()
self._sem.release() # 信號量+1
finally:
# 釋放鎖
self._rlock.release()
# 釋放鎖后,重新序列化數據
return _ForkingPickler.loads(res)
queue.put()
:發消息
def put(self, obj, block=True, timeout=None):
# 如果Queue已經關閉就拋異常
assert not self._closed, "Queue {0!r} has been closed".format(self)
# 記錄信號量的鎖
if not self._sem.acquire(block, timeout):
raise Full # 超過數量,拋個異常
# 條件變量允許一個或多個線程等待,直到另一個線程通知它們
with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()
非阻塞get_nowait
和put_nowait
本質其實也是調用了get
和put
方法:
def get_nowait(self):
return self.get(False)
def put_nowait(self, obj):
return self.put(obj, False)
進程間通信1¶
說這么多不如來個例子看看:
from multiprocessing import Queue
def main():
q = Queue(3) # 只能 put 3條消息
q.put([1, 2, 3, 4]) # put一個List類型的消息
q.put({"a": 1, "b": 2}) # put一個Dict類型的消息
q.put({1, 2, 3, 4}) # put一個Set類型的消息
try:
# 不加timeout,就一直阻塞,等消息隊列有空位才能發出去
q.put("再加條消息唄", timeout=2)
# Full(Exception)是空實現,你可以直接用Exception
except Exception:
print("消息隊列已滿,隊列數%s,當前存在%s條消息" % (q._maxsize, q.qsize()))
try:
# 非阻塞,不能put就拋異常
q.put_nowait("再加條消息唄") # 相當於q.put(obj,False)
except Exception:
print("消息隊列已滿,隊列數%s,當前存在%s條消息" % (q._maxsize, q.qsize()))
while not q.empty():
print("隊列數:%s,當前存在%s條消息 內容%s" % (q._maxsize, q.qsize(), q.get_nowait()))
print("隊列數:%s,當前存在:%s條消息" % (q._maxsize, q.qsize()))
if __name__ == '__main__':
main()
輸出:
消息隊列已滿,隊列數3,當前存在3條消息
消息隊列已滿,隊列數3,當前存在3條消息
隊列數:3,當前存在3條消息 內容[1, 2, 3, 4]
隊列數:3,當前存在2條消息 內容{'a': 1, 'b': 2}
隊列數:3,當前存在1條消息 內容{1, 2, 3, 4}
隊列數:3,當前存在:0條消息
補充說明一下:
q._maxsize
隊列數(盡量不用_
開頭的屬性和方法)q.qsize()
查看當前隊列中存在幾條消息q.full()
查看是否滿了q.empty()
查看是否為空
再看個簡單點的子進程間通信:(鋪墊demo)
import os
import time
from multiprocessing import Process, Queue
def pro_test1(q):
print("[子進程1]PPID=%d,PID=%d,GID=%d"%(os.getppid(), os.getpid(), os.getgid()))
q.put("[子進程1]小明,今晚擼串不?")
# 設置一個簡版的重試機制(三次重試)
for i in range(3):
if not q.empty():
print(q.get())
break
else:
time.sleep((i + 1) * 2) # 第一次1s,第二次4s,第三次6s
def pro_test2(q):
print("[子進程2]PPID=%d,PID=%d,GID=%d"%(os.getppid(), os.getpid(), os.getgid()))
print(q.get())
time.sleep(4) # 模擬一下網絡延遲
q.put("[子進程2]不去,我今天約了妹子")
def main():
queue = Queue()
p1 = Process(target=pro_test1, args=(queue, ))
p2 = Process(target=pro_test2, args=(queue, ))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
main()
輸出:(time python3 5.queue2.py
)
[子進程1]PPID=15220,PID=15221,GID=1000
[子進程2]PPID=15220,PID=15222,GID=1000
[子進程1]小明,今晚擼串不?
[子進程2]不去,我今天約了妹子
real 0m6.087s
user 0m0.053s
sys 0m0.035s
進程間通信2¶
多進程基本上都是用pool
,可用上面說的Queue
方法怎么報錯了?
import os
import time
from multiprocessing import Pool, Queue
def error_callback(msg):
print(msg)
def pro_test1(q):
print("[子進程1]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
os.getgid()))
q.put("[子進程1]小明,今晚擼串不?")
# 設置一個簡版的重試機制(三次重試)
for i in range(3):
if not q.empty():
print(q.get())
break
else:
time.sleep((i + 1) * 2) # 第一次1s,第二次4s,第三次6s
def pro_test2(q):
print("[子進程2]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
os.getgid()))
print(q.get())
time.sleep(4) # 模擬一下網絡延遲
q.put("[子進程2]不去,我今天約了妹子")
def main():
print("[父進程]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
os.getgid()))
queue = Queue()
p = Pool()
p.apply_async(pro_test1, args=(queue, ), error_callback=error_callback)
p.apply_async(pro_test2, args=(queue, ), error_callback=error_callback)
p.close()
p.join()
if __name__ == '__main__':
main()
輸出:(無法將multiprocessing.Queue
對象傳遞給Pool
方法)
[父進程]PPID=4223,PID=32170,GID=1000
Queue objects should only be shared between processes through inheritance
Queue objects should only be shared between processes through inheritance
real 0m0.183s
user 0m0.083s
sys 0m0.012s
下面會詳說,先看一下正確方式:(隊列換了一下,其他都一樣Manager().Queue()
)
import os
import time
from multiprocessing import Pool, Manager
def error_callback(msg):
print(msg)
def pro_test1(q):
print("[子進程1]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
os.getgid()))
q.put("[子進程1]小明,今晚擼串不?")
# 設置一個簡版的重試機制(三次重試)
for i in range(3):
if not q.empty():
print(q.get())
break
else:
time.sleep((i + 1) * 2) # 第一次1s,第二次4s,第三次6s
def pro_test2(q):
print("[子進程2]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
os.getgid()))
print(q.get())
time.sleep(4) # 模擬一下網絡延遲
q.put("[子進程2]不去,我今天約了妹子")
def main():
print("[父進程]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
os.getgid()))
queue = Manager().Queue()
p = Pool()
p.apply_async(pro_test1, args=(queue, ), error_callback=error_callback)
p.apply_async(pro_test2, args=(queue, ), error_callback=error_callback)
p.close()
p.join()
if __name__ == '__main__':
main()
輸出:
[父進程]PPID=4223,PID=31329,GID=1000
[子進程1]PPID=31329,PID=31335,GID=1000
[子進程2]PPID=31329,PID=31336,GID=1000
[子進程1]小明,今晚擼串不?
[子進程2]不去,我今天約了妹子
real 0m6.134s
user 0m0.133s
sys 0m0.035s
再拋個思考題:(Linux)
import os
import time
from multiprocessing import Pool, Queue
def error_callback(msg):
print(msg)
q = Queue()
def pro_test1():
global q
print("[子進程1]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
os.getgid()))
q.put("[子進程1]小明,今晚擼串不?")
# 設置一個簡版的重試機制(三次重試)
for i in range(3):
if not q.empty():
print(q.get())
break
else:
time.sleep((i + 1) * 2) # 第一次1s,第二次4s,第三次6s
def pro_test2():
global q
print("[子進程2]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
os.getgid()))
print(q.get())
time.sleep(4) # 模擬一下網絡延遲
q.put("[子進程2]不去,我今天約了妹子")
def main():
print("[父進程]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
os.getgid()))
q = Queue()
p = Pool()
p.apply_async(pro_test1, error_callback=error_callback)
p.apply_async(pro_test2, error_callback=error_callback)
p.close()
p.join()
if __name__ == '__main__':
main()
輸出:(為啥這樣也可以【提示:fork
】)
[父進程]PPID=12855,PID=16879,GID=1000
[子進程1]PPID=16879,PID=16880,GID=1000
[子進程2]PPID=16879,PID=16881,GID=1000
[子進程1]小明,今晚擼串不?
[子進程2]不去,我今天約了妹子
real 0m6.120s
user 0m0.105s
sys 0m0.024s
進程拓展¶
官方參考:https://docs.python.org/3/library/multiprocessing.html
1.上下文系¶
- spawn:(Win默認,Linux下也可以用【>=3.4】)
- 父進程啟動一個新的python解釋器進程。
- 子進程只會繼承運行進程對象run()方法所需的那些資源。
- 不會繼承父進程中不必要的文件描述符和句柄。
- 與使用fork或forkserver相比,使用此方法啟動進程相當慢。
- 可在Unix和Windows上使用。Windows上的默認設置。
- fork:(Linux下默認)
- 父進程用於os.fork()分叉Python解釋器。
- 子進程在開始時與父進程相同(這時候內部變量之類的還沒有被修改)
- 父進程的所有資源都由子進程繼承(用到多線程的時候可能有些問題)
- 僅適用於Unix。Unix上的默認值。
- forkserver:(常用)
- 當程序啟動並選擇forkserver start方法時,將啟動服務器進程。
- 從那時起,每當需要一個新進程時,父進程就會連接到服務器並請求它分叉一個新進程。
- fork服務器進程是單線程的,因此它可以安全使用os.fork()。沒有不必要的資源被繼承。
- 可在Unix平台上使用,支持通過Unix管道傳遞文件描述符。
這塊官方文檔很詳細,貼下官方的2個案例:
通過multiprocessing.set_start_method(xxx)
來設置啟動的上下文類型
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn') # 不要過多使用
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
輸出:(set_start_method
不要過多使用)
hello
real 0m0.407s
user 0m0.134s
sys 0m0.012s
如果你把設置啟動上下文注釋掉:(消耗的總時間少了很多)
real 0m0.072s
user 0m0.057s
sys 0m0.016s
也可以通過multiprocessing.get_context(xxx)
獲取指定類型的上下文
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
輸出:(get_context
在Python源碼里用的比較多,so=>也建議大家這么用)
hello
real 0m0.169s
user 0m0.146s
sys 0m0.024s
從結果來看,總耗時也少了很多
2.日記系列¶
說下日記相關的事情:
先看下multiprocessing
里面的日記記錄:
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/context.py
def log_to_stderr(self, level=None):
'''打開日志記錄並添加一個打印到stderr的處理程序'''
from .util import log_to_stderr
return log_to_stderr(level)
更多Loging
模塊內容可以看官方文檔:https://docs.python.org/3/library/logging.html
這個是內部代碼,看看即可:
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/util.py
def log_to_stderr(level=None):
'''打開日志記錄並添加一個打印到stderr的處理程序'''
# 全局變量默認是False
global _log_to_stderr
import logging
# 日記記錄轉換成文本
formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
# 一個處理程序類,它將已適當格式化的日志記錄寫入流
handler = logging.StreamHandler() # 此類不會關閉流,因為用到了sys.stdout|sys.stderr
# 設置格式:'[%(levelname)s/%(processName)s] %(message)s'
handler.setFormatter(formatter)
# 返回`multiprocessing`專用的記錄器
logger = get_logger()
# 添加處理程序
logger.addHandler(handler)
if level:
# 設置日記級別
logger.setLevel(level)
# 現在log是輸出到stderr的
_log_to_stderr = True
return _logger
Logging
之前也有提過,可以看看:https://www.cnblogs.com/dotnetcrazy/p/9333792.html#2.裝飾器傳參的擴展(可傳可不傳)
來個案例:
import logging
from multiprocessing import Process, log_to_stderr
def test():
print("test")
def start_log():
# 把日記輸出定向到sys.stderr中
logger = log_to_stderr()
# 設置日記記錄級別
# 敏感程度:DEBUG、INFO、WARN、ERROR、CRITICAL
print(logging.WARN == logging.WARNING) # 這兩個是一樣的
level = logging.INFO
logger.setLevel(level) # 設置日記級別(一般都是WARN)
# 自定義輸出
# def log(self, level, msg, *args, **kwargs):
logger.log(level, "我是通用格式") # 通用,下面的內部也是調用的這個
logger.info("info 測試")
logger.warning("warning 測試")
logger.error("error 測試")
def main():
start_log()
# 做的操作都會被記錄下來
p = Process(target=test)
p.start()
p.join()
if __name__ == '__main__':
main()
輸出:
True
[INFO/MainProcess] 我是通用格式
[INFO/MainProcess] info 測試
[WARNING/MainProcess] warning 測試
[ERROR/MainProcess] error 測試
[INFO/Process-1] child process calling self.run()
test
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
3.進程5態¶
之前忘記說了~現在快結尾了,補充一下進程5態:(來個草圖)
1.6.進程間狀態共享¶
應該盡量避免進程間狀態共享,但需求在那,所以還是得研究,官方推薦了兩種方式:
1.共享內存(Value
or Array
)¶
之前說過Queue
:在Process
之間使用沒問題,用到Pool
,就使用Manager().xxx
,Value
和Array
,就不太一樣了:
看看源碼:(Manager里面的Array和Process共享的Array不是一個概念,而且也沒有同步機制)
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/managers.py
class Value(object):
def __init__(self, typecode, value, lock=True):
self._typecode = typecode
self._value = value
def get(self):
return self._value
def set(self, value):
self._value = value
def __repr__(self):
return '%s(%r, %r)' % (type(self).__name__, self._typecode, self._value)
value = property(get, set) # 給value設置get和set方法(和value的屬性裝飾器一樣效果)
def Array(typecode, sequence, lock=True):
return array.array(typecode, sequence)
以Process
為例看看怎么用:
from multiprocessing import Process, Value, Array
def proc_test1(value, array):
print("子進程1", value.value)
array[0] = 10
print("子進程1", array[:])
def proc_test2(value, array):
print("子進程2", value.value)
array[1] = 10
print("子進程2", array[:])
def main():
try:
value = Value("d", 3.14) # d 類型,相當於C里面的double
array = Array("i", range(10)) # i 類型,相當於C里面的int
print(type(value))
print(type(array))
p1 = Process(target=proc_test1, args=(value, array))
p2 = Process(target=proc_test2, args=(value, array))
p1.start()
p2.start()
p1.join()
p2.join()
print("父進程", value.value) # 獲取值
print("父進程", array[:]) # 獲取值
except Exception as ex:
print(ex)
else:
print("No Except")
if __name__ == '__main__':
main()
輸出:(Value
和Array
是進程|線程
安全的)
<class 'multiprocessing.sharedctypes.Synchronized'>
<class 'multiprocessing.sharedctypes.SynchronizedArray'>
子進程1 3.14
子進程1 [10, 1, 2, 3, 4, 5, 6, 7, 8, 9]
子進程2 3.14
子進程2 [10, 10, 2, 3, 4, 5, 6, 7, 8, 9]
父進程 3.14
父進程 [10, 10, 2, 3, 4, 5, 6, 7, 8, 9]
No Except
類型方面的對應關系:
typecode_to_type = {
'c': ctypes.c_char,
'u': ctypes.c_wchar,
'b': ctypes.c_byte,
'B': ctypes.c_ubyte,
'h': ctypes.c_short,
'H': ctypes.c_ushort,
'i': ctypes.c_int,
'I': ctypes.c_uint,
'l': ctypes.c_long,
'L': ctypes.c_ulong,
'q': ctypes.c_longlong,
'Q': ctypes.c_ulonglong,
'f': ctypes.c_float,
'd': ctypes.c_double
}
這兩個類型其實是ctypes
類型,更多的類型可以去` multiprocessing.sharedctypes`查看,來張圖: 回頭解決
GIL
的時候會用到C
系列或者Go
系列的共享庫(講線程的時候會說)
關於進程安全的補充說明:對於原子性操作就不用說,鐵定安全,但注意一下i+=1
並不是原子性操作:
from multiprocessing import Process, Value
def proc_test1(value):
for i in range(1000):
value.value += 1
def main():
value = Value("i", 0)
p_list = [Process(target=proc_test1, args=(value, )) for i in range(5)]
# 批量啟動
for i in p_list:
i.start()
# 批量資源回收
for i in p_list:
i.join()
print(value.value)
if __name__ == '__main__':
main()
輸出:(理論上應該是:5×1000=5000)
2153
稍微改一下才行:(進程安全:只是提供了安全的方法,並不是什么都不用你操心了)
# 通用方法
def proc_test1(value):
for i in range(1000):
if value.acquire():
value.value += 1
value.release()
# 官方案例:(Lock可以使用with托管)
def proc_test1(value):
for i in range(1000):
with value.get_lock():
value.value += 1
# 更多可以查看:`sharedctypes.SynchronizedBase` 源碼
輸出:(關於鎖這塊,后面講線程的時候會詳說,看看就好【語法的確比C#麻煩點】)
5000
看看源碼:(之前探討如何優雅的殺死子進程,其中就有一種方法使用了Value
)
def Value(typecode_or_type, *args, lock=True, ctx=None):
'''返回Value的同步包裝器'''
obj = RawValue(typecode_or_type, *args)
if lock is False:
return obj
# 默認支持Lock
if lock in (True, None):
ctx = ctx or get_context() # 獲取上下文
lock = ctx.RLock() # 獲取遞歸鎖
if not hasattr(lock, 'acquire'):
raise AttributeError("%r has no method 'acquire'" % lock)
# 一系列處理
return synchronized(obj, lock, ctx=ctx)
def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None):
'''返回RawArray的同步包裝器'''
obj = RawArray(typecode_or_type, size_or_initializer)
if lock is False:
return obj
# 默認是支持Lock的
if lock in (True, None):
ctx = ctx or get_context() # 獲取上下文
lock = ctx.RLock() # 遞歸鎖屬性
# 查看是否有acquire屬性
if not hasattr(lock, 'acquire'):
raise AttributeError("%r has no method 'acquire'" % lock)
return synchronized(obj, lock, ctx=ctx)
擴展部分可以查看這篇文章:http://blog.51cto.com/11026142/1874807
2.服務器進程(Manager
)¶
官方文檔:https://docs.python.org/3/library/multiprocessing.html#managers
有一個服務器進程負責維護所有的對象,而其他進程連接到該進程,通過代理對象操作服務器進程當中的對象
通過返回的經理Manager()
將支持類型list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue
舉個簡單例子(后面還會再說):(本質其實就是多個進程通過代理,共同操作服務端內容
)
from multiprocessing import Pool, Manager
def test1(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
def test2(d, l):
print(d)
print(l)
def main():
with Manager() as manager:
dict_test = manager.dict()
list_test = manager.list(range(10))
pool = Pool()
pool.apply_async(test1, args=(dict_test, list_test))
pool.apply_async(test2, args=(dict_test, list_test))
pool.close()
pool.join()
if __name__ == '__main__':
main()
輸出:
{1: '1', '2': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
服務器進程管理器比使用共享內存對象更靈活,因為它們可以支持任意對象類型。此外,單個管理器可以通過網絡在不同計算機上的進程共享。但是,它們比使用共享內存慢(畢竟有了“中介”
)
同步問題依然需要注意一下,舉個例子體會一下:
from multiprocessing import Manager, Process, Lock
def test(dict1, lock):
for i in range(100):
with lock: # 你可以把這句話注釋掉,然后就知道為什么加了
dict1["year"] += 1
def main():
with Manager() as m:
lock = Lock()
dict1 = m.dict({"year": 2000})
p_list = [Process(target=test, args=(dict1, lock)) for i in range(5)]
for i in p_list:
i.start()
for i in p_list:
i.join()
print(dict1)
if __name__ == '__main__':
main()
擴展補充:
multiprocessing.Lock
是一個進程安全對象,因此您可以將其直接傳遞給子進程並在所有進程中安全地使用它。- 大多數可變Python對象(如list,dict,大多數類)不能保證進程中安全,所以它們在進程間共享時需要使用
Manager
- 多進程模式的缺點是創建進程的代價大,在
Unix/Linux
系統下,用fork
調用還行,在Windows
下創建進程開銷巨大。
Manager這塊官方文檔很詳細,可以看看:https://docs.python.org/3/library/multiprocessing.html#managers
WinServer
的可以參考這篇 or 這篇埋坑記(Manager一般都是部署在Linux的,Win的客戶端不影響)
擴展補充¶
還記得之前的:無法將multiprocessing.Queue對象傳遞給Pool方法嗎?其實一般都是這兩種方式解決的:
- 使用Manager需要生成另一個進程來托管Manager服務器。 並且所有獲取/釋放鎖的調用都必須通過IPC發送到該服務器。
- 使用初始化程序在池創建時傳遞常規
multiprocessing.Queue()
這將使Queue
實例在所有子進程中全局共享
再看一下Pool的__init__
方法:
# processes:進程數
# initializer,initargs 初始化進行的操作
# maxtaskperchild:每個進程執行task的最大數目
# contex:上下文對象
def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None, context=None):
第一種方法不夠輕量級,在講案例前,稍微說下第二種方法:(也算把上面留下的懸念解了)
import os
import time
from multiprocessing import Pool, Queue
def error_callback(msg):
print(msg)
def pro_test1():
print("[子進程1]PPID=%d,PID=%d" % (os.getppid(), os.getpid()))
q.put("[子進程1]小明,今晚擼串不?")
# 設置一個簡版的重試機制(三次重試)
for i in range(3):
if not q.empty():
print(q.get())
break
else:
time.sleep((i + 1) * 2) # 第一次1s,第二次4s,第三次6s
def pro_test2():
print("[子進程2]PPID=%d,PID=%d" % (os.getppid(), os.getpid()))
print(q.get())
time.sleep(4) # 模擬一下網絡延遲
q.put("[子進程2]不去,我今天約了妹子")
def init(queue):
global q
q = queue
def main():
print("[父進程]PPID=%d,PID=%d" % (os.getppid(), os.getpid()))
queue = Queue()
p = Pool(initializer=init, initargs=(queue, ))
p.apply_async(pro_test1, error_callback=error_callback)
p.apply_async(pro_test2, error_callback=error_callback)
p.close()
p.join()
if __name__ == '__main__':
main()
輸出:(就是在初始化Pool的時候,傳了初始化執行的方法並傳了參數:alizer=init, initargs=(queue, ))
)
[父進程]PPID=13157,PID=24864
[子進程1]PPID=24864,PID=24865
[子進程2]PPID=24864,PID=24866
[子進程1]小明,今晚擼串不?
[子進程2]不去,我今天約了妹子
real 0m6.105s
user 0m0.071s
sys 0m0.042s
Win下亦通用(win下沒有os.getgid
)
1.7.分布式進程的案例¶
有了1.6
的基礎,咱們來個例子練練:
BaseManager
的縮略圖:
服務器端代碼:
from multiprocessing import Queue
from multiprocessing.managers import BaseManager
def main():
# 用來身份驗證的
key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92"
get_zhang_queue = Queue() # 小張消息隊列
get_ming_queue = Queue() # 小明消息隊列
# 把Queue注冊到網絡上, callable參數關聯了Queue對象
BaseManager.register("get_zhang_queue", callable=lambda: get_zhang_queue)
BaseManager.register("get_ming_queue", callable=lambda: get_ming_queue)
# 實例化一個Manager對象。綁定ip+端口, 設置驗證秘鑰
manager = BaseManager(address=("192.168.36.235", 5438), authkey=key)
# 運行serve
manager.get_server().serve_forever()
if __name__ == '__main__':
main()
客戶端代碼1:
from multiprocessing.managers import BaseManager
def main():
"""客戶端1"""
key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92"
# 注冊對應方法的名字(從網絡上獲取Queue)
BaseManager.register("get_ming_queue")
BaseManager.register("get_zhang_queue")
# 實例化一個Manager對象。綁定ip+端口, 設置驗證秘鑰
m = BaseManager(address=("192.168.36.235", 5438), authkey=key)
# 連接到服務器
m.connect()
q1 = m.get_zhang_queue() # 在自己隊列里面留言
q1.put("[小張]小明,老大明天是不是去外地辦事啊?")
q2 = m.get_ming_queue() # 獲取小明說的話
print(q2.get())
if __name__ == '__main__':
main()
客戶端代碼2:
from multiprocessing.managers import BaseManager
def main():
"""客戶端2"""
key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92"
# 注冊對應方法的名字(從網絡上獲取Queue)
BaseManager.register("get_ming_queue")
BaseManager.register("get_zhang_queue")
# 實例化一個Manager對象。綁定ip+端口, 設置驗證秘鑰
m = BaseManager(address=("192.168.36.235", 5438), authkey=key)
# 連接到服務器
m.connect()
q1 = m.get_zhang_queue() # 獲取小張說的話
print(q1.get())
q2 = m.get_ming_queue() # 在自己隊列里面留言
q2.put("[小明]這幾天咱們終於可以不加班了(>_<)")
if __name__ == '__main__':
main()
輸出圖示:
服務器運行在Linux的測試:
其實還有一部分內容沒說,明天得出去辦點事,先到這吧,后面找機會繼續帶一下
參考文章:
進程共享的探討:python-sharing-a-lock-between-processes
多進程鎖的探討:trouble-using-a-lock-with-multiprocessing-pool-pickling-error
JoinableQueue擴展:https://www.cnblogs.com/smallmars/p/7093603.html
Python多進程編程:https://www.cnblogs.com/kaituorensheng/p/4445418.html
有深度但需要辯證看的兩篇文章:
跨進程對象共享:http://blog.ftofficer.com/2009/12/python-multiprocessing-3-about-queue
關於Queue:http://blog.ftofficer.com/2009/12/python-multiprocessing-2-object-sharing-across-process
NetCore並發編程¶
Python的線程、並行、協程下次說
示例代碼:https://github.com/lotapp/BaseCode/tree/master/netcore/4_Concurrency
先簡單說下概念(其實之前也有說,所以簡說下):
- 並發:同時做多件事情
- 多線程:並發的一種形式
- 並行處理:多線程的一種(線程池產生的一種並發類型,eg:異步編程)
- 響應式編程:一種編程模式,對事件進行響應(有點類似於JQ的事件)
Net里面很少用進程,在以前基本上都是線程+池+異步+並行+協程
我這邊簡單引入一下,畢竟主要是寫Python的教程,Net只是幫你們回顧一下,如果你發現還沒聽過這些概念,或者你的項目中還充斥着各種Thread
和ThreadPool
的話,真的得系統的學習一下了,現在官網的文檔已經很完善了,記得早幾年啥都沒有,也只能挖那些外國開源項目:
https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-processing-and-concurrency
1.異步編程(Task)¶
Task的目的其實就是為了簡化Thread
和ThreadPool
的代碼,下面一起看看吧:
異步用起來比較簡單,一般IO,DB,Net用的比較多,很多時候都會采用重試機制,舉個簡單的例子:
/// <summary>
/// 模擬一個網絡操作(別忘了重試機制)
/// </summary>
/// <param name="url">url</param>
/// <returns></returns>
private async static Task<string> DownloadStringAsync(string url)
{
using (var client = new HttpClient())
{
// 設置第一次重試時間
var nextDelay = TimeSpan.FromSeconds(1);
for (int i = 0; i < 3; i++)
{
try
{
return await client.GetStringAsync(url);
}
catch { }
await Task.Delay(nextDelay); // 用異步阻塞的方式防止服務器被太多重試給阻塞了
nextDelay *= 2; // 3次重試機會,第一次1s,第二次2s,第三次4s
}
// 最后一次嘗試,錯誤就拋出
return await client.GetStringAsync(url);
}
}
然后補充說下Task異常的問題,當你await的時候如果有異常會拋出,在第一個await處捕獲處理即可
如果async
和await
就是理解不了的可以這樣想:async
就是為了讓await
生效(為了向后兼容)
對了,如果返回的是void,你設置成Task就行了,觸發是類似於事件之類的方法才使用void,不然沒有返回值都是使用Task
項目里經常有這么一個場景:等待一組任務完成后再執行某個操作,看個引入案例:
/// <summary>
/// 1.批量任務
/// </summary>
/// <param name="list"></param>
/// <returns></returns>
private async static Task<string[]> DownloadStringAsync(IEnumerable<string> list)
{
using (var client = new HttpClient())
{
var tasks = list.Select(url => client.GetStringAsync(url)).ToArray();
return await Task.WhenAll(tasks);
}
}
再舉一個場景:同時調用多個同效果的API,有一個返回就好了,其他的忽略
/// <summary>
/// 2.返回首先完成的Task
/// </summary>
/// <param name="list"></param>
/// <returns></returns>
private static async Task<string> GetIPAsync(IEnumerable<string> list)
{
using (var client = new HttpClient())
{
var tasks = list.Select(url => client.GetStringAsync(url)).ToArray();
var task = await Task.WhenAny(tasks); // 返回第一個完成的Task
return await task;
}
}
一個async方法被await調用后,當它恢復運行時就會回到原來的上下文中運行。
如果你的Task不再需要上下文了可以使用:task.ConfigureAwait(false)
,eg:寫個日記還要啥上下文?
逆天的建議是:在核心代碼里面一種使用ConfigureAwait
,用戶頁面相關代碼,不需要上下文的加上
其實如果有太多await在上下文里恢復那也是比較卡的,使用ConfigureAwait
之后,被暫停后會在線程池里面繼續運行
再看一個場景:比如一個耗時操作,我需要指定它的超時時間:
/// <summary>
/// 3.超時取消
/// </summary>
/// <returns></returns>
private static async Task<string> CancellMethod()
{
//實例化取消任務
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(3)); // 設置失效時間為3s
try
{
return await DoSomethingAsync(cts.Token);
}
// 任務已經取消會引發TaskCanceledException
catch (TaskCanceledException ex)
{
return "false";
}
}
/// <summary>
/// 模仿一個耗時操作
/// </summary>
/// <returns></returns>
private static async Task<string> DoSomethingAsync(CancellationToken token)
{
await Task.Delay(TimeSpan.FromSeconds(5), token);
return "ok";
}
異步這塊簡單回顧就不說了,留兩個擴展,你們自行探討:
- 進度方面的可以使用
IProgress<T>
,就當留個作業自己摸索下吧~ - 使用了異步之后盡量避免使用
task.Wait
ortask.Result
,這樣可以避免死鎖
Task其他新特征去官網看看吧,引入到此為止了。
2.並行編程(Parallel)¶
這個其實出來很久了,現在基本上都是用PLinq
比較多點,主要就是:
- 數據並行:重點在處理數據(eg:聚合)
- 任務並行:重點在執行任務(每個任務塊盡可能獨立,越獨立效率越高)
數據並行¶
以前都是Parallel.ForEach
這么用,現在和Linq結合之后非常方便.AsParallel()
就OK了
說很抽象看個簡單案例:
static void Main(string[] args)
{
IEnumerable<int> list = new List<int>() { 1, 2, 3, 4, 5, 7, 8, 9 };
foreach (var item in ParallelMethod(list))
{
Console.WriteLine(item);
}
}
/// <summary>
/// 舉個例子
/// </summary>
private static IEnumerable<int> ParallelMethod(IEnumerable<int> list)
{
return list.AsParallel().Select(x => x * x);
}
正常執行的結果應該是:
1
4
9
25
64
16
49
81
並行之后就是這樣了(不管順序了):
25
64
1
9
49
81
4
16
當然了,如果你就是對順序有要求可以使用:.AsOrdered()
/// <summary>
/// 舉個例子
/// </summary>
private static IEnumerable<int> ParallelMethod(IEnumerable<int> list)
{
return list.AsParallel().AsOrdered().Select(x => x * x);
}
其實實際項目中,使用並行的時候:任務時間適中,太長不適合,太短也不適合
記得大家在項目里經常會用到如Sum
,Count
等聚合函數,其實這時候使用並行就很合適
var list = new List<long>();
for (long i = 0; i < 1000000; i++)
{
list.Add(i);
}
Console.WriteLine(GetSumParallel(list));
private static long GetSumParallel(IEnumerable<long> list)
{
return list.AsParallel().Sum();
}
time dotnet PLINQ.dll
499999500000
real 0m0.096s
user 0m0.081s
sys 0m0.025s
不使用並行:(稍微多了點,CPU越密集差距越大)
499999500000
real 0m0.103s
user 0m0.092s
sys 0m0.021s
其實聚合有一個通用方法,可以支持復雜的聚合:(以上面sum為例)
.Aggregate(
seed:0,
func:(sum,item)=>sum+item
);
稍微擴展一下,PLinq也是支持取消的,.WithCancellation(CancellationToken)
Token的用法和上面一樣,就不復述了,如果需要和異步結合,一個Task.Run
就可以把並行任務交給線程池了
也可以使用Task的異步方法,設置超時時間,這樣PLinq超時了也就終止了
PLinq這么方便,其實也是有一些小弊端的,比如它會直接最大程度的占用系統資源,可能會影響其他的任務,而傳統的Parallel則會動態調整
任務並行(並行調用)¶
這個PLinq好像沒有對應的方法,有新語法你可以說下,來舉個例子:
await Task.Run(() =>
Parallel.Invoke(
() => Task.Delay(TimeSpan.FromSeconds(3)),
() => Task.Delay(TimeSpan.FromSeconds(2))
));
取消也支持:
Parallel.Invoke(new ParallelOptions() { CancellationToken = token }, actions);
擴充說明¶
其實還有一些比如數據流和響應編程沒說,這個之前都是用第三方庫,剛才看官網文檔,好像已經支持了,所以就不賣弄了,感興趣的可以去看看,其實項目里面有流數據相關的框架,eg:Spark
,都是比較成熟的解決方案了基本上也不太使用這些了。
然后還有一些沒說,比如NetCore里面不可變類型(列表、字典、集合、隊列、棧、線程安全字典等等)以及限流、任務調度等,這些關鍵詞我提一下,也方便你去搜索自己學習拓展
先到這吧,其他的自己探索一下吧,最后貼一些Nuget庫,你可以針對性的使用:
- 數據流:
Microsoft.Tpl.Dataflow
- 響應編程(Linq的Rx操作):
Rx-Main
- 不可變類型:
Microsoft.Bcl.Immutable
不得不感慨一句,微軟媽媽真的花了很多功夫,Net的並發編程比Python省心多了(完)