Rust Source Code Analysis: Channel Upgrade


https://zhuanlan.zhihu.com/p/50101525

std::sync::mpsc::channel

This article analyzes the channel implementation in the Rust standard library. Channels are widely used as a way for threads to communicate.

Rust provides a multi-producer, single-consumer (MPSC) channel. We focus on the case of multiple producers.
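As a point of reference, here is a minimal sketch of the multi-producer case using only the public std::sync::mpsc API, so it does not depend on any of the internal types discussed in this article. The helper name collect_from_producers is made up for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `producers` threads, each sending its own index once,
/// and returns everything the single consumer received, sorted.
fn collect_from_producers(producers: usize) -> Vec<usize> {
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();
    for id in 0..producers {
        // Each producer thread owns its own clone of the sender.
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            tx.send(id).expect("receiver is still alive");
        }));
    }
    // Drop the original sender so that the receiving iterator ends
    // once every clone has been dropped as well.
    drop(tx);
    for handle in handles {
        handle.join().expect("producer thread panicked");
    }
    let mut got: Vec<usize> = rx.iter().collect();
    // Arrival order across threads is not deterministic, so sort.
    got.sort();
    got
}
```

Calling collect_from_producers(4) should yield the four indices 0 through 3; the tx.clone() call inside the loop is exactly the operation whose internals the rest of the article walks through.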

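The implementation is quite interesting. I split it into two parts: channel upgrade and the concurrent queue.

This article covers channel upgrade.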

A call to channel() returns a (sender, receiver) pair that starts out in the oneshot flavor, as the source code suggests:

    #[stable(feature = "rust1", since = "1.0.0")]
    pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
        let a = Arc::new(oneshot::Packet::new());
        (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
    }

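At least four structures are involved here:

  • oneshot::Packet: the place where the data is actually stored. Here it holds a single value (other flavors may use a queue).
  • Flavor::Oneshot.
  • Sender and Receiver.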

Let's look at the source of each data structure in turn, starting with oneshot::Packet, which lives in mpsc/oneshot.rs:

    pub struct Packet<T> {
        // Internal state of the chan/port pair (stores the blocked thread as well)
        state: AtomicUsize,
        // One-shot data slot location
        data: UnsafeCell<Option<T>>,
        // when used for the second time, a oneshot channel must be upgraded, and
        // this contains the slot for the upgrade
        upgrade: UnsafeCell<MyUpgrade<T>>,
    }

The data field is a slot for a single value, and the upgrade field is used for channel upgrade.

There are other kinds of Packet as well. The same directory contains shared::Packet, stream::Packet and sync::Packet, defined in shared.rs, stream.rs and sync.rs respectively. We focus on shared::Packet:

    pub struct Packet<T> {
        queue: mpsc::Queue<T>,
        cnt: AtomicIsize, // How many items are on this channel
        steals: UnsafeCell<isize>, // How many times has a port received without blocking?
        to_wake: AtomicUsize, // SignalToken for wake up

        // The number of channels which are currently using this packet.
        channels: AtomicUsize,

        // See the discussion in Port::drop and the channel send methods for what
        // these are used for
        port_dropped: AtomicBool,
        sender_drain: AtomicIsize,

        // this lock protects various portions of this implementation during
        // select()
        select_lock: Mutex<()>,
    }

The queue field is clearly where the data is stored. We leave the data fields aside for now.

To distinguish these four kinds of Packet, the standard library provides enum Flavor<T>:

    enum Flavor<T> {
        Oneshot(Arc<oneshot::Packet<T>>),
        Stream(Arc<stream::Packet<T>>),
        Shared(Arc<shared::Packet<T>>),
        Sync(Arc<sync::Packet<T>>),
    }

The Sender and Receiver objects are tied to a Packet simply by storing a Flavor<T>:

    pub struct Sender<T> {
        inner: UnsafeCell<Flavor<T>>,
    }

    pub struct Receiver<T> {
        inner: UnsafeCell<Flavor<T>>,
    }

Let's look at fn channel again:

    pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
        let a = Arc::new(oneshot::Packet::new());
        (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
    }

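So both Sender and Receiver store a Flavor, the Flavor variant tells us which kind of Packet is in use, and the Packet itself is shared safely between the two ends through an Arc.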
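The relationship between the two ends and the packet can be sketched with a toy model. Everything below (OneshotPacket, SharedPacket, ToySender, ToyReceiver, toy_channel, same_packet) is a hypothetical stand-in, since the real types are private to std; the sketch only shows that both ends hold clones of one Arc.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the private packet types.
struct OneshotPacket {
    _slot: Option<i32>,
}
struct SharedPacket {
    _queue: Vec<i32>,
}

// The variant records which kind of packet a handle points to.
enum Flavor {
    Oneshot(Arc<OneshotPacket>),
    #[allow(dead_code)]
    Shared(Arc<SharedPacket>),
}

struct ToySender {
    inner: Flavor,
}
struct ToyReceiver {
    inner: Flavor,
}

// Mirrors the shape of `channel()`: one packet, two handles.
fn toy_channel() -> (ToySender, ToyReceiver) {
    let a = Arc::new(OneshotPacket { _slot: None });
    (
        ToySender { inner: Flavor::Oneshot(a.clone()) },
        ToyReceiver { inner: Flavor::Oneshot(a) },
    )
}

// True when both handles point at the very same packet allocation.
fn same_packet(tx: &ToySender, rx: &ToyReceiver) -> bool {
    match (&tx.inner, &rx.inner) {
        (Flavor::Oneshot(a), Flavor::Oneshot(b)) => Arc::ptr_eq(a, b),
        (Flavor::Shared(a), Flavor::Shared(b)) => Arc::ptr_eq(a, b),
        _ => false,
    }
}
```

For a pair returned by toy_channel(), same_packet reports true, while handles taken from two different toy_channel() calls do not share a packet.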

This is what a call to channel gives us. Since we focus on the multi-producer case, let's look at the Clone implementation for Sender:

    impl<T> Clone for Sender<T> {
        fn clone(&self) -> Sender<T> {
            let packet = match *unsafe { self.inner() } {
                Flavor::Oneshot(ref p) => {
                    let a = Arc::new(shared::Packet::new());
                    {
                        let guard = a.postinit_lock();
                        let rx = Receiver::new(Flavor::Shared(a.clone()));
                        let sleeper = match p.upgrade(rx) {
                            oneshot::UpSuccess |
                            oneshot::UpDisconnected => None,
                            oneshot::UpWoke(task) => Some(task),
                        };
                        a.inherit_blocker(sleeper, guard);
                    }
                    a
                }
                Flavor::Stream(ref p) => {
                    let a = Arc::new(shared::Packet::new());
                    {
                        let guard = a.postinit_lock();
                        let rx = Receiver::new(Flavor::Shared(a.clone()));
                        let sleeper = match p.upgrade(rx) {
                            stream::UpSuccess |
                            stream::UpDisconnected => None,
                            stream::UpWoke(task) => Some(task),
                        };
                        a.inherit_blocker(sleeper, guard);
                    }
                    a
                }
                Flavor::Shared(ref p) => {
                    p.clone_chan();
                    return Sender::new(Flavor::Shared(p.clone()));
                }
                Flavor::Sync(..) => unreachable!(),
            };

            unsafe {
                let tmp = Sender::new(Flavor::Shared(packet.clone()));
                mem::swap(self.inner_mut(), tmp.inner_mut());
            }
            Sender::new(Flavor::Shared(packet))
        }
    }

There is a lot of code, but we only care about the Flavor::Oneshot case. First look at self.inner(), which is provided by the trait UnsafeFlavor:

    trait UnsafeFlavor<T> {
        fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
        unsafe fn inner_mut(&self) -> &mut Flavor<T> {
            &mut *self.inner_unsafe().get()
        }
        unsafe fn inner(&self) -> &Flavor<T> {
            &*self.inner_unsafe().get()
        }
    }

    impl<T> UnsafeFlavor<T> for Sender<T> {
        fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
            &self.inner
        }
    }

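Since Sender stores inner: UnsafeCell<Flavor<T>>, this obtains a reference to the inner Flavor<T> by dereferencing the raw pointer from UnsafeCell::get() in unsafe code. The match then takes the Flavor::Oneshot arm: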

    impl<T> Clone for Sender<T> {
        fn clone(&self) -> Sender<T> {
            let packet = match *unsafe { self.inner() } {
                Flavor::Oneshot(ref p) => {
                    let a = Arc::new(shared::Packet::new());
                    {
                        let guard = a.postinit_lock();
                        let rx = Receiver::new(Flavor::Shared(a.clone()));
                        let sleeper = match p.upgrade(rx) {
                            oneshot::UpSuccess |
                            oneshot::UpDisconnected => None,
                            oneshot::UpWoke(task) => Some(task),
                        };
                        a.inherit_blocker(sleeper, guard);
                    }
                    a
                }
                ............
            };

            unsafe {
                let tmp = Sender::new(Flavor::Shared(packet.clone()));
                mem::swap(self.inner_mut(), tmp.inner_mut());
            }
            Sender::new(Flavor::Shared(packet))
        }
    }
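The inner() and inner_mut() accessors used above rely on UnsafeCell to reach mutable state through a shared &self. A minimal sketch of that pattern follows; Kind and Handle are hypothetical names, and the safety argument in the comments applies only to this single-threaded toy, not to the real Sender.

```rust
use std::cell::UnsafeCell;

#[derive(Debug, PartialEq)]
enum Kind {
    Oneshot,
    Shared,
}

// Hypothetical handle that, like Sender, keeps its state in an UnsafeCell.
struct Handle {
    inner: UnsafeCell<Kind>,
}

impl Handle {
    fn new() -> Self {
        Handle { inner: UnsafeCell::new(Kind::Oneshot) }
    }

    /// Safety: the caller must ensure no other reference to the
    /// inner value is alive while the returned one is in use.
    unsafe fn inner_mut(&self) -> &mut Kind {
        unsafe { &mut *self.inner.get() }
    }

    /// Safety: the caller must ensure no mutable reference is alive.
    unsafe fn inner(&self) -> &Kind {
        unsafe { &*self.inner.get() }
    }

    // Changes the state through `&self`, as `clone(&self)` does.
    fn upgrade(&self) {
        // Fine in this toy: Handle is not Sync and no reference escapes.
        unsafe {
            *self.inner_mut() = Kind::Shared;
        }
    }

    fn is_shared(&self) -> bool {
        unsafe { *self.inner() == Kind::Shared }
    }
}
```

A freshly created Handle reports is_shared() as false; after upgrade() it reports true, even though both methods only take &self.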

Next, Arc::new(shared::Packet::new()) creates a brand-new shared::Packet, a.

Then a.postinit_lock() is called. Here is its code:

    pub fn postinit_lock(&self) -> MutexGuard<()> {
        self.select_lock.lock().unwrap()
    }

Compare this with the new function of shared::Packet:

    pub fn new() -> Packet<T> {
        Packet {
            queue: mpsc::Queue::new(),
            cnt: AtomicIsize::new(0),
            steals: UnsafeCell::new(0),
            to_wake: AtomicUsize::new(0),
            channels: AtomicUsize::new(2),
            port_dropped: AtomicBool::new(false),
            sender_drain: AtomicIsize::new(0),
            select_lock: Mutex::new(()),
        }
    }

So postinit_lock is just a lock operation on select_lock, and the returned guard is used later to release the lock.
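The lock-then-hand-over-the-guard pattern can be observed with a small sketch. ToyPacket and lock_then_release are hypothetical; only Mutex, MutexGuard and try_lock come from std.

```rust
use std::sync::{Mutex, MutexGuard};

// Hypothetical packet that only carries the lock.
struct ToyPacket {
    select_lock: Mutex<()>,
}

impl ToyPacket {
    fn new() -> Self {
        ToyPacket { select_lock: Mutex::new(()) }
    }

    // Same shape as the quoted postinit_lock: lock and return the guard.
    fn postinit_lock(&self) -> MutexGuard<'_, ()> {
        self.select_lock.lock().unwrap()
    }

    // try_lock never blocks, so it can be used to probe the lock state.
    fn is_locked(&self) -> bool {
        self.select_lock.try_lock().is_err()
    }
}

// Returns (locked while the guard is held, locked after dropping it).
fn lock_then_release() -> (bool, bool) {
    let packet = ToyPacket::new();
    let guard = packet.postinit_lock();
    let while_held = packet.is_locked();
    // Dropping the guard is what releases the mutex.
    drop(guard);
    let after_drop = packet.is_locked();
    (while_held, after_drop)
}
```

The pair returned by lock_then_release() shows that the mutex stays locked for exactly as long as the guard value is alive, which is why the guard can be passed to another function to be released there.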

Back to the original code. This line is the key:

    let rx = Receiver::new(Flavor::Shared(a.clone()));

From the newly created a we build a Receiver, rx. Creating a Receiver inside Sender::clone looks odd at this point, but let's keep reading:

    let sleeper = match p.upgrade(rx) {
        oneshot::UpSuccess |
        oneshot::UpDisconnected => None,
        oneshot::UpWoke(task) => Some(task),
    };

Here p is the original oneshot::Packet. We pass in the new rx and call its upgrade method:

    pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
        unsafe {
            let prev = match *self.upgrade.get() {
                NothingSent => NothingSent,
                SendUsed => SendUsed,
                _ => panic!("upgrading again"),
            };
            ptr::write(self.upgrade.get(), GoUp(up));

            match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
                // If the channel is empty or has data on it, then we're good to go.
                // Senders will check the data before the upgrade (in case we
                // plastered over the DATA state).
                DATA | EMPTY => UpSuccess,

                // If the other end is already disconnected, then we failed the
                // upgrade. Be sure to trash the port we were given.
                DISCONNECTED => { ptr::replace(self.upgrade.get(), prev); UpDisconnected }

                // If someone's waiting, we gotta wake them up
                ptr => UpWoke(SignalToken::cast_from_usize(ptr))
            }
        }
    }

Assuming nothing has been sent yet, the upgrade field still holds its initial value, which can only be NothingSent:

    pub fn new() -> Packet<T> {
        Packet {
            data: UnsafeCell::new(None),
            upgrade: UnsafeCell::new(NothingSent),
            state: AtomicUsize::new(EMPTY),
        }
    }

Then GoUp(up) is written into the upgrade field, so the rx: Receiver we just created now lives inside the upgrade field. The GoUp variant is defined as follows:

    enum MyUpgrade<T> {
        NothingSent,
        SendUsed,
        GoUp(Receiver<T>),
    }

Next, self.state.swap changes the state to DISCONNECTED, because this oneshot::Packet is about to be retired. In our case the state simply goes from EMPTY to DISCONNECTED. The relevant constants are:

    // Various states you can find a port in.
    const EMPTY: usize = 0; // initial state: no data, no blocked receiver
    const DATA: usize = 1; // data ready for receiver to take
    const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded
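The decision made by the swap can be isolated in a small sketch. The constants are copied from the listing above; Outcome, mark_upgraded and upgrade_from are hypothetical names, and a plain usize stands in for the blocked thread's SignalToken.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const EMPTY: usize = 0;
const DATA: usize = 1;
const DISCONNECTED: usize = 2;

// Hypothetical mirror of the three upgrade results.
#[derive(Debug, PartialEq)]
enum Outcome {
    Success,
    Disconnected,
    Woke(usize), // the raw token of a blocked receiver
}

// `swap` stores DISCONNECTED and returns the previous state atomically,
// so the previous state alone decides the outcome.
fn mark_upgraded(state: &AtomicUsize) -> Outcome {
    match state.swap(DISCONNECTED, Ordering::SeqCst) {
        DATA | EMPTY => Outcome::Success,
        DISCONNECTED => Outcome::Disconnected,
        token => Outcome::Woke(token),
    }
}

// Runs one upgrade from `initial` and reports (outcome, final state).
fn upgrade_from(initial: usize) -> (Outcome, usize) {
    let state = AtomicUsize::new(initial);
    let outcome = mark_upgraded(&state);
    (outcome, state.load(Ordering::SeqCst))
}
```

Starting from EMPTY, the sketch reports Success and leaves the state at DISCONNECTED, which is the path followed in this article; any value other than the three constants is treated as a token to wake.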

Finally, upgrade returns UpSuccess as its UpgradeResult. Let's go back to the clone code:

    impl<T> Clone for Sender<T> {
        fn clone(&self) -> Sender<T> {
            let packet = match *unsafe { self.inner() } {
                Flavor::Oneshot(ref p) => {
                    let a = Arc::new(shared::Packet::new());
                    {
                        let guard = a.postinit_lock();
                        let rx = Receiver::new(Flavor::Shared(a.clone()));
                        let sleeper = match p.upgrade(rx) {
                            oneshot::UpSuccess |
                            oneshot::UpDisconnected => None,
                            oneshot::UpWoke(task) => Some(task),
                        };
                        a.inherit_blocker(sleeper, guard);
                    }
                    a
                }
                ............
            };
            ..................
        }
    }

Here p.upgrade(rx) returns UpSuccess, so sleeper is None.

Next, the implementation of a.inherit_blocker(sleeper, guard):

    pub fn inherit_blocker(&self,
                           token: Option<SignalToken>,
                           guard: MutexGuard<()>) {
        token.map(|token| {
            assert_eq!(self.cnt.load(Ordering::SeqCst), 0);
            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
            self.to_wake.store(unsafe { token.cast_to_usize() }, Ordering::SeqCst);
            self.cnt.store(-1, Ordering::SeqCst);

            unsafe { *self.steals.get() = -1; }
        });

        drop(guard);
    }

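The token passed in, that is sleeper, is None, and calling map on None just returns None without running the closure, so the only effect here is that dropping guard releases the lock. We then return a, which becomes packet: Arc<shared::Packet<T>>. Continuing with the clone code: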

    impl<T> Clone for Sender<T> {
        fn clone(&self) -> Sender<T> {
            let packet = match *unsafe { self.inner() } {
                Flavor::Oneshot(ref p) => {
                    let a = Arc::new(shared::Packet::new());
                    {
                        let guard = a.postinit_lock();
                        let rx = Receiver::new(Flavor::Shared(a.clone()));
                        let sleeper = match p.upgrade(rx) {
                            oneshot::UpSuccess |
                            oneshot::UpDisconnected => None,
                            oneshot::UpWoke(task) => Some(task),
                        };
                        a.inherit_blocker(sleeper, guard);
                    }
                    a
                }
                ............
            };

            unsafe {
                let tmp = Sender::new(Flavor::Shared(packet.clone()));
                mem::swap(self.inner_mut(), tmp.inner_mut());
            }
            Sender::new(Flavor::Shared(packet))
        }
    }

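Note that Sender::new(Flavor::Shared(packet)) returns a new Sender based on shared::Packet. In addition, a temporary Sender, tmp, is constructed, and mem::swap (applied to references obtained through the unsafe inner_mut) replaces the inner of the current object, which is an UnsafeCell<Flavor<T>>: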

    Flavor::Oneshot(Arc<oneshot::Packet<T>>)
    => Flavor::Shared(Arc<shared::Packet<T>>)

As for tmp, look at its drop method. Because of the swap, tmp now holds the old flavor and takes the Flavor::Oneshot path:

    impl<T> Drop for Sender<T> {
        fn drop(&mut self) {
            match *unsafe { self.inner() } {
                Flavor::Oneshot(ref p) => p.drop_chan(),
                Flavor::Stream(ref p) => p.drop_chan(),
                Flavor::Shared(ref p) => p.drop_chan(),
                Flavor::Sync(..) => unreachable!(),
            }
        }
    }

    pub fn drop_chan(&self) {
        match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
            DATA | DISCONNECTED | EMPTY => {}

            // If someone's waiting, we gotta wake them up
            ptr => unsafe {
                SignalToken::cast_from_usize(ptr).signal();
            }
        }
    }

The self.state field is already DISCONNECTED, so nothing more happens when tmp is dropped.
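The swap-then-drop trick can be made visible with a toy type that logs which flavor it holds when it is dropped. ToySender, Log and swap_and_drop_tmp are hypothetical, and a string label stands in for the real Flavor.

```rust
use std::cell::RefCell;
use std::mem;
use std::rc::Rc;

type Log = Rc<RefCell<Vec<&'static str>>>;

// Hypothetical sender whose "flavor" is just a label.
struct ToySender {
    flavor: &'static str,
    log: Log,
}

impl Drop for ToySender {
    // Record which flavor this value held at the moment it was dropped.
    fn drop(&mut self) {
        self.log.borrow_mut().push(self.flavor);
    }
}

// Returns the original sender's flavor after the swap, together with
// the flavors that had been dropped by that point.
fn swap_and_drop_tmp() -> (&'static str, Vec<&'static str>) {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let mut original = ToySender { flavor: "oneshot", log: log.clone() };
    {
        let mut tmp = ToySender { flavor: "shared", log: log.clone() };
        // After the swap, `original` holds "shared" and `tmp` holds "oneshot".
        mem::swap(&mut original.flavor, &mut tmp.flavor);
        // `tmp` goes out of scope here, carrying the old flavor with it.
    }
    let dropped_so_far = log.borrow().to_vec();
    (original.flavor, dropped_so_far)
}
```

In this sketch the temporary is the value that gets dropped with the old oneshot label, while the long-lived object keeps the shared one, matching the description of tmp above.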

That covers clone for Flavor::Oneshot. Now let's see what happens when clone is called again:

    fn clone(&self) -> Sender<T> {
        let packet = match *unsafe { self.inner() } {
            ............
            Flavor::Shared(ref p) => {
                p.clone_chan();
                return Sender::new(Flavor::Shared(p.clone()));
            }
            Flavor::Sync(..) => unreachable!(),
        };
        ............
    }

This time it only takes the Flavor::Shared path and simply returns a new Sender wrapping Flavor::Shared.

Here is the implementation of clone_chan:

    pub fn clone_chan(&self) {
        let old_count = self.channels.fetch_add(1, Ordering::SeqCst);

        // See comments on Arc::clone() on why we do this (for `mem::forget`).
        if old_count > MAX_REFCOUNT {
            unsafe {
                abort();
            }
        }
    }

It just increments the count of channels (senders) attached to the packet.
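A minimal sketch of that counter, assuming a hypothetical ToyShared type. The starting value of 2 is taken from the quoted shared::Packet::new(); reading it as "the original Sender plus the first clone created by the upgrade" is an inference, not something the source states.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-in for the `channels` counter of shared::Packet.
struct ToyShared {
    channels: AtomicUsize,
}

impl ToyShared {
    fn new() -> Self {
        // Starts at 2, as in the quoted shared::Packet::new().
        ToyShared { channels: AtomicUsize::new(2) }
    }

    // Registers one more sender; fetch_add returns the count
    // as it was before the increment.
    fn clone_chan(&self) -> usize {
        self.channels.fetch_add(1, Ordering::SeqCst)
    }

    fn count(&self) -> usize {
        self.channels.load(Ordering::SeqCst)
    }
}
```

Because fetch_add returns the old value, the real clone_chan can compare old_count against MAX_REFCOUNT without a second load.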

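To sum up, we now have two Senders:

  • The original Sender, which is self in the code. Its inner now points to Flavor::Shared.
  • The Sender produced by clone. It also points to Flavor::Shared and shares the same shared::Packet with the first one.

We also have two Receivers:

  • The original Receiver. Its inner still points to the initial Flavor::Oneshot, which wraps the initial oneshot::Packet.
  • The Receiver created inside the Sender.clone() call. It points to Flavor::Shared and is stored inside the initial oneshot::Packet.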

In other words, the first Receiver leads to the oneshot::Packet, and the oneshot::Packet leads to the Flavor::Shared receiver, so the Receiver can be upgraded as well.

However, when all the Sender clone operations have finished, the Receiver has not been upgraded yet. To see when it is upgraded, look at Receiver's recv function:

    pub fn recv(&self) -> Result<T, RecvError> {
        loop {
            let new_port = match *unsafe { self.inner() } {
                Flavor::Oneshot(ref p) => {
                    match p.recv(None) {
                        Ok(t) => return Ok(t),
                        Err(oneshot::Disconnected) => return Err(RecvError),
                        Err(oneshot::Upgraded(rx)) => rx,
                        Err(oneshot::Empty) => unreachable!(),
                    }
                }
                Flavor::Stream(ref p) => {
                    match p.recv(None) {
                        Ok(t) => return Ok(t),
                        Err(stream::Disconnected) => return Err(RecvError),
                        Err(stream::Upgraded(rx)) => rx,
                        Err(stream::Empty) => unreachable!(),
                    }
                }
                Flavor::Shared(ref p) => {
                    match p.recv(None) {
                        Ok(t) => return Ok(t),
                        Err(shared::Disconnected) => return Err(RecvError),
                        Err(shared::Empty) => unreachable!(),
                    }
                }
                Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
            };
            unsafe {
                mem::swap(self.inner_mut(), new_port.inner_mut());
            }
        }
    }

We only look at the Flavor::Oneshot case: take the inner oneshot::Packet as p and call p.recv(None):

    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
        // Attempt to not block the thread (it's a little expensive). If it looks
        // like we're not empty, then immediately go through to `try_recv`.
        if self.state.load(Ordering::SeqCst) == EMPTY {
            let (wait_token, signal_token) = blocking::tokens();
            let ptr = unsafe { signal_token.cast_to_usize() };

            // race with senders to enter the blocking state
            if self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) == EMPTY {
                if let Some(deadline) = deadline {
                    let timed_out = !wait_token.wait_max_until(deadline);
                    // Try to reset the state
                    if timed_out {
                        self.abort_selection().map_err(Upgraded)?;
                    }
                } else {
                    wait_token.wait();
                    debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY);
                }
            } else {
                // drop the signal token, since we never blocked
                drop(unsafe { SignalToken::cast_from_usize(ptr) });
            }
        }

        self.try_recv()
    }

At this point, because of the earlier Sender.clone(), self.state is already DISCONNECTED, so the blocking branch is skipped and we go straight to self.try_recv():

    pub fn try_recv(&self) -> Result<T, Failure<T>> {
        unsafe {
            match self.state.load(Ordering::SeqCst) {
                EMPTY => Err(Empty),
                DATA => {
                    self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst);
                    match (&mut *self.data.get()).take() {
                        Some(data) => Ok(data),
                        None => unreachable!(),
                    }
                }
                DISCONNECTED => {
                    match (&mut *self.data.get()).take() {
                        Some(data) => Ok(data),
                        None => {
                            match ptr::replace(self.upgrade.get(), SendUsed) {
                                SendUsed | NothingSent => Err(Disconnected),
                                GoUp(upgrade) => Err(Upgraded(upgrade))
                            }
                        }
                    }
                }
                // We are the sole receiver; there cannot be a blocking
                // receiver already.
                _ => unreachable!()
            }
        }
    }

Clearly this takes the DISCONNECTED path. Since nothing was sent, self.data still has its initial value None, so take() goes down the None arm. The key code is:

    None => {
        match ptr::replace(self.upgrade.get(), SendUsed) {
            SendUsed | NothingSent => Err(Disconnected),
            GoUp(upgrade) => Err(Upgraded(upgrade))
        }
    }

The contents of self.upgrade are replaced with SendUsed, and the previous value is handed back.

The value obtained here is GoUp(upgrade), where upgrade is the Receiver<T> whose creation puzzled us earlier. It is returned as Err(Upgraded(upgrade)), where Upgraded is a variant of:

    pub enum Failure<T> {
        Empty,
        Disconnected,
        Upgraded(Receiver<T>),
    }
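Taking the parked receiver out of the upgrade slot is a replace-and-inspect step. The sketch below uses the safe mem::replace rather than ptr::replace, and a String stands in for the parked Receiver<T>; Slot, Failure and take_upgrade are hypothetical names.

```rust
use std::mem;

// Hypothetical mirror of MyUpgrade, with a String as the parked receiver.
#[derive(Debug, PartialEq)]
enum Slot {
    NothingSent,
    SendUsed,
    GoUp(String),
}

#[derive(Debug, PartialEq)]
enum Failure {
    Disconnected,
    Upgraded(String),
}

// Leaves SendUsed behind and decides based on what was in the slot.
fn take_upgrade(slot: &mut Slot) -> Failure {
    match mem::replace(slot, Slot::SendUsed) {
        Slot::SendUsed | Slot::NothingSent => Failure::Disconnected,
        Slot::GoUp(rx) => Failure::Upgraded(rx),
    }
}
```

The first call on a slot holding GoUp hands the parked value out exactly once; any later call sees SendUsed and reports Disconnected.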

This value propagates all the way back to Receiver.recv():

    pub fn recv(&self) -> Result<T, RecvError> {
        loop {
            let new_port = match *unsafe { self.inner() } {
                Flavor::Oneshot(ref p) => {
                    match p.recv(None) {
                        Ok(t) => return Ok(t),
                        Err(oneshot::Disconnected) => return Err(RecvError),
                        Err(oneshot::Upgraded(rx)) => rx,
                        Err(oneshot::Empty) => unreachable!(),
                    }
                }
                ............
            };
            unsafe {
                mem::swap(self.inner_mut(), new_port.inner_mut());
            }
        }
    }

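Matching Err(oneshot::Upgraded(rx)) yields rx, the Receiver that was created during clone. rx becomes new_port, and the same mem::swap operation replaces the Receiver's inner Flavor<T> with the Flavor::Shared one. The loop then runs again and receives through the shared::Packet.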

The Receiver has thus been upgraded to a channel backed by Flavor::Shared(Arc<shared::Packet<T>>).

At this point, Sender and Receiver have both been upgraded from a channel that holds a single element to an unbounded MPSC channel.
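The whole upgrade path can be condensed into a single-threaded toy model. This is only a sketch under strong assumptions: Rc and RefCell replace Arc, UnsafeCell and the atomics, the Stream and Sync flavors are left out, values are i32, and every name (Oneshot, ToySender, ToyReceiver, toy_channel, demo) is hypothetical. A second send before the first clone is not modelled and panics.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

// Toy oneshot packet: one data slot plus the upgrade slot.
struct Oneshot {
    data: Option<i32>,
    upgrade: Option<ToyReceiver>, // Some(rx) plays the role of GoUp(rx)
}

#[derive(Clone)]
enum Flavor {
    Oneshot(Rc<RefCell<Oneshot>>),
    Shared(Rc<RefCell<VecDeque<i32>>>),
}

struct ToySender { inner: RefCell<Flavor> }
struct ToyReceiver { inner: RefCell<Flavor> }

fn toy_channel() -> (ToySender, ToyReceiver) {
    let p = Rc::new(RefCell::new(Oneshot { data: None, upgrade: None }));
    let tx = ToySender { inner: RefCell::new(Flavor::Oneshot(p.clone())) };
    (tx, ToyReceiver { inner: RefCell::new(Flavor::Oneshot(p)) })
}

impl ToySender {
    fn send(&self, v: i32) {
        let flavor = self.inner.borrow().clone();
        match flavor {
            Flavor::Oneshot(p) => {
                let mut packet = p.borrow_mut();
                assert!(packet.data.is_none(), "second send before clone is not modelled");
                packet.data = Some(v);
            }
            Flavor::Shared(q) => { q.borrow_mut().push_back(v); }
        }
    }
    fn is_shared(&self) -> bool { matches!(*self.inner.borrow(), Flavor::Shared(_)) }
}

impl Clone for ToySender {
    fn clone(&self) -> ToySender {
        let flavor = self.inner.borrow().clone();
        let queue = match flavor {
            Flavor::Shared(q) => q,
            Flavor::Oneshot(p) => {
                let q = Rc::new(RefCell::new(VecDeque::new()));
                // Park a receiver for the new queue inside the old packet.
                let parked = ToyReceiver { inner: RefCell::new(Flavor::Shared(q.clone())) };
                p.borrow_mut().upgrade = Some(parked);
                // Switch this sender over to the shared flavor in place.
                *self.inner.borrow_mut() = Flavor::Shared(q.clone());
                q
            }
        };
        ToySender { inner: RefCell::new(Flavor::Shared(queue)) }
    }
}

impl ToyReceiver {
    fn try_recv(&self) -> Option<i32> {
        loop {
            let flavor = self.inner.borrow().clone();
            let new_port = match flavor {
                Flavor::Shared(q) => {
                    let item = q.borrow_mut().pop_front();
                    return item;
                }
                Flavor::Oneshot(p) => {
                    let mut packet = p.borrow_mut();
                    if let Some(v) = packet.data.take() { return Some(v); }
                    let parked = packet.upgrade.take();
                    drop(packet);
                    match parked { Some(rx) => rx, None => return None }
                }
            };
            // The receiver upgrades itself lazily, on the receive path.
            let upgraded = new_port.inner.into_inner();
            *self.inner.borrow_mut() = upgraded;
        }
    }
    fn is_shared(&self) -> bool { matches!(*self.inner.borrow(), Flavor::Shared(_)) }
}

// Returns (sender upgraded by clone, receiver still oneshot right after
// the clone, values received in order, receiver upgraded at the end).
fn demo() -> (bool, bool, Vec<i32>, bool) {
    let (tx, rx) = toy_channel();
    tx.send(1);
    let tx2 = tx.clone();
    let sender_upgraded = tx.is_shared();
    let receiver_still_oneshot = !rx.is_shared();
    tx.send(2);
    tx2.send(3);
    let mut got = Vec::new();
    while let Some(v) = rx.try_recv() { got.push(v); }
    (sender_upgraded, receiver_still_oneshot, got, rx.is_shared())
}
```

In demo(), the sender switches flavor as soon as clone returns, while the receiver only switches when a later try_recv finds the parked receiver in the old packet, which is the asymmetry traced through the std source above.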

