線程、進程、協程和隊列


線程、進程、協程和隊列

一.線程、進程

1.簡述

  • 進程是一個具有一定獨立功能的程序關於某個數據集合的一次運行活動。它是操作系統動態執行的基本單元,通俗講就是自定義一段程序的執行過程,即一個正在運行的程序。線程是進程的基本單位,又稱為輕量級進程。 * 不同的進程在內存中會開辟獨立的地址空間,默認進程之間的數據是不共享,線程是由進程創建,所以處在同一個進程中的所有線程都可以訪問該進程所包含的地址空間,當然也包含存儲在該空間中的所有資源。
  • 應用場景:

    IO密集型操作由於不占用CPU資源,所以一般使用線程來完成
    計算密集型操作靠cpu,所以一般使用進程來完成

  • 為什么使用多線程或多進程?
    多線程和多進程可以提供程序的並發處理能力。看下面需求:
    現在有10台主機,現在需要監控主機的存過狀態,默認使用單線程,如下:

復制代碼
import time


st = time.time()           #程序開始時間
def f1(arg):
    time.sleep(2)            #假設ping一次需要2s
    print("ping %s主機中..." % arg)
host_List = [0,1,2,3,4,5,6,7,8,9,]   #假設列表中1233。。表示10個主機
for i in host_List:
    f1(i)

cost_time = time.time() - st
print('程序耗時:%s' % cost_time)
復制代碼

 

程序運行結果:

復制代碼
ping 0主機中...
ping 1主機中...
ping 2主機中...
ping 3主機中...
ping 4主機中...
ping 5主機中...
ping 6主機中...
ping 7主機中...
ping 8主機中...
ping 9主機中...
程序耗時:20.002294063568115
復制代碼

 

發現耗時20s,這僅僅是10台機器,如果100台呢,效率會非常低。假如用了多線程呢?

復制代碼
import threading
import time


st = time.time()           #程序開始時間
def f1(arg):
    time.sleep(2)            #假設ping一次需要2s
    print("ping %s主機中..." % arg)
host_List = [0,1,2,3,4,5,6,7,8,9,]   #假設列表中1233。。表示10個主機
for i in host_List:
    t = threading.Thread(target=f1, args=(i,))
    t.start()
t.join()
cost_time = time.time() - st
print('程序耗時:%s' % cost_time)
復制代碼

 

運行結果:
復制代碼
ping 0主機中...
ping 1主機中...
ping 5主機中...
ping 4主機中...
ping 2主機中...
ping 3主機中...
ping 7主機中...
ping 6主機中...
ping 8主機中...
ping 9主機中...
程序耗時:2.002915382385254
復制代碼

 

從結果中看出,10個機器啟用10個線程並發去獨立ping,這樣耗時僅僅是單線程的耗時,效率大大提供。所以多進程多線程一般用來提高並發

2.線程進程的基本操作

創建

  • 線程

    • 創建方法
    復制代碼
    import threading
    import time
    
    def f1(args):
      time.sleep(2)
      print(args)
    
    #方式1 直接使用thread模塊進行創建
    for i in range(10):
      t = threading.Thread(target=f1,args=(123,))   #target是要執行的任務(函數),args是任務(函數)的參數
      t.start()
    
    #方式2 使用自定義類創建
    
    class Mythread(threading.Thread):
      def __init__(self,func,args):
          self.func = func
          self.args = args
          super(Mythread,self).__init__()
      def run(self):
          self.func(self.args)
    
    obj = Mythread(f1,123)
    obj.start()
    復制代碼

     

    上述代碼創建了10個“前台”線程,然后控制器就交給了CPU,CPU根據指定算法進行調度,分片執行指令

    • 線程的其他方法

      • start 線程准備就緒
      • setName 為線程設置名稱
      • getName 獲取線程名稱
      • setDaemon 設置為后台線程或前台線程(默認),注意需要設置在start前 如果是后台線程,主線程執行過程中,后台線程也在進行,主線程執行完畢后,后台線程不論成功與否,均停止 如果是前台線程,主線程執行過程中,前台線程也在進行,主線程執行完畢后,等待前台線程也執行完成后,程序停止
      • join 放在for循環內表示逐個執行每個線程,執行完畢后繼續往下執行,該方法使得多線程變得無意義,放在for循環外,會阻塞主進程,這樣主進程會等待線程執行完之后,再去繼續執行下面的代碼
      • run 線程被cpu調度后自動執行線程對象的run方法,這也是線程第二種創建方法的原理
    • 方法具體使用

      • 使用setDaemon

        默認不使用setDaemon 情況

        前台進程

       

      使用setDaemon 情況:

        后台進程

       

      • 使用join

        默認不使用join的話,子線程會並發執行

        不使用join

       

      阻塞子線程

        在for循環內

       

      阻塞主線程

        在for循環外

       

  • 進程

    • 創建方法

    進程的創建方法和線程類似

    復制代碼
    import multiprocessing
    import time
    
    def f1(args):
        time.sleep(2)
        print("進程:",args)
    
    #方法1
    
    for i in range(10):
        t = multiprocessing.Process(target=f1,args=(i,))
        t.start()
    
    #方法2:
    
    class Myprocess(multiprocessing.Process):
        def __init__(self,func,args):
            self.func = func
            self.args = args
            super(Myprocess,self).__init__()
        def run(self):
            self.func(self.args)
    復制代碼

     

    • 其它方法 進程同樣支持 join(),setDaemon(),run(),setName(),getName()等方法,和線程的使用一樣,不再贅述
    • 方法使用

      參考線程使用

線程鎖(Lock、RLock)

由於線程之間是進行隨機調度,並且每個線程可能只執行n條執行之后,當多個線程同時修改同一條數據時可能會出現臟數據,所以,出現了線程鎖 - 同一時刻允許一個線程執行操作。

  • 未使用線程鎖:
復制代碼
import threading
import time

NUM = 10


def f1(arg):
    global NUM

    NUM -= 1            #讓每個線程執行時,將NUM的值減去1
    time.sleep(2)
    print(NUM)


for i in range(10):
    t = threading.Thread(target=f1,args=(123,))
    t.start()
復制代碼

執行效果:發現每個線程同時都在操作NUM,最后打印的結果都是0

復制代碼
0
0
0
0
0
0
0
0
0
0
復制代碼

 

  • 使用線程鎖

使用線程鎖,當一個線程開始處理事務A時,先在事務A上把鎖,然后開始
處理事務A,處理完程之后,再解鎖。其他進程遇到線程鎖,則處於等待中
直到有線程解鎖了該事務

復制代碼
import threading
import time

NUM = 10


def f1(arg):
    global NUM
    arg.acquire()   #阻塞后面的線程

    NUM -= 1
    time.sleep(2)
    print(NUM)
    arg.release()       #放開后面的線程


lock = threading.Lock()
for i in range(10):
    t = threading.Thread(target=f1,args=(lock,))
    t.start()
復制代碼

 

執行效果:發現線程是逐步操作NUM的

復制代碼
9
8
7
6
5
4
3
2
1
0
復制代碼

 

此有別於join()方法,join是在線程從開始執行的時候,按照單線程依次執行,也就意味着所有的任務都是單線程執行,而線程鎖是針對執行的任務進行上鎖,解鎖

  • Rlock和lock的區別

Rlock支持遞歸上鎖,解鎖,lock只支持單個上鎖解鎖

復制代碼
import threading
import time

NUM = 10


def f1(arg,lock):
    global NUM
    print('線程:',arg,'執行1')
    lock.acquire()   #阻塞后面的線程
    NUM -= 1

    lock.acquire()    #繼續上鎖
    time.sleep(2)       #sleep 2秒
    print('線程:',arg,time.time())      #打印當前時間戳
    lock.release()          #解鎖
    print('線程執行結果:',arg,NUM)

    lock.release()       #放開后面的線程
    print(123)


lock = threading.RLock()
for i in range(10):
    t = threading.Thread(target=f1,args=(i,lock,))
    t.start()
復制代碼

 

執行結果:

  View Code

 

信號量(Semaphore)

線程鎖 同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 ,比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進去。

復制代碼
import threading
import time
def f1(arg,lock):
    lock.acquire()
    print('線程:',arg)
    print(time.time())
    lock.release()
lock = threading.BoundedSemaphore(5)    #5表示最多同時運行5個線程

for i in range(10):
    t = threading.Thread(target=f1,args=(i,lock,))
    t.start()
復制代碼

 

執行結果:發現每5個線程的執行時間戳是一樣的。小數點后微秒可忽略

  View Code

 

事件(event)

python線程的事件用於主線程控制其他線程的執行,一個線程發送/傳遞事件,另外的線程等待事件的觸發事件。主要提供了三個方法 set、wait、clear

事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么阻塞線程執行,如果“Flag”值為True,那么線程繼續執行。
wait()方法:當事件標志為False時將阻塞線程,當事件標志為True時,什么也不做
set()方法:它設置事件標志為True,並且喚醒其他線程。條件鎖對象保護程序修改事件標志狀態的關鍵部分
clear()方法正好相反,它設置時間標志為False

復制代碼
import threading
import time

def f1(arg,e):
    print('線程:',arg)
    e.wait()            #阻塞線程
    print('線程繼續執行:',arg)

event = threading.Event()

for i in range(3):
    t = threading.Thread(target=f1,args=(i,event))
    t.start()

event.clear()

res = input('>>')
if res == '1':
    event.set()   #放開 線程
復制代碼

 

執行結果:

復制代碼
線程: 0
線程: 1
線程: 2
>>1     #輸入1,觸發線程繼續執行的信號
線程繼續執行: 0
線程繼續執行: 1
線程繼續執行: 2
復制代碼

 

條件(Condition)

使得線程等待,條件是針對單個線程的,條件成立,則不再阻塞線程,
條件不成立,一直阻塞

復制代碼
import threading
import time
def condition_func():

    ret = False
    inp = input('>>>')
    if inp == '1':
        ret = True

    return ret


def run(n):
    con.acquire()
    con.wait_for(condition_func)   #當condition_func返回值為真時,觸發線程繼續運行
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(3):
        t = threading.Thread(target=run, args=(i,))
        t.start()
復制代碼

 

運行效果:

>>>1    #每次手動輸入一個1,觸發一個線程運行
run the thread: 0
>>>1
run the thread: 1
>>>1
run the thread: 2

 

Timer

定時器,指定n秒后執行某操作

復制代碼
from threading import Timer
 
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed
復制代碼

 

進程和線程一樣,同樣支持進程鎖、信號量、事件、條件、timer用法一摸一樣,可參考線程使用方法,不在贅述

3.進程之間數據共享

由於不同的進程會有各自的內存地址空間,所以進程之間的數據默認是不能共享的

  • 運行結果
復制代碼
from multiprocessing import Process
import time

li = []

def foo(i):
    li.append(i)
    print('say hi',li)

for i in range(10):
    p = Process(target=foo,args=(i,))
    p.start()

time.sleep(1)
print('ending',li)
復制代碼

 

運行結果:發現每個子進程都獨立操作li列表

復制代碼
say hi [0]
say hi [1]
say hi [2]
say hi [3]
say hi [4]
say hi [5]
say hi [6]
say hi [7]
say hi [8]
say hi [9]
ending []
復制代碼

 

  • 實現進程之間數據共享

    • 方法1:
      Array方法

     

    • 方法2
      Dict方法

     

    • 方法3:
  queue方法

 

4.python內部隊列Queue

隊列(queue)是一種具有先進先出特征的線性數據結構,元素的增加只能在一端進行,元素的刪除只能在另一端進行。能夠增加元素的隊列一端稱為隊尾,可以刪除元素的隊列一端則稱為隊首。python內部支持一套輕量級queue隊列

  • queue隊列的方法:
    • Queue(maxsize=0) 先進先出隊列,maxsize表示隊列元素數量,0表示無限
    • LifoQueue(maxsize=0) 后進先出隊列
    • PriorityQueue(maxsize=0) 優先級隊列,優先級值越小,優先級越高
    • deque(maxsize=0) 雙向隊列
    • empty() 判斷隊列是否為空,為空時返回True,否則為False
    • full() 判斷隊列是否已滿,滿時返回True,否則為False
    • put(item,[block[,timeout]] 在隊尾插入一個項目。參數item為必需的,為插入項目的值;第二個block為可選參數,默認為True,表示當前隊列滿時,put()方法就使調用線程暫停,直到空出一個數據單元。如果block為False,put方法將引發Full異常
    • get() 從隊頭刪除並返回一個項目。可選參數為block,默認為True。如果隊列為空且block為True,get()就使調用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列將引發Empty異常。
    • qsize() 返回隊列長度
    • clear() 清空隊列
    • join() 等到隊列為空(即隊列中所有的項都被取走,處理完畢),再執行別的操作
    • task_done() 在完成一項工作之后,Queue.task_done()函數向任務已經完成的隊列發送一個信號. 注意:在多線程下,注意task_done的位置,每次執行task_done(),unfinished_tasks就減1,應該在一切搞定后,再執行task_done.

隊列支持下面的四種情況:

  • 先進先出隊列
復制代碼
import queue

q = queue.Queue(4)  #創建隊列  容量為4

q.put(123)      #往隊列中插值
q.put(431)


print(q.maxsize)    #隊列容量
print(q.qsize())    #隊列目前元素的容量

print(q.get())    #隊列取值
print(q.get())
復制代碼

 

執行效果:

4
2
123
431

 

先進先出原則第一次存放的是123,第二次存放的是431,那么我們在獲取值得時候,第一次獲取的就是123,第二次就是431
如果隊列滿之后,再put 或者隊列為空時,再get,進程就就掛在哪里,put會等待,直到隊列中有空間之后才能put成功,get會等待,直到隊列中有元素之后,才能獲取到值,如果不需要等待,可以通過設置block=False來拋出異常,使用try捕捉異常

復制代碼
import queue

q = queue.Queue(5)

for i in range(8):
    try:
        q.put(i,block=False)
        print(i,'已提交隊列')
    except:
        print('隊列已滿')

for i in range(8):
    try:
        res = q.get(block=False)
        print('從隊列取出:',res)
    except:
        print('隊列已空')
復制代碼

 

效果:

  View Code

 

  • 后進先出
復制代碼
import queue
q = queue.LifoQueue()
q.put(123)
q.put(456)
print(q.get())
print(q.get())
輸出結果:

456
123
復制代碼

 

  • 根據優先級處理
復制代碼
import queue
q = queue.PriorityQueue()   #根據優先級處理
q.put((2,"alex1"))
q.put((1,"alex2"))
q.put((3,"alex3"))
print(q.get())
print(q.get())
print(q.get())
輸出結果:

(1, 'alex2')
(2, 'alex1')
(3, 'alex3')
復制代碼

 

  • 雙向隊列
復制代碼
q = queue.deque()          #雙向隊列
q.append((123))         #右邊進
q.append(234)
print(q.pop())          #右邊出,后進先出
print(q.pop())


q.appendleft(555)       #左邊進
q.appendleft(666)
print(q.popleft())         #左邊出,后進先出
print(q.popleft())
輸出效果:

234
123
666
555
復制代碼

 

5.支持並發的兩種模型

生產者消費者模型

生產者消費者模型是通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度

  • 為什么要使用生產者和消費者模式

在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

  • 什么是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

這個阻塞隊列就是用來給生產者和消費者解耦的

  • 實現:
復制代碼
import queue
import threading
import time



q = queue.Queue(30)   #創建一個隊列,用戶生產者和消費者通訊

#模擬訂單創建處理


def product(arg):        #生產者  創建訂單
    while True:
        q.put("訂單" + str(arg))
        print(arg,"創建訂單")


def cost(arg):          #消費者,處理訂單
    while True:
        print(arg , "處理:" ,q.get())
        time.sleep(2)    #sleep 2秒表示 消費者處理需要2s

#創建生產者線程
for i in range(3):
    t = threading.Thread(target=product,args=(i,))
    t.start()

#創建消費者線程
for c in range(10):
    t = threading.Thread(target=cost,args=(c,))
    t.start()
復制代碼

 

效果:

  View Code

 

訂閱者模型

待講

6.線程池和進程池

線程池

提高並發並不是線程越多越好,每個系統對於線程的數量都有一個臨界值,線程數量超過該臨界值后,反而會降低系統性能。線程的上下文切換,遇到大量線程,也就很耗時,所以線程池的定義就是定義一組線程,用於處理當前的事務,線程處理完當前事務后,在繼續處理其它事務。當事務超過線程池的處理能力,事務則等待出現空閑線程。線程池的線程數量也是可以根據系統性能調節額
python中沒有線程池的機制,即使是python3中提供了該機制,也很low,所以進程池一般需要自己定義
* 簡單實現線程池

利用隊列,事先將創建的線程放在隊列中,有事務需要執行時,從隊列中取出一個線程進行執行,執行完之后自動再往隊列中添加一個線程,實現隊列中的線程 終止一個,創建一個

  Low版

 

  • 升級版線程池 簡單的線程池,只能實現可控的線程數量,實現處理多個事物,但是其中還是存在很多問題,如1.線程不重用,線程執行完之后,線程就死掉了,最終被垃圾回收機制處理,我們需要重新創建線程數量來填補隊列。2.線程數量是固定的,當事務數量小於線程數量時,多數線程處於等待狀態,造成線程浪費。下面將完美實現線程池
  終極版

 

進程池

python中提哦那個了進程池的概念,可以直接使用

  • apply
復制代碼
from  multiprocessing import Process,Pool
import time

def f1(arg):
    time.sleep(2)
    print(arg+100,time.time())

pool = Pool(5)    #創建進程池
for i in range(8):
    pool.apply(func=f1, args=(i,))     #給進程指定任務
print('end')
復制代碼

 

輸出效果:發現進程是依次執行,沒有並發的效果

復制代碼
100 1469023658.397264
101 1469023660.400479
102 1469023662.40297
103 1469023664.408063
104 1469023666.410736
105 1469023668.413439
106 1469023670.414941
107 1469023672.417192
end
復制代碼

 

  • apply_sync
復制代碼
from  multiprocessing import Process,Pool
import time

def f1(arg):
    time.sleep(2)
    print(arg+100,time.time())

pool = Pool(5)    #創建進程池
for i in range(8):
    pool.apply_async(func=f1, args=(i,))     #給進程指定任務

print('end')
time.sleep(2)      #主進程等代2s
復制代碼

 

執行效果:發現事務沒有執行完,主進程終止了子進程

end
100 1469023994.189458
102 1469023994.189459
101 1469023994.189458
103 1469023994.189688
104 1469023994.190085

 

改進;

復制代碼
from  multiprocessing import Process,Pool
import time


def f1(arg):
    time.sleep(2)
    print(arg+100,time.time())



pool = Pool(5)    #創建進程池


for i in range(8):
    pool.apply_async(func=f1, args=(i,))     #給進程指定任務


print('end')
pool.close()        #所有任務執行完畢后基礎

time.sleep(2)
#pool.terminate()    #子進程立即終止
pool.join()  #進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。

print(123)
復制代碼

 

執行效果:

復制代碼
end
100 1469024133.797895
102 1469024133.797907
101 1469024133.797895
104 1469024133.7983
103 1469024133.797907
107 1469024135.800982
106 1469024135.800983
105 1469024135.801028
123
復制代碼

 

二.協程

1.簡介

線程和進程的操作是由程序觸發系統接口,最后的執行者是系統;協程的操作則是程序員。

協程存在的意義:對於多線程應用,CPU通過切片的方式來切換線程間的執行,線程切換時需要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。
協成的原理:利用一個線程,分解一個線程成為多個微線程,注意此是從程序級別來分解的
協程的適用場景:當程序中存在大量不需要CPU的操作時(IO),適用於協程;

2.如何實現

greenlet和gevent需要手動安裝模塊。直接安裝gevent默認會把greenlet裝上
* 基於底層greenlet實現

復制代碼
from greenlet import greenlet


def test1():
    print(12)
    gr2.switch()  
    print(34)
    gr2.switch()


def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
復制代碼

 

執行過程:解釋器從上倒下讀代碼后,讀到gr1.switch()時,開始執行gr1對應的test1函數,test1函數執行完print(12)后遇到gr2.swith(),會自動執行gr2的test2,test2函數中執行完print(56)遇到gr1.switch(),會繼續執行test1的 print(34),最后遇到gr2.switch(),會執行test2的print(78)
輸出效果:

12
56
34
78

 

  • 使用gevent實現
復制代碼
import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
復制代碼

 

效果:

Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

 

  • 舉例
復制代碼
from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
    print('GET: %s' % url)
    resp = requests.get(url)
    data = resp.text
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
])
復制代碼

 

效果:

GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
431218 bytes received from https://www.yahoo.com/.
25529 bytes received from https://github.com/.
47394 bytes received from https://www.python.org/.

 

生命可貴、遠離shell|人生苦短、我用python


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM