高并发2-RedisConnection实现分布式锁


**简介:RedisConnection实现分布式锁的方式,采用redisTemplate操作redisConnection实现setnx和setex两个命令连用**

- redisTemplate本身有没有通过valueOperations实现分布式锁?

* 问题探索:
  Spring Data Redis提供了与Java客户端包的集成服务,比如Jedis, JRedis等
  通过getNativeConnection的方式可以解决问题吗?

- Spring Data Redis提供了与Java客户端包的集成服务,比如Jedis, JRedis等 

  * 代码演示

 /**
  * Variant of the redisTemplate set operation that talks to the native connection.
  * <p>
  * The command {@code SET resource-name anystring NX EX max-lock-time} is a simple way
  * to implement a lock in Redis.
  * <p>
  * When a client issues that command:
  * <p>
  * if the server replies OK, the client holds the lock;
  * if the server replies NIL, the client failed to get the lock and may retry later.
  * <p>
  * The command is only sent when the native connection is a {@code JedisCommands};
  * for any other native connection type nothing is sent and {@code null} is returned.
  *
  * @param key     key of the lock
  * @param value   value stored in the lock
  * @param seconds expiration time in seconds
  * @return the reply of the native {@code set} call, or {@code null} when the native
  *         connection is not a {@code JedisCommands}
  */
 private String set(final String key, final String value, final long seconds) {
     Assert.isTrue(!StringUtils.isEmpty(key), "key不能为空");

     RedisCallback<String> setNxEx = connection -> {
         Object raw = connection.getNativeConnection();

         // Only a Jedis-backed native connection understands this call.
         String reply = raw instanceof JedisCommands
                 ? ((JedisCommands) raw).set(key, value, NX, EX, seconds)
                 : null;

         // Log the acquisition time only when a log key is configured and a reply came back.
         boolean skipLog = StringUtils.isEmpty(lockKeyLog) || StringUtils.isEmpty(reply);
         if (!skipLog) {
             logger.info("获取锁{}的时间:{}", lockKeyLog, System.currentTimeMillis());
         }
         return reply;
     };

     return redisTemplate.execute(setNxEx);
 }

- 为什么新版本的spring-data-redis会报类型转换错误(ClassCastException:cannot be cast)?

  io.lettuce.core.RedisAsyncCommandsImpl cannot be cast to redis.clients.jedis.JedisCommands

- 探索spring-data-redis升级

* 官网api分析
  https://docs.spring.io/spring-data/redis/docs/1.5.0.RELEASE/api/
  https://docs.spring.io/spring-data/redis/docs/2.0.13.RELEASE/api/

* 源码改造

 /**
  * Issues SET with an expiration and the "only if absent" option in one command.
  *
  * @param connection connection supplied by RedisTemplate for the duration of the callback
  * @return the result of the SET call as reported by the connection
  */
 public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
     // Use the connection handed to the callback: RedisTemplate opens and releases it.
     // Fetching another one from the connection factory here would never be closed.
     return connection.set(key.getBytes(), getHostIp().getBytes(), Expiration.seconds(expire), RedisStringCommands.SetOption.ifAbsent());
 }

这样改造后(改用RedisConnection自带的set方法,不再依赖Jedis原生连接),同样实现了setnx、setex连用的效果

完整代码:

package com.concurrent.schedule;

import com.concurrent.util.RedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

/**
 * Distributed lock implemented on top of the Spring Data Redis {@link RedisConnection} API.
 * <p>
 * The lock is taken with a single SET carrying an expiration and the "if absent" option,
 * and the stored value is the IPv4 address of the local host. The lock is released by
 * running the {@code unlock.lua} script from the classpath.
 */
@Component
public class JedisDistributedLock {

    private static final Logger logger = LoggerFactory.getLogger(JedisDistributedLock.class);

    private static final String LOCK_PREFIX = "JedisDistributedLock_";

    /**
     * Lua script that deletes KEYS[1] only when its current value equals ARGV[1].
     * It is not referenced by this class; {@link #releaseLock} loads {@code unlock.lua} instead.
     */
    public static final String UNLOCK_LUA =
            "if redis.call(\"get\",KEYS[1]) == ARGV[1] "
                    + "then "
                    + "    return redis.call(\"del\",KEYS[1]) "
                    + "else "
                    + "    return 0 "
                    + "end ";

    /** Unlock script, built once instead of on every release. */
    private final DefaultRedisScript<Boolean> lockScript = buildUnlockScript();

    @Resource
    private RedisTemplate<Object, Object> redisTemplate;

    @Autowired
    private RedisService redisService;

