本文已收錄到1.1K Star數開源學習指南——《大廠面試指北》,如果想要了解更多大廠面試相關的內容及獲取《大廠面試指北》離線PDF版,請掃描下方二維碼碼關注公眾號“大廠面試”,謝謝大家了!
《大廠面試指北》最佳閱讀地址:
http://notfound9.github.io/interviewGuide/
《大廠面試指北》項目地址:
https://github.com/NotFound9/interviewGuide
獲取《大廠面試指北》離線PDF版,請掃描下方二維碼關注公眾號“大廠面試”
《大廠面試指北》項目截圖:
摘要
本文主要是對美團的分布式ID框架Leaf的原理進行介紹,針對Leaf原項目中的一些issue,對Leaf項目增加一些功能支持,問題修復及優化改進,改進后的項目地址在這里:
Leaf項目改進計划 https://github.com/NotFound9/Leaf
Leaf原理分析
Snowflake生成ID的模式
7849276-4d1955394baa3c6d.png
snowflake算法對於ID的位數是上圖這樣分配的:
1位的符號位+41位時間戳+10位workID+12位序列號
加起來一共是64個二進制位,正好與Java中的long類型的位數一樣。
美團的Leaf框架對於snowflake算法進行了一些位數調整,位數分配是這樣:
最大41位時間差+10位的workID+12位序列化
雖然看美團對Leaf的介紹文章里面說
Leaf-snowflake方案完全沿用snowflake方案的bit位設計,即是“1+41+10+12”的方式組裝ID號。
其實看代碼里面是沒有專門設置符號位的,如果timestamp過大,導致時間差占用42個二進制位,時間差的第一位為1時,可能生成的id轉換為十進制后會是負數:
//timestampLeftShift是22,workerIdShift是12
long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
時間差是什么?
因為時間戳是以1970年01月01日00時00分00秒作為起始點,其實我們一般取的時間戳其實是起始點到現在的時間差,如果我們能確定我們取的時間都是某個時間點以后的時間,那么可以將時間戳的起始點改成這個時間點,Leaf項目中,如果不設置起始時間,默認是2010年11月4日09:42:54,這樣可以使得支持的最大時間增長,Leaf框架的支持最大時間是起始點之后的69年。
workID怎么分配?
Leaf使用Zookeeper作為注冊中心,每次機器啟動時去Zookeeper特定路徑/forever/下讀取子節點列表,每個子節點存儲了IP:Port及對應的workId,遍歷子節點列表,如果存在當前IP:Port對應的workId,就使用節點信息中存儲的workId,不存在就創建一個永久有序節點,將序號作為workId,並且將workId信息寫入本地緩存文件workerID.properties,供啟動時連接Zookeeper失敗,讀取使用。因為workId只分配了10個二進制位,所以取值范圍是0-1023。
序列號怎么生成?
序列號是12個二進制位,取值范圍是0到4095,主要保證同一個leaf服務在同一毫秒內,生成的ID的唯一性。
序列號是生成流程如下:
1.當前時間戳與上一個ID的時間戳在同一毫秒內,那么對sequence+1,如果sequence+1超過了4095,那么進行等待,等到下一毫秒到了之后再生成ID。
2.當前時間戳與上一個ID的時間戳不在同一毫秒內,取一個100以內的隨機數作為序列號。
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
//seq 為0的時候表示是下一毫秒時間開始對seq做隨機
sequence = RANDOM.nextInt(100);
timestamp = tilNextMillis(lastTimestamp);
}
} else {
//如果是新的ms開始
sequence = RANDOM.nextInt(100);
}
lastTimestamp = timestamp;
segment生成ID的模式
5e4ff128.png
這種模式需要依賴MySQL,表字段biz_tag代表業務名,max_id代表該業務目前已分配的最大ID值,step代表每次Leaf往數據庫請求時,一次性分配的ID數量。
大致流程就是每個Leaf服務在內存中有兩個Segment實例,每個Segement保存一個分段的ID,
一個Segment是當前用於分配ID,有一個value屬性保存這個分段已分配的最大ID,以及一個max屬性這個分段最大的ID。
另外一個Segement是備用的,當一個Segement用完時,會進行切換,使用另一個Segement進行使用。
當一個Segement的分段ID使用率達到10%時,就會觸發另一個Segement去DB獲取分段ID,初始化好分段ID供之后使用。
Segment {
private AtomicLong value = new AtomicLong(0);
private volatile long max;
private volatile int step;
}
SegmentBuffer {
private String key;
private Segment[] segments; //雙buffer
private volatile int currentPos; //當前的使用的segment的index
private volatile boolean nextReady; //下一個segment是否處於可切換狀態
private volatile boolean initOk; //是否初始化完成
private final AtomicBoolean threadRunning; //線程是否在運行中
private final ReadWriteLock lock;
private volatile int step;
private volatile int minStep;
private volatile long updateTimestamp;
}
Leaf項目改進
目前Leaf項目存在的問題是
Snowflake生成ID相關:
1.注冊中心只支持Zookeeper
而對於一些小公司或者項目組,其他業務沒有使用到Zookeeper的話,為了部署Leaf服務而維護一個Zookeeper集群的代價太大。所以原項目中有issue在問”怎么支持非Zookeeper的注冊中心“,由於一般項目中使用MySQL的概率會大很多,所以增加了使用MySQL作為注冊中心,本地配置作為注冊中心的功能。
2.潛在的時鍾回撥問題
由於啟動前,服務器時間調到了以前的時間或者進行了回撥,連接Zookeeper失敗時會使用本地緩存文件workerID.properties中的workerId,而沒有校驗該ID生成的最大時間戳,可能會造成ID重復,對這個問題進行了修復。
3.時間差過大時,生成id為負數
因為缺少對時間差的校驗,當時間差過大,轉換為二進制數后超過41位后,在生成ID時會造成溢出,使得符號位為1,生成id為負數。
Segement生成ID相關:
沒有太多問題,主要是根據一些issue對代碼進行了性能優化。
具體改進如下:
Snowflake生成ID相關的改進:
1.針對Leaf原項目中的issue#84,增加zk_recycle模式(注冊中心為zk,workId循環使用)
2.針對Leaf原項目中的issue#100,增加MySQL模式(注冊中心為MySQL)
3.針對Leaf原項目中的issue#100,增加Local模式(注冊中心為本地項目配置)
4.針對Leaf原項目中的issue#84,修復啟動時時鍾回撥的問題
5.針對Leaf原項目中的issue#106,修復時間差過大,超過41位溢出,導致生成的id負數的問題
Segement生成ID相關的改進:
1.針對Leaf原項目中的issue#68,優化SegmentIDGenImpl.updateCacheFromDb()方法。
2.針對Leaf原項目中的 issue#88,使用位運算&替換取模運算
snowflake算法生成ID的相關改進
Leaf項目原來的注冊中心的模式(我們暫時命令為zk_normal模式)
使用Zookeeper作為注冊中心,每次機器啟動時去Zookeeper特定路徑下讀取子節點列表,如果存在當前IP:Port對應的workId,就使用節點信息中存儲的workId,不存在就創建一個永久有序節點,將序號作為workId,並且將workId信息寫入本地緩存文件workerID.properties,供啟動時連接Zookeeper失敗,讀取使用。
1.針對Leaf原項目中的issue#84,增加zk_recycle模式(注冊中心為zk,workId循環使用)
問題詳情:
issue#84:workid是否支持回收?
SnowflakeService模式中,workid是否支持回收?分布式環境下,每次重新部署可能就換了一個ip,如果沒有回收的話1024個機器標識很快就會消耗完,為什么zk不用臨時節點去存儲呢,這樣能動態感知服務上下線,對workid進行管理回收?
解決方案:
開發了zk_recycle模式,針對使用snowflake生成分布式ID的技術方案,原本是使用Zookeeper作為注冊中心為每個服務根據IP:Port分配一個固定的workId,workId生成范圍為0到1023,workId不支持回收,所以在Leaf的原項目中有人提出了一個issue#84 workid是否支持回收?,因為當部署Leaf的服務的IP和Port不固定時,如果workId不支持回收,當workId超過最大值時,會導致生成的分布式ID的重復。所以增加了workId循環使用的模式zk_recycle。
如何使用zk_recycle模式?
在Leaf/leaf-server/src/main/resources/leaf.properties中添加以下配置
//開啟snowflake服務
leaf.snowflake.enable=true
//leaf服務的端口,用於生成workId
leaf.snowflake.port=
//將snowflake模式設置為zk_recycle,此時注冊中心為Zookeeper,並且workerId可復用
leaf.snowflake.mode=zk_recycle
//zookeeper的地址
leaf.snowflake.zk.address=localhost:2181
啟動LeafServerApplication,調用/api/snowflake/get/test就可以獲得此種模式下生成的分布式ID。
curl domain/api/snowflake/get/test
1256557484213448722
zk_recycle模式實現原理
按照上面的配置在leaf.properties里面進行配置后,
if(mode.equals(SnowflakeMode.ZK_RECYCLE)) {//注冊中心為zk,對ip:port分配的workId是課循環利用的模式
String zkAddress = properties.getProperty(Constants.LEAF_SNOWFLAKE_ZK_ADDRESS);
RecyclableZookeeperHolder holder = new RecyclableZookeeperHolder(Utils.getIp(),port,zkAddress);
idGen = new SnowflakeIDGenImpl(holder);
if (idGen.init()) {
logger.info("Snowflake Service Init Successfully in mode " + mode);
} else {
throw new InitException("Snowflake Service Init Fail");
}
}
此時SnowflakeIDGenImpl使用的holder是RecyclableZookeeperHolder的實例,workId是可循環利用的,RecyclableZookeeperHolder工作流程如下:
1.首先會在未使用的workId池(zookeeper路徑為/snowflake/leaf.name/recycle/notuse/)中生成所有workId。
2.然后每次服務器啟動時都是去未使用的workId池取一個新的workId,然后放到正在使用的workId池(zookeeper路徑為/snowflake/leaf.name/recycle/inuse/)下,將此workId用於Id生成,並且定時上報時間戳,更新zookeeper中的節點信息。
3.並且定時檢測正在使用的workId池,發現某個workId超過最大時間沒有更新時間戳的workId,會把它從正在使用的workId池移出,然后放到未使用的workId池中,以供workId循環使用。
4.並且正在使用這個很長時間沒有更新時間戳的workId的服務器,在發現自己超過最大時間,還沒有上報時間戳成功后,會停止id生成服務,以防workId被其他服務器循環使用,導致id重復。
2.針對Leaf原項目中的issue#100,增加MySQL模式(注冊中心為MySQL)
問題詳情:
issue#100:如何使用非zk的注冊中心?
解決方案:
開發了mysql模式,這種模式注冊中心為MySQL,針對每個ip:port的workid是固定的。
如何使用這種mysql模式?
需要先在數據庫執行項目中的leaf_workerid_alloc.sql,完成建表,然后在Leaf/leaf-server/src/main/resources/leaf.properties中添加以下配置
//開啟snowflake服務
leaf.snowflake.enable=true
//leaf服務的端口,用於生成workId
leaf.snowflake.port=
//將snowflake模式設置為mysql,此時注冊中心為Zookeeper,workerId為固定分配
leaf.snowflake.mode=mysql
//mysql數據庫地址
leaf.jdbc.url=
leaf.jdbc.username=
leaf.jdbc.password=
啟動LeafServerApplication,調用/api/snowflake/get/test就可以獲得此種模式下生成的分布式ID。
curl domain/api/snowflake/get/test
1256557484213448722
實現原理
使用上面的配置后,此時SnowflakeIDGenImpl使用的holder是SnowflakeMySQLHolder的實例。實現原理與Leaf原項目默認的模式,使用Zookeeper作為注冊中心,每個ip:port的workid是固定的實現原理類似,只是注冊,獲取workid,及更新時間戳是與MySQL進行交互,而不是Zookeeper。
if (mode.equals(SnowflakeMode.MYSQL)) {//注冊中心為mysql
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl(properties.getProperty(Constants.LEAF_JDBC_URL));
dataSource.setUsername(properties.getProperty(Constants.LEAF_JDBC_USERNAME));
dataSource.setPassword(properties.getProperty(Constants.LEAF_JDBC_PASSWORD));
dataSource.init();
// Config Dao
WorkerIdAllocDao dao = new WorkerIdAllocDaoImpl(dataSource);
SnowflakeMySQLHolder holder = new SnowflakeMySQLHolder(Utils.getIp(), port, dao);
idGen = new SnowflakeIDGenImpl(holder);
if (idGen.init()) {
logger.info("Snowflake Service Init Successfully in mode " + mode);
} else {
throw new InitException("Snowflake Service Init Fail");
}
}
3.針對Leaf原項目中的issue#100,增加Local模式(注冊中心為本地項目配置)
問題詳情:
issue#100:如何使用非zk的注冊中心?
解決方案:
開發了local模式,這種模式就是適用於部署Leaf服務的IP和Port基本不會變化的情況,就是在Leaf項目中的配置文件leaf.properties中顯式得配置某某IP:某某Port對應哪個workId,每次部署新機器時,將IP:Port的時候在項目中添加這個配置,然后啟動時項目會去讀取leaf.properties中的配置,讀取完寫入本地緩存文件workId.json,下次啟動時直接讀取workId.json,最大時間戳也每次同步到機器上的緩存文件workId.json中。
如何使用這種local模式?
在Leaf/leaf-server/src/main/resources/leaf.properties中添加以下配置
//開啟snowflake服務
leaf.snowflake.enable=true
//leaf服務的端口,用於生成workId
leaf.snowflake.port=
#注冊中心為local的的模式
#leaf.snowflake.mode=local
#leaf.snowflake.local.workIdMap=
#workIdMap的格式是這樣的{"Leaf服務的ip:端口":"固定的workId"},例如:{"10.1.46.33:8080":1,"10.1.46.33:8081":2}
啟動LeafServerApplication,調用/api/snowflake/get/test就可以獲得此種模式下生成的分布式ID。
curl domain/api/snowflake/get/test
1256557484213448722
4.針對Leaf原項目中的issue#84,修復啟動時時鍾回撥的問題
問題詳情:
issue#84:因為當使用默認的模式(我們暫時命令為zk_normal模式),注冊中心為Zookeeper,workId不可復用,上面介紹了這種模式的工作流程,當Leaf服務啟動時,連接Zookeeper失敗,那么會去本機緩存中讀取workerID.properties文件,讀取workId進行使用,但是由於workerID.properties中只存了workId信息,沒有存儲上次上報的最大時間戳,所以沒有進行時間戳判斷,所以如果機器的當前時間被修改到之前,就可能會導致生成的ID重復。
解決方案:
所以增加了更新時間戳到本地緩存的機制,每次在上報時間戳時將時間戳同時寫入本機緩存workerID.properties,並且當使用本地緩存workerID.properties中的workId時,對時間戳進行校驗,當前系統時間戳<緩存中的時間戳時,才使用這個workerId。
//連接失敗,使用本地workerID.properties中的workerID,並且對時間戳進行校驗。
try {
Properties properties = new Properties();
properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));
Long maxTimestamp = Long.valueOf(properties.getProperty("maxTimestamp"));
if (maxTimestamp!=null && System.currentTimeMillis() <maxTimestamp) {
throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time");
}
workerID = Integer.valueOf(properties.getProperty("workerID"));
LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID);
} catch (Exception e1) {
LOGGER.error("Read file error ", e1);
return false;
}
//定時任務每3s執行一次updateNewData()方法,調用更新updateLocalWorkerID()更新緩存文件workerID.properties
void updateNewData(CuratorFramework curator, String path) {
try {
if (System.currentTimeMillis() < lastUpdateTime) {
return;
}
curator.setData().forPath(path, buildData().getBytes());
updateLocalWorkerID(workerID);
lastUpdateTime = System.currentTimeMillis();
} catch (Exception e) {
LOGGER.info("update init data error path is {} error is {}", path, e);
}
}
5.針對Leaf原項目中的issue#106,修復時間差過大,超過41位溢出,導致生成的id負數的問題
問題詳情:
因為Leaf框架是沿用snowflake的位數分配
最大41位時間差+10位的workID+12位序列化,但是由於snowflake是強制要求第一位為符號位0,否則生成的id轉換為十進制后會是復試,但是Leaf項目中沒有對時間差進行校驗,當時間戳過大或者自定義的twepoch設置不當過小,會導致計算得到的時間差過大,轉化為2進制后超過41位,且第一位為1,會導致生成的long類型的id為負數,例如當timestamp = twepoch+2199023255552L時,
此時在生成id時,timestamp - twepoch會等於2199023255552,2199023255552轉換為二進制后是1+41個0,此時生成的id由於符號位是1,id會是負數-9223372036854775793
long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
解決方案:
//一開始將最大的maxTimeStamp計算好
this.maxTimeStamp = ~(-1L << timeStampBits) + twepoch;
//然后生成ID時進行校驗
if (timestamp>maxTimeStamp) {
throw new OverMaxTimeStampException("current timestamp is over maxTimeStamp, the generate id will be negative");
}
針對Segement生成分布式ID相關的改進
1.針對Leaf原項目中的issue#68,優化SegmentIDGenImpl.updateCacheFromDb()方法
針對issue#68里面的優化方案,對Segement Buffer的緩存數據與DB數據同步的工作流程進行了進一步優化,主要是對
對SegmentIDGenImpl.updateCacheFromDb()方法進行了優化。
原方案工作流程:
1.遍歷cacheTags,將dbTags的副本insertTagsSet中存在的元素移除,使得insertTagsSet只有db新增的tag
2.遍歷insertTagsSet,將這些新增的元素添加到cache中
3.遍歷dbTags,將cacheTags的副本removeTagsSet中存在的元素移除,使得removeTagsSet只有cache中過期的tag
4.遍歷removeTagsSet,將過期的元素移除cache
這種方案需要經歷四次循環,使用兩個HashSet分別存儲db中新增的tag,cache中過期的tag,
並且為了篩選出新增的tag,過期的tag,對每個現在使用的tag有兩次刪除操作,
原有方案代碼如下:
List<String> dbTags = dao.getAllTags();
if (dbTags == null || dbTags.isEmpty()) {
return;
}
List<String> cacheTags = new ArrayList<String>(cache.keySet());
Set<String> insertTagsSet = new HashSet<>(dbTags);
Set<String> removeTagsSet = new HashSet<>(cacheTags);
//db中新加的tags灌進cache
for(int i = 0; i < cacheTags.size(); i++){
String tmp = cacheTags.get(i);
if(insertTagsSet.contains(tmp)){
insertTagsSet.remove(tmp);
}
}
for (String tag : insertTagsSet) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(tag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(tag, buffer);
logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
}
//cache中已失效的tags從cache刪除
for(int i = 0; i < dbTags.size(); i++){
String tmp = dbTags.get(i);
if(removeTagsSet.contains(tmp)){
removeTagsSet.remove(tmp);
}
}
for (String tag : removeTagsSet) {
cache.remove(tag);
logger.info("Remove tag {} from IdCache", tag);
}
實際上我們並不需要這些中間過程,現方案工作流程:
只需要遍歷dbTags,判斷cache中是否存在這個key,不存在就是新增元素,進行新增。
遍歷cacheTags,判斷dbSet中是否存在這個key,不存在就是過期元素,進行刪除。
現有方案代碼:
List<String> dbTags = dao.getAllTags();
if (dbTags == null || dbTags.isEmpty()) {
return;
}
//將dbTags中新加的tag添加cache,通過遍歷dbTags,判斷是否在cache中存在,不存在就添加到cache
for (String dbTag : dbTags) {
if (cache.containsKey(dbTag)==false) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(dbTag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(dbTag, buffer);
logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", dbTag, buffer);
}
}
List<String> cacheTags = new ArrayList<String>(cache.keySet());
Set<String> dbTagSet = new HashSet<>(dbTags);
//將cache中已失效的tag從cache刪除,通過遍歷cacheTags,判斷是否在dbTagSet中存在,不存在說明過期,直接刪除
for (String cacheTag : cacheTags) {
if (dbTagSet.contains(cacheTag) == false) {
cache.remove(cacheTag);
logger.info("Remove tag {} from IdCache", cacheTag);
}
}
兩個方案對比:
- 空間復雜度
相比原方案需要使用兩個HashSet,這種方案的只需要使用一個hashSet,空間復雜度會低一些。 - 時間復雜度
總遍歷次數會比原來的少,時間復雜度更低,因為判斷是新增,過期的情況就直接處理了,不需要后續再單獨遍歷,
而且不需要對cache和dbtag的交集進行刪除操作,因為原來方案為了獲得新增的元素,是將dbSet的副本中現有元素進行刪除得到。 - 代碼可讀性
原方案是4個for循環,總共35行代碼,現方案是2個for循環,總共25行代碼,更加簡潔易懂。
2.針對Leaf原項目中的issue#88,使用位運算&替換取模運算
這個更新是針對這個issue#88 提出的問題,使用位運算&來代替取模運算%,執行效率更高。
原代碼:
public int nextPos() {
return (currentPos + 1) % 2;
}
現代碼:
public int nextPos() {
return (currentPos + 1) & 1;
}