聽說過的多進程,多線程到底是什么鬼


線程

1.何為線程

線程是操作系統能夠調度的最小單位,被包含在進程中,是進程的實際運作單位。一個進程可以並發多個線程。

2.線程的語法

創建並調用線程

 1 import threading
 2 import time
 3 
 4 def sayhi(num): #定義每個線程要運行的函數
 5 
 6     print("running on number:%s" %num)
 7 
 8     time.sleep(3)
 9 
10 if __name__ == '__main__':
11 
12     t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個線程實例
13     t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一個線程實例
14 
15     t1.start() #啟動線程
16     t2.start() #啟動另一個線程

此外還有一種繼承式調用

 1 import threading
 2 import time
 3 
 4 class MyThread(threading.Thread):
 5     def __init__(self,num):
 6         threading.Thread.__init__(self)
 7         self.num = num
 8 
 9     def run(self):#定義每個線程要運行的函數
10 
11         print("running on number:%s" %self.num)
12 
13         time.sleep(3)
14 
15 if __name__ == '__main__':
16 
17     t1 = MyThread(1)
18     t2 = MyThread(2)
19     t1.start()
20     t2.start()
繼承式調用

另外線程還有一些內置方法

  • start             線程准備就緒,等待CPU調度
  • setName       為線程設置名稱
  • getName       獲取線程名稱
  • setDaemon   設置為后台線程或前台線程(默認)
                       如果是后台線程,主線程執行過程中,后台線程也在進行,主線程執行完畢后,后台線程不論成功與否,均停止
                       如果是前台線程,主線程執行過程中,前台線程也在進行,主線程執行完畢后,等待前台線程也執行完成后,程序停止
  • join              逐個執行每個線程,執行完畢后繼續往下執行,該方法使得多線程變得無意義
  • run              線程被cpu調度后自動執行線程對象的run方法

此外我們還可以自定義線程

 1 import threading
 2 import time
 3  
 4  
 5 class MyThread(threading.Thread):
 6     def __init__(self,num):
 7         threading.Thread.__init__(self)
 8         self.num = num
 9  
10     def run(self):#定義每個線程要運行的函數
11  
12         print("running on number:%s" %self.num)
13  
14         time.sleep(3)
15  
16 if __name__ == '__main__':
17  
18     t1 = MyThread(1)
19     t2 = MyThread(2)
20     t1.start()
21     t2.start()
自定義線程

線程鎖(互斥鎖)

要知道,線程之間是可以共享數據的,同時線程之間是隨機調度的。這也就意味着多個線程修改同一項數據時會出現臟數據的情況。這時,就需要我們設置線程鎖。這樣同一個時刻就只能允許一個線程執行操作。

 1 import time
 2 import threading
 3 
 4 def addNum():
 5     global num #在每個線程中都獲取這個全局變量
 6     print('--get num:',num )
 7     time.sleep(1)
 8     lock.acquire() #修改數據前加鎖
 9     num  -=1 #對此公共變量進行-1操作
10     lock.release() #修改后釋放
11 
12 num = 100  #設定一個共享變量
13 thread_list = []
14 lock = threading.Lock() #生成全局鎖
15 for i in range(100):
16     t = threading.Thread(target=addNum)
17     t.start()
18     thread_list.append(t)
19 
20 for t in thread_list: #等待所有線程執行完畢
21     t.join()
22 
23 print('final num:', num )

遞歸鎖

紙面理解,就是鎖中鎖。不過,一般我們不會用到這么繁瑣的語法。這樣不僅使得代碼更加晦澀難懂,同時也會使自己的代碼出現邏輯錯誤。稍作了解就好。

 1 import threading,time
 2 
 3 def run1():
 4     print("grab the first part data")
 5     lock.acquire()
 6     global num
 7     num +=1
 8     lock.release()
 9     return num
10 def run2():
11     print("grab the second part data")
12     lock.acquire()
13     global  num2
14     num2+=1
15     lock.release()
16     return num2
17 def run3():
18     lock.acquire()
19     res = run1()
20     print('--------between run1 and run2-----')
21     res2 = run2()
22     lock.release()
23     print(res,res2)
24 
25 
26 if __name__ == '__main__':
27 
28     num,num2 = 0,0
29     lock = threading.RLock()
30     for i in range(10):
31         t = threading.Thread(target=run3)
32         t.start()
33 
34 while threading.active_count() != 1:
35     print(threading.active_count())
36 else:
37     print('----all threads done---')
38     print(num,num2)
遞歸鎖

信號量

線程鎖的存在使得一份數據同時只允許一個線程修改,當我們想指定多個線程修改同一份數據時,就可以使用信號量。

 1 import threading,time
 2 
 3 def run(n):
 4     semaphore.acquire()
 5     time.sleep(1)
 6     print("run the thread: %s\n" %n)
 7     semaphore.release()
 8 
 9 if __name__ == '__main__':
10 
11     num= 0
12     semaphore  = threading.BoundedSemaphore(5) #最多允許5個線程同時運行
13     for i in range(20):
14         t = threading.Thread(target=run,args=(i,))
15         t.start()
16 
17 while threading.active_count() != 1:
18     pass #print threading.active_count()
19 else:
20     print('----all threads done---')
21     print(num)

事件

通過Event來實現兩個或者多個線程間的交互,事件主要提供了三個方法 set、wait、clear。

事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。

  • clear:將“Flag”設置為False
  • set:將“Flag”設置為True
import threading,time
import random
def light():
    if not event.isSet():
        event.set() #wait就不阻塞 #綠燈狀態
    count = 0
    while True:
        if count < 10:
            print('\033[42;1m--green light on---\033[0m')
        elif count <13:
            print('\033[43;1m--yellow light on---\033[0m')
        elif count <20:
            if event.isSet():
                event.clear()
            print('\033[41;1m--red light on---\033[0m')
        else:
            count = 0
            event.set() #打開綠燈
        time.sleep(1)
        count +=1
def car(n):
    while 1:
        time.sleep(random.randrange(10))
        if  event.isSet(): #綠燈
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(3):
        t = threading.Thread(target=car,args=(i,))
        t.start()

條件

使得線程等待,只有滿足某條件時,才釋放n個線程

 1 import threading
 2  
 3 def run(n):
 4     con.acquire()
 5     con.wait()
 6     print("run the thread: %s" %n)
 7     con.release()
 8  
 9 if __name__ == '__main__':
10  
11     con = threading.Condition()
12     for i in range(10):
13         t = threading.Thread(target=run, args=(i,))
14         t.start()
15  
16     while True:
17         inp = input('>>>')
18         if inp == 'q':
19             break
20         con.acquire()
21         con.notify(int(inp))
22         con.release()

守護線程

設置守護線程,則其沒有被設置的線程為主線。當主線程退出時,無論守護線程是否完成都會退出。

 1 import time
 2 import threading
 3 
 4 
 5 def run(n):
 6 
 7     print('[%s]------running----\n' % n)
 8     time.sleep(2)
 9     print('--done--')
10 
11 def main():
12     for i in range(5):
13         t = threading.Thread(target=run,args=[i,])
14         t.start()
15         t.join(1)
16         print('starting thread', t.getName())
17 
18 
19 m = threading.Thread(target=main,args=[])
20 m.setDaemon(True) #將main線程設置為Daemon線程,它做為程序主線程的守護線程,當主線程退出時,m線程也會退出,由m啟動的其它子線程會同時退出,不管是否執行完任務
21 m.start()
22 m.join(timeout=2)
23 print("---main thread done----")

Timer

定時器,指定n秒后執行某操作

from threading import Timer
 
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

一個小小的補充 GIL(全局解釋權鎖)

在cpython的解釋器中才會存在GIL,其他的jpython這樣的編譯環境就不會存在這種情況。那么這個GIL到底是什么呢,它的存在會有什么影響呢?不急,聽我娓娓道來。

實際上,無論你啟多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只允許一個線程運行。當你使用多線程時,似乎感覺到了並發。這只是因為python上下文切換的太快了給你的錯覺。

那么GIL有什么作用呢?其實,GIL是來保證同一時間只能有一個線程來執行。

似乎和線程鎖有些相似?並不盡然,線程鎖是用戶態的鎖,而GIL則是編譯器自帶的。2.7版本以后,貌似線程鎖又被封裝到了編譯器中變得和GIL差不多,所以發現即使不使用線程鎖也不會出現臟數據(有待考證)。

 隊列queue

隊列在線程編程中特別有用,因為信息必須在多個線程之間安全地交換。現在所講的是python自帶的隊列,在后面的學習中會更加深入。目前稍作了解。

為什么我要使用隊列?

使用隊列對於我們編程而言有以下兩點好處:

  • 解耦,使程序之間實現松耦合
  • 提高處理效率

隊列的語法

  • class queue.Queue(maxsize=0)  #先入先出
  • class queue.LifoQueue(maxsize=0) #后入先出
  • class queue.PriortyQueue(maxsize=0) #存儲數據時可設置優先級隊列
 1 import queue
 2 
 3 q = queue.Queue()
 4 
 5 q.put(1)
 6 q.put(2)
 7 q.put(3)
 8 
 9 print(q.get())
10 print(q.get())
11 print(q.get())
12 
13 
14 #結果
15 1
16 2
17 3
先入先出
 1 import queue
 2 
 3 q = queue.LifoQueue()
 4 
 5 q.put(1)
 6 q.put(2)
 7 q.put(3)
 8 
 9 print(q.get())
10 print(q.get())
11 print(q.get())
12 
13 #結果
14 3
15 2
16 1
先入后出
 1 import queue
 2 
 3 q = queue.PriorityQueue()
 4 
 5 q.put((2,1))
 6 q.put((3,2))
 7 q.put((1,3))
 8 
 9 print(q.get())
10 print(q.get())
11 print(q.get())
12 
13 #結果
14 (1, 3)
15 (2, 1)
16 (3, 2)
指定輸出

生產者消費者模型

在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

為什么要使用生產者和消費者模式

在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什么是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

下面來學習一個最基本的生產者消費者模型的例子

 1 import time,random
 2 import queue,threading
 3 q = queue.Queue()
 4 def Producer(name):
 5   count = 0
 6   while count <20:
 7     time.sleep(random.randrange(3))
 8     q.put(count)
 9     print('Producer %s has produced %s baozi..' %(name, count))
10     count +=1
11 def Consumer(name):
12   count = 0
13   while count <20:
14     time.sleep(random.randrange(4))
15     if not q.empty():
16         data = q.get()
17         print(data)
18         print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
19     else:
20         print("-----no baozi anymore----")
21     count +=1
22 p1 = threading.Thread(target=Producer, args=('A',))
23 c1 = threading.Thread(target=Consumer, args=('B',))
24 p1.start()
25 c1.start()

 

多進程

1.何為進程

以一個整體的形式暴露給操作系統管理,里面包含了對各種資源的調用,內存的管理,網絡管理的接口等資源,對各種資源的管理的集合,就稱為進程。

2.多進程的語法

創建並調用進程

 1 from multiprocessing import Process
 2 import time
 3 def f(name):
 4     time.sleep(2)
 5     print('hello', name)
 6 
 7 if __name__ == '__main__':
 8     p = Process(target=f, args=('bob',))
 9     p.start()
10     p.join()
 1 from multiprocessing import Process
 2 import os
 3 
 4 def info(title):
 5     print(title)
 6     print('module name:', __name__)
 7     print('parent process:', os.getppid())
 8     print('process id:', os.getpid())
 9     print("\n\n")
10 
11 def f(name):
12     info('\033[31;1mfunction f\033[0m')
13     print('hello', name)
14 
15 if __name__ == '__main__':
16     info('\033[32;1mmain process line\033[0m')
17     p = Process(target=f, args=('bob',))
18     p.start()
19     p.join()
獲取進程id

進程間的通訊

不同進程間內存是不共享的,要想實現兩個進程間的數據交換,可以用以下方法:

  • Queues
 1 from multiprocessing import Process, Queue
 2  
 3 def f(q):
 4     q.put([42, None, 'hello'])
 5  
 6 if __name__ == '__main__':
 7     q = Queue()
 8     p = Process(target=f, args=(q,))
 9     p.start()
10     print(q.get())    # prints "[42, None, 'hello']"
11     p.join()
  • Pipes
 1 from multiprocessing import Process, Pipe
 2  
 3 def f(conn):
 4     conn.send([42, None, 'hello'])
 5     conn.close()
 6  
 7 if __name__ == '__main__':
 8     parent_conn, child_conn = Pipe()
 9     p = Process(target=f, args=(child_conn,))
10     p.start()
11     print(parent_conn.recv())   # prints "[42, None, 'hello']"
12     p.join()
  • Managers(數據可共享)
 1 from multiprocessing import Process, Manager
 2  
 3 def f(d, l):
 4     d[1] = '1'
 5     d['2'] = 2
 6     d[0.25] = None
 7     l.append(1)
 8     print(l)
 9  
10 if __name__ == '__main__':
11     with Manager() as manager:
12         d = manager.dict()
13  
14         l = manager.list(range(5))
15         p_list = []
16         for i in range(10):
17             p = Process(target=f, args=(d, l))
18             p.start()
19             p_list.append(p)
20         for res in p_list:
21             res.join()
22  
23         print(d)
24         print(l)

進程鎖

當創建進程時(非使用時),共享數據會被拿到子進程中,當進程中執行完畢后,再賦值給原值。

 1 from multiprocessing import Process, Lock
 2  
 3 def f(l, i):
 4     l.acquire()
 5     try:
 6         print('hello world', i)
 7     finally:
 8         l.release()
 9  
10 if __name__ == '__main__':
11     lock = Lock()
12  
13     for num in range(10):
14         Process(target=f, args=(lock, num)).start()
進程鎖

進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。

進程池中有兩個方法:

  • apply
  • apply_async
 1 from  multiprocessing import Process,Pool
 2 import time
 3  
 4 def Foo(i):
 5     time.sleep(2)
 6     return i+100
 7  
 8 def Bar(arg):
 9     print('-->exec done:',arg)
10  
11 pool = Pool(5)
12  
13 for i in range(10):
14     pool.apply_async(func=Foo, args=(i,),callback=Bar)
15     #pool.apply(func=Foo, args=(i,))
16  
17 print('end')
18 pool.close()
19 pool.join()#進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。

 

 

總結:

1.什么是進程和線程:

進程是一堆資源的集合

線程是一個指令

2.進程要操作cpu,必須要先創建一個線程,所有在同一個進程里的線程共享同一塊空間

3.進程和線程的區別:

一 線程是共享內存空間,進程的內存是獨立的

二 父進程創建兩個子進程,子進程是對父進程的克隆,兩個子進程是完全獨立的,不能互相訪問

  兩個線程共享同一個進程,同一個進程的線程可以直接交流,兩個進程想要通信,必須通過一個中間代理實現。

三 創建新線程很簡單,創建一個新的進程需要對父進程進行一次克隆

四 一個線程可以控制和操作同一個進程的其他線程,但是進程只能操作其子進程

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM