redis分布式锁Redisson扩展


如果大家项目中Redis是多机部署的可以来好好看看这篇实现,讲的非常好。

使用Redisson实现分布式锁,Spring AOP简化之

 

源码

Redisson概述

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。

关于Redisson更多详细介绍,可参考Redssion概述

Redisson提供的分布式锁

可重入锁

Redisson的分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口,同时还支持自动过期解锁。下面是RLock的基本使用方法:

RLock lock = redisson.getLock("anyLock"); // 最常见的使用方法 lock.lock(); // 支持过期解锁功能 // 10秒钟以后自动解锁 // 无需调用unlock方法手动解锁 lock.lock(10, TimeUnit.SECONDS); // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock();

Redisson同时还为分布式锁提供了异步执行的相关方法:

RLock lock = redisson.getLock("anyLock"); lock.lockAsync(); lock.lockAsync(10, TimeUnit.SECONDS); Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);

公平锁

Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。在提供了自动过期解锁功能的同时,保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。

RLock fairLock = redisson.getFairLock("anyLock"); // 最常见的使用方法 fairLock.lock(); // 支持过期解锁功能 // 10秒钟以后自动解锁 // 无需调用unlock方法手动解锁 fairLock.lock(10, TimeUnit.SECONDS); // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁 boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS); ... fairLock.unlock();

其他锁

Redisson还提供了其他机制的锁,如联锁(MultiLock)、红锁(RedLock)等。详细可参考:分布式锁和同步器

使用Redisson实现分布式锁

  1. 定义回调接口
/** * 分布式锁回调接口 */ public interface DistributedLockCallback<T> { /** * 调用者必须在此方法中实现需要加分布式锁的业务逻辑 * * @return */ public T process(); /** * 得到分布式锁名称 * * @return */ public String getLockName(); }
  1. 定义分布式锁模板
/** * 分布式锁操作模板 */ public interface DistributedLockTemplate { long DEFAULT_WAIT_TIME = 30; long DEFAULT_TIMEOUT = 5; TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS; /** * 使用分布式锁,使用锁默认超时时间。 * @param callback * @param fairLock 是否使用公平锁 * @return */ <T> T lock(DistributedLockCallback<T> callback, boolean fairLock); /** * 使用分布式锁。自定义锁的超时时间 * * @param callback * @param leaseTime 锁超时时间。超时后自动释放锁。 * @param timeUnit * @param fairLock 是否使用公平锁 * @param <T> * @return */ <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit, boolean fairLock); /** * 尝试分布式锁,使用锁默认等待时间、超时时间。 * @param callback * @param fairLock 是否使用公平锁 * @param <T> * @return */ <T> T tryLock(DistributedLockCallback<T> callback, boolean fairLock); /** * 尝试分布式锁,自定义等待时间、超时时间。 * @param callback * @param waitTime 获取锁最长等待时间 * @param leaseTime 锁超时时间。超时后自动释放锁。 * @param timeUnit * @param fairLock 是否使用公平锁 * @param <T> * @return */ <T> T tryLock(DistributedLockCallback<T> callback, long waitTime, long leaseTime, TimeUnit timeUnit, boolean fairLock); }
  1. 实现分布式锁模板
public class SingleDistributedLockTemplate implements DistributedLockTemplate { private RedissonClient redisson; public SingleDistributedLockTemplate() { } public SingleDistributedLockTemplate(RedissonClient redisson) { this.redisson = redisson; } @Override public <T> T lock(DistributedLockCallback<T> callback, boolean fairLock) { return lock(callback, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT, fairLock); } @Override public <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit, boolean fairLock) { RLock lock = getLock(callback.getLockName(), fairLock); try { lock.lock(leaseTime, timeUnit); return callback.process(); } finally { if (lock != null && lock.isLocked()) { lock.unlock(); } } } @Override public <T> T tryLock(DistributedLockCallback<T> callback, boolean fairLock) { return tryLock(callback, DEFAULT_WAIT_TIME, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT, fairLock); } @Override public <T> T tryLock(DistributedLockCallback<T> callback, long waitTime, long leaseTime, TimeUnit timeUnit, boolean fairLock) { RLock lock = getLock(callback.getLockName(), fairLock); try { if (lock.tryLock(waitTime, leaseTime, timeUnit)) { return callback.process(); } } catch (InterruptedException e) { } finally { if (lock != null && lock.isLocked()) { lock.unlock(); } } return null; } private RLock getLock(String lockName, boolean fairLock) { RLock lock; if (fairLock) { lock = redisson.getFairLock(lockName); } else { lock = redisson.getLock(lockName); } return lock; } public void setRedisson(RedissonClient redisson) { this.redisson = redisson; } }
  1. 使用SingleDistributedLockTemplate
DistributedLockTemplate lockTemplate = ...;
final String lockName = ...; lockTemplate.lock(new DistributedLockCallback<Object>() { @Override public Object process() { //do some business return null; } @Override public String getLockName() { return lockName; } }, false);

但是每次使用分布式锁都要写类似上面的重复代码,有没有什么方法可以只关注核心业务逻辑代码的编写,即上面的"do some business"。下面介绍如何使用Spring AOP来实现这一目标。

使用Spring AOP简化分布式锁

  1. 定义注解@DistributedLock
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributedLock { /** * 锁的名称。 * 如果lockName可以确定,直接设置该属性。 */ String lockName() default ""; /** * lockName后缀 */ String lockNamePre() default ""; /** * lockName后缀 */ String lockNamePost() default "lock"; /** * 获得锁名时拼接前后缀用到的分隔符 * @return */ String separator() default "."; /** * <pre> * 获取注解的方法参数列表的某个参数对象的某个属性值来作为lockName。因为有时候lockName是不固定的。 * 当param不为空时,可以通过argNum参数来设置具体是参数列表的第几个参数,不设置则默认取第一个。 * </pre> */ String param() default ""; /** * 将方法第argNum个参数作为锁 */ int argNum() default 0; /** * 是否使用公平锁。 * 公平锁即先来先得。 */ boolean fairLock() default false; /** * 是否使用尝试锁。 */ boolean tryLock() default false; /** * 最长等待时间。 * 该字段只有当tryLock()返回true才有效。 */ long waitTime() default 30L; /** * 锁超时时间。 * 超时时间过后,锁自动释放。 * 建议: * 尽量缩简需要加锁的逻辑。 */ long leaseTime() default 5L; /** * 时间单位。默认为秒。 */ TimeUnit timeUnit() default TimeUnit.SECONDS; }
  1. 定义切面织入的代码
@Aspect @Component public class DistributedLockAspect { @Autowired private DistributedLockTemplate lockTemplate; @Pointcut("@annotation(cn.sprinkle.study.distributedlock.common.annotation.DistributedLock)") public void DistributedLockAspect() {} @Around(value = "DistributedLockAspect()") public Object doAround(ProceedingJoinPoint pjp) throws Throwable { //切点所在的类 Class targetClass = pjp.getTarget().getClass(); //使用了注解的方法 String methodName = pjp.getSignature().getName(); Class[] parameterTypes = ((MethodSignature)pjp.getSignature()).getMethod().getParameterTypes(); Method method = targetClass.getMethod(methodName, parameterTypes); Object[] arguments = pjp.getArgs(); final String lockName = getLockName(method, arguments); return lock(pjp, method, lockName); } @AfterThrowing(value = "DistributedLockAspect()", throwing="ex") public void afterThrowing(Throwable ex) { throw new RuntimeException(ex); } public String getLockName(Method method, Object[] args) { Objects.requireNonNull(method); DistributedLock annotation = method.getAnnotation(DistributedLock.class); String lockName = annotation.lockName(), param = annotation.param(); if (isEmpty(lockName)) { if (args.length > 0) { if (isNotEmpty(param)) { Object arg; if (annotation.argNum() > 0) { arg = args[annotation.argNum() - 1]; } else { arg = args[0]; } lockName = String.valueOf(getParam(arg, param)); } else if (annotation.argNum() > 0) { lockName = args[annotation.argNum() - 1].toString(); } } } if (isNotEmpty(lockName)) { String preLockName = annotation.lockNamePre(), postLockName = annotation.lockNamePost(), separator = annotation.separator(); StringBuilder lName = new StringBuilder(); if (isNotEmpty(preLockName)) { lName.append(preLockName).append(separator); } lName.append(lockName); if (isNotEmpty(postLockName)) { lName.append(separator).append(postLockName); } lockName = lName.toString(); return lockName; } throw new IllegalArgumentException("Can't get or generate lockName accurately!"); } /** * 从方法参数获取数据 * * @param param * @param arg 方法的参数数组 * @return */ public Object getParam(Object arg, String param) { if (isNotEmpty(param) && arg != null) { try { Object result = PropertyUtils.getProperty(arg, param); return result; } catch (NoSuchMethodException e) { throw new IllegalArgumentException(arg + "没有属性" + param + "或未实现get方法。", e); } catch (Exception e) { throw new RuntimeException("", e); } } return null; } public Object lock(ProceedingJoinPoint pjp, Method method, final String lockName) { DistributedLock annotation = method.getAnnotation(DistributedLock.class); boolean fairLock = annotation.fairLock(); boolean tryLock = annotation.tryLock(); if (tryLock) { return tryLock(pjp, annotation, lockName, fairLock); } else { return lock(pjp,lockName, fairLock); } } public Object lock(ProceedingJoinPoint pjp, final String lockName, boolean fairLock) { return lockTemplate.lock(new DistributedLockCallback<Object>() { @Override public Object process() { return proceed(pjp); } @Override public String getLockName() { return lockName; } }, fairLock); } public Object tryLock(ProceedingJoinPoint pjp, DistributedLock annotation, final String lockName, boolean fairLock) { long waitTime = annotation.waitTime(), leaseTime = annotation.leaseTime(); TimeUnit timeUnit = annotation.timeUnit(); return lockTemplate.tryLock(new DistributedLockCallback<Object>() { @Override public Object process() { return proceed(pjp); } @Override public String getLockName() { return lockName; } }, waitTime, leaseTime, timeUnit, fairLock); } public Object proceed(ProceedingJoinPoint pjp) { try { return pjp.proceed(); } catch (Throwable throwable) { throw new RuntimeException(throwable); } } private boolean isEmpty(Object str) { return str == null || "".equals(str); } private boolean isNotEmpty(Object str) { return !isEmpty(str); } }
  1. 使用注解@DistributedLock实现分布式锁

有了上面两段代码,以后需要用到分布式锁,只需在核心业务逻辑方法添加注解@DistributedLock,并设置LockName、fairLock等即可。下面的DistributionService演示了多种使用情景。

@Service public class DistributionService { @Autowired private RedissonClient redissonClient; @DistributedLock(param = "id", lockNamePost = ".lock") public Integer aspect(Person person) { RMap<String, Integer> map = redissonClient.getMap("distributionTest"); Integer count = map.get("count"); if (count > 0) { count = count - 1; map.put("count", count); } return count; } @DistributedLock(argNum = 1, lockNamePost = ".lock") public Integer aspect(String i) { RMap<String, Integer> map = redissonClient.getMap("distributionTest"); Integer count = map.get("count"); if (count > 0) { count = count - 1; map.put("count", count); } return count; } @DistributedLock(lockName = "lock", lockNamePost = ".lock") public int aspect(Action<Integer> action) { return action.action(); } }
  1. 测试

定义一个Worker类:

public class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; private final DistributionService service; private RedissonClient redissonClient; public Worker(CountDownLatch startSignal, CountDownLatch doneSignal, DistributionService service, RedissonClient redissonClient) { this.startSignal = startSignal; this.doneSignal = doneSignal; this.service = service; this.redissonClient = redissonClient; } @Override public void run() { try { startSignal.await(); System.out.println(Thread.currentThread().getName() + " start"); // Integer count = service.aspect(new Person(1, "张三")); // Integer count = service.aspect("1"); Integer count = service.aspect(() -> { RMap<String, Integer> map = redissonClient.getMap("distributionTest"); Integer count1 = map.get("count"); if (count1 > 0) { count1 = count1 - 1; map.put("count", count1); } return count1; }); System.out.println(Thread.currentThread().getName() + ": count = " + count); doneSignal.countDown(); } catch (InterruptedException ex) { System.out.println(ex); } } }

定义Controller类:

@RestController @RequestMapping("/distributedLockTest") public class DistributedLockTestController { private int count = 10; @Autowired private RedissonClient redissonClient; @Autowired private DistributionService service; @RequestMapping(method = RequestMethod.GET) public String distributedLockTest() throws Exception { RMap<String, Integer> map = redissonClient.getMap("distributionTest"); map.put("count", 8); CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(count); for (int i = 0; i < count; ++i) { // create and start threads new Thread(new Worker(startSignal, doneSignal, service)).start(); } startSignal.countDown(); // let all threads proceed doneSignal.await(); System.out.println("All processors done. Shutdown connection"); return "finish"; } }

Redisson基本配置:

singleServerConfig:  idleConnectionTimeout: 10000  pingTimeout: 1000  connectTimeout: 10000  timeout: 3000  retryAttempts: 3  retryInterval: 1500  reconnectionTimeout: 3000  failedAttempts: 3  password:  subscriptionsPerConnection: 5  clientName: null  address: "redis://127.0.0.1:6379"  subscriptionConnectionMinimumIdleSize: 1  subscriptionConnectionPoolSize: 50  connectionMinimumIdleSize: 10  connectionPoolSize: 64  database: 0  dnsMonitoring: false  dnsMonitoringInterval: 5000 threads: 0 nettyThreads: 0 codec: !<org.redisson.codec.JsonJacksonCodec> {} useLinuxNativeEpoll: false

工程中需要注入的对象:

@Value("classpath:/redisson-conf.yml") Resource configFile; @Bean(destroyMethod = "shutdown") RedissonClient redisson() throws IOException { Config config = Config.fromYAML(configFile.getInputStream()); return Redisson.create(config); } @Bean DistributedLockTemplate distributedLockTemplate(RedissonClient redissonClient) { return new SingleDistributedLockTemplate(redissonClient); }

需要引入的依赖:

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.5.3</version> </dependency> <dependency> <groupId>commons-beanutils</groupId> <artifactId>commons-beanutils</artifactId> <version>1.8.3</version> </dependency>

最后启动工程,然后访问localhost:8080/distributedLockTest,可以看到如下结果:
分布式锁测试结果

观察结果,可以看出,10个线程中只有8个线程能执行count减1操作,而且多个线程是依次执行的。也就是说分布式锁起作用了。

使用lambda

该注解还可以配合lambda使用。在介绍之前,先科普一下使用spring注解时需要注意的地方,有两点。

第一,在使用spring提供的方法注解时,比较常用的是@Transactional注解。若是Service层不带注解的方法A调用同一个Service类带@Transactional注解的方法B,那么方法B的事务注解将不起作用。比如:

...
    public void methodA() { methodB(); } @Transactional public void methodB() { // 操作表A // 操作表B } ...

上面的代码中,假设有一次调用方法A,方法A又调用方法B,但是此次调用在操作表B时出错了。我们的意愿是这样的:之前对表A的操作回滚。但实际上却不会回滚,因为此时的@Transactional注解并不会生效。原因是调用方法B是同一个Service的方法A,而若是在其他类中调用方法B注解才生效。这也就不难解释为什么注解加在private方法上是不起作用的了。因为private方法只能在同一个方法中调用。

上面所说的调用同一个类的带注解的方法,该注解将不生效,感兴趣的可以自己找找原因,这里就不细说了。

第二,注解(包括spring提供的、自定义的)加在普通类的方法上,spring是扫描不到的。普通类指类签名上没有诸如@Service等Spring提供的注解(因为此分布式锁集成使用的是spring aop,所以介绍的都是与spring相关的)。比如,如果把上面贴出的DistributionService中的各个方法放到Worker中,那么这些注解将不起作用,因为Worker类签名并没有加任何注解,所以spring在扫描的时候直接跳过该类,因此定义在Worker中的带@DistributedLock注解的方法(如果有的话)也就无法被扫描到。

在上面贴出的代码中,Worker中需要使用分布式锁的业务逻辑比较简单,所以都写到DistributionService中,但在实际开发中,我们通常有把业务逻辑直接写在Worker中的需求,毕竟是与Worker相关的,放到哪一个Service都感觉很别扭。所以,我们可以定义一个分布式锁管理器,如DistributedLockManager,然后在初始化Worker时引入即可。接下来改造Worker和定义DistributedLockManager

Worker1:

public class Worker1 implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; private final DistributedLockManager distributedLockManager; private RedissonClient redissonClient; public Worker1(CountDownLatch startSignal, CountDownLatch doneSignal, DistributedLockManager distributedLockManager, RedissonClient redissonClient) { this.startSignal = startSignal; this.doneSignal = doneSignal; this.distributedLockManager = distributedLockManager; this.redissonClient = redissonClient; } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " start"); startSignal.await(); Integer count = aspect("lock"); System.out.println(Thread.currentThread().getName() + ": count = " + count); doneSignal.countDown(); } catch (Exception e) { e.printStackTrace(); } } public int aspect(String lockName) { return distributedLockManager.aspect(lockName, this); } public int aspectBusiness(String lockName) { RMap<String, Integer> map = redissonClient.getMap("distributionTest"); Integer count = map.get("count"); if (count > 0) { count = count - 1; map.put("count", count); } return count; } }

DistributedLockManager:

@Component public class DistributedLockManager { @DistributedLock(argNum = 1, lockNamePost = ".lock") public Integer aspect(String lockName, Worker1 worker1) { return worker1.aspectBusiness(lockName); } }

这样做,虽然可以将业务从Service层抽离出来,放到分布式锁管理器DistributedLockManager统一管理,但每次都需要将Worker一起传过去,同样感觉很别扭。那么有没有更好的办法呢?有,使用lambda。(上面铺垫了那么多,终于进入正题了!o(╥﹏╥)o)

lambda是java 8的新特性之一,若未了解过的建议先去恶补一番。因为java 8支持lambda,所以也新加了很多函数式接口,这里简单列几个:

函数式接口 参数类型 返回类型 描述
Supplier<T> T 提供一个T类型的值
Consumer<T> T void 处理一个T类型的值
BiConsumer<T, U> T, U void 处理T类型和U类型的值
Predicate<T> T boolean 一个 计算Boolean值的函数
Function<T, R> T R 一个参数类型为T的函数
ToIntFunction<T>
ToLongFunction<T>
ToDoubleFunction<T>
T int
long
double
分别计算int、long、double值的函数
IntFunction<R>
LongFunction<R>
DoubleFunction<R>
int
long
double
R 参数分别为int、long、double类型的函数
BiFunction<T, U, R> T, U R 一个参数类型为T和U的函数
UnaryOperator<T> T T 对类型T进行的一元操作
BinaryOperator<T> T, T T 对类型T进行的二元操作

观察Worker1中方法aspect(Person)的逻辑,最后需要返回一个int值,所以我们可以使用Supplier<T>来作为参数的类型,在分布式锁管理器中添加一个方法,如下:

@DistributedLock(lockName = "lock", lockNamePost = ".lock") public int aspect(Supplier<Integer> supplier) { return supplier.get(); }

然后,在Worker1中也定义一个方法:

private int aspect() { RMap<String, Integer> map = redissonClient.getMap("distributionTest"); Integer count1 = map.get("count"); if (count1 > 0) { count1 = count1 - 1; map.put("count", count1); } return count1; }

最后在Worker1的run方法中使用,把Integer count = aspect("lock");替换成如下:

Integer count = distributedLockManager.aspect(() -> { return aspect(); });

其实也可以简写:

Integer count = distributedLockManager.aspect(() -> aspect());

通过这样改造,是不是发现优雅多了。

测试

DistributedLockTestController中,帮下面的代码替换成另一段代码:

for (int i = 0; i < count; ++i) { // create and start threads new Thread(new Worker(startSignal, doneSignal, service, redissonClient)).start(); }

替换成:

for (int i = 0; i < count; ++i) { // create and start threads new Thread(new Worker1(startSignal, doneSignal, distributedLockManager, redissonClient)).start(); }

最后启动工程,访问http://localhost:8080/distributedLockTest,可以看到类似如下的结果:
分布式锁测试结果

另外,因为暂时没有找到合适的参数类型为“无”、返回类型也为“无”的函数式接口(找到一个——Runnable,但如果用了,怕产生歧义,所以就算了),既然如此,我们不妨自己定义一个,如下:

@FunctionalInterface public interface Action { void action(); }

使用也很简单,在DistributedLockManager定义类似如下的方法:

@DistributedLock(lockName = "lock", lockNamePost = ".lock") public void doSomething(Action action) { action.action(); }

然后,在需要的地方这样用:

distributedLockManager.doSomething(() -> { // do something });

至此,使用Redisson实现分布式锁,然后使用Spring AOP简化分布式锁介绍完毕。

若有什么地方有错误的或需要改进的,欢迎留言一起讨论交流。

转载自http://www.cnblogs.com/sprinkle/p/8366414.html


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM