python進程、多進程


進程:

進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操作系統結構的基礎。在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。。

狹義定義:進程是正在運行的程序的實例(an instance of a computer program that is being executed)。

廣義定義:進程是一個具有一定獨立功能的程序關於某個數據集合的一次運行活動。它是操作系統動態執行的基本單元,在傳統的操作系統中,進程既是基本的分配單元,也是基本的執行單元。

進程是一個內核級的實體,進程結構的所有成分都在內核空間中,一個用戶程序不能直接訪問這些數據。

進程狀態:

 

創建、就緒、運行、阻塞、結束

進程的概念主要有兩點:

第一,進程是一個實體。每一個進程都有它自己的地址空間,一般情況下,包括文本區域(text region)、數據區域(data region)和堆棧(stack region)。文本區域存儲處理器執行的代碼;數據區域存儲變量和進程執行期間使用的動態分配的內存;堆棧區域存儲着活動過程調用的指令和本地變量。

第二,進程是一個“執行中的程序”。程序是一個沒有生命的實體,只有處理器賦予程序生命時,它才能成為一個活動的實體,我們稱其為進程。

進程是操作系統中最基本、重要的概念。是多任務系統出現后,為了刻畫系統內部出現的動態情況,描述系統內部各程序的活動規律引進的一個概念,所有多任務設計操作系統都建立在進程的基礎上。

進程是操作系統中分配資源的最小單位

線程是操作系統調度的最小單

 

線程屬於進程,一般一個進程會 有一個主線

 

程序+數據+運行是進

 

簡單理解就是一個程

 

Ps –ef

 

進程的特征

 

Ø動態性:進程的實質是程序在多任務系統中的一次執行過程,進程是動態產生,動態消亡的。

Ø並發性:任何進程都可以同其他進程一起並發執行

Ø獨立性:進程是一個能獨立運行的基本單位,同時也是系統分配資源和調度的獨立單位;

Ø異步性:由於進程間的相互制約,使進程具有執行的間斷性,即進程按各自獨立的、不可預知的速度執行

Ø結構特征:進程由程序、數據和進程控制塊三部分組成。

Ø多個不同的進程可以包含相同的程序:一個程序在不同的數據集里就構成不同的進程,能得到不同的結果;但是執行過程中,程序不能發生改變。

 

 

 

進程的狀態

就緒:可被執行的狀態,需要等CPU執

運行

組賽:等待數據或資源時,I/O

結束

CPU:寄存器(存儲),控制器(控制)

Cpu緩存

https://baike.baidu.com/item/CPU緩存/3728308?fr=aladdin

 

 

程序本身,數據,堆

 

單核CPU,運行時用時間片輪

 

cpu時用多線程好還是單線程好?

如果是純計算密集型的,開一個線程要快,因為CPU在同一時刻只能運行一個線程,如果程序沒有I/O操作,那一個線程塊,保證CPU告訴的執行這個線程,此時開多個線程,cpu要做切換,切換本身產生延

 

如果是大量i/o時,需要等待時,cpu不會等,如果只跑一個線程,cpu會等,如果多線程,一個需要等時,cpu會執行其他的線程,這樣cpu利用率要高

如果有大量I/O,多線程好,會提高cpu利用

 

進程特點

動態性

並發

獨立性:QQ和word進程是獨立的,沒有關

異步性:各自在執行當中,相互並行執

 

 

 

同步(順序運行)和異步(並行執行)

同步時:業務有1,2,3,4嚴格的順序,一步一步的執行,就是同

異步:業務沒有嚴格的執行的順序,注冊搜狐郵箱后,系統會把賬號同步給其他模塊賬戶的過程就是異

一般互聯網中業務都是異步

同步需要耗

 

 

結構:程序、數據、進程控制塊PCB(存進程相關的信息,id,副id,狀態等等)

不同的進程可以執行相同的程序,不同賬號登

 

CPU組成:運算器、控制器、寄存器

主頻3.5G,發展較

 

摩爾定律:沒18個月cpu速度翻一倍,現在失效

 

智能時代:吳

 

刻意練習

 

 

進程切換:

CPU時間片輪尋,

 

 

上下文:一個程序運行時的中間狀態和數據

比如跑到第5行,下次跑的時候,還是從第5行跑,這個狀態和數據就是上下文

 

 

進程運行狀態:

就緒狀態,運行狀態,阻塞狀

 

阻塞:I/O或進程同步,或網絡服務器發請求等待數據等,被阻

 

原語(原子操作):一段程序不能分割,操作系統的最小的一個指令集

 

loadnunner的事物是用來算時間差的,用來計時的,統計耗時

 

數據庫的事務要么都失敗,要么都成功

ram:random

room,read only

 

內存:隨機訪問存儲器RAM

內存更快,不能永久保存,磁盤慢,永久 保存數

 

互聯網口訣:分庫、分表、分布式計算、分布式緩存、異步技術

 

互聯網系統架構:

服務器端和客戶端

數據庫有沒有集群,怎么搭的

寫的多還是讀的多,主是寫,從是讀,主的少,從的

mysql:主從結構,主的是寫,從的是

 

掛起(等待、組賽):等待I/O、其他進程的結果(進程間同步)

同步、異步、阻塞、非阻塞

同步:死

異步:不

阻塞:等待一個條件的發

非阻塞:不需要等待條

 

 

進程的五態模型:

 

 

 

 

進程狀態轉換

 

 

活躍就緒:是指進程在主存並且可被調度的狀態。

靜止就緒(掛起就緒):是指進程被對換到輔存時的就緒狀態,是不能被直接調度的狀態,只有當主存中沒有活躍就緒態進程,或者是掛起就緒態進程具有更高的優先級,系統將把掛起就緒態進程調回主存並轉換為活躍就緒。

活躍阻塞:是指進程已在主存,一旦等待的事件產生便進入活躍就緒狀態。

靜止阻塞:是指進程對換到輔存時的阻塞狀態,一旦等待的事件產生便進入靜止就緒狀態。

 

linux:swap,內存不夠用時寫入swap(硬盤

 

pcb存貯進程的信息

程序計數器,program counter,記錄運行到哪一行

 

進程的創建過程:

一旦操作系統發現了要求創建新進程的事件后,便調用進程創建原語Creat()按下述步驟創建一個新進程。

1)申請空白PCB(進程控制塊)。為新進程申請獲得唯一的數字標識符,並從PCB集合中索取一個空白PCB。

2)為新進程分配資源。為新進程的程序和數據以及用戶棧分配必要的內存空間。顯然,此時操作系統必須知道新進程所需要的內存大小。

3)初始化進程控制塊。PCB的初始化包括:

①初始化標識信息,將系統分配的標識符和父進程標識符,填入新的PCB中。

②初始化處理機狀態信息,使程序計數器指向程序的入口地址,使棧指針指向棧頂。

③初始化處理機控制信息,將進程的狀態設置為就緒狀態或靜止就緒狀態,對於優先級,通常是將它設置為最低優先級,除非用戶以顯式的方式提出高優先級要求。

4)將新進程插入就緒隊列,如果進程就緒隊列能夠接納新進程,便將新進程插入到就緒隊列中

 

進程終止:

 

引起進程終止的事件

1)正常結束

在任何計算機系統中,都應該有一個表示進程已經運行完成的指示。例如,在批處理系統中,通常在程序的最后安排一條Hold指令或終止的系統調用。當程序運行到Hold指令時,將產生一個中斷,去通知OS本進程已經完成。

2)異常結束

在進程運行期間,由於出現某些錯誤和故障而迫使進程終止。這類異常事件很多,常見的有:越界錯誤,保護錯,非法指令,特權指令錯,運行超時,等待超時,算術運算錯,I/O故障。

3)外界干預

外界干預並非指在本進程運行中出現了異常事件,而是指進程應外界的請求而終止運行。這些干預有:操作員或操作系統干預,父進程請求,父進程終止。

 

 

如果系統發生了上述要求終止進程的某事件后,OS便調用進程終止原語,按下述過程去終止指定的進程。

1)根據被終止進程的標識符,從PCB集合中檢索出該進程的PCB,從中讀出該進程狀態。

2)若被終止進程正處於執行狀態,應立即終止該進程的執行,並置調度標志為真。用於指示該進程被終止后應重新進行調度。

3)若該進程還有子孫進程,還應將其所有子孫進程予以終止,以防他們成為不可控的進程。

4)將被終止的進程所擁有的全部資源,或者歸還給其父進程,或者歸還給系統。

5)將被終止進程(它的PCB)從所在隊列(或鏈表)中移出,等待其它程序來搜集

 

 

阻塞

1、請求系統服

2、啟動某種操

3、新數據尚未到

4、沒事兒

 

 

喚醒:

各種事兒已經准備完

 

 

調度算法:

先到顯

時間片輪

優先

 

 

Python 進程

 

multiprocessing較

進程間通信

ü 文件

ü 管道( Pipes Pipes )

ü Socket

ü 信號

ü 信號量

ü 共享內存

 

 

 

#!/user/bin/python

#encoding=utf-8

import os

print os.getpid()

 

pid = os.fork() # 創建一個子進

print "******",pid  #子進程id和0

if pid == 0:

  print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())

else:

  print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)

 

[gstudent@iZ2zejbxp2btn9jh8knipuZ xxx]$ python test.py

610

****** 611

I (610) just created a child process (611).

****** 0

I am child process (611) and my parent is 610.

 

在linux中,執行fork函數之后

父進程拿到的返回的fork函數返回值是子進程的pid

子進程拿到的返回的fork函數返回值是0


父進程和子進程會分別執行后續未執行的代碼 ,子進程執行的時if,父進程執行的是else

 

 

fork() fork() 函數,它也屬於一個內建並 且只在 Linux 系統下存在。 它非常特殊普通的函數調用,一次返回但是 fork() fork() 調用一次,返回兩因為操 作系統自動把當前進程(稱為父)復制了一份(稱為子進程),然后分別在父進程和子內返回。

子進程永遠返回 0,而父進程 返回子的PID 。這樣做的理由是,一個父進程可 以fork() fork() 出很多子進程,所以父要 記下每個子進程的 ID ,而子進程只需要調 用getppid () 就可以拿到父進程的 ID ,子 進程只需要調用 os.getpid os.getpid () 函數可以獲取 自

 

創建進程

Multiprocessing

Multiprocessing模塊創建進程使用的是Process類。

Process類的構造方法:

help(multiprocessing.Process)

__init__(self, group=None, target=None, name=None, args=(), kwargs={})

參數說明:

group:進程所屬組,基本不用。

target:表示調用對象,一般為函數。

args:表示調用對象的位置參數元組。

name:進程別名。

kwargs:表示調用對象的字典。

創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,並用其start()方法啟動,這樣創建進程比fork()還要簡單。

join()方法表示等待子進程結束以后再繼續往下運行,通常用於進程間的同步。

注意:

Windows上要想使用進程模塊,就必須把有關進程的代碼寫在當前.py文件的if __name__ == ‘__main__’ :語句的下面,才能正常使用Windows下的進程模塊          Unix/Linux下則不需要。 

創建5個進程例子

 

#coding=utf-8

import multiprocessing

def do(n) :

  #獲取當前線程的名

  name = multiprocessing.current_process().name

  print name,'starting'

  print "worker ", n

  return

 

if __name__ == '__main__' :

  numList = []

  for i in xrange(5) :

   p = multiprocessing.Process(target=do, args=(i,))

   numList.append(p)

   p.start()

   p.join()#進程執行完畢后才繼續執

   print "Process end."

  print numList

 

 

 

c:\Python27\Scripts>python task_test.py

Process-1 starting

worker  0

Process end.

Process-2 starting

worker  1

Process end.

Process-3 starting

worker  2

Process end.

Process-4 starting

worker  3

Process end.

Process-5 starting

worker  4

Process end.

[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

 

 

 

 

把p.join()注釋

  p.start()  

   #p.join()

   print "Process end."

  print numList

 

c:\Python27\Scripts>python task_test.py

Process end.

Process end.

Process end.

Process end.

Process end.

[<Process(Process-1, started)>PP, rocess-1rocess-3< Process(Process-2, started)>s , tartings<

tartingProcess(Process-3, started)>P

wProcess-4 sworker rocess-2tarting

worker , orker  3

  < 0sProcess(Process-4, started)>2

tarting,

 

<wProcess(Process-5, started)>orker ]

1

Process-5 starting

worker  4

 

 

 

練習:三個進程,每個進程寫一個文件,每個文件中有進程的名稱和日期

 

#coding=utf-8

import multiprocessing

import time

 

def do(n) :

   fp=open(r"d:\\%s.txt"%n,'w')

   name=multiprocessing.current_process().name

   fp.write("%s %s"%(name,time.strftime("%Y-%m-%d %H:%M:%S")))

   fp.close()

   return

 

if __name__ == '__main__' :

  numList = []

  for i in xrange(3) :

   p = multiprocessing.Process(target=do, args=(i,))

   numList.append(p)

   p.start()

   p.join()

   print "Process end."

  print numList

 

 

Process-2 2018-03-31 14:55:34

 

 

 

Os.fork()multiprocessing結合使用

 

 

#!/usr/bin/python
# -*- coding: utf-8 -*-
from multiprocessing import Process 
import os 
import time 

def sleeper(name, seconds): 
    print "Process ID# %s" % (os.getpid()) 
    print "Parent Process ID# %s" % (os.getppid()) 
#
僅支持在linux,一個進程會有父進程和自己的IDwindows上就沒有父進程id
    print "%s will sleep for %s seconds" % (name, seconds) 
    time.sleep(seconds)
# if __name__ == "__main__": 
child_proc = Process(target = sleeper, args = ('bob', 5)) 
child_proc.start() 
print "in parent process after child process start" 
print "parent process about to join child process"
child_proc.join()
print "in parent process after child process join" 
print "the parent's parent process: %s" % (os.getppid())
沒運行成功

多進程模板程序

#coding=utf-8
import multiprocessing
import urllib2
import time

def func1(url) :
  response = urllib2.urlopen(url)
  html = response.read()
  print html[0:20]
  time.sleep(20)

def func2(url) :
  response = urllib2.urlopen(url)
  html = response.read()
  print html[0:20]
  time.sleep(20)


if __name__ == '__main__' :
    p1 = multiprocessing.Process(target=func1,args=("http://www.sogou.com",),name="gloryroad1")
    p2 = multiprocessing.Process(target=func2,args=("http://www.baidu.com",),name="gloryroad2")
    p1.start()
    p2.start()     
    p1.join()
    p2.join()
    time.sleep(10)
    print "done!"

 

 

c:\Python27\Scripts>python task_test.py

<!DOCTYPE html>

<!--

<!DOCTYPE html>

<ht

done! 

測試單進程和多進程程序執行的效率

 

#coding: utf-8

import multiprocessing

import time

def m1(x):

   time.sleep(0.01)

   return     x * x

 

if __name__ == '__main__':

   pool = multiprocessing.Pool(multiprocessing.cpu_count())

   i_list = range(1000)

   time1=time.time()

   pool.map(m1, i_list)

   time2=time.time()

   print 'time elapse:',time2-time1

 

   time1=time.time()

   map(m1, i_list)

   time2=time.time()

print 'time elapse:',time2-time1

 

 

c:\Python27\Scripts>python task_test.py

time elapse: 2.62400007248

time elapse: 10.2070000172

 

多線程只能用單核CPU

多進程好於多線程,因為多線程有同步

 

進程池

 在使用Python進行系統管理時,特別是同時操作多個文件目錄或者遠程控制多台主機,並行操作可以節約大量的時間。如果操作的對象數目不大時,還可以直接使用Process類動態的生成多個進程,十幾個還好,但是如果上百個甚至更多,那手動去限制進程數量就顯得特別的繁瑣,此時進程池就派上用場了。

Pool類可以提供指定數量的進程供用戶調用,當有新的請求提交到Pool中時,如果池還沒有滿,就會創建一個新的進程來執行請求。如果池滿,請求就會告知先等待,直到池中有進程結束,才會創建新的進程來執行這些請求。

pool類中方法

 

apply()

函數原型:apply(func[, args=()[, kwds={}]])

該函數用於傳遞不定參數,主進程會被阻塞直到函數執行結束(不建議使用,並且3.x以后不再使用)。

map()

函數原型:map(func, iterable[, chunksize=None])

Pool類中的map方法,與內置的map函數用法行為基本一致,它會使進程阻塞直到返回結果。

注意,雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒后,程序才會運行子進程。

close()

關閉進程池(Pool),使其不再接受新的任務。

Close()要在join()之后使用

 pool.close()#關閉進程池,不再接受新的任務
 pool.join()#主進程阻塞等待子進程的退出

terminate()

立刻結束工作進程,不再處理未處理的任務。

join(),map(),close(),terminate(),apply()

 

join()

使主進程阻塞等待子進程的退出,join方法必須在closeterminate之后使用。

 

 

獲取CPU的核數

multiprocessing.cpu_count() #獲取cpu

創建進程池

#coding: utf-8

import multiprocessing

import os

import time

import random

def m1(x):

   time.sleep(random.random()*4)

   print "pid:",os.getpid(),x*x

   return x * x

 

if __name__ == '__main__':

   pool = multiprocessing.Pool(multiprocessing.cpu_count())

   i_list = range(8)

   print pool.map(m1, i_list)

 

c:\Python27\Scripts>python task_test.py

pid: 5584 0

pid: 8764 4

pid: 11708 9

pid: 6380 1

pid: 6380 49

pid: 5584 16

pid: 8764 25

pid: 11708 36

[0, 1, 4, 9, 16, 25, 36, 49]        

 

 

pool.apply_async(f, [10]), result.get(timeout = 1)

async只創建一個進程,每次只能傳入一個參數,傳一個列表,會返回列表第一個元素返回得結果

 創建簡單的進程池

#encoding=utf-8

from multiprocessing import Pool

 

def f(x):

    return x * x

 

if __name__ == '__main__':

    pool = Pool(processes = 4)      # start 4 worker processes

    #下邊這行是異步,不等待,一執行就一直往下走

    result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously

    print result.get(timeout = 1)

    print pool.map(f, range(10))   # prints "[0, 1, 4,..., 81]"

 

 

c:\Python27\Scripts>python task_test.py

100

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

 

#encoding=utf-8

from multiprocessing import Pool

import time

 

def f(x):

   time.sleep(2)

   return x * x

 

if __name__ == '__main__':

   pool = Pool(processes = 4)      # start 4 worker processes

   result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously

   print result.get(timeout = 1)

print pool.map(f, range(10))   # prints "[0, 1, 4,..., 81]"

 

c:\Python27\Scripts>python task_test.py

Traceback (most recent call last):

  File "task_test.py", line 12, in<     module>

    print result.get(timeout = 1)

  File "C:\Python27\lib\multiprocessing\pool.py", line 568, in get

    raise TimeoutError

multiprocessing.TimeoutError

 

 help(pool.apply_async)

 >>> help(pool.apply_async)

Help on method apply_async in module multiprocessing.pool:

 

apply_async(self, func, args=(), kwds={}, callback=None) method of multiprocessing.pool.Pool instance

    Asynchronous equivalent of `apply()` builtin

 

#encoding=utf-8

from multiprocessing import Pool

import time

 

def f(x):

    time.sleep(2)

    return x * x

 

if __name__ == '__main__':

    pool = Pool(processes = 4)      # start 4 worker processes

    result = pool.apply_async(f, [10])  # evaluate "f(10)" asynchronously

    print result.get()#這里沒有設置時間,那進程就會死

    print pool.map(f, range(10))   # prints "[0, 1, 4,..., 81]"

 

 

也可以創建多個很多進

if __name__ == '__main__':

    pool = Pool(processes = 40)      # start 4 worker processes

 

 

進程切換快,還是線程切換快,

切換線程時,是不需要切換上下文的,進程的上下文不需要

 

 

 

from multiprocessing import Pool

def f(*x):

    for i in x:

        return i * i

 

if __name__ == '__main__':

    pool = Pool(processes = 4)      # start 4 worker processes

    result = pool.apply_async(f,tuple(range(10)))  # evaluate "f(10)" asynchronously

    print result.get(timeout = 1)

    print pool.map(f, range(10))   # prints "[0, 1, 4,..., 81]"

 

 

c:\Python27\Scripts>python task_test.py

0

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

 

多進

#encoding=utf-8

from multiprocessing import Pool

 

def f(x):

    return x * x

 

if __name__ == '__main__':

    pool = Pool(processes = 4)      # start 4 worker processes

    result1 = pool.apply_async(f, [10])

    result2 = pool.apply_async(f, [100])

    result3 = pool.apply_async(f, [1000])  # evaluate "f(10)" asynchronously

    print result1.get(timeout = 1)

    print result2.get(timeout = 1)

    print result3.get(timeout = 1)

    print pool.map(f, range(10))   # prints "[0, 1, 4,..., 81]"

c:\Python27\Scripts>python task_test.py

100

10000

1000000

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

 

修改

 

#encoding=utf-8

from multiprocessing import Pool

import time

def f(x):

    print "hello"

    time.sleep(2)

    print "over"

    return x * x

 

 

if __name__ == '__main__':

    pool = Pool(processes = 4)      # start 4 worker processes

    result1 = pool.apply_async(f, [10])

    result2 = pool.apply_async(f, [100])

   result3 = pool.apply_async(f, [1000])  # evaluate "f(10)" asynchronously

    print result1.get(timeout = 3)

    print result2.get(timeout = 3)

    print result3.get(timeout = 3)

    print pool.map(f, range(10))   # prints "[0, 1, 4,..., 81]"

 

c:\Python27\Scripts>python task_test.py

helloh

ello

hello

over

over

over

100

10000

1000000

hhhhelloelloelloello

 

 

 

ooverver

 

hhooelloelloverver

 

 

 

hhelloello

 

over

hoellover

 

oohververello

 

 

over

over

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

 

注掉最后一行

#encoding=utf-8

from multiprocessing import Pool

import time

def f(x):

    print "hello"

    time.sleep(2)

    print "over"

    return x * x

 

 

if __name__ == '__main__':

    pool = Pool(processes = 4)      # start 4 worker processes

    result1 = pool.apply_async(f, [10])

    result2 = pool.apply_async(f, [100])

    result3 = pool.apply_async(f, [1000])  # evaluate "f(10)" asynchronously

    print result1.get(timeout = 3)

    print result2.get(timeout = 3)

    print result3.get(timeout = 3)

    #print pool.map(f, range(10))   # prints "[0, 1, 4,..., 81]"

c:\Python27\Scripts>python task_test.py

hello

hello

hello

over

over

over

100

10000

1000000

 

 

 

修改為同步

#encoding=utf-8

from multiprocessing import Pool

import time

def f(x):

    print "hello"

    time.sleep(2)

    print "over"

    return x * x

 

 

if __name__ == '__main__':

    f(10)

    f(109)

f(1000)

 

 

c:\Python27\Scripts>python task_test.py

hello

over

hello

over

hello

over

 

 

 

單進程和多進程時間比較

#encoding=utf-8
import time
from multiprocessing import Pool
def run(fn):
  #fn:
函數參數是數據列表的一個元素
  time.sleep(1)
  return fn * fn

if __name__ == "__main__":
  testFL = [1,2,3,4,5,6] 
  print 'Single process execution sequence:' #
順序執行(也就是串行執行,單進程)
  s = time.time()
  for fn in testFL:
    run(fn)

  e1 = time.time()
  print u"
順序執行時間:", int(e1 - s)

  print 'concurrent:' #
創建多個進程,並行執行
  pool = Pool(5)  #創建擁有5個進程數量的進程池
  #testFL:要處理的數據列表,run:處理testFL列表中數據的函數
  rl =pool.map(run, testFL)
  pool.close()#
關閉進程池,不再接受新的任務
  pool.join()#主進程阻塞等待子進程的退出
  e2 = time.time()
  print u"
並行執行時間:", int(e2 - e1)
  print rl

c:\Python27\Scripts>python task_test.py

Single process execution sequence:

順序執行時間: 6

concurrent:

並行執行時間: 2

[1, 4, 9, 16, 25, 36]

 

上例是一個創建多個進程並發處理與順序執行處理同一數據,所用時間的差別。從結果可以看出,並發執行的時間明顯比順序執行要快很多,但是進程是要耗資源的,所以進程數也不能開太大。

程序中的r1表示全部進程執行結束后全局的返回結果集,run函數有返回值,所以一個進程對應一個返回結果,這個結果存在一個列表中,也就是一個結果堆中,實際上是用了隊列的原理,等待所有進程都執行完畢,就返回這個列表(列表的順序不定)。

Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),讓其不再接受新的Process

 

 

同步進程(消息隊列Queue&JoinableQueue

隊列,先進先

multiprocessing.Queue類似於queue.Queue,一般用來多個進程間交互信息。Queue是進程和線程安全的。它實現了queue.Queue的大部分方法,但task_done()join()沒有實現。

multiprocessing.JoinableQueuemultiprocessing.Queue的子類,增加了task_done()方法和join()方法。

task_done():用來告訴queue一個task完成。一般在調用get()時獲得一個task,在task結束后調用task_done()來通知Queue當前task完成。

join():阻塞直到queue中的所有的task都被處理(即task_done方法被調用)。

Join()是主程序等我這個進程執行完畢了,程序才往下走

 

 

#encoding=utf-8

from multiprocessing import Process, Queue 

 

def offer(queue): 

  # 入隊

  queue.put("Hello World") 

 

if __name__ == '__main__': 

  # 創建一個隊列實

  q = Queue()

  p = Process(target = offer, args = (q,)) 

  p.start()

  print q.get() # 出隊

  p.join()

 

 

c:\Python27\Scripts>python task_test.py

Hello World

 

 

不同的進程之間不能修改對方的變量

但進程里的線程可以,因為線程在同一個進程里,變量共

 

 

>>> import Queue
>>> myqueue = Queue.Queue(maxsize = 10)
>>> myqueue.put(1)
>>> myqueue.put(2)
>>> myqueue.put(3)
>>> myqueue.get()
1
>>> myqueue.get()
2
>>> myqueue.get()
3

 

 

 

>>>import Queue

>>>q=Queue.Queue(5)

>>>q

<Queue.Queue instance at 0x04EA1968>

>>>q.put(1)

>>>q.put(2)

>>>q.put(3)

>>>q.put(4)

>>>q.put(5)

>>>q.full()

True

>>>q.qsize()

5

>>>q.empty()

False

>>>q.get(2)

1

>>> 

 

 

>>>q.empty()

True

 

q.get(False,2)q.get(True,2)

True表示等2秒,如果隊列空就報

False表示不等,如果隊列空立即報

>>>q.get(False,2)

Traceback (most recent call last):

  File "<stdin>", line 1, in< module>

  File "C:\Python27\lib\Queue.py", line 165, in get

    raise Empty

Queue.Empty

 

>>>q.get(True,2)

Traceback (most recent call last):

  File "<stdin>", line 1, in< module>

  File "C:\Python27\lib\Queue.py", line 176, in get

    raise Empty

Queue.Empty

 

 

 

 

get(self, block=True, timeout=None)

 |     Remove and return an item from the queue.

 |

 |     If optional args 'block' is true and 'timeout' is None (the default),

 |     block if necessary until an item is available. If 'timeout' is

 |      a non-negative number, it blocks at most 'timeout' seconds and raises

 |     the Empty exception if no item was available within that time.

 |     Otherwise ('block' is false), return an item if one is immediately

 |     available, else raise the Empty exception ('timeout' is ignored

 |     in that case).

 

 

>>>try:

...     q.get(False)

... except Queue.Empty:

...     print "empty!"

...

empty!

>>> 

 

練習:兩個進程,一個往隊列里寫10個數,一個進程讀取隊列的10個數

 

#encoding=utf-8

from multiprocessing import Process, Queue 

 

def offer(queue): 

  # 入隊

    for i in range(10):

        queue.put("%s"%i) 

 

def getter(queue):

    for i in range(10):

        print queue.get(False,1)

 

if __name__ == '__main__': 

    # 創建一個隊列實

    q = Queue()

    p = Process(target = offer, args = (q,))

    p2= Process(target = getter, args = (q,))

    p.start()

    p2.start()

   

p.join()

p2.join()

 

 

 

c:\Python27\Scripts>python task_test.py

0

1

2

3

4

5

6

7

8

9

 

 

 

#encoding=utf-8
import time
from multiprocessing import Process, Queue 

def set_data(queue): 
  # 入隊列
  for i in range(10):
      time.sleep(2)
      queue.put("Hello World"+str(i)) 

def get_data(queue): 
  for i in range(10):
  # 入隊列
      time.sleep(1)
      print queue.get("Hello World") 



if __name__ == '__main__': 
  # 創建一個隊列實例
  q = Queue()
  p1 = Process(target = set_data, args = (q,)) 
  p2 = Process(target = get_data, args = (q,))
  p1.start() 
  p2.start()
 
  p1.join()
  p2.join()
  print u"隊列是否為空?",q.empty()

 

c:\Python27\Scripts>python task_test.py

Hello World0

Hello World1

Hello World2

Hello World3

Hello World4

Hello World5

Hello World6

Hello World7

Hello World8

Hello World9

隊列是否為空? True

 

這就是兩個進程通過隊列進行信息同

 

下面的程序需要看下結果

#encoding=utf-8

from multiprocessing import Process, Queue 

import time

def offer(queue): 

  # 入隊

    for i in range(10):

        time.sleep(2)

        queue.put("%s"%i) 

 

 

def getter(queue):

    for i in range(10):

        time.sleep(1)

        print queue.get(False,1)

        print "queue.qsize():",queue.qsize()

 

if __name__ == '__main__': 

    # 創建一個隊列實

    q = Queue()

    p = Process(target = offer, args = (q,))

    p2= Process(target = getter, args = (q,))

    p.start()

    p2.start()

   

    p.join()

p2.join()

 

會報錯,因為2秒寫一次,1就讀,會讀空,所以報錯

c:\Python27\Scripts>python task_test.py

Process Process-2:

Traceback (most recent call last):

  File "C:\Python27\lib\multiprocessing\process.py", line 267, in _bootstrap

    self.run()

  File "C:\Python27\lib\multiprocessing\process.py", line 114, in run

    self._target(*self._args, **self._kwargs)

  File "c:\Python27\Scripts\task_test.py", line 12, in getter

    print queue.get(False,timeout=1)

  File "C:\Python27\lib\multiprocessing\queues.py", line 134, in get

    raise Empty

Empty

進程同步(使用queue

 

#encoding=utf-8

from multiprocessing import Process, Queue

import os, time, random

 

# 寫數據進程執行的代碼:

def write(q):

  for value in ['A', 'B', 'C']:

    print 'Put %s to queue...' % value

    q.put(value)

    time.sleep(random.random())

 

# 讀數據進程執行的代

def read(q):

  time.sleep(1)

  while not q.empty():

    # if not q.empty():

    print 'Get %s from queue.' % q.get(True)

    time.sleep(1) # 目的是等待寫隊列完

 

if __name__=='__main__':

    # 父進程創建Queue,並傳給各個子進

    q = Queue()

    pw = Process(target = write, args = (q,))

    pr = Process(target = read, args = (q,))

    # 啟動子進程pw,寫入:

    pw.start()

    # 啟動子進程pr,讀取:

    pr.start()

    # 等待pw結束:

    pw.join()

    pr.join()

print "Done!"

 

 

c:\Python27\Scripts>python task_test.py

Put A to queue...

Put B to queue...

Get A from queue.

Put C to queue...

Get B from queue.

Get C from queue.

Done!

 

由於操作系統對進程的調度時間不一樣,所以該程序每次執行的結果均不一樣。

程序讀隊列函數中為什么要加一句time.sleep(1),目的是等待些進程將數據寫到隊列中,防止有時寫進程還沒將數據寫進隊列,讀進程就開始讀了,導致讀不到數據。但是這種並不能有效的預防此種情況的出現。

 

下面是一片關於多進程的帖子,可以幫助理解一下

https://www.cnblogs.com/ManyQian/p/8930818.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM