使用數據庫寫鎖、synchronized、ReentrantLock等都可以實現對於數據的線程安全控制。但這些都屬於排它鎖(或者你也可以認為是悲觀鎖)范疇,會造成一定的阻塞,無法滿足快速響應的要求。
基於【高並發搶購防止超賣】的案例。
我們使用redis的兩種不同方式,實現分布式鎖。
【閱讀前提:您對redis中的watch、事務、setnx有一定的了解】
一、基於watch機制
這種相當於是樂觀鎖的實現方式,樂觀的以為沒人和我搶。樂觀鎖適用於“讀多寫少”的場景。此處僅作為練習使用。方式二才是通常用法。

1 package qianggou; 2 3 import java.util.List; 4 import java.util.UUID; 5 import java.util.concurrent.ExecutorService; 6 import java.util.concurrent.Executors; 7 8 import comm.Value; 9 import redis.clients.jedis.Jedis; 10 import redis.clients.jedis.JedisPool; 11 import redis.clients.jedis.JedisPoolConfig; 12 import redis.clients.jedis.Transaction; 13 14 /** 15 * 測試搶購案例 16 * 17 */ 18 public class RedisTest { 19 20 public static void main(String[] args) { 21 final String watchkeys = "watchkeys"; 22 ExecutorService excutor = Executors.newFixedThreadPool(20);//開啟最多20個線程的線程池,相當於真實場景中的限流 23 JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); 24 JedisPool jedisPool = new JedisPool(jedisPoolConfig, "aliyun", 6379, 5000, Value.PASSWORD);//Value.PASSWORD是你的redis設置密碼,這里我使用接口常量封裝了 25 26 final Jedis jedis = jedisPool.getResource(); 27 jedis.set(watchkeys, "0"); 28 jedis.del("setsucc","setfail"); 29 jedis.close(); 30 31 32 for(int i=0;i<1000;i++) { 33 excutor.execute(new MyRunnable(jedisPool)); 34 } 35 excutor.shutdown(); 36 } 37 } 38 class MyRunnable implements Runnable{ 39 40 String watchkeys = "watchkeys"; 41 JedisPool jedisPool = null; 42 43 public MyRunnable(JedisPool jedisPool) { 44 this.jedisPool = jedisPool; 45 } 46 @Override 47 public void run() { 48 Jedis jedis = jedisPool.getResource(); 49 jedis.watch(watchkeys);//監聽watchkeys 50 String val = jedis.get(watchkeys); 51 jedis.set(watchkeys, "1"); 52 int valint = Integer.valueOf(val); 53 String userinfo = UUID.randomUUID().toString(); 54 if(valint < 10) { 55 56 Transaction tx = jedis.multi(); 57 tx.incr(watchkeys);//更改watchkeys的值。 58 List<Object> exec = tx.exec();//如果watchkeys被其他線程修改了,則搶購失敗 59 if(exec !=null && exec.size()>0) { 60 System.out.println("用戶【"+userinfo+"】搶購成功,當前搶購成功人數為:"+(valint+1)); 61 jedis.sadd("setsucc", userinfo); 62 jedis.close(); 63 return; 64 } 65 } 66 jedis.sadd("setfail", userinfo); 67 jedis.close(); 68 } 69 }
二、基於setnx
這種相當於是悲觀鎖的實現方式,沒有獲取到鎖則搶購失敗。(其實真實的悲觀鎖是會進行等待阻塞的)

1 package qianggou; 2 3 import java.util.UUID; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.ScheduledExecutorService; 7 import java.util.concurrent.TimeUnit; 8 9 import comm.Value; 10 import redis.clients.jedis.Jedis; 11 import redis.clients.jedis.JedisPool; 12 import redis.clients.jedis.JedisPoolConfig; 13 14 public class RedisTest02 { 15 16 17 public static void main(String[] args) { 18 final String SAIL_KEY = "sailkey"; 19 ExecutorService excutor = Executors.newFixedThreadPool(20); 20 21 JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); 22 JedisPool jedisPool = new JedisPool(jedisPoolConfig, "aliyun", 6379, 5000, Value.PASSWORD); 23 24 final Jedis jedis = jedisPool.getResource(); 25 jedis.del(SAIL_KEY); 26 jedis.set(SAIL_KEY, "0"); 27 jedis.del("setsucc","setfail","LOCK"); 28 jedis.close(); 29 30 for(int i=0;i<100;i++) { 31 excutor.execute(new MyRunnable2(jedisPool)); 32 } 33 excutor.shutdown(); 34 } 35 36 } 37 class MyRunnable2 implements Runnable{ 38 39 String LOCK = "LOCK"; 40 String SAIL_KEY = "sailkey"; 41 JedisPool jedisPool = null; 42 43 public MyRunnable2(JedisPool jedisPool ) { 44 this.jedisPool = jedisPool; 45 } 46 @Override 47 public void run() { 48 49 String userinfo = UUID.randomUUID().toString(); 50 51 Jedis jedis = jedisPool.getResource(); 52 53 Long lock = jedis.setnx(LOCK, userinfo); 54 if(lock>0 ? false : true) {//這一步在spring中封裝為RedisTemplate了 55 return; 56 } 57 58 ScheduledExecutorService schedul = Executors.newScheduledThreadPool(1); 59 try { 60 jedis.expire(LOCK, 2);//初始化鎖定時間 61 62 //異常點一:設置鎖的初始鎖定時間,如果2秒鍾之內方法未執行完,則通過以下定時器為鎖續命 63 schedul.schedule(new Runnable() { 64 @Override 65 public void run() { 66 jedis.expire(LOCK, 2); 67 } 68 }, 1, TimeUnit.SECONDS);//定時每1秒為鎖續期 69 70 int num = jedis.incr(SAIL_KEY).intValue(); 71 72 if(num<=10) { 73 System.out.println("用戶【"+userinfo+"】搶購成功,當前搶購成功人數為:"+num); 74 } 75 }finally { 76 schedul.shutdownNow();//關閉所有定時子進程 77 //異常點二:如果執行到這一步,突然卡住了Thread.sleep(),LOCK時間也到期了。 78 //別的線程就可以重新生成LOCK這把鎖,防止以下代碼刪除了別的線程的LOCK 79 if(userinfo.equals(jedis.get(LOCK))) {//防止誤刪 80 jedis.del(LOCK); //方式一 81 } 82 jedis.close();//先刪除再關閉 83 } 84 } 85 86 }
三、使用redisson進行簡化
方案二中的加鎖看起來過於繁瑣了,接下來使用redisson對其加鎖續期過程進行簡化。我們使用一下spring中的RedisTemplate(你也可以不使用)。
pom中需要引入引用:

1 <dependency> 2 <groupId>org.springframework.data</groupId> 3 <artifactId>spring-data-redis</artifactId> 4 <version>1.7.7.RELEASE</version> 5 </dependency> 6 <dependency> 7 <groupId>org.redisson</groupId> 8 <artifactId>redisson</artifactId> 9 <version>3.11.3</version> 10 </dependency>
spring配置文件:

1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 5 xsi:schemaLocation="http://www.springframework.org/schema/beans 6 http://www.springframework.org/schema/beans/spring-beans.xsd 7 http://www.springframework.org/schema/context 8 http://www.springframework.org/schema/context/spring-context.xsd"> 9 10 <bean id="redisPoolConfig" 11 class="redis.clients.jedis.JedisPoolConfig"></bean> 12 13 <bean id="JedisConnectionFactory" 14 class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> 15 <property name="hostName" value="127.0.0.1"></property> 16 <property name="port" value="6379"></property> 17 <property name="password" value="123456"></property> 18 <property name="poolConfig" ref="redisPoolConfig"></property> 19 </bean> 20 21 <bean id="stringRedisSerializer" 22 class="org.springframework.data.redis.serializer.StringRedisSerializer" /> 23 <bean id="redisTemplate" 24 class="org.springframework.data.redis.core.RedisTemplate"> 25 <property name="connectionFactory" ref="JedisConnectionFactory" /> 26 <property name="keySerializer" ref="stringRedisSerializer" /> 27 <property name="valueSerializer" ref="stringRedisSerializer" /> 28 </bean> 29 30 </beans>
測試代碼:

1 package test; 2 3 import java.util.UUID; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 7 import org.redisson.Redisson; 8 import org.redisson.api.RLock; 9 import org.redisson.api.RedissonClient; 10 import org.redisson.config.Config; 11 import org.springframework.context.ApplicationContext; 12 import org.springframework.context.support.ClassPathXmlApplicationContext; 13 import org.springframework.data.redis.core.RedisTemplate; 14 15 public class Miaosha2 { 16 17 public static void main(String[] args) { 18 19 ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext-redis.xml"); 20 RedisTemplate<String, String> redisTemplate = ac.getBean(RedisTemplate.class); 21 //數據初始化 22 redisTemplate.opsForValue().set("sails", "0");//初始化庫存已售數量 23 redisTemplate.delete("LOCK"); 24 25 System.out.println(redisTemplate); 26 27 ExecutorService excutor = Executors.newFixedThreadPool(20); 28 for(int i=0;i<100;i++) { 29 excutor.execute(new MyRunnable2(redisTemplate)); 30 } 31 excutor.shutdown(); 32 33 } 34 35 } 36 class MyRunnable2 implements Runnable{ 37 RedisTemplate<String, String> redisTemplate; 38 public MyRunnable2(RedisTemplate<String, String> redisTemplate) { 39 this.redisTemplate = redisTemplate; 40 } 41 @Override 42 public void run() { 43 String userInfo = UUID.randomUUID().toString(); 44 Config config = new Config(); 45 config.useSingleServer().setAddress("redis://127.0.0.1:6379"); 46 config.useSingleServer().setPassword("123456"); 47 48 RedissonClient redisson = Redisson.create(config); 49 RLock lock = redisson.getLock("LOCK"); 50 51 try { 52 lock.lock();//獲取鎖,獲取不到則結束 53 54 redisTemplate.opsForValue().set("sails", String.valueOf(num)); 55 Long num = redisTemplate.opsForValue().increment("sails", 1); 56 if(num <= 10) { 57 System.out.println("用戶【" + userInfo + "】搶購成功,當前搶購成功人數為:" + num); 58 } 59 } finally { 60 lock.unlock();//釋放鎖 61 redisson.shutdown();//需要關閉redisson連接,否則會占用redis連接資源 62 } 63 64 } 65 66 }
四、原子性優化
以上方式[包含redisson方式]外存在一個很嚴重的額問題是,設置鎖和設置超時時間的代碼是非原子性操作。
我們想象一個場景如果設置完鎖之后系統異常宕機,但是沒有設置超時時間,這導致的后果就是:鎖永遠無法釋放了。
為了保證這兩步的原子性操作===>,
在redis2.6.12版本之前,可以使用LUA腳本方式:

1 public class Lua { 2 3 //加鎖腳本(索引從1開始) 4 private static final String SCRIPT_TRYLOCK = "if redis.call('setnx',KEYS[1],ARGV[1]) ==1 " 5 + " then redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end"; 6 7 /** 8 * 使用Lua腳本,嘗試獲取分布式鎖 9 */ 10 public static boolean tryLockLua(Jedis jedis, String lockkey, String lockValue, int expireTime) { 11 12 int result = (int) jedis.eval(SCRIPT_TRYLOCK, 1, lockkey,String.valueOf(expireTime));//設置鎖 13 if(result == 1) { 14 return true ; 15 } 16 return false; 17 } 18 }
在redis2.6.12版本及其之后,對set命令進行了增強,同時作者也不建議使用setnx命令了,這個命令也沒存在的必要了,后續版本可能會將其刪除:

1 /** 2 * 命令:SET key value NX PX miliseconds 3 * 如:SET key value NX PX 1000 4 */ 5 public static boolean tryLock(Jedis jedis, String lockkey, String lockValue, int expireTime) { 6 7 String result = jedis.set(lockkey, lockValue,"NX","PX",expireTime); 8 if("OK".equals(result)) { 9 return true ; 10 } 11 return false; 12 }