https://zhuanlan.zhihu.com/p/50176724
接着前面的channel的升级继续讲。
首先,之前的upgrade过程中内存的回收要稍微注意下。因为Receiver现在指向shared::Packet之后,那个new_port需要被析构,也就是调用drop函数,我们看下drop的实现:
impl<T> Drop for Receiver<T> { fn drop(&mut self) { match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => p.drop_port(), Flavor::Stream(ref p) => p.drop_port(), Flavor::Shared(ref p) => p.drop_port(), Flavor::Sync(ref p) => p.drop_port(), } } }
由于之前的swap操作,走Flavor::Oneshot路径:
pub fn drop_port(&self) { match self.state.swap(DISCONNECTED, Ordering::SeqCst) { // An empty channel has nothing to do, and a remotely disconnected // channel also has nothing to do b/c we're about to run the drop // glue DISCONNECTED | EMPTY => {} // There's data on the channel, so make sure we destroy it promptly. // This is why not using an arc is a little difficult (need the box // to stay valid while we take the data). DATA => unsafe { (&mut *self.data.get()).take().unwrap(); }, // We're the only ones that can block on this port _ => unreachable!() } }
同样是DISCONNECTED替换DISCONNECTED而已,没有过多操作。
同时不再需要的oneshot::Packet也要被析构:
impl<T> Drop for Packet<T> { fn drop(&mut self) { assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED); } }
只是个DISCONNECTED的检验操作。
所以现在Sender/Receiver都存放了Flavor::Shared(Arc<shared::Packet<T>>),之前的Flavor::Oneshot(Arc<oneshot::Packet<T>>>和临时产生的Sender/Receiver都不存在了。
并发队列
所以我们接着关注内在的数据结构,通过跟踪以下函数来分析:
- Sender::send(&self, t: T)
- Receiver::recv(&self)
- Receiver::recv_timeout(&self, timeout: Duration)
Sender::send(&self, t: T):
pub fn send(&self, t: T) -> Result<(), SendError<T>> { let (new_inner, ret) = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { if !p.sent() { return p.send(t).map_err(SendError); } else { let a = Arc::new(stream::Packet::new()); let rx = Receiver::new(Flavor::Stream(a.clone())); match p.upgrade(rx) { oneshot::UpSuccess => { let ret = a.send(t); (a, ret) } oneshot::UpDisconnected => (a, Err(t)), oneshot::UpWoke(token) => { // This send cannot panic because the thread is // asleep (we're looking at it), so the receiver // can't go away. a.send(t).ok().unwrap(); token.signal(); (a, Ok(())) } } }