使用zookeeper 實現一致性hash。
redis服務啟動時,將自己的路由信息通過臨時節點方式寫入zk,客戶端通過zk client讀取可用的路由信息。
服務端
使用python 腳本寫的守護進程:https://github.com/LittlePeng/redis-manager
腳本部署在redis-server本機,定時ping redis-server
節點失效的情況:
1.服務器與ZK服務器失去連接 Session Expired ,環境網絡波動造成,需要根據網絡情況設置適當zookeeper的Timeout時間,避免此情況發生
2. 服務器宕機,Zookeeper server 發現zkclient ping超時,就會通知節點下線
3. redis-server 掛了,redis-manager ping 超時主動斷開與zookeeper server的連接
客戶端
需要zkclient監控 節點變化,及時更新路由策略
下面是C# 版本一致性hash算法:
1: class KetamaNodeLocator2: {
3: private Dictionary<long, RedisCluster> ketamaNodes;4: private HashAlgorithm hashAlg;5: private int numReps = 160;6: private long[] keys;7:
8: public KetamaNodeLocator(List<RedisCluster> nodes)9: {
10: ketamaNodes = new Dictionary<long, RedisCluster>();11:
12: //對所有節點,生成nCopies個虛擬結點13: for (int j = 0; j < nodes.Count; j++) {14: RedisCluster node = nodes[j];
15: int numReps = node.Weight;16:
17: //每四個虛擬結點為一組18: for (int i = 0; i < numReps / 4; i++) {19: byte[] digest = ComputeMd5(20: String.Format("{0}_{1}_{2}", node.RoleName, node.RouteValue, i));21:
22: /** Md5是一個16字節長度的數組,將16字節的數組每四個字節一組,23: * 分別對應一個虛擬結點,這就是為什么上面把虛擬結點四個划分一組的原因*/24: for (int h = 0; h < 4; h++) {25:
26: long rv = ((long)(digest[3 + h * 4] & 0xFF) << 24)27: | ((long)(digest[2 + h * 4] & 0xFF) << 16)28: | ((long)(digest[1 + h * 4] & 0xFF) << 8)29: | ((long)digest[0 + h * 4] & 0xFF);30:
31: rv = rv & 0xffffffffL; /* Truncate to 32-bits */32: ketamaNodes[rv] = node;
33: }
34: }
35: }
36:
37: keys = ketamaNodes.Keys.OrderBy(p => p).ToArray();
38: }
41: public RedisCluster GetWorkerNode(string k)42: {
43: byte[] digest = ComputeMd5(k);44: return GetNodeInner(Hash(digest, 0));45: }
46:
47: RedisCluster GetNodeInner(long hash)48: {
49: if (ketamaNodes.Count == 0)50: return null;51: long key = hash;52: int near = 0;53: int index = Array.BinarySearch(keys, hash);54: if (index < 0) {55: near = (~index);
56: if (near == keys.Length)57: near = 0;
58: }
59: else {60: near = index;
61: }
62:
63: return ketamaNodes[keys[near]];64: }
65:
66: public static long Hash(byte[] digest, int nTime)67: {
68: long rv = ((long)(digest[3 + nTime * 4] & 0xFF) << 24)69: | ((long)(digest[2 + nTime * 4] & 0xFF) << 16)70: | ((long)(digest[1 + nTime * 4] & 0xFF) << 8)71: | ((long)digest[0 + nTime * 4] & 0xFF);72:
73: return rv & 0xffffffffL; /* Truncate to 32-bits */74: }
79: public static byte[] ComputeMd5(string k)80: {
81: MD5 md5 = new MD5CryptoServiceProvider();82:
83: byte[] keyBytes = md5.ComputeHash(Encoding.UTF8.GetBytes(k));84: md5.Clear();
85: return keyBytes;86: }
87: }