【源碼】RingBuffer(一)——生產者


純CAS為啥比加鎖要快?

同樣是修改數據,一個采用加鎖的方式保證原子性,一個采用CAS的方式保證原子性。

都是能夠達到目的的,但是常用的鎖(例如顯式的Lock和隱式的synchonized),都會把獲取不到鎖的線程掛起,相對於CAS的不掛起,多了掛起和喚醒的開銷。

題外話:CAS與鎖的關系

CAS只是在這個場景下,比使用鎖來得更純粹,因為只做數據更新,所以開銷更少。但是業務上為了保證一系列操作的原子性,還是要使用鎖的。而且鎖的底層實現,也依賴於類似於CAS這樣的原子性操作。

尾指針是如何管理的,如何防止覆蓋舊數據?

別的帖子都說RingBuffer中不維護尾指針,尾指針由消費者維護(所謂維護指針,就是修改、移動指針)其實這一句話有點誤導性,如果RingBuffer不知道尾部在哪里,那它的數據存儲肯定就會出問題,例如把還沒消費過的數據給覆蓋了。

確實,消費者會自行維護自己的消費指針(消費者指針是消費者消費過的最后一條數據的序號,下一篇中會詳細講到),RingBuffer也不會去干涉消費者指針的維護,但是它會引用所有消費者的指針,讀取他們的值,以此作為“尾部”的判斷依據。實際上就是最慢的那個消費者作為邊界

我們直接來看代碼,這個是RingBuffer的publishEvent方法,我們看到,它首先取得一個可用的序列號,然后再將數據放入該序列號的對應位置中。

@Override
public void publishEvent(EventTranslator<E> translator)
{
    //1.先通過原子操作,得到一個可用的序號
    final long sequence = sequencer.next();
    //2.將該序號對應位置的元素進行轉換,接着發布
    translateAndPublish(translator, sequence);
}

我們來看看這個序列號是如何取得的。我們先看Sequencer的SingleProducerSequencer實現。這里就是判斷如果生產者新指針的位置是否會超過尾部,如果超過尾部就掛起片刻,后續再嘗試(生產者的等待方式是固定的,不像消費者有一個等待策略)

這里附上幾個圖可能更好理解:(右邊是后續補充的用“畫圖”畫的,對單元格添加一些顏色進行區分)

情況1:隊列已滿,生產者嘗試使用新序號14,但由於(14 - 8 = 6),由於最慢的消費者目前消費的最后一條數據的序號是5,5號之后的數據還沒被消費,6 > 5,所以序號14還不能用。生產者線程掛起,下次再次嘗試。

 情況2:消費者1消費了序號6的數據。(14 - 8 = 6) 不大於 6,這時序號14可用,生產者得到可用的序號。

 

    @Override
    public long next()
    {
        return next(1);
    }

    /**
     * @see Sequencer#next(int)
     */
    @Override
    public long next(int n)
    {
        if (n < 1 || n > bufferSize)
        {
            throw new IllegalArgumentException("n must be > 0 and < bufferSize");
        }

        long nextValue = this.nextValue; //當前RingBuffer的游標,即生產者的位置指針

        long nextSequence = nextValue + n; 
        long wrapPoint = nextSequence - bufferSize; //減掉一圈
        long cachedGatingSequence = this.cachedValue; //上一次緩存的最小的消費者指針

        //條件1:生產者指針的位置超過當前消費最小的指針
        //條件2:為特殊情況,這里先不考慮,詳見:
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            cursor.setVolatile(nextValue);  // StoreLoad fence

            long minSequence;
            //再次遍歷所有消費者的指針,確認是否超過
            //如果超過,則等待
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }

            this.cachedValue = minSequence;
        }

        this.nextValue = nextSequence;

        return nextSequence;
    }

另外對於多生產者的情況,在不會越界的情況下,需要通過CAS來保證獲取序號的原子性。具體可以查看MultiProducerSequencer的next方法。

 消費者指針是如何讀取的?

RingBuffer如何知道有哪些消費者?哪些gatingSequense是從哪里來的?

在構建RingBuffer注冊處理類的時候,就將消費者Sequense注冊到RingBuffer中了。

看代碼的話,定位到gatingSequences在AbastractSequencer,對應的有個addGatingSequenses方法用於注入gatingSequence

public abstract class AbstractSequencer implements Sequencer {
    //...
    protected volatile Sequence[] gatingSequences = new Sequence[0];

    @Override
    public final void addGatingSequences(Sequence... gatingSequences)
    {
        SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
    }

    //...
}

再查看addGatingSequences被調用的地方,即通過RingBuffer的方法,設置到Sequencer中,這個Sequencer是生產者使用的序號管理器

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    //...
    protected final Sequencer sequencer;
    
    public void addGatingSequences(Sequence... gatingSequences) {
        sequencer.addGatingSequences(gatingSequences);
    }
    //...
}

而RingBuffer的addGatingSequence則在Disruptor配置處理器的時候被調用

public class Disruptor<T> {
    //...
    private final RingBuffer<T> ringBuffer;
    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();
    
    
    public EventHandlerGroup<T> handleEventsWith(final EventProcessor... processors)
    {
        for (final EventProcessor processor : processors)
        {
            consumerRepository.add(processor);
        }

        final Sequence[] sequences = new Sequence[processors.length];
        for (int i = 0; i < processors.length; i++)
        {
            sequences[i] = processors[i].getSequence();
        }

        ringBuffer.addGatingSequences(sequences);

        return new EventHandlerGroup<>(this, consumerRepository, Util.getSequencesFor(processors));
    }
    //...
}

緩存的意義是什么?

我們看到在SiingleProducerSequencer的next方法中,會緩存上一次的消費者最小序列號,這有什么用呢?

用途就是不需要每次都讀取各消費者的序號,只要沒超過上一次的最小值的地方都可以直接分配,如果超過了,則進行再次判斷

為啥讀取最小值不需要保證原子性?

看了這個獲取最小消費序號的,可能會奇怪,為啥這個操作不需要上鎖,這個不是會獲取到舊值嗎?

確實,這個最小值獲取到的時候,實際上數值已經變更。但是由於我們的目的是為了防止指針越位,所以用舊值是沒有問題的。(舊值<=實際上的最小值)

public static long getMinimumSequence(final Sequence[] sequences, long minimum)
    {
        for (int i = 0, n = sequences.length; i < n; i++)
        {
            long value = sequences[i].get();
            minimum = Math.min(minimum, value);
        }

        return minimum;
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM