python-進程之間通信、多線程介紹


一、進程之間通信

進程的任務有三種狀態:運行,就緒,阻塞。

加鎖可以讓多個進程修改同一塊數據時,同一時間只能由一個任務可以進行修改,即串行的修改。犧牲了速度,保證了數據安全。

雖然可以使用文件共享數據實現進程間的通信,但是效率太低,還需要自己加鎖處理。為了解決這些問題,便使用到了multiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道

1.隊列和管道都是將數據存放於內存中

2.隊列是基於管道+鎖的機制實現的。

我們應該盡量避免使用共享數據,多使用隊列。

隊列:

創建隊列的類:

Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。 

maxsize是隊列中允許的最大項數,省略則無大小限制

q = Queue()
q.put() 括號里可以是任意類型,不能是大數據

主要方法:

1 q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。沒有參數時,q.put的個數大於隊列數時,會一直阻塞住。
2 q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常。沒有參數時,q.get的個數大於隊列數時,會一直阻塞住。

3.q.put_nowait()等價於q.put(block=False)隊列滿時再存也會拋異常
   q.get_nowait()等價於q.get(block=False)隊列為空取不出時會拋異常
from multiprocessing import Queue

q=Queue(3)  #1.不應該放大數據  2.可以任意類型
q.put(['first',])
q.put({'x':2,})
q.put(3)
# q.put(6)#阻塞在存放的時候,當有內容被取出時才推出阻塞模式

print(q.get())
print(q.get())
print(q.get())
# print(q.get())#阻塞在取的時候,當有新內容添加進去才退出阻塞狀態
test

生產者消費者模型:

在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。通俗的講就是當程序中出現明顯的兩類任務,一類負責生產數據,一類負責處理數據,就可以引入這個模型來實現生產者與消費者的解耦合,平衡生產力與消費能力,從而提升效率。

初級版本:

import time
import random
from multiprocessing import Process,Queue

def producer(name,food,q):
    for i in range(4):
        res = '%s%s'%(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print('廚師[%s]生產了%s'%(name,res))

def consumer(name,q):
    while True:
        res = q.get()
        # if res ==None:break
        time.sleep(random.randint(1,3))
        print('吃貨%s吃了%s'%(name,res))


if __name__ == '__main__':
    q = Queue()
    #生產者
    p1 = Process(target=producer,args=('egon','包子',q))
    p2 = Process(target=producer,args=('lxx','豆漿',q))
    p3 = Process(target=producer,args=('cw','油條',q))
    #消費者
    c1 = Process(target=consumer,args=('alex',q))
    c2 = Process(target=consumer,args=('yxf',q))
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    print('')
test1

這個時候就有一個問題,主進程一直在阻塞狀態,永遠不會結束。這是因為消費者在取空以后 還一直在原地阻塞着。

解決方法:讓生產者在生產完畢后再往隊列中發送一個結束信號,這樣消費者在收到結束信號后跳出循環

版本二:

import time
import random
from multiprocessing import Process,Queue

def producer(name,food,q):
    for i in range(4):
        res = '%s%s'%(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print('廚師[%s]生產了%s'%(name,res))


def consumer(name,q):
    while True:
        res = q.get()
        if res ==None:break
        time.sleep(random.randint(1,3))
        print('吃貨%s吃了%s'%(name,res))


if __name__ == '__main__':
    q = Queue()
    #生產者
    p1 = Process(target=producer,args=('egon','包子',q))
    p2 = Process(target=producer,args=('lxx','豆漿',q))
    p3 = Process(target=producer,args=('cw','油條',q))
    #消費者
    c1 = Process(target=consumer,args=('alex',q))
    c2 = Process(target=consumer,args=('yxf',q))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    q.put(None)
    print('')
test2

但是有幾個生產者我們就要發送幾個None,這不夠優雅,於是我們導入了JoinableQueue模塊

#JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

   #參數介紹:
    maxsize是隊列中允許最大項數,省略則無大小限制。    
  #方法介紹:
    JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:
    q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常
    q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止

版本三:

import time
import random
from multiprocessing import Process,JoinableQueue

def producer(name,food,q):
    for i in range(4):
        res = '%s%s'%(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print('廚師[%s]生產了%s'%(name,res))


def consumer(name,q):
    while True:
        res = q.get()
        time.sleep(random.randint(1,3))
        print('吃貨%s吃了%s'%(name,res))
        q.task_done()

if __name__ == '__main__':
    q = JoinableQueue()

    #生產者
    p1 = Process(target=producer,args=('egon','包子',q))
    p2 = Process(target=producer,args=('lxx','豆漿',q))
    p3 = Process(target=producer,args=('cw','油條',q))
    #消費者
    c1 = Process(target=consumer,args=('alex',q))
    c2 = Process(target=consumer,args=('yxf',q))
    c1.daemon=True
    c2.daemon=True

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()

    q.join()#主進程等q結束,即q內數據被取干凈了。
    print('')
View Code

 二、線程理論

進程是資源單位,由若干線程組成的,一個進程至少有一個線程,線程是操作系統直接支持的執行單元。一個進程相當於一塊空間,空間內有若干的線程,就好比進程是一個車間,線程就是車間的流水線。同一個進程之間的線程是資源共享的。進程是程序執行的過程,那么線程就是程序執行的代碼。

線程相比進程的優點:1.同一進程下,多個線程共享進程內的資源

          2.創建線程的開銷遠遠小於進程

三、線程的使用

(一).創建線程的兩種方式(和進程類似)

from threading import Thread
import time

def task(name):
    print('%s is running'%name)
    time.sleep(2)
    print('%s is done'%name)

if __name__ == '__main__':
    t = Thread(target=task,args=('進程一',))
    t.start()
    print('')
方式一

直接導入threading模塊

from threading import Thread
import time

class Mythead(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):
        print('%s is running' % self.name)
        time.sleep(0.1)
        print('%s is done' % self.name)

if __name__ == '__main__':
    t = Mythead('線程一')
    t.start()
    print('')
方式二

繼承Therad類重寫run方法

(二).線程特性介紹

os模塊下的getpid()線程也適用
current_thread#查看當前線程
active_count#查看線程的活躍個數(線程已結束的不是活躍狀態)
current_thread().name查看線程名字
略

(三).守護線程

無論是進程還是線程都遵循  某線程/進程等待 主線程/進程  運行完畢后才會被銷毀。強調:運行完畢並非終止運行

#1.對主進程來說,運行完畢指的是主進程代碼運行完畢。
    主進程在其代碼結束后就已經算運行完畢了(守護進程在此時就被回收),然后主進程會一直等非守護的子進程都運行完畢后回收子進程的資源(否則會產生僵屍進程),才會結束,

#2.對主線程來說,運行完畢指的是主線程所在的進程內所有非守護線程統統運行完畢,主線程才算運行完畢。
    主線程在其他非守護線程運行完畢后才算運行完畢(守護線程在此時就被回收)。因為主線程的結束意味着進程的結束,進程整體的資源都將被回收,而進程必須保證非守護線程都運行完畢后才能結束。
from threading import Thread
from multiprocessing import Process
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    t1=Thread(target=foo)
    t2=Thread(target=bar)

    # t1=Process(target=foo)
    # t2=Process(target=bar)
    t1.daemon=True
    t1.start()
    t2.start()
    print("main-------")
進程與線程的區別

(四).線程的互斥鎖

不加鎖的情況

from threading import Thread,Lock
import time

mutex=Lock()
n=100
def task():
    global n
    # mutex.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    # mutex.release()

if __name__ == '__main__':
    t_l=[]
    for i in range(100):
        t=Thread(target=task)
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()
    print(n)
View Code

結果永遠為99,因為線程速度太快了,並發的時候都讀到了n=100,所以需要給他加上鎖

加上鎖的情況

from threading import Thread,Lock
import time

mutex=Lock()
n=100
def task():
    global n
    mutex.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    mutex.release()

if __name__ == '__main__':
    t_l=[]
    for i in range(100):
        t=Thread(target=task)
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()
    print(n)
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM