Leaf的Github地址:
https://github.com/Meituan-Dianping/Leaf
Leaf美團技術團隊博客地址:
https://tech.meituan.com/2017/04/21/mt-leaf.html
關於Leaf的使用手冊、架構說明、Segment和Snowflake的特點和時鍾回撥解決辦法,參考上面的鏈接內容都能獲得到答案。拒絕重復搬磚。
在本篇博客里我想就Leaf的Segment模式的源碼實現做個簡單的注釋。代碼分支:master。
1.Segment
Segment是SegmentBuffer的成員屬性,cache中存儲的是SegmentBuffer,Segment是雙buffer的實現。
2.初始化Segment
在初始化Segment時,主要做兩件事情。1是根據數據庫表中配置的busi_tag更新緩存;2是添加定時任務,定時(一分鍾間隔)更新緩存。
@Override public boolean init() { logger.info("Init ..."); // 確保加載到kv后才初始化成功 updateCacheFromDb(); initOK = true; updateCacheFromDbAtEveryMinute(); return initOK; }
updateCacheFromDb():
private void updateCacheFromDb() { logger.info("update cache from db"); StopWatch sw = new Slf4JStopWatch(); try { //從配置的數據源中加載biz_tag List<String> dbTags = dao.getAllTags(); if (dbTags == null || dbTags.isEmpty()) { return; } //cache中的biz_tag.初始為空. List<String> cacheTags = new ArrayList<String>(cache.keySet()); //存儲本次更新操作,要從DB中加載進cache的biz_tag. Set<String> insertTagsSet = new HashSet<>(dbTags); //存儲失效的biz_tag:存在於cache,不存在於DB. Set<String> removeTagsSet = new HashSet<>(cacheTags); //過濾去重,得到需要存入進cache的biz_tag for(int i = 0; i < cacheTags.size(); i++){ String tmp = cacheTags.get(i); if(insertTagsSet.contains(tmp)){ insertTagsSet.remove(tmp); } } //存儲進cache for (String tag : insertTagsSet) { SegmentBuffer buffer = new SegmentBuffer(); buffer.setKey(tag); Segment segment = buffer.getCurrent(); segment.setValue(new AtomicLong(0)); //這里的max、step均為0.所以這一步僅僅將biz_tag存儲進了cache,並沒有對SegmentBuffer執行初始化操作. segment.setMax(0); segment.setStep(0); cache.put(tag, buffer); logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer); } //cache中已失效的biz_tag從cache刪除 for(int i = 0; i < dbTags.size(); i++){ String tmp = dbTags.get(i); if(removeTagsSet.contains(tmp)){ removeTagsSet.remove(tmp); } } for (String tag : removeTagsSet) { cache.remove(tag); logger.info("Remove tag {} from IdCache", tag); } } catch (Exception e) { logger.warn("update cache from db exception", e); } finally { sw.stop("updateCacheFromDb"); } }
3.獲取到下一個序列號
@Override public Result get(final String key) { if (!initOK) { return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION); } //判斷cache中是否有這個key——bizTag if (cache.containsKey(key)) { SegmentBuffer buffer = cache.get(key); //未初始化, if (!buffer.isInitOk()) { synchronized (buffer) { //再次判斷,以防重復初始化. if (!buffer.isInitOk()) { try { //對SegmentBuffer當前的Segment進行初始化 updateSegmentFromDb(key, buffer.getCurrent()); logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent()); buffer.setInitOk(true); } catch (Exception e) { logger.warn("Init buffer {} exception", buffer.getCurrent(), e); } } } } return getIdFromSegmentBuffer(cache.get(key)); } return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION); }
updateSegmentFromDb();
public void updateSegmentFromDb(String key, Segment segment) { StopWatch sw = new Slf4JStopWatch(); SegmentBuffer buffer = segment.getBuffer(); LeafAlloc leafAlloc; //1.SegmentBuffer尚未初始化,則SegmentBuffer的step等於數據庫中的step; if (!buffer.isInitOk()) { leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key); buffer.setStep(leafAlloc.getStep()); buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step為DB中的step } else if (buffer.getUpdateTimestamp() == 0) { //2.SegmentBuffer已經初始化完成: leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key); buffer.setUpdateTimestamp(System.currentTimeMillis()); buffer.setStep(leafAlloc.getStep()); buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step為DB中的step } else { //2.SegmentBuffer已經初始化完成: //a) SegmentBuffer的存活時間小於15分鍾: // i. 如果SegmentBuffer當前的step*2大於最大值(一百萬),則什么也不做,不再擴大step // ii. 否則,SegmentBuffer的step擴大為原來的2倍。 //b) SegmentBuffer的存活時間小於30分鍾:什么也不做. //c) SegmentBuffer的存活時間在15至30分鍾之間: // i. 如果SegmentBuffer的step/2 大於等於數據庫中的step,那么就將SegmentBuffer的step值變為原來的二分之一,否則什么也不做。 long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp(); int nextStep = buffer.getStep(); if (duration < SEGMENT_DURATION) { if (nextStep * 2 > MAX_STEP) { //do nothing } else { nextStep = nextStep * 2; } } else if (duration < SEGMENT_DURATION * 2) { //do nothing with nextStep } else { nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep; } logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep); LeafAlloc temp = new LeafAlloc(); temp.setKey(key); temp.setStep(nextStep); leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp); buffer.setUpdateTimestamp(System.currentTimeMillis()); buffer.setStep(nextStep); buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step為DB中的step } // must set value before set max long value = leafAlloc.getMaxId() - buffer.getStep(); segment.getValue().set(value); segment.setMax(leafAlloc.getMaxId()); segment.setStep(buffer.getStep()); sw.stop("updateSegmentFromDb", key + " " + segment); }
getIdFromSegmentBuffer();
public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) { while (true) { //讀鎖 buffer.rLock().lock(); try { //獲取到當前正在使用的Segment. final Segment segment = buffer.getCurrent(); //如果下一個Segment沒有初始化完成,當前Segment的ID閑置數量小於9成,並且沒有在執行Segment初始化操作,就去執行對下一個Segment的初始化. //只要當前號段消費數量達到了10%,就對下一個號段進行初始化. if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) { service.execute(new Runnable() { @Override public void run() { Segment next = buffer.getSegments()[buffer.nextPos()]; boolean updateOk = false; try { //根據biz_tag,根據DB對Segment初始化 updateSegmentFromDb(buffer.getKey(), next); updateOk = true; logger.info("update segment {} from db {}", buffer.getKey(), next); } catch (Exception e) { logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e); } finally { if (updateOk) { //獲取寫鎖 buffer.wLock().lock(); buffer.setNextReady(true); buffer.getThreadRunning().set(false); buffer.wLock().unlock(); } else { buffer.getThreadRunning().set(false); } } } }); } //Segment的value記錄了當前已分配的ID最大值,加一后返回.原子性操作. long value = segment.getValue().getAndIncrement(); //如果超過了當前Segment緩存的最大值(當前Segment號段已經消費完畢),就進入阻塞等待. if (value < segment.getMax()) { return new Result(value, Status.SUCCESS); } } finally { buffer.rLock().unlock(); } //當前號段已經消費完了,下個號段正在有別的線程在執行初始化,進入等待. //做空轉. waitAndSleep(buffer); buffer.wLock().lock(); //下面這段代碼在做空轉之前已經執行過,這里為什么還要再次執行呢? //在當前線程空轉期間,可能已經有別的線程執行完畢了對下個Segment的初始化操作,並進行了切換.防止出現多次切換. try { final Segment segment = buffer.getCurrent(); long value = segment.getValue().getAndIncrement(); if (value < segment.getMax()) { return new Result(value, Status.SUCCESS); } //當前Segment緩存的號段已經消費完了,下一個Segment初始化好了就進行切換 if (buffer.isNextReady()) { //將Segment切換為下一個. buffer.switchPos(); buffer.setNextReady(false); } else { //ERROR.當前Segment號段滿了,下一個號段還未准備好. logger.error("Both two segments in {} are not ready!", buffer); return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION); } } finally { buffer.wLock().unlock(); } } }
