第十一章 自己實現一致性hash算法


關於一致性hash算法的意義以及其相對於簡單求余法(除數求余法)的好處,查看第六章 memcached剖析

 

注意:真實的hash環的數據結構是二叉樹,這里為了簡便使用了列表List

1、一致性hash算法的使用地方

  • memcached服務器
  • Jedis分片機制

 

2、真實服務器節點沒有虛擬化的一致性hash算法實現

ServerNode:真實服務器節點

 1 package hash;
 2 
 3 /**
 4  * server節點
 5  */
 6 public class ServerNode {
 7     private String serverName;
 8     private long serverHash;
 9 
10     public String getServerName() {
11         return serverName;
12     }
13 
14     public void setServerName(String serverName) {
15         this.serverName = serverName;
16     }
17 
18     public long getServerHash() {
19         return serverHash;
20     }
21 
22     public void setServerHash(long serverHash) {
23         this.serverHash = serverHash;
24     }
25 
26     /**
27      * 下邊重寫hashcode()和equals()是為了在刪除節點的時候只根據傳入的serverName刪除即可
28      */
29     @Override
30     public int hashCode() {
31         final int prime = 31;
32         int result = 1;
33         result = prime * result
34                 + ((serverName == null) ? 0 : serverName.hashCode());
35         return result;
36     }
37 
38     @Override
39     public boolean equals(Object obj) {
40         if (this == obj)
41             return true;
42         if (obj == null)
43             return false;
44         if (getClass() != obj.getClass())
45             return false;
46         ServerNode other = (ServerNode) obj;
47         if (serverName == null) {
48             if (other.serverName != null)
49                 return false;
50         } else if (!serverName.equals(other.serverName))
51             return false;
52         return true;
53     }
54     
55     
56 }
View Code

注意:

  • serverName可以自己取名,這里取名為"ip:port"
  • 對於hashCode()和equals()方法的重寫僅僅是為了刪除服務器節點的時候,只根據serverName就可以刪除,而不需要再計算服務器節點的hash值

 

ServerComparator:真實服務器比較器

 1 package hash;
 2 
 3 import java.util.Comparator;
 4 
 5 /**
 6  * 服務器排序比較器
 7  */
 8 public class ServerComparator implements Comparator<ServerNode> {
 9 
10     public int compare(ServerNode node1, ServerNode node2) {
11         if(node1.getServerHash() <= node2.getServerHash()) {
12             return -1;//node1<node2
13         }
14         return 1;//node1>node2
15     }
16 
17 }
View Code

注意:

  • 關於java的比較器,有兩種:(后者用的多一些)
    • javabean實現comparable接口,實現compareTo()方法
    • 另外建一個類實現comparator接口,實現其中的compare()方法

 

ConsistentHash:一致性hash實現類

  1 package hash;
  2 
  3 import java.util.ArrayList;
  4 import java.util.Collections;
  5 import java.util.List;
  6 import java.util.zip.CRC32;
  7 
  8 /**
  9  * 一致性hash實現(數據結構:list)(服務器沒有虛擬化)
 10  * 一致性hash的真正數據結構是二叉樹
 11  */
 12 public class ConsistentHash {
 13     private List<ServerNode> servers = new ArrayList<ServerNode>();//存放服務器
 14     
 15     /** 計算服務器和存儲的鍵的hash值 */
 16     public long hash(String str){
 17         CRC32 crc32 = new CRC32();
 18         crc32.update(str.getBytes());
 19         return crc32.getValue();
 20     }
 21     
 22     /**
 23      * 添加server到環上
 24      * @param serverName ip:port
 25      */
 26     public void addServer(String serverName){
 27 
 28         ServerNode node = new ServerNode();
 29         node.setServerName(serverName);
 30         node.setServerHash(hash(serverName));
 31         
 32         servers.add(node);
 33         Collections.sort(servers, new ServerComparator());
 34     }
 35     
 36     /**
 37      * 從環上刪除server節點
 38      */
 39     public void deleteServer(String serverName){
 40 
 41         ServerNode node = new ServerNode();
 42         node.setServerName(serverName);
 43         
 44         servers.remove(node);
 45     }
 46     
 47     /**
 48      * 獲取一個緩存key應該存放的位置
 49      * @param cachekey 緩存的key
 50      * @return 緩存的服務器節點
 51      */
 52     public ServerNode getServer(String cachekey){
 53         long keyHash = hash(cachekey);
 54         
 55         for(ServerNode node : servers){
 56             if(keyHash<=node.getServerHash()){
 57                 return node;
 58             }
 59         }
 60         
 61         return servers.get(0);//如果node沒有合適放置位置,放在第一台服務器上去
 62     }
 63     
 64     /****************測試*******************/
 65     public void printServers(){
 66         for(ServerNode server : servers){
 67             System.out.println(server.getServerName()+"-->"+server.getServerHash());
 68         }
 69     }
 70     
 71     public static void main(String[] args) {
 72         ConsistentHash ch = new ConsistentHash();
 73         ch.addServer("127.0.0.1:11211");
 74         ch.addServer("127.0.0.1:11212");
 75         ch.addServer("127.0.0.2:11211");
 76         ch.addServer("127.0.0.2:11212");
 77         
 78         ch.printServers();
 79         
 80         ServerNode node = ch.getServer("hello");
 81         System.out.println(ch.hash("hello")+"-->"+node.getServerName()+"-->"+node.getServerHash());
 82         
 83         ServerNode node2 = ch.getServer("name");
 84         System.out.println(ch.hash("name")+"-->"+node2.getServerName()+"-->"+node2.getServerHash());
 85         
 86         ServerNode node3 = ch.getServer("a");
 87         System.out.println(ch.hash("a")+"-->"+node3.getServerName()+"-->"+node3.getServerHash());
 88         
 89         /********************刪除節點*********************/
 90         ch.deleteServer("127.0.0.1:11212");
 91         ch.printServers();
 92         
 93         ServerNode node0 = ch.getServer("hello");
 94         System.out.println(ch.hash("hello")+"-->"+node0.getServerName()+"-->"+node0.getServerHash());
 95         
 96         ServerNode node02 = ch.getServer("name");
 97         System.out.println(ch.hash("name")+"-->"+node02.getServerName()+"-->"+node02.getServerHash());
 98         
 99         ServerNode node03 = ch.getServer("a");
100         System.out.println(ch.hash("a")+"-->"+node03.getServerName()+"-->"+node03.getServerHash());
101         
102     }
103 }
View Code

注意:

  • 在計算服務器節點和存儲的key的hash值的時候,不僅僅可以使用crc32算法,還可以使用MD5算法等等,只要是最后得出的結果是一個>=0&&<=232的數就好
  • 在這個實現中,並沒有將真實服務器節點進行虛擬化

 

3、真實服務器節點虛擬化后的一致性hash算法實現

為什么要虛擬化,查看第六章 memcached剖析 ,這里只列出幾條原因:

  • 在memcached服務器較少的情況下,很難平均的分布到hash環上,這樣就會造成負載不均衡--引入虛擬化節點,可以解決這個問題
  • 當一台memcached宕機時,其原先所承受的壓力全部給了其下一個節點,為了將其原先所承受的壓力盡可能的分布給所有剩余的memcached節點,引入虛擬化節點可以達到這個目的
  • 當新添加了一台memcached服務器server1時,server1只會緩解其中的一台服務器(即server1插入環后,server1的下一個節點)的壓力,為了可以讓server1盡可能的緩解所有的memcached服務器的壓力,引入虛擬節點可以達到這個目的

 

VirtualServerNode:虛擬節點

 1 package hash2;
 2 
 3 /**
 4  * 虛擬節點
 5  */
 6 public class VirtualServerNode {
 7     private String serverName;//真實節點名稱
 8     private long virtualServerHash;//虛擬節點hash
 9 
10     public String getServerName() {
11         return serverName;
12     }
13 
14     public void setServerName(String serverName) {
15         this.serverName = serverName;
16     }
17 
18     public long getVirtualServerHash() {
19         return virtualServerHash;
20     }
21 
22     public void setVirtualServerHash(long virtualServerHash) {
23         this.virtualServerHash = virtualServerHash;
24     }
25 
26 }
View Code

注意:

  • 該類中的serverName是該虛擬節點對應的真實節點的名稱,這里就是"ip:port"
  • 真正的虛擬節點的名稱是serverName-i(其中,i是0~virtualCount的整數值),這一塊兒請查看ConsistentHashWithVirtualNode的addServer(String serverName)

 

VirtualServerComparator:虛擬節點比較器

 1 package hash2;
 2 
 3 import java.util.Comparator;
 4 
 5 /**
 6  * 虛擬節點比較器
 7  */
 8 public class VirtualServerComparator implements Comparator<VirtualServerNode> {
 9 
10     public int compare(VirtualServerNode node1, VirtualServerNode node2) {
11         if(node1.getVirtualServerHash() <= node2.getVirtualServerHash()) {
12             return -1;
13         }
14         return 1;
15     }
16 
17 }
View Code

 

ConsistentHashWithVirtualNode:真實節點虛擬化后的一致性hash算法

  1 package hash2;
  2 
  3 import java.util.ArrayList;
  4 import java.util.Collections;
  5 import java.util.List;
  6 import java.util.zip.CRC32;
  7 
  8 /**
  9  * 具有虛擬節點的一致性hash實現(數據結構:list)
 10  * 一致性hash的真正數據結構是二叉樹
 11  */
 12 public class ConsistentHashWithVirtualNode {
 13     private List<VirtualServerNode> virtualServers = new ArrayList<VirtualServerNode>();//存放虛擬節點
 14     private static final int virtualCount = 8;//每個真實節點虛擬成8個虛擬節點
 15     
 16     /** 計算服務器和存儲的鍵的hash值 */
 17     public long hash(String str){
 18         CRC32 crc32 = new CRC32();
 19         crc32.update(str.getBytes());
 20         return crc32.getValue();
 21     }
 22     
 23     /**
 24      * 添加server的虛擬節點到環上
 25      * @param serverName ip:port
 26      */
 27     public void addServer(String serverName){
 28 
 29         for(int count=0;count<virtualCount;count++){
 30             VirtualServerNode node = new VirtualServerNode();
 31             node.setServerName(serverName);
 32             node.setVirtualServerHash(hash(serverName+"-"+count));//虛擬節點的名字:serverName+"-"+count
 33             virtualServers.add(node);
 34         }
 35         
 36         Collections.sort(virtualServers, new VirtualServerComparator());
 37     }
 38     
 39     /**
 40      * 從環上刪除server節點(需要刪除所有的該server節點對應的虛擬節點)
 41      */
 42     public void deleteServer(String serverName){
 43         
 44         /*
 45          * 在這種刪除的時候,會出現java.util.ConcurrentModificationException
 46          * 這是因為此處的遍歷方式為使用ArrayList內部類Itr進行遍歷,
 47          * 在遍歷的過程中發生了remove、add等操作,導致modCount發生了變化,
 48          * 產生並發修改異常,
 49          * 可以使用下邊的那一種方式來進行遍歷(遍歷方式不是Itr),
 50          * 再這樣的遍歷過程中,add和remove都是沒有問題的
 51          */
 52         /*for(VirtualServerNode node : virtualServers){
 53             if(node.getServerName().equals(serverName)){
 54                 virtualServers.remove(node);
 55             }
 56         }*/
 57         for(int i=0;i<virtualServers.size();i++) {
 58             VirtualServerNode node = virtualServers.get(i);
 59             if(node.getServerName().equals(serverName)) {
 60                 virtualServers.remove(node);
 61             }
 62         }
 63         
 64     }
 65     
 66     /**
 67      * 獲取一個緩存key應該存放的位置
 68      * @param cachekey 緩存的key
 69      * @return 緩存的服務器節點
 70      */
 71     public VirtualServerNode getServer(String cachekey){
 72         long keyHash = hash(cachekey);
 73         
 74         for(VirtualServerNode node : virtualServers){
 75             if(keyHash<=node.getVirtualServerHash()){
 76                 return node;
 77             }
 78         }
 79         
 80         return virtualServers.get(0);//如果node沒有合適放置位置,放在第一台服務器上去
 81     }
 82     
 83     /****************測試*******************/
 84     public void printServers(){
 85         for(VirtualServerNode server : virtualServers){
 86             System.out.println(server.getServerName()+"-->"+server.getVirtualServerHash());
 87         }
 88     }
 89     
 90     public static void main(String[] args) {
 91         ConsistentHashWithVirtualNode ch = new ConsistentHashWithVirtualNode();
 92         ch.addServer("127.0.0.1:11211");
 93         ch.addServer("127.0.0.1:11212");
 94         ch.addServer("127.0.0.2:11211");
 95         ch.addServer("127.0.0.2:11212");
 96         
 97         ch.printServers();
 98         
 99         VirtualServerNode node = ch.getServer("hello");
100         System.out.println(ch.hash("hello")+"-->"+node.getServerName()+"-->"+node.getVirtualServerHash());
101         
102         VirtualServerNode node2 = ch.getServer("name");
103         System.out.println(ch.hash("name")+"-->"+node2.getServerName()+"-->"+node2.getVirtualServerHash());
104         
105         VirtualServerNode node3 = ch.getServer("a");
106         System.out.println(ch.hash("a")+"-->"+node3.getServerName()+"-->"+node3.getVirtualServerHash());
107         
108         /*********************刪除節點之后**********************/
109         ch.deleteServer("127.0.0.1:11212");
110         ch.printServers();
111         
112         VirtualServerNode node0 = ch.getServer("hello");
113         System.out.println(ch.hash("hello")+"-->"+node0.getServerName()+"-->"+node0.getVirtualServerHash());
114         
115         VirtualServerNode node02 = ch.getServer("name");
116         System.out.println(ch.hash("name")+"-->"+node02.getServerName()+"-->"+node02.getVirtualServerHash());
117         
118         VirtualServerNode node03 = ch.getServer("a");
119         System.out.println(ch.hash("a")+"-->"+node03.getServerName()+"-->"+node03.getVirtualServerHash());
120         
121     }
122 }
View Code

注意:

  • 在實際操作中,一台memcached服務器虛擬成150台比較合適(100~200)
  • 從環上刪除節點的算法寫的較差,但是考慮到刪除節點的操作在實際使用中用的比較少(宕機比較少,人為的刪除節點也較少),也無所謂
  • 刪除節點的時候,注意使用foreach語法糖去遍歷的時候,在遍歷的過程中不可以做刪除、增加操作,否則會拋出並發修改異常,具體的原因見注釋和第二章 ArrayList源碼解析;想要實現在遍歷的過程中進行刪除、增加操作,使用簡單for循環,見如上代碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM