flask之分析線程和協程


flask之分析線程和協程

01 思考:每個請求之間的關系

我們每一個請求進來的時候都開一個進程肯定不合理,那么如果每一個請求進來都是串行的,那么根本實現不了並發,所以我們假定每一個請求進來使用的是線程。

那么線程中數據互相不隔離,存在修改數據的時候數據不安全的問題。

假定我們的需求是,每個線程都要設置值,並且該線程打印該線程修改的值。

from threading import Thread,current_thread
import time

class Foo(object):
    def __init__(self):
        self.name = 0

locals_values = Foo()

def func(num):
    locals_values.name = num
    time.sleep(2)             # 取出該線程的名字
    print(locals_values.name, current_thread().name)

for i in range(10):
                                    # 設置該線程的名字
    t = Thread(target=func,args=(i,),name='線程%s'%i)
    t.start()

很明顯阻塞了2秒的時間所有的線程都完成了修改值,而2秒后所有的線程打印出來的時候都是9了,就產生了數據不安全的問題。

1552975111079

所以我們要解決這種線程不安全的問題,有如下兩種解決方案。

  • 方案一:是加鎖

  • 方案二:使用threading.local對象把要修改的數據復制一份,使得每個數據互不影響。

    我們要實現的並發是多個請求實現並發,而不是純粹的只是修改一個數據,所以第二種思路更適合做我們每個請求的並發,把每個請求對象的內容都復制一份讓其互相不影響。

    詳解:為什么不用加鎖的思路?加鎖的思路是多個線程要真正實現共用一個數據,並且該線程修改了數據之后會影響到其他線程,更適合類似於12306搶票的應用場景,而我們是要做請求對象的並發,想要實現的是該線程對於請求對象這部分內容有任何修改並不影響其他線程。所以使用方案二

02 threading.local

多個線程修改同一個數據,復制多份數據給每個線程用,為每個線程開辟一塊空間進行數據存儲

實例:

from threading import Thread,current_thread,local
import time

locals_values = local()
# 可以簡單理解為,識別到新的線程的時候,都會開辟一片新的內存空間,相當於每個線程對該值進行了拷貝。

def func(num):
    locals_values.name = num
    time.sleep(2)
    print(locals_values.name, current_thread().name)

for i in range(10):
    t = Thread(target=func,args=(i,),name='線程%s'%i)
    t.start()

1552976228090

如上通過threading.local實例化的對象,實現了多線程修改同一個數據,每個線程都復制了一份數據,並且修改的也都是自己的數據。達到了我們想要的效果。

03 通過字典自定義threading.local

實例:

from threading import get_ident,Thread,current_thread
# get_ident()可以獲取每個線程的唯一標記,
import time

class Local(object):
    storage = {}# 初始化一個字典
    get_ident = get_ident # 拿到get_ident的地址
    def set(self,k,v):
        ident =self.get_ident()# 獲取當前線程的唯一標記
        origin = self.storage.get(ident)
        if not origin:
            origin={}
        origin[k] = v
        self.storage[ident] = origin
    def get(self,k):
        ident = self.get_ident() # 獲取當前線程的唯一標記
        v= self.storage[ident].get(k)
        return v

locals_values = Local()
def func(num):
    # get_ident() 獲取當前線程的唯一標記
    locals_values.set('KEY',num)
    time.sleep(2)
    print(locals_values.get('KEY'),current_thread().name)

for i in range(10):
    t = Thread(target=func,args=(i,),name='線程%s'%i)
    t.start()

講解:

利用get_ident()獲取每個線程的唯一標記作為鍵,然后組織一個字典storage。

:{線程1的唯一標記:,線程2的唯一標記:.......}

 {
    15088: {'KEY': 0}, 
    8856: {'KEY': 1},
    17052: {'KEY': 2}, 
    8836: {'KEY': 3}, 
    13832: {'KEY': 4}, 
    15504: {'KEY': 5}, 
    16588: {'KEY': 6}, 
    5164: {'KEY': 7}, 
    560: {'KEY': 8}, 
    1812: {'KEY': 9}
                    }

運行效果

1552981453617

04 通過setattr和getattr實現自定義threthreading.local

實例

from threading import get_ident,Thread,current_thread
# get_ident()可以獲取每個線程的唯一標記,
import time

class Local(object):
    storage = {}# 初始化一個字典
    get_ident = get_ident # 拿到get_ident的地址

    def __setattr__(self, k, v):
        ident =self.get_ident()# 獲取當前線程的唯一標記
        origin = self.storage.get(ident)
        if not origin:
            origin={}
        origin[k] = v
        self.storage[ident] = origin
    def __getattr__(self, k):
        ident = self.get_ident() # 獲取當前線程的唯一標記
        v= self.storage[ident].get(k)
        return v

locals_values = Local()
def func(num):
    # get_ident() 獲取當前線程的唯一標記
    locals_values.KEY=num
    time.sleep(2)
    print(locals_values.KEY,current_thread().name)

for i in range(10):
    t = Thread(target=func,args=(i,),name='線程%s'%i)
    t.start()

05 每個對象有自己的存儲空間(字典)

我們可以自定義實現了threading.local的功能,但是現在存在一個問題,如果我們想生成多個Local對象,但是會導致多個Local對象所管理的線程設置的內容都放到了類屬性storage = 里面,所以我們如果想實現每一個Local對象所對應的線程設置的內容都放到自己的storage里面,就需要重新設計代碼。

實例:

from threading import get_ident,Thread,current_thread
# get_ident()可以獲取每個線程的唯一標記,
import time

class Local(object):
    def __init__(self):
        # 千萬不要按照注釋里這么寫,否則會造成遞歸死循環,死循環在__getattr__中,不理解的話可以全程使用debug測試。
        # self.storage = {}
        # self.get_ident =get_ident
        object.__setattr__(self,"storage",{})
        object.__setattr__(self,"get_ident",get_ident) # 借用父類設置對象的屬性,避免遞歸死循環。

    def __setattr__(self, k, v):
        ident =self.get_ident() # 獲取當前線程的唯一標記
        origin = self.storage.get(ident)
        if not origin:
            origin={}
        origin[k] = v
        self.storage[ident] = origin
    def __getattr__(self, k):
        ident = self.get_ident() # 獲取當前線程的唯一標記
        v= self.storage[ident].get(k)
        return v

locals_values = Local()
locals_values2 = Local()
def func(num):
    # get_ident() 獲取當前線程的唯一標記
    # locals_values.set('KEY',num)
    locals_values.KEY=num
    time.sleep(2)
    print(locals_values.KEY,current_thread().name)
    # print('locals_values2.storage:',locals_values2.storage) # 查看locals_values2.storage的私有的storage

for i in range(10):
    t = Thread(target=func,args=(i,),name='線程%s'%i)
    t.start()

顯示效果我就不做演示了,和前幾個案例演示效果一樣。

06 如果是你會如何設計flask的請求並發?

  • 情況一:單進程單線程,基於全局變量就可以做

  • 情況二:單進程多線程,基於threading.local對象做

  • 情況三:單進程多線程多協程,如何做?

    提示:協程屬於應用級別的,協程會替代操作系統自動切換遇到 IO的任務或者運行級別低的任務,而應用級別的切換速度遠高於操作系統的切換

    當然如果是自己來設計框架,為了提升程序的並發性能,一定是上訴的情況三,不光考慮多線程並且要多協程,那么該如何設計呢?

    在我們的flask中為了這種並發需求,依賴於底層的werkzeug外部包,werkzeug實現了保證多線程和多協程的安全,werkzeug基本的設計理念和上一個案例一致,唯一的區別就是在導入的時候做了一步處理,且看werkzeug源碼。

    werkzeug.local.py部分源碼

    ...
    
    try:
        from greenlet import getcurrent as get_ident # 拿到攜程的唯一標識
    except ImportError:
        try:
            from thread import get_ident #線程的唯一標識
        except ImportError:
            from _thread import get_ident
    
    class Local(object):
        ...
    
        def __init__(self):
            object.__setattr__(self, '__storage__', {})
            object.__setattr__(self, '__ident_func__', get_ident)
    
          ...
    
        def __getattr__(self, name):
            try:
                return self.__storage__[self.__ident_func__()][name]
            except KeyError:
                raise AttributeError(name)
    
        def __setattr__(self, name, value):
            ident = self.__ident_func__()
            storage = self.__storage__
            try:
                storage[ident][name] = value
            except KeyError:
                storage[ident] = {name: value}
    

    分析:

    原理就是在最開始導入線程和協程的唯一標識的時候統一命名為get_ident,並且先導入協程模塊的時候如果報錯說明不支持協程,就會去導入線程的get_ident,這樣無論是只有線程運行還是協程運行都可以獲取唯一標識,並且把這個標識的線程或協程需要設置的內容都分類存放於__storage__字典中。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM