本文介紹了分布式哈希表的概念及實現,這種描述非常適合代碼模擬實現。文末附上了實現的代碼(Java版和Python版)
本文內容(除代碼外)皆來自 《Foundations of Computers Systems Research》一書,自己翻譯的,轉載請注明出處,不准確的部分請告知,歡迎討論。
什么是分布式哈希表?
分布式哈希表是一個基本的結構,被廣泛的應用在許多分布式系統中,用於組織動態改變的(分布式)節點集合和透明的資源定位服務,如,DHT 在許多 P2P 系統中作為一個基本結構提供高效透明的資源定位服務。所有節點都同等重要,這使得 DHT 更加的裝載平橫。而且,在 DHT 中具有一定規律的拓撲結構(環、樹、超立方體、網格、蝴蝶形……),這使得查找更加的高效。雖然,各種 DHT 實現的拓撲結構各不相同,但它們至少提供以下兩個基本的功能:
DHT 網絡設計的重點大多在以下幾個方面:
|
各種 DHT 對比
DHTs | Topology | Node Degree | Routing Hops |
CAN | mesh | O(d) | O(dn^(1/d)) |
Chord | ring | O(logn) | O(logn) |
Pastry | tree + ring | O(logn) | O(logn) |
Tapestry | tree | O(logn) | O(logn) |
Kademlia | tree | O(logn) | O(logn) |
Viceroy | butterfly | 7 | O(logn) |
Koord | de Bruijn graph + ring | 2 or O(logn) | O(logn) or O(logn/loglogn) |
Cycloid | hypecube + ring | 3 | O(d) with n = d2^d |
一個 DHT 例子 —— Chord
Chord 的拓撲結構
拓撲結構如右圖。N14 節點負責存儲標識符為(9,10,11,12,13,14)的鍵
|
![]() |
Chord 的查找算法 若是使用普通的遍歷查找,那么每個節點的度為1(只有后繼),查找的時間復雜度為log(n)。而 chord 在節點度數和時間復雜度之間做了個折中:增加節點的度,減少查找時間。 每個節點都有一個 finger table,該表有 m 個條目,每個條目指向另外的節點。條目指向的節點計算公式為:(n + 2^k) % 2^m 。(n 為當前節點的標識符,m 是標識符空間的位數,0 <= k < m)。如右圖。 為什么要設置這樣一個指向表呢?為了更快的查找。假設現在我們要從節點 n 開始,查找標識符 id。於是有如下的步驟:
|
![]() |
在分布式系統中,會經常的進行節點的動態加入和離開,而且節點也可能出問題,所以還需要考慮節點的動態維護,一些算法就要周期的執行來維護拓撲結構。舉一個例子,節點 N26 加入該網絡:
|
學習案例
CoNoNS 結構示意圖
![]()
在CoDoNS 系統中,查詢及回應的流程如下:
home node 是指:其標識符最接近(在所有的 CoDoNS 服務器中)於某一個域名經過一致性哈希映射后的值。也就是說,home node 不是 一定的,它會隨着請求查詢的域名的不同而動態分配。
|
|
代碼實現
Java(包含四個文件:測試類、節點類、Chord類、節點IP地址文件) ![]() 1 //只寫了初始化數據結構、節點加入、節點查詢 2 //沒有寫節點的移除,數據的存儲 3 4 IPAddressList.txt 5 211.138.116.246 6 128.001.223.111 7 176.002.222.123 8 223.104.160.069 9 112.017.247.082 10 203.069.010.164 11 12 13 14 package DisturbedHashTable; 15 16 import java.io.IOException; 17 //測試類 18 public class MainTest { 19 public static void main (String[] args) throws IOException{ 20 ChordDHT dht = new ChordDHT(6, "F:\\VSCodeProjects\\JavaProjects\\DisturbedHashTable\\IPAddressList.txt"); 21 dht.printNode(); 22 23 //測試加入 24 Node node = new Node(6, "242.108.11.31"); 25 Node curNode = dht.getNode(32); 26 dht.joinToRing(curNode, node, 6); 27 28 var tmp = node.getSuccessor(); 29 System.out.printf("\n%d ", node.getIdentifier()); 30 while(tmp != node){ 31 System.out.printf("%d ", tmp.getIdentifier()); 32 tmp = tmp.getSuccessor(); 33 } 34 35 } 36 } 37 38 //Chord類 39 package DisturbedHashTable; 40 41 import java.io.IOException; 42 import java.nio.charset.StandardCharsets; 43 import java.nio.file.Path; 44 import java.util.HashMap; 45 import java.util.Scanner; 46 47 /** 48 * 基本信息: 49 * 1. Chord 是一個環狀結構 50 * 2. 每個節點、鍵值都被映射成了 m 位的標識符 51 * 3. 支持查找、插入、移除節點操作 52 * 4. 每個節點負責存儲標識符為 (i-1, i] 5. 每個節點都有一個 fingerTable 53 * 54 * 實現想法: 55 * 1. 利用雙向循環鏈表作為環狀結構 (X) 不用鏈表,鏈表不能隨機訪問,用一個 HashMap 56 * 2. 標識符空間設置位 6 位 57 * 3. 哈希函數將各個節點的IP地址映射成為 6 位的標識符 58 * 4. 每個節點還有一個鏈表存儲其負責的標識符和 fingerTable 59 * 60 */ 61 62 public class ChordDHT{ 63 //節點集合,鍵值是其標識符 64 private HashMap<Integer, Node> nodeSet; 65 /** 66 * 構造函數接收兩個參數: 67 * m 標識符空間位數 68 * srcfile 節點IP地址列表文件 69 * 70 */ 71 public ChordDHT(int m, String srcfile) throws IOException{ 72 this.nodeSet = new HashMap<Integer, Node>(); 73 Scanner inFile = new Scanner(Path.of(srcfile), StandardCharsets.UTF_8); 74 Node firstNode = null; 75 76 while(inFile.hasNext() == true){ //加入初始節點 77 String ip = inFile.nextLine(); 78 Node node = new Node(m, ip); 79 if(firstNode == null) firstNode = node; 80 this.joinToRing(firstNode, node, m); 81 } 82 inFile.close(); 83 84 } 85 86 87 /** 88 * 處理新節點的加入 89 * 除了改變前驅后繼關系,還可能存在key的遷移, 90 * 因為這可能會改變某個節點負責的標識符范圍 91 * @param curNode 當前節點 (受理加入請求的節點) 92 * @param id 請求加入的節點 93 * @param m 標識符空間位數 94 * @return 是否加入成功 95 */ 96 public boolean joinToRing(Node curNode, Node id, int m){ 97 98 //環不為空時 找到id的直接后繼, 並設置相應的域 99 if(this.nodeSet.isEmpty() == false){ 100 var successor = this.searchNode(curNode, id, m); 101 var predecessor = successor.getPredecessor(); 102 id.setPredecessor(predecessor); 103 id.setSuccessor(successor); 104 successor.setPredecessor(id); 105 predecessor.setSuccessor(id); 106 } 107 //將節點放入環中, 包含了環為空的情況 108 this.nodeSet.put(id.getIdentifier(), id); 109 110 //節點加入后,可能會改變一部分節點的fingerTable 111 for(var node: this.nodeSet.values()){ 112 this.updateFingerTable(node, m); 113 } 114 return true; 115 } 116 117 public void updateFingerTable(Node node, int m){ 118 for(int i = 0; i < m; i++){ //對表的每一個條目 119 var aIdentifier = ((int)Math.pow(2, i) + node.getIdentifier()) % (int)Math.pow(2, m); 120 121 //找到為此標識符負責的節點 122 for(var item: this.nodeSet.values()){ 123 if(item.inStorageBound(aIdentifier, m) == true){ 124 node.getFingerTable().put((int)Math.pow(2, i), item); 125 break; 126 } 127 } 128 } 129 } 130 131 132 public Node searchNode(Node curNode, Node id, int m){ 133 //System.out.println("test searchNode"); 134 Node result = null; 135 //出口 136 //負責id標識符的節點必是id的后繼節點 137 if(curNode.inStorageBound(id.getIdentifier(), m) == true){ 138 result = curNode; 139 } 140 141 // 遞歸情況 142 else{ 143 var dis = (id.getIdentifier() - curNode.getIdentifier() + (int)Math.pow(2, m)) % (int)Math.pow(2, m); 144 //找到最大的 k 使得 2^k <= dis 145 var maxk = 0; 146 while((int)Math.pow(2, maxk) <= dis) maxk++; 147 maxk--; 148 result = searchNode(curNode.getFingerTable().get((int)Math.pow(2, maxk)), id, m); 149 } 150 return result; 151 } 152 153 public boolean isIn(int key){ 154 return nodeSet.containsKey(key); 155 } 156 157 public Node getNode(int key){ 158 return nodeSet.get(key); 159 } 160 161 public void printNode(){ 162 for(var node: this.nodeSet.values()){ 163 System.out.printf("identifier is %d, pre is %d, suc is %d\n", 164 node.getIdentifier(), node.getPredecessor().getIdentifier(), 165 node.getSuccessor().getIdentifier()); 166 } 167 } 168 } 169 170 //節點類 171 package DisturbedHashTable; 172 173 import java.sql.Time; 174 import java.util.HashMap; 175 import java.util.Iterator; 176 import java.util.LinkedList; 177 import java.util.Random; 178 179 public class Node { 180 private int identifier; 181 private HashMap<Integer, Node> fingerTable; //鍵值為 1,2,4,8,16…… 182 private HashMap<Integer, String> keyList; //存儲其負責的標識符,String 是 key 183 private Node predecessor; //前驅 184 private Node successor; //后繼 185 186 public Node(int m, String ip){ 187 //將ip地址映射成m位標識符,生成m個表項目 188 this.identifier = hashFunc(m, ip); 189 this.keyList = new HashMap<Integer, String>(); 190 this.fingerTable = new HashMap<Integer, Node>(); 191 for(int i = 0; i < m; i++){ 192 this.fingerTable.put((int)Math.pow(2, i), this); 193 } 194 this.predecessor = this; 195 this.successor = this; 196 } 197 198 private int hashFunc(int m, String ip){ 199 Random rd = new Random(System.currentTimeMillis()); 200 int result = 0;//rd.nextInt(999999); //增加隨機數,減少碰撞概率 201 for(int i = 0; i < ip.length(); i++){ 202 if(ip.charAt(i) <= '9' && ip.charAt(i) >= '0') { 203 result = (result + (int)ip.charAt(i)) % (int)Math.pow(2, m); 204 } 205 } 206 return result % (int)Math.pow(2, m); 207 } 208 209 public HashMap<Integer, Node> getFingerTable(){ 210 return this.fingerTable; 211 } 212 213 214 215 //負責標識符的范圍 216 /** 217 * 這里可能有三種可能: 218 * 1. 前驅節點標識符更大(如,23 ---> 1) 219 * 2. 前驅節點標識符更小(如,23 ---> 27) 220 * 3. 前驅節點標識符一樣大(環中只有一個節點時) 221 */ 222 public boolean inStorageBound(int aIdentifier, int m){ 223 var lower = this.predecessor.identifier; 224 var upper = this.identifier; 225 226 //第一種情況 227 if(lower > upper){ 228 if(aIdentifier > lower && aIdentifier < (int)Math.pow(2, m)){ 229 return true; 230 } 231 if(aIdentifier < upper){ 232 return true; 233 } 234 } 235 236 //第二種情況 237 if(aIdentifier > lower && aIdentifier <= upper){ 238 return true; 239 } 240 241 //第三種情況 242 //由於只有一個節點,所以它負責所有的標識符 243 if(lower == upper){ 244 return true; 245 } 246 247 return false; 248 249 } 250 251 public int getIdentifier(){ 252 return this.identifier; 253 } 254 255 public Node getPredecessor(){ 256 return this.predecessor; 257 } 258 259 public Node getSuccessor(){ 260 return this.successor; 261 } 262 263 public boolean setPredecessor(Node node){ 264 this.predecessor = node; 265 return true; 266 } 267 268 public boolean setSuccessor(Node node){ 269 this.successor = node; 270 return true; 271 } 272 273 }
Python(都寫在一個文件中,節點IP地址文件同上)
![]() 1 import random 2 import time 3 4 5 ringLenth = lambda x: int(pow(2, x)) 6 7 8 class Node: 9 ''' 哈希表中的節點數據結構 ''' 10 11 def __init__(self, ip, m): 12 self.id = self.generateId(ip = ip, m = m) 13 self.fingerTable = self.generateFT(m = m) 14 self.predecessor = self 15 self.sucessor = self 16 17 18 def generateId(self, ip, m): 19 result = 0 20 random.seed(time.time()) 21 for ch in ip: 22 result = (result + ord(ch) + random.randint(1, 99)) % ringLenth(m) 23 return result 24 25 26 def generateFT(self, m): 27 result = {} 28 for i in range(m): 29 result[int(pow(2, i))] = self 30 return result 31 32 33 def updateFT(self, m, dht): 34 for item in self.fingerTable: 35 pointer = (item + self.id) % ringLenth(m) 36 for node in dht.getNodes(): 37 if node.isInBound(pointer, m): 38 self.fingerTable[item] = node 39 break 40 41 42 def isInBound(self, pointer, m): 43 # 每個節點負責的標識符范圍,有三種情況 44 # case 1 環中只有一個節點時 45 if self.id == self.predecessor.getId(): 46 return True 47 # case 2 前驅節點標識符較小 48 elif self.id > self.predecessor.getId(): 49 if pointer > self.predecessor.getId() and pointer <= self.id: 50 return True 51 # case 3 前驅節點標識符較大 52 elif self.id < self.predecessor.getId(): 53 if pointer > self.predecessor.getId() and pointer < ringLenth(m): 54 return True 55 if pointer < self.id: 56 return True 57 58 else: 59 return False 60 61 def getSucessor(self): 62 return self.sucessor 63 64 def getPredecessor(self): 65 return self.predecessor 66 67 def setSuceesor(self, node): 68 self.sucessor = node 69 70 def setPredecessor(self, node): 71 self.predecessor = node 72 73 def getId(self): 74 return self.id 75 76 77 def getFT(self): 78 return self.fingerTable 79 80 81 82 83 class DHT: 84 ''' 哈希表數據結構 ''' 85 def __init__(self, fileName, m): 86 self.nodeSet = {} 87 firstNode = None 88 for ip in open(fileName, 'r').readlines(): 89 node = Node(ip, m) 90 if firstNode is None: 91 firstNode = node 92 self.joinToRing(firstNode, node, m) 93 94 95 def joinToRing(self, curNode, wnode, m): 96 if len(self.nodeSet) is not 0: 97 sucessor = self.searchSucessor(curNode, wnode, m) 98 predecessor = sucessor.getPredecessor() 99 100 wnode.setPredecessor(predecessor) 101 wnode.setSuceesor(sucessor) 102 sucessor.setPredecessor(wnode) 103 predecessor.setSuceesor(wnode) 104 105 self.nodeSet[wnode.getId()] = wnode 106 self.updateNodesFT(m) 107 108 109 def searchSucessor(self, curNode, wnode, m): 110 if curNode.isInBound(wnode.getId(), m): 111 return curNode 112 else: 113 dis = (wnode.getId() - curNode.getId() + ringLenth(m)) % ringLenth(m) 114 # 尋找最大的 k 使得 2^k <= dis 115 maxk = 0 116 while((2 ** maxk) <= dis): maxk += 1 117 maxk -= 1 118 return self.searchSucessor(curNode.getFT()[2 ** maxk], wnode, m) 119 120 121 def updateNodesFT(self, m): 122 for node in self.nodeSet: 123 self.nodeSet[node].updateFT(m, self) 124 125 def getNodes(self): 126 return self.nodeSet.values() 127 128 129 130 131 class Test: 132 ''' 測試哈希表是否運行正常 ''' 133 def __init__(self, fileName, m): 134 self.dht = DHT(fileName, m) 135 for node in self.dht.getNodes(): 136 print("I am node %d, My predecessor is node %d, and My sucessor is node %d" 137 % (node.getId(), node.getPredecessor().getId(), node.getSucessor().getId())) 138 139 140 141 if __name__ == "__main__": 142 test = Test("DistributedHashTable\\IPAddressList.txt", 6)
|