分布式哈希表原理与实现(Python版和Java版)


 

 

本文介绍了分布式哈希表的概念及实现,这种描述非常适合代码模拟实现。文末附上了实现的代码(Java版和Python版)

本文内容(除代码外)皆来自 《Foundations of Computers Systems Research》一书,自己翻译的,转载请注明出处,不准确的部分请告知,欢迎讨论。

 

什么是分布式哈希表?

 

    分布式哈希表是一个基本的结构,被广泛的应用在许多分布式系统中,用于组织动态改变的(分布式)节点集合和透明的资源定位服务,如,DHT 在许多 P2P 系统中作为一个基本结构提供高效透明的资源定位服务。所有节点都同等重要,这使得 DHT 更加的装载平横。而且,在 DHT 中具有一定规律的拓扑结构(环、树、超立方体、网格、蝴蝶形……),这使得查找更加的高效。虽然,各种 DHT 实现的拓扑结构各不相同,但它们至少提供以下两个基本的功能:
  • put(key, value): 插入一个键值对到 DHT 网络中,不需要知道具体的存储地点
  • value = get(key):根据 key 从 DHT 网络中检索相应的值,不需要知道具体的存储地点

 

DHT 网络设计的重点大多在以下几个方面:
  • 拓扑结构(Topology structure)
  • 路由协议(Routing protocol)
  • 动态维护和失败恢复(Dynamic maintenance and failure recovery)

 

 

各种 DHT 对比

DHTs Topology Node Degree Routing Hops
CAN mesh O(d) O(dn^(1/d))
Chord ring O(logn) O(logn)
Pastry tree + ring O(logn) O(logn)
Tapestry tree O(logn) O(logn)
Kademlia tree O(logn) O(logn)
Viceroy butterfly 7 O(logn)
Koord de Bruijn graph + ring 2 or O(logn) O(logn) or O(logn/loglogn)
Cycloid hypecube + ring O(d) with n = d2^d

 

一个 DHT 例子 —— Chord


Chord 的拓扑结构

  • Chord 使用一致性哈希(consistent hashing)将键(key)和节点(node)映射成一个 m 位的标识符
  • Chord 使用环形作为它的拓扑结构,节点按照顺时针,标识符递增排列。因此,每一个节点都有一个直接前驱和一个直接后继。
  • 每一个节点负责存储标识符范围为(i-1, i] 的键。i 为节点的标识符,i-1 为当前节点前驱节点的标识符。
拓扑结构如右图。N14 节点负责存储标识符为(9,10,11,12,13,14)的键
 
                                  

Chord 的查找算法

    若是使用普通的遍历查找,那么每个节点的度为1(只有后继),查找的时间复杂度为log(n)。而 chord 在节点度数和时间复杂度之间做了个折中:增加节点的度,减少查找时间。
    每个节点都有一个 finger table,该表有 m 个条目,每个条目指向另外的节点。条目指向的节点计算公式为:(n + 2^k) % 2^m 。(n 为当前节点的标识符,m 是标识符空间的位数,0 <= k < m)。如右图。
    为什么要设置这样一个指向表呢?为了更快的查找。假设现在我们要从节点 n 开始,查找标识符 id。于是有如下的步骤:
  • 计算当前节点 n 到目标节点(存储 id)的距离:d = (id-n+2^m) % 2^m,从这里可以知道:(n + d) % 2^m = id
  • 然后计算最大的 k,使得 2^k <= d。然后根据 finger table,跳转到存储标识符 n + 2^k 的节点。现在离目标又近了一步
  • 以上过程递归执行,直到目标标识符位于当前节点和其直接后继之间,那么此时目标标识符就存储在当前节点的直接后继中
 


Chord 的动态维护和失败节点恢复

    在分布式系统中,会经常的进行节点的动态加入和离开,而且节点也可能出问题,所以还需要考虑节点的动态维护,一些算法就要周期的执行来维护拓扑结构。举一个例子,节点 N26 加入该网络:

  • 节点 N26 向节点 n 请求加入网络
  • n 会找到为标识符 26 负责的节点 N32,N32 就是 N26 的直接后继
  • 节点 N26 会修改自己的后继,指向 N32,然后执行一个算法更新后继,并且给新后继 N32 发送一条问候信息
  • N32 收到该消息后,发现 N26 离自己比当前前驱 N21 更近,所以,N32 就会重设它的前驱为 N26
  • N21 执行一个算法更新后继,发现 N26 是自己的新后继,于是修改指向,并发送问候信息给 N26
  • N26 收到消息后,发现自己的前驱为空,然后重设自己的前驱为 N21
上面的过程能够很好的处理节点的加入,但当节点出了问题时,就很难得知其后继节点信息,因此,Chord 还维护了一张后继信息表,但不是记录所有节点的,表长为 r = log(N),当节点出问题后,能够有较大概率获取正确的后继节点信息。
 
 
 

学习案例


案例一:协同域名系统(Cooperative Domain Name System,CoDoNS)




    CoDoNS 是一种建立在 DHT 之上的自组织 P2P 网络,它被布置在域名系统客户端(DNS解析模块,存在于终端设备中)与旧版 DNS 服务器之间,用来处理旧版 DNS 服务器的性能瓶颈。 传统的 DNS 服务器具有如下的性能瓶颈(缺点):

  1. 抵御 DoS 攻击能力差。服务器负载高,研究发现,有80%的域名仅被两个域名服务器服务;链路负载高,研究发现,有30%的域名仅仅通过单一的路由链路与互联网链接。这些问题很容易被恶意攻击者利用。
  2. 域名,地址转换具有很高的延迟
  3. 其缓存策略不允许它进行快速的更新传播,而且可能不能快速反应突发变化
        CoNoNS 结构示意图
 

                          

在CoDoNS 系统中,查询及回应的流程如下:
  1. 客户端发送一个查询请求到本地 CoDoNS 服务器,若本地 CoDoNS 服务器有该域名的缓存备份,就立即返回,否则执行下一步
  2. 该查询请求被传送到一个中间 CoDoNS 服务器,同上,如该服务器有域名备份,就直接返回,否则执行下一步
  3. 中间 CoDoNS 服务器将该请求发送到 home node(也是一个 CoDoNS 服务器),若 home node 没有该记录,就从传统 DNS 域名系统检索该记录
  4. home node 检索到记录后,直接将该信息返回到最初的 CoDoNS 服务器(即最开始的本地 CoDoNS 服务器),然后由它返回给客户端。与此同时,home node 将该记录插入到自己的缓存中,然后传播该记录给邻居节点(基于某种优先级)。
 
home node 是指:其标识符最接近(在所有的 CoDoNS 服务器中)于某一个域名经过一致性哈希映射后的值。也就是说,home node 不是 一定的,它会随着请求查询的域名的不同而动态分配。

 


案例二:协同文件系统(Cooperative File System,CFS)


    一个理想的 P2P 系统应该具有高性能、容错性、加载平衡、适应于突发的变化等特点。CFS 采用是分布式架构,具有如上的优点。它采用了分布式哈希块存储系统(DHash block storage system)和 Chord DHT,从而提供了一种细粒度(fine granular)的存储服务。
    CFS 是一种只读的块级文件系统,其设计的关键之处在于,将每个文件划分成多个块,并且将它们分布在多个服务器中,以实现加载平衡。由于现实生活中,各个服务器的存储容量和网络带宽各不相同,因此,CFS 常常将一个实际的服务器划分成多个虚拟服务器,虚拟服务器的数量由该实际服务器的容量和带宽决定。
    CFS 由两层组成,上层为 DHash,它负责将文件块分布到服务器中,维护合适的备份水平,缓存热点文件以加速寻找。下层是 Chord 层,它负责定位具体块的存储服务器。Chord 中的每一个节点都维护了两个数据结构:后继列表和finger table。后继列表是为了在节点出问题以后依然可以保持链接,finger table 是为了提供高效的查找定位服务。

 

 
 

代码实现

Java(包含四个文件:测试类、节点类、Chord类、节点IP地址文件)

  1 //只写了初始化数据结构、节点加入、节点查询
  2 //没有写节点的移除,数据的存储
  3 
  4 IPAddressList.txt
  5 211.138.116.246
  6 128.001.223.111
  7 176.002.222.123
  8 223.104.160.069
  9 112.017.247.082
 10 203.069.010.164
 11 
 12 
 13 
 14package DisturbedHashTable;
 15 
 16 import java.io.IOException;
 17 //测试类
 18 public class MainTest {
 19     public static void main  (String[] args) throws IOException{
 20         ChordDHT dht = new ChordDHT(6, "F:\\VSCodeProjects\\JavaProjects\\DisturbedHashTable\\IPAddressList.txt");
 21         dht.printNode();
 22 
 23         //测试加入
 24         Node node = new Node(6, "242.108.11.31");
 25         Node curNode = dht.getNode(32);
 26         dht.joinToRing(curNode, node, 6);
 27 
 28         var tmp = node.getSuccessor();
 29         System.out.printf("\n%d ", node.getIdentifier());
 30         while(tmp != node){
 31             System.out.printf("%d ", tmp.getIdentifier());
 32             tmp = tmp.getSuccessor();
 33         }
 34 
 35     }
 36 }
 37 
 38 //Chord类
 39 package DisturbedHashTable;
 40 
 41 import java.io.IOException;
 42 import java.nio.charset.StandardCharsets;
 43 import java.nio.file.Path;
 44 import java.util.HashMap;
 45 import java.util.Scanner;
 46 
 47 /**
 48  * 基本信息: 
 49  * 1. Chord 是一个环状结构 
 50  * 2. 每个节点、键值都被映射成了 m 位的标识符
 51  * 3. 支持查找、插入、移除节点操作 
 52  * 4. 每个节点负责存储标识符为 (i-1, i] 5. 每个节点都有一个 fingerTable
 53  * 
 54  * 实现想法: 
 55  * 1. 利用双向循环链表作为环状结构 (X) 不用链表,链表不能随机访问,用一个 HashMap 
 56  * 2. 标识符空间设置位 6 位 
 57  * 3. 哈希函数将各个节点的IP地址映射成为 6 位的标识符 
 58  * 4. 每个节点还有一个链表存储其负责的标识符和 fingerTable
 59  * 
 60  */
 61 
 62  public class ChordDHT{
 63      //节点集合,键值是其标识符
 64     private HashMap<Integer, Node> nodeSet;  
 65     /**
 66        * 构造函数接收两个参数:
 67        *    m        标识符空间位数
 68        *    srcfile 节点IP地址列表文件
 69        *
 70        */
 71       public ChordDHT(int m, String srcfile) throws IOException{
 72           this.nodeSet = new HashMap<Integer, Node>();
 73           Scanner inFile = new Scanner(Path.of(srcfile), StandardCharsets.UTF_8);
 74           Node firstNode = null;
 75 
 76           while(inFile.hasNext() == true){ //加入初始节点
 77               String ip = inFile.nextLine();
 78               Node node = new Node(m, ip);
 79               if(firstNode == null) firstNode = node;
 80               this.joinToRing(firstNode, node, m);
 81           }
 82           inFile.close();
 83           
 84       }
 85 
 86 
 87       /**
 88        * 处理新节点的加入
 89        * 除了改变前驱后继关系,还可能存在key的迁移,
 90        * 因为这可能会改变某个节点负责的标识符范围
 91        * @param curNode 当前节点 (受理加入请求的节点)
 92        * @param id 请求加入的节点
 93        * @param m 标识符空间位数
 94        * @return 是否加入成功
 95        */
 96       public boolean joinToRing(Node curNode, Node id, int m){
 97             
 98            //环不为空时 找到id的直接后继, 并设置相应的域
 99           if(this.nodeSet.isEmpty() == false){
100               var successor = this.searchNode(curNode, id, m);
101               var predecessor = successor.getPredecessor();
102               id.setPredecessor(predecessor);
103               id.setSuccessor(successor);
104               successor.setPredecessor(id);
105               predecessor.setSuccessor(id);
106           }
107           //将节点放入环中, 包含了环为空的情况
108           this.nodeSet.put(id.getIdentifier(), id);
109           
110           //节点加入后,可能会改变一部分节点的fingerTable
111           for(var node: this.nodeSet.values()){
112               this.updateFingerTable(node, m);
113           }  
114           return true; 
115       }
116 
117       public void updateFingerTable(Node node, int m){
118           for(int i = 0; i < m; i++){ //对表的每一个条目
119               var aIdentifier = ((int)Math.pow(2, i) + node.getIdentifier()) % (int)Math.pow(2, m);
120               
121               //找到为此标识符负责的节点
122               for(var item: this.nodeSet.values()){
123                   if(item.inStorageBound(aIdentifier, m) == true){
124                       node.getFingerTable().put((int)Math.pow(2, i), item);
125                       break;
126                   }
127               }
128           }
129       }
130 
131 
132       public Node searchNode(Node curNode, Node id, int m){
133           //System.out.println("test searchNode");
134           Node result = null;
135           //出口
136           //负责id标识符的节点必是id的后继节点
137           if(curNode.inStorageBound(id.getIdentifier(), m) == true){
138               result = curNode; 
139           }
140           
141           // 递归情况
142           else{
143               var dis = (id.getIdentifier() - curNode.getIdentifier() + (int)Math.pow(2, m)) % (int)Math.pow(2, m);
144               //找到最大的 k 使得 2^k <= dis
145               var maxk = 0;
146               while((int)Math.pow(2, maxk) <= dis) maxk++;
147               maxk--;
148               result = searchNode(curNode.getFingerTable().get((int)Math.pow(2, maxk)), id, m);
149           }
150           return result;
151       }
152 
153       public boolean isIn(int key){
154           return nodeSet.containsKey(key);
155       }
156 
157       public Node getNode(int key){
158           return nodeSet.get(key);
159       }
160 
161       public void printNode(){
162           for(var node: this.nodeSet.values()){
163               System.out.printf("identifier is %d, pre is %d, suc is %d\n",
164               node.getIdentifier(), node.getPredecessor().getIdentifier(), 
165               node.getSuccessor().getIdentifier());
166           }
167       }
168  }
169 
170 //节点类
171 package DisturbedHashTable;
172 
173 import java.sql.Time;
174 import java.util.HashMap;
175 import java.util.Iterator;
176 import java.util.LinkedList;
177 import java.util.Random;
178 
179 public class Node {
180     private int identifier;
181     private HashMap<Integer, Node> fingerTable; //键值为 1,2,4,8,16……
182     private HashMap<Integer, String> keyList; //存储其负责的标识符,String 是 key
183     private Node predecessor; //前驱
184     private Node successor; //后继
185 
186     public Node(int m, String ip){
187         //将ip地址映射成m位标识符,生成m个表项目
188         this.identifier = hashFunc(m, ip);
189         this.keyList = new HashMap<Integer, String>();
190         this.fingerTable = new HashMap<Integer, Node>();
191         for(int i = 0; i < m; i++){ 
192             this.fingerTable.put((int)Math.pow(2, i), this);
193         }
194         this.predecessor = this;
195         this.successor = this;
196     }
197 
198     private int hashFunc(int m, String ip){
199         Random rd = new Random(System.currentTimeMillis());
200         int result = 0;//rd.nextInt(999999); //增加随机数,减少碰撞概率
201         for(int i = 0; i < ip.length(); i++){
202             if(ip.charAt(i) <= '9' && ip.charAt(i) >= '0') {
203                 result = (result + (int)ip.charAt(i))  % (int)Math.pow(2, m);
204             }
205         }
206         return result % (int)Math.pow(2, m);
207     }
208 
209     public HashMap<Integer, Node> getFingerTable(){
210         return this.fingerTable;
211     }
212 
213     
214 
215     //负责标识符的范围
216     /**
217       * 这里可能有三种可能:
218       * 1. 前驱节点标识符更大(如,23 ---> 1)
219       * 2. 前驱节点标识符更小(如,23 ---> 27)
220       * 3. 前驱节点标识符一样大(环中只有一个节点时)
221       */
222     public boolean inStorageBound(int aIdentifier, int m){
223         var lower = this.predecessor.identifier;
224         var upper = this.identifier;
225 
226         //第一种情况
227         if(lower > upper){
228             if(aIdentifier > lower && aIdentifier < (int)Math.pow(2, m)){
229                 return true;
230             }
231             if(aIdentifier < upper){
232                 return true;
233             }
234         }
235 
236         //第二种情况
237         if(aIdentifier > lower && aIdentifier <= upper){
238             return true;
239         }
240 
241         //第三种情况
242         //由于只有一个节点,所以它负责所有的标识符
243         if(lower == upper){
244             return true;
245         }
246 
247         return false;
248         
249     }
250 
251     public int getIdentifier(){
252         return this.identifier;
253     }
254 
255     public Node getPredecessor(){
256         return this.predecessor;
257     }
258 
259     public Node getSuccessor(){
260         return this.successor;
261     }
262 
263     public boolean setPredecessor(Node node){
264         this.predecessor = node;
265         return true;
266     }
267 
268     public boolean setSuccessor(Node node){
269         this.successor = node;
270         return true;
271     }
272 
273 }
View Code

 

Python(都写在一个文件中,节点IP地址文件同上)

 

  1 import random
  2 import time
  3 
  4 
  5 ringLenth = lambda x: int(pow(2, x))
  6 
  7 
  8 class Node:
  9     ''' 哈希表中的节点数据结构 '''
 10     
 11     def __init__(self, ip, m):
 12         self.id = self.generateId(ip = ip, m = m)
 13         self.fingerTable = self.generateFT(m = m)
 14         self.predecessor = self
 15         self.sucessor = self
 16 
 17 
 18     def generateId(self, ip, m):
 19         result = 0
 20         random.seed(time.time())
 21         for ch in ip:
 22             result = (result + ord(ch) + random.randint(1, 99)) % ringLenth(m)
 23         return result
 24 
 25 
 26     def generateFT(self, m):
 27         result = {}
 28         for i in range(m):
 29             result[int(pow(2, i))] = self
 30         return result
 31 
 32 
 33     def updateFT(self, m, dht):
 34         for item in self.fingerTable:
 35             pointer = (item + self.id) % ringLenth(m)
 36             for node in dht.getNodes():
 37                 if node.isInBound(pointer, m):
 38                     self.fingerTable[item] = node
 39                     break
 40         
 41 
 42     def isInBound(self, pointer, m):
 43         # 每个节点负责的标识符范围,有三种情况
 44         # case 1 环中只有一个节点时
 45         if self.id == self.predecessor.getId():
 46             return True
 47         # case 2 前驱节点标识符较小
 48         elif self.id > self.predecessor.getId():
 49             if pointer > self.predecessor.getId() and pointer <= self.id:
 50                 return True
 51         # case 3 前驱节点标识符较大
 52         elif self.id < self.predecessor.getId():
 53             if pointer > self.predecessor.getId() and pointer < ringLenth(m):
 54                 return True
 55             if pointer < self.id:
 56                 return True
 57         
 58         else:
 59              return False
 60 
 61     def getSucessor(self):
 62         return self.sucessor
 63 
 64     def getPredecessor(self):
 65         return self.predecessor
 66 
 67     def setSuceesor(self, node):
 68         self.sucessor = node
 69     
 70     def setPredecessor(self, node):
 71         self.predecessor = node
 72 
 73     def getId(self):
 74         return self.id 
 75 
 76 
 77     def getFT(self):
 78         return self.fingerTable
 79  
 80 
 81 
 82 
 83 class DHT:
 84     ''' 哈希表数据结构 '''
 85     def __init__(self, fileName, m):
 86         self.nodeSet = {}
 87         firstNode = None
 88         for ip in open(fileName, 'r').readlines():
 89             node = Node(ip, m)
 90             if firstNode is None:
 91                 firstNode = node
 92             self.joinToRing(firstNode, node, m)
 93 
 94 
 95     def joinToRing(self, curNode, wnode, m):
 96         if len(self.nodeSet) is not 0:
 97             sucessor = self.searchSucessor(curNode, wnode, m)
 98             predecessor = sucessor.getPredecessor()
 99 
100             wnode.setPredecessor(predecessor)
101             wnode.setSuceesor(sucessor)
102             sucessor.setPredecessor(wnode)
103             predecessor.setSuceesor(wnode)
104         
105         self.nodeSet[wnode.getId()] = wnode
106         self.updateNodesFT(m)
107 
108 
109     def searchSucessor(self, curNode, wnode, m):
110         if curNode.isInBound(wnode.getId(), m):
111             return curNode
112         else:
113             dis = (wnode.getId() - curNode.getId() + ringLenth(m)) % ringLenth(m)
114             # 寻找最大的 k 使得 2^k <= dis
115             maxk = 0
116             while((2 ** maxk) <= dis): maxk += 1
117             maxk -= 1
118             return self.searchSucessor(curNode.getFT()[2 ** maxk], wnode, m)
119 
120 
121     def updateNodesFT(self, m):
122         for node in self.nodeSet:
123             self.nodeSet[node].updateFT(m, self)
124 
125     def getNodes(self):
126         return self.nodeSet.values()
127 
128 
129 
130 
131 class Test:
132     ''' 测试哈希表是否运行正常 ''' 
133     def __init__(self, fileName, m):
134         self.dht = DHT(fileName, m) 
135         for node in self.dht.getNodes():
136             print("I am node %d, My predecessor is node %d, and My sucessor is node %d"
137             % (node.getId(), node.getPredecessor().getId(), node.getSucessor().getId()))
138 
139 
140 
141 if __name__ == "__main__":
142     test = Test("DistributedHashTable\\IPAddressList.txt", 6)
View Code

 

 

 

 

 
 

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM