一、雪花算法簡介:
1、雪花算法是Twitter 開源的分布式、自增長 id 生成算法;
2、雪花算法生成的id是一個無符號長整型(unsigned long)的id,它占64個bit(8*8);
二、項目背景:
1、多台服務器組成的集群;
2、每台服務器同時啟動多個worker;
3、每個worker使用雪花算法生成自增長id、再通過mycat進行批量入庫。
三、需求分析:
1、自增長;
2、分布式;
顯然,雪花算法很適合我們。
四、問題分析及解決方案:
1、Q:如何確保生成的id為正?
A:讓id的第一個bit位固定為0。
2、Q:如何確保id自增?
A:使用毫秒級時間戳。
3、Q:如何確保集群中不同的機器上的生成id不重復?
A1:每台服務器有一個固定的機器id(hostid),這個能確保集群中不同的機器上的生成id不重復。
A2:給每台服務器配置一個id,用這個id代替hostid,這個能確保集群中不同的機器上的生成id不重復。
4、Q:如何確保同一台機器上不同的worker生成的id不重復?
A1:每個worker即一個進程(pid),可以取進程id來區別不同worker。
A2:將每個worker的pid映射成對應的workerid,並寫入配置文件中。
5、Q:如何確保同一worker的同一毫秒內生成的id不重復?
A:增加序號來控制,如果時間相同則改變序號值。
通過上面的分析,我們可以確定雪花算法生成的id包括以下五部分:符號位、時間戳、hostid、workerid、序號
1)符號位,無意義;
2)時間戳,控制自增長;
3)hostid,控制不同機器生成不重復的id;
4)workerid,控制同一機器上不同進程生成不重復的id;
5)序號,控制同一機器上同一進程且同一時刻生成不重復的id;
五、bit資源分配方案:
1、符號位,固定1個bit;
2、時間戳,時間戳越大,我們能夠使用的年限越多,36個bit大概可以使用兩年多,41個bit大概可以使用69年。為了不吃官司,我們的時間戳應該控制在36~41位;
3、hostid,服務器自帶的hostid占6個字節(48個bit),顯然不能用它,所以我們需要給集群中的每一台服務器添加一個配置文件,每台服務器配置一個唯一的id作為hostid;
4、workerid,我們知道進程id一般最大為0x7fff,占15個bit,顯然bit資源也不夠分,所以我們需要將每個worker的pid映射成對應的workerid,並寫入配置文件中;
6、序號,根據實際情況設置范圍。
綜上,我的分配方案如下:
1)符號位,1bit;
2)時間戳,41bit;
3)hostid,5bit(0~31);
4)workerid,5bit(0~31);
6)序號,12bit(0~4095)
該方案最多支持32台服務器的集群,每台服務器上最多同時啟動32個worker(具體還得根據服務器資源分配)。
六、hostid和workerid的配置文件(Severcfg.xml):
1)Severcfg.xml

其中Hostid需要手動配置,集群中每台服務器的Hostid必須不一致;WorkerInfo是由worker啟動腳本動態添加。
2)worker啟動腳本:
#! /bin/bash
WORKER_DIR="/home/fleet/worker"
data_time=`date +'%Y-%m-%d'`
WORKER_NAME="/home/fleet/worker/worker.jar"
WORKER_PORT=23451
WORKER_COUNT=1
WORKER_LOG_PATH="/home/fleet/worker/logs"
SEVERCFG="/home/fleet/lib64/config/Severcfg.xml"
#判斷worker所在路徑是否為全路徑
if [[ ! $WORKER_NAME =~ ^\/.* ]];then
WORKER_NAME=$WORKER_DIR/$WORKER_NAME
fi
#判斷日志路徑是否為全路徑
if [[ ! $WORKER_LOG_PATH =~ ^\/.* ]];then
WORKER_LOG_PATH=$WORKER_DIR/$WORKER_LOG_PATH
fi
#判斷worker是否存在
if [ ! -f $WORKER_NAME ];then
echo "$WORKER_NAME not exist!"
exit 1
fi
#如果日志路徑不存在,創建之
if [ ! -d "$WORKER_LOG_PATH" ];then
echo "mkdir $WORKER_LOG_PATH"
mkdir $WORKER_LOG_PATH
fi
echo "WORKER_NAME:$WORKER_NAME, WORKER_PORT:<$WORKER_PORT~$[$WORKER_PORT+$[$WORKER_COUNT-1]]>, WORKER_COUNT:$WORKER_COUNT WORKER_LOG_PATH:$WORKER_LOG_PATH WORKER_DIR=$WORKER_DIR";
#start worker
cd $WORKER_DIR
source /home/fleet/.bashrc;
for ((i=0; i < $WORKER_COUNT; i++))
do
#根據判斷端口是否被占用啟動worker
pid=$(netstat -nlp | grep ":$WORKER_PORT" | awk '{print $7}' | awk -F"/" '{ print $1 }');
if [ ! -n "$pid" ]; then
WORKER_OUTFILE=$WORKER_LOG_PATH/worker$WORKER_PORT-$data_time.out
echo "About to start process<$WORKER_NAME>, port:$WORKER_PORT, log:$WORKER_OUTFILE";
nohup java -Xms1024m -Xmx1024m -XX:PermSize=256m -XX:MaxPermSize=512m -XX:MaxNewSize=512m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$WORKER_DIR -jar -Dserver.port=$WORKER_PORT -Dmanagement.server.port=$WORKER_PORT $WORKER_NAME >> $WORKER_OUTFILE 2>&1 &
else
echo "WORKER_PORT:$WORKER_PORT already occupied";
fi
#將worker的進程id映射成0~31范圍內的id,並寫入配置文件中,注意進程id獲取方式與上面不一樣,因為啟動worker架包有延時。。。
pid=$(ps -ef | grep $WORKER_NAME | grep "$WORKER_PORT" | grep -v grep | awk '{print $2}');
if [ -n "$pid" ] && [ -f $SEVERCFG ]; then
sed -i "/$WORKER_PORT/d" $SEVERCFG
sed -i '/HostId/a\ <WorkerInfo workerport="'$WORKER_PORT'" processid="'$pid'" workerid="'$i'"/>' $SEVERCFG
fi
WORKER_PORT=$[$WORKER_PORT+1];
done
ulimit -c unlimited;
ulimit -c;
該腳本的主要作用是:以某個端口為起始,啟動多個worker,並將worker進程id映射成0~31范圍內的id,寫入配置文件中。
3)worker停止腳本
#! /bin/sh
#stop workerjar
WORKER_DIR=$(cd $(dirname $0); pwd)
WORKER_NAME="worker.jar"
CLEAR_WORKER_CRONTAB="$WORKER_DIR/clear-worker-crontab.sh"
SEVERCFG="/home/fleet/lib64/config/Severcfg.xml"
if [ $# -eq 1 ];then
WORKER_NAME=$1
else
if [ $# -gt 1 ];then
echo "Too many parameters"
exit 1
fi
fi
pid=`ps -aux |grep java | grep $WORKER_NAME | awk '{print $2}'`
if [ -n "$pid" ];then
kill -15 $pid
echo "kill worker process[$pid] success"
else
echo "not find worker process"
fi
#刪除定時器
chmod 755 $CLEAR_WORKER_CRONTAB;
$CLEAR_WORKER_CRONTAB;
sed -i '/WorkerInfo/d' $SEVERCFG
4)其他腳本此處略,包括定時器等。有需要可以在下面評論區評論。
七、接下來要做的就簡單了,讀取配置文件中的Hostid,獲取當前進程id和配置文件中的進程id做匹配,得到其對應的映射的workerid。
八、雪花算法的C++實現:
1、頭文件Snowflake.h
/*
*
* 文件名稱:Snowflake.h
* 文件標識:
* 摘 要:通過SnowFlake算法生成一個64位大小的分布式自增長id
*
*/
#ifndef __SNOWFLAKE_H__
#define __SNOWFLAKE_H__
#include <mutex>
#include <atomic>
//#define SNOWFLAKE_ID_WORKER_NO_LOCK
typedef unsigned int UInt;
typedef unsigned long UInt64;
#ifdef SNOWFLAKE_ID_WORKER_NO_LOCK
typedef std::atomic<UInt> AtomicUInt;
typedef std::atomic<UInt64> AtomicUInt64;
#else
typedef UInt AtomicUInt;
typedef UInt64 AtomicUInt64;
#endif
namespace service{
class Snowflake
{
public:
Snowflake(void);
~Snowflake(void);
void setHostId(UInt HostId)
{
m_HostId = HostId;
}
void setWorkerId(UInt workerId)
{
m_WorkerId = workerId;
}
UInt64 GetId()
{
return GetDistributedId();
}
private:
UInt64 GetTimeStamp();
UInt64 tilNextMillis(UInt64 lastTimestamp);
UInt64 GetDistributedId();
private:
#ifndef SNOWFLAKE_ID_WORKER_NO_LOCK
std::mutex mutex;
#endif
/**
* 開始時間截 (2019-09-30 00:00:00.000)
*/
const UInt64 twepoch = 1569772800000;
/**
* worker進程映射id所占的位數
*/
const UInt workerIdBits = 5;
/**
* 服務器id所占的位數
*/
const UInt hostIdBits = 5;
/**
* 序列所占的位數
*/
const UInt sequenceBits = 12;
/**
* worker進程映射ID向左移12位
*/
const UInt workerIdShift = sequenceBits;
/**
* 服務器id向左移17位
*/
const UInt hostIdShift = workerIdShift + workerIdBits;
/**
* 時間截向左移22位
*/
const UInt timestampLeftShift = hostIdShift + hostIdBits;
/**
* 支持的worker進程映射id,結果是31
*/
const UInt maxWorkerId = -1 ^ (-1 << workerIdBits);
/**
* 支持的服務器id,結果是31
*/
const UInt maxHostId = -1 ^ (-1 << hostIdBits);
/**
* 生成序列的掩碼,這里為4095
*/
const UInt sequenceMask = -1 ^ (-1 << sequenceBits);
/**
* worker進程映射id(0~31)
*/
UInt m_WorkerId;
/**
* 服務器id(0~31)
*/
UInt m_HostId;
/**
* 毫秒內序列(0~4095)
*/
AtomicUInt sequence{ 0 };
/**
* 上次生成ID的時間截
*/
AtomicUInt64 lastTimestamp{ 0 };
};
}
#endif
2、實現代碼Snowflake.cpp
#include "Snowflake.h"
#include <chrono>
#include <exception>
#include <sstream>
namespace service
{
Snowflake::Snowflake(void)
{
m_HostId = 0;
m_WorkerId = 0;
sequence = 0;
lastTimestamp = 0;
}
Snowflake::~Snowflake(void)
{
}
UInt64 Snowflake::GetTimeStamp()
{
auto t = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now());
return t.time_since_epoch().count();
}
UInt64 Snowflake::tilNextMillis(UInt64 lastTimestamp)
{
UInt64 timestamp = GetTimeStamp();
while (timestamp <= lastTimestamp) {
timestamp = GetTimeStamp();
}
return timestamp;
}
UInt64 Snowflake::GetDistributedId()
{
#ifndef SNOWFLAKE_ID_WORKER_NO_LOCK
std::unique_lock<std::mutex> lock{ mutex };
AtomicUInt64 timestamp{ 0 };
#else
static AtomicUInt64 timestamp{ 0 };
#endif
timestamp = GetTimeStamp();
// 如果當前時間小於上一次ID生成的時間戳,說明系統時鍾回退過這個時候應當拋出異常
if (timestamp < lastTimestamp) {
std::ostringstream s;
s << "clock moved backwards. Refusing to generate id for " << lastTimestamp - timestamp << " milliseconds";
throw std::exception(std::runtime_error(s.str()));
}
if (lastTimestamp == timestamp) {
// 如果是同一時間生成的,則進行毫秒內序列
sequence = (sequence + 1) & sequenceMask;
if (0 == sequence) {
// 毫秒內序列溢出, 阻塞到下一個毫秒,獲得新的時間戳
timestamp = tilNextMillis(lastTimestamp);
}
}
else {
sequence = 0;
}
#ifndef SNOWFLAKE_ID_WORKER_NO_LOCK
lastTimestamp = timestamp;
#else
lastTimestamp = timestamp.load();
#endif
// 移位並通過或運算拼到一起組成64位的ID
return ((timestamp - twepoch) << timestampLeftShift)
| (m_HostId << hostIdShift)
| (m_WorkerId << workerIdShift)
| sequence;
}
}

