進程、線程和協程


 

一、進程

1、多任務原理

  多任務是指操作系統同時可以運行多個任務。

  • 單核CPU實現多任務原理:操作系統輪流讓各個任務交替執行;
  • 多核CPU實現多任務原理:真正的執行多任務只能在多核CPU上實現,多出來的任務輪流調度到每個核心上執行。
  • 並發:看上去一起執行,任務數多於CPU核心數;
  • 並行:真正的一起執行,任務數小於等於CPU核心數。

  實現多任務的方式:
    1、多進程模式
    2、多線程模式
    3、協程模式
    4、多進程+多線程模式

2、進程

  對於操作系統而言,一個任務就是一個進程;

  進程是系統中程序執行和資源分配的基本單元,每個進程都有自己的數據段、代碼段、堆棧段。


 

  下面是一小段程序,一個單任務的例子。在其中,有兩個輸出語句分別在在兩個不同的循環當中,單任務的執行方式,也就是最初學習時,當一個循環沒有結束的時候,無法執行到下面的程序當中。如果想要讓兩個循環可以同時在執行,就是在實現多任務,當然不是說同時輸出,而是兩個循環都在執行着。

 1 from time import sleep  2 # 只能執行到那一個循環,執行不了run,所以叫單任務
 3 def run():  4     while True:  5         print("&&&&&&&&&&&&&&&")  6         sleep(1.2)  7 
 8 if __name__ == "__main__":  9     while True: 10         print("**********") 11         sleep(1) 12     run()

  接下來啟用多任務,通過進程來實現。

  multiprocessing庫:跨平台版本的多進程模塊,提供了一個Process類來代表一個進程對象(fork僅適用於Linux)。

  下面的程序是在一個父進程中創建一個子進程,讓父進程和子進程可以都在執行,創建方式程序中已經很簡潔了。可以自己把這兩段程序復制下來運行一下,看看輸出的效果。

 1 from multiprocessing import Process  2 from time import sleep  3 import os  4 
 5 def run(str):  6     # os.getpid()獲取當前進程id號
 7     # os.getppid()獲取當前進程的父進程id號
 8     while True:  9         print("&&&&&&&&&&&&&&&%s--%s--%s" % (str, os.getpid(), os.getppid())) 10         sleep(0.5) 11 
12 if __name__ == "__main__": 13     print("主(父)進程啟動 %s" % (os.getpid())) 14     # 創建子進程
15     # target說明進程執行的任務
16     p = Process(target=run, args=("nice",)) 17     # 啟動進程
18  p.start() 19 
20     while True: 21         print("**********") 22         sleep(1)

  我想第一個單任務的程序就不必說了吧,就是一個死循環,一直沒有執行到下面的run函數。第二段程序是通過多進程實現的多任務,兩個循環都能執行到,我把結果截圖放下面,最好自己去試一下。

3、父子進程的先后順序

  上面的多進程的例子中輸出了那么多,我們使用的時候究竟是先執行哪個后執行哪個呢?根據我們的一般思維來說,我們寫的主函數其實就是父進程,在主函數中間,要調用的也就是子進程。

 

 1 from multiprocessing import Process  2 from time import sleep  3 import os  4 
 5 def run():  6     print("啟動子進程")  7     print("子進程結束")  8     sleep(3)  9 
10 if __name__ == "__main__": 11     print("父進程啟動") 12     p = Process(target=run) 13  p.start() 14 
15     # 父進程的結束不能影響子進程,讓進程等待子進程結束再執行父進程
16  p.join() 17 
18     print("父進程結束")

4、全局變量在多個進程中不能共享 

  在多進程的程序當中定義的全局變量在多個進程中是不能共享的,篇幅較長在這里就不舉例子了,可以自己試一下。這個也是和稍后要說的線程的一個區別,在線程中,變量是可以共享的,也因此衍生出一些問題,稍后再說。

5、啟動多個進程 

  在正常工作使用的時候,當然不止有有個一個兩個進程,畢竟這一兩個也起不到想要的效果。那么就需要采用更多的進程,這時候需要通過進程池來實現,就是在進程池中放好你要建立的進程,然后執行的時候,把他們都啟動起來,就可以同時進行了,在一定的環境下可以大大的提高效率。當然這個也和起初提到的有關,如果你的CPU是單核的,那么多進程也只是起到了讓幾個任務同時在執行着,並沒有提高效率,而且啟動進程的時候還要花費一些時間,因此在多核CPU當中更能發揮優勢。

  在multiprocessing中有個Pool方法,可以實現進程池。在利用進程池時可以設置要啟動幾個進程,一般情況下,它默認和你電腦的CPU核數一致,也可以自己設置,如果設置的進程數多於CPU核數,那多出來的進程會輪流調度到每個核心上執行。下面是啟動多個進程的過程。

 1 from multiprocessing import Pool  2 import os  3 import time  4 import random  5 
 6 
 7 def run(name):  8     print("子進程%s啟動--%s" % (name, os.getpid()))  9     start = time.time() 10     time.sleep(random.choice([1,2,3,4,5])) 11     end = time.time() 12     print("子進程%s結束--%s--耗時%.2f" % (name, os.getpid(), end-start)) 13 
14 if __name__ == "__main__": 15     print("啟動父進程") 16 
17     # 創建多個進程
18     # Pool 進程池 :括號里的數表示可以同時執行的進程數量
19     # Pool()默認大小是CPU核心數
20     pp = Pool(4) 21     for i in range(5): 22         # 創建進程,放入進程池,統一管理
23         pp.apply_async(run, args=(i,)) 24 
25     # 在調用join之前必須先調用close,調用close之后就不能再繼續添加新的進程了
26  pp.close() 27     # 進程池對象調用join還等待進程池中所有的子進程結束
28  pp.join() 29 
30     print("結束父進程")

6、文件拷貝(單進程與多進程對比)

(1)單進程實現

 1 from multiprocessing import Pool
 2 import time
 3 import os
 4 
 5 # 實現文件的拷貝
 6 def copyFile(rPath, wPath):
 7     fr = open(rPath, 'rb')
 8     fw = open(wPath, 'wb')
 9     context = fr.read()
10     fw.write(context)
11     fr.close()
12     fw.close()
13 
14 path = r'F:\python_note\線程、協程'
15 toPath = r'F:\python_note\test'
16 
17 # 讀取path下的所有文件
18 filesList = os.listdir(path)
19 
20 # 啟動for循環處理每一個文件
21 start = time.time()
22 for fileName in filesList:
23     copyFile(os.path.join(path,fileName), os.path.join(toPath,fileName))
24 
25 end = time.time()
26 print('總耗時:%.2f' % (end-start))
View Code

(2)多進程實現

 1 from multiprocessing import Pool
 2 import time
 3 import os
 4 
 5 # 實現文件的拷貝
 6 def copyFile(rPath, wPath):
 7     fr = open(rPath, 'rb')
 8     fw = open(wPath, 'wb')
 9     context = fr.read()
10     fw.write(context)
11     fr.close()
12     fw.close()
13 
14 path = r'F:\python_note\線程、協程'
15 toPath = r'F:\python_note\test'
16 
17 
18 if __name__ == "__main__":
19     # 讀取path下的所有文件
20     filesList = os.listdir(path)
21 
22     start = time.time()
23     pp = Pool(4)
24     for fileName in filesList:
25         pp.apply_async(copyFile, args=(os.path.join(
26             path, fileName), os.path.join(toPath, fileName)))
27     pp.close()
28     pp.join()
29     end = time.time()
30     print("總耗時:%.2f" % (end - start))
View Code

  上面兩個程序是兩種方法實現同一個目標的程序,可以將其中的文件路徑更換為你自己的路徑,可以看到最后計算出的耗時是多少。也許有人發現並不是多進程的效率就高,說的的確沒錯,因為創建進程也要花費時間,沒准啟動進程的時間遠多讓這一個核心運行所有核心用的時間要多。這個例子也只是演示一下如何使用,在大數據的任務下會有更深刻的體驗。

 7、進程對象

  我們知道Python是一個面向對象的語言。而且Python中萬物皆對象,進程也可以封裝成對象,來方便以后自己使用,只要把他封裝的足夠豐富,提供清晰的接口,以后使用時會快捷很多,這個就根據自己的需求自己可以試一下,不寫了。

 8、進程間通信

  上面提到過進程間的變量是不能共享的,那么如果有需要該怎么辦?通過隊列的方式進行傳遞。在父進程中創建隊列,然后把隊列傳到每個子進程當中,他們就可以共同對其進行操作。 

 1 from multiprocessing import Process, Queue  2 import os  3 import time  4 
 5 
 6 def write(q):  7     print("啟動寫子進程%s" % (os.getpid()))  8     for chr in ['A', 'B', 'C', 'D']:  9  q.put(chr) 10         time.sleep(1) 11     print("結束寫子進程%s" % (os.getpid())) 12 
13 def read(q): 14     print("啟動讀子進程%s" % (os.getpid())) 15     while True: 16         value = q.get() 17         print("value = "+value) 18     print("結束讀子進程%s" % (os.getpid())) 19 
20 if __name__ == "__main__": 21     # 父進程創建隊列,並傳遞給子進程
22     q = Queue() 23     pw = Process(target=write, args=(q,)) 24     pr = Process(target=read, args=(q,)) 25 
26  pw.start() 27  pr.start() 28     # 寫進程結束
29  pw.join() 30     # pr進程里是個死循環,無法等待期結束,只能強行結束
31  pr.terminate() 32     print("父進程結束")

 二、線程

1、線程

  • 在一個進程內部,要同時干多件事,就需要運行多個"子任務",我們把進程內的多個"子任務"叫做線程
  • 線程通常叫做輕型的進程,線程是共享內存空間,並發執行的多任務,每一個線程都共享一個進程的資源
  • 線程是最小的執行單元而進程由至少一個線程組成。如何調度進程和線程,完全由操作系統來決定,程序自己不能決定什么時候執行,執行多長時間

模塊:

1、_thread模塊 低級模塊(更接近底層)

2、threading模塊 高級模塊,對_thread進行了封裝

2、啟動一個線程

  同樣,先給一個多線程的例子,其中,仍然使用run函數作為其中的一個子線程,主函數為父線程。通過threading的Thread方法創建線程並開啟,join來等待子線程。

 1 import threading  2 import time  3 
 4 
 5 def run():  6     print("子線程(%s)啟動" % (threading.current_thread().name))  7 
 8     # 實現線程的功能
 9     time.sleep(1) 10     print("打印") 11     time.sleep(2) 12 
13     print("子線程(%s)結束" % (threading.current_thread().name)) 14 
15 
16 if __name__ == "__main__": 17     # 任何進程都默認會啟動一個線程,稱為主線程,主線程可以啟動新的子線程
18     # current_thread():返回線程的實例
19     print("主線程(%s)啟動" % (threading.current_thread().name)) 20 
21     # 創建子線程
22     t = threading.Thread(target=run, name="runThread") 23  t.start() 24 
25     # 等待線程結束
26  t.join() 27 
28     print("主線程(%s)結束" % (threading.current_thread().name))

3、線程間數據共享

  多線程和多進程最大的不同在於,多進程中,同一個變量,各自有一份拷貝存在每個進程中,互不影響。

  而多線程所有變量都由所有線程共享。所以任何一個變量都可以被任何一個線程修改,因此,線程之間共享數據最大的危險在於多個線程同時修改一個變量,容易把內容改亂了。

 1 import threading  2 
 3 
 4 num = 10
 5 
 6 def run(n):  7     global num  8     for i in range(10000000):  9         num = num + n 10         num = num - n 11 
12 if __name__ == "__main__": 13     t1 = threading.Thread(target=run, args=(6,)) 14     t2 = threading.Thread(target=run, args=(9,)) 15 
16  t1.start() 17  t2.start() 18  t1.join() 19  t2.join() 20 
21     print("num = ",num)

4、線程鎖

  在第三小點中已經提到了,多線程的一個缺點就是數據是共享的,如果有兩個線程正同時在修改這個數據,就會出現混亂,它自己也不知道該聽誰的了,尤其是在運算比較復雜,次數較多的時候,這種錯誤的機會會更大。

  當然,解決辦法也是有的,那就是利用線程鎖。加鎖的意思就是在其中一個線程正在對數據進行操作時,讓其他線程不得介入。這個加鎖和釋放鎖是由人來確定的。

  • 確保了這段代碼只能由一個線程從頭到尾的完整執行
  • 阻止了多線程的並發執行,要比不加鎖時候效率低。包含鎖的代碼段只能以單線程模式執行
  • 由於可以存在多個鎖,不同線程持有不同的鎖,並試圖獲取其他的鎖,可能造成死鎖導致多個線程掛起,只能靠操作系統強制終止
 1 def run(n):  2     global num  3     for i in range(10000000):  4  lock.acquire()  5         try:  6             num = num + n  7             num = num - n  8         finally:  9             # 修改完釋放鎖
10  lock.release() 11 
12 if __name__ == "__main__": 13     t1 = threading.Thread(target=run, args=(6,)) 14     t2 = threading.Thread(target=run, args=(9,)) 15 
16  t1.start() 17  t2.start() 18  t1.join() 19  t2.join() 20 
21     print("num = ",num)

  上面這段程序是循環多次num+n-n+n-n的過程,變量n分別設為6和9是在兩個不同的線程當中,程序中已經加了鎖,你可以先去掉試一下,當循環次數較小的時候也許還能正確,但次數一旦取的較高就會出現混亂。

  加鎖是在循環體當中,依次執行加減法,定義中說到確保一個線程從頭到尾的完整執行,也就是在計算途中,不會有其他的線程打擾。你可以想一下,如果一個線程執行完加法,正在執行減法,另一個線程進來了,它要先進行加法時的初始sum值該是多少呢,線程二不一定在線程一的什么時候進來,萬一剛進來時候,線程一恰好給sum賦值了,而線程二仍然用的是正准備進來時候的sum值,那從這里開始豈不已經分道揚鑣了。所以,運算的次數越多,結果會越離譜。

  這個說完了,還有一個小小的改進。你是否記得讀寫文件時候書寫的一種簡便形式,通過with來實現,可以避免我們忘記關閉文件,自動幫我們關閉。當然還有一些其他地方也用到了這個方法。這里也同樣適用。

1 # 與上面代碼功能相同,with lock可以自動上鎖與解鎖
2 with lock: 3     num = num + n 4     num = num - n

5、ThreadLocal

  • 創建一個全局的ThreadLocal對象
  • 每個線程有獨立的存儲空間
  • 每個線程對ThreadLocal對象都可以讀寫,但是互不影響

  根據名字也可以看出,也就是在本地建個連接,所有的操作在本地進行,每個線程之間沒有數據的影響。

 1 import threading  2 
 3 
 4 num = 0  5 local = threading.local()  6 
 7 def run(x, n):  8     x = x + n  9     x = x - n 10 
11 def func(n): 12     # 每個線程都有local.x
13     local.x = num 14     for i in range(10000000): 15  run(local.x, n) 16     print("%s-%d" % (threading.current_thread().name, local.x)) 17 
18 
19 if __name__ == "__main__": 20     t1 = threading.Thread(target=func, args=(6,)) 21     t2 = threading.Thread(target=func, args=(9,)) 22 
23  t1.start() 24  t2.start() 25  t1.join() 26  t2.join() 27 
28     print("num = ",num)

6、控制線程數量

 1 '''
 2 控制線程數量是指控制線程同時觸發的數量,可以拿下來這段代碼運行一下,下面啟動了5個線程,但是他們會兩個兩個的進行  3 '''
 4 import threading  5 import time  6 
 7 # 控制並發執行線程的數量
 8 sem = threading.Semaphore(2)  9 
10 def run(): 11  with sem: 12         for i in range(10): 13             print("%s---%d" % (threading.current_thread().name, i)) 14             time.sleep(1) 15 
16 
17 if __name__ == "__main__": 18     for i in range(5): 19         threading.Thread(target=run).start()

  上面的程序是有多個線程,但是每次限制同時執行的線程,通俗點說就是限制並發線程的上限;除此之外,也可以限制線程數量的下限,也就是至少達到多少個線程才能觸發。

 1 import threading  2 import time  3 
 4 
 5 # 湊夠一定數量的線程才會執行,否則一直等着
 6 bar = threading.Barrier(4)  7 
 8 def run():  9     print("%s--start" % (threading.current_thread().name)) 10     time.sleep(1) 11  bar.wait() 12     print("%s--end" % (threading.current_thread().name)) 13 
14 
15 if __name__ == "__main__": 16     for i in range(5): 17         threading.Thread(target=run).start()

7、定時線程

 1 import threading  2 
 3 
 4 def run():  5     print("***********************")  6 
 7 # 延時執行線程
 8 t = threading.Timer(5, run)  9 t.start() 10 
11 t.join() 12 print("父線程結束")

8、線程通信

 1 import threading  2 import time  3 
 4 
 5 def func():  6     # 事件對象
 7     event = threading.Event()  8     def run():  9         for i in range(5): 10             # 阻塞,等待事件的觸發
11  event.wait() 12             # 重置阻塞,使后面繼續阻塞
13  event.clear() 14             print("**************") 15     t = threading.Thread(target=run).start() 16     return event 17 
18 e = func() 19 
20 # 觸發事件
21 for i in range(5): 22     time.sleep(2) 23     e.set()

9、一個小栗子

  這個例子是用了生產者和消費者來模擬,要進行數據通信,還引入了隊列。先來理解一下。

 1 import threading  2 import queue  3 import time  4 import random  5 
 6 
 7 # 生產者
 8 def product(id, q):  9     while True: 10         num = random.randint(0, 10000) 11  q.put(num) 12         print("生產者%d生產了%d數據放入了隊列" % (id, num)) 13         time.sleep(3) 14     # 任務完成
15  q.task_done() 16 
17 # 消費者
18 def customer(id, q): 19     while True: 20         item = q.get() 21         if item is None: 22             break
23         print("消費者%d消費了%d數據" % (id, item)) 24         time.sleep(2) 25     # 任務完成
26  q.task_done() 27 
28 
29 if __name__ == "__main__": 30     # 消息隊列
31     q = queue.Queue() 32 
33     # 啟動生產者
34     for i in range(4): 35         threading.Thread(target=product, args=(i, q)).start() 36 
37     # 啟動消費者
38     for i in range(3): 39         threading.Thread(target=customer, args=(i, q)).start()

10、線程調度

 1 import threading  2 import time  3 
 4 
 5 # 線程條件變量
 6 cond = threading.Condition()  7 
 8 
 9 def run(): 10  with cond: 11         for i in range(0, 10, 2): 12             print(threading.current_thread().name, i) 13             time.sleep(1) 14             cond.wait()  # 阻塞
15             cond.notify()  # 告訴另一個線程可以執行
16 
17 
18 def run2(): 19  with cond: 20         for i in range(1, 10, 2): 21             print(threading.current_thread().name, i) 22             time.sleep(1) 23  cond.notify() 24  cond.wait() 25 
26 
27 threading.Thread(target=run).start() 28 threading.Thread(target=run2).start()

三、協程

1、協程

  • 子程序/子函數:在所有語言中都是層級調用,比如A調用B,在B執行的工程中又可以調用C,C執行完畢返回,B執行完畢返回最后是A執行完畢。是通過棧實現的,一個線程就是一個子程序,子程序調用總是一個入口,一次返回,調用的順序是明確的
  • 協程:看上去也是子程序,但執行過程中,在子程序的內部可中斷,然后轉而執行別的子程序,不是函數調用,有點類似CPU中斷
 1 # 這是一個子程序的調用
 2 def C():  3     print("C--start")  4     print("C--end")  5 
 6 def B():  7     print("B--start")  8  C()  9     print("B--end") 10 
11 def A(): 12     print("A--start") 13  B() 14     print("A--end") 15 
16 A()
  • 協程與子程序調用的結果類似,但不是通過在函數中調用另一個函數
  • 協程執行起來有點像線程,但協程的特點在於是一個線程
  • 與線程相比的優點:協程的執行效率極高,因為只有一個線程,也不存在同時寫變量的沖突,在協程中共享資源不加鎖,只需要判斷狀態

2、協程的原理

 1 # python對協程的支持是通過generator實現的
 2 def run():  3     print(1)  4     yield 10
 5     print(2)  6     yield 20
 7     print(3)  8     yield 30
 9 
10 # 協程的最簡單風格,控制函數的階段執行,節約線程或者進程的切換
11 # 返回值是一個生成器
12 m = run() 13 print(next(m)) 14 print(next(m)) 15 print(next(m))

3、數據傳輸

 1 # python對協程的支持是通過generator實現的
 2 def run():  3     print(1)  4     yield 10
 5     print(2)  6     yield 20
 7     print(3)  8     yield 30
 9 
10 # 協程的最簡單風格,控制函數的階段執行,節約線程或者進程的切換
11 # 返回值是一個生成器
12 m = run() 13 print(next(m)) 14 print(next(m)) 15 print(next(m))

4、小栗子

 1 def product(c):  2  c.send(None)  3     for i in range(5):  4         print("生產者產生數據%d" % (i))  5         r = c.send(str(i))  6         print("消費者消費了數據%s" % (r))  7  c.close()  8 
 9 
10 def customer(): 11     data = ""
12     while True: 13         n = yield data 14         if not n: 15             return
16         print("消費者消費了%s" % (n)) 17         data = "200"
18 
19 
20 c = customer() 21 product(c)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM