進程:
進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操作系統結構的基礎。在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。。
狹義定義:進程是正在運行的程序的實例(an instance of a computer program that is being executed)。
廣義定義:進程是一個具有一定獨立功能的程序關於某個數據集合的一次運行活動。它是操作系統動態執行的基本單元,在傳統的操作系統中,進程既是基本的分配單元,也是基本的執行單元。
進程是一個內核級的實體,進程結構的所有成分都在內核空間中,一個用戶程序不能直接訪問這些數據。
進程狀態:
創建、就緒、運行、阻塞、結束
進程的概念主要有兩點:
第一,進程是一個實體。每一個進程都有它自己的地址空間,一般情況下,包括文本區域(text region)、數據區域(data region)和堆棧(stack region)。文本區域存儲處理器執行的代碼;數據區域存儲變量和進程執行期間使用的動態分配的內存;堆棧區域存儲着活動過程調用的指令和本地變量。
第二,進程是一個“執行中的程序”。程序是一個沒有生命的實體,只有處理器賦予程序生命時,它才能成為一個活動的實體,我們稱其為進程。
進程是操作系統中最基本、重要的概念。是多任務系統出現后,為了刻畫系統內部出現的動態情況,描述系統內部各程序的活動規律引進的一個概念,所有多任務設計操作系統都建立在進程的基礎上。
進程是操作系統中分配資源的最小單位
線程是操作系統調度的最小單位
線程屬於進程,一般一個進程會 有一個主線程
程序+數據+運行是進程
簡單理解就是一個程序
Ps –ef
進程的特征
Ø動態性:進程的實質是程序在多任務系統中的一次執行過程,進程是動態產生,動態消亡的。
Ø並發性:任何進程都可以同其他進程一起並發執行
Ø獨立性:進程是一個能獨立運行的基本單位,同時也是系統分配資源和調度的獨立單位;
Ø異步性:由於進程間的相互制約,使進程具有執行的間斷性,即進程按各自獨立的、不可預知的速度執行
Ø結構特征:進程由程序、數據和進程控制塊三部分組成。
Ø多個不同的進程可以包含相同的程序:一個程序在不同的數據集里就構成不同的進程,能得到不同的結果;但是執行過程中,程序不能發生改變。
進程的狀態
創建
就緒:可被執行的狀態,需要等CPU執行
運行:
組賽:等待數據或資源時,I/O,
結束:
CPU:寄存器(存儲),控制器(控制),
Cpu緩存
https://baike.baidu.com/item/CPU緩存/3728308?fr=aladdin
程序本身,數據,堆棧
單核CPU,運行時用時間片輪尋
單cpu時用多線程好還是單線程好?
如果是純計算密集型的,開一個線程要快,因為CPU在同一時刻只能運行一個線程,如果程序沒有I/O操作,那一個線程塊,保證CPU告訴的執行這個線程,此時開多個線程,cpu要做切換,切換本身產生延遲
如果是大量i/o時,需要等待時,cpu不會等,如果只跑一個線程,cpu會等,如果多線程,一個需要等時,cpu會執行其他的線程,這樣cpu利用率要高,
如果有大量I/O,多線程好,會提高cpu利用率
進程特點
動態性
並發性
獨立性:QQ和word進程是獨立的,沒有關系
異步性:各自在執行當中,相互並行執行
同步(順序運行)和異步(並行執行)
同步時:業務有1,2,3,4嚴格的順序,一步一步的執行,就是同步
異步:業務沒有嚴格的執行的順序,注冊搜狐郵箱后,系統會把賬號同步給其他模塊賬戶的過程就是異步
一般互聯網中業務都是異步的
同步需要耗時
結構:程序、數據、進程控制塊PCB(存進程相關的信息,id,副id,狀態等等)
不同的進程可以執行相同的程序,不同賬號登陸
CPU組成:運算器、控制器、寄存器
主頻3.5G,發展較慢
摩爾定律:沒18個月cpu速度翻一倍,現在失效了
智能時代:吳軍
刻意練習:
進程切換:
CPU時間片輪尋,進
上下文:一個程序運行時的中間狀態和數據
比如跑到第5行,下次跑的時候,還是從第5行跑,這個狀態和數據就是上下文
進程運行狀態:
就緒狀態,運行狀態,阻塞狀態
阻塞:I/O或進程同步,或網絡服務器發請求等待數據等,被阻塞
原語(原子操作):一段程序不能分割,操作系統的最小的一個指令集
loadnunner的事物是用來算時間差的,用來計時的,統計耗時
數據庫的事務要么都失敗,要么都成功
ram:random
room,read only
內存:隨機訪問存儲器RAM
內存更快,不能永久保存,磁盤慢,永久 保存數據
互聯網口訣:分庫、分表、分布式計算、分布式緩存、異步技術
互聯網系統架構:
服務器端和客戶端
數據庫有沒有集群,怎么搭的?
寫的多還是讀的多,主是寫,從是讀,主的少,從的多
mysql:主從結構,主的是寫,從的是讀
掛起(等待、組賽):等待I/O、其他進程的結果(進程間同步)
同步、異步、阻塞、非阻塞
同步:死等
異步:不等
阻塞:等待一個條件的發生
非阻塞:不需要等待條件
進程的五態模型:
進程狀態轉換
活躍就緒:是指進程在主存並且可被調度的狀態。
靜止就緒(掛起就緒):是指進程被對換到輔存時的就緒狀態,是不能被直接調度的狀態,只有當主存中沒有活躍就緒態進程,或者是掛起就緒態進程具有更高的優先級,系統將把掛起就緒態進程調回主存並轉換為活躍就緒。
活躍阻塞:是指進程已在主存,一旦等待的事件產生便進入活躍就緒狀態。
靜止阻塞:是指進程對換到輔存時的阻塞狀態,一旦等待的事件產生便進入靜止就緒狀態。
linux:swap,內存不夠用時寫入swap(硬盤)
pcb存貯進程的信息
程序計數器,program counter,記錄運行到哪一行
進程的創建過程:
一旦操作系統發現了要求創建新進程的事件后,便調用進程創建原語Creat()按下述步驟創建一個新進程。
1)申請空白PCB(進程控制塊)。為新進程申請獲得唯一的數字標識符,並從PCB集合中索取一個空白PCB。
2)為新進程分配資源。為新進程的程序和數據以及用戶棧分配必要的內存空間。顯然,此時操作系統必須知道新進程所需要的內存大小。
3)初始化進程控制塊。PCB的初始化包括:
①初始化標識信息,將系統分配的標識符和父進程標識符,填入新的PCB中。
②初始化處理機狀態信息,使程序計數器指向程序的入口地址,使棧指針指向棧頂。
③初始化處理機控制信息,將進程的狀態設置為就緒狀態或靜止就緒狀態,對於優先級,通常是將它設置為最低優先級,除非用戶以顯式的方式提出高優先級要求。
4)將新進程插入就緒隊列,如果進程就緒隊列能夠接納新進程,便將新進程插入到就緒隊列中
進程終止:
引起進程終止的事件
1)正常結束
在任何計算機系統中,都應該有一個表示進程已經運行完成的指示。例如,在批處理系統中,通常在程序的最后安排一條Hold指令或終止的系統調用。當程序運行到Hold指令時,將產生一個中斷,去通知OS本進程已經完成。
2)異常結束
在進程運行期間,由於出現某些錯誤和故障而迫使進程終止。這類異常事件很多,常見的有:越界錯誤,保護錯,非法指令,特權指令錯,運行超時,等待超時,算術運算錯,I/O故障。
3)外界干預
外界干預並非指在本進程運行中出現了異常事件,而是指進程應外界的請求而終止運行。這些干預有:操作員或操作系統干預,父進程請求,父進程終止。
如果系統發生了上述要求終止進程的某事件后,OS便調用進程終止原語,按下述過程去終止指定的進程。
1)根據被終止進程的標識符,從PCB集合中檢索出該進程的PCB,從中讀出該進程狀態。
2)若被終止進程正處於執行狀態,應立即終止該進程的執行,並置調度標志為真。用於指示該進程被終止后應重新進行調度。
3)若該進程還有子孫進程,還應將其所有子孫進程予以終止,以防他們成為不可控的進程。
4)將被終止的進程所擁有的全部資源,或者歸還給其父進程,或者歸還給系統。
5)將被終止進程(它的PCB)從所在隊列(或鏈表)中移出,等待其它程序來搜集
阻塞
1、請求系統服務
2、啟動某種操作
3、新數據尚未到達
4、沒事兒干
喚醒:
各種事兒已經准備完成
調度算法:
先到顯出
時間片輪轉
優先級
Python 進程
multiprocessing較多
進程間通信
ü 文件
ü 管道( Pipes Pipes )
ü Socket
ü 信號
ü 信號量
ü 共享內存
#!/user/bin/python
#encoding=utf-8
import os
print os.getpid()
pid = os.fork() # 創建一個子進程
print "******",pid #子進程id和0
if pid == 0:
print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())
else:
print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
[gstudent@iZ2zejbxp2btn9jh8knipuZ xxx]$ python test.py
610
****** 611
I (610) just created a child process (611).
****** 0
I am child process (611) and my parent is 610.
在linux中,執行fork函數之后
父進程拿到的返回的fork函數返回值是子進程的pid
子進程拿到的返回的fork函數返回值是0
父進程和子進程會分別執行后續未執行的代碼 ,子進程執行的時if,父進程執行的是else
fork() fork() 函數,它也屬於一個內建並 且只在 Linux 系統下存在。 它非常特殊普通的函數調用,一次返回但是 fork() fork() 調用一次,返回兩因為操 作系統自動把當前進程(稱為父)復制了一份(稱為子進程),然后分別在父進程和子內返回。
子進程永遠返回 0,而父進程 返回子的PID 。這樣做的理由是,一個父進程可 以fork() fork() 出很多子進程,所以父要 記下每個子進程的 ID ,而子進程只需要調 用getppid () 就可以拿到父進程的 ID ,子 進程只需要調用 os.getpid os.getpid () 函數可以獲取 自
創建進程
Multiprocessing
Multiprocessing模塊創建進程使用的是Process類。
Process類的構造方法:
help(multiprocessing.Process)
__init__(self, group=None, target=None, name=None, args=(), kwargs={})
參數說明:
group:進程所屬組,基本不用。
target:表示調用對象,一般為函數。
args:表示調用對象的位置參數元組。
name:進程別名。
kwargs:表示調用對象的字典。
創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,並用其start()方法啟動,這樣創建進程比fork()還要簡單。
join()方法表示等待子進程結束以后再繼續往下運行,通常用於進程間的同步。
注意:
在Windows上要想使用進程模塊,就必須把有關進程的代碼寫在當前.py文件的if __name__ == ‘__main__’ :語句的下面,才能正常使用Windows下的進程模塊 。Unix/Linux下則不需要。
創建5個進程例子
#coding=utf-8
import multiprocessing
def do(n) :
#獲取當前線程的名字
name = multiprocessing.current_process().name
print name,'starting'
print "worker ", n
return
if __name__ == '__main__' :
numList = []
for i in xrange(5) :
p = multiprocessing.Process(target=do, args=(i,))
numList.append(p)
p.start()
p.join()#進程執行完畢后才繼續執行
print "Process end."
print numList
c:\Python27\Scripts>python task_test.py
Process-1 starting
worker 0
Process end.
Process-2 starting
worker 1
Process end.
Process-3 starting
worker 2
Process end.
Process-4 starting
worker 3
Process end.
Process-5 starting
worker 4
Process end.
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
把p.join()注釋掉
p.start()
#p.join()
print "Process end."
print numList
c:\Python27\Scripts>python task_test.py
Process end.
Process end.
Process end.
Process end.
Process end.
[<Process(Process-1, started)>PP, rocess-1rocess-3< Process(Process-2, started)>s , tartings<
tartingProcess(Process-3, started)>P
wProcess-4 sworker rocess-2tarting
worker , orker 3
< 0sProcess(Process-4, started)>2
tarting,
<wProcess(Process-5, started)>orker ]
1
Process-5 starting
worker 4
練習:三個進程,每個進程寫一個文件,每個文件中有進程的名稱和日期
#coding=utf-8
import multiprocessing
import time
def do(n) :
fp=open(r"d:\\%s.txt"%n,'w')
name=multiprocessing.current_process().name
fp.write("%s %s"%(name,time.strftime("%Y-%m-%d %H:%M:%S")))
fp.close()
return
if __name__ == '__main__' :
numList = []
for i in xrange(3) :
p = multiprocessing.Process(target=do, args=(i,))
numList.append(p)
p.start()
p.join()
print "Process end."
print numList
Process-2 2018-03-31 14:55:34
Os.fork()和multiprocessing結合使用
#!/usr/bin/python
# -*- coding: utf-8 -*-
from multiprocessing import Process
import os
import time
def sleeper(name, seconds):
print "Process ID# %s" % (os.getpid())
print "Parent Process ID# %s" % (os.getppid())
#僅支持在linux上,一個進程會有父進程和自己的ID,windows上就沒有父進程id
print "%s will sleep for %s seconds" % (name, seconds)
time.sleep(seconds)
# if __name__ == "__main__":
child_proc = Process(target = sleeper, args = ('bob', 5))
child_proc.start()
print "in parent process after child process start"
print "parent process about to join child process"
child_proc.join()
print "in parent process after child process join"
print "the parent's parent process: %s" % (os.getppid())
沒運行成功
多進程模板程序
#coding=utf-8
import multiprocessing
import urllib2
import time
def func1(url) :
response = urllib2.urlopen(url)
html = response.read()
print html[0:20]
time.sleep(20)
def func2(url) :
response = urllib2.urlopen(url)
html = response.read()
print html[0:20]
time.sleep(20)
if __name__ == '__main__' :
p1 = multiprocessing.Process(target=func1,args=("http://www.sogou.com",),name="gloryroad1")
p2 = multiprocessing.Process(target=func2,args=("http://www.baidu.com",),name="gloryroad2")
p1.start()
p2.start()
p1.join()
p2.join()
time.sleep(10)
print "done!"
c:\Python27\Scripts>python task_test.py
<!DOCTYPE html>
<!--
<!DOCTYPE html>
<ht
done!
測試單進程和多進程程序執行的效率
#coding: utf-8
import multiprocessing
import time
def m1(x):
time.sleep(0.01)
return x * x
if __name__ == '__main__':
pool = multiprocessing.Pool(multiprocessing.cpu_count())
i_list = range(1000)
time1=time.time()
pool.map(m1, i_list)
time2=time.time()
print 'time elapse:',time2-time1
time1=time.time()
map(m1, i_list)
time2=time.time()
print 'time elapse:',time2-time1
c:\Python27\Scripts>python task_test.py
time elapse: 2.62400007248
time elapse: 10.2070000172
多線程只能用單核CPU
多進程好於多線程,因為多線程有同步鎖
進程池
在使用Python進行系統管理時,特別是同時操作多個文件目錄或者遠程控制多台主機,並行操作可以節約大量的時間。如果操作的對象數目不大時,還可以直接使用Process類動態的生成多個進程,十幾個還好,但是如果上百個甚至更多,那手動去限制進程數量就顯得特別的繁瑣,此時進程池就派上用場了。
Pool類可以提供指定數量的進程供用戶調用,當有新的請求提交到Pool中時,如果池還沒有滿,就會創建一個新的進程來執行請求。如果池滿,請求就會告知先等待,直到池中有進程結束,才會創建新的進程來執行這些請求。
pool類中方法
apply():
函數原型:apply(func[, args=()[, kwds={}]])
該函數用於傳遞不定參數,主進程會被阻塞直到函數執行結束(不建議使用,並且3.x以后不再使用)。
map()
函數原型:map(func, iterable[, chunksize=None])
Pool類中的map方法,與內置的map函數用法行為基本一致,它會使進程阻塞直到返回結果。
注意,雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒后,程序才會運行子進程。
close()
關閉進程池(Pool),使其不再接受新的任務。
Close()要在join()之后使用
pool.close()#關閉進程池,不再接受新的任務
pool.join()#主進程阻塞等待子進程的退出
terminate()
立刻結束工作進程,不再處理未處理的任務。
join(),map(),close(),terminate(),apply()
join()
使主進程阻塞等待子進程的退出,join方法必須在close或terminate之后使用。
獲取CPU的核數
multiprocessing.cpu_count() #獲取cpu
創建進程池
#coding: utf-8
import multiprocessing
import os
import time
import random
def m1(x):
time.sleep(random.random()*4)
print "pid:",os.getpid(),x*x
return x * x
if __name__ == '__main__':
pool = multiprocessing.Pool(multiprocessing.cpu_count())
i_list = range(8)
print pool.map(m1, i_list)
c:\Python27\Scripts>python task_test.py
pid: 5584 0
pid: 8764 4
pid: 11708 9
pid: 6380 1
pid: 6380 49
pid: 5584 16
pid: 8764 25
pid: 11708 36
[0, 1, 4, 9, 16, 25, 36, 49]
pool.apply_async(f, [10]), result.get(timeout = 1)
async只創建一個進程,每次只能傳入一個參數,傳一個列表,會返回列表第一個元素返回得結果
創建簡單的進程池
#encoding=utf-8
from multiprocessing import Pool
def f(x):
return x * x
if __name__ == '__main__':
pool = Pool(processes = 4) # start 4 worker processes
#下邊這行是異步,不等待,一執行就一直往下走
result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
print result.get(timeout = 1)
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
c:\Python27\Scripts>python task_test.py
100
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
#encoding=utf-8
from multiprocessing import Pool
import time
def f(x):
time.sleep(2)
return x * x
if __name__ == '__main__':
pool = Pool(processes = 4) # start 4 worker processes
result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
print result.get(timeout = 1)
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
c:\Python27\Scripts>python task_test.py
Traceback (most recent call last):
File "task_test.py", line 12, in< module>
print result.get(timeout = 1)
File "C:\Python27\lib\multiprocessing\pool.py", line 568, in get
raise TimeoutError
multiprocessing.TimeoutError
help(pool.apply_async)
>>> help(pool.apply_async)
Help on method apply_async in module multiprocessing.pool:
apply_async(self, func, args=(), kwds={}, callback=None) method of multiprocessing.pool.Pool instance
Asynchronous equivalent of `apply()` builtin
#encoding=utf-8
from multiprocessing import Pool
import time
def f(x):
time.sleep(2)
return x * x
if __name__ == '__main__':
pool = Pool(processes = 4) # start 4 worker processes
result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
print result.get()#這里沒有設置時間,那進程就會死等
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
也可以創建多個很多進程
if __name__ == '__main__':
pool = Pool(processes = 40) # start 4 worker processes
進程切換快,還是線程切換快,
切換線程時,是不需要切換上下文的,進程的上下文不需要動
from multiprocessing import Pool
def f(*x):
for i in x:
return i * i
if __name__ == '__main__':
pool = Pool(processes = 4) # start 4 worker processes
result = pool.apply_async(f,tuple(range(10))) # evaluate "f(10)" asynchronously
print result.get(timeout = 1)
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
c:\Python27\Scripts>python task_test.py
0
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
多進程
#encoding=utf-8
from multiprocessing import Pool
def f(x):
return x * x
if __name__ == '__main__':
pool = Pool(processes = 4) # start 4 worker processes
result1 = pool.apply_async(f, [10])
result2 = pool.apply_async(f, [100])
result3 = pool.apply_async(f, [1000]) # evaluate "f(10)" asynchronously
print result1.get(timeout = 1)
print result2.get(timeout = 1)
print result3.get(timeout = 1)
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
c:\Python27\Scripts>python task_test.py
100
10000
1000000
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
修改:
#encoding=utf-8
from multiprocessing import Pool
import time
def f(x):
print "hello"
time.sleep(2)
print "over"
return x * x
if __name__ == '__main__':
pool = Pool(processes = 4) # start 4 worker processes
result1 = pool.apply_async(f, [10])
result2 = pool.apply_async(f, [100])
result3 = pool.apply_async(f, [1000]) # evaluate "f(10)" asynchronously
print result1.get(timeout = 3)
print result2.get(timeout = 3)
print result3.get(timeout = 3)
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
c:\Python27\Scripts>python task_test.py
helloh
ello
hello
over
over
over
100
10000
1000000
hhhhelloelloelloello
ooverver
hhooelloelloverver
hhelloello
over
hoellover
oohververello
over
over
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
注掉最后一行:
#encoding=utf-8
from multiprocessing import Pool
import time
def f(x):
print "hello"
time.sleep(2)
print "over"
return x * x
if __name__ == '__main__':
pool = Pool(processes = 4) # start 4 worker processes
result1 = pool.apply_async(f, [10])
result2 = pool.apply_async(f, [100])
result3 = pool.apply_async(f, [1000]) # evaluate "f(10)" asynchronously
print result1.get(timeout = 3)
print result2.get(timeout = 3)
print result3.get(timeout = 3)
#print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
c:\Python27\Scripts>python task_test.py
hello
hello
hello
over
over
over
100
10000
1000000
修改為同步:
#encoding=utf-8
from multiprocessing import Pool
import time
def f(x):
print "hello"
time.sleep(2)
print "over"
return x * x
if __name__ == '__main__':
f(10)
f(109)
f(1000)
c:\Python27\Scripts>python task_test.py
hello
over
hello
over
hello
over
單進程和多進程時間比較
#encoding=utf-8
import time
from multiprocessing import Pool
def run(fn):
#fn: 函數參數是數據列表的一個元素
time.sleep(1)
return fn * fn
if __name__ == "__main__":
testFL = [1,2,3,4,5,6]
print 'Single process execution sequence:' #順序執行(也就是串行執行,單進程)
s = time.time()
for fn in testFL:
run(fn)
e1 = time.time()
print u"順序執行時間:", int(e1 - s)
print 'concurrent:' #創建多個進程,並行執行
pool = Pool(5) #創建擁有5個進程數量的進程池
#testFL:要處理的數據列表,run:處理testFL列表中數據的函數
rl =pool.map(run, testFL)
pool.close()#關閉進程池,不再接受新的任務
pool.join()#主進程阻塞等待子進程的退出
e2 = time.time()
print u"並行執行時間:", int(e2 - e1)
print rl
c:\Python27\Scripts>python task_test.py
Single process execution sequence:
順序執行時間: 6
concurrent:
並行執行時間: 2
[1, 4, 9, 16, 25, 36]
上例是一個創建多個進程並發處理與順序執行處理同一數據,所用時間的差別。從結果可以看出,並發執行的時間明顯比順序執行要快很多,但是進程是要耗資源的,所以進程數也不能開太大。
程序中的r1表示全部進程執行結束后全局的返回結果集,run函數有返回值,所以一個進程對應一個返回結果,這個結果存在一個列表中,也就是一個結果堆中,實際上是用了隊列的原理,等待所有進程都執行完畢,就返回這個列表(列表的順序不定)。
對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),讓其不再接受新的Process。
同步進程(消息隊列Queue&JoinableQueue)
隊列,先進先出
multiprocessing.Queue類似於queue.Queue,一般用來多個進程間交互信息。Queue是進程和線程安全的。它實現了queue.Queue的大部分方法,但task_done()和join()沒有實現。
multiprocessing.JoinableQueue是multiprocessing.Queue的子類,增加了task_done()方法和join()方法。
task_done():用來告訴queue一個task完成。一般在調用get()時獲得一個task,在task結束后調用task_done()來通知Queue當前task完成。
join():阻塞直到queue中的所有的task都被處理(即task_done方法被調用)。
Join()是主程序等我這個進程執行完畢了,程序才往下走
#encoding=utf-8
from multiprocessing import Process, Queue
def offer(queue):
# 入隊列
queue.put("Hello World")
if __name__ == '__main__':
# 創建一個隊列實例
q = Queue()
p = Process(target = offer, args = (q,))
p.start()
print q.get() # 出隊列
p.join()
c:\Python27\Scripts>python task_test.py
Hello World
不同的進程之間不能修改對方的變量
但進程里的線程可以,因為線程在同一個進程里,變量共享
>>> import Queue
>>> myqueue = Queue.Queue(maxsize = 10)
>>> myqueue.put(1)
>>> myqueue.put(2)
>>> myqueue.put(3)
>>> myqueue.get()
1
>>> myqueue.get()
2
>>> myqueue.get()
3
>>>import Queue
>>>q=Queue.Queue(5)
>>>q
<Queue.Queue instance at 0x04EA1968>
>>>q.put(1)
>>>q.put(2)
>>>q.put(3)
>>>q.put(4)
>>>q.put(5)
>>>q.full()
True
>>>q.qsize()
5
>>>q.empty()
False
>>>q.get(2)
1
>>>
>>>q.empty()
True
q.get(False,2),q.get(True,2)
True表示等2秒,如果隊列空就報錯
False表示不等,如果隊列空立即報錯
>>>q.get(False,2)
Traceback (most recent call last):
File "<stdin>", line 1, in< module>
File "C:\Python27\lib\Queue.py", line 165, in get
raise Empty
Queue.Empty
>>>q.get(True,2)
Traceback (most recent call last):
File "<stdin>", line 1, in< module>
File "C:\Python27\lib\Queue.py", line 176, in get
raise Empty
Queue.Empty
get(self, block=True, timeout=None)
| Remove and return an item from the queue.
|
| If optional args 'block' is true and 'timeout' is None (the default),
| block if necessary until an item is available. If 'timeout' is
| a non-negative number, it blocks at most 'timeout' seconds and raises
| the Empty exception if no item was available within that time.
| Otherwise ('block' is false), return an item if one is immediately
| available, else raise the Empty exception ('timeout' is ignored
| in that case).
>>>try:
... q.get(False)
... except Queue.Empty:
... print "empty!"
...
empty!
>>>
練習:兩個進程,一個往隊列里寫10個數,一個進程讀取隊列的10個數
#encoding=utf-8
from multiprocessing import Process, Queue
def offer(queue):
# 入隊列
for i in range(10):
queue.put("%s"%i)
def getter(queue):
for i in range(10):
print queue.get(False,1)
if __name__ == '__main__':
# 創建一個隊列實例
q = Queue()
p = Process(target = offer, args = (q,))
p2= Process(target = getter, args = (q,))
p.start()
p2.start()
p.join()
p2.join()
c:\Python27\Scripts>python task_test.py
0
1
2
3
4
5
6
7
8
9
#encoding=utf-8
import time
from multiprocessing import Process, Queue
def set_data(queue):
# 入隊列
for i in range(10):
time.sleep(2)
queue.put("Hello World"+str(i))
def get_data(queue):
for i in range(10):
# 入隊列
time.sleep(1)
print queue.get("Hello World")
if __name__ == '__main__':
# 創建一個隊列實例
q = Queue()
p1 = Process(target = set_data, args = (q,))
p2 = Process(target = get_data, args = (q,))
p1.start()
p2.start()
p1.join()
p2.join()
print u"隊列是否為空?",q.empty()
c:\Python27\Scripts>python task_test.py
Hello World0
Hello World1
Hello World2
Hello World3
Hello World4
Hello World5
Hello World6
Hello World7
Hello World8
Hello World9
隊列是否為空? True
這就是兩個進程通過隊列進行信息同步
下面的程序需要看下結果
#encoding=utf-8
from multiprocessing import Process, Queue
import time
def offer(queue):
# 入隊列
for i in range(10):
time.sleep(2)
queue.put("%s"%i)
def getter(queue):
for i in range(10):
time.sleep(1)
print queue.get(False,1)
print "queue.qsize():",queue.qsize()
if __name__ == '__main__':
# 創建一個隊列實例
q = Queue()
p = Process(target = offer, args = (q,))
p2= Process(target = getter, args = (q,))
p.start()
p2.start()
p.join()
p2.join()
會報錯,因為2秒寫一次,1就讀,會讀空,所以報錯
c:\Python27\Scripts>python task_test.py
Process Process-2:
Traceback (most recent call last):
File "C:\Python27\lib\multiprocessing\process.py", line 267, in _bootstrap
self.run()
File "C:\Python27\lib\multiprocessing\process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "c:\Python27\Scripts\task_test.py", line 12, in getter
print queue.get(False,timeout=1)
File "C:\Python27\lib\multiprocessing\queues.py", line 134, in get
raise Empty
Empty
進程同步(使用queue)
#encoding=utf-8
from multiprocessing import Process, Queue
import os, time, random
# 寫數據進程執行的代碼:
def write(q):
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
time.sleep(random.random())
# 讀數據進程執行的代碼
def read(q):
time.sleep(1)
while not q.empty():
# if not q.empty():
print 'Get %s from queue.' % q.get(True)
time.sleep(1) # 目的是等待寫隊列完成
if __name__=='__main__':
# 父進程創建Queue,並傳給各個子進程
q = Queue()
pw = Process(target = write, args = (q,))
pr = Process(target = read, args = (q,))
# 啟動子進程pw,寫入:
pw.start()
# 啟動子進程pr,讀取:
pr.start()
# 等待pw結束:
pw.join()
pr.join()
print "Done!"
c:\Python27\Scripts>python task_test.py
Put A to queue...
Put B to queue...
Get A from queue.
Put C to queue...
Get B from queue.
Get C from queue.
Done!
由於操作系統對進程的調度時間不一樣,所以該程序每次執行的結果均不一樣。
程序讀隊列函數中為什么要加一句time.sleep(1),目的是等待些進程將數據寫到隊列中,防止有時寫進程還沒將數據寫進隊列,讀進程就開始讀了,導致讀不到數據。但是這種並不能有效的預防此種情況的出現。
下面是一片關於多進程的帖子,可以幫助理解一下
https://www.cnblogs.com/ManyQian/p/8930818.html