Rust source code analysis: the mpsc queue inside channel


https://zhuanlan.zhihu.com/p/50176724

This continues the previous discussion of channel upgrades.

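First, the memory reclamation that happens during the upgrade deserves a closer look. Because the Receiver now points to a shared::Packet, the temporary new_port has to be destroyed, i.e. its drop function runs. Here is the Drop implementation: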

    impl<T> Drop for Receiver<T> {
        fn drop(&mut self) {
            match *unsafe { self.inner() } {
                Flavor::Oneshot(ref p) => p.drop_port(),
                Flavor::Stream(ref p) => p.drop_port(),
                Flavor::Shared(ref p) => p.drop_port(),
                Flavor::Sync(ref p) => p.drop_port(),
            }
        }
    }

Because of the earlier swap, new_port holds the oneshot flavor, so this takes the Flavor::Oneshot path:

    pub fn drop_port(&self) {
        match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
            // An empty channel has nothing to do, and a remotely disconnected
            // channel also has nothing to do b/c we're about to run the drop
            // glue
            DISCONNECTED | EMPTY => {}

            // There's data on the channel, so make sure we destroy it promptly.
            // This is why not using an arc is a little difficult (need the box
            // to stay valid while we take the data).
            DATA => unsafe { (&mut *self.data.get()).take().unwrap(); },

            // We're the only ones that can block on this port
            _ => unreachable!()
        }
    }

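Again, this just replaces DISCONNECTED with DISCONNECTED (the upgrade already swapped the oneshot state to DISCONNECTED); nothing else happens.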

The oneshot::Packet, which is no longer needed, must also be destroyed:

    impl<T> Drop for Packet<T> {
        fn drop(&mut self) {
            assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED);
        }
    }

This is only a check that the state is DISCONNECTED.

So now both Sender and Receiver hold Flavor::Shared(Arc<shared::Packet<T>>); the earlier Flavor::Oneshot(Arc<oneshot::Packet<T>>) and the temporary Sender/Receiver no longer exist.
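The "swap the flavor in place, then let the temporary die" pattern can be sketched in isolation. This is a hypothetical, single-threaded model: `Port`, `OneshotPacket`, `SharedPacket` and `ONESHOT_DROPS` are illustrative stand-ins, not std types, and the drop_port step is left out. It only shows that after `mem::swap` the temporary owns the old oneshot flavor, and dropping it releases the last `Arc` to the oneshot packet.

```rust
use std::cell::UnsafeCell;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Counts how many (hypothetical) oneshot packets have been destroyed.
static ONESHOT_DROPS: AtomicUsize = AtomicUsize::new(0);

struct OneshotPacket; // hypothetical stand-in for oneshot::Packet<T>
struct SharedPacket; // hypothetical stand-in for shared::Packet<T>

impl Drop for OneshotPacket {
    fn drop(&mut self) {
        ONESHOT_DROPS.fetch_add(1, Ordering::SeqCst);
    }
}

#[allow(dead_code)] // the Arcs are only held, never read, in this sketch
enum Flavor {
    Oneshot(Arc<OneshotPacket>),
    Shared(Arc<SharedPacket>),
}

/// Hypothetical stand-in for Sender/Receiver: the flavor lives in an
/// UnsafeCell so it can be replaced through `&self`.
struct Port {
    inner: UnsafeCell<Flavor>,
}

impl Port {
    fn new(f: Flavor) -> Port {
        Port { inner: UnsafeCell::new(f) }
    }

    fn upgrade(&self, shared: Arc<SharedPacket>) {
        let tmp = Port::new(Flavor::Shared(shared));
        // Safety: single-threaded sketch; no other reference into `inner` is alive.
        unsafe { mem::swap(&mut *self.inner.get(), &mut *tmp.inner.get()) };
        // `tmp` now owns the old Oneshot flavor and is dropped at the end of
        // this scope, releasing the last Arc<OneshotPacket>.
    }

    fn is_shared(&self) -> bool {
        matches!(unsafe { &*self.inner.get() }, Flavor::Shared(_))
    }
}
```

In std the temporary is a full Sender/Receiver, so its own Drop (drop_port) also runs before the packet's Drop.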

Concurrent queue

Next we turn to the underlying data structure, analyzing it by tracing the following functions:

  • Sender::send(&self, t: T)
  • Receiver::recv(&self)
  • Receiver::recv_timeout(&self, timeout: Duration)

Sender::send(&self, t: T):

    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
        let (new_inner, ret) = match *unsafe { self.inner() } {
            Flavor::Oneshot(ref p) => {
                if !p.sent() {
                    return p.send(t).map_err(SendError);
                } else {
                    let a = Arc::new(stream::Packet::new());
                    let rx = Receiver::new(Flavor::Stream(a.clone()));
                    match p.upgrade(rx) {
                        oneshot::UpSuccess => {
                            let ret = a.send(t);
                            (a, ret)
                        }
                        oneshot::UpDisconnected => (a, Err(t)),
                        oneshot::UpWoke(token) => {
                            // This send cannot panic because the thread is
                            // asleep (we're looking at it), so the receiver
                            // can't go away.
                            a.send(t).ok().unwrap();
                            token.signal();
                            (a, Ok(()))
                        }
                    }
                }
            }
            Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
            Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
            Flavor::Sync(..) => unreachable!(),
        };

        unsafe {
            let tmp = Sender::new(Flavor::Stream(new_inner));
            mem::swap(self.inner_mut(), tmp.inner_mut());
        }
        ret.map_err(SendError)
    }

In fact, for our case only one line matters:

    Flavor::Shared(ref p) => return p.send(t).map_err(SendError),

Here p is a reference to Arc<shared::Packet<T>>. Let's continue with p.send(t):

    pub fn send(&self, t: T) -> Result<(), T> {
        // See Port::drop for what's going on
        if self.port_dropped.load(Ordering::SeqCst) { return Err(t) }

        if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE {
            return Err(t)
        }

        self.queue.push(t);
        match self.cnt.fetch_add(1, Ordering::SeqCst) {
            -1 => {
                self.take_to_wake().signal();
            }

            n if n < DISCONNECTED + FUDGE => {
                // see the comment in 'try' for a shared channel for why this
                // window of "not disconnected" is ok.
                self.cnt.store(DISCONNECTED, Ordering::SeqCst);

                if self.sender_drain.fetch_add(1, Ordering::SeqCst) == 0 {
                    loop {
                        // drain the queue, for info on the thread yield see the
                        // discussion in try_recv
                        loop {
                            match self.queue.pop() {
                                mpsc::Data(..) => {}
                                mpsc::Empty => break,
                                mpsc::Inconsistent => thread::yield_now(),
                            }
                        }

                        if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 {
                            break
                        }
                    }
                }
            }

            // Can't make any assumptions about this case like in the SPSC case.
            _ => {}
        }

        Ok(())
    }

Let's also look at the shared::Packet data structure and how it is initialized:

    const DISCONNECTED: isize = isize::MIN;
    const FUDGE: isize = 1024;

    pub struct Packet<T> {
        queue: mpsc::Queue<T>,
        cnt: AtomicIsize, // How many items are on this channel
        steals: UnsafeCell<isize>, // How many times has a port received without blocking?
        to_wake: AtomicUsize, // SignalToken for wake up

        channels: AtomicUsize,

        port_dropped: AtomicBool,
        sender_drain: AtomicIsize,

        select_lock: Mutex<()>,
    }

    pub fn new() -> Packet<T> {
        Packet {
            queue: mpsc::Queue::new(),
            cnt: AtomicIsize::new(0),
            steals: UnsafeCell::new(0),
            to_wake: AtomicUsize::new(0),
            channels: AtomicUsize::new(2),
            port_dropped: AtomicBool::new(false),
            sender_drain: AtomicIsize::new(0),
            select_lock: Mutex::new(()),
        }
    }

We can see that:

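  • port_dropped marks whether the receiving end has been dropped.
  • cnt counts how many items are currently stored; comparing cnt against DISCONNECTED (with FUDGE as slack) also tells whether the consumer has disconnected.
  • If send finds that the consumer has disconnected, it pops all remaining items itself to clean them up.
  • The main work is done by self.queue.push(t).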
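The counting logic of send can be modelled without the queue. This is a hypothetical sketch: `looks_disconnected`, `SendAction` and `send_step` are illustrative names, and port_dropped, the actual push, and the drain loop are omitted. It shows why FUDGE exists: once the receiver stores DISCONNECTED (isize::MIN), racing senders may still bump cnt a little, so any value in [DISCONNECTED, DISCONNECTED + FUDGE) must still count as "disconnected".

```rust
use std::sync::atomic::{AtomicIsize, Ordering};

// Same values as the shared::Packet constants above.
const DISCONNECTED: isize = isize::MIN;
const FUDGE: isize = 1024;

/// Values slightly above DISCONNECTED still mean "receiver gone",
/// because concurrent senders may have incremented cnt after it was set.
fn looks_disconnected(cnt: isize) -> bool {
    cnt < DISCONNECTED + FUDGE
}

/// What the real send would do after its fetch_add (hypothetical model).
#[derive(Debug, PartialEq)]
enum SendAction {
    Rejected,      // early return Err(t)
    WakeReceiver,  // old value was -1: a receiver is parked
    ResetAndDrain, // receiver disconnected in between: reset cnt, drain queue
    Nothing,
}

fn send_step(cnt: &AtomicIsize) -> SendAction {
    if looks_disconnected(cnt.load(Ordering::SeqCst)) {
        return SendAction::Rejected;
    }
    // (the real code pushes the value onto the queue here)
    match cnt.fetch_add(1, Ordering::SeqCst) {
        -1 => SendAction::WakeReceiver,
        n if looks_disconnected(n) => {
            cnt.store(DISCONNECTED, Ordering::SeqCst);
            SendAction::ResetAndDrain
        }
        _ => SendAction::Nothing,
    }
}
```

The ResetAndDrain arm is only reachable when the receiver disconnects between the load and the fetch_add, so a single-threaded run never hits it.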

So how is self.queue implemented? Here is its code, from sync/mpsc/mpsc_queue.rs:

    pub struct Queue<T> {
        head: AtomicPtr<Node<T>>,
        tail: UnsafeCell<*mut Node<T>>,
    }

    unsafe impl<T: Send> Send for Queue<T> { }
    unsafe impl<T: Send> Sync for Queue<T> { }

    impl<T> Queue<T> {
        pub fn new() -> Queue<T> {
            let stub = unsafe { Node::new(None) };
            Queue {
                head: AtomicPtr::new(stub),
                tail: UnsafeCell::new(stub),
            }
        }

        pub fn push(&self, t: T) {
            unsafe {
                let n = Node::new(Some(t));
                let prev = self.head.swap(n, Ordering::AcqRel);
                (*prev).next.store(n, Ordering::Release);
            }
        }

        pub fn pop(&self) -> PopResult<T> {
            unsafe {
                let tail = *self.tail.get();
                let next = (*tail).next.load(Ordering::Acquire);

                if !next.is_null() {
                    *self.tail.get() = next;
                    assert!((*tail).value.is_none());
                    assert!((*next).value.is_some());
                    let ret = (*next).value.take().unwrap();
                    let _: Box<Node<T>> = Box::from_raw(tail);
                    return Data(ret);
                }

                if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent}
            }
        }

        // ...
    }

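It implements the non-intrusive MPSC node-based queue algorithm (Dmitry Vyukov's, from 1024cores), building a singly linked list that many producers push onto and a single consumer pops from. Interested readers can find the details in that original description.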

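Advantages of this algorithm:

  • push: very fast under contention and wait-free; it is essentially a single swap (an XCHG instruction). Each producer first swaps its node in as the new head, then links prev_head.next = head to extend the list.

Disadvantages:

  • non-linearizability: the queue is not linearizable, and a push in progress can block pop. If pop finds head != tail while tail.next has not yet become non-null, it is observing the queue in an inconsistent state; in that case this implementation returns Inconsistent.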
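To make the algorithm concrete, here is a self-contained version of the same queue that compiles on stable Rust. It is a sketch modelled on the std code above, not the std file itself: `Box::new` replaces the unstable `box` syntax, `pop` is marked `unsafe` to encode the single-consumer rule, and `pop_spin` plus the `Drop` impl are additions of this sketch. Note the std naming: `head` is where producers push, `tail` is the consumer's end.

```rust
use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::thread;

pub enum PopResult<T> {
    Data(T),
    Empty,
    Inconsistent, // a push swapped `head` but has not yet linked `prev.next`
}

struct Node<T> {
    next: AtomicPtr<Node<T>>,
    value: Option<T>,
}

impl<T> Node<T> {
    // Heap-allocate and leak the node; ownership is tracked via raw pointers.
    fn new(v: Option<T>) -> *mut Node<T> {
        Box::into_raw(Box::new(Node { next: AtomicPtr::new(ptr::null_mut()), value: v }))
    }
}

pub struct Queue<T> {
    head: AtomicPtr<Node<T>>,       // most recently pushed node (producers' end)
    tail: UnsafeCell<*mut Node<T>>, // stub node before the oldest item (consumer's end)
}

unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}

impl<T> Queue<T> {
    pub fn new() -> Self {
        let stub = Node::new(None);
        Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
    }

    /// Any number of threads may push concurrently: one swap, then one store.
    pub fn push(&self, t: T) {
        let n = Node::new(Some(t));
        let prev = self.head.swap(n, Ordering::AcqRel);
        // Between the swap and this store the list is briefly unlinked.
        unsafe { (*prev).next.store(n, Ordering::Release) };
    }

    /// # Safety
    /// Only one thread (the single consumer) may call `pop` at a time.
    pub unsafe fn pop(&self) -> PopResult<T> {
        unsafe {
            let tail = *self.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);
            if !next.is_null() {
                // `next` becomes the new stub; the old stub is freed.
                *self.tail.get() = next;
                let ret = (*next).value.take().unwrap();
                drop(Box::from_raw(tail));
                return PopResult::Data(ret);
            }
            if self.head.load(Ordering::Acquire) == tail {
                PopResult::Empty
            } else {
                PopResult::Inconsistent
            }
        }
    }

    /// # Safety
    /// Same single-consumer rule as `pop`. Yields on Inconsistent, loosely
    /// like try_recv (which panics instead if Empty follows Inconsistent).
    pub unsafe fn pop_spin(&self) -> Option<T> {
        loop {
            match unsafe { self.pop() } {
                PopResult::Data(t) => return Some(t),
                PopResult::Empty => return None,
                PopResult::Inconsistent => thread::yield_now(),
            }
        }
    }
}

impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        // Free every remaining node, starting from the stub.
        let mut cur = *self.tail.get_mut();
        while !cur.is_null() {
            let next = unsafe { (*cur).next.load(Ordering::Relaxed) };
            drop(unsafe { Box::from_raw(cur) });
            cur = next;
        }
    }
}
```

Producers share the queue through an `Arc` and call `push` freely; exactly one thread may call `pop`/`pop_spin`.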

Here is the Node code:

    struct Node<T> {
        next: AtomicPtr<Node<T>>,
        value: Option<T>,
    }

    impl<T> Node<T> {
        unsafe fn new(v: Option<T>) -> *mut Node<T> {
            // `box` is the unstable box syntax std used internally; equivalent to Box::new(...)
            Box::into_raw(box Node {
                next: AtomicPtr::new(ptr::null_mut()),
                value: v,
            })
        }
    }

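Unlike the usual pattern, new returns a *mut Node<T>: Box::into_raw hands responsibility for freeing the Node to the caller (pop frees the old stub with Box::from_raw).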

On the other hand, when Receiver::recv() finds no data in the channel, it has to wait, so let's look at that code:

    pub fn recv(&self) -> Result<T, RecvError> {
        loop {
            let new_port = match *unsafe { self.inner() } {
                Flavor::Oneshot(ref p) => {
                    match p.recv(None) {
                        Ok(t) => return Ok(t),
                        Err(oneshot::Disconnected) => return Err(RecvError),
                        Err(oneshot::Upgraded(rx)) => rx,
                        Err(oneshot::Empty) => unreachable!(),
                    }
                }
                Flavor::Stream(ref p) => {
                    match p.recv(None) {
                        Ok(t) => return Ok(t),
                        Err(stream::Disconnected) => return Err(RecvError),
                        Err(stream::Upgraded(rx)) => rx,
                        Err(stream::Empty) => unreachable!(),
                    }
                }
                Flavor::Shared(ref p) => {
                    match p.recv(None) {
                        Ok(t) => return Ok(t),
                        Err(shared::Disconnected) => return Err(RecvError),
                        Err(shared::Empty) => unreachable!(),
                    }
                }
                Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
            };
            unsafe {
                mem::swap(self.inner_mut(), new_port.inner_mut());
            }
        }
    }

Only this part matters:

    pub fn recv(&self) -> Result<T, RecvError> {
        loop {
            let new_port = match *unsafe { self.inner() } {
                // ...
                Flavor::Shared(ref p) => {
                    match p.recv(None) {
                        Ok(t) => return Ok(t),
                        Err(shared::Disconnected) => return Err(RecvError),
                        Err(shared::Empty) => unreachable!(),
                    }
                }
            };
            // ...
        }
    }

Next comes p.recv(), whose return value determines the result of the call:

    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
        // This code is essentially the exact same as that found in the stream
        // case (see stream.rs)
        match self.try_recv() {
            Err(Empty) => {}
            data => return data,
        }

        let (wait_token, signal_token) = blocking::tokens();
        if self.decrement(signal_token) == Installed {
            if let Some(deadline) = deadline {
                let timed_out = !wait_token.wait_max_until(deadline);
                if timed_out {
                    self.abort_selection(false);
                }
            } else {
                wait_token.wait();
            }
        }

        match self.try_recv() {
            data @ Ok(..) => unsafe { *self.steals.get() -= 1; data },
            data => data,
        }
    }

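The logic: if the first self.try_recv() returns data, it is returned immediately. Otherwise the channel is probably empty, so blocking::tokens() prepares the blocking state for the Receiver, and decrement checks once more whether data is available before the Receiver goes to sleep. The decrement code: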

    fn decrement(&self, token: SignalToken) -> StartResult {
        unsafe {
            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
            let ptr = token.cast_to_usize();
            self.to_wake.store(ptr, Ordering::SeqCst);

            let steals = ptr::replace(self.steals.get(), 0);

            match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
                DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
                n => {
                    assert!(n >= 0);
                    if n - steals <= 0 { return Installed }
                }
            }
            self.to_wake.store(0, Ordering::SeqCst);
            drop(SignalToken::cast_from_usize(ptr));
            Abort
        }
    }

As shown above, the pointer behind token: SignalToken is stored in to_wake, so that a sender can wake the Receiver later.

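The emptiness check subtracts 1 + steals from self.cnt. The reason is that cnt is not decremented on every pop: presumably for performance, the number of popped items is accumulated in steals and folded into cnt only when steals grows large enough or the Receiver is about to block. So the previous value of cnt minus steals, compared against 0, tells whether data is available. If there is none, decrement returns Installed; otherwise it clears to_wake, drops the token, and returns Abort. In the normal no-data case cnt ends up at -1, which is exactly the old value a sender's fetch_add checks for to decide that it must wake the Receiver.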
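A worked example makes the `1 + steals` arithmetic easier to follow. This is a hypothetical single-threaded model with plain integers: the `Counters` type and its methods are illustrative, and DISCONNECTED handling and the atomics are left out. Three items are sent (cnt = 3); if the receiver has already popped all three without blocking (steals = 3), decrement sees 3 - 3 = 0, installs its token, and leaves cnt at -1, so the next send observes -1 and wakes it. With only two steals, one item is still pending and decrement aborts.

```rust
use std::mem;

#[derive(Debug, PartialEq)]
enum StartResult {
    Installed, // no data: the token stays in to_wake and the receiver sleeps
    Abort,     // data available: undo the token and go read it
}

/// Hypothetical model of shared::Packet's counters.
/// `cnt`: pushes not yet accounted for; `steals`: pops not yet subtracted from `cnt`.
struct Counters {
    cnt: isize,
    steals: isize,
}

impl Counters {
    /// Mirrors decrement(): fold the pending steals plus 1 into cnt in one step.
    fn decrement(&mut self) -> StartResult {
        let steals = mem::replace(&mut self.steals, 0);
        let n = self.cnt; // the value fetch_sub would return
        self.cnt -= 1 + steals;
        if n - steals <= 0 {
            StartResult::Installed
        } else {
            StartResult::Abort
        }
    }

    /// Mirrors the counting in send(): true means "old value was -1, wake the receiver".
    fn send(&mut self) -> bool {
        let old = self.cnt;
        self.cnt += 1;
        old == -1
    }
}
```

The extra 1 in `1 + steals` pre-pays for the item the receiver is about to get; that is why the later successful try_recv undoes its own `steals += 1`.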

First, the logic after Installed:

    if self.decrement(signal_token) == Installed {
        if let Some(deadline) = deadline {
            let timed_out = !wait_token.wait_max_until(deadline);
            if timed_out {
                self.abort_selection(false);
            }
        } else {
            wait_token.wait();
        }
    }

In our case (recv passes deadline = None) it simply calls wait_token.wait():

    impl WaitToken {
        pub fn wait(self) {
            while !self.inner.woken.load(Ordering::SeqCst) {
                thread::park()
            }
        }
        // ...
    }

It checks woken first and only then calls park(). This pairs with the sender's send operation shown earlier:

    pub fn send(&self, t: T) -> Result<(), T> {
        // ...
        self.queue.push(t);
        match self.cnt.fetch_add(1, Ordering::SeqCst) {
            -1 => {
                self.take_to_wake().signal();
            }
            // ...

Here is the relevant code:

    fn take_to_wake(&self) -> SignalToken {
        let ptr = self.to_wake.load(Ordering::SeqCst);
        self.to_wake.store(0, Ordering::SeqCst);
        assert!(ptr != 0);
        unsafe { SignalToken::cast_from_usize(ptr) }
    }

    impl SignalToken {
        pub fn signal(&self) -> bool {
            let wake = !self.inner.woken.compare_and_swap(false, true, Ordering::SeqCst);
            if wake {
                self.inner.thread.unpark();
            }
            wake
        }
        // ...
    }

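The signaling side sets woken first and then calls unpark(). Combined with the waiter's check-then-park loop, this ensures the waiting Receiver never sleeps forever.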
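The wait/signal handshake can be reproduced with public std APIs only. This is a minimal sketch, not std's internal blocking module: `Inner`, `tokens`, `WaitToken` and `SignalToken` are re-created here for illustration, and `compare_exchange` is used because `compare_and_swap` has since been deprecated. It works because the waiter re-checks the flag around every `thread::park()` (which may return spuriously), and an `unpark()` issued before `park()` is not lost.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, Thread};

/// Hypothetical re-creation of the shared state behind the two tokens.
struct Inner {
    thread: Thread,    // the thread that will park (the receiver)
    woken: AtomicBool, // set exactly once by a successful signal
}

pub struct WaitToken {
    inner: Arc<Inner>,
}

pub struct SignalToken {
    inner: Arc<Inner>,
}

/// Must be called on the thread that will later call `wait`.
pub fn tokens() -> (WaitToken, SignalToken) {
    let inner = Arc::new(Inner { thread: thread::current(), woken: AtomicBool::new(false) });
    (WaitToken { inner: Arc::clone(&inner) }, SignalToken { inner })
}

impl WaitToken {
    /// Check the flag before every park(): spurious wakeups just loop again.
    pub fn wait(self) {
        while !self.inner.woken.load(Ordering::SeqCst) {
            thread::park();
        }
    }
}

impl SignalToken {
    /// Set the flag first, then unpark. Only the first successful
    /// false -> true transition unparks and returns true.
    pub fn signal(&self) -> bool {
        let wake = self
            .inner
            .woken
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok();
        if wake {
            self.inner.thread.unpark();
        }
        wake
    }
}
```

Typical use: the receiver calls `tokens()`, hands the SignalToken to whoever will produce data (std stores it in to_wake), then calls `wait()`.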

Now the case where decrement returns Abort:

    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
        match self.try_recv() {
            Err(Empty) => {}
            data => return data,
        }

        let (wait_token, signal_token) = blocking::tokens();
        if self.decrement(signal_token) == Installed {
            // ...
        }

        match self.try_recv() {
            data @ Ok(..) => unsafe { *self.steals.get() -= 1; data },
            data => data,
        }
    }

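It simply calls self.try_recv() again. As for the *self.steals.get() -= 1, it comes from try_recv itself: try_recv adds 1 to steals on every successful pop, but the item returned by this second self.try_recv() has already been accounted for in cnt (decrement subtracted 1 + steals), so it must not be counted in steals again, and the -1 balances it out: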

    pub fn try_recv(&self) -> Result<T, Failure> {
        let ret = match self.queue.pop() {
            mpsc::Data(t) => Some(t),
            mpsc::Empty => None,
            mpsc::Inconsistent => {
                let data;
                loop {
                    thread::yield_now();
                    match self.queue.pop() {
                        mpsc::Data(t) => { data = t; break }
                        mpsc::Empty => panic!("inconsistent => empty"),
                        mpsc::Inconsistent => {}
                    }
                }
                Some(data)
            }
        };
        match ret {
            Some(data) => unsafe {
                if *self.steals.get() > MAX_STEALS {
                    match self.cnt.swap(0, Ordering::SeqCst) {
                        DISCONNECTED => {
                            self.cnt.store(DISCONNECTED, Ordering::SeqCst);
                        }
                        n => {
                            let m = cmp::min(n, *self.steals.get());
                            *self.steals.get() -= m;
                            self.bump(n - m);
                        }
                    }
                    assert!(*self.steals.get() >= 0);
                }
                *self.steals.get() += 1;
                Ok(data)
            },
            None => {
                match self.cnt.load(Ordering::SeqCst) {
                    n if n != DISCONNECTED => Err(Empty),
                    _ => {
                        match self.queue.pop() {
                            mpsc::Data(t) => Ok(t),
                            mpsc::Empty => Err(Disconnected),
                            // with no senders, an inconsistency is impossible.
                            mpsc::Inconsistent => unreachable!(),
                        }
                    }
                }
            }
        }
    }

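From the code: if pop() gets data, it is returned directly. If it returns Empty, ret is None, which (unless the channel is disconnected) yields Err(Empty) so the Receiver can go on to block. If it returns Inconsistent, the queue is in the transient state caused by a push that has not finished linking its node; the remedy is to call thread::yield_now() and keep calling pop() until it returns data (Empty at that point would be a bug, so it panics).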

Also, steals is indeed accumulated first, with the MAX_STEALS constant as the threshold:

    match ret {
        Some(data) => unsafe {
            if *self.steals.get() > MAX_STEALS {
                match self.cnt.swap(0, Ordering::SeqCst) {
                    DISCONNECTED => {
                        self.cnt.store(DISCONNECTED, Ordering::SeqCst);
                    }
                    n => {
                        let m = cmp::min(n, *self.steals.get());
                        *self.steals.get() -= m;
                        self.bump(n - m);
                    }
                }
                assert!(*self.steals.get() >= 0);
            }
            *self.steals.get() += 1;
            Ok(data)
        },
        // ...
    }

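Only when steals exceeds MAX_STEALS does try_recv settle up with cnt: it swaps cnt to 0, takes m = min(cnt, steals), subtracts m from steals, and adds the remaining cnt - m back with bump, so the net effect is subtracting m from cnt.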
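The batching can be checked with a small model. This is a hypothetical single-threaded sketch: `Counters`, `on_successful_pop` and `pending` are illustrative names, `max_steals` is a parameter standing in for MAX_STEALS (no particular std value is assumed), and DISCONNECTED and the atomics are omitted. With 10 items sent and a threshold of 3, the first four pops only grow steals; the fifth settles 4 steals into cnt. Throughout, cnt - steals equals the number of items still in the queue.

```rust
/// Hypothetical model of the Some(data) branch of try_recv.
struct Counters {
    cnt: isize,    // pushes not yet accounted for
    steals: isize, // pops not yet subtracted from cnt
}

impl Counters {
    fn on_successful_pop(&mut self, max_steals: isize) {
        if self.steals > max_steals {
            let n = std::mem::replace(&mut self.cnt, 0); // cnt.swap(0, ..)
            let m = n.min(self.steals); // cmp::min(n, steals)
            self.steals -= m;
            self.cnt += n - m; // bump(n - m)
            assert!(self.steals >= 0);
        }
        self.steals += 1; // every successful pop is recorded as a steal
    }

    /// Items still waiting in the queue according to this bookkeeping.
    fn pending(&self) -> isize {
        self.cnt - self.steals
    }
}
```

Deferring the update this way lets most successful receives touch only the receiver-private steals, not the shared atomic cnt that senders contend on.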

