號段模式 VS 批量生成ID
號段模式的一般流程

號段模式的新的問題
美團Leaf 號段模式

- 每個biz-tag都有消費速度監控,通常推薦segment長度設置為服務高峰期發號QPS的600倍(10分鍾),這樣即使DB宕機,Leaf仍能持續發號10-20分鍾不受影響。
- 每次請求來臨時都會判斷下個號段的狀態,從而更新此號段,所以偶爾的網絡抖動不會影響下個號段的更新。
美團Leaf號段模式源碼分析
public class SegmentIDGenImpl implements IDGen { private static final Logger logger = LoggerFactory.getLogger(SegmentIDGenImpl.class); /** * IDCache未初始化成功時的異常碼 */ private static final long EXCEPTION_ID_IDCACHE_INIT_FALSE = -1; /** * key不存在時的異常碼 */ private static final long EXCEPTION_ID_KEY_NOT_EXISTS = -2; /** * SegmentBuffer中的兩個Segment均未從DB中裝載時的異常碼 */ private static final long EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL = -3; /** * 最大步長不超過100,0000 */ private static final int MAX_STEP = 1000000; /** * 一個Segment維持時間為15分鍾 */ private static final long SEGMENT_DURATION = 15 * 60 * 1000L; // 最多5個線程,也就是最多5個任務同時執行(因為可能有多個tag,如果tag 只有1、2個,那么沒必要5個線程);idle時間是60s; // SynchronousQueue意思是,只能有一個線程執行一個tag的任務,立即執行,執行完立即獲取;其他的都只能阻塞式等待 private ExecutorService service = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new UpdateThreadFactory()); private volatile boolean initOK = false; // 注意它包含了所有的SegmentBuffer,k是業務類型,v是對應的 // 全局緩存 private Map<String, SegmentBuffer> cache = new ConcurrentHashMap<String, SegmentBuffer>(); private IDAllocDao dao; public static class UpdateThreadFactory implements ThreadFactory { private static int threadInitNumber = 0; private static synchronized int nextThreadNum() { return threadInitNumber++; } @Override public Thread newThread(Runnable r) { return new Thread(r, "Thread-Segment-Update-" + nextThreadNum()); } } @Override public boolean init() { logger.info("Init ..."); // 確保加載到kv后才初始化成功 updateCacheFromDb(); initOK = true; updateCacheFromDbAtEveryMinute(); return initOK; } private void updateCacheFromDbAtEveryMinute() {// 顧名思義, 每分鍾執行一次。執行什么? 執行 updateCacheFromDb方法 ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("check-idCache-thread"); t.setDaemon(true); return t; } }); service.scheduleWithFixedDelay(new Runnable() { @Override public void run() { updateCacheFromDb(); } }, 60, 60, TimeUnit.SECONDS); } // 通過數據庫 來更新cache緩存, 也就是Map<String, SegmentBuffer> cache。注意它包含了所有的SegmentBuffer // 總結就是, 把db新增的tag, 初始化並添加到cache,把db刪除的tag,從cache中刪除;db中更新的呢?這里不管,后面由更新線程去維護到cache // init的時候執行一次,后面每分鍾執行一次 private void updateCacheFromDb() { logger.info("update cache from db"); StopWatch sw = new Slf4JStopWatch(); try { List<String> dbTags = dao.getAllTags();// 可能有新加的tags, 這里僅僅加載 tag,不包括value if (dbTags == null || dbTags.isEmpty()) { return; } List<String> cacheTags = new ArrayList<String>(cache.keySet()); Set<String> insertTagsSet = new HashSet<>(dbTags); Set<String> removeTagsSet = new HashSet<>(cacheTags); //db中新加的tags灌進cache for(int i = 0; i < cacheTags.size(); i++){ String tmp = cacheTags.get(i); if(insertTagsSet.contains(tmp)){ insertTagsSet.remove(tmp); // 數據庫中已經存在於cache中的,是舊的,先刪除 } } for (String tag : insertTagsSet) {// 現在insertTagsSet的部分都是全新的tags SegmentBuffer buffer = new SegmentBuffer();// 默認 SegmentBuffer的init 是false buffer.setKey(tag); Segment segment = buffer.getCurrent();// currentPos 永遠只會在 0/1之間切換 segment.setValue(new AtomicLong(0));// value表示 實際的分布式的唯一id值 segment.setMax(0); // max 是 當前號段的 最大值; step 是步長,它會根據消耗的速度自動調整! segment.setStep(0);// 默認 value、max、 step 都是0 —— 也就是, 全部重置! cache.put(tag, buffer);// 添加到全局的 cache logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer); } //cache中已失效的tags從cache刪除 for(int i = 0; i < dbTags.size(); i++){ String tmp = dbTags.get(i); if(removeTagsSet.contains(tmp)){ // 兩個contains 都是在尋找重疊部分 removeTagsSet.remove(tmp);//cache中存在於數據庫中的,先從緩存中刪除 } } for (String tag : removeTagsSet) { // 現在removeTagsSet的部分都是是舊的、已經失效的tags cache.remove(tag); // 又從全局的 cache中刪除。 logger.info("Remove tag {} from IdCache", tag); } } catch (Exception e) { logger.warn("update cache from db exception", e); } finally { sw.stop("updateCacheFromDb"); } } @Override public Result get(final String key) { if (!initOK) { // 全局cache 是否初始化完成? 一般是不會說沒有完成初始化 return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION); } if (cache.containsKey(key)) { SegmentBuffer buffer = cache.get(key); if (!buffer.isInitOk()) {// 對應key的buffer 是否初始化完成? 第一次調用get方法肯定是false synchronized (buffer) { // 沒完成則加鎖, 因為可能有多線程調用此get 方法 if (!buffer.isInitOk()) {// 再次判斷, 防止 xxx問題 try { updateSegmentFromDb(key, buffer.getCurrent());// 通過數據庫 初始化、更新buffer中 的當前號段 logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent()); buffer.setInitOk(true);// 必須到這里 才算完成buffer的 初始化; 也是全局唯一被調用的地方! } catch (Exception e) { logger.warn("Init buffer {} exception", buffer.getCurrent(), e); } } } } // 到這里,肯定已經完成對應key的buffer的初始化, 所以直接從號段緩存中 獲取id return getIdFromSegmentBuffer(cache.get(key));// 直接從號段緩存中 獲取id } //可能數據庫中不存在對應的key, 那就暫時只能異常返回 return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION); } // 通過數據庫 更新buffer中 的號段 // 注意 initOK為true,號段 緩存可能還沒有初始化; 也就是init() 方法實際並沒有對緩存做 初始化。 // 全局被調用的地方, 就兩處 public void updateSegmentFromDb(String key, Segment segment) { StopWatch sw = new Slf4JStopWatch(); SegmentBuffer buffer = segment.getBuffer(); LeafAlloc leafAlloc; if (!buffer.isInitOk()) {// 再次判斷,第一次調用get方法肯定init=false,於是進入這個if leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);// 直接查詢數據庫 buffer.setStep(leafAlloc.getStep()); buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step為DB中的step; 最小步長,表明真正的步長是可能大於minStep的 } else if (buffer.getUpdateTimestamp() == 0) {// 如果buffer已經init過,第二次會進這個if, 因為setUpdateTimestamp全局被調用兩次,這個if和下一個else leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);// buffer.setUpdateTimestamp(System.currentTimeMillis());// 唯二的全局被調用;為什么有這個if, 為什么特地需要特地調用這個方法 buffer.setStep(leafAlloc.getStep());// 和上一個if 是不是重復了? 代碼是不是有些啰嗦? buffer.setMinStep(leafAlloc.getStep());// minStep的作用僅僅是為了 動態調整, 實際使用的還是 step !! } else {// 如果buffer已經init過,第三次會進這個else; 這里其實是根據duration 動態調整步長! long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp(); int nextStep = buffer.getStep(); if (duration < SEGMENT_DURATION) {// 如果duration 太短了, 那么說明步長有點小,id 消耗快,那么進入調整。 if (nextStep * 2 > MAX_STEP) { // 如果*2 會超出最大步長,那就不調整了,否則就*2 //do nothing } else { nextStep = nextStep * 2; } } else if (duration < SEGMENT_DURATION * 2) { 如果 SEGMENT_DURATION < duration < SEGMENT_DURATION * 2, 那么不管 //do nothing with nextStep } else {// 否則就是 > SEGMENT_DURATION * 2; 那么 步長除以2,但是不能小於min nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep; } logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep); LeafAlloc temp = new LeafAlloc(); temp.setKey(key); temp.setStep(nextStep); leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);// 動態調整記錄到數據庫中~ buffer.setUpdateTimestamp(System.currentTimeMillis());// 唯二的全局被調用 buffer.setStep(nextStep); buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step為DB中的step } // must set value before set max long value = leafAlloc.getMaxId() - buffer.getStep();//leafAlloc.getMaxId()是后端新的最大值,buffer.getStep()是新的步長 segment.getValue().set(value);// value 和max 構成了 號段的起止值; value 其實命名並不好,容易讓人誤解! segment.setMax(leafAlloc.getMaxId()); // value 、max、step 都設置好了,表示 號段初始化、更新完成 segment.setStep(buffer.getStep()); sw.stop("updateSegmentFromDb", key + " " + segment); } // 直接從號段緩存中 獲取id // 獲取的時候, 需要檢查是否超過了 10%, 超過則 另外啟動任務去異步 加載新的 號段! public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) { while (true) { buffer.rLock().lock(); try { final Segment segment = buffer.getCurrent(); // 每次get 都會進入這里,但是其實 isNextReady條件 只有設置一次 if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) { service.execute(new Runnable() {// 異步執行 @Override public void run() { // 進入了這個方法,buffer.getThreadRunning 已經被cas設置為了true; 保證了 這個方法不會有並發 Segment next = buffer.getSegments()[buffer.nextPos()]; boolean updateOk = false; try { updateSegmentFromDb(buffer.getKey(), next);// next 是指使用 下一個號段;這里也就是更新下一個號段 updateOk = true; logger.info("update segment {} from db {}", buffer.getKey(), next); } catch (Exception e) { logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e); } finally { if (updateOk) { buffer.wLock().lock();// 這里使用了 寫鎖, 是因為有寫 的並發競爭 buffer.setNextReady(true);// 表示下一個號段已經准備好! buffer.getThreadRunning().set(false);// 操作完畢,又把buffer線程狀態設置為false,因為task 已經執行完 buffer.wLock().unlock(); } else { buffer.getThreadRunning().set(false);// 不管怎么樣,需要把buffer線程狀態設置為false,因為task 已經執行完 } } } }); } long value = segment.getValue().getAndIncrement();// 這里可能有讀的並發, if (value < segment.getMax()) { // 未達到號段的最大值,也就是最右端 return new Result(value, Status.SUCCESS);// 到這里, 就成功返回; 一般情況 就是進入這個if 然后返回! } } finally { buffer.rLock().unlock(); // 不管怎么樣,這里釋放讀鎖 } //什么情況會需要wait? 上面的方法沒有進入上一個if已經達到號段的最大值! 或者出現異常,當然一般不會有異常; //達到號段的最大值 意味着需要使用下一個號段, // waitAndSleep其實是等待 正在執行的線程把任務執行完成;具體是 判斷並自旋10000,然后超過10000,那就每次sleep 10毫秒,然后退出.. waitAndSleep(buffer);// 總之,waitAndSleep保證沒有正在執行的更新線程; 但也不是100% 保證! buffer.wLock().lock();// 執行寫鎖, 排斥任何其他的線程! try { final Segment segment = buffer.getCurrent();// 能繼續使用同一個號段嗎? long value = segment.getValue().getAndIncrement(); if (value < segment.getMax()) {// 為什么需要再次判斷? max會在已經確定的情況下變化? 也不會,大概是保險起見? return new Result(value, Status.SUCCESS); } if (buffer.isNextReady()) {// 已經達到號段的最大值,此時前面必然已經完成了新號段的獲取, 肯定進入此判斷 buffer.switchPos(); // 全局唯一的 切換! 雖然完成了切換, 但是不立即獲取value,而是等待下一次的循環! buffer.setNextReady(false); } else {// 這種情況, 不太可能發生吧! logger.error("Both two segments in {} are not ready!", buffer); return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION); } } finally { buffer.wLock().unlock(); } } } private void waitAndSleep(SegmentBuffer buffer) { int roll = 0; while (buffer.getThreadRunning().get()) { roll += 1; if(roll > 10000) { try { TimeUnit.MILLISECONDS.sleep(10); break;// 不管怎么樣,需要退出, 不能在這個方法等太久。 就是說, 極端情況下, 此方法不能保證 等待正在執行的任務完成。 } catch (InterruptedException e) { logger.warn("Thread {} Interrupted",Thread.currentThread().getName()); break; } } } } public List<LeafAlloc> getAllLeafAllocs() { return dao.getAllLeafAllocs(); } public Map<String, SegmentBuffer> getCache() { return cache; } public IDAllocDao getDao() { return dao; } public void setDao(IDAllocDao dao) { this.dao = dao; } }
代碼質量還是不錯的,簡直是神乎其神的刀法!
美團leaf號段模式的部署和使用
@Update("UPDATE leaf_alloc SET max_id = max_id + #{step} WHERE biz_tag = #{key}") @Update("UPDATE leaf_alloc SET max_id = max_id + step WHERE biz_tag = #{tag}")
step 是初始化完成的,默認應該是使用方。
官方提供了leaf-core、leaf-server。 其中 leaf-core是核心算法的實現, 但是它主要依賴於mybatis、數據庫,但是它不能直接運行。 leaf-server 調用了它, 並負責完成leaf_alloc 表的初始化。對於號段模式,主要就是在SegmentService, 其核心作用就是 實例化了一個IDGen(具體是SegmentIDGenImpl),然后 使用它對外提供分布式id服務。
前面說錯了,leaf_alloc 表的初始化並不是 leaf-server 程序完成的,而是需要手動新增自己需要的業務數據, 其實就是一條insert 語句,參見官方:
insert into leaf_alloc(biz_tag, max_id, step, description) values('leaf-segment-test', 1, 2000, 'Test leaf Segment Mode Get Id')
這個務必記住。
實際怎么做?
因為它不知道你的業務tag和step 是多少啊。 tag 當然是自定義, step 對於高並發的項目就1000,一般小項目就100夠了吧!
所以呢,想簡單使用的話,一般直接部署一個leaf-server就好了。
leaf-server 是必須的嗎
leaf-server 可以獨立運行和工作,並提供了rest http接口來獲取id,好像也非常合理好用,我們直接使用它就可以了,那么它是不是必須的呢?肯定不是。原因是:
1 它本身有可能存在單點的問題了。
2 如果你不想通過http方式獲取id(這樣顯然有性能損失)
3 它強制依賴了druid
我認為我們可以適當做一些改造、集成。 就是把leaf-server 當做一個普通jar 引入,或者只用其中需要的幾個類,或者,完全不用它,就自己寫; 因為它強制依賴了druid,如果我們已經使用了比如hikari 這樣的數據連接池, 需要統一連接池, 不想引入其他的連接池, 那么我們完全可以 按照自己的方式實例化IDGen !