雪花算法-Snowflake
Snowflake是Twitter提出來的一個算法,其目的是生成一個64bit的整數:
- 1bit:一般是符號位,不做處理
- 41bit:用來記錄時間戳,這里可以記錄69年,如果設置好起始時間比如今年是2018年,那么可以用到2089年,到時候怎么辦?要是這個系統能用69年,我相信這個系統早都重構了好多次了。
- 10bit:10bit用來記錄機器ID,總共可以記錄1024台機器,一般用前5位代表數據中心,后面5位是某個數據中心的機器ID
- 12bit:循環位,用來對同一個毫秒之內產生不同的ID,12位可以最多記錄4095個,也就是在同一個機器同一毫秒最多記錄4095個,多余的需要進行等待下毫秒。
上面只是一個將64bit划分的標准,當然也不一定這么做,可以根據不同業務的具體場景來划分,比如下面給出一個業務場景:
- 服務目前QPS10萬,預計幾年之內會發展到百萬。
- 當前機器三地部署,上海,北京,深圳都有。
- 當前機器10台左右,預計未來會增加至百台。
這個時候我們根據上面的場景可以再次合理的划分62bit,QPS幾年之內會發展到百萬,那么每毫秒就是千級的請求,目前10台機器那么每台機器承擔百級的請求,為了保證擴展,后面的循環位可以限制到1024,也就是2^10,那么循環位10位就足夠了。
機器三地部署我們可以用3bit總共8來表示機房位置,當前的機器10台,為了保證擴展到百台那么可以用7bit 128來表示,時間位依然是41bit,那么還剩下64-10-3-7-41-1 = 2bit,還剩下2bit可以用來進行擴展。
- 時鍾回撥
因為機器的原因會發生時間回撥,我們的雪花算法是強依賴我們的時間的,如果時間發生回撥,有可能會生成重復的ID,在我們上面的nextId中我們用當前時間和上一次的時間進行判斷,如果當前時間小於上一次的時間那么肯定是發生了回撥,算法會直接拋出異常.
# Twitter's Snowflake algorithm implementation which is used to generate distributed IDs.
# https://github.com/twitter-archive/snowflake/blob/snowflake-2010/src/main/scala/com/twitter/service/snowflake/IdWorker.scala
import time
import logging
from .exceptions import InvalidSystemClock
# 64位ID的划分
WORKER_ID_BITS = 5
DATACENTER_ID_BITS = 5
SEQUENCE_BITS = 12
# 最大取值計算
MAX_WORKER_ID = -1 ^ (-1 << WORKER_ID_BITS) # 2**5-1 0b11111
MAX_DATACENTER_ID = -1 ^ (-1 << DATACENTER_ID_BITS)
# 移位偏移計算
WOKER_ID_SHIFT = SEQUENCE_BITS
DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS
TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS
# 序號循環掩碼
SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS)
# Twitter元年時間戳
TWEPOCH = 1288834974657
logger = logging.getLogger('flask.app')
class IdWorker(object):
"""
用於生成IDs
"""
def __init__(self, datacenter_id, worker_id, sequence=0):
"""
初始化
:param datacenter_id: 數據中心(機器區域)ID
:param worker_id: 機器ID
:param sequence: 其實序號
"""
# sanity check
if worker_id > MAX_WORKER_ID or worker_id < 0:
raise ValueError('worker_id值越界')
if datacenter_id > MAX_DATACENTER_ID or datacenter_id < 0:
raise ValueError('datacenter_id值越界')
self.worker_id = worker_id
self.datacenter_id = datacenter_id
self.sequence = sequence
self.last_timestamp = -1 # 上次計算的時間戳
def _gen_timestamp(self):
"""
生成整數時間戳
:return:int timestamp
"""
return int(time.time() * 1000)
def get_id(self):
"""
獲取新ID
:return:
"""
timestamp = self._gen_timestamp()
# 時鍾回撥
if timestamp < self.last_timestamp:
logging.error('clock is moving backwards. Rejecting requests until {}'.format(self.last_timestamp))
raise InvalidSystemClock
if timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & SEQUENCE_MASK
if self.sequence == 0:
timestamp = self._til_next_millis(self.last_timestamp)
else:
self.sequence = 0
self.last_timestamp = timestamp
new_id = ((timestamp - TWEPOCH) << TIMESTAMP_LEFT_SHIFT) | (self.datacenter_id << DATACENTER_ID_SHIFT) | \
(self.worker_id << WOKER_ID_SHIFT) | self.sequence
return new_id
def _til_next_millis(self, last_timestamp):
"""
等到下一毫秒
"""
timestamp = self._gen_timestamp()
while timestamp <= last_timestamp:
timestamp = self._gen_timestamp()
return timestamp
if __name__ == '__main__':
worker = IdWorker(1, 2, 0)
print(worker.get_id())
同文件夾下建立exceptions.py
class InvalidSystemClock(Exception):
"""
時鍾回撥異常
"""
pass
配置文件中添加,對應的是機器ID和序列號
# Snowflake ID Worker 參數
DATACENTER_ID = 0
WORKER_ID = 0
SEQUENCE = 0