数据一致性算法


最近工作中遇到了数据一致性问题,为方便以后使用,特学习记录一下:

目前遇到现象:

(1)缓存与数据库数据不一致情况

(2)分布式系统中各节点数据不一致情况

原因:

并发情况下,执行顺序会引起写请求和读请求拿到的数据不一致,导致脏读、幻读等。

解决方案:

(1)针对本地缓存与数据库数据不一致问题,可以通过先更新数据库后删除缓存+读写分离来解决,具体可参考另一篇文章《guava缓存使用

<1>缓存失效

<2>请求A读缓存失败,读数据库

<3>请求A写缓存

<4>请求B写数据库,删除缓存

<5>下个请求读缓存失败,写缓存

为什么不会出现3/4步骤对调的情况,主要是数据库的读操作远快于写操作且读写分离的一个实现。如果还有担心会对调可用消息中间件进行处理。

(2)针对分布式数据不一致问题,一致性哈希算法,将数据存储在不同的机器上

  • 原理

1)一致性哈希将整个哈希值空间组织成一个虚拟的圆环,如某哈希函数H的值空间为0-2^32-1(即哈希值是一个32位无符号整形),整个哈希空间环起点和终点分别是0和2^32-1。

2)空间制定:整个空间按顺时针方向组织。0和232-1在零点中方向重合。

3)服务器节点位置:将分布式各个服务器使用Hash进行一个哈希,具体可以选择服务器的ip或主机名作为关键字进行哈希,这样每台机器就能确定其在哈希环上的位置。

  数据倾斜问题优化:可对每个服务器进行多个hash以保证机器节点的均衡分布。

4)数据节点位置:将数据key使用相同的函数Hash计算出哈希值,并确定此数据在环上的位置,从此位置沿环顺时针“行走”,第一台遇到的服务器就是其应该定位到的服务器。

 

 

  •  实操
  1. hash值计算:通过支持MD5与MurmurHash两种计算方式。
  2. 一致性的实现:通过java的TreeMap来模拟环状结构,实现均匀分布

 1.hash计算方法:MD5

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/*
 * 实现一致性哈希算法中使用的哈希函数,使用MD5算法来保证一致性哈希的平衡性
 */
public class HashFunction {
    private MessageDigest md5 = null;

    public long hash(String key) {
        if (md5 == null) {
            try {
                md5 = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException("no md5 algrithm found");
            }
        }

        md5.reset();
        md5.update(key.getBytes());
        byte[] bKey = md5.digest();
        //具体的哈希函数实现细节--每个字节 & 0xFF 再移位
        long result = ((long) (bKey[3] & 0xFF) << 24)
                | ((long) (bKey[2] & 0xFF) << 16
                        | ((long) (bKey[1] & 0xFF) << 8) | (long) (bKey[0] & 0xFF));
        return result & 0xffffffffL;
    }
}

2.环的定义

for (int i = 0; i < numberOfReplicas; i++) {
            // 对于一个实际机器节点 node, 对应 numberOfReplicas 个虚拟节点
            /*
             * 不同的虚拟节点(i不同)有不同的hash值,但都对应同一个实际机器node
             * 虚拟node一般是均衡分布在环上的,数据存储在顺时针方向的虚拟node上
             */
            circle.put(hashFunction.hash(node.toString() + i), node);
}

3.数据hash映射关系确定

if (!circle.containsKey(hash)) {
    //数据映射在两台虚拟机器所在环之间,就需要按顺时针方向寻找机器
    SortedMap<Long, T> tailMap = circle.tailMap(hash);
    hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
}

4.测试

package com.unionpay.techjoin.admin.controller.userbuss;

import java.util.*;

public class ConsistentHash<T> {
    private HashFunction hashFunction;
    private int numberOfReplicas;// 节点的复制因子,实际节点个数 * numberOfReplicas = 虚拟节点个数
    private final SortedMap<Long, T> circle = new TreeMap<Long, T>();// 存储虚拟节点的hash值到真实节点的映射

    public ConsistentHash(HashFunction hashFunction, int numberOfReplicas,
                          Collection<T> nodes) {
        this.hashFunction = hashFunction;
        this.numberOfReplicas = numberOfReplicas;
        for (T node : nodes) {
            add(node);
        }
    }

    public void add(T node) {
        for (int i = 0; i < numberOfReplicas; i++)
            // 对于一个实际机器节点 node, 对应 numberOfReplicas 个虚拟节点
            /*
             * 不同的虚拟节点(i不同)有不同的hash值,但都对应同一个实际机器node
             * 虚拟node一般是均衡分布在环上的,数据存储在顺时针方向的虚拟node上
             */
            circle.put(hashFunction.hash(node.toString() + i), node);
    }

    public void remove(T node) {
        for (int i = 0; i < numberOfReplicas; i++)
            circle.remove(hashFunction.hash(node.toString() + i));
    }

    /*
     * 获得一个最近的顺时针节点,根据给定的key 取Hash
     * 然后再取得顺时针方向上最近的一个虚拟节点对应的实际节点
     * 再从实际节点中取得 数据
     */
    public T get(Object key) {
        if (circle.isEmpty())
            return null;
        long hash = hashFunction.hash((String) key);// node 用String来表示,获得node在哈希环中的hashCode
        if (!circle.containsKey(hash)) {//数据映射在两台虚拟机器所在环之间,就需要按顺时针方向寻找机器
            SortedMap<Long, T> tailMap = circle.tailMap(hash);
            hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
        }
        return circle.get(hash);
    }

    public long getSize() {
        return circle.size();
    }

    /*
     * 查看MD5算法生成的hashCode值---表示整个哈希环中各个虚拟节点位置
     */
    public void testBalance(){
        Set<Long> sets  = circle.keySet();//获得TreeMap中所有的Key
        SortedSet<Long> sortedSets= new TreeSet<Long>(sets);//将获得的Key集合排序
        System.out.println("[1]节点为: ----");
        for(Long hashCode : sortedSets){
            System.out.print(hashCode);
            System.out.print(",");
        }
        System.out.println("----");
        System.out.println("[2]节点间距为: ----");
        /*
         * 查看用MD5算法生成的long hashCode 相邻两个hashCode的差值
         */
        Iterator<Long> it = sortedSets.iterator();
        Iterator<Long> it2 = sortedSets.iterator();
        if(it2.hasNext())
            it2.next();
        long keyPre, keyAfter;
        while(it.hasNext() && it2.hasNext()){
            keyPre = it.next();
            keyAfter = it2.next();
            System.out.print(keyAfter - keyPre);
            System.out.print(",");
        }
        System.out.println("----");
    }

    //根据key查找对应的节点
    public String getNodeForKey(String key) {
        long hash = hashFunction.hash(key);
        if (!circle.containsKey(hash)) {
            //数据映射在两台虚拟机器所在环之间,就需要按顺时针方向寻找机器
            SortedMap<Long, T> tailMap = circle.tailMap(hash);
            hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
        }
        return circle.get(hash)+",hash为:"+hash;
    }

    public static void main(String[] args) {
        Set<String> nodes = new HashSet<String>();
        nodes.add("A");
        nodes.add("B");
        nodes.add("C");

        ConsistentHash<String> consistentHash = new ConsistentHash<String>(new HashFunction(), 2, nodes);
        System.out.println("hash circle size: " + consistentHash.getSize());
        consistentHash.testBalance();
        System.out.println("[3]数据分配情况为: ----");
        for (int i = 0; i < 10; i++){
            String Key="user_" + i;
            System.out.println("Key:"+i+"分配到的Server为:"+consistentHash.getNodeForKey(Key));
        }

        //新加一节点
        consistentHash.add("D");
        System.out.println("hash circle size: " + consistentHash.getSize());
        consistentHash.testBalance();
        System.out.println("[3]数据分配情况为: ----");
        for (int i = 0; i < 10; i++){
            String Key="user_" + i;
            System.out.println("Key:"+i+"分配到的Server为:"+consistentHash.getNodeForKey(Key));
        }

        //删除一节点
        consistentHash.remove("A");
        System.out.println("hash circle size: " + consistentHash.getSize());
        consistentHash.testBalance();
        System.out.println("[3]数据分配情况为: ----");
        for (int i = 0; i < 10; i++){
            String Key="user_" + i;
            System.out.println("Key:"+i+"分配到的Server为:"+consistentHash.getNodeForKey(Key));
        }
    }

}
  • 结果
hash circle size: 6
[1]节点为: ----
748451404,769404186,1696944585,1830063320,3862426151,3864615324,----
[2]节点间距为: ----
20952782,927540399,133118735,2032362831,2189173,----
[3]数据分配情况为: ----
Key:0分配到的Server为:B,hash为:748451404
Key:1分配到的Server为:B,hash为:1696944585
Key:2分配到的Server为:A,hash为:1830063320
Key:3分配到的Server为:A,hash为:3862426151
Key:4分配到的Server为:A,hash为:3862426151
Key:5分配到的Server为:B,hash为:748451404
Key:6分配到的Server为:A,hash为:3862426151
Key:7分配到的Server为:B,hash为:748451404
Key:8分配到的Server为:B,hash为:748451404
Key:9分配到的Server为:A,hash为:3862426151
hash circle size: 8
[1]节点为: ----
748451404,769404186,1696944585,1830063320,3372629518,3766042698,3862426151,3864615324,----
[2]节点间距为: ----
20952782,927540399,133118735,1542566198,393413180,96383453,2189173,----
[3]数据分配情况为: ----
Key:0分配到的Server为:B,hash为:748451404
Key:1分配到的Server为:B,hash为:1696944585
Key:2分配到的Server为:A,hash为:1830063320
Key:3分配到的Server为:D,hash为:3372629518
Key:4分配到的Server为:D,hash为:3372629518
Key:5分配到的Server为:B,hash为:748451404
Key:6分配到的Server为:D,hash为:3766042698
Key:7分配到的Server为:B,hash为:748451404
Key:8分配到的Server为:B,hash为:748451404
Key:9分配到的Server为:D,hash为:3372629518
hash circle size: 6
[1]节点为: ----
748451404,769404186,1696944585,3372629518,3766042698,3864615324,----
[2]节点间距为: ----
20952782,927540399,1675684933,393413180,98572626,----
[3]数据分配情况为: ----
Key:0分配到的Server为:B,hash为:748451404
Key:1分配到的Server为:B,hash为:1696944585
Key:2分配到的Server为:D,hash为:3372629518
Key:3分配到的Server为:D,hash为:3372629518
Key:4分配到的Server为:D,hash为:3372629518
Key:5分配到的Server为:B,hash为:748451404
Key:6分配到的Server为:D,hash为:3766042698
Key:7分配到的Server为:B,hash为:748451404
Key:8分配到的Server为:B,hash为:748451404
Key:9分配到的Server为:D,hash为:3372629518

 

从上面执行结果可以看出哈希一致性算法的优势,对于删除节点或新增节点仅影响对于片区内的。通过一致性hash算法可以很好的解决Redis分布式的问题,且当Redis server增加或减少的时候,之前存储的缓存命中率还是比较高的。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM