redis實現的簡單令牌桶


這里給出的令牌桶是以redis單節點或者集群為中間件. 不過, 這里的實現比較簡單, 主要提供兩個函數, 一個用於消費令牌, 一個用於添加令牌. 這里, 消費令牌和添加令牌都是通過lua來保證原子性.

消費令牌的代碼如下 :

// FetchToken 用來獲取某個key的一個令牌
func (acc *Accessor) FetchToken(key string) (bool, error) {
	/*
	 * KEYS[1] 表示特定的key, 這個key是當前的令牌數
	 */
	keyFetchScript :=
		`--[[測試顯示, 通過call, 可以將error返回給客戶端, 即使沒有使用return]]--
		local curNum = redis.call("DECR", KEYS[1])
		if (curNum >= 0)
		then
			return true
		end
		redis.call("INCR", KEYS[1])
		return false
		`

	keyFetchCmd := redis.NewScript(keyFetchScript)
	res, err := keyFetchCmd.Run(acc.client, []string{key}).Result()
	if err != nil && err != redis.Nil {
		return false, err
	}

	if res == redis.Nil {
		return false, nil
	}

	if val, ok := res.(int64); ok {
		return (val == 1), nil
	}

	return false, errors.New("res should be bool")
}

 這里每一個key都有一個輔助的key_idx, 每次增加key的令牌數, 都會使key_idx的值加1, 同時這個函數調用會返回對應的key_idx的值. 如果傳入的idx的值與key_idx值不相同, 則不會執行增加令牌數的操作. 這樣設計的目的是, 如果你在不同機器中啟動多個增加令牌數的程序, 而且這些程序啟動時間不同, 那么其中一個程序將會起到增加令牌數的效果, 而另外的程序不會新增令牌數. 當增加令牌數的這個程序意外關閉, 將會有新的增加令牌的程序起作用. 這個實現的思想類似於樂觀鎖.具體代碼如下:

// AddToken 用來添加某個key的令牌
func (acc *Accessor) AddToken(key string, keyIdx int, keyAdd int, keyLimit int) (int, error) {
	/* KEYS[1] 表示特定key,這個key是當前的令牌
	 * KEYS[2] 表示特定key的idx
	 * ARGV[1] 表示修改的key的增加的值
	 * ARGV[2] 表示修改的key的最大值
	 * ARGV[3] 表示修改的key的idx的序號
	 */
	// 實現思路, 先判斷這個key當前的序號與修改調用的序號是否一致,如果一致, 則進行修改,否則返回當前的序號
	keyAddScript :=
		`--[[測試顯示, 通過call, 可以將error返回給客戶端, 即使沒有使用return]]--
		local curIdx = redis.call("INCR", KEYS[2])
		if (curIdx ~= (ARGV[3]+1))
		then
			curIdx = redis.call("DECR", KEYS[2])
			return curIdx
		end
		local curNum = redis.call("INCRBY", KEYS[1], ARGV[1])
		local maxNum = tonumber(ARGV[2])
		if (curNum > maxNum) 
		then 
			redis.call("SET", KEYS[1], ARGV[2])
		end
		return curIdx
		`
	keyAddCmd := redis.NewScript(keyAddScript)
	res, err := keyAddCmd.Run(acc.client, []string{key, getKeyIdx(key)},
		keyAdd, keyLimit, keyIdx).Result()
	if err != nil && err != redis.Nil {
		return 0, err
	}

	if idx, ok := res.(int64); ok {
		return int(idx), nil
	}

	return 0, errors.New("res should be integer")
}
 

假設現在有多個節點,例如有20個節點,我希望每次都有3個節點作為添加令牌桶的節點,那么這個怎么實現呢?

/*
 * 作用: 判斷這個節點是否用於新增鍵令牌(以下稱為: 加令牌節點),
 * 從而實現每個redis(或者redis集群)總是有N個節點(例如2個或者3個)用於添加令牌操作
 * 我們可能采用多種方式獲取所有的key, 然后向對應的key增加令牌, 例如
 * (1) 通過數據庫獲取所有鍵值
 * (2) 通過遍歷redis獲取所有鍵值
 * (3) 直接讀取配置文件
 * (4) 通過遠程調用設置
 * 判斷方式通過如下實現:
 * (1) 實現這個判斷需要在redis中使用一個鍵值存儲信息, 這里使用"{}"的鍵值, 這個鍵存儲所有的加令牌節點
 *      1) 這個鍵使用類型為list
 *      2) 這個list中存儲的值為是否可用標示+":"+節點標識符(可用時, 為"1:節點標識符")
 * (2) 程序啟動時, 監聽這個鍵的修改操作
 *      1) "加令牌節點"監聽對這個鍵的包括LREM,LSET和PUSH類型的消息
 *      2) 非"加令牌節點"監聽對這個鍵的包括LREM類型的消息
 * (3) 然后, 讀取redis中鍵名為"{}"的鍵的值, 然后判斷當前"加令牌節點"的個數, 如果這個個數小於配置值,
 *  則修改redis中這個鍵的值,將自己設為"加令牌節點", 否則, 不做處理
 * (4) 對於每個設置為"加令牌節點"的應用, 會在這個list中排在后面的M個節點(例如2個或者3個)建立tcp連接,
 *  然后通過ping和pong消息, 來判斷這個節點是否可以連接.
 * (5) 如果A節點發現B節點不可連接(假設每秒發送一條消息,經過20次沒有發送成功), 向redis發送一條修改請求,
 *   請求中先查看B節點是否可用,如果B節點當前顯示可用, 那么修改"{}"的值,設置1+":"+節點標識符為
 *   0+":"+節點標識符(B節點), 如果顯示B節點已經不可用, 則繼續進行tcp通信, (如果B節點已經不存在,
 *   則斷開與這個節點的tcp連接,按照當前邏輯,這個應該不會出現)
 * (6) 節點繼續等待若干時間(例如15s), 在期間查看這個節點是否已經被重置正常,如果恢復正常,則繼續進行tcp通信,
* 查看是否存在問題,如果沒有恢復正常,從這個list中清除這個節點 * (7) 那些非"加令牌節點"接收到清楚的消息之后, 會申請自己成為"加令牌節點",會檢測當前"加令牌節點"的個數, * 如果條件滿足, 則將這個節點信息插入, 讓這個節點成為"加令牌節點", 然后, 每個當前"加令牌節點" * 讀取所有的"加令牌節點", 重新更新與哪些節點建立tcp連接.
* (8) 當前標識符為Ip:port
*/

 完整的代碼請參考如下地址:

 https://github.com/ss-torres/ratelimiter.git

在這個git地址中, 如果想調用db_test.go中的測試, 可以參考如下命令:

go test ratelimiter/db -args "localhost:6379" "" "hello"

 

當前這個實現,沒有充分利用redis的提供的功能,按照我的觀點來看,使用redis的subscribe和publish的實現可能更加簡單,之后可能會提供使用subscribe和publish的實現,然后再進行必要的詳細測試和benchmark。另外,使用zookeeper作為中間的協調中間件的實現可能更加簡單。

如果有什么好的建議, 或者有什么問題, 歡迎提出。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM