前言
Redisson的功能非常強大,下面摘抄官網上的描述:
在提供這些工具的過程當中,Redisson廣泛的使用了承載於Redis訂閱發布功能之上的分布式話題(Topic)功能。使得即便是在復雜的分布式環境下,Redisson的各個實例仍然具有能夠保持相互溝通的能力。在以這為前提下,結合了自身獨有的功能完善的分布式工具,Redisson進而提供了像分布式遠程服務(Remote Service),分布式執行服務(Executor Service)和分布式調度任務服務(Scheduler Service)這樣適用於不同場景的分布式服務。使得Redisson成為了一個基於Redis的Java中間件(Middleware)。
Redisson的配置
redisson提供了文件方式配置和程序方式配置,支持redis單點,主從,哨兵,集群模式,以redis的cluster模式為例,使用基於文件方式配置,首先在resource目錄下定義了redis.yml配置文件:
--- clusterServersConfig: idleConnectionTimeout: 10000 pingTimeout: 1000 connectTimeout: 5000 timeout: 3000 retryAttempts: 3 retryInterval: 1500 password: 123456 subscriptionsPerConnection: 5 #單個連接最大訂閱數量 slaveSubscriptionConnectionMinimumIdleSize: 1 slaveSubscriptionConnectionPoolSize: 50 slaveConnectionMinimumIdleSize: 32 slaveConnectionPoolSize: 64 masterConnectionMinimumIdleSize: 32 masterConnectionPoolSize: 64 readMode: "SLAVE" nodeAddresses: - "redis://10.110.27.139:6379" - "redis://10.110.27.139:6380" - "redis://10.110.27.139:6381" - "redis://10.110.27.138:6379" - "redis://10.110.27.138:6380" - "redis://10.110.27.138:6381" scanInterval: 1000 transportMode: NIO
編寫配置類
@Configuration public class InitializingRedis{ @Bean public RedissonClient getRedissonClient() throws IOException { ResourceLoader loader = new DefaultResourceLoader(); Resource resource = loader.getResource("redis.yml"); Config config = Config.fromYAML(resource.getInputStream()); config.useClusterServers(); return Redisson.create(config); } }
同時定義一個會話初始化器即可:
public class SessionInitializer extends AbstractHttpSessionApplicationInitializer { public SessionInitializer() { super(InitializingRedis.class); } }
Redisson的使用
RedissonClient是線程安全的,由於其內部是通過Netty通信,所以除了同步執行方式,也支持異步執行。同步我們使用RedissonClient,異步使用RedissonReactiveClient.
分布式對象
-
通用對象桶
我們可以使用RBucket來存放任意類型的對象
RedissonClient client = Redisson.create(config); RBucket<Object> bucket = client.getBucket("city"); bucket.set("nanjing"); Object o = bucket.get(); System.out.println(o.getClass()); System.out.println(o);
代碼輸出
class java.lang.String nanjing
我們登陸redis查看結果如下:
10.150.27.139:6380> get city "\xfc\ananjing" 10.150.27.139:6380> type city string 10.150.27.139:6380> ttl city (integer) -1
發現get city 多了 \xfc\a,這是因為redisson默認使用的Jackson JSON做的數據序列化,我們可以使用StringCodec作為編碼:
RedissonClient client = Redisson.create(config); RBucket<Object> bucket = client.getBucket("city", new StringCodec("utf-8")); bucket.set("nanjing");
//java對象 RBucket<Object> bucket = client.getBucket("city"); City city = new City(); //對象必須實現序列化接口 city.name = "hangzhou"; city.province = "zhejiang"; bucket.set(city); City c1 = (City)bucket.get(); System.out.println(c1.province);
查看服務器上的數據類型
10.150.27.139:6380> get city "\x00\x01\x04City\xfc\bhangzhou\xfc\bzhejiang\x00" 10.150.27.139:6380> type city string
發現使用通用對象桶都是以String的方式存入到redis中的。
Redisson還提供了地理位置桶RGeo和位向量RBitSet用於位置空間的計算。
- 原子長整型與雙精度浮點
我們有時候需要一個全局的計數器,那么就可以使用原子長整型。
RedissonClient client = Redisson.create(config); RAtomicLong count = client.getAtomicLong("count"); long l = count.incrementAndGet(); System.out.println(l);
RAtomicLong的用法和juc下的AtomicLong是一樣的。在jdk8中,增加了LongAdder,該類在高並發的環境下性能更優於RAtomicLong,Redisson同樣也有該類的實現RLongAdder count = client.getLongAdder("count");
在java中並沒有提供AtomicDouble,Redisson為我們提供了:
RAtomicDouble d = client.getAtomicDouble("double");
我們就可以使用該類存儲或計算浮點數據。
- 話題(訂閱分發)
發布內容代碼:
RedissonClient client = Redisson.create(config); RTopic topic = client.getTopic("anyTopic"); DemoMessage message = new DemoMessage(); message.setTitle("震驚,一女子深夜竟然做出這種事情!"); message.setArticle("阿巴阿巴阿巴"); topic.publish(message);
訂閱的代碼
RedissonClient client = Redisson.create(config); RTopic topic = client.getTopic("anyTopic"); topic.addListenerAsync(DemoMessage.class, new MessageListener<DemoMessage>() { @Override public void onMessage(CharSequence channel, DemoMessage msg) { System.out.println(msg.getTitle()); } });
除卻上面的對象,Redisson還提供了布隆過濾器,基數估計算法及限流器,有興趣的可以深入了解。
分布式集合
- 映射(Map)
Redisson使用map來存取redis中hash的數據結構:
RedissonClient client = Redisson.create(config); RMap<Object, Object> cities = client.getMap("cities"); City c1 = new City("南京", "江蘇"); City c2 = new City("杭州", "浙江"); cities.put(1,c1); cities.put(2,c2); City c = (City)cities.get(2); System.out.println(c.name +"-"+ c.province);
登錄服務器查看:
10.150.27.139:6381> type cities hash 10.150.27.139:6381> hgetall cities 1) "\xf7\x01" 2) "\x00\x01\x04City\xfc\x02\xffWS\xff\xacN\xfc\x02\xff_l\xff\xcf\x82\x00" 3) "\xf7\x02" 4) "\x00\x01\x04City\xfc\x02\xffmg\xff\xde]\xfc\x02\xffYm\xff_l\x00"
對於高度頻繁讀寫的緩存,Redisson提供了本地緩存的機制,以減少網絡通信帶來的時間等待。
RLocalCachedMap<Object, Object> cities = client.getLocalCachedMap("cities", LocalCachedMapOptions.defaults()); City c1 = new City("武漢", "湖北"); cities.put(1,c1); City c = (City)cities.get(1); System.out.println(c.name+"-"+c.province);
redisson pro中支持數據分片,類似分庫的原理,可以將一個map中的數據分散映射到多個節點中,這樣大大的提高了redis單一hash的容量。
- Redisson中的元素淘汰機制
元素淘汰功能(Eviction)
我們使用Redis作為緩存時,就需要考慮緩存的淘汰機制。可以通過client.getKey() 來設定key的存活時間,另外可以使用RMapCache控制每一條數據的過期時間。
RedissonClient client = Redisson.create(config); RMapCache<Object, Object> cities = client.getMapCache("cities", new StringCodec("utf-8")); cities.put(1,new City("成都","四川"),60,TimeUnit.SECONDS); cities.put(2,new City("深圳","廣東"),30, TimeUnit.SECONDS); while (true){ }
每隔30s登錄服務器查看數據如下:
10.150.27.139:6381> hgetall cities 1) "1" 2) "\x00\x00\x00\x00\x00\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00City@669d2b1b" 3) "2" 4) "\x00\x00\x00\x00\x00\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00City@50b1f030" 10.150.27.139:6381> hgetall cities 1) "1" 2) "\x00\x00\x00\x00\x00\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00City@669d2b1b" 10.150.27.139:6381> hgetall cities (empty list or set)
redis並沒有實現對hash元素過期時間的設置。Redisson通過在初始化RedissonMapCache時,設置了一個EvictionScheduler,這個類通過netty的EventloopGroup線程池周期地向以redisson_map_cache_expired前綴名的頻道發布消息。RedissonMapCache會訂閱這個頻道來處理消息。它一次可移除 100 條過期項。
任務的調度時間會根據上次任務中刪除的過期項數量自動調整,時間在 1 秒到 2 個小時內。因此若清理任務每次刪除了100項數據,它將每秒鍾執行一次(最小的執行延遲)。但如果當前過期項數量比前一次少,則執行延遲將擴大為 1.5 倍。
本地緩存功能(Local Cache)
在上面的代碼已經介紹了本地緩存機制,其中有一個參數LocalCachedMapOptions,這個參數可以自定義緩存的淘汰機制。EvictionPolicy可以選擇使用LRU,LFU或者通過GC過程清除元素,SyncStrategy實現了本地緩存的同步機制。
- 列表與隊列
RedissonClient client = Redisson.create(config); RList<String> list = client.getList("list",new StringCodec("utf-8")); list.add("北京"); list.add("濟南");
RedissonClient client = Redisson.create(config); RQueue<String> qq = client.getQueue("qq"); qq.add("12"); qq.offer("34");
上面代碼都對應了redis的list數據結構
- 計分排序集(ScoredSortedSet)
基於Redis的Redisson的分布式RScoredSortedSet
Java對象是一個可以按插入時指定的元素評分排序的集合。它同時還保證了元素的唯一性。
RScoredSortedSet<SomeObject> set = redisson.getScoredSortedSet("simple"); set.add(0.13,newSomeObject(a, b)); set.addAsync(0.251,newSomeObject(c, d)); set.add(0.302,newSomeObject(g, d)); set.pollFirst(); set.pollLast(); int index = set.rank(newSomeObject(g, d));// 獲取元素在集合中的位置 Double score = set.getScore(newSomeObject(g, d));// 獲取元素的評分
分布式鎖與同步器
@RestController public class TestController { @Autowired private RedissonClient client; @RequestMapping("/test") public String test(){ RLock anyLock = client.getLock("anyLock"); anyLock.lock(); return "success"; } }
上面的demo獲取到一個lock不去釋放。我們打開一個瀏覽器請求這個controller返回success后,再打開一個窗口重新請求,發現一直等待無法返回結果。查看redis:
10.150.27.139:6380> hgetall anyLock
1) "c5745dc6-3105-4d60-9d5d-e39258714c31:38"
2) "1"
刪除了這個key后就可以成功執行了。在設計分布式鎖我們一般都要考慮鎖的釋放。因為如果獲取到鎖而線程出現異常或者系統故障,會導致這個鎖無法釋放。自己實現redis的鎖的話會給這個key一個過期時間以避免死鎖的發生。Redisson默認的鎖的過期時間為30s。如果這個期間任務並沒有執行完,而鎖已經過期了不就出問題了嗎?Redisson這里有一個watch dog,看一下lock()方法的代碼:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } RFuture<RedissonLockEntry> future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } try { while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(future, threadId); } // get(lockAsync(leaseTime, unit)); }
看一下tryAcquireAsync方法
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
如果lock指定過期時間,那么直接執行tryLockInnerAsync,tryLockInnerAsync方法是一段lua腳本,如下:
eval "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" 1 anyLock 30000 4a23dfaa-9d98-4f4c-9c6a-8966b28e1a95:31
private void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); renewExpiration(); } }
上面代碼會將該線程放入到一個concurrentmap中,並執行renewExpiration方法。
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); return; } if (res) { // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
上面的方法會生成一個timertask來檢查concurrentmap中的key是否存在,如果存在說明該線程還沒有釋放掉鎖,則會更新鎖的過期時間,該方法以一種異步遞歸的方式循環執行。
返回到lock方法,如果返回的ttl>0,則會進入while循環中一直嘗試獲取,達到了阻塞的目的。
Redisson還有許多的功能,比如分布式任務調度,Redisson事務,spring cache整合等。