    /**
     * Scheduled job (cron {@code 0/10 * * * * *}) that tries to take the lock, simulates
     * work with a 5 second sleep, and releases the lock if it was acquired.
     */
    @Scheduled(cron = "0/10 * * * * *")
    public void lockJob() {
        String lock = LOCK_PREFIX + "JedisNxExJob";
        boolean lockRet = false;
        boolean interrupted = false;
        try {
            lockRet = this.setLock(lock, 600);

            if (!lockRet) {
                // Lock not acquired: log the value stored under the lock key (the owner's IP).
                String value = (String) redisService.genValue(lock);
                logger.info("jedisLockJob get lock fail,lock belong to:{}", value);
                return;
            }

            // Lock acquired.
            logger.info("jedisLockJob start  lock lockNxExJob success");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Remember the interrupt; the flag is restored only after the release below so
            // that the Redis call in the finally block is not attempted on an interrupted thread.
            interrupted = true;
            logger.error("jedisLockJob lock error", e);
        } catch (Exception e) {
            logger.error("jedisLockJob lock error", e);
        } finally {
            try {
                if (lockRet) {
                    // Report the actual outcome of the release instead of logging success up front.
                    if (releaseLock(lock, getHostIp())) {
                        logger.info("jedisLockJob release lock success");
                    } else {
                        logger.warn("jedisLockJob release lock fail");
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Tries to take the lock by storing the local host IP under {@code key} with an
     * expiration, only if the key does not exist yet.
     *
     * @param key    lock key
     * @param expire expiration of the lock in seconds
     * @return {@code true} only when the SET reported success; {@code false} when the key or
     *         the local host IP is unavailable, when the SET did not succeed, or on any exception
     */
    public boolean setLock(String key, long expire) {
        try {
            String owner = getHostIp();
            if (key == null || owner == null) {
                // Explicit check instead of relying on a caught NullPointerException.
                logger.warn("setLock skipped: key or local host ip is unavailable, key={}", key);
                return false;
            }
            // Fixed charset so the stored bytes do not depend on the platform default.
            byte[] rawKey = key.getBytes(StandardCharsets.UTF_8);
            byte[] rawValue = owner.getBytes(StandardCharsets.UTF_8);

            Boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {
                @Override
                public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                    return connection.set(rawKey, rawValue, Expiration.seconds(expire), RedisStringCommands.SetOption.ifAbsent());
                }
            });
            // The callback may yield null; treat that as "not acquired" rather than unboxing it.
            return Boolean.TRUE.equals(result);
        } catch (Exception e) {
            logger.error("set redis occured an exception", e);
        }
        return false;
    }

    /**
     * Releases the lock by executing the {@code unlock.lua} script.
     * <p>
     * NOTE(review): both the lock key and the expected value are passed in the KEYS list,
     * so {@code unlock.lua} presumably reads the value from KEYS[2]; {@link #UNLOCK_LUA}
     * expects it in ARGV[1] instead - confirm the two agree before swapping scripts.
     * NOTE(review): keys passed here go through the template's serializers while
     * {@link #setLock} writes raw bytes - confirm the template is configured with
     * string serializers.
     *
     * @param key   lock key
     * @param value value the lock is expected to hold
     * @return {@code true} only when the script result is {@code Boolean.TRUE}
     */
    private boolean releaseLock(String key, String value) {
        if (key == null || value == null) {
            logger.warn("releaseLock skipped: key or value is null, key={}", key);
            return false;
        }
        // Script inputs: the lock key followed by the expected value.
        List<Object> keyList = new ArrayList<Object>();
        keyList.add(key);
        keyList.add(value);
        Boolean result = redisTemplate.execute(lockScript, keyList);
        // Null-safe: avoids a NullPointerException from unboxing a missing result.
        return Boolean.TRUE.equals(result);
    }

    /**
     * Builds the script object that points at {@code unlock.lua} on the classpath
     * with a {@code Boolean} result type.
     */
    private static DefaultRedisScript<Boolean> buildUnlockScript() {
        DefaultRedisScript<Boolean> script = new DefaultRedisScript<Boolean>();
        script.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua")));
        script.setResultType(Boolean.class);
        return script;
    }

    /**
     * Returns the first non-loopback IPv4 address found on the local network interfaces.
     *
     * @return the address in textual form, or {@code null} when none is found or the
     *         lookup fails
     */
    private static String getHostIp() {
        try {
            Enumeration<NetworkInterface> allNetInterfaces = NetworkInterface.getNetworkInterfaces();
            if (allNetInterfaces == null) {
                return null;
            }
            while (allNetInterfaces.hasMoreElements()) {
                Enumeration<InetAddress> addresses = allNetInterfaces.nextElement().getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress ip = addresses.nextElement();
                    // Loopback addresses (127.0.0.0 - 127.255.255.255 for IPv4) are skipped.
                    if (ip instanceof Inet4Address && !ip.isLoopbackAddress()) {
                        return ip.getHostAddress();
                    }
                }
            }
        } catch (Exception e) {
            // Best effort: callers treat null as "address unavailable".
            logger.error("resolve local host ip error", e);
        }
        return null;
    }

}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM