美團Leaf——全局序列生成器


Leaf的Github地址:

https://github.com/Meituan-Dianping/Leaf

Leaf美團技術團隊博客地址:

https://tech.meituan.com/2017/04/21/mt-leaf.html

關於Leaf的使用手冊、架構說明、Segment和Snowflake的特點和時鍾回撥解決辦法,參考上面的鏈接內容都能獲得到答案。拒絕重復搬磚。

在本篇博客里我想就Leaf的Segment模式的源碼實現做個簡單的注釋。代碼分支:master。

1.Segment

Segment是SegmentBuffer的成員屬性,cache中存儲的是SegmentBuffer,Segment是雙buffer的實現。

2.初始化Segment

在初始化Segment時,主要做兩件事情。1是根據數據庫表中配置的busi_tag更新緩存;2是添加定時任務,定時(一分鍾間隔)更新緩存。

    @Override
    public boolean init() {
        logger.info("Init ...");
        // 確保加載到kv后才初始化成功
        updateCacheFromDb();
        initOK = true;
        updateCacheFromDbAtEveryMinute();
        return initOK;
    }

updateCacheFromDb():

private void updateCacheFromDb() {
        logger.info("update cache from db");
        StopWatch sw = new Slf4JStopWatch();
        try {
            //從配置的數據源中加載biz_tag
            List<String> dbTags = dao.getAllTags();
            if (dbTags == null || dbTags.isEmpty()) {
                return;
            }
            //cache中的biz_tag.初始為空.
            List<String> cacheTags = new ArrayList<String>(cache.keySet());
            //存儲本次更新操作,要從DB中加載進cache的biz_tag.
            Set<String> insertTagsSet = new HashSet<>(dbTags);
            //存儲失效的biz_tag:存在於cache,不存在於DB.
            Set<String> removeTagsSet = new HashSet<>(cacheTags);
            //過濾去重,得到需要存入進cache的biz_tag
            for(int i = 0; i < cacheTags.size(); i++){
                String tmp = cacheTags.get(i);
                if(insertTagsSet.contains(tmp)){
                    insertTagsSet.remove(tmp);
                }
            }
            //存儲進cache
            for (String tag : insertTagsSet) {
                SegmentBuffer buffer = new SegmentBuffer();
                buffer.setKey(tag);
                Segment segment = buffer.getCurrent();
                segment.setValue(new AtomicLong(0));
                //這里的max、step均為0.所以這一步僅僅將biz_tag存儲進了cache,並沒有對SegmentBuffer執行初始化操作.
                segment.setMax(0);
                segment.setStep(0);
                cache.put(tag, buffer);
                logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
            }
            //cache中已失效的biz_tag從cache刪除
            for(int i = 0; i < dbTags.size(); i++){
                String tmp = dbTags.get(i);
                if(removeTagsSet.contains(tmp)){
                    removeTagsSet.remove(tmp);
                }
            }
            for (String tag : removeTagsSet) {
                cache.remove(tag);
                logger.info("Remove tag {} from IdCache", tag);
            }
        } catch (Exception e) {
            logger.warn("update cache from db exception", e);
        } finally {
            sw.stop("updateCacheFromDb");
        }
    }
View Code

 

3.獲取到下一個序列號

    @Override
    public Result get(final String key) {
        if (!initOK) {
            return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
        }
        //判斷cache中是否有這個key——bizTag
        if (cache.containsKey(key)) {
            SegmentBuffer buffer = cache.get(key);
            //未初始化,
            if (!buffer.isInitOk()) {
                synchronized (buffer) {
                    //再次判斷,以防重復初始化.
                    if (!buffer.isInitOk()) {
                        try {
                            //對SegmentBuffer當前的Segment進行初始化
                            updateSegmentFromDb(key, buffer.getCurrent());
                            logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
                            buffer.setInitOk(true);
                        } catch (Exception e) {
                            logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
                        }
                    }
                }
            }
            return getIdFromSegmentBuffer(cache.get(key));
        }
        return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
    }

updateSegmentFromDb();

public void updateSegmentFromDb(String key, Segment segment) {
        StopWatch sw = new Slf4JStopWatch();
        SegmentBuffer buffer = segment.getBuffer();
        LeafAlloc leafAlloc;
        //1.SegmentBuffer尚未初始化,則SegmentBuffer的step等於數據庫中的step;
        if (!buffer.isInitOk()) {
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
            buffer.setStep(leafAlloc.getStep());
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step為DB中的step
        } else if (buffer.getUpdateTimestamp() == 0) {
            //2.SegmentBuffer已經初始化完成:
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
            buffer.setUpdateTimestamp(System.currentTimeMillis());
            buffer.setStep(leafAlloc.getStep());
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step為DB中的step
        } else {
            //2.SegmentBuffer已經初始化完成:
            //a)    SegmentBuffer的存活時間小於15分鍾:
            //   i.    如果SegmentBuffer當前的step*2大於最大值(一百萬),則什么也不做,不再擴大step
            //  ii.    否則,SegmentBuffer的step擴大為原來的2倍。
            //b)    SegmentBuffer的存活時間小於30分鍾:什么也不做.
            //c)    SegmentBuffer的存活時間在15至30分鍾之間:
            //   i.    如果SegmentBuffer的step/2 大於等於數據庫中的step,那么就將SegmentBuffer的step值變為原來的二分之一,否則什么也不做。
            long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
            int nextStep = buffer.getStep();
            if (duration < SEGMENT_DURATION) {
                if (nextStep * 2 > MAX_STEP) {
                    //do nothing
                } else {
                    nextStep = nextStep * 2;
                }
            } else if (duration < SEGMENT_DURATION * 2) {
                //do nothing with nextStep
            } else {
                nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
            }
            logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
            LeafAlloc temp = new LeafAlloc();
            temp.setKey(key);
            temp.setStep(nextStep);
            leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
            buffer.setUpdateTimestamp(System.currentTimeMillis());
            buffer.setStep(nextStep);
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step為DB中的step
        }
        // must set value before set max
        long value = leafAlloc.getMaxId() - buffer.getStep();
        segment.getValue().set(value);
        segment.setMax(leafAlloc.getMaxId());
        segment.setStep(buffer.getStep());
        sw.stop("updateSegmentFromDb", key + " " + segment);
    }
View Code

 

getIdFromSegmentBuffer();

    public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
        while (true) {
            //讀鎖
            buffer.rLock().lock();
            try {
                //獲取到當前正在使用的Segment.
                final Segment segment = buffer.getCurrent();
                //如果下一個Segment沒有初始化完成,當前Segment的ID閑置數量小於9成,並且沒有在執行Segment初始化操作,就去執行對下一個Segment的初始化.
                //只要當前號段消費數量達到了10%,就對下一個號段進行初始化.
                if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
                    service.execute(new Runnable() {
                        @Override
                        public void run() {
                            Segment next = buffer.getSegments()[buffer.nextPos()];
                            boolean updateOk = false;
                            try {
                                //根據biz_tag,根據DB對Segment初始化
                                updateSegmentFromDb(buffer.getKey(), next);
                                updateOk = true;
                                logger.info("update segment {} from db {}", buffer.getKey(), next);
                            } catch (Exception e) {
                                logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
                            } finally {
                                if (updateOk) {
                                    //獲取寫鎖
                                    buffer.wLock().lock();
                                    buffer.setNextReady(true);
                                    buffer.getThreadRunning().set(false);
                                    buffer.wLock().unlock();
                                } else {
                                    buffer.getThreadRunning().set(false);
                                }
                            }
                        }
                    });
                }
                //Segment的value記錄了當前已分配的ID最大值,加一后返回.原子性操作.
                long value = segment.getValue().getAndIncrement();
                //如果超過了當前Segment緩存的最大值(當前Segment號段已經消費完畢),就進入阻塞等待.
                if (value < segment.getMax()) {
                    return new Result(value, Status.SUCCESS);
                }
            } finally {
                buffer.rLock().unlock();
            }
            //當前號段已經消費完了,下個號段正在有別的線程在執行初始化,進入等待.
            //做空轉.
            waitAndSleep(buffer);
            buffer.wLock().lock();
            //下面這段代碼在做空轉之前已經執行過,這里為什么還要再次執行呢?
            //在當前線程空轉期間,可能已經有別的線程執行完畢了對下個Segment的初始化操作,並進行了切換.防止出現多次切換.
            try {
                final Segment segment = buffer.getCurrent();
                long value = segment.getValue().getAndIncrement();
                if (value < segment.getMax()) {
                    return new Result(value, Status.SUCCESS);
                }
                //當前Segment緩存的號段已經消費完了,下一個Segment初始化好了就進行切換
                if (buffer.isNextReady()) {
                    //將Segment切換為下一個.
                    buffer.switchPos();
                    buffer.setNextReady(false);
                } else {
                    //ERROR.當前Segment號段滿了,下一個號段還未准備好.
                    logger.error("Both two segments in {} are not ready!", buffer);
                    return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
                }
            } finally {
                buffer.wLock().unlock();
            }
        }
    }
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM