redis客戶端、分布式鎖及數據一致性


  Redis Java客戶端有很多的開源產品比如Redission、Jedis、lettuce等。

  Jedis是Redis的Java實現的客戶端,其API提供了比較全面的Redis命令的支持;Redisson實現了分布式和可擴展的Java數據結構,和Jedis相比,功能較為簡單,不支持字符串操作,不支持排序、事務、管道、分區等Redis特性。Redisson主要是促進使用者對Redis的關注分離,從而讓使用者能夠將精力更集中地放在處理業務邏輯上。由於常用的是jedis,所以這邊使用jedis作為演示。

jedis-sentinel原理:

  客戶端通過連接到哨兵集群,通過發送Protocol.SENTINEL_GET_MASTER_ADDR_BY_NAME 命令,從哨兵機器中詢問master節點的信息,拿到master節點的ip和端口號以后,再到客戶端發起連接。連接以后,需要在客戶端建立監聽機制,當master重新選舉之后,客戶端需要重新連接到新的master節點。

  構造器代碼如下:

public JedisSentinelPool(String masterName, Set<String> sentinels, final GenericObjectPoolConfig poolConfig, int timeout, final String password, final int database) {

        this.poolConfig = poolConfig;
        this.timeout = timeout;
        this.password = password;
        this.database = database;

        HostAndPort master = initSentinels(sentinels, masterName);
        initPool(master);
}

  其中 masterName 為配置 sentinels的時候再sentinel.conf 所配置的master的名稱。 

initSentinels方法:

private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {

    HostAndPort master = null;
    boolean sentinelAvailable = false;

    log.info("Trying to find master from available Sentinels...");
    // 有多個sentinels,遍歷這些個sentinels
    for (String sentinel : sentinels) {
     // host:port表示的sentinel地址轉化為一個HostAndPort對象。
      final HostAndPort hap = HostAndPort.parseString(sentinel);

      log.debug("Connecting to Sentinel {}", hap);

      Jedis jedis = null;
      try {
        // 連接到sentinel
        jedis = new Jedis(hap);
        // 根據masterName得到master的地址,返回一個list,host= list[0], port =// list[1]
        List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);

        // connected to sentinel...
        sentinelAvailable = true;

        if (masterAddr == null || masterAddr.size() != 2) {
          log.warn("Can not get master addr, master name: {}. Sentinel: {}", masterName, hap);
          continue;
        }
        // 如果在任何一個sentinel中找到了master,不再遍歷sentinels
        master = toHostAndPort(masterAddr);
        log.debug("Found Redis master at {}", master);
        break;
      } catch (JedisException e) {
        // resolves #1036, it should handle JedisException there's another chance
        // of raising JedisDataException
        log.warn(
          "Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one.", hap,
          e.toString());
      } finally {
        if (jedis != null) {
          jedis.close();
        }
      }
    }
// 到這里,如果master為null,則說明有兩種情況,一種是所有的sentinels節點都down掉了,一種是master節點沒有被存活的sentinels監控到
    if (master == null) {
      if (sentinelAvailable) {
        // can connect to sentinel, but master name seems to not
        // monitored
        throw new JedisException("Can connect to sentinel, but " + masterName
            + " seems to be not monitored...");
      } else {
        throw new JedisConnectionException("All sentinels down, cannot determine where is "
            + masterName + " master is running...");
      }
    }
    //如果走到這里,說明找到了master的地址
    log.info("Redis master running at " + master + ", starting Sentinel listeners...");
    //啟動對每個sentinels的監聽為每個sentinel都啟動了一個監聽者MasterListener。MasterListener本身是一個線程,它會去訂閱sentinel上關於master節點地址改變的消息。
    for (String sentinel : sentinels) {
      final HostAndPort hap = HostAndPort.parseString(sentinel);
      MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());
      // whether MasterListener threads are alive or not, process can be stopped
      masterListener.setDaemon(true);
      masterListeners.add(masterListener);
      masterListener.start();
    }

    return master;
  }

  可以看到initSentinels方法的參數有一個masterName,就是我們所需要查找的master的名字。一開始,遍歷多個sentinels,一個一個連接到sentinel,去詢問關於masterName的消息,可以看到是通過jedis.sentinelGetMasterAddrByName()方法去連接sentinel,並詢問當前的master的地址。點進這個方法去看看,源代碼是這樣寫的:從哨兵節點獲取master信息的方法:調用的是與Jedis綁定的client去發送一個"get-master-addr-by-name"命令

public List<String> sentinelGetMasterAddrByName(final String masterName) {
    client.sentinel(Protocol.SENTINEL_GET_MASTER_ADDR_BY_NAME, masterName);
    final List<Object> reply = client.getObjectMultiBulkReply();
    return BuilderFactory.STRING_LIST.build(reply);
  }

  調用initPool方法(構造函數中調用),那么會初始化Jedis實例創建工廠,如果不是第一次調用(MasterListener中調用),那么只對已經初始化的工廠進行重新設置。Jedis的JedisSentinelPool的實現僅僅適用於單個master-slave。

Jedis-cluster原理:

   先來看一下他的連接方式:

Set<HostAndPort> hostAndPorts=new HashSet<>();
HostAndPort hostAndPort=new HostAndPort("192.168.11.153",7000);
HostAndPort hostAndPort1=new HostAndPort("192.168.11.153",7001);
HostAndPort hostAndPort2=new HostAndPort("192.168.11.154",7003);
HostAndPort hostAndPort3=new HostAndPort("192.168.11.157",7006);
hostAndPorts.add(hostAndPort);
hostAndPorts.add(hostAndPort1);
hostAndPorts.add(hostAndPort2);
hostAndPorts.add(hostAndPort3);
JedisCluster jedisCluster=new JedisCluster(hostAndPorts,6000);
jedisCluster.set("wuzz","hello");

程序啟動初始化集群環境:

  1)、讀取配置文件中的節點配置,無論是主從,無論多少個,只拿第一個,獲取redis連接實例

  2)、用獲取的redis連接實例執行clusterNodes()方法,實際執行redis服務端cluster nodes命令,獲取主從配置信息

  3)、解析主從配置信息,先把所有節點存放到nodes的map集合中,key為節點的ip:port,value為當前節點的jedisPool

  4)、解析主節點分配的slots區間段,把slot對應的索引值作為key,第三步中拿到的jedisPool作為value,存儲在slots的map集合中就實現了slot槽索引值與jedisPool的映射,這個jedisPool包含了master的節點信息,所以槽和幾點是對應的,與redis服務端一致

從集群環境存取值:

1)、把key作為參數,執行CRC16算法,獲取key對應的slot值

2)、通過該slot值,去slots的map集合中獲取jedisPool實例

3)、通過jedisPool實例獲取jedis實例,最終完成redis數據存取工作

分布式鎖的實現:

  分布式鎖一般有三種實現方式:1. 數據庫樂觀鎖;2. 基於Redis的分布式鎖;3. 基於ZooKeeper的分布式鎖。本篇博客將介紹第二種方式,基於Redis實現分布式鎖。

  關於鎖,其實我們或多或少都有接觸過一些,比如synchronized、 Lock這些,這類鎖的目的很簡單,在多線程環境下,對共享資源的訪問造成的線程安全問題,通過鎖的機制來實現資源訪問互斥。那么什么是分布式鎖呢?或者為什么我們需要通過Redis來構建分布式鎖,其實最根本原因就是Score(范圍),因為在分布式架構中,所有的應用都是進程隔離的,在多進程訪問共享資源的時候我們需要滿足互斥性,就需要設定一個所有進程都能看得到的范圍,而這個范圍就是Redis本身。所以我們才需要把鎖構建到Redis中。Redis里面提供了一些比較具有能夠實現鎖特性的命令,比如SETEX(在鍵不存在的情況下為鍵設置值),那么我們可以基於這個命令來去實現一些簡單的鎖的操作.

  首先,為了確保分布式鎖可用,我們至少要確保鎖的實現同時滿足以下四個條件:

  1. 互斥性。在任意時刻,只有一個客戶端能持有鎖。
  2. 不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖。
  3. 具有容錯性。只要大部分的Redis節點正常運行,客戶端就可以加鎖和解鎖。
  4. 解鈴還須系鈴人。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了。

1.引入依賴:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.0.0</version>
</dependency>

2.來一個連接 redis 的工具類:

public class JedisConnectionUtils {
	private static JedisPool pool=null;
    static {
        JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(100);
        pool=new JedisPool(jedisPoolConfig,"192.168.254.136",6399,5000,"wuzhenzhao");
    }
    public static Jedis getJedis(){
        return pool.getResource();
    }
}

3.加鎖:jedis.set(String key, String value, String nxxx, String expx, int time),這個set()方法一共有五個形參:

  • 第一個為key,我們使用key來當鎖,因為key是唯一的。
  • 第二個為value,我們傳的是requestId,很多童鞋可能不明白,有key作為鎖不就夠了嗎,為什么還要用到value?原因就是我們在上面講到可靠性時,分布式鎖要滿足第四個條件解鈴還須系鈴人,通過給value賦值為requestId,我們就知道這把鎖是哪個請求加的了,在解鎖的時候就可以有依據。requestId可以使用UUID.randomUUID().toString()方法生成。
  • 第三個為nxxx,這個參數我們填的是NX,意思是SET IF NOT EXIST,即當key不存在時,我們進行set操作;若key已經存在,則不做任何操作;
  • 第四個為expx,這個參數我們傳的是PX,意思是我們要給這個key加一個過期的設置,具體時間由第五個參數決定。
  • 第五個為time,與第四個參數相呼應,代表key的過期時間。
private final String LOCK_NAME = "DISTRIBUTEDLOCK";

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    /**
     * @param acquireTimeout 獲得鎖的超時時間
     * @param lockTimeout    鎖本身的過期時間
     * @return
     */
    public String acquireLock(long acquireTimeout, long lockTimeout) {
        String identifier = UUID.randomUUID().toString();//保證釋放鎖的時候是同一個持有鎖的人
        String lockKey = "lock:" + LOCK_NAME;
        int lockExpire = (int) (lockTimeout / 1000);
        Jedis jedis = null;
        try {//獲取連接
            jedis = JedisConnectionUtils.getJedis();
            long end = System.currentTimeMillis() + acquireTimeout;
            //獲取鎖的限定時間
            while (System.currentTimeMillis() < end) {
                String result = jedis.set(lockKey, identifier, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, lockTimeout);
                if (LOCK_SUCCESS.equals(result)) {
                    return identifier;
                }
                //表示沒有超時時間
                if (jedis.ttl(lockKey) == -1) {
                    jedis.expire(lockKey, lockExpire); //設置超時時間
                }
                try {
                    //等待片刻后進行獲取鎖的重試
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            jedis.close(); //回收
        }
        return null;
    }

3.釋放鎖

public boolean releaseLock(String lockName,String identifier){
        System.out.println(lockName+"開始釋放鎖:"+identifier);
        String lockKey="lock:"+lockName;
        Jedis jedis=null;
        boolean isRelease=false;
        try{
            jedis=JedisConnectionUtils.getJedis();
            while(true){
            	//Watch 命令用於監視一個(或多個) key ,如果在事務執行之前這個(或這些) key 被其他命令所改動,那么事務將被打斷
                jedis.watch(lockKey);
                //判斷是否為同一把鎖
                if(identifier.equals(jedis.get(lockKey))){
                	//標記事務開始
                    Transaction transaction=jedis.multi();
                    transaction.del(lockKey);
                    if(transaction.exec().isEmpty()){
                        continue;
                    }
                    isRelease=true;
                }else {
                	//TODO 異常
                }
                jedis.unwatch();
                break;
            }
        }finally {
            jedis.close();
        }
        return  isRelease;
    }

5.測試:

public class UnitTest  extends Thread{

    @Override
    public void run() {
        while(true){
        	RedisDemo distributedLock=new RedisDemo();
            String rs=distributedLock.acquireLock("updateOrder",
                    2000,5000);
            if(rs!=null){
                System.out.println(Thread.currentThread().getName()+"-> 成功獲得鎖:"+rs);
                try {
                    Thread.sleep(1000);
                    distributedLock.releaseLock("updateOrder",rs);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                break;
            }
        }
    }

    public static void main(String[] args) {
        UnitTest unitTest=new UnitTest();
        for(int i=0;i<10;i++){
            new Thread(unitTest,"tName:"+i).start();
        }
    }
}

  如果你的項目中Redis是多機部署的,那么可以嘗試使用Redisson實現分布式鎖,這是Redis官方提供的Java組件

管道模式:

  Redis服務是一種C/S模型,提供請求-響應式協議的TCP服務,所以當客戶端發起請求,服務端處理並返回結果到客戶端,一般是以阻塞形式等待服務端的響應,但這在批量處理連接時延遲問題比較嚴重,所以Redis為了提升或彌補這個問題,引入了管道技術:可以做到服務端未及時響應的時候,客戶端也可以繼續發送命令請求,做到客戶端和服務端互不影響,服務端並最終返回所有服務端的響應,大大提高了C/S模型交互的響應速度上有了質的提高。

  使用方法:

Jedis jedis=new Jedis("192.168.254.136",6399);
Pipeline pipeline=jedis.pipelined();
    for(int i=0;i<1000;i++){
    pipeline.incr("test");
}
pipeline.sync();

Redis緩存與數據一致性問題:

  對於讀多寫少的高並發場景,我們會經常使用緩存來進行優化。比如說支付寶的余額展示功能,實際上99%的時候都是查詢,1%的請求是變更(除非是土豪,每秒鍾都有收入在不斷更改余額),所以,我們在這樣的場景下,可以加入緩存,用戶->余額

  那么基於上面的這個出發點,問題就來了,當用戶的余額發生變化的時候,如何更新緩存中的數據,也就是說。我是先更新緩存中的數據再更新數據庫的數據;還是修改數據庫中的數據再更新緩存中的數據

  數據庫的數據和緩存中的數據如何達到一致性?首先,可以肯定的是,redis中的數據和數據庫中的數據不可能保證事務性達到統一的,這個是毫無疑問的,所以在實際應用中,我們都是基於當前的場景進行權衡降低出現不一致問題的出現概率。

更新緩存還是讓緩存失效?

  更新緩存表示數據不但會寫入到數據庫,還會同步更新緩存; 而讓緩存失效是表示只更新數據庫中的數據,然后刪除緩存中對應的key。那么這兩種方式怎么去選擇?這塊有一個衡量的指標。

1. 如果更新緩存的代價很小,那么可以先更新緩存,這個代價很小的意思是我不需要很復雜的計算去獲得最新的余額數字。

2. 如果是更新緩存的代價很大,意味着需要通過多個接口調用和數據查詢才能獲得最新的結果,那么可以先淘汰緩存。淘汰緩存以后后續的請求如果在緩存中找不到,自然去數據庫中檢索。

先操作數據庫還是先操作緩存?

  當客戶端發起事務類型請求時,假設我們以讓緩存失效作為緩存的的處理方式,那么又會存在兩個情況,

1. 先更新數據庫再讓緩存失效

2. 先讓緩存失效,再更新數據庫

  前面我們講過,更新數據庫和更新緩存這兩個操作,是無法保證原子性的,所以我們需要根據當前業務的場景的容忍性來選擇。也就是如果出現不一致的情況下,哪一種更新方式對業務的影響最小,就先執行影響最小的方案。

最終一致性的解決方案:

  對於分布式系統的數據最終一致性問題,我們可以引入消息中間件,對於失敗的緩存更新存入對應的 broker,並對其進行訂閱,當有消息來了,我們可以對由於網絡等非程序錯誤的異常緩存更新進行重試更新:

 關於緩存雪崩的解決方案:

  當緩存大規模滲透在整個架構中以后,那么緩存本身的可用性講決定整個架構的穩定性。那么接下來我們來討論下緩存在應用過程中可能會導致的問題。

緩存雪崩:

  緩存雪崩是指設置緩存時采用了相同的過期時間,導致緩存在某一個時刻同時失效,或者緩存服務器宕機宕機導致緩存全面失效,請求全部轉發到了DB層面,DB由於瞬間壓力增大而導致崩潰。緩存失效導致的雪崩效應對底層系統的沖擊是很大的。

解決方式:

1. 對緩存的訪問,如果發現從緩存中取不到值,那么通過加鎖或者隊列的方式保證緩存的單進程操作,從而避免失效時並並發請求全部落到底層的存儲系統上;但是這種方式會帶來性能上的損耗

2. 將緩存失效的時間分散,降低每一個緩存過期時間的重復率

3. 如果是因為緩存服務器故障導致的問題,一方面需要保證緩存服務器的高可用、另一方面,應用程序中可以采用多級緩存

緩存穿透:

  緩存穿透是指查詢一個根本不存在的數據,緩存和數據源都不會命中。出於容錯的考慮,如果從數據層查不到數據則不寫入緩存,即數據源返回值為 null 時,不緩存 null。緩存穿透問題可能會使后端數據源負載加大,由於很多后端數據源不具備高並發性,甚至可能造成后端數據源宕掉。

解決方式

1. 如果查詢數據庫也為空,直接設置一個默認值存放到緩存,這樣第二次到緩沖中獲取就有值了,而不會繼續訪問數據庫,這種辦法最簡單粗暴。比如,”key” , “&&”。在返回這個&&值的時候,我們的應用就可以認為這是不存在的key,那我們的應用就可以決定是否繼續等待繼續訪問,還是放棄掉這次操作。如果繼續等待訪問,過一個時間輪詢點后,再次請求這個key,如果取到的值不再是&&,則可以認為這時候key有值了,從而避免了透傳到數據庫,從而把大量的類似請求擋在了緩存之中。

2. 根據緩存數據Key的設計規則,將不符合規則的key進行過濾采用布隆過濾器,將所有可能存在的數據哈希到一個足夠大的BitSet中,不存在的數據將會被攔截掉,從而避免了對底層存儲系統的查詢壓力。

布隆過濾器:

  布隆過濾器是Burton Howard Bloom在1970年提出來的,一種空間效率極高的概率型算法和數據結構,主要用來判斷一個元素是否在集合中存在。因為他是一個概率型的算法,所以會存在一定的誤差,如果傳入一個值去布隆過濾器中檢索,可能會出現檢測存在的結果但是實際上可能是不存在的,但是肯定不會出現實際上不存在然后反饋存在的結果。因此,Bloom Filter不適合那些“零錯誤”的應用場合。而在能容忍低錯誤率的應用場合下,Bloom Filter通過極少的錯誤換取了存儲空間的極大節省。

  bitmap:

  所所謂的BitMap就是用一個bit位來標記某個元素所對應的value,而key即是該元素,由於BitMap使用了bit位來存儲數據,因此可以大大節省存儲空間.

  基本思想:

  這此我用一個簡單的例子來詳細介紹BitMap算法的原理。假設我們要對0-7內的5個元素(4,7,2,5,3)進行排序(這里假設元素沒有重復)。我們可以使用BitMap算法達到排序目的。要表示8個數,我們需要8個byte。

  1.首先我們開辟一個字節(8byte)的空間,將這些空間的所有的byte位都設置為0

  2.然后便利這5個元素,第一個元素是4,因為下邊從0開始,因此我們把第五個字節的值設置為1

  3.然后再處理剩下的四個元素,最終8個字節的狀態如下圖

  

  4.現在我們遍歷一次bytes區域,把值為1的byte的位置輸出(2,3,4,5,7),這樣便達到了排序的目的

  從上面的例子我們可以看出,BitMap算法的思想還是比較簡單的,關鍵的問題是如何確定10進制的數到2進制的映射圖

  假設需要排序或則查找的數的總數N=100000000,BitMap中1bit代表一個數字,1個int = 4Bytes = 4*8bit = 32 bit,那么N個數需要N/32 int空間。所以我們需要申請內存空間的大小為int a[1 + N/32],其中:a[0]在內存中占32為可以對應十進制數0-31,依次類推:

  a[0]-----------------------------> 0-31

  a[1]------------------------------> 32-63

  a[2]-------------------------------> 64-95

  a[3]--------------------------------> 96-127

  ......................................................

  那么十進制數如何轉換為對應的bit位,下面介紹用位移將十進制數轉換為對應的bit位:

  1.求十進制數在對應數組a中的下標

  十進制數0-31,對應在數組a[0]中,32-63對應在數組a[1]中,64-95對應在數組a[2]中………,使用數學歸納分析得出結論:對於一個十進制數n,其在數組a中的下標為:a[n/32]

  2.求出十進制數在對應數a[i]中的下標

  例如十進制數1在a[0]的下標為1,十進制數31在a[0]中下標為31,十進制數32在a[1]中下標為0。 在十進制0-31就對應0-31,而32-63則對應也是0-31,即給定一個數n可以通過模32求得在對應數組a[i]中的下標。

  3.位移

  對於一個十進制數n,對應在數組a[n/32][n%32]中,但數組a畢竟不是一個二維數組,我們通過移位操作實現置1

  a[n/32] |= 1 << n % 32 
  移位操作: 
  a[n>>5] |= 1 << (n & 0x1F)

  n & 0x1F 保留n的后五位 相當於 n % 32 求十進制數在數組a[i]中的下標

  布隆過濾器就是基於這么一個原理來實現的。假設集合里面有3個元素{x, y, z},哈希函數的個數為3。首先將位數組進行初始化,將里面每個位都設置位0。對於集合里面的每一個元素,將元素依次通過3個哈希函數進行映射,每次映射都會產生一個哈希值,這個值對應位數組上面的一個點,然后將位數組對應的位置標記為1。查詢W元素是否存在集合中的時候,同樣的方法將W通過哈希映射到位數組上的3個點。如果3個點的其中有一個點不為1,則可以判斷該元素一定不存在集合中。反之,如果3個點都為1,則該元素可能存在集合中

   接下來按照該方法處理所有的輸入對象,每個對象都可能把bitMap中一些位置設置為1,也可能會遇到已經是1的位置,遇到已經為1的讓他繼續為1即可。處理完所有的輸入對象之后,在bitMap中可能已經有相當多的位置已經被為1。至此,一個布隆過濾器生成完成,這個布隆過濾器代表之前所有輸入對象組成的集合。

  如何去判斷一個元素是否存在bit array中呢? 原理是一樣,根據k個哈希函數去得到的結果,如果所有的結果都是1,表示這個元素可能(假設某個元素通過映射對應下標為4,5,6這3個點。雖然這3個點都為1,但是很明顯這3個點是不同元素經過哈希得到的位置,因此這種情況說明元素雖然不在集合中,也可能對應的都是1)存在。 如果一旦發現其中一個比特位的元素是0,表示這個元素一定不存在.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM