【disruptor】2、disruptor中生產者線程與消費者之間的協調


 

 

 

由於ringbuffer是一個環形的隊列,那么生產者和消費者在遍歷這個隊列的時候,如何制衡呢?

 

1、生產快,消費慢,數據丟失?

生產者速度過快,導致一個對象還沒消費完,就循環生產了一個新的對象要加入ringbuffer,導致消費不完整,造成數據丟失?

 我們注意到,在我們獲取生產者下一個位置的時候,是通過ringbuffer的next方法,而這個next方式是調用了sequencer的next方法

 

 

 

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

這個對象,在我們創建disruptor對象的時候,創建的

 

 

 

 

 

 

所以這個ringbuffer就是disruptor中的sequencer對象,那么在進行獲取next的時候,這里是如何獲取下一個的呢?是否會對這個生產獲取下一個序列進行相應的等待策略,避免產生相應的干擾!!!

 

這個各位看官還需多看看里面的代碼以及封裝(特別是封裝,真是九轉十八彎),多熟悉,我這繞着繞着很容易就繞暈了,剛開始也是雲里霧里。

 

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

 

回歸主線,繼續看看next方法

 

long nextSequence = nextValue + n;      這個標識這次要生產的數據到什么位子
long wrapPoint = nextSequence - bufferSize;   是否超出范圍,第一次是沒有超出最開始的ringbuffer大小,第二次就是判斷是否要對沒有被消費的部分進行覆蓋了
long cachedGatingSequence = this.cachedValue;   這個是消費者消費的情況,判斷從哪里開始到當前位置是沒有被消費的

 

 

 

 那么這個判斷條件是什么意思呢?

 wrapPoint > cachedGatingSequence   這個條件是判斷,當前超出的部分是否會覆蓋到還未被消費的部分數據

 

cachedGatingSequence > nextValue   這個判斷是,未被消費的位置開始,是否在下一個生產者位置的前面,

  如果還未被消費的標識比下一個要被生產的位置還小,那說明生產在消費的前面,消費者可以繼續消費

  如果未被消費的標識比下一個要生產的位置還大,消費盤跑到生產前面了,會造成重復消費

 

 由於是環形的隊列,當消費者和生產者同時啟動的時候,而cachedValue  ,nextValue  是累加計數的,那么我們要保證生產者不能超過消費者一個ringbuffer的大小,不然會產生消費數據丟失

所以,nextValue  + 當前要生產的數據  - ringbuffersize 要保證比未被消費的地方小,不然超出到消費者前面一圈,那么部分還沒有被消費就又被從新設置了

 

 

2、生產慢,消費快,數據重復消費?

生產者速度過慢,導致消費者消費一圈了,生產者還沒有生產新的數據出來,會不會導致重復消費?

 

這里的關鍵就是兩個變量

cachedValue 和 nextValue

cachedValue  進行累加統計被消費的個數,可以記錄消費到哪里了的位置

nextValue  進行記錄累加統計被生產的位置,可以記錄生產到那個位置了

 

通過這兩個變量保證消費的進度永遠保持在生產的進度后面,也就是 cachedValue  < nextValue  的時候才可以繼續生產和消費,違背這個規則就要對一方進行阻塞,或者自旋

 

 

 

總結:

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM