這里給出的令牌桶是以redis單節點或者集群為中間件. 不過, 這里的實現比較簡單, 主要提供兩個函數, 一個用於消費令牌, 一個用於添加令牌. 這里, 消費令牌和添加令牌都是通過lua來保證原子性.
消費令牌的代碼如下 :
// FetchToken 用來獲取某個key的一個令牌
func (acc *Accessor) FetchToken(key string) (bool, error) {
/*
* KEYS[1] 表示特定的key, 這個key是當前的令牌數
*/
keyFetchScript :=
`--[[測試顯示, 通過call, 可以將error返回給客戶端, 即使沒有使用return]]--
local curNum = redis.call("DECR", KEYS[1])
if (curNum >= 0)
then
return true
end
redis.call("INCR", KEYS[1])
return false
`
keyFetchCmd := redis.NewScript(keyFetchScript)
res, err := keyFetchCmd.Run(acc.client, []string{key}).Result()
if err != nil && err != redis.Nil {
return false, err
}
if res == redis.Nil {
return false, nil
}
if val, ok := res.(int64); ok {
return (val == 1), nil
}
return false, errors.New("res should be bool")
}
這里每一個key都有一個輔助的key_idx, 每次增加key的令牌數, 都會使key_idx的值加1, 同時這個函數調用會返回對應的key_idx的值. 如果傳入的idx的值與key_idx值不相同, 則不會執行增加令牌數的操作. 這樣設計的目的是, 如果你在不同機器中啟動多個增加令牌數的程序, 而且這些程序啟動時間不同, 那么其中一個程序將會起到增加令牌數的效果, 而另外的程序不會新增令牌數. 當增加令牌數的這個程序意外關閉, 將會有新的增加令牌的程序起作用. 這個實現的思想類似於樂觀鎖.具體代碼如下:
// AddToken 用來添加某個key的令牌
func (acc *Accessor) AddToken(key string, keyIdx int, keyAdd int, keyLimit int) (int, error) {
/* KEYS[1] 表示特定key,這個key是當前的令牌
* KEYS[2] 表示特定key的idx
* ARGV[1] 表示修改的key的增加的值
* ARGV[2] 表示修改的key的最大值
* ARGV[3] 表示修改的key的idx的序號
*/
// 實現思路, 先判斷這個key當前的序號與修改調用的序號是否一致,如果一致, 則進行修改,否則返回當前的序號
keyAddScript :=
`--[[測試顯示, 通過call, 可以將error返回給客戶端, 即使沒有使用return]]--
local curIdx = redis.call("INCR", KEYS[2])
if (curIdx ~= (ARGV[3]+1))
then
curIdx = redis.call("DECR", KEYS[2])
return curIdx
end
local curNum = redis.call("INCRBY", KEYS[1], ARGV[1])
local maxNum = tonumber(ARGV[2])
if (curNum > maxNum)
then
redis.call("SET", KEYS[1], ARGV[2])
end
return curIdx
`
keyAddCmd := redis.NewScript(keyAddScript)
res, err := keyAddCmd.Run(acc.client, []string{key, getKeyIdx(key)},
keyAdd, keyLimit, keyIdx).Result()
if err != nil && err != redis.Nil {
return 0, err
}
if idx, ok := res.(int64); ok {
return int(idx), nil
}
return 0, errors.New("res should be integer")
}
假設現在有多個節點,例如有20個節點,我希望每次都有3個節點作為添加令牌桶的節點,那么這個怎么實現呢?
/* * 作用: 判斷這個節點是否用於新增鍵令牌(以下稱為: 加令牌節點), * 從而實現每個redis(或者redis集群)總是有N個節點(例如2個或者3個)用於添加令牌操作 * 我們可能采用多種方式獲取所有的key, 然后向對應的key增加令牌, 例如 * (1) 通過數據庫獲取所有鍵值 * (2) 通過遍歷redis獲取所有鍵值 * (3) 直接讀取配置文件 * (4) 通過遠程調用設置 * 判斷方式通過如下實現: * (1) 實現這個判斷需要在redis中使用一個鍵值存儲信息, 這里使用"{}"的鍵值, 這個鍵存儲所有的加令牌節點 * 1) 這個鍵使用類型為list * 2) 這個list中存儲的值為是否可用標示+":"+節點標識符(可用時, 為"1:節點標識符") * (2) 程序啟動時, 監聽這個鍵的修改操作 * 1) "加令牌節點"監聽對這個鍵的包括LREM,LSET和PUSH類型的消息 * 2) 非"加令牌節點"監聽對這個鍵的包括LREM類型的消息 * (3) 然后, 讀取redis中鍵名為"{}"的鍵的值, 然后判斷當前"加令牌節點"的個數, 如果這個個數小於配置值, * 則修改redis中這個鍵的值,將自己設為"加令牌節點", 否則, 不做處理 * (4) 對於每個設置為"加令牌節點"的應用, 會在這個list中排在后面的M個節點(例如2個或者3個)建立tcp連接, * 然后通過ping和pong消息, 來判斷這個節點是否可以連接. * (5) 如果A節點發現B節點不可連接(假設每秒發送一條消息,經過20次沒有發送成功), 向redis發送一條修改請求, * 請求中先查看B節點是否可用,如果B節點當前顯示可用, 那么修改"{}"的值,設置1+":"+節點標識符為 * 0+":"+節點標識符(B節點), 如果顯示B節點已經不可用, 則繼續進行tcp通信, (如果B節點已經不存在, * 則斷開與這個節點的tcp連接,按照當前邏輯,這個應該不會出現) * (6) 節點繼續等待若干時間(例如15s), 在期間查看這個節點是否已經被重置正常,如果恢復正常,則繼續進行tcp通信,
* 查看是否存在問題,如果沒有恢復正常,從這個list中清除這個節點 * (7) 那些非"加令牌節點"接收到清楚的消息之后, 會申請自己成為"加令牌節點",會檢測當前"加令牌節點"的個數, * 如果條件滿足, 則將這個節點信息插入, 讓這個節點成為"加令牌節點", 然后, 每個當前"加令牌節點" * 讀取所有的"加令牌節點", 重新更新與哪些節點建立tcp連接.
* (8) 當前標識符為Ip:port */
完整的代碼請參考如下地址:
https://github.com/ss-torres/ratelimiter.git
在這個git地址中, 如果想調用db_test.go中的測試, 可以參考如下命令:
go test ratelimiter/db -args "localhost:6379" "" "hello"
當前這個實現,沒有充分利用redis的提供的功能,按照我的觀點來看,使用redis的subscribe和publish的實現可能更加簡單,之后可能會提供使用subscribe和publish的實現,然后再進行必要的詳細測試和benchmark。另外,使用zookeeper作為中間的協調中間件的實現可能更加簡單。
如果有什么好的建議, 或者有什么問題, 歡迎提出。
