Python3多進程與多線程區別及使用(1.進程)


進程和線程
是什么:
進程是指在系統中正在運行的一個應用程序;程序一旦運行就是進程,或者更專業化來說:進程是指程序執
行時的一個實例。
線程是進程的一個實體。
進程——資源分配的最小單位,線程——程序執行的最小單位。
線程進程的區別體現在幾個方面:
第一:因為進程擁有獨立的堆棧空間和數據段,所以每當啟動一個新的進程必須分配給它獨立的地址空間,建立眾多的數據表來維護它的代碼段、堆棧段和數據段,這對於多進程來說十分“奢侈”,系統開銷比較大,而線程不一樣,線程擁有獨立的堆棧空間,但是共享數據段,它們彼此之間使用相同的地址空間,共享大部分數據,比進程更節儉,開銷比較小,切換速度也比進程快,效率高,但是正由於進程之間獨立的特點,使得進程安全性比較高,也因為進程有獨立的地址空間,一個進程崩潰后,在保護模式下不會對其它進程產生影響,而線程只是一個進程中的不同執行路徑。一個線程死掉就等於整個進程死掉。
第二:體現在通信機制上面,正因為進程之間互不干擾,相互獨立,進程的通信機制相對很復雜,譬如管道,信號,消息隊列,共享內存,套接字等通信機制,而線程由於共享數據段所以通信機制很方便。。
3.屬於同一個進程的所有線程共享該進程的所有資源,包括文件描述符。而不同過的進程相互獨立。
4.線程又稱為輕量級進程,進程有進程控制塊,線程有線程控制塊;
5.線程必定也只能屬於一個進程,而進程可以擁有多個線程而且至少擁有一個線程;
第四:體現在程序結構上,舉一個簡明易懂的列子:當我們使用進程的時候,我們不自主的使用if else嵌套來判斷pid,使得程序結構繁瑣,但是當我們使用線程的時候,基本上可以甩掉它,當然程序內部執行功能單元需要使用的時候還是要使用,所以線程對程序結構的改善有很大幫助。
進程與線程的選擇取決以下幾點:
1、需要頻繁創建銷毀的優先使用線程;因為對進程來說創建和銷毀一個進程代價是很大的。
2、線程的切換速度快,所以在需要大量計算,切換頻繁時用線程,還有耗時的操作使用線程可提高應用程
序的響應
3、因為對CPU系統的效率使用上線程更占優,所以可能要發展到多機分布的用進程,多核分布用線程;
4、並行操作時使用線程,如C/S架構的服務器端並發線程響應用戶的請求;
5、需要更穩定安全時,適合選擇進程;需要速度時,選擇線程更好。
 
 
多進程
模塊:Multiprocessing
(原文有些許錯誤,本文筆記處已修改,未筆記處自行留意)
 
簡述:
Process類
用來描述一個進程對象
star()方法啟動進程
join()方法實現進程間的同步,等待所有進程退出
close()方法用來阻止多余的進程涌入進程池Pool造成進程阻塞
 
1 multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
target是函數名字
args是函數需要的參數, 以tuple的形式傳入
import multiprocessing
import os


def run_proc(name):
    print('Child process {0} {1} Running '.format(name, os.getpid()))


if __name__ == '__main__':
    print('Parent process {0} is Running'.format(os.getpid()))
    for i in range(5):
        p = multiprocessing.Process(target=run_proc, args=(str(i),))
        print('process start')
        p.start()
    p.join()
    print('Process close')



結果:
Parent process 5408 is Running
process start
process start
process start
process start
process start
Child process 0 1044 Running
Child process 1 1120 Running
Child process 3 10824 Running
Child process 2 9292 Running
Child process 4 10528 Running

 

Pool(進程池)
Pool 可以提供指定數量的進程供用戶使用,默認是 CPU 核數。當有新的請求提交到 Poll 的
時候,如果池子沒有滿,會創建一個進程來執行,否則就會讓該請求等待。
 
- Pool 對象調用 join 方法會等待所有的子進程執行完畢
- 調用 join 方法之前,必須調用 close
- 調用 close 之后就不能繼續添加新的 Process 了
pool=Pool(numprocess,initializer,initargs)
numproxess:需要創建的進程個數,如果忽略將使用cpu_count()的值。即系統上的CPU數量。
initializer:每個進程啟動時都要調用的對象。
initargs:為initalizer傳遞的參數。

 

apply_async(要調用的方法,參數列表,關鍵字參數列表):使用非阻塞方式調用指定方法,並行執行(同時執行)

apply(要調用的方法,參數列表,關鍵字參數列表):使用阻塞方式調用指定方法,,阻塞方式就是要等上一個進程退出后,下一個進程才開始運行。

close():關閉進程池,不再接受進的進程請求,但已經接受的進程還是會繼續執行。

terminate():不管程任務是否完成,立即結束。

join():主進程堵塞(就是不執行join下面的語句),直到子進程結束,注意,該方法必須在close或terminate之后使用。

pool.map(func,iterable,chunksize):將可調用對象func應用給iterable的每一項,然后以列表形式返回結果,
通過將iterable划分為多塊,並分配給工作進程,可以並行執行。chunksize指定每塊中的項數,
如果數據量較大,可以增大chunksize的值來提升性能。

pool.map_async(func,iterable,chunksize,callback):與map方法不同之處是返回結果是異步的,
如果callback指定,當結果可用時,結果會調用callback。

pool.imap(func,iterable,chunksize):與map()方法的不同之處是返回迭代器而非列表。

pool.imap_unordered(func,iterable,chunksize):與imap()不同之處是:結果的順序是根據從工作進程接收到的時間而定的。

pool.get(timeout):如果沒有設置timeout,將會一直等待結果,
如果設置了timeout,超過timeout將引發multiprocessing.TimeoutError異常。

pool.ready():如果調用完成,返回True

pool.successful():如果調用完成並且沒有引發異常,返回True,如果在結果就緒之前調用,將引發AssertionError異常。

pool.wait(timeout):等待結果變為可用,timeout為等待時間。

 

pool.apply_async
apply_async方法用來同步執行進程,允許多個進程同時進入池子。
import multiprocessing
import os
import time
def run_task(name):
  print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
  time.sleep(1)
  print('Task {0} end.'.format(name))
if __name__ == '__main__':
  print('current process {0}'.format(os.getpid()))
  #設定池內進程數
  p = multiprocessing.Pool(processes=3)
  for i in range(6):
    p.apply_async(run_task, args=(i,))
  print('Waiting for all subprocesses done...')
  p.close()
  p.join()
  print('All processes done!') 
結果
current process 562
Task 0 pid 778 is running, parent id is 562
Task 1 pid 779 is running, parent id is 562
Task 2 pid 780 is running, parent id is 562
Waiting for all subprocesses done...
Task 1 end.
Task 0 end.
Task 2 end.
Task 3 pid 779 is running, parent id is 562
Task 4 pid 778 is running, parent id is 562
Task 5 pid 780 is running, parent id is 562
Task 4 end.
Task 5 end.
Task 3 end.
All processes done!
 
pool.apply
apply(func[,args[,kwds]]))
該方法只能允許一個進程進入池子,在一個進程結束后,另一個進程才可以進入池子
import multiprocessing
import os
import time
def run_task(name):
  print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
  time.sleep(1)
  print('Task {0} end.'.format(name))
if __name__ == '__main__':
  print('current process {0}'.format(os.getpid()))
  p = multiprocessing.Pool(processes=3)
  for i in range(6):
    p.apply(run_task, args=(i,))
  print('Waiting for all subprocesses done...')
  p.close()
  p.join()
  print('All processes done!')
結果:
current process 562
Task 0 pid 785 is running, parent id is 562
Task 0 end.
Task 1 pid 786 is running, parent id is 562
Task 1 end.
Task 2 pid 787 is running, parent id is 562
Task 2 end.
Task 3 pid 785 is running, parent id is 562
Task 3 end.
Task 4 pid 786 is running, parent id is 562
Task 4 end.
Task 5 pid 787 is running, parent id is 562
Task 5 end.
Waiting for all subprocesses done...
All processes done!

 

Queue進程間通信
Queue 用來在多個進程間通信。Queue 有兩個方法,get 和 put。
 
put 方法
Put 方法用來插入數據到隊列中,有兩個可選參數,blocked 和 timeout。
- blocked = True(默認值),timeout 為正
該方法會阻塞 timeout 指定的時間,直到該隊列有剩余空間。如果超時,拋出 Queue.Full 異常。
blocked = False
如果 Queue 已滿,立刻拋出 Queue.Full 異常
get 方法
get 方法用來從隊列中讀取並刪除一個元素。有兩個參數可選,blocked 和 timeout
- blocked = True(默認),timeout 正值
等待時間內,沒有取到任何元素,會拋出 Queue.Empty 異常。
blocked = False
Queue 有一個值可用,立刻返回改值;Queue 沒有任何元素,會拋出 Queue.Empty 異常。
from multiprocessing import Process, Queue
import os, time, random
# 寫數據進程執行的代碼:
def proc_write(q,urls):
  print('Process(%s) is writing...' % os.getpid())
  for url in urls:
    q.put(url)
    print('Put %s to queue...' % url)
    time.sleep(random.random())
# 讀數據進程執行的代碼:
def proc_read(q):
  print('Process(%s) is reading...' % os.getpid())
  while True:
    url = q.get(True)
    print('Get %s from queue.' % url)
if __name__=='__main__':
  # 父進程創建Queue,並傳給各個子進程:
  q = Queue()
  proc_writer1 = Process(target=proc_write, args=(q,['url_1', 'url_2', 'url_3']))
  proc_writer2 = Process(target=proc_write, args=(q,['url_4','url_5','url_6']))
  proc_reader = Process(target=proc_read, args=(q,))
  # 啟動子進程proc_writer,寫入:
  proc_writer1.start()
  proc_writer2.start()
  # 啟動子進程proc_reader,讀取:
  proc_reader.start()
  # 等待proc_writer結束:
  proc_writer1.join()
  proc_writer2.join()
  # proc_reader進程里是死循環,無法等待其結束,只能強行終止:
  proc_reader.terminate()

 

Pipe 進程間通信
常用來在兩個進程間通信,兩個進程分別位於管道的兩端。
multiprocessing.Pipe([duplex])
 兩個示例
from multiprocessing import Process, Pipe
def send(pipe):
  pipe.send(['spam',42, 'egg'])  # send 傳輸一個列表
  pipe.close()
if __name__ == '__main__':
  (con1, con2) = Pipe()              # 創建兩個 Pipe 實例
  sender = Process(target=send, args=(con1, ))   # 函數的參數,args 一定是實例化之后的 Pip 變量,不能直接寫 args=(Pip(),)
  sender.start()                  # Process 類啟動進程
  print("con2 got: %s" % con2.recv())       # 管道的另一端 con2 從send收到消息
  con2.close()   

結果

con2 got: ['spam', 42, 'egg']

 

from multiprocessing import Process, Pipe


def talk(pipe):
    pipe.send(dict(name='Bob', spam=42))  # 傳輸一個字典
    reply = pipe.recv()  # 接收傳輸的數據
    print('talker got:', reply)


if __name__ == '__main__':
    (parentEnd, childEnd) = Pipe()  # 創建兩個 Pipe() 實例,也可以改成 conf1, conf2
    child = Process(target=talk, args=(childEnd,))  # 創建一個 Process 進程,名稱為 child
    child.start()  # 啟動進程
    print('parent got:', parentEnd.recv())  # parentEnd 是一個 Pip() 管道,可以接收 child Process 進程傳輸的數據
    parentEnd.send({x * 2 for x in 'spam'})  # parentEnd 是一個 Pip() 管道,可以使用 send 方法來傳輸數據
    child.join()  # 傳輸的數據被 talk 函數內的 pip 管道接收,並賦值給 reply
    print('parent exit')

結果:

parent got: {'name': 'Bob', 'spam': 42}
talker got: {'pp', 'mm', 'aa', 'ss'}
parent exit

 

更多共享數據方式:
multiprocessin:Array,Value (shared memory)  和 Manager(Server process manager)
 
Server process manager比 shared memory 更靈活,因為它可以支持任意的對象類型。另外,一個單獨的manager可以通過進程在網絡上不同的計算機之間共享,不過他比shared memory要慢。
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM