數據分布之一致性哈希


一、數據分布

在分布式環境下,數據分布也即是將數據拆分,存放到不同節點上,是分布式系統中的基本問題之一。不同的數據分布方式需要權衡諸如伸縮性、數據傾斜(負載的均衡)、元數據維護等問題。沒有一種萬能的方案能夠解決所有的問題,不能脫離應用場景談優劣,應該要針對不同的應用場景選擇合適的方案。

一般而言,可以有以下幾種數據分布的方式:

1)哈希分區(或者叫余數法)

基本思想是根據數據的某項特征(如ID或者鍵)計算hash值,然后對節點數量N求摸,其邏輯為:hash(key) % N。這種方式的優點是設計簡單;缺點是擴展性不佳,增刪節點后,原有的映射關系大部分將失效,並且容易出現“數據傾斜”的現象。

2)按數據范圍分布

這種分區方式將數據按特征值的值域范圍划分為不同的區間,然后每個節點存儲不同區間的數據。

例如, 已知某業務系統中用戶 ID 的值域范圍是[1,100),集群有 3 個節點。則可以將用戶 ID的值域分為三個區間[1, 33)、 [33, 90)、 [90, 100),分別由 3 個節點Node1、Node2、Node3負責存儲。

3)按數據量分布

這種方式將數據視為一個順序增長的文件,並將這個文件按照某一較為固定的大小划分為若干數據塊(chunk),不同的數據塊分布到不同的服務器上,數據量分布數據與具體的數據特征無關。

4)一致性哈希

一致性哈希主要用在分布式緩存系統中,通過一種特殊的環形結構和分布規則來實現,改進的一致性哈希能夠比較好的解決擴展性問題和負載均衡問題。

本文主要討論一致性哈希的一些有趣的原理和特性,並實現一個簡潔地可演示和模擬的Demo算法,最后也簡單的提及Redis Cluster中的數據分布方式,其與一致性哈希的思想相似之處但也有些差別。

二、一致性哈希

2.1 概述

一致性哈希的概念最初在論文Consistent Hashing and Random Trees:Distributed Caching Protocols for Relieving Hot Spots on the World Wide Web的第四節Consistent Hashing中被提出來,具有如下四個特性,其陳述個人覺得比較理論化:

1、平衡性(Balance):平衡性是指哈希的結果能夠盡可能分布到所有的緩沖中去,這樣可以使得所有的緩沖空間都得到利用。很多哈希算法都能夠滿足這一條件。

2、單調性(Monotonicity):單調性是指如果已經有一些內容通過哈希分派到了相應的緩沖中,又有新的緩沖加入到系統中。哈希的結果應能夠保證原有已分配的內容可以被映射到原有的或者新的緩沖中去,而不會被映射到舊的緩沖集合中的其他緩沖區。 

3、分散性(Spread):在分布式環境中,終端有可能看不到所有的緩沖,而是只能看到其中的一部分。當終端希望通過哈希過程將內容映射到緩沖上時,由於不同終端所見的緩沖范圍有可能不同,從而導致哈希的結果不一致,最終的結果是相同的內容被不同的終端映射到不同的緩沖區中。這種情況顯然是應該避免的,因為它導致相同內容被存儲到不同緩沖中去,降低了系統存儲的效率。分散性的定義就是上述情況發生的嚴重程度。好的哈希算法應能夠盡量避免不一致的情況發生,也就是盡量降低分散性。 

4、負載(Load):負載問題實際上是從另一個角度看待分散性問題。既然不同的終端可能將相同的內容映射到不同的緩沖區中,那么對於一個特定的緩沖區而言,也可能被不同的用戶映射為不同 的內容。與分散性一樣,這種情況也是應當避免的,因此好的哈希算法應能夠盡量降低緩沖的負荷。

本文所講的一致性哈希算法滿足平衡性和單調性,分散性和負載並似乎不具備也沒有含義。

2.2 基本的算法原理

一致性哈希算法基本原理大致可以通過幾個步驟來解釋:構造一致性哈希環、節點映射、路由規則。以下以鍵值對緩存服務器為場景。

1)構造一致性哈希環

一致性哈希算法中首先有一個哈希函數,哈希函數產生hash值,所有可能的哈希值構成一個哈希空間,哈希空間為[0,2^32-1],這本來是一個“線性”的空間,但是在算法中通過恰當邏輯控制,使其首尾相銜接,也即是0=2^32,這樣就構造一個邏輯上的環形空間。

2)節點映射

將集群中的各節點映射到環上的某個一位置。比如集群中有三個節點,那么可以大致均勻的將其分布在環上。

3)路由規則

路由規則包括存儲(setX)和取值(getX)規則。

當需要存儲一個<key-value>對時,首先計算鍵key的hash值:hash(key),這個hash值必然對應於一致性hash環上的某個位置,然后沿着這個值按順時針找到第一個節點,並將該鍵值對存儲在該節點上。例如存儲<key1-value1>時,按此規則應該存儲在Node1服務器上(見下圖)。

當需要按某個鍵獲取值時,與上述規則基本相同,也是首先計算key的hash值,找到對應的節點,從該節點中獲取對應鍵的值。

整個算法的模型如下圖所示,

集群中有三個節點(Node1、Node2、Node3),五個鍵(key1、key2、key3、key4、key5),其路由規則為:

key1 -> Node1
key2、key3 -> Node2
key4、key5 -> Node3 

 不難發現,基本的一致性哈希算法有一些地方不太讓人滿意。

當集群中增加節點時,比如當在Node2和Node3之間增加了一個節點Node4,此時再訪問節點key4時,不能在Node4中命中,更一般的,介於Node2和Node4之間的key均失效,這樣的失效方式太過於“集中”和“暴力”,更好的方式應該是“平滑”和“分散”地失效。如下圖所示:

特別是當集群中節點本身比較少時,因增刪節點導致的不命中現象比較明顯。

除了上面的問題,還有一個比較明顯的問題是負載問題:增加節點只能對下一個相鄰節點有比較好的負載分擔效果,例如上圖中增加了節點Node4只能夠對Node3分擔部分負載,對集群中其他的節點基本沒有起到負載分擔的效果;類似地,刪除節點會導致下一個相鄰節點負載增加,而其他節點卻不能有效分擔負載壓力。

針對以上兩個主要的問題,特別是如何解決各節點負載動態均衡的問題,出現了一種通過增加虛擬節點的改進算法。

2.3 增加虛擬節點改進算法

為了在增刪節點的時候,各節點能夠保持動態的均衡,將每個真實節點虛擬出若干個虛擬節點,再將這些虛擬節點隨機映射到環上。此時每個真實節點不再映射到環上,真實節點只是用來存儲鍵值對,它負責接應各自的一組環上虛擬節點。當對鍵值對進行存取路由時,首先路由到虛擬節點上,再由虛擬節點找到真實的節點。

如下圖所示,三個節點真實節點:Node1、Node2和Node3,每個真實節點虛擬出三個虛擬節點:X#V1、X#V2和X#V3,這樣每個真實節點所負責的hash空間不再是連續的一段,而是分散在環上的各處,這樣就可以將局部的壓力均衡到不同的節點,虛擬節點越多,分散性越好,理論上負載就越傾向均勻,如下圖所示:

通俗的理解,增加虛擬節點其實是減小了路由規則過程中的粒度,使每個真實節點可以分攤局部壓力。

三、Demo實現

以下是針對帶虛擬節點的一致性哈希算法的一個簡單的Demo實現,重點在於演示其算法的工作原理。

元數據包括真實節點、虛擬節點以及各虛擬節點對應的真實節點映射關系。虛擬節點采用平衡二叉搜索樹存儲,虛擬節點名通過真實節點拼接序列號實現,這樣只要得到虛擬節點名截取其前綴就可以得到對應的真實節點,簡單方便。

通過增加節點和刪除節點模擬節點上線和下線的情況,並測試集群總節點變化過程中的負載均衡情況。

3.1 實現類

import java.util.*;
/**
 * 一致性哈希算法
 * author:Qcer
 * date:2018/07/18
 * */
public class ConsistentHash {
    // 每個真實節點負責多少個虛擬節點
    private int virtualNodesPerRealNode;
    private int totalVirtualNodes;
    // 真實結點列表
    private List<String> realNodes = new LinkedList<String>();
    // 真實結點與各虛擬的映射關系
    private HashMap<String,LinkedList<String>> mapping = new HashMap<>();
    // 虛擬節點,key表示虛擬節點的hash值,value表示虛擬節點的名稱,采用平衡二叉搜索樹結構存儲
    private SortedMap<Integer, String> virtualNodes = new TreeMap<Integer, String>();

    public ConsistentHash(String[] nodes,int virtualNodesPerRealNode){
        this.virtualNodesPerRealNode = virtualNodesPerRealNode;
        addNode(nodes);
    }

    // 使用FNV1_32_HASH算法計算服務器的Hash值,hash空間為[0,2^32-1],程序控制實現邏輯的環形結構
    private int getHash(String str){
        final int p = 16777619;
        int hash = (int)2166136261L;
        for (int i = 0; i < str.length(); i++){
            hash = (hash ^ str.charAt(i)) * p;
        }
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        // 如果算出來的值為負數則取其絕對值
        if (hash < 0)
            hash = Math.abs(hash);
        return hash;
    }

    // 根據某個key,首先訪問到虛擬節點,再訪問到真實節點。
    public String visit(String key){
        // 得到該key的hash值
        int hash = getHash(key);
        // 得到大於該hash值的所有Map
        SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
        String virtualNode = null;
        if (subMap.isEmpty()){
            // 如果沒有比該key的hash值更大的,表明該hash值剛好是一致性hash環的尾端
            // 此時從0開始,順時針取第一個虛擬節點
            Integer i = virtualNodes.firstKey();
            // 返回對應的虛擬節點
            virtualNode = virtualNodes.get(i);
        } else {
            // 按順時針方向當前最近的虛擬結點
            Integer i = subMap.firstKey();
            // 返回對應的虛擬節點
            virtualNode = subMap.get(i);
        }
        // 截取virtualNode的前綴,獲得真實節點
        if(virtualNode != null){
            virtualNode = virtualNode.substring(0, virtualNode.indexOf("##"));
        }
        return virtualNode;
    }


    // 增加節點,模擬服務器上線的情況。
    public void addNode(String[] nodes) {
        // 維護元數據,包括真實節點信息,虛擬節點信息
        for (String node : nodes){
            // 維護真實節點信息
            realNodes.add(node);
            LinkedList<String> list = new LinkedList<>();
            // 維護虛擬節點信息,key為hash值,value的前綴為真實節點
            for(int count = 0, sequence = 0; count < virtualNodesPerRealNode;){
                String virtualNodeName = node + "##VN" + String.valueOf(sequence++);
                int hash = getHash(virtualNodeName);

                // 一般來講,當虛擬節點數量<<hash空間時,hash函數碰撞的可能性比較小,但嚴謹其見,此處應該考慮沖突。
                if (!virtualNodes.containsKey(hash)) {
                    virtualNodes.put(hash, virtualNodeName);
                    count++;
                    list.add(virtualNodeName);//維護虛擬節點與真實節點的映射關系
                }
            }
            mapping.put(node,list);
        }
        // 維護虛擬節點總數
        totalVirtualNodes = realNodes.size() * virtualNodesPerRealNode;
    }

    // 刪除節點,模擬服務器下線的情況。
    public void removeNode(String[] nodes) {
        for (String node : nodes) {
            if (realNodes.contains(node)) {
                realNodes.remove(node);
            }
            if (mapping.containsKey(node)) {
                LinkedList<String> list = mapping.remove(node);
                for (String virtual : list) {
                    virtualNodes.remove(getHash(virtual));
                }
            }
        }
        totalVirtualNodes = realNodes.size() * virtualNodesPerRealNode;
    }

    // 獲取元數據
    public void getMetaData() {
        System.out.println("真實節點:");
        for (int i = 0; i < realNodes.size(); i++) {
            System.out.println(realNodes.get(i));
        }
        System.out.println("虛擬節點數量:" + totalVirtualNodes);
        for (String str : mapping.keySet()) {
            System.out.println(mapping.get(str).size());
        }
    }
    
    // 測試增刪節點后各節點的負載
    public void testLoadBalance(String[] keys){
        System.out.println("真實節點數量:" + realNodes.size());
        System.out.println("虛擬節點數量:" + totalVirtualNodes);
        System.out.println("各節點負載情況:");
        int keyNumber = keys.length;
        int realNodeNumber = realNodes.size();
        String hitNode = "";
        int[] count = new int[realNodeNumber];
        for(int i = 0; i < keyNumber; i++) {
            hitNode = visit(keys[i]);
            for (int j = 0; j < realNodeNumber; j++){
                if (hitNode.equals(realNodes.get(j))){
                    count[j] += 1;
                }
            }
        }
        for (int i = 0; i < realNodeNumber; i++) {
            System.out.println("[Node"+i+"-"+realNodes.get(i)+"]" +" : "+count[i]);
        }
    }
}

 

3.2 測試類

/**一致性哈希算法Test類
 * author:Qcer
 * date:2018/07/18
 * */
public class ConsistentHashTest {

    // 產生隨機字符串,視為key
    public static String[] genKeys(int number) {
        String[] ary = new String[number];
        int length = 0;
        for(int j = 0; j < number; j++) {
            String temp = "";
            length = (int)(Math.random() * 8 + 2);
            for(int i = 0; i < length; i++) {
                int intValue = (int)(Math.random() * 26 + 97);
                temp += (char)intValue;
            }
            ary[j] = temp;
        }
        return ary;
    }

    public static void main(String[] args){
        String[] nodes = {
                "10.10.25.11:6379",
                "10.10.25.12:6379",
                "10.10.25.13:6379",
                "10.10.25.14:6379",
                "10.10.25.15:6379"};

        int keyCount = 10000;
        String[] keys = genKeys(keyCount);

        System.out.println("--------初始狀態-------");
        ConsistentHash ch = new ConsistentHash(nodes,200);
        ch.testLoadBalance(keys);

        System.out.println("--------模擬上線-------");
        String[] onLine = {"10.10.25.20:6379","10.10.25.21:6379"};
        ch.addNode(onLine);
        ch.testLoadBalance(keys);

        System.out.println("--------模擬下線-------");
        String[] offLine = {"10.10.25.11:6379","10.10.25.14:6379"};
        ch.removeNode(offLine);
        ch.testLoadBalance(keys);

        System.out.println("--------獲取元數據-------");
        ch.getMetaData();
    }
}

3.3 測試結果

--------初始狀態-------
真實節點數量:5
虛擬節點數量:1000
各節點負載情況:
[Node0-10.10.25.11:6379] : 1982
[Node1-10.10.25.12:6379] : 2157
[Node2-10.10.25.13:6379] : 2063
[Node3-10.10.25.14:6379] : 1659
[Node4-10.10.25.15:6379] : 2139
--------模擬上線-------
真實節點數量:7
虛擬節點數量:1400
各節點負載情況:
[Node0-10.10.25.11:6379] : 1373
[Node1-10.10.25.12:6379] : 1599
[Node2-10.10.25.13:6379] : 1382
[Node3-10.10.25.14:6379] : 1268
[Node4-10.10.25.15:6379] : 1416
[Node5-10.10.25.20:6379] : 1488
[Node6-10.10.25.21:6379] : 1474
--------模擬下線-------
真實節點數量:5
虛擬節點數量:1000
各節點負載情況:
[Node0-10.10.25.12:6379] : 2097
[Node1-10.10.25.13:6379] : 1909
[Node2-10.10.25.15:6379] : 1769
[Node3-10.10.25.20:6379] : 2131
[Node4-10.10.25.21:6379] : 2094
--------獲取元數據-------
真實節點:
10.10.25.12:6379
10.10.25.13:6379
10.10.25.15:6379
10.10.25.20:6379
10.10.25.21:6379
虛擬節點數量:1000

可見,在上下線的過程中,各節點能夠大致的保持一個動態的負載平衡。

四、Redis Cluster中的虛擬槽分區

在Redis Cluster中,依然采用了虛擬槽的方式,總共有16384=2^14個虛擬槽,其鍵與槽的映射關系為slot=CRC16(key)&16383,因此虛擬槽只是一個邏輯的概念,並不真實存存儲數據,虛擬槽背后的真實節點才是數據存放的地方。

每個真實節點會負責一部分的虛擬槽,采用虛擬槽分區的方式能夠解耦數據與節點的關系,方便集群的伸縮。在搭建集群的過程中,需要給定每個虛擬槽與真實節點之間的映射關系。

例如,以6個節點搭建一個小規模redis集群,其真實節點局域網IP和端口如下:

192.168.0.117:6390
192.168.0.117:6391
192.168.0.117:6392
192.168.0.117:6393
192.168.0.117:6394
192.168.0.117:6395

這里采用redis-trib.rb集群管理工具實現節點握手、虛擬槽分配、檢查等功能:

[root@localhost cluster]# 
[root@localhost cluster]# redis-trib.rb create --replicas 1 192.168.0.117:6390 192.168.0.117:6391 192.168.0.117:6392 192.168.0.117:6393 192.168.0.117:6394 192.168.0.117:6395
>>> Creating cluster
/usr/local/ruby/lib/ruby/gems/2.4.0/gems/redis-3.3.0/lib/redis/client.rb:459: warning: constant ::Fixnum is deprecated
>>> Performing hash slots allocation on 6 nodes...
Using 3 masters:
192.168.0.117:6390
192.168.0.117:6391
192.168.0.117:6392
Adding replica 192.168.0.117:6393 to 192.168.0.117:6390
Adding replica 192.168.0.117:6394 to 192.168.0.117:6391
Adding replica 192.168.0.117:6395 to 192.168.0.117:6392
M: c90dd52f29968f10bb99a8bdb9ad839009944406 192.168.0.117:6390 slots:0-5460 (5461 slots) master
M: 3b226aa47c0afe5aa76501a61db2ae2af6cfe5ff 192.168.0.117:6391 slots:5461-10922 (5462 slots) master
M: 03e45dc39322d0df04bf2cdaba2498f4918a3e76 192.168.0.117:6392 slots:10923-16383 (5461 slots) master
S: b3cb797097633c9f95bd4a706fcf9a3f09db5ca1 192.168.0.117:6393
   replicates c90dd52f29968f10bb99a8bdb9ad839009944406
S: 1149158458c4a2eaa249b1981e111b1ea1e2a542 192.168.0.117:6394
   replicates 3b226aa47c0afe5aa76501a61db2ae2af6cfe5ff
S: 5a5038176c110ff6f07d31f81c725caaa8ae7c74 192.168.0.117:6395
   replicates 03e45dc39322d0df04bf2cdaba2498f4918a3e76
Can I set the above configuration? (type 'yes' to accept): yes
>>> Nodes configuration updated
>>> Assign a different config epoch to each node
>>> Sending CLUSTER MEET messages to join the cluster
Waiting for the cluster to join..
>>> Performing Cluster Check (using node 192.168.0.117:6390)
M: c90dd52f29968f10bb99a8bdb9ad839009944406 192.168.0.117:6390
   slots:0-5460 (5461 slots) master
   1 additional replica(s)
M: 3b226aa47c0afe5aa76501a61db2ae2af6cfe5ff 192.168.0.117:6391 slots:5461-10922 (5462 slots) master 1 additional replica(s)
S: 1149158458c4a2eaa249b1981e111b1ea1e2a542 192.168.0.117:6394
   slots: (0 slots) slave
   replicates 3b226aa47c0afe5aa76501a61db2ae2af6cfe5ff
S: b3cb797097633c9f95bd4a706fcf9a3f09db5ca1 192.168.0.117:6393
   slots: (0 slots) slave
   replicates c90dd52f29968f10bb99a8bdb9ad839009944406
M: 03e45dc39322d0df04bf2cdaba2498f4918a3e76 192.168.0.117:6392 slots:10923-16383 (5461 slots) master 1 additional replica(s)
S: 5a5038176c110ff6f07d31f81c725caaa8ae7c74 192.168.0.117:6395
   slots: (0 slots) slave
   replicates 03e45dc39322d0df04bf2cdaba2498f4918a3e76
[OK] All nodes agree about slots configuration.
>>> Check for open slots...
>>> Check slots coverage...
[OK] All 16384 slots covered.
[root@localhost cluster]# 

從上面的過程中可以看到,總共的16384個虛擬槽被分為5461、5462、5461三部分,分別分配給三個master節點,另外3個slave節點由於只能從對應的master節點復制數據默認只讀不可寫,因此不分配虛擬槽。

當虛擬槽全部分配完成,集群處於可用狀態:

192.168.0.117:6391> 
192.168.0.117:6391> cluster keyslot qcer
(integer) 7408
192.168.0.117:6391> set qcer "hello world"
OK
192.168.0.117:6391> 

在集群伸縮的過程中,由於節點上線或者下線,需要進行虛擬槽的遷移。 

五、References

1、大型網站技術架構_核心原理與案例分析

2、分布式系統原理介紹

3、Redis開發和運維

4、Consistent Hashing and Random Trees:Distributed Caching Protocols for Relieving Hot Spots on the World Wide Web,SECTION 4 Consistent Hashing.

 

轉載請注明原文出處:https://www.cnblogs.com/qcblog/p/8886360.html 

謝謝!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM