美團Leaf號段模式詳解


號段模式 VS 批量生成ID

所謂 號段模式 跟 批量生成ID 有什么區別? 乍一看,感覺基本上是一個意思。 其實還是不同的!
 
批量生成ID
一次按需批量生成多個ID( 也就是一批),每次生成都需要訪問數據庫,將數據庫中對應的批 消耗掉,並在內存中記錄下來。
批量生成是 一次獲取一批的數據, 比如一次性一百個ID。 這些id 是可以無序的, 這個時候就不存在所謂的最大值、最小值;當然也可以是順序遞增的,那么其實 只需要最小值、最大值 就可以了吧。但是,它還是 一次性獲取100個, 這個算是它的缺點; 當然,用多少就是多少, 用不完的 如果可以 退還, 那么是非常理想的情況。
優點:避免了每次生成ID都要訪問數據庫並帶來壓力,提高性能
缺點:屬於本地生成策略,存在單點故障,服務重啟造成ID不連續
 
號段模式
是 一次獲取一個 id 段(所謂號, 就是id吧)。什么是 id 段? 就是一個id 的某個取值的區間段,包括最小值、最大值, 直白的說就是 兩個值:最小值、最大值。一般就獲取一個 最大值和步長就好了( 起始值 由兩者做減法而得出)。
有了最小值、最大值,而且 要求id 完全嚴格的順序遞增, 那么客戶端保存一個當前的id值, 使用的時候讓它在 號段區間內遞增,那么就可以 大幅減輕數據庫壓力。
此模式,數據庫中一個表保存 最大值和步長 即可。或者保存一個最小值和步長吧!
 
它如何避免 id 的重復發放? 也就是如何避免id沖突?
每個客戶端每次都重新獲取一個號段,就可以避免id沖突。就是說每次都在 最大值的基礎上增加一個步長,然后返回新的最大值。

號段模式的一般流程

1 客戶端(本地) 嘗試獲取 分布式id,本地存在號段(或稱號段緩存)? 執行2,否則3
2 當前id > 當前號段最大值? 執行3,否則4
3 從數據庫獲取新的號段, 包括步長、最大值,並設置它為 本地 當前號段。
4 從本地 當前號段 獲取新的id。
 
如下圖:
 

 

 

 可見, 訪問數據庫, 減少到只有一個地方, 也就是第三步,讀寫數據庫的頻率從1減小到了1/step。
 

號段模式的新的問題

 一個問題是,如果 客戶端突然異常,號段沒有用完,下次客戶端重啟了,再次訪問數據庫,就會獲取新的號段。導致id 就不連續了。 不連續 其實 一般場景也問題不大,雖然不連續,但是還是可以保證順序遞增的,只是 不再是 依次遞增了罷。再說, 某些業務,可能還要求 不能單調遞增,以 防止 被 猜測出 業務數據呢!
 
另外的 問題是, 在分布式環境, 可能有多個客戶端,這樣做 無法保證全局有序。 比如客戶端C1、C2 先后非常短的時間去獲取號段,C1 獲取的是0-1000, C2 獲取的是1000-2000, c1 使用到3, 而c2 可能是使用到 1002, 那么, 我們的業務上就出現這樣的數據: 雖然相隔時間不多,但是 id 值 相差很大。 如果有更多的客戶端,可能出現的id 差更大!
 
—— 怎么辦? 看場景, 其實 我們可以給客戶端加一個標志,那么此種情況仍然可以保證 至少是 單個客戶端 順序遞增的。而整體上, 可以保證 基本有序。 就是說如果不要求保證 整體、全局有序, 那么這個問題 也不是問題, 忽略即可。 其他情況呢? 那就真不好解決,只能等待一個號段消費完成,再去消費下一個號段,顯然非常不合理。
 
分布式id 一定是id 嗎? 也不一定, 只要是 作用跟id 差不多就可以了。
  

美團Leaf 號段模式

美團Leaf 在普通的號段基礎上,又進一步優化, 提出了 “雙buffer優化” , 但是感覺這個 所謂的“雙buffer優化” , 意義不大, 它僅僅是在 “取DB的時候網絡發生抖動,或者DB發生慢查詢就會導致整個系統的響應時間變慢”, 這種可能性不是沒有, 但是過於吹毛求疵了吧! 當然, 對應美團這樣的公司,可能有這個需求,一般公司 肯定是不需要這樣高要求的!
 
不管怎么樣, 我們先看看 所謂的“雙buffer優化” :
它的理念是 “ DB取號段的過程能夠做到無阻塞,不需要在DB取號段的時候阻塞請求線程,即當號段消費到某個點時就異步的把下一個號段加載到內存中。而不需要等到號段用盡的時候才去更新號段。這樣做就可以很大程度上的降低系統的TP999指標 ”,
 
雙buffer優化的詳細實現如下圖:

 

 

 采用雙buffer的方式,Leaf服務內部有兩個號段緩存區segment。當前號段已下發10%時,如果下一個號段未更新,則另啟一個更新線程去更新下一個號段。當前號段全部下發完后,如果下個號段准備好了則切換到下個號段為當前segment接着下發,循環往復。
  • 每個biz-tag都有消費速度監控,通常推薦segment長度設置為服務高峰期發號QPS的600倍(10分鍾),這樣即使DB宕機,Leaf仍能持續發號10-20分鍾不受影響。
  • 每次請求來臨時都會判斷下個號段的狀態,從而更新此號段,所以偶爾的網絡抖動不會影響下個號段的更新。
 ——
"當前號段已下發10%時,如果下一個號段未更新,則另啟一個更新線程去更新下一個號段。當前號段全部下發完后,如果下個號段准備好了則切換到下個號段為當前segment接着下發,循環往復"
—— 1 什么是“下一個號段未更新” , 如果已經更新呢 ?
—— 2 如果下個號段沒有准備好,咋辦?
——  說話不能只說半截, 考慮需要全面啊。 不能只有if ,沒有else。。
 
當前號段已使用超過10%時,則啟動新線程獲取下一個可用號段,然后緩存到本地? 這樣做, 感覺也是可行的,不過會導致 號段差更大吧。
這個圖,不太友好, 看了相當於沒看。
 
具體還是得看源碼 ↓↓
 

美團Leaf號段模式源碼分析

 
public class SegmentIDGenImpl implements IDGen {
    private static final Logger logger = LoggerFactory.getLogger(SegmentIDGenImpl.class);

    /**
     * IDCache未初始化成功時的異常碼
     */
    private static final long EXCEPTION_ID_IDCACHE_INIT_FALSE = -1;
    /**
     * key不存在時的異常碼
     */
    private static final long EXCEPTION_ID_KEY_NOT_EXISTS = -2;
    /**
     * SegmentBuffer中的兩個Segment均未從DB中裝載時的異常碼
     */
    private static final long EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL = -3;
    /**
     * 最大步長不超過100,0000
     */
    private static final int MAX_STEP = 1000000;
    /**
     * 一個Segment維持時間為15分鍾
     */
    private static final long SEGMENT_DURATION = 15 * 60 * 1000L;
    
    // 最多5個線程,也就是最多5個任務同時執行(因為可能有多個tag,如果tag 只有1、2個,那么沒必要5個線程);idle時間是60s;
    // SynchronousQueue意思是,只能有一個線程執行一個tag的任務,立即執行,執行完立即獲取;其他的都只能阻塞式等待
    private ExecutorService service = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new UpdateThreadFactory());
    private volatile boolean initOK = false;
    
    // 注意它包含了所有的SegmentBuffer,k是業務類型,v是對應的
    // 全局緩存
    private Map<String, SegmentBuffer> cache = new ConcurrentHashMap<String, SegmentBuffer>();
    private IDAllocDao dao;

    public static class UpdateThreadFactory implements ThreadFactory {

        private static int threadInitNumber = 0;

        private static synchronized int nextThreadNum() {
            return threadInitNumber++;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "Thread-Segment-Update-" + nextThreadNum());
        }
    }

    @Override
    public boolean init() {
        logger.info("Init ...");
        // 確保加載到kv后才初始化成功
        updateCacheFromDb();
        initOK = true;
        updateCacheFromDbAtEveryMinute();
        return initOK;
    }

    private void updateCacheFromDbAtEveryMinute() {// 顧名思義, 每分鍾執行一次。執行什么? 執行 updateCacheFromDb方法
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("check-idCache-thread");
                t.setDaemon(true);
                return t;
            }
        });
        service.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                updateCacheFromDb();
            }
        }, 60, 60, TimeUnit.SECONDS);
    }


// 通過數據庫 來更新cache緩存, 也就是Map<String, SegmentBuffer> cache。注意它包含了所有的SegmentBuffer
// 總結就是, 把db新增的tag, 初始化並添加到cache,把db刪除的tag,從cache中刪除;db中更新的呢?這里不管,后面由更新線程去維護到cache
// init的時候執行一次,后面每分鍾執行一次
    private void updateCacheFromDb() {
        logger.info("update cache from db");
        StopWatch sw = new Slf4JStopWatch();
        try {
            List<String> dbTags = dao.getAllTags();// 可能有新加的tags, 這里僅僅加載 tag,不包括value
            if (dbTags == null || dbTags.isEmpty()) {
                return;
            }
            List<String> cacheTags = new ArrayList<String>(cache.keySet());
            Set<String> insertTagsSet = new HashSet<>(dbTags);
            Set<String> removeTagsSet = new HashSet<>(cacheTags);
            //db中新加的tags灌進cache
            for(int i = 0; i < cacheTags.size(); i++){
                String tmp = cacheTags.get(i);
                if(insertTagsSet.contains(tmp)){
                    insertTagsSet.remove(tmp); // 數據庫中已經存在於cache中的,是舊的,先刪除
                }
            }
            for (String tag : insertTagsSet) {// 現在insertTagsSet的部分都是全新的tags
                SegmentBuffer buffer = new SegmentBuffer();// 默認 SegmentBuffer的init 是false
                buffer.setKey(tag);
                Segment segment = buffer.getCurrent();// currentPos 永遠只會在 0/1之間切換
                segment.setValue(new AtomicLong(0));// value表示 實際的分布式的唯一id值
                segment.setMax(0); // max 是 當前號段的 最大值; step 是步長,它會根據消耗的速度自動調整!
                segment.setStep(0);// 默認 value、max、 step 都是0 —— 也就是, 全部重置!
                cache.put(tag, buffer);// 添加到全局的 cache
                logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
            }
            //cache中已失效的tags從cache刪除
            for(int i = 0; i < dbTags.size(); i++){
                String tmp = dbTags.get(i);
                if(removeTagsSet.contains(tmp)){ // 兩個contains 都是在尋找重疊部分
                    removeTagsSet.remove(tmp);//cache中存在於數據庫中的,先從緩存中刪除
                }
            }
            for (String tag : removeTagsSet) { // 現在removeTagsSet的部分都是是舊的、已經失效的tags
                cache.remove(tag); // 又從全局的 cache中刪除。
                logger.info("Remove tag {} from IdCache", tag);
            }
        } catch (Exception e) {
            logger.warn("update cache from db exception", e);
        } finally {
            sw.stop("updateCacheFromDb");
        }
    }

    @Override
    public Result get(final String key) {
        if (!initOK) { // 全局cache  是否初始化完成? 一般是不會說沒有完成初始化
            return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
        }
        if (cache.containsKey(key)) {
            SegmentBuffer buffer = cache.get(key);
            if (!buffer.isInitOk()) {// 對應key的buffer 是否初始化完成? 第一次調用get方法肯定是false
                synchronized (buffer) { // 沒完成則加鎖, 因為可能有多線程調用此get 方法
                    if (!buffer.isInitOk()) {// 再次判斷, 防止 xxx問題
                        try {
                            updateSegmentFromDb(key, buffer.getCurrent());// 通過數據庫 初始化、更新buffer中 的當前號段
                            logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
                            buffer.setInitOk(true);// 必須到這里 才算完成buffer的 初始化; 也是全局唯一被調用的地方!
                        } catch (Exception e) {
                            logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
                        }
                    }
                }
            }
            // 到這里,肯定已經完成對應key的buffer的初始化, 所以直接從號段緩存中 獲取id
            return getIdFromSegmentBuffer(cache.get(key));// 直接從號段緩存中 獲取id
        }
        //可能數據庫中不存在對應的key, 那就暫時只能異常返回
        return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
    }

// 通過數據庫 更新buffer中 的號段
// 注意 initOK為true,號段 緩存可能還沒有初始化; 也就是init() 方法實際並沒有對緩存做 初始化。
// 全局被調用的地方, 就兩處
    public void updateSegmentFromDb(String key, Segment segment) {
        StopWatch sw = new Slf4JStopWatch();
        SegmentBuffer buffer = segment.getBuffer();
        LeafAlloc leafAlloc;
        if (!buffer.isInitOk()) {// 再次判斷,第一次調用get方法肯定init=false,於是進入這個if
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);// 直接查詢數據庫
            buffer.setStep(leafAlloc.getStep());
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step為DB中的step; 最小步長,表明真正的步長是可能大於minStep的
        } else if (buffer.getUpdateTimestamp() == 0) {// 如果buffer已經init過,第二次會進這個if, 因為setUpdateTimestamp全局被調用兩次,這個if和下一個else
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);//
            buffer.setUpdateTimestamp(System.currentTimeMillis());// 唯二的全局被調用;為什么有這個if, 為什么特地需要特地調用這個方法
            buffer.setStep(leafAlloc.getStep());// 和上一個if 是不是重復了? 代碼是不是有些啰嗦?
            buffer.setMinStep(leafAlloc.getStep());// minStep的作用僅僅是為了 動態調整, 實際使用的還是 step !!
        } else {// 如果buffer已經init過,第三次會進這個else; 這里其實是根據duration  動態調整步長!
            long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
            int nextStep = buffer.getStep();
            if (duration < SEGMENT_DURATION) {// 如果duration 太短了, 那么說明步長有點小,id 消耗快,那么進入調整。
                if (nextStep * 2 > MAX_STEP) { // 如果*2 會超出最大步長,那就不調整了,否則就*2
                    //do nothing
                } else {
                    nextStep = nextStep * 2;
                }
            } else if (duration < SEGMENT_DURATION * 2) { 如果 SEGMENT_DURATION < duration < SEGMENT_DURATION * 2, 那么不管
                //do nothing with nextStep
            } else {// 否則就是 > SEGMENT_DURATION * 2; 那么 步長除以2,但是不能小於min
                nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
            }
            logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
            LeafAlloc temp = new LeafAlloc();
            temp.setKey(key);
            temp.setStep(nextStep);
            leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);// 動態調整記錄到數據庫中~
            buffer.setUpdateTimestamp(System.currentTimeMillis());// 唯二的全局被調用
            buffer.setStep(nextStep);
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step為DB中的step
        }
        // must set value before set max
        long value = leafAlloc.getMaxId() - buffer.getStep();//leafAlloc.getMaxId()是后端新的最大值,buffer.getStep()是新的步長
        segment.getValue().set(value);// value 和max 構成了 號段的起止值; value 其實命名並不好,容易讓人誤解!
        segment.setMax(leafAlloc.getMaxId()); // value 、max、step 都設置好了,表示 號段初始化、更新完成
        segment.setStep(buffer.getStep());
        sw.stop("updateSegmentFromDb", key + " " + segment);
    }

// 直接從號段緩存中 獲取id
// 獲取的時候, 需要檢查是否超過了 10%, 超過則 另外啟動任務去異步 加載新的 號段!
    public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
        while (true) {
            buffer.rLock().lock();
            try {
                final Segment segment = buffer.getCurrent();
                // 每次get 都會進入這里,但是其實 isNextReady條件 只有設置一次
                if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
                    service.execute(new Runnable() {// 異步執行
                        @Override
                        public void run() { // 進入了這個方法,buffer.getThreadRunning 已經被cas設置為了true; 保證了 這個方法不會有並發
                            Segment next = buffer.getSegments()[buffer.nextPos()];
                            boolean updateOk = false;
                            try {
                                updateSegmentFromDb(buffer.getKey(), next);// next 是指使用 下一個號段;這里也就是更新下一個號段
                                updateOk = true;
                                logger.info("update segment {} from db {}", buffer.getKey(), next);
                            } catch (Exception e) {
                                logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
                            } finally {
                                if (updateOk) {
                                    buffer.wLock().lock();// 這里使用了 寫鎖, 是因為有寫 的並發競爭
                                    buffer.setNextReady(true);// 表示下一個號段已經准備好!
                                    buffer.getThreadRunning().set(false);// 操作完畢,又把buffer線程狀態設置為false,因為task 已經執行完
                                    buffer.wLock().unlock();
                                } else {
                                    buffer.getThreadRunning().set(false);// 不管怎么樣,需要把buffer線程狀態設置為false,因為task 已經執行完
                                }
                            }
                        }
                    });
                }
                long value = segment.getValue().getAndIncrement();// 這里可能有讀的並發,
                if (value < segment.getMax()) { // 未達到號段的最大值,也就是最右端
                    return new Result(value, Status.SUCCESS);// 到這里, 就成功返回;    一般情況 就是進入這個if 然后返回!
                }
            } finally {
                buffer.rLock().unlock(); // 不管怎么樣,這里釋放讀鎖
            }
            //什么情況會需要wait? 上面的方法沒有進入上一個if已經達到號段的最大值! 或者出現異常,當然一般不會有異常;
            //達到號段的最大值 意味着需要使用下一個號段,
            // waitAndSleep其實是等待 正在執行的線程把任務執行完成;具體是 判斷並自旋10000,然后超過10000,那就每次sleep 10毫秒,然后退出..
            waitAndSleep(buffer);// 總之,waitAndSleep保證沒有正在執行的更新線程; 但也不是100% 保證!
            buffer.wLock().lock();// 執行寫鎖, 排斥任何其他的線程!
            try {
                final Segment segment = buffer.getCurrent();// 能繼續使用同一個號段嗎?
                long value = segment.getValue().getAndIncrement();
                if (value < segment.getMax()) {// 為什么需要再次判斷? max會在已經確定的情況下變化? 也不會,大概是保險起見?
                    return new Result(value, Status.SUCCESS);
                }
                if (buffer.isNextReady()) {// 已經達到號段的最大值,此時前面必然已經完成了新號段的獲取, 肯定進入此判斷
                    buffer.switchPos(); // 全局唯一的 切換!   雖然完成了切換, 但是不立即獲取value,而是等待下一次的循環!
                    buffer.setNextReady(false);
                } else {// 這種情況, 不太可能發生吧!
                    logger.error("Both two segments in {} are not ready!", buffer);
                    return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
                }
            } finally {
                buffer.wLock().unlock();
            }
        }
    }

    private void waitAndSleep(SegmentBuffer buffer) {
        int roll = 0;
        while (buffer.getThreadRunning().get()) {
            roll += 1;
            if(roll > 10000) {
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                    break;// 不管怎么樣,需要退出, 不能在這個方法等太久。 就是說, 極端情況下, 此方法不能保證 等待正在執行的任務完成。
                } catch (InterruptedException e) {
                    logger.warn("Thread {} Interrupted",Thread.currentThread().getName());
                    break;
                }
            }
        }
    }

    public List<LeafAlloc> getAllLeafAllocs() {
        return dao.getAllLeafAllocs();
    }

    public Map<String, SegmentBuffer> getCache() {
        return cache;
    }

    public IDAllocDao getDao() {
        return dao;
    }

    public void setDao(IDAllocDao dao) {
        this.dao = dao;
    }
}

代碼質量還是不錯的,簡直是神乎其神的刀法!

 

美團leaf號段模式的部署和使用

 
對於數據庫的leaf_alloc 表而言,我們只會更新其max_id 字段,不會更新step;
@Update("UPDATE leaf_alloc SET max_id = max_id + #{step} WHERE biz_tag = #{key}")
@Update("UPDATE leaf_alloc SET max_id = max_id + step WHERE biz_tag = #{tag}")

step 是初始化完成的,默認應該是使用方。

官方提供了leaf-core、leaf-server。 其中 leaf-core是核心算法的實現, 但是它主要依賴於mybatis、數據庫,但是它不能直接運行。 leaf-server 調用了它, 並負責完成leaf_alloc 表的初始化。對於號段模式,主要就是在SegmentService, 其核心作用就是 實例化了一個IDGen(具體是SegmentIDGenImpl),然后 使用它對外提供分布式id服務。 

前面說錯了,leaf_alloc 表的初始化並不是 leaf-server 程序完成的,而是需要手動新增自己需要的業務數據, 其實就是一條insert 語句,參見官方:

insert into leaf_alloc(biz_tag, max_id, step, description) values('leaf-segment-test', 1, 2000, 'Test leaf Segment Mode Get Id')

這個務必記住。

 

實際怎么做?

因為它不知道你的業務tag和step 是多少啊。 tag 當然是自定義, step 對於高並發的項目就1000,一般小項目就100夠了吧! 

 

所以呢,想簡單使用的話,一般直接部署一個leaf-server就好了。

 

leaf-server 是必須的嗎

leaf-server 可以獨立運行和工作,並提供了rest http接口來獲取id,好像也非常合理好用,我們直接使用它就可以了,那么它是不是必須的呢?肯定不是。原因是:

1 它本身有可能存在單點的問題了。 

2 如果你不想通過http方式獲取id(這樣顯然有性能損失)

3 它強制依賴了druid

 

我認為我們可以適當做一些改造、集成。 就是把leaf-server 當做一個普通jar 引入,或者只用其中需要的幾個類,或者,完全不用它,就自己寫; 因為它強制依賴了druid,如果我們已經使用了比如hikari 這樣的數據連接池, 需要統一連接池, 不想引入其他的連接池, 那么我們完全可以 按照自己的方式實例化IDGen !

 
 
參考
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM