原文鏈接:http://nerds.weddingpartyapp.com/tech/2015/01/21/rxjava-share-publish-refcount-and-all-that-jazz/
看源碼知道.share()操作符是.publish().refcount()調用鏈的包裝。
先來看ConnectedObservable
“ConnectedObservable” – This is a kind of observable which doesn’t emit items even if subscribed to.
It only starts emitting items after its .connect() method is called.
因為這個原因,在ConnectedObservable的connect這個方法被調用之前,connected obesrvable也被認為是“冷”和不活躍。
再看publish方法
.publish()– This method allows us to change an ordinary observable into a “ConnectedObservable”.
Simply call this method on an ordinary observable and it becomes a connected one.
現在我們知道了share操作符的1/2,那么為什么需要運用Connected Observable這個操作符呢?文檔上是這么寫的:
In this way you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items.
這就意味着publish可以調用多個subscriber。當你有超過一個訂閱者的時候,處理每個訂閱和正確的銷毀他們變得棘手。 為了使這個更方便,Rx發明了這個魔法操作符refcount():
refcount() – This operator keeps track of how many subscribers are subscribed to the resulting Observable and
refrains from disconnecting from the source ConnectedObservable until all such Observables are unsubscribed.
refcount本質上在后台維護着一個引用計數器,當一個subscription需要取消訂閱或者銷毀的時候,發出一個正確的動作。
我們再次看一下debouncedBuffer的例子,看一下在哪,share是怎么用的。
Observable<Object> tapEventEmitter = _rxBus.toObserverable().share(); // which is really the same as: Observable<Object> tapEventEmitter = _rxBus.toObserverable().publish().refcount();
我們現在有了一個"shareable"的名字叫"tapEventEmitter"的observable。 因為他是可以分享的,而且還不是“live”(share操作符中的publish調用使其變成一個ConnectedObservable), 我們可以用他構成我們的Observables,而且要確保我們有了一個原始的observable的引用 (這個例子中原始的observable是_rxBus.toObserverable()).
Observable<Object> tapEventEmitter = _rxBus.toObserverable().share(); Observable<Object> debouncedEventEmitter = tapEventEmitter.debounce(1, TimeUnit.SECONDS); tapEventEmitter.buffer(debouncedEventEmitter) //...
所有的這一切看起來都很好。然而這個實現會有一個可能的競爭條件。因為這有兩個subscribers(debounce and buffer)而且會在不同的時間點發生,所以競爭條件就會發生。 記住RxBus是由hot/live Subject支持的不斷的發射數據。通過使用share操作符,我們保證引用的是同一個資源。 而不是subscribers在不同的時間點訂閱,他們會收到准確的相同的數據。
The race condition is when the two consumers subscribe. Often on a hot stream it doesn’t matter when subscribers come and go,and refCount is perfect for that.
The race condition refCount protects against is having only 1 active subscription upstream. However,if 2 Subscribers subscribe to a refcounted stream that emits 1, 2, 3, 4, 5, the first may get 1, 2, 3, 4, 5 and the second may get 2, 3, 4, 5.
To ensure all subscribers start at exactly the same time and get the exact same values, refCount can not be used.
Either ConnectableObservable with a manual, imperative invocation of connect needs to be done, or the variant of publish(function)which connects everything within the function before connecting the upstream.
在我們的用法中幾乎立即執行所以沒有什么關系。但是我們原始的意圖是把debouncedBuffer方法作為一個單獨的操作符。 如果相同的事件沒有被發射出去,從概念上看起來是不正確的。
通過Bean后來的建議,我添加了一個更好的第三方的實現,用來處理這種競爭條件。
// don't start emitting items just yet by turning the observable to a connected one ConnectableObservable<Object> tapEventEmitter = _rxBus.toObserverable().publish(); tapEventEmitter.publish((Func1) (stream) -> { // inside `publish`, "stream" is truly multicasted // applying the same technique for getting a debounced buffer sequence return stream.buffer(stream.debounce(1, TimeUnit.SECONDS)); }).subscribe((Action1) (taps) { _showTapCount(taps.size()); }); // start listening to events now tapEventEmitter.connect();