本文介绍了分布式哈希表的概念及实现,这种描述非常适合代码模拟实现。文末附上了实现的代码(Java版和Python版)
本文内容(除代码外)皆来自 《Foundations of Computers Systems Research》一书,自己翻译的,转载请注明出处,不准确的部分请告知,欢迎讨论。
什么是分布式哈希表?
分布式哈希表是一个基本的结构,被广泛的应用在许多分布式系统中,用于组织动态改变的(分布式)节点集合和透明的资源定位服务,如,DHT 在许多 P2P 系统中作为一个基本结构提供高效透明的资源定位服务。所有节点都同等重要,这使得 DHT 更加的装载平横。而且,在 DHT 中具有一定规律的拓扑结构(环、树、超立方体、网格、蝴蝶形……),这使得查找更加的高效。虽然,各种 DHT 实现的拓扑结构各不相同,但它们至少提供以下两个基本的功能:
DHT 网络设计的重点大多在以下几个方面:
|
各种 DHT 对比
DHTs | Topology | Node Degree | Routing Hops |
CAN | mesh | O(d) | O(dn^(1/d)) |
Chord | ring | O(logn) | O(logn) |
Pastry | tree + ring | O(logn) | O(logn) |
Tapestry | tree | O(logn) | O(logn) |
Kademlia | tree | O(logn) | O(logn) |
Viceroy | butterfly | 7 | O(logn) |
Koord | de Bruijn graph + ring | 2 or O(logn) | O(logn) or O(logn/loglogn) |
Cycloid | hypecube + ring | 3 | O(d) with n = d2^d |
一个 DHT 例子 —— Chord
Chord 的拓扑结构
拓扑结构如右图。N14 节点负责存储标识符为(9,10,11,12,13,14)的键
|
![]() |
Chord 的查找算法 若是使用普通的遍历查找,那么每个节点的度为1(只有后继),查找的时间复杂度为log(n)。而 chord 在节点度数和时间复杂度之间做了个折中:增加节点的度,减少查找时间。 每个节点都有一个 finger table,该表有 m 个条目,每个条目指向另外的节点。条目指向的节点计算公式为:(n + 2^k) % 2^m 。(n 为当前节点的标识符,m 是标识符空间的位数,0 <= k < m)。如右图。 为什么要设置这样一个指向表呢?为了更快的查找。假设现在我们要从节点 n 开始,查找标识符 id。于是有如下的步骤:
|
![]() |
在分布式系统中,会经常的进行节点的动态加入和离开,而且节点也可能出问题,所以还需要考虑节点的动态维护,一些算法就要周期的执行来维护拓扑结构。举一个例子,节点 N26 加入该网络:
|
学习案例
CoNoNS 结构示意图
![]()
在CoDoNS 系统中,查询及回应的流程如下:
home node 是指:其标识符最接近(在所有的 CoDoNS 服务器中)于某一个域名经过一致性哈希映射后的值。也就是说,home node 不是 一定的,它会随着请求查询的域名的不同而动态分配。
|
|
代码实现
Java(包含四个文件:测试类、节点类、Chord类、节点IP地址文件) ![]() 1 //只写了初始化数据结构、节点加入、节点查询 2 //没有写节点的移除,数据的存储 3 4 IPAddressList.txt 5 211.138.116.246 6 128.001.223.111 7 176.002.222.123 8 223.104.160.069 9 112.017.247.082 10 203.069.010.164 11 12 13 14 package DisturbedHashTable; 15 16 import java.io.IOException; 17 //测试类 18 public class MainTest { 19 public static void main (String[] args) throws IOException{ 20 ChordDHT dht = new ChordDHT(6, "F:\\VSCodeProjects\\JavaProjects\\DisturbedHashTable\\IPAddressList.txt"); 21 dht.printNode(); 22 23 //测试加入 24 Node node = new Node(6, "242.108.11.31"); 25 Node curNode = dht.getNode(32); 26 dht.joinToRing(curNode, node, 6); 27 28 var tmp = node.getSuccessor(); 29 System.out.printf("\n%d ", node.getIdentifier()); 30 while(tmp != node){ 31 System.out.printf("%d ", tmp.getIdentifier()); 32 tmp = tmp.getSuccessor(); 33 } 34 35 } 36 } 37 38 //Chord类 39 package DisturbedHashTable; 40 41 import java.io.IOException; 42 import java.nio.charset.StandardCharsets; 43 import java.nio.file.Path; 44 import java.util.HashMap; 45 import java.util.Scanner; 46 47 /** 48 * 基本信息: 49 * 1. Chord 是一个环状结构 50 * 2. 每个节点、键值都被映射成了 m 位的标识符 51 * 3. 支持查找、插入、移除节点操作 52 * 4. 每个节点负责存储标识符为 (i-1, i] 5. 每个节点都有一个 fingerTable 53 * 54 * 实现想法: 55 * 1. 利用双向循环链表作为环状结构 (X) 不用链表,链表不能随机访问,用一个 HashMap 56 * 2. 标识符空间设置位 6 位 57 * 3. 哈希函数将各个节点的IP地址映射成为 6 位的标识符 58 * 4. 每个节点还有一个链表存储其负责的标识符和 fingerTable 59 * 60 */ 61 62 public class ChordDHT{ 63 //节点集合,键值是其标识符 64 private HashMap<Integer, Node> nodeSet; 65 /** 66 * 构造函数接收两个参数: 67 * m 标识符空间位数 68 * srcfile 节点IP地址列表文件 69 * 70 */ 71 public ChordDHT(int m, String srcfile) throws IOException{ 72 this.nodeSet = new HashMap<Integer, Node>(); 73 Scanner inFile = new Scanner(Path.of(srcfile), StandardCharsets.UTF_8); 74 Node firstNode = null; 75 76 while(inFile.hasNext() == true){ //加入初始节点 77 String ip = inFile.nextLine(); 78 Node node = new Node(m, ip); 79 if(firstNode == null) firstNode = node; 80 this.joinToRing(firstNode, node, m); 81 } 82 inFile.close(); 83 84 } 85 86 87 /** 88 * 处理新节点的加入 89 * 除了改变前驱后继关系,还可能存在key的迁移, 90 * 因为这可能会改变某个节点负责的标识符范围 91 * @param curNode 当前节点 (受理加入请求的节点) 92 * @param id 请求加入的节点 93 * @param m 标识符空间位数 94 * @return 是否加入成功 95 */ 96 public boolean joinToRing(Node curNode, Node id, int m){ 97 98 //环不为空时 找到id的直接后继, 并设置相应的域 99 if(this.nodeSet.isEmpty() == false){ 100 var successor = this.searchNode(curNode, id, m); 101 var predecessor = successor.getPredecessor(); 102 id.setPredecessor(predecessor); 103 id.setSuccessor(successor); 104 successor.setPredecessor(id); 105 predecessor.setSuccessor(id); 106 } 107 //将节点放入环中, 包含了环为空的情况 108 this.nodeSet.put(id.getIdentifier(), id); 109 110 //节点加入后,可能会改变一部分节点的fingerTable 111 for(var node: this.nodeSet.values()){ 112 this.updateFingerTable(node, m); 113 } 114 return true; 115 } 116 117 public void updateFingerTable(Node node, int m){ 118 for(int i = 0; i < m; i++){ //对表的每一个条目 119 var aIdentifier = ((int)Math.pow(2, i) + node.getIdentifier()) % (int)Math.pow(2, m); 120 121 //找到为此标识符负责的节点 122 for(var item: this.nodeSet.values()){ 123 if(item.inStorageBound(aIdentifier, m) == true){ 124 node.getFingerTable().put((int)Math.pow(2, i), item); 125 break; 126 } 127 } 128 } 129 } 130 131 132 public Node searchNode(Node curNode, Node id, int m){ 133 //System.out.println("test searchNode"); 134 Node result = null; 135 //出口 136 //负责id标识符的节点必是id的后继节点 137 if(curNode.inStorageBound(id.getIdentifier(), m) == true){ 138 result = curNode; 139 } 140 141 // 递归情况 142 else{ 143 var dis = (id.getIdentifier() - curNode.getIdentifier() + (int)Math.pow(2, m)) % (int)Math.pow(2, m); 144 //找到最大的 k 使得 2^k <= dis 145 var maxk = 0; 146 while((int)Math.pow(2, maxk) <= dis) maxk++; 147 maxk--; 148 result = searchNode(curNode.getFingerTable().get((int)Math.pow(2, maxk)), id, m); 149 } 150 return result; 151 } 152 153 public boolean isIn(int key){ 154 return nodeSet.containsKey(key); 155 } 156 157 public Node getNode(int key){ 158 return nodeSet.get(key); 159 } 160 161 public void printNode(){ 162 for(var node: this.nodeSet.values()){ 163 System.out.printf("identifier is %d, pre is %d, suc is %d\n", 164 node.getIdentifier(), node.getPredecessor().getIdentifier(), 165 node.getSuccessor().getIdentifier()); 166 } 167 } 168 } 169 170 //节点类 171 package DisturbedHashTable; 172 173 import java.sql.Time; 174 import java.util.HashMap; 175 import java.util.Iterator; 176 import java.util.LinkedList; 177 import java.util.Random; 178 179 public class Node { 180 private int identifier; 181 private HashMap<Integer, Node> fingerTable; //键值为 1,2,4,8,16…… 182 private HashMap<Integer, String> keyList; //存储其负责的标识符,String 是 key 183 private Node predecessor; //前驱 184 private Node successor; //后继 185 186 public Node(int m, String ip){ 187 //将ip地址映射成m位标识符,生成m个表项目 188 this.identifier = hashFunc(m, ip); 189 this.keyList = new HashMap<Integer, String>(); 190 this.fingerTable = new HashMap<Integer, Node>(); 191 for(int i = 0; i < m; i++){ 192 this.fingerTable.put((int)Math.pow(2, i), this); 193 } 194 this.predecessor = this; 195 this.successor = this; 196 } 197 198 private int hashFunc(int m, String ip){ 199 Random rd = new Random(System.currentTimeMillis()); 200 int result = 0;//rd.nextInt(999999); //增加随机数,减少碰撞概率 201 for(int i = 0; i < ip.length(); i++){ 202 if(ip.charAt(i) <= '9' && ip.charAt(i) >= '0') { 203 result = (result + (int)ip.charAt(i)) % (int)Math.pow(2, m); 204 } 205 } 206 return result % (int)Math.pow(2, m); 207 } 208 209 public HashMap<Integer, Node> getFingerTable(){ 210 return this.fingerTable; 211 } 212 213 214 215 //负责标识符的范围 216 /** 217 * 这里可能有三种可能: 218 * 1. 前驱节点标识符更大(如,23 ---> 1) 219 * 2. 前驱节点标识符更小(如,23 ---> 27) 220 * 3. 前驱节点标识符一样大(环中只有一个节点时) 221 */ 222 public boolean inStorageBound(int aIdentifier, int m){ 223 var lower = this.predecessor.identifier; 224 var upper = this.identifier; 225 226 //第一种情况 227 if(lower > upper){ 228 if(aIdentifier > lower && aIdentifier < (int)Math.pow(2, m)){ 229 return true; 230 } 231 if(aIdentifier < upper){ 232 return true; 233 } 234 } 235 236 //第二种情况 237 if(aIdentifier > lower && aIdentifier <= upper){ 238 return true; 239 } 240 241 //第三种情况 242 //由于只有一个节点,所以它负责所有的标识符 243 if(lower == upper){ 244 return true; 245 } 246 247 return false; 248 249 } 250 251 public int getIdentifier(){ 252 return this.identifier; 253 } 254 255 public Node getPredecessor(){ 256 return this.predecessor; 257 } 258 259 public Node getSuccessor(){ 260 return this.successor; 261 } 262 263 public boolean setPredecessor(Node node){ 264 this.predecessor = node; 265 return true; 266 } 267 268 public boolean setSuccessor(Node node){ 269 this.successor = node; 270 return true; 271 } 272 273 }
Python(都写在一个文件中,节点IP地址文件同上)
![]() 1 import random 2 import time 3 4 5 ringLenth = lambda x: int(pow(2, x)) 6 7 8 class Node: 9 ''' 哈希表中的节点数据结构 ''' 10 11 def __init__(self, ip, m): 12 self.id = self.generateId(ip = ip, m = m) 13 self.fingerTable = self.generateFT(m = m) 14 self.predecessor = self 15 self.sucessor = self 16 17 18 def generateId(self, ip, m): 19 result = 0 20 random.seed(time.time()) 21 for ch in ip: 22 result = (result + ord(ch) + random.randint(1, 99)) % ringLenth(m) 23 return result 24 25 26 def generateFT(self, m): 27 result = {} 28 for i in range(m): 29 result[int(pow(2, i))] = self 30 return result 31 32 33 def updateFT(self, m, dht): 34 for item in self.fingerTable: 35 pointer = (item + self.id) % ringLenth(m) 36 for node in dht.getNodes(): 37 if node.isInBound(pointer, m): 38 self.fingerTable[item] = node 39 break 40 41 42 def isInBound(self, pointer, m): 43 # 每个节点负责的标识符范围,有三种情况 44 # case 1 环中只有一个节点时 45 if self.id == self.predecessor.getId(): 46 return True 47 # case 2 前驱节点标识符较小 48 elif self.id > self.predecessor.getId(): 49 if pointer > self.predecessor.getId() and pointer <= self.id: 50 return True 51 # case 3 前驱节点标识符较大 52 elif self.id < self.predecessor.getId(): 53 if pointer > self.predecessor.getId() and pointer < ringLenth(m): 54 return True 55 if pointer < self.id: 56 return True 57 58 else: 59 return False 60 61 def getSucessor(self): 62 return self.sucessor 63 64 def getPredecessor(self): 65 return self.predecessor 66 67 def setSuceesor(self, node): 68 self.sucessor = node 69 70 def setPredecessor(self, node): 71 self.predecessor = node 72 73 def getId(self): 74 return self.id 75 76 77 def getFT(self): 78 return self.fingerTable 79 80 81 82 83 class DHT: 84 ''' 哈希表数据结构 ''' 85 def __init__(self, fileName, m): 86 self.nodeSet = {} 87 firstNode = None 88 for ip in open(fileName, 'r').readlines(): 89 node = Node(ip, m) 90 if firstNode is None: 91 firstNode = node 92 self.joinToRing(firstNode, node, m) 93 94 95 def joinToRing(self, curNode, wnode, m): 96 if len(self.nodeSet) is not 0: 97 sucessor = self.searchSucessor(curNode, wnode, m) 98 predecessor = sucessor.getPredecessor() 99 100 wnode.setPredecessor(predecessor) 101 wnode.setSuceesor(sucessor) 102 sucessor.setPredecessor(wnode) 103 predecessor.setSuceesor(wnode) 104 105 self.nodeSet[wnode.getId()] = wnode 106 self.updateNodesFT(m) 107 108 109 def searchSucessor(self, curNode, wnode, m): 110 if curNode.isInBound(wnode.getId(), m): 111 return curNode 112 else: 113 dis = (wnode.getId() - curNode.getId() + ringLenth(m)) % ringLenth(m) 114 # 寻找最大的 k 使得 2^k <= dis 115 maxk = 0 116 while((2 ** maxk) <= dis): maxk += 1 117 maxk -= 1 118 return self.searchSucessor(curNode.getFT()[2 ** maxk], wnode, m) 119 120 121 def updateNodesFT(self, m): 122 for node in self.nodeSet: 123 self.nodeSet[node].updateFT(m, self) 124 125 def getNodes(self): 126 return self.nodeSet.values() 127 128 129 130 131 class Test: 132 ''' 测试哈希表是否运行正常 ''' 133 def __init__(self, fileName, m): 134 self.dht = DHT(fileName, m) 135 for node in self.dht.getNodes(): 136 print("I am node %d, My predecessor is node %d, and My sucessor is node %d" 137 % (node.getId(), node.getPredecessor().getId(), node.getSucessor().getId())) 138 139 140 141 if __name__ == "__main__": 142 test = Test("DistributedHashTable\\IPAddressList.txt", 6)
|