Blinker 是一個基於Python的強大的信號庫,它既支持簡單的對象到對象通信,也支持針對多個對象進行組播。Flask的信號機制就是基於它建立的。
Blinker的內核雖然小巧,但是功能卻非常強大,它支持以下特性:
- 支持注冊全局命名信號
- 支持匿名信號
- 支持自定義命名信號
- 支持與接收者之間的持久連接與短暫連接
- 通過弱引用實現與接收者之間的自動斷開連接
- 支持發送任意大小的數據
- 支持收集信號接收者的返回值
- 線程安全
創建信號
信號通過signal()
方法進行創建:
>>> from blinker import signal >>> initialized = signal("initialized") >>> initialized is signal("initialized") True
每次調用signal('name')
都會返回同一個信號對象。因此這里signal()
方法使用了單例模式。
訂閱信號
使用Signal.connect()
方法注冊一個函數,每當觸發信號的時候,就會調用該函數。該函數以觸發信號的對象作為參數,這個函數其實就是信號訂閱者。
>>> def subscriber(sender): ... print("Got a signal sent by %r" % sender) ... >>> ready = signal('ready') >>> ready.connect(subscriber) <function subscriber at 0x...>
觸發信號
使用Signal.send()
方法通知信號訂閱者。
下面定義類Processor
,在它的go()
方法中觸發前面聲明的ready
信號,send()
方法以self
為參數,也就是說Processor
的實例是信號的發送者。
>>> class Processor: ... def __init__(self, name): ... self.name = name ... ... def go(self): ... ready = signal('ready') ... ready.send(self) ... print("Processing.") ... complete = signal('complete') ... complete.send(self) ... ... def __repr__(self): ... return '<Processor %s>' % self.name ... >>> processor_a = Processor('a') >>> processor_a.go() Got a signal sent by <Processor a> Processing.
注意到go()
方法中的complete
信號沒?並沒有訂閱者訂閱該信號,但是依然可以觸發該信號。如果沒有任何訂閱者的信號,結果是什么信號也不會發送,而且Blinker
內部對這種情況進行了優化,以盡可能的減少內存開銷。
訂閱特定的發布者
默認情況下,任意發布者觸發信號,都會通知訂閱者。可以給Signal.connect()
傳遞一個可選的參數,以便限制訂閱者只能訂閱特定發送者。
>>> def b_subscriber(sender): ... print("Caught signal from processor_b.") ... assert sender.name == 'b' ... >>> processor_b = Processor('b') >>> ready.connect(b_subscriber, sender=processor_b) <function b_subscriber at 0x...>
現在訂閱者只訂閱了processor_b
發布的ready
信號:
>>> processor_a.go() Got a signal sent by <Processor a> Processing. >>> processor_b.go() Got a signal sent by <Processor b> Caught signal from processor_b. Processing.
通過信號收發數據
可以給send()
方法傳遞額外的關鍵字參數,這些參數會傳遞給訂閱者。
>>> send_data = signal('send-data') >>> @send_data.connect ... def receive_data(sender, **kw): ... print("Caught signal from %r, data %r" % (sender, kw)) ... return 'received!' ... >>> result = send_data.send('anonymous', abc=123) Caught signal from 'anonymous', data {'abc': 123}
send()
方法的返回值收集每個訂閱者的返回值,拼接成一個元組組成的列表。每個元組的組成為(receiver function, return value)。
匿名信號
前面我們創建的信號都是命名信號,每次調用Signal
構造器都會創建一個唯一的信號,,也就是說每次創建的信號是不一樣的。下面對前面的Processor
類進行改造,將signal
作為它的類屬性。
>>> from blinker import Signal >>> class AltProcessor: ... on_ready = Signal() ... on_complete = Signal() ... ... def __init__(self, name): ... self.name = name ... ... def go(self): ... self.on_ready.send(self) ... print("Alternate processing.") ... self.on_complete.send(self) ... ... def __repr__(self): ... return '<AltProcessor %s>' % self.name ...
上面創建的就是匿名信號。on_ready與on_complete是兩個不同的信號。
使用修飾器訂閱信號
除了使用connect()
方法訂閱信號之外,使用@connect
修飾器可以達到同樣的效果。
>>> apc = AltProcessor('c') >>> @apc.on_complete.connect ... def completed(sender): ... print "AltProcessor %s completed!" % sender.name ... >>> apc.go() Alternate processing. AltProcessor c completed!
盡管這樣用起來很方便,但是這種形式不支持訂閱指定的發送者。這時,可以使用connect_via()
:
>>> dice_roll = signal('dice_roll') >>> @dice_roll.connect_via(1) ... @dice_roll.connect_via(3) ... @dice_roll.connect_via(5) ... def odd_subscriber(sender): ... print("Observed dice roll %r." % sender) ... >>> result = dice_roll.send(3) Observed dice roll 3.
優化信號發送
信號通常會進行優化,以便快速的發送。不管有沒有訂閱者,都可以發送信號。如果發送信號時需要傳送的參數要計算很長時間,可以在發送之前使用receivers
屬性先檢查一下是否有訂閱者。
>>> bool(signal('ready').receivers) True >>> bool(signal('complete').receivers) False >>> bool(AltProcessor.on_complete.receivers) True
還可以檢查訂閱者是否訂閱了某個具體的信號發布者。
>>> signal('ready').has_receivers_for(processor_a) True
鏈接:https://www.jianshu.com/p/d2f2cfd1b140