進程


https://www.cnblogs.com/Eva-J/articles/8253549.html 參考鏈接

multiprocess模塊

進程的生命周期:

 

1.主進程

2.子進程

 

開啟子進程的主進程:

 

主進程自己的代碼如果長,等待自己的代碼執行結束。

 

子進程的執行時間長,主進程會在主進程代碼執行完畢后等待子進程執行完畢后 主進程才結束

 

開啟一個進程

from multiprocessing import Process
import time


def func():
    print('我是一個子進程')


if __name__ == '__main__':
    # p是一個進程對象 還沒有啟動進程
    p = Process(target=func)  # 主進程

    # 啟動一個子進程. 操作系統創建新進程執行新進程中的代碼
    p.start()

    # 主進程
    # 一般都是異步開啟子進程,主進程先執行
    print('riven')
    print('mark')
    print('mimi')

 

 

傳參和查看進程號

Os.getpid :查看當前進程的進程號。

Os.getppid :查看當前進程的父進程號。

from multiprocessing import Process
import time
import os


# 傳值給子進程
def func(args, kwargs):
    print(args)
    print(kwargs)

    # 查看當前進程進程號.
    print(os.getpid())

    # 查看當前進程父進程號.
    print(os.getppid())


if __name__ == '__main__':
    # args = 傳入的參數
    p = Process(target=func, args=("我的乖乖", "我太難了"))  # 主進程

    # 啟動一個子進程. 操作系統創建新進程執行新進程中的代碼
    p.start()

    # 主進程
    # 一般都是異步開啟子進程,主進程先執行
    print('riven')
    print('mark')
    print('mimi')

    # 查看當前進程父進程號.
    print(os.getppid())

    # 查看當前進程進程號.
    print(os.getpid())

Join

加了join 將先執行子進程 再執行主進程。

from multiprocessing import Process
import time
import os


# 傳值給子進程
def func(args, kwargs):
    print(args)
    print(kwargs)


if __name__ == '__main__':
    # args = 傳入的參數
    p = Process(target=func, args=("我的乖乖", "我太難了"))  # 主進程

    # 啟動一個子進程. 操作系統創建新進程執行新進程中的代碼.
    # 感知一個子程序的結束,將異步程序改為同步.
    p.start()

    # 子進程
    p.join()
    print('先執行子進程 再執行主進程')

 

 

 

執行多個子進程(兩種方法)

1.基於函數

from multiprocessing import Process
import time
import os


# 傳值給子進程
def func(args):
    print('#' * args)


if __name__ == '__main__':
    # 啟動多個子進程
    re = []
    for i in range(20):
        p = Process(target=func, args=(i,))  # 主進程

        tt = re.append(p)

        # 1.啟動一個子進程. 操作系統創建新進程執行新進程中的代碼.
        # 2.感知一個子程序的結束,將異步程序改為同步.
        p.start()

        # 主進程
        # join = 先執行子進程 再執行主進程
        p.join()
    print('先執行子進程 再執行主進程')

2.基於類

from multiprocessing import Process
import time
import os


# 子進程
class Myprocess(Process):
    # 添加init屬性
    def __init__(self, arg1, arg2):
        super().__init__()
        self.arg1 = arg1
        self.arg2 = arg2

    # 必須執行一個run方法
    def run(self):

        # 查看當前進程的進程號
        print(self.pid)
        # 查看當前進程的名稱 
        print(self.name)

        print(self.arg1)
        print(self.arg2)


if __name__ == '__main__':
    # 啟動多個子進程
    for i in range(20):
        p = Myprocess('這是一個好的開始', '代碼改變世界')

        # 1.啟動一個子進程. 操作系統創建新進程執行新進程中的代碼.
        # 2.感知一個子程序的結束,將異步程序改為同步.
        p.start()

        p = Myprocess('good idea', '我想你了')
        p.start()

        # 主進程
        # join = 先執行子進程 再執行主進程
        p.join()
    print('先執行子進程 再執行主進程')

進程與進程之間的變量問題

from multiprocessing import Process
import time
import os


# 子進程
class Myprocess(Process):
    # 添加init屬性
    def __init__(self, arg1, arg2):
        super().__init__()
        self.arg1 = arg1
        self.arg2 = arg2

    # 必須執行一個run方法
    def run(self):
        global n
        n = 0


if __name__ == '__main__':
    # 啟動多個子進程
    for i in range(20):
        p = Myprocess('這是一個好的開始', '代碼改變世界')

        # 1.啟動一個子進程. 操作系統創建新進程執行新進程中的代碼.
        # 2.感知一個子程序的結束,將異步程序改為同步.
        p.start()

        # 主進程
        # join = 先執行子進程 再執行主進程
        p.join()
    print(n)

# PS:在每個進程中定義的變量,只能在本進程中使用

 

 

 

進程之間實現聊天

服務端

# 進程之間實現聊天
import socket
from multiprocessing import Process


# 子進程
def server(conn):
    # 接受數據
    ret = conn.recv(1024).decode('utf-8')
    print(ret)
    conn.send(b'Hello')


if __name__ == '__main__':

    # 創建一個socket
    sk = socket.socket()

    # 創建一個域名和端口
    sk.bind(('127.0.0.1', 8070))
    # 監聽客戶端的連接
    sk.listen()

    # 接受客戶端的數據
    conn, addr = sk.accept()

    # while 1:
        # 這個循環沒有一直在啟動進程,因為socket會亢住等待客戶端連接
    p = Process(target=server, args=(conn,))
    p.start()

客戶端

import socket
from multiprocessing import Process


def client(sk, msg):
    sk.send(bytes(msg, encoding='utf-8'))
    ret = sk.recv(1024)
    print(ret)


if __name__ == '__main__':
    sk = socket.socket()
    sk.connect(('127.0.0.1', 8070))
    # while 1:
    msg = input()
    p = Process(target=client, args=(sk, msg))
    p.start()

 

守護進程

p.terminate() :在主程序內結束一個子進程。 p.is_alive() :檢驗一個進程是否還活着的狀態。 p.name)() :這個進程的名字。 p.pid() :這個進程的進程號。

 

from multiprocessing import  Process
import time

def fun1():
    while 1:   # 給主進程 反饋信息,證明自己在運行。
        time.sleep(0.5)
        print('我還活着呢')




if __name__ == '__main__':
    p = Process(target=fun1)

    # 設置子進程為守護進程
    p.daemon = True
    p.start()

    # 結束一個子進程
    p.terminate()

    i = 0
    while i < 10 :
        print('我是主進程')
        time.sleep(1)

        # 檢驗一個主進程 是否還活着
        i = i + 1

# 守護進程會隨着主進程的代碼執行完畢而結束

 

 

 LOCK鎖

Lock:一次只能執行一個子程序,而且只能等執行完之后才能執行下一個。

 

不加鎖會造成數據不安全的操作。

 

import json
from multiprocessing import Process
from multiprocessing import Lock
import time


# 修改數據庫必須加鎖
def show(i):
    with open('ticket') as f:
        str = f.read()
        obj = json.loads(str)
        print('%s號查看了 余票: %s' % (i, obj['ticket']))


def buy_ticket(i, lock):

    # 拿鑰匙進門
    lock.acquire()

    with open('ticket') as f:
        str = f.read()
        obj = json.loads(str)
        time.sleep(0.1)

    if obj['ticket'] > 0:
        obj['ticket'] -= 1
        print('\033[34m%s 買到票了 \033[0m ' % i)

    else:
        print('\033[34m%s 沒買到票 \033[0m' % i)

    with open('ticket', 'w') as f1:
        f1.write(json.dumps(obj))

    # 還鑰匙
    lock.release()


if __name__ == '__main__':
    for i in range(10):
        p = Process(target=show, args=(i,))
        p.start()
    lock = Lock()

    for i in range(10):
        p = Process(target=buy_ticket, args=(i, lock))
        p.start()

 

 

信號量(Semaphore

 

Semaphore:                用鎖的原理實現的,內置了一個記數器。在同一時間只能有指定數量的進程執行某一段被控制住的代碼。

            

            # 限定進程訪問次數,指定一次進2個人 等他們出來后其他人才能進去

Sem.acquire():            獲取鑰匙。

Sem.release():             還鑰匙。

 

 

import time
from multiprocessing import Semaphore, Process
import random


def ktv(i, sem):
    # 獲取鑰匙
    sem.acquire()
    
    print('%s 走進ktv' % i)
    time.sleep(random.randint(1, 5))
    print('%s走出ktv' % i)
    
    # 歸還鑰匙
    sem.release()


if __name__ == '__main__':

    # 限定進程訪問次數,指定一次進2個人 等他們出來后其他人才能進去
    sem = Semaphore(2)
    
    for i in range(20):
        p = Process(target=ktv, args=(i, sem))
        p.start()

 

事件(Event

 

Set 和 cleat 
           分別用來修改一個事件的狀態 Ture或者 False
Is__set    
               用來查看一個事件的狀態
Wait 
 是依據事件的狀態來決定自己是否阻塞 False阻塞 True 不阻塞

 

# 事件(Event)
from multiprocessing import Event

# 一個信號可以使所有的進程都進入阻塞狀態
# 也可以控制所有的進程解除阻塞
# 一個世界被創建之后,默認是阻塞狀態

# 創建一個事件
e = Event()

# 查看一個事件的狀態,默認被設置成阻塞false
print(e.is_set())

# 將這個事件的狀態改為Ture
e.set()

# 是依據 e.is_set() 的值 來決定是否阻塞
e.wait()

# 查看一個事件的狀態,默認被設置成阻塞false
print(e.is_set())

# 將這個事件的狀態改為False
print(e.clear())

# 是依據 e.is_set() 的值 來決定是否阻塞
e.wait()

紅綠燈效應

 

from multiprocessing import Process, Event
import time
import random


def car(i, e):
    if e.is_set():
        print('車%s在等待' % i)
        e.wait()
    else:
        print('\033[33m車%s通過了\033[0m'%i)


def lint(e):
    while 1:
        # 判斷這個事件是否為Ture
        if e.is_set():
            print('\033[31m 紅燈亮了 \033[0m')
            time.sleep(2)

            # 將這個事件的狀態改為False
            e.clear()

        else:
            print('\033[32m 綠燈亮了 \033[0m')
            time.sleep(2)

            # 將這個事件的狀態改為Ture
            e.set()


if __name__ == '__main__':
    e = Event()
    p = Process(target=lint, args=(e,))
    p.start()
    for i in range(20):
        p1 = Process(target=car, args=(i, e))
        p1.start()
        time.sleep(random.randint(1, 2))

 

進程間的通信 ----隊列和管道

隊列

 

q = Queue(5)         #創建共享的進程隊列,如果省略此參數,則無大小限制。

q.put(1)                 #將1放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止.

q.full()                    #用於判斷隊列是否已經滿了。

q.get()                    #返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止

q.get_nowait()      #和get方法一樣 但是q如果為空的話會報錯。

q.empty()               #用於判斷隊列是否已經為空。

Full  empty            #不完全准確。

 

from multiprocessing import Queue

# 創建共享的進程隊列,如果省略此參數,則無大小限制.
q = Queue(5)

# 將1 放入隊列。 如果隊列已滿,此方法將阻塞至有空間可用為止
q.put(1)
q.put(2)
q.put(3)
q.put(4)
q.put(5)

# 判斷隊列是否已經滿了。
print(q.full())

# 返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())

#  get_nowait 和get方法一樣不過 get_nowait 如果發現沒有取到值就會報錯
try:
    print(q.get_nowait())
except Exception:
    print('隊列已經空了')

# 用於判斷隊列是否為空
print(q.empty())

 

子進程是與主進程之間通信的

 

from multiprocessing import Queue
from multiprocessing import Process

def func(e):

    # 放入隊列。如果隊列已經滿,此方法將阻塞至有空間可用為止
    e.put('我是Mark  I am Strong man')


if __name__ == '__main__':
    e = Queue()
    q = Process(target=func,args=(e,))
    q.start()

    # 在隊列中 獲取子進程放進來的值
    print(e.get())

 

子進程是可以與子進程之間通信的

 

例子: 生產者消費者模型

 

# 生產者
def producer(name, food, q):
    for i in range(20):
        time.sleep(random.random())

        f = "%s 制作了的第%s個%s" % (name, i, food)
        print(f)

        # 將數據放入隊列中
        q.put(f)


# 消費者
def chibaozi(name, q):
    while 1:
        # 在隊列中取值
        food = q.get()

        # 不能用字符形式格式,需要用is關鍵字才能和none配合
        if food is None:
            break
        print("%s 消費了 %s" % (name, food))


if __name__ == '__main__':
    # 創建一個隊列
    q = Queue()

    # 生產者
    qq = Process(target=producer, args=('Mark', '包子', q))
    qq1 = Process(target=producer, args=('Riven', '饅頭', q))

    # 消費之
    qq2 = Process(target=chibaozi, args=('黃埔', q))
    qq3 = Process(target=chibaozi, args=('佘義', q))

    # 統一啟動子進程
    qq.start()
    qq1.start()
    qq2.start()
    qq3.start()

    # 先執行子程序,后執行主程序代碼
    qq.join()
    qq1.join()

    # 放入None 讓消費者跳出循環
    q.put(None)
    q.put(None)

 

 

JoinableQueue

 

例 : 進階版(生產消費者模型)

 

q.join()  

  # 阻塞 直到一個隊列中的所有數據 全部被執行完畢。接受消費端發送過來的標記。

  生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。

  阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。

 

q.task_done()   

內部執行了一個 count - 1的操作,發送信號給q.join使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。

from multiprocessing import JoinableQueue
from multiprocessing import Process
import random
import time


# 生產者
def producer(name, food, q):
    for i in range(20):
        time.sleep(random.random())

        f = "%s 制作了的第%s個%s" % (name, i, food)
        print(f)

        # 將數據放入隊列中
        q.put(f)

    # 阻塞 直到一個隊列中的所有數據 全部被執行完畢。接受消費端發送過來的標記.
    q.join()


# 消費者
def chibaozi(name, q):
    while 1:
        # 在隊列中取值
        food = q.get()
        print("%s 消費了 %s" % (name, food))

        time.sleep(random.randint(1,3))


        # 在消費者這一端:每次獲取一個數據 處理一個數據.  發送一個記號:標志一個數據被處理成功
        # 內部執行了一個 count - 1 的操作
        q.task_done()


if __name__ == '__main__':
    # 創建一個隊列
    q = JoinableQueue()

    # 生產者
    qq = Process(target=producer, args=('Mark', '包子', q))
    qq1 = Process(target=producer, args=('Riven', '饅頭', q))

    # 消費者
    qq2 = Process(target=chibaozi, args=('黃埔', q))
    qq3 = Process(target=chibaozi, args=('佘義', q))

    # 統一啟動子進程
    qq.start()
    qq1.start()

    # 設置 消費者 為守護進程 主進程中的代碼執行完畢之后,子進程自動結束
    qq2.daemon =True
    qq3.daemon =True

    qq2.start()
    qq3.start()

    # 感知一個子程序的結束
    qq.join()
    qq1.join()

文字 總結:

#在生產者這一端:
     #每一次生產一個數據
     #且每一次生產的數據都放這隊列中
     #在隊列中刻上一個記號
     #當生產者全部生產完畢之后
     #join 信號:已經停止生產數據了
            #且要等待之前被刻上的記號都被消費完
            #當數據都被處理完時,join阻塞結束
#消費端  中把所有的任務消耗完
#生產端  中的join感知到,停止阻塞
#所有 生產端 進程結束
#主進程中的p.join結束
#主進程中代碼結束


#守護進程(消費者進程)結束.

在消費着者這一端:
    #每次獲取一個數據
    #處理一個數據
    #發送一個記號:標志一個數據被處理成功

 

管道

數據不安全性

#IPC。

#加鎖來控制操作管道的行為 來避免進程之間爭搶數據造成的數據不安全現象。

 

#隊列 進程之間數據安全。

#管道 + 鎖。

 

 例1、

from multiprocessing import Pipe

# 接受2個地址
conn1, conn2 = Pipe()

# conn1 是發送端,
conn1.send('123456')


# conn2 是接受端
print(conn2.recv())

例2、

from multiprocessing import Pipe
from multiprocessing import Process


def func(conn1):
    # 發送消息
    conn1.send('大傻我愛你')


if __name__ == "__main__":

    # 接受2個端口
    conn1, conn2 = Pipe()

    p = Process(target=func,args=(conn1,))
    p.start()
    
    # 接受消息
    print(conn2.recv())

例3、通過條件判斷關閉管道進程

 

from multiprocessing import Pipe
from multiprocessing import Process


def func(conn2):
    while 1:
        # 接收端
        ret = conn2.recv()
        print(ret)

        # 通過判斷條件來關閉進程
        if ret is None:
            break


if __name__ == "__main__":
    # 接受2個端口
    conn1, conn2 = Pipe()

    p = Process(target=func, args=(conn2,))
    p.start()
    for i in range(20):
        # 發送端
        conn1.send('吃了嗎?')
    conn1.send(None)

 

例4、使用close關閉進程的方法

當最后一個端口沒關的時候就會報錯。我們捕獲錯誤信息進行操作就可以了。

from multiprocessing import Pipe
from multiprocessing import Process


def func(conn2, conn1):
    # 發送端關閉
    conn1.close()

    while 1:

        # 管道只有一端沒有關閉就會報錯異常 我們又不能讓它每一次循環都關閉 只能try一下。
        try:

            # 接受端 接受消息
            ret = conn2.recv()
            print(ret)

        except EOFError:

            # 接受端關閉
            conn2.close()
            break


if __name__ == "__main__":
    # 接受2個端口
    conn1, conn2 = Pipe()

    p = Process(target=func, args=(conn2, conn1))
    p.start()

    # 主進程接受端關閉
    conn2.close()

    for i in range(20):
        # 發送端
        conn1.send('吃了嗎?')

    # 主進程 發送端關閉
    conn1.close()

 

 

 

基於 管道的生產者消費者模型

from multiprocessing import Pipe
from multiprocessing import Process
from multiprocessing import Lock
import time
import random


def func(name, food, conn1, conn2):
    # 關閉接受端
    conn2.close()

    for i in range(20):
        ret = "\033[31m %s制作了%s個%s\033[0" % (name, i, food)
        print(ret)

        # 發送數據到管道
        conn1.send(ret)

        time.sleep(random.randint(1, 2))

    # 關閉發送端
    conn1.close()


def chi(name, conn1, conn2, lock):
    # 把發送端關閉
    conn1.close()

    while 1:
        try:
            #加鎖
            lock.acquire()
            ret1 = conn2.recv()
            print("\033[32m %s吃了%s \033[0m" % (name, ret1))
            #加鎖
            lock.release()

        except EOFError:
            conn2.close()
            break


if __name__ == "__main__":
    # 接受2個端口
    conn1, conn2 = Pipe()

    # 創建鎖
    lock = Lock()

    p = Process(target=func, args=('Mark', '包子', conn1, conn2,))
    p.start()

    p2 = Process(target=chi, args=('黃埔', conn1, conn2, lock,))
    p2.start()

    # 記住一定要 關閉主進程
    conn1.close()
    conn2.close()

 

進程之間的數據共享Manager

例1、

from multiprocessing import Manager, Process


def main(dic):
    dic['count'] -=1
    print(dic)


if __name__ == "__main__":
    # 雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止於此
    m = Manager()

    # 放入一個字典
    dic = m.dict({"count": 100})

    p_lst = []

    p = Process(target=main,args=(dic,))
    p.start()

    # 感知一個子程序的結束
    p.join()
    print('主進程',dic)

 

例2、會出現數據不安全

from multiprocessing import Manager, Process


def main(dic):
    dic['count'] -= 1
    print('子進程', dic)


if __name__ == "__main__":
    # 雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止於此
    m = Manager()

    # 放入一個字典
    dic = m.dict({"count": 100})
for i in range(20):
        p = Process(target=main, args=(dic,))
        p.start()

        p.join()
    print('測試', dic)

 

數據不安全性可能會出現一個進程同時用一個數據

 

 

 

解決數據不安全問題(加鎖)

from multiprocessing import Manager
from multiprocessing import Process
from multiprocessing import Lock

def main(dic,lock):

    # 加鎖
 lock.acquire()
    dic['count'] -= 1
    print('子進程', dic)
    lock.release() if __name__ == "__main__":
    # 雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止於此
    m = Manager()

    lock = Lock()

    # 放入一個字典
    dic = m.dict({"count": 100})


    for i in range(20):
        p = Process(target=main, args=(dic,lock))
        p.start()


        p.join()
    print('測試', dic)

 

 

進程池Pool

為什么要有進程池?進程池的概念。

在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。

那么在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程么?首先,創建進程需要消耗時間,銷毀進程也需要消耗時間。

第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,這樣反而會影響程序的效率。

因此我們不能無限制的根據任務開啟或者結束進程。那么我們要怎么做呢?

在這里,要給大家介紹一個進程池的概念,定義一個池子,在里面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,

等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,

拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那么同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現並發效果。

 

# 效率。

# 每開啟進程,開啟屬於這個進程的內存空間。

# 寄存器 堆棧 文件。

進程池的數量是 cpu個數+1

# 進程過多 操作系統的調度。

# 更高級的進程: 在忙的時候可以 20+ ,在不忙的時候自動降為3個左右。

 

 例1、起一個進程池

from multiprocessing import Pool
from multiprocessing import Process


def func(n):
    for i in range(10):
        print(n + 1)


if __name__ == "__main__":
    # 開啟了5個進程
    pool = Pool(5)

 

 

進程池的同步調用(apply):(一般不用)

 對比:

1. 正常情況下先執行5個start 后執行5個end

from multiprocessing import Pool
from multiprocessing import Process
import time
import os

def func(n):

    print("子進程開始: %s"%n, os.getpid())

    time.sleep(1)

    print("子進程結束: %s" % n, os.getpid())


if __name__ == "__main__":

    # 開啟了5個進程
    pool = Pool(5)

    for i in range(10):

        # 正常情況下先執行5個start 后執行5個end
        p = Process(target=func,args=(i,))
        p.start()
子進程開始: 1 13304
子進程開始: 3 13276
子進程開始: 2 12304
子進程開始: 4 12384
子進程開始: 5 12380
子進程開始: 9 6200
子進程開始: 0 13288
子進程開始: 7 4288
子進程開始: 8 13244
子進程開始: 6 7688
子進程結束: 1 13304
子進程結束: 3 13276
子進程結束: 2 12304
子進程結束: 4 12384
子進程結束: 5 12380
子進程結束: 9 6200
子進程結束: 0 13288
子進程結束: 7 4288
子進程結束: 8 13244
子進程結束: 6 7688

 

2.使用apply(# 在使用了 apply后start和end變成了同步

 

from multiprocessing import Pool
from multiprocessing import Process
import time
import os

def func(n):

    print("子進程開始: %s"%n, os.getpid())

    time.sleep(1)

    print("子進程結束: %s" % n, os.getpid())
  return n

if __name__ == "__main__":

    # 開啟了5個進程(進程池中的進程 永遠都是活着的)
    pool = Pool(5)

    for i in range(10):

        # 正常情況下先執行5個start 后執行5個end
        # p = Process(target=func,args=(i,))

        # 在使用了 apply后start和end變成了同步
        p = pool.apply(func,args=(i,))

        # 獲取返回值
        print(p)

 

 

 

 

進程池的異步調用(用的比較多)

# 異步的apply_async用法:
主進程需要使用 jion,
等待進程池內任務都處理完,然后可以用get收集結果

否則, 主進程結束,進程池可能還沒來得及執行,也就跟着結束了.

 返回值:為了能使用返回值需要使用 obj.get()方法

# 使用get來獲取apply_aync 的結果,如果是apply,則沒有get方法
# 因為apply是同步執行,立刻獲取結果,也根本無需get

 

 

from multiprocessing import Pool
from multiprocessing import Process
import time
import os

def func(n):

    print("子進程開始: %s"%n, os.getpid())

    time.sleep(10)

    print("子進程結束: %s" % n, os.getpid())

    return n*10

if __name__ == "__main__":

    # 開啟了5個進程(進程池中的進程 永遠都是活着的)
    pool = Pool(5)
    lis = []

    for i in range(10):
        """
        # 異步的apply_async用法:
        如果使用異步提交的任務
        主進程需要使用 jion,等待進程池內任務都處理完,然后可以用get收集結果
        否則, 主進程結束,進程池可能還沒來得及執行,也就跟着結束了
        """

        ret = pool.apply_async(func,args=(i,))
        lis.append(ret)

    # 使用get來獲取apply_aync 的結果,如果是apply,則沒有get方法
    # 因為apply是同步執行,立刻獲取結果,也根本無需get
    for li in lis:
        print(li.get())


    # 結束進程池接受任務
    pool.close()

    # 感知進程池中的任務執行結束
    pool.join()

 

進程池的socket

 

 

 

 

 

 

 

 進程池中的回調函數

 

 

 

 

 

回調函數與爬蟲的應用

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM