https://zhuanlan.zhihu.com/p/50176724
接着前面的channel的升級繼續講。
首先,之前的upgrade過程中內存的回收要稍微注意下。因為Receiver現在指向shared::Packet之后,那個new_port需要被析構,也就是調用drop函數,我們看下drop的實現:
impl<T> Drop for Receiver<T> { fn drop(&mut self) { match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => p.drop_port(), Flavor::Stream(ref p) => p.drop_port(), Flavor::Shared(ref p) => p.drop_port(), Flavor::Sync(ref p) => p.drop_port(), } } }
由於之前的swap操作,走Flavor::Oneshot路徑:
pub fn drop_port(&self) { match self.state.swap(DISCONNECTED, Ordering::SeqCst) { // An empty channel has nothing to do, and a remotely disconnected // channel also has nothing to do b/c we're about to run the drop // glue DISCONNECTED | EMPTY => {} // There's data on the channel, so make sure we destroy it promptly. // This is why not using an arc is a little difficult (need the box // to stay valid while we take the data). DATA => unsafe { (&mut *self.data.get()).take().unwrap(); }, // We're the only ones that can block on this port _ => unreachable!() } }
同樣是DISCONNECTED替換DISCONNECTED而已,沒有過多操作。
同時不再需要的oneshot::Packet也要被析構:
impl<T> Drop for Packet<T> { fn drop(&mut self) { assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED); } }
只是個DISCONNECTED的檢驗操作。
所以現在Sender/Receiver都存放了Flavor::Shared(Arc<shared::Packet<T>>),之前的Flavor::Oneshot(Arc<oneshot::Packet<T>>>和臨時產生的Sender/Receiver都不存在了。
並發隊列
所以我們接着關注內在的數據結構,通過跟蹤以下函數來分析:
- Sender::send(&self, t: T)
- Receiver::recv(&self)
- Receiver::recv_timeout(&self, timeout: Duration)
Sender::send(&self, t: T):
pub fn send(&self, t: T) -> Result<(), SendError<T>> { let (new_inner, ret) = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { if !p.sent() { return p.send(t).map_err(SendError); } else { let a = Arc::new(stream::Packet::new()); let rx = Receiver::new(Flavor::Stream(a.clone())); match p.upgrade(rx) { oneshot::UpSuccess => { let ret = a.send(t); (a, ret) } oneshot::UpDisconnected => (a, Err(t)), oneshot::UpWoke(token) => { // This send cannot panic because the thread is // asleep (we're looking at it), so the receiver // can't go away. a.send(t).ok().unwrap(); token.signal(); (a, Ok(())) } } }