分布式哈希表原理與實現(Python版和Java版)


 

 

本文介紹了分布式哈希表的概念及實現,這種描述非常適合代碼模擬實現。文末附上了實現的代碼(Java版和Python版)

本文內容(除代碼外)皆來自 《Foundations of Computers Systems Research》一書,自己翻譯的,轉載請注明出處,不准確的部分請告知,歡迎討論。

 

什么是分布式哈希表?

 

    分布式哈希表是一個基本的結構,被廣泛的應用在許多分布式系統中,用於組織動態改變的(分布式)節點集合和透明的資源定位服務,如,DHT 在許多 P2P 系統中作為一個基本結構提供高效透明的資源定位服務。所有節點都同等重要,這使得 DHT 更加的裝載平橫。而且,在 DHT 中具有一定規律的拓撲結構(環、樹、超立方體、網格、蝴蝶形……),這使得查找更加的高效。雖然,各種 DHT 實現的拓撲結構各不相同,但它們至少提供以下兩個基本的功能:
  • put(key, value): 插入一個鍵值對到 DHT 網絡中,不需要知道具體的存儲地點
  • value = get(key):根據 key 從 DHT 網絡中檢索相應的值,不需要知道具體的存儲地點

 

DHT 網絡設計的重點大多在以下幾個方面:
  • 拓撲結構(Topology structure)
  • 路由協議(Routing protocol)
  • 動態維護和失敗恢復(Dynamic maintenance and failure recovery)

 

 

各種 DHT 對比

DHTs Topology Node Degree Routing Hops
CAN mesh O(d) O(dn^(1/d))
Chord ring O(logn) O(logn)
Pastry tree + ring O(logn) O(logn)
Tapestry tree O(logn) O(logn)
Kademlia tree O(logn) O(logn)
Viceroy butterfly 7 O(logn)
Koord de Bruijn graph + ring 2 or O(logn) O(logn) or O(logn/loglogn)
Cycloid hypecube + ring O(d) with n = d2^d

 

一個 DHT 例子 —— Chord


Chord 的拓撲結構

  • Chord 使用一致性哈希(consistent hashing)將鍵(key)和節點(node)映射成一個 m 位的標識符
  • Chord 使用環形作為它的拓撲結構,節點按照順時針,標識符遞增排列。因此,每一個節點都有一個直接前驅和一個直接后繼。
  • 每一個節點負責存儲標識符范圍為(i-1, i] 的鍵。i 為節點的標識符,i-1 為當前節點前驅節點的標識符。
拓撲結構如右圖。N14 節點負責存儲標識符為(9,10,11,12,13,14)的鍵
 
                                  

Chord 的查找算法

    若是使用普通的遍歷查找,那么每個節點的度為1(只有后繼),查找的時間復雜度為log(n)。而 chord 在節點度數和時間復雜度之間做了個折中:增加節點的度,減少查找時間。
    每個節點都有一個 finger table,該表有 m 個條目,每個條目指向另外的節點。條目指向的節點計算公式為:(n + 2^k) % 2^m 。(n 為當前節點的標識符,m 是標識符空間的位數,0 <= k < m)。如右圖。
    為什么要設置這樣一個指向表呢?為了更快的查找。假設現在我們要從節點 n 開始,查找標識符 id。於是有如下的步驟:
  • 計算當前節點 n 到目標節點(存儲 id)的距離:d = (id-n+2^m) % 2^m,從這里可以知道:(n + d) % 2^m = id
  • 然后計算最大的 k,使得 2^k <= d。然后根據 finger table,跳轉到存儲標識符 n + 2^k 的節點。現在離目標又近了一步
  • 以上過程遞歸執行,直到目標標識符位於當前節點和其直接后繼之間,那么此時目標標識符就存儲在當前節點的直接后繼中
 


Chord 的動態維護和失敗節點恢復

    在分布式系統中,會經常的進行節點的動態加入和離開,而且節點也可能出問題,所以還需要考慮節點的動態維護,一些算法就要周期的執行來維護拓撲結構。舉一個例子,節點 N26 加入該網絡:

  • 節點 N26 向節點 n 請求加入網絡
  • n 會找到為標識符 26 負責的節點 N32,N32 就是 N26 的直接后繼
  • 節點 N26 會修改自己的后繼,指向 N32,然后執行一個算法更新后繼,並且給新后繼 N32 發送一條問候信息
  • N32 收到該消息后,發現 N26 離自己比當前前驅 N21 更近,所以,N32 就會重設它的前驅為 N26
  • N21 執行一個算法更新后繼,發現 N26 是自己的新后繼,於是修改指向,並發送問候信息給 N26
  • N26 收到消息后,發現自己的前驅為空,然后重設自己的前驅為 N21
上面的過程能夠很好的處理節點的加入,但當節點出了問題時,就很難得知其后繼節點信息,因此,Chord 還維護了一張后繼信息表,但不是記錄所有節點的,表長為 r = log(N),當節點出問題后,能夠有較大概率獲取正確的后繼節點信息。
 
 
 

學習案例


案例一:協同域名系統(Cooperative Domain Name System,CoDoNS)




    CoDoNS 是一種建立在 DHT 之上的自組織 P2P 網絡,它被布置在域名系統客戶端(DNS解析模塊,存在於終端設備中)與舊版 DNS 服務器之間,用來處理舊版 DNS 服務器的性能瓶頸。 傳統的 DNS 服務器具有如下的性能瓶頸(缺點):

  1. 抵御 DoS 攻擊能力差。服務器負載高,研究發現,有80%的域名僅被兩個域名服務器服務;鏈路負載高,研究發現,有30%的域名僅僅通過單一的路由鏈路與互聯網鏈接。這些問題很容易被惡意攻擊者利用。
  2. 域名,地址轉換具有很高的延遲
  3. 其緩存策略不允許它進行快速的更新傳播,而且可能不能快速反應突發變化
        CoNoNS 結構示意圖
 

                          

在CoDoNS 系統中,查詢及回應的流程如下:
  1. 客戶端發送一個查詢請求到本地 CoDoNS 服務器,若本地 CoDoNS 服務器有該域名的緩存備份,就立即返回,否則執行下一步
  2. 該查詢請求被傳送到一個中間 CoDoNS 服務器,同上,如該服務器有域名備份,就直接返回,否則執行下一步
  3. 中間 CoDoNS 服務器將該請求發送到 home node(也是一個 CoDoNS 服務器),若 home node 沒有該記錄,就從傳統 DNS 域名系統檢索該記錄
  4. home node 檢索到記錄后,直接將該信息返回到最初的 CoDoNS 服務器(即最開始的本地 CoDoNS 服務器),然后由它返回給客戶端。與此同時,home node 將該記錄插入到自己的緩存中,然后傳播該記錄給鄰居節點(基於某種優先級)。
 
home node 是指:其標識符最接近(在所有的 CoDoNS 服務器中)於某一個域名經過一致性哈希映射后的值。也就是說,home node 不是 一定的,它會隨着請求查詢的域名的不同而動態分配。

 


案例二:協同文件系統(Cooperative File System,CFS)


    一個理想的 P2P 系統應該具有高性能、容錯性、加載平衡、適應於突發的變化等特點。CFS 采用是分布式架構,具有如上的優點。它采用了分布式哈希塊存儲系統(DHash block storage system)和 Chord DHT,從而提供了一種細粒度(fine granular)的存儲服務。
    CFS 是一種只讀的塊級文件系統,其設計的關鍵之處在於,將每個文件划分成多個塊,並且將它們分布在多個服務器中,以實現加載平衡。由於現實生活中,各個服務器的存儲容量和網絡帶寬各不相同,因此,CFS 常常將一個實際的服務器划分成多個虛擬服務器,虛擬服務器的數量由該實際服務器的容量和帶寬決定。
    CFS 由兩層組成,上層為 DHash,它負責將文件塊分布到服務器中,維護合適的備份水平,緩存熱點文件以加速尋找。下層是 Chord 層,它負責定位具體塊的存儲服務器。Chord 中的每一個節點都維護了兩個數據結構:后繼列表和finger table。后繼列表是為了在節點出問題以后依然可以保持鏈接,finger table 是為了提供高效的查找定位服務。

 

 
 

代碼實現

Java(包含四個文件:測試類、節點類、Chord類、節點IP地址文件)

  1 //只寫了初始化數據結構、節點加入、節點查詢
  2 //沒有寫節點的移除,數據的存儲
  3 
  4 IPAddressList.txt
  5 211.138.116.246
  6 128.001.223.111
  7 176.002.222.123
  8 223.104.160.069
  9 112.017.247.082
 10 203.069.010.164
 11 
 12 
 13 
 14package DisturbedHashTable;
 15 
 16 import java.io.IOException;
 17 //測試類
 18 public class MainTest {
 19     public static void main  (String[] args) throws IOException{
 20         ChordDHT dht = new ChordDHT(6, "F:\\VSCodeProjects\\JavaProjects\\DisturbedHashTable\\IPAddressList.txt");
 21         dht.printNode();
 22 
 23         //測試加入
 24         Node node = new Node(6, "242.108.11.31");
 25         Node curNode = dht.getNode(32);
 26         dht.joinToRing(curNode, node, 6);
 27 
 28         var tmp = node.getSuccessor();
 29         System.out.printf("\n%d ", node.getIdentifier());
 30         while(tmp != node){
 31             System.out.printf("%d ", tmp.getIdentifier());
 32             tmp = tmp.getSuccessor();
 33         }
 34 
 35     }
 36 }
 37 
 38 //Chord類
 39 package DisturbedHashTable;
 40 
 41 import java.io.IOException;
 42 import java.nio.charset.StandardCharsets;
 43 import java.nio.file.Path;
 44 import java.util.HashMap;
 45 import java.util.Scanner;
 46 
 47 /**
 48  * 基本信息: 
 49  * 1. Chord 是一個環狀結構 
 50  * 2. 每個節點、鍵值都被映射成了 m 位的標識符
 51  * 3. 支持查找、插入、移除節點操作 
 52  * 4. 每個節點負責存儲標識符為 (i-1, i] 5. 每個節點都有一個 fingerTable
 53  * 
 54  * 實現想法: 
 55  * 1. 利用雙向循環鏈表作為環狀結構 (X) 不用鏈表,鏈表不能隨機訪問,用一個 HashMap 
 56  * 2. 標識符空間設置位 6 位 
 57  * 3. 哈希函數將各個節點的IP地址映射成為 6 位的標識符 
 58  * 4. 每個節點還有一個鏈表存儲其負責的標識符和 fingerTable
 59  * 
 60  */
 61 
 62  public class ChordDHT{
 63      //節點集合,鍵值是其標識符
 64     private HashMap<Integer, Node> nodeSet;  
 65     /**
 66        * 構造函數接收兩個參數:
 67        *    m        標識符空間位數
 68        *    srcfile 節點IP地址列表文件
 69        *
 70        */
 71       public ChordDHT(int m, String srcfile) throws IOException{
 72           this.nodeSet = new HashMap<Integer, Node>();
 73           Scanner inFile = new Scanner(Path.of(srcfile), StandardCharsets.UTF_8);
 74           Node firstNode = null;
 75 
 76           while(inFile.hasNext() == true){ //加入初始節點
 77               String ip = inFile.nextLine();
 78               Node node = new Node(m, ip);
 79               if(firstNode == null) firstNode = node;
 80               this.joinToRing(firstNode, node, m);
 81           }
 82           inFile.close();
 83           
 84       }
 85 
 86 
 87       /**
 88        * 處理新節點的加入
 89        * 除了改變前驅后繼關系,還可能存在key的遷移,
 90        * 因為這可能會改變某個節點負責的標識符范圍
 91        * @param curNode 當前節點 (受理加入請求的節點)
 92        * @param id 請求加入的節點
 93        * @param m 標識符空間位數
 94        * @return 是否加入成功
 95        */
 96       public boolean joinToRing(Node curNode, Node id, int m){
 97             
 98            //環不為空時 找到id的直接后繼, 並設置相應的域
 99           if(this.nodeSet.isEmpty() == false){
100               var successor = this.searchNode(curNode, id, m);
101               var predecessor = successor.getPredecessor();
102               id.setPredecessor(predecessor);
103               id.setSuccessor(successor);
104               successor.setPredecessor(id);
105               predecessor.setSuccessor(id);
106           }
107           //將節點放入環中, 包含了環為空的情況
108           this.nodeSet.put(id.getIdentifier(), id);
109           
110           //節點加入后,可能會改變一部分節點的fingerTable
111           for(var node: this.nodeSet.values()){
112               this.updateFingerTable(node, m);
113           }  
114           return true; 
115       }
116 
117       public void updateFingerTable(Node node, int m){
118           for(int i = 0; i < m; i++){ //對表的每一個條目
119               var aIdentifier = ((int)Math.pow(2, i) + node.getIdentifier()) % (int)Math.pow(2, m);
120               
121               //找到為此標識符負責的節點
122               for(var item: this.nodeSet.values()){
123                   if(item.inStorageBound(aIdentifier, m) == true){
124                       node.getFingerTable().put((int)Math.pow(2, i), item);
125                       break;
126                   }
127               }
128           }
129       }
130 
131 
132       public Node searchNode(Node curNode, Node id, int m){
133           //System.out.println("test searchNode");
134           Node result = null;
135           //出口
136           //負責id標識符的節點必是id的后繼節點
137           if(curNode.inStorageBound(id.getIdentifier(), m) == true){
138               result = curNode; 
139           }
140           
141           // 遞歸情況
142           else{
143               var dis = (id.getIdentifier() - curNode.getIdentifier() + (int)Math.pow(2, m)) % (int)Math.pow(2, m);
144               //找到最大的 k 使得 2^k <= dis
145               var maxk = 0;
146               while((int)Math.pow(2, maxk) <= dis) maxk++;
147               maxk--;
148               result = searchNode(curNode.getFingerTable().get((int)Math.pow(2, maxk)), id, m);
149           }
150           return result;
151       }
152 
153       public boolean isIn(int key){
154           return nodeSet.containsKey(key);
155       }
156 
157       public Node getNode(int key){
158           return nodeSet.get(key);
159       }
160 
161       public void printNode(){
162           for(var node: this.nodeSet.values()){
163               System.out.printf("identifier is %d, pre is %d, suc is %d\n",
164               node.getIdentifier(), node.getPredecessor().getIdentifier(), 
165               node.getSuccessor().getIdentifier());
166           }
167       }
168  }
169 
170 //節點類
171 package DisturbedHashTable;
172 
173 import java.sql.Time;
174 import java.util.HashMap;
175 import java.util.Iterator;
176 import java.util.LinkedList;
177 import java.util.Random;
178 
179 public class Node {
180     private int identifier;
181     private HashMap<Integer, Node> fingerTable; //鍵值為 1,2,4,8,16……
182     private HashMap<Integer, String> keyList; //存儲其負責的標識符,String 是 key
183     private Node predecessor; //前驅
184     private Node successor; //后繼
185 
186     public Node(int m, String ip){
187         //將ip地址映射成m位標識符,生成m個表項目
188         this.identifier = hashFunc(m, ip);
189         this.keyList = new HashMap<Integer, String>();
190         this.fingerTable = new HashMap<Integer, Node>();
191         for(int i = 0; i < m; i++){ 
192             this.fingerTable.put((int)Math.pow(2, i), this);
193         }
194         this.predecessor = this;
195         this.successor = this;
196     }
197 
198     private int hashFunc(int m, String ip){
199         Random rd = new Random(System.currentTimeMillis());
200         int result = 0;//rd.nextInt(999999); //增加隨機數,減少碰撞概率
201         for(int i = 0; i < ip.length(); i++){
202             if(ip.charAt(i) <= '9' && ip.charAt(i) >= '0') {
203                 result = (result + (int)ip.charAt(i))  % (int)Math.pow(2, m);
204             }
205         }
206         return result % (int)Math.pow(2, m);
207     }
208 
209     public HashMap<Integer, Node> getFingerTable(){
210         return this.fingerTable;
211     }
212 
213     
214 
215     //負責標識符的范圍
216     /**
217       * 這里可能有三種可能:
218       * 1. 前驅節點標識符更大(如,23 ---> 1)
219       * 2. 前驅節點標識符更小(如,23 ---> 27)
220       * 3. 前驅節點標識符一樣大(環中只有一個節點時)
221       */
222     public boolean inStorageBound(int aIdentifier, int m){
223         var lower = this.predecessor.identifier;
224         var upper = this.identifier;
225 
226         //第一種情況
227         if(lower > upper){
228             if(aIdentifier > lower && aIdentifier < (int)Math.pow(2, m)){
229                 return true;
230             }
231             if(aIdentifier < upper){
232                 return true;
233             }
234         }
235 
236         //第二種情況
237         if(aIdentifier > lower && aIdentifier <= upper){
238             return true;
239         }
240 
241         //第三種情況
242         //由於只有一個節點,所以它負責所有的標識符
243         if(lower == upper){
244             return true;
245         }
246 
247         return false;
248         
249     }
250 
251     public int getIdentifier(){
252         return this.identifier;
253     }
254 
255     public Node getPredecessor(){
256         return this.predecessor;
257     }
258 
259     public Node getSuccessor(){
260         return this.successor;
261     }
262 
263     public boolean setPredecessor(Node node){
264         this.predecessor = node;
265         return true;
266     }
267 
268     public boolean setSuccessor(Node node){
269         this.successor = node;
270         return true;
271     }
272 
273 }
View Code

 

Python(都寫在一個文件中,節點IP地址文件同上)

 

  1 import random
  2 import time
  3 
  4 
  5 ringLenth = lambda x: int(pow(2, x))
  6 
  7 
  8 class Node:
  9     ''' 哈希表中的節點數據結構 '''
 10     
 11     def __init__(self, ip, m):
 12         self.id = self.generateId(ip = ip, m = m)
 13         self.fingerTable = self.generateFT(m = m)
 14         self.predecessor = self
 15         self.sucessor = self
 16 
 17 
 18     def generateId(self, ip, m):
 19         result = 0
 20         random.seed(time.time())
 21         for ch in ip:
 22             result = (result + ord(ch) + random.randint(1, 99)) % ringLenth(m)
 23         return result
 24 
 25 
 26     def generateFT(self, m):
 27         result = {}
 28         for i in range(m):
 29             result[int(pow(2, i))] = self
 30         return result
 31 
 32 
 33     def updateFT(self, m, dht):
 34         for item in self.fingerTable:
 35             pointer = (item + self.id) % ringLenth(m)
 36             for node in dht.getNodes():
 37                 if node.isInBound(pointer, m):
 38                     self.fingerTable[item] = node
 39                     break
 40         
 41 
 42     def isInBound(self, pointer, m):
 43         # 每個節點負責的標識符范圍,有三種情況
 44         # case 1 環中只有一個節點時
 45         if self.id == self.predecessor.getId():
 46             return True
 47         # case 2 前驅節點標識符較小
 48         elif self.id > self.predecessor.getId():
 49             if pointer > self.predecessor.getId() and pointer <= self.id:
 50                 return True
 51         # case 3 前驅節點標識符較大
 52         elif self.id < self.predecessor.getId():
 53             if pointer > self.predecessor.getId() and pointer < ringLenth(m):
 54                 return True
 55             if pointer < self.id:
 56                 return True
 57         
 58         else:
 59              return False
 60 
 61     def getSucessor(self):
 62         return self.sucessor
 63 
 64     def getPredecessor(self):
 65         return self.predecessor
 66 
 67     def setSuceesor(self, node):
 68         self.sucessor = node
 69     
 70     def setPredecessor(self, node):
 71         self.predecessor = node
 72 
 73     def getId(self):
 74         return self.id 
 75 
 76 
 77     def getFT(self):
 78         return self.fingerTable
 79  
 80 
 81 
 82 
 83 class DHT:
 84     ''' 哈希表數據結構 '''
 85     def __init__(self, fileName, m):
 86         self.nodeSet = {}
 87         firstNode = None
 88         for ip in open(fileName, 'r').readlines():
 89             node = Node(ip, m)
 90             if firstNode is None:
 91                 firstNode = node
 92             self.joinToRing(firstNode, node, m)
 93 
 94 
 95     def joinToRing(self, curNode, wnode, m):
 96         if len(self.nodeSet) is not 0:
 97             sucessor = self.searchSucessor(curNode, wnode, m)
 98             predecessor = sucessor.getPredecessor()
 99 
100             wnode.setPredecessor(predecessor)
101             wnode.setSuceesor(sucessor)
102             sucessor.setPredecessor(wnode)
103             predecessor.setSuceesor(wnode)
104         
105         self.nodeSet[wnode.getId()] = wnode
106         self.updateNodesFT(m)
107 
108 
109     def searchSucessor(self, curNode, wnode, m):
110         if curNode.isInBound(wnode.getId(), m):
111             return curNode
112         else:
113             dis = (wnode.getId() - curNode.getId() + ringLenth(m)) % ringLenth(m)
114             # 尋找最大的 k 使得 2^k <= dis
115             maxk = 0
116             while((2 ** maxk) <= dis): maxk += 1
117             maxk -= 1
118             return self.searchSucessor(curNode.getFT()[2 ** maxk], wnode, m)
119 
120 
121     def updateNodesFT(self, m):
122         for node in self.nodeSet:
123             self.nodeSet[node].updateFT(m, self)
124 
125     def getNodes(self):
126         return self.nodeSet.values()
127 
128 
129 
130 
131 class Test:
132     ''' 測試哈希表是否運行正常 ''' 
133     def __init__(self, fileName, m):
134         self.dht = DHT(fileName, m) 
135         for node in self.dht.getNodes():
136             print("I am node %d, My predecessor is node %d, and My sucessor is node %d"
137             % (node.getId(), node.getPredecessor().getId(), node.getSucessor().getId()))
138 
139 
140 
141 if __name__ == "__main__":
142     test = Test("DistributedHashTable\\IPAddressList.txt", 6)
View Code

 

 

 

 

 
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM