Nginx一致性哈希模塊的Lua重新實現
技術背景:
最近在工作中使用了nginx+redis 的架構,redis在后台做分布式存儲,每個redis都存放不同的數據,這些數據都是某門戶網站通過Hadoop分析出來的用戶行為日志,key是uid,value是user profile,每小時更新量在500-800萬條記錄,而這些記錄一旦生成,我需要在5分鍾左右的時間完成所有導入過程。
首先,我在nginx中使用了第三方模塊HttpUpstreamConsistent來做負載均衡策略,針對不同用戶(uid)選取不同的backend redis:
upstream somestream { consistent_hash $arg_uid; server 10.50.1.3:11211; server 10.50.1.4:11211; server 10.50.1.5:11211; }
現在問題來了,由於Hadoop系統處理日志的速度非常快,如果把每條記錄都通過Nginx來寫入Redis中,這樣的速度是無法接受的,而且會影響Nginx對正常請求的服務能力。所以,需要將這些數據以離線的方式導入redis集群中,這樣就要重新實現HttpUpstreamConsistent模塊了,才能保證讀寫的哈希策略一致。
下面的源碼演示了如何將HttpUpstreamConsistent模塊翻譯成Lua的過程,(使用了CRC32作散列,依賴庫的路徑已列在Reference中)。
#!/usr/bin/lua -- chenqi@2014/04/02 --[Reference] --https://github.com/yaoweibin/ngx_http_consistent_hash --https://github.com/davidm/lua-digest-crc32lua local CRC = require('CRC32') local M = {} local CONSISTENT_BUCKETS = 1024 local VIRTUAL_NODE = 160 local HASH_PEERS = {} local CONTINUUM = {} local BUCKETS = {} local function hash_fn(key) return CRC.crc32(key) end -- in-place quicksort function quicksort(array,compareFunc) quick(array,1,#array,compareFunc) end function quick(array,left,right,compareFunc) if(left < right ) then local index = partion(array,left,right,compareFunc) quick(array,left,index-1,compareFunc) quick(array,index+1,right,compareFunc) end end function partion(array,left,right,compareFunc) local key = array[left] local index = left array[index],array[right] = array[right],array[index] local i = left while i< right do if compareFunc( key,array[i]) then array[index],array[i] = array[i],array[index] index = index + 1 end i = i + 1 end array[right],array[index] = array[index],array[right] return index; end -- binary search local function chash_find(point) local mid, lo, hi = 1, 1, #CONTINUUM while 1 do if point <= CONTINUUM[lo][2] or point > CONTINUUM[hi][2] then return CONTINUUM[lo] end -- test middle point mid = lo + math.floor((hi-lo)/2) -- perfect match if point <= CONTINUUM[mid][2] and point > (mid > 1 and CONTINUUM[mid-1][2] or 0) then return CONTINUUM[mid] end -- too low, go up if CONTINUUM[mid][2] < point then lo = mid + 1 else hi = mid - 1 end end end local function chash_init() local n = #HASH_PEERS if n == 0 then print("There is no backend servers") return end local C = {} for i,peer in ipairs(HASH_PEERS) do for k=1, math.floor(VIRTUAL_NODE * peer[1]) do local hash_data = peer[2] .. "-" .. (k - 1) table.insert(C, {peer[2], hash_fn(hash_data)}) end end quicksort(C, function(a,b) return a[2] > b[2] end) CONTINUUM = C --[[ for i=1,#C do print(CONTINUUM[i][1],CONTINUUM[i][2]) end --]] local step = math.floor(0xFFFFFFFF / CONSISTENT_BUCKETS) BUCKETS = {} for i=1, CONSISTENT_BUCKETS do table.insert(BUCKETS, i, chash_find(math.floor(step * (i - 1)))) -- print(BUCKETS[i][1],BUCKETS[i][2]) end end M.init = chash_init local function chash_get_upstream_crc32(point) return BUCKETS[(point % CONSISTENT_BUCKETS)+1][1] end M.get_upstream_crc32 = chash_get_upstream_crc32 local function chash_get_upstream(key) local point = math.floor(hash_fn(key)) return chash_get_upstream_crc32(point) end M.get_upstream = chash_get_upstream local function chash_add_upstream(upstream, weigth) weight = weight or 1 table.insert(HASH_PEERS, {weight, upstream}) end M.add_upstream = chash_add_upstream return M
API調用方式:
local redis_login= { "10.50.1.3:11211", "10.50.1.4:11211", "10.50.1.5:11211", } for k, backend in ipairs(redis_login) do chash_login.add_upstream(backend) end chash_login.init() uid="309473941" chash_login.chash_get_upstream(uid)
返回一個backend地址,將該uid對應的數據寫入對應的redis中即可,稍后可以使用Nginx讀到。
PS:關於redis的mass insertion問題,最高效的方式是批量寫入文件(文件格式遵循redis協議),然后使用 redis-cli --pipe 直接導入。
