使用redission實現分布式信號量以及遇到的一些坑


1、項目背景

  公司的緩存組件WRedis不再支持,所以需要將之前實現的WRedis遷移到新的緩存組件Redis中。Redisson基於java.utils提供了一系列分布式的工具類,比如Map、List、Lock等工具類。在redis和java增加了一層,讓我們以更熟悉的方式操作Redis。RPermitExpirableSemaphore(可過期性信號量)是Redisson提供為每個信號增加了一個過期時間。每個信號可以通過獨立的ID來辨識,釋放時只能通過提交這個ID才能釋放。

2、連接RedissonClient

redisson提供多種配置方式,程序式的、配置式的,本文選擇json配置式。

Config config = Config.fromJSON(redisCfgFile);
RedissonClient redissonClient = Redisson.create(config);

創建RedissionClient,使用json的形式創建。

redis的配置如下:

{
    "singleServerConfig":{
        "idleConnectionTimeout":10000,
        "pingTimeout":1000,
        "connectTimeout":10000,
        "timeout":6000,
        "retryAttempts":3,
        "retryInterval":1500,
        "reconnectionTimeout":3000,
        "failedAttempts":3,
        "password":"xxxxx",
        "subscriptionsPerConnection":5,
        "clientName":null,
        "address":"redis://xxxx:000",
        "subscriptionConnectionMinimumIdleSize":1,
        "subscriptionConnectionPoolSize":50,
        "connectionMinimumIdleSize":32,
        "connectionPoolSize":64,
        "database":0
    },
    "threads":0,
    "nettyThreads":0,
    "codec":{
        "class":"org.redisson.codec.JsonJacksonCodec"
    },
    "transportMode":"NIO"
}

 

配置說明:https://yq.aliyun.com/articles/551640?spm=a2c4e.11153940.0.0.3cc221bavS8pf3(參考)

遇到的坑:

  2.1、斷開連接

redis使用5.0,redission使用3.10.7,出現經常client與server斷開連接問題

 

追查問題,將線上redis版本降低,從5.0->4.0,不會出現斷開連接的問題。但是redission不支持斷開重連,使用定時任務去定時的ping server,斷開后手動重連。

  2.2、切換主從無法自動重連

public static void init() {
        SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(() -> {
            if (RUNNING) {
                try {
                    NodesGroup nodesGroup = redissonClient.getNodesGroup();
                    Collection<Node> allNodes = nodesGroup.getNodes();
                    for (Node n : allNodes) {
                        boolean ping = n.ping();
                        if (!ping) {
                            //ping不通reload client
                            reload();
                        }
                    }
                } catch (Exception e) {
                    //拋出異常reload client
                    reload();
                }
            }
        }, 0, 5, TimeUnit.SECONDS);
    }

  2.3、線上經常出現timed out

 

修改配置文件中timeout 3000 -> 6000,因為有一個5s的定時任務在ping server,所以設置稍大一點,就不會出現了。

3、使用redission實現分布式信號量

Redisson自帶一個RPermitExpirableSemaphore(有過期時間的分布式信號量)

官方的解釋:

 為每個申請對象提供參數化的釋放時間的信號量,每個許可證可以被自己的id識別,而且可以被自己的id釋放。這個許可證id是128b隨機數。同時這個分布式信號量工作於非公平模式,因此申請的順序是不可以預測。

具體實現如下:

  3.1、申請許可證

/**
     * @param semaphoreName 信號量標識
     * @param acquireId     獲取者的標識
     * @param limit         總量
     * @return int 0-成功,其他-失敗
     * @Description: 指定名稱和總量獲取信號量
     */
    public static int acquireSemaphore(String semaphoreName, String acquireId, long limit) {
        try {
            RedissonClient client = RedisUtils.getRedissonClient();
            RPermitExpirableSemaphore semaphore = client.getPermitExpirableSemaphore(semaphoreName);
            semaphore.trySetPermits((int) limit);
            //每申請一次信號量,expire信號量的生命SEMAPHORE_LIFE_EXPIRE秒
            semaphore.expire(SEMAPHORE_LIFE_EXPIRE, TimeUnit.SECONDS);
            //嘗試次數init
            int time = 0;
            while (MAX_TRY_ACQUIRE_TIME > time) {
                //嘗試獲取信號量
                String permitId = semaphore.tryAcquire(getSemaphoreLifeExpire(), getSemaphoreAcquireExpire(), TimeUnit.MILLISECONDS);
                //獲取信號量失敗
                if (null == permitId) {
                    time++;
                    continue;
                }
                //獲取信號量成功,設置acquireId和permitId的映射關系
                if (!RedisUtils.hset(getMapName(semaphoreName), acquireId, permitId, getSemaphoreAcquireExpire(), TimeUnit.MILLISECONDS)) {
                    //如果失敗,釋放資源
                    semaphore.release(permitId);
                    return acquireError();
                }
                return 0;
            }
        } catch (Exception e) {
            e.printStackTrace();
            LOG.error("exception for semaphoreName={}, acquireId={}, msg={}", semaphoreName, acquireId, e.getMessage());
        }
        return acquireError();
    }

 

將申請的的許可證id和acquireId放到hash結構中,做一個映射,因為需要這個許可證id去釋放資源。

流程圖:

 

 

  3.2、釋放信號量

/**
     * @param semaphoreName 信號量標識
     * @param acquireId     獲取者的標識
     * @Description: 釋放對應的信號量
     * @return: int 0 成功 1 失敗(超時的錯誤可能就無法成功釋放)
     * @Author: chi.zhang
     * @Date: 2020/02/19
     */
    public static int releaseSemaphore(String semaphoreName, String acquireId) {
        try {
            RedissonClient client = RedisUtils.getRedissonClient();
            RPermitExpirableSemaphore semaphore = client.getPermitExpirableSemaphore(semaphoreName);
            //根據映射關系找到permitId
            String permitId = RedisUtils.hgetStr(getMapName(semaphoreName), acquireId);
            if (StringUtils.isNotEmpty(permitId)) {
                //可能被釋放,所以使用tryRelease
                if (semaphore.tryRelease(permitId)) {
                    RedisUtils.hdel(getMapName(semaphoreName), acquireId);
                }
            } else {
                LOG.error("釋放分布式信號量失敗,semaphoreName:{},permitId{}", semaphoreName, permitId);
                return releaseError();
            }
        } catch (Exception e) {
            e.printStackTrace();
            LOG.error("exception for semaphoreName={}, acquireId={}, msg={}", semaphoreName, acquireId, e.getMessage());
            return releaseError();
        }
        return 0;
    }

流程圖:

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 參考文檔

https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM