一次雪花算法使用總結,附C++實現代碼


一、雪花算法簡介:

  1、雪花算法是Twitter 開源的分布式、自增長 id 生成算法;

  2、雪花算法生成的id是一個無符號長整型(unsigned long)的id,它占64個bit(8*8);

二、項目背景:

  1、多台服務器組成的集群;

  2、每台服務器同時啟動多個worker;

  3、每個worker使用雪花算法生成自增長id、再通過mycat進行批量入庫。

三、需求分析:

  1、自增長;

  2、分布式;

  顯然,雪花算法很適合我們。

四、問題分析及解決方案:

  1、Q:如何確保生成的id為正?

    A:讓id的第一個bit位固定為0。

  2、Q:如何確保id自增?

    A:使用毫秒級時間戳。

  3、Q:如何確保集群中不同的機器上的生成id不重復?

    A1:每台服務器有一個固定的機器id(hostid),這個能確保集群中不同的機器上的生成id不重復。

    A2:給每台服務器配置一個id,用這個id代替hostid,這個能確保集群中不同的機器上的生成id不重復。

  4、Q:如何確保同一台機器上不同的worker生成的id不重復?

    A1:每個worker即一個進程(pid),可以取進程id來區別不同worker。

    A2:將每個worker的pid映射成對應的workerid,並寫入配置文件中。

  5、Q:如何確保同一worker的同一毫秒內生成的id不重復?

    A:增加序號來控制,如果時間相同則改變序號值。

  

  通過上面的分析,我們可以確定雪花算法生成的id包括以下五部分:符號位、時間戳、hostid、workerid、序號

  1)符號位,無意義;

  2)時間戳,控制自增長;

  3)hostid,控制不同機器生成不重復的id;

  4)workerid,控制同一機器上不同進程生成不重復的id;

  5)序號,控制同一機器上同一進程且同一時刻生成不重復的id;

五、bit資源分配方案:

  1、符號位,固定1個bit;

  2、時間戳,時間戳越大,我們能夠使用的年限越多,36個bit大概可以使用兩年多,41個bit大概可以使用69年。為了不吃官司,我們的時間戳應該控制在36~41位;

  3、hostid,服務器自帶的hostid占6個字節(48個bit),顯然不能用它,所以我們需要給集群中的每一台服務器添加一個配置文件,每台服務器配置一個唯一的id作為hostid;

  4、workerid,我們知道進程id一般最大為0x7fff,占15個bit,顯然bit資源也不夠分,所以我們需要將每個worker的pid映射成對應的workerid,並寫入配置文件中;

  6、序號,根據實際情況設置范圍。

 

  綜上,我的分配方案如下:

  1)符號位,1bit;

  2)時間戳,41bit;

  3)hostid,5bit(0~31);

  4)workerid,5bit(0~31);

  6)序號,12bit(0~4095)

  該方案最多支持32台服務器的集群,每台服務器上最多同時啟動32個worker(具體還得根據服務器資源分配)。

六、hostid和workerid的配置文件(Severcfg.xml):

  1)Severcfg.xml

 其中Hostid需要手動配置,集群中每台服務器的Hostid必須不一致;WorkerInfo是由worker啟動腳本動態添加。

  2)worker啟動腳本:

#! /bin/bash
WORKER_DIR="/home/fleet/worker"
data_time=`date +'%Y-%m-%d'`
WORKER_NAME="/home/fleet/worker/worker.jar"
WORKER_PORT=23451
WORKER_COUNT=1
WORKER_LOG_PATH="/home/fleet/worker/logs"
SEVERCFG="/home/fleet/lib64/config/Severcfg.xml"

#判斷worker所在路徑是否為全路徑
if [[ ! $WORKER_NAME =~ ^\/.* ]];then
        WORKER_NAME=$WORKER_DIR/$WORKER_NAME
fi

#判斷日志路徑是否為全路徑
if [[ ! $WORKER_LOG_PATH =~ ^\/.* ]];then
        WORKER_LOG_PATH=$WORKER_DIR/$WORKER_LOG_PATH
fi

#判斷worker是否存在
if [ ! -f $WORKER_NAME ];then
        echo "$WORKER_NAME not exist!"
        exit 1
fi

#如果日志路徑不存在,創建之
if [ ! -d "$WORKER_LOG_PATH" ];then
        echo "mkdir $WORKER_LOG_PATH"
        mkdir $WORKER_LOG_PATH
fi

echo "WORKER_NAME:$WORKER_NAME, WORKER_PORT:<$WORKER_PORT~$[$WORKER_PORT+$[$WORKER_COUNT-1]]>, WORKER_COUNT:$WORKER_COUNT WORKER_LOG_PATH:$WORKER_LOG_PATH WORKER_DIR=$WORKER_DIR";

#start worker
cd $WORKER_DIR
source /home/fleet/.bashrc;
for ((i=0; i < $WORKER_COUNT; i++))
do
        #根據判斷端口是否被占用啟動worker
        pid=$(netstat -nlp | grep ":$WORKER_PORT" | awk '{print $7}' | awk -F"/" '{ print $1 }');
        if [ ! -n "$pid" ]; then
                WORKER_OUTFILE=$WORKER_LOG_PATH/worker$WORKER_PORT-$data_time.out
                echo "About to start process<$WORKER_NAME>, port:$WORKER_PORT, log:$WORKER_OUTFILE";
                nohup java -Xms1024m -Xmx1024m -XX:PermSize=256m -XX:MaxPermSize=512m -XX:MaxNewSize=512m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$WORKER_DIR -jar -Dserver.port=$WORKER_PORT -Dmanagement.server.port=$WORKER_PORT $WORKER_NAME >> $WORKER_OUTFILE 2>&1 &
        else
                 echo "WORKER_PORT:$WORKER_PORT already occupied";
        fi

        #將worker的進程id映射成0~31范圍內的id,並寫入配置文件中,注意進程id獲取方式與上面不一樣,因為啟動worker架包有延時。。。
        pid=$(ps -ef | grep $WORKER_NAME | grep "$WORKER_PORT" | grep -v grep | awk '{print $2}');
        if [ -n "$pid" ] && [ -f $SEVERCFG ]; then
                sed -i "/$WORKER_PORT/d" $SEVERCFG
                sed -i '/HostId/a\        <WorkerInfo workerport="'$WORKER_PORT'" processid="'$pid'" workerid="'$i'"/>' $SEVERCFG
        fi
        WORKER_PORT=$[$WORKER_PORT+1];
done

ulimit -c unlimited;
ulimit -c; 

  該腳本的主要作用是:以某個端口為起始,啟動多個worker,並將worker進程id映射成0~31范圍內的id,寫入配置文件中。

  3)worker停止腳本

#! /bin/sh
#stop workerjar
WORKER_DIR=$(cd $(dirname $0); pwd)
WORKER_NAME="worker.jar"
CLEAR_WORKER_CRONTAB="$WORKER_DIR/clear-worker-crontab.sh"
SEVERCFG="/home/fleet/lib64/config/Severcfg.xml"

if [ $# -eq 1 ];then
        WORKER_NAME=$1
else
        if [ $# -gt 1 ];then
                echo "Too many parameters"
                exit 1
        fi
fi

pid=`ps -aux |grep java | grep $WORKER_NAME | awk '{print $2}'`
if [ -n "$pid" ];then
        kill -15 $pid
        echo "kill worker process[$pid] success"
else
        echo "not find worker process"
fi

#刪除定時器
chmod 755 $CLEAR_WORKER_CRONTAB;
$CLEAR_WORKER_CRONTAB;

sed -i '/WorkerInfo/d' $SEVERCFG

  4)其他腳本此處略,包括定時器等。有需要可以在下面評論區評論。

七、接下來要做的就簡單了,讀取配置文件中的Hostid,獲取當前進程id和配置文件中的進程id做匹配,得到其對應的映射的workerid。

八、雪花算法的C++實現:

  1、頭文件Snowflake.h

/*
*
* 文件名稱:Snowflake.h
* 文件標識:
* 摘    要:通過SnowFlake算法生成一個64位大小的分布式自增長id
*
*/

#ifndef __SNOWFLAKE_H__
#define __SNOWFLAKE_H__

#include <mutex>
#include <atomic>

//#define SNOWFLAKE_ID_WORKER_NO_LOCK
typedef unsigned int UInt;
typedef unsigned long UInt64;

#ifdef SNOWFLAKE_ID_WORKER_NO_LOCK
typedef std::atomic<UInt> AtomicUInt;
typedef std::atomic<UInt64> AtomicUInt64;
#else
typedef UInt AtomicUInt;
typedef UInt64 AtomicUInt64;
#endif

namespace service{
	class Snowflake
	{
	public:
		Snowflake(void);
		~Snowflake(void);

		void setHostId(UInt HostId)
		{
			m_HostId = HostId;
		}
		void setWorkerId(UInt workerId)
		{
			m_WorkerId = workerId;
		}
		UInt64 GetId()
		{
			return GetDistributedId();
		}

	private:
		UInt64 GetTimeStamp();
		UInt64 tilNextMillis(UInt64 lastTimestamp);
		UInt64 GetDistributedId();

	private:

#ifndef SNOWFLAKE_ID_WORKER_NO_LOCK
		std::mutex mutex;
#endif

		/**
		* 開始時間截 (2019-09-30 00:00:00.000)
		*/
		const UInt64 twepoch = 1569772800000;

		/**
		* worker進程映射id所占的位數
		*/
		const UInt workerIdBits = 5;

		/**
		* 服務器id所占的位數
		*/
		const UInt hostIdBits = 5;

		/**
		* 序列所占的位數
		*/
		const UInt sequenceBits = 12;

		/**
		* worker進程映射ID向左移12位
		*/
		const UInt workerIdShift = sequenceBits;

		/**
		* 服務器id向左移17位
		*/
		const UInt hostIdShift = workerIdShift + workerIdBits;

		/**
		* 時間截向左移22位
		*/
		const UInt timestampLeftShift = hostIdShift + hostIdBits;

		/**
		* 支持的worker進程映射id,結果是31
		*/
		const UInt maxWorkerId = -1 ^ (-1 << workerIdBits);

		/**
		* 支持的服務器id,結果是31
		*/
		const UInt maxHostId = -1 ^ (-1 << hostIdBits);

		/**
		* 生成序列的掩碼,這里為4095
		*/
		const UInt sequenceMask = -1 ^ (-1 << sequenceBits);

		/**
		* worker進程映射id(0~31)
		*/
		UInt m_WorkerId;

		/**
		* 服務器id(0~31)
		*/
		UInt m_HostId;

		/**
		* 毫秒內序列(0~4095)
		*/
		AtomicUInt sequence{ 0 };

		/**
		* 上次生成ID的時間截
		*/
		AtomicUInt64 lastTimestamp{ 0 };
	};
}
#endif

  2、實現代碼Snowflake.cpp

#include "Snowflake.h"
#include <chrono>
#include <exception>
#include <sstream>

namespace service
{
	Snowflake::Snowflake(void)
	{
		m_HostId = 0;
		m_WorkerId = 0;
		sequence = 0;
		lastTimestamp = 0;
	}

	Snowflake::~Snowflake(void)
	{
	}

	UInt64 Snowflake::GetTimeStamp()
	{
		auto t = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now());
		return t.time_since_epoch().count();
	}

	UInt64 Snowflake::tilNextMillis(UInt64 lastTimestamp)
	{
		UInt64 timestamp = GetTimeStamp();
		while (timestamp <= lastTimestamp) {
			timestamp = GetTimeStamp();
		}
		return timestamp;
	}

	UInt64 Snowflake::GetDistributedId()
	{
#ifndef SNOWFLAKE_ID_WORKER_NO_LOCK
		std::unique_lock<std::mutex> lock{ mutex };
		AtomicUInt64 timestamp{ 0 };
#else
		static AtomicUInt64 timestamp{ 0 };
#endif

		timestamp = GetTimeStamp();
		// 如果當前時間小於上一次ID生成的時間戳,說明系統時鍾回退過這個時候應當拋出異常
		if (timestamp < lastTimestamp) {
			std::ostringstream s;
			s << "clock moved backwards.  Refusing to generate id for " << lastTimestamp - timestamp << " milliseconds";
			throw std::exception(std::runtime_error(s.str()));
		}

		if (lastTimestamp == timestamp) {
			// 如果是同一時間生成的,則進行毫秒內序列
			sequence = (sequence + 1) & sequenceMask;
			if (0 == sequence) {
				// 毫秒內序列溢出, 阻塞到下一個毫秒,獲得新的時間戳
				timestamp = tilNextMillis(lastTimestamp);
			}
		}
		else {
			sequence = 0;
		}

#ifndef SNOWFLAKE_ID_WORKER_NO_LOCK
		lastTimestamp = timestamp;
#else
		lastTimestamp = timestamp.load();
#endif

		// 移位並通過或運算拼到一起組成64位的ID
		return ((timestamp - twepoch) << timestampLeftShift)
			| (m_HostId << hostIdShift)
			| (m_WorkerId << workerIdShift)
			| sequence;
	}
}

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM