Redisson的使用


前言

  Redisson的功能非常強大,下面摘抄官網上的描述:

  Redisson采用了基於NIO的Netty框架,不僅能作為Redis底層驅動客戶端,具備提供對Redis各種組態形式的連接功能,對Redis命令能以同步發送、異步形式發送、異步流形式發送或管道形式發送的功能,LUA腳本執行處理,以及處理返回結果的功能,還在此基礎上融入了更高級的應用方案,不但將原生的Redis Hash,List,Set,String,Geo,HyperLogLog等數據結構封裝為Java里大家最熟悉的映射(Map),列表(List),集(Set),通用對象桶(Object Bucket),地理空間對象桶(Geospatial Bucket),基數估計算法(HyperLogLog)等結構,在這基礎上還提供了分布式的多值映射(Multimap),本地緩存映射(LocalCachedMap),有序集(SortedSet),計分排序集(ScoredSortedSet),字典排序集(LexSortedSet),列隊(Queue),阻塞隊列(Blocking Queue),有界阻塞列隊(Bounded Blocking Queue),雙端隊列(Deque),阻塞雙端列隊(Blocking Deque),阻塞公平列隊(Blocking Fair Queue),延遲列隊(Delayed Queue),布隆過濾器(Bloom Filter),原子整長形(AtomicLong),原子雙精度浮點數(AtomicDouble),BitSet等Redis原本沒有的分布式數據結構。不僅如此,Redisson還實現了Redis文檔中提到像分布式鎖Lock這樣的更高階應用場景。事實上Redisson並沒有不止步於此,在分布式鎖的基礎上還提供了聯鎖(MultiLock),讀寫鎖(ReadWriteLock),公平鎖(Fair Lock),紅鎖(RedLock),信號量(Semaphore),可過期性信號量(PermitExpirableSemaphore)和閉鎖(CountDownLatch)這些實際當中對多線程高並發應用至關重要的基本部件。正是通過實現基於Redis的高階應用方案,使Redisson成為構建分布式系統的重要工具。
在提供這些工具的過程當中,Redisson廣泛的使用了承載於Redis訂閱發布功能之上的分布式話題(Topic)功能。使得即便是在復雜的分布式環境下,Redisson的各個實例仍然具有能夠保持相互溝通的能力。在以這為前提下,結合了自身獨有的功能完善的分布式工具,Redisson進而提供了像分布式遠程服務(Remote Service),分布式執行服務(Executor Service)和分布式調度任務服務(Scheduler Service)這樣適用於不同場景的分布式服務。使得Redisson成為了一個基於Redis的Java中間件(Middleware)。

Redisson的配置

  redisson提供了文件方式配置和程序方式配置,支持redis單點,主從,哨兵,集群模式,以redis的cluster模式為例,使用基於文件方式配置,首先在resource目錄下定義了redis.yml配置文件:

---
clusterServersConfig:
  idleConnectionTimeout: 10000
  pingTimeout: 1000
  connectTimeout: 5000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  password: 123456
  subscriptionsPerConnection: 5 #單個連接最大訂閱數量
  slaveSubscriptionConnectionMinimumIdleSize: 1
  slaveSubscriptionConnectionPoolSize: 50
  slaveConnectionMinimumIdleSize: 32
  slaveConnectionPoolSize: 64
  masterConnectionMinimumIdleSize: 32
  masterConnectionPoolSize: 64
  readMode: "SLAVE"
  nodeAddresses:
    - "redis://10.110.27.139:6379"
    - "redis://10.110.27.139:6380"
    - "redis://10.110.27.139:6381"
    - "redis://10.110.27.138:6379"
    - "redis://10.110.27.138:6380"
    - "redis://10.110.27.138:6381"
  scanInterval: 1000
transportMode: NIO

編寫配置類

@Configuration
public class InitializingRedis{
    
    @Bean
    public RedissonClient getRedissonClient() throws IOException {
        ResourceLoader loader = new DefaultResourceLoader();
        Resource resource = loader.getResource("redis.yml");
        Config config = Config.fromYAML(resource.getInputStream());
        config.useClusterServers();
        return Redisson.create(config);
    }

}
依賴注入RedisClient即可以使用了。值得一提的是Redisson集成了Spring Session會話管理,那么需要將@Configuration 換成 @EnableRedissonHttpSession(如果需要管理分布式session的話)
同時定義一個會話初始化器即可:
public class SessionInitializer extends AbstractHttpSessionApplicationInitializer {
    public SessionInitializer() {
        super(InitializingRedis.class);
    }
}

Redisson的使用

RedissonClient是線程安全的,由於其內部是通過Netty通信,所以除了同步執行方式,也支持異步執行。同步我們使用RedissonClient,異步使用RedissonReactiveClient.

分布式對象

  • 通用對象桶

    我們可以使用RBucket來存放任意類型的對象

RedissonClient client = Redisson.create(config);
RBucket<Object> bucket = client.getBucket("city");
bucket.set("nanjing");
Object o = bucket.get();
System.out.println(o.getClass());
System.out.println(o);

代碼輸出

class java.lang.String
nanjing

我們登陸redis查看結果如下:

10.150.27.139:6380> get city
"\xfc\ananjing"
10.150.27.139:6380> type city
string
10.150.27.139:6380> ttl city
(integer) -1

發現get city 多了 \xfc\a,這是因為redisson默認使用的Jackson JSON做的數據序列化,我們可以使用StringCodec作為編碼:

RedissonClient client = Redisson.create(config);
RBucket<Object> bucket = client.getBucket("city", new StringCodec("utf-8"));
bucket.set("nanjing");
  再在服務器上看就是get city > "nanjing" 了。Redisson提供了非常豐富的編碼,比如SerializationCodec(JDK序列化編碼),FstCodec(10倍於JDK序列化性能而且100%兼容的編碼),LongCodec(純整長型數字編碼),ByteArrayCodec(字節數組編碼),AvroJacksonCodec(二進制的JSON編碼)。
//java對象
RBucket<Object> bucket = client.getBucket("city");
City city = new City(); //對象必須實現序列化接口
city.name = "hangzhou";
city.province = "zhejiang";
bucket.set(city);
City c1 = (City)bucket.get();
System.out.println(c1.province);

查看服務器上的數據類型

10.150.27.139:6380> get city
"\x00\x01\x04City\xfc\bhangzhou\xfc\bzhejiang\x00"
10.150.27.139:6380> type city
string

發現使用通用對象桶都是以String的方式存入到redis中的。
Redisson還提供了地理位置桶RGeo和位向量RBitSet用於位置空間的計算。

  • 原子長整型與雙精度浮點
    我們有時候需要一個全局的計數器,那么就可以使用原子長整型。
RedissonClient client = Redisson.create(config);
RAtomicLong count = client.getAtomicLong("count");
long l = count.incrementAndGet();
System.out.println(l);

RAtomicLong的用法和juc下的AtomicLong是一樣的。在jdk8中,增加了LongAdder,該類在高並發的環境下性能更優於RAtomicLong,Redisson同樣也有該類的實現RLongAdder count = client.getLongAdder("count");

在java中並沒有提供AtomicDouble,Redisson為我們提供了:
RAtomicDouble d = client.getAtomicDouble("double");
我們就可以使用該類存儲或計算浮點數據。

  • 話題(訂閱分發)
    發布內容代碼:
RedissonClient client = Redisson.create(config);
RTopic topic = client.getTopic("anyTopic");
DemoMessage message = new DemoMessage();
message.setTitle("震驚,一女子深夜竟然做出這種事情!");
message.setArticle("阿巴阿巴阿巴");
topic.publish(message);

訂閱的代碼

RedissonClient client = Redisson.create(config);
RTopic topic = client.getTopic("anyTopic");
topic.addListenerAsync(DemoMessage.class, new MessageListener<DemoMessage>() {
        @Override
        public void onMessage(CharSequence channel, DemoMessage msg) {
            System.out.println(msg.getTitle());
        }
    });

除卻上面的對象,Redisson還提供了布隆過濾器,基數估計算法及限流器,有興趣的可以深入了解。

分布式集合

  • 映射(Map)

  Redisson使用map來存取redis中hash的數據結構:

RedissonClient client = Redisson.create(config);
RMap<Object, Object> cities = client.getMap("cities");
City c1 = new City("南京", "江蘇");
City c2 = new City("杭州", "浙江");
cities.put(1,c1);
cities.put(2,c2);
City c = (City)cities.get(2);
System.out.println(c.name +"-"+ c.province);

登錄服務器查看:

10.150.27.139:6381> type cities
hash
10.150.27.139:6381> hgetall cities
1) "\xf7\x01"
2) "\x00\x01\x04City\xfc\x02\xffWS\xff\xacN\xfc\x02\xff_l\xff\xcf\x82\x00"
3) "\xf7\x02"
4) "\x00\x01\x04City\xfc\x02\xffmg\xff\xde]\xfc\x02\xffYm\xff_l\x00"

對於高度頻繁讀寫的緩存,Redisson提供了本地緩存的機制,以減少網絡通信帶來的時間等待。

RLocalCachedMap<Object, Object> cities = client.getLocalCachedMap("cities", LocalCachedMapOptions.defaults());
City c1 = new City("武漢", "湖北");
cities.put(1,c1);
City c = (City)cities.get(1);
System.out.println(c.name+"-"+c.province);

redisson pro中支持數據分片,類似分庫的原理,可以將一個map中的數據分散映射到多個節點中,這樣大大的提高了redis單一hash的容量。

  • Redisson中的元素淘汰機制
    元素淘汰功能(Eviction)
    我們使用Redis作為緩存時,就需要考慮緩存的淘汰機制。可以通過client.getKey() 來設定key的存活時間,另外可以使用RMapCache控制每一條數據的過期時間。
RedissonClient client = Redisson.create(config);
RMapCache<Object, Object> cities = client.getMapCache("cities", new StringCodec("utf-8"));
cities.put(1,new City("成都","四川"),60,TimeUnit.SECONDS);
cities.put(2,new City("深圳","廣東"),30, TimeUnit.SECONDS);
while (true){
    }

每隔30s登錄服務器查看數據如下:

10.150.27.139:6381> hgetall cities
1) "1"
2) "\x00\x00\x00\x00\x00\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00City@669d2b1b"
3) "2"
4) "\x00\x00\x00\x00\x00\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00City@50b1f030"
10.150.27.139:6381> hgetall cities
1) "1"
2) "\x00\x00\x00\x00\x00\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00City@669d2b1b"
10.150.27.139:6381> hgetall cities
(empty list or set)

redis並沒有實現對hash元素過期時間的設置。Redisson通過在初始化RedissonMapCache時,設置了一個EvictionScheduler,這個類通過netty的EventloopGroup線程池周期地向以redisson_map_cache_expired前綴名的頻道發布消息。RedissonMapCache會訂閱這個頻道來處理消息。它一次可移除 100 條過期項。
任務的調度時間會根據上次任務中刪除的過期項數量自動調整,時間在 1 秒到 2 個小時內。因此若清理任務每次刪除了100項數據,它將每秒鍾執行一次(最小的執行延遲)。但如果當前過期項數量比前一次少,則執行延遲將擴大為 1.5 倍。
本地緩存功能(Local Cache)
在上面的代碼已經介紹了本地緩存機制,其中有一個參數LocalCachedMapOptions,這個參數可以自定義緩存的淘汰機制。EvictionPolicy可以選擇使用LRU,LFU或者通過GC過程清除元素,SyncStrategy實現了本地緩存的同步機制。

  • 列表與隊列
RedissonClient client = Redisson.create(config);
RList<String> list = client.getList("list",new StringCodec("utf-8"));
list.add("北京");
list.add("濟南");
RedissonClient client = Redisson.create(config);
RQueue<String> qq = client.getQueue("qq");
qq.add("12");
qq.offer("34");

上面代碼都對應了redis的list數據結構

  • 計分排序集(ScoredSortedSet)

  基於Redis的Redisson的分布式RScoredSortedSet Java對象是一個可以按插入時指定的元素評分排序的集合。它同時還保證了元素的唯一性。

RScoredSortedSet<SomeObject> set = redisson.getScoredSortedSet("simple");
set.add(0.13,newSomeObject(a, b));
set.addAsync(0.251,newSomeObject(c, d));
set.add(0.302,newSomeObject(g, d));
set.pollFirst();
set.pollLast();
int index = set.rank(newSomeObject(g, d));// 獲取元素在集合中的位置
Double score = set.getScore(newSomeObject(g, d));// 獲取元素的評分

分布式鎖與同步器

  Redisson的強大之處在於完美的實現了分布式鎖和同步器,不需要我們再考慮怎么設計分布式鎖的可重入?怎么保證分布式鎖的公平性?如何實現一個分布式讀寫鎖?怎么實現分布式的信號量和閉鎖?這些在Redisson中都已經幫我們實現好了。先看一下最常用的lock的使用:
@RestController
public class TestController {
    @Autowired
    private RedissonClient client;

    @RequestMapping("/test")
    public String test(){
        RLock anyLock = client.getLock("anyLock");
        anyLock.lock();
        return "success";
    }

}

上面的demo獲取到一個lock不去釋放。我們打開一個瀏覽器請求這個controller返回success后,再打開一個窗口重新請求,發現一直等待無法返回結果。查看redis:

10.150.27.139:6380> hgetall anyLock
1) "c5745dc6-3105-4d60-9d5d-e39258714c31:38"
2) "1"

  刪除了這個key后就可以成功執行了。在設計分布式鎖我們一般都要考慮鎖的釋放。因為如果獲取到鎖而線程出現異常或者系統故障,會導致這個鎖無法釋放。自己實現redis的鎖的話會給這個key一個過期時間以避免死鎖的發生。Redisson默認的鎖的過期時間為30s。如果這個期間任務並沒有執行完,而鎖已經過期了不就出問題了嗎?Redisson這里有一個watch dog,看一下lock()方法的代碼:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    try {
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        future.getNow().getLatch().acquire();
                    } else {
                        future.getNow().getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

看一下tryAcquireAsync方法

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

如果lock指定過期時間,那么直接執行tryLockInnerAsync,tryLockInnerAsync方法是一段lua腳本,如下:

eval "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" 1 anyLock 30000 4a23dfaa-9d98-4f4c-9c6a-8966b28e1a95:31
  先判斷anyLock 這個key是否存在,不存在則執行hset anyLock 4a23dfaa-9d98-4f4c-9c6a-8966b28e1a95:31 1結束。否則判斷anyLock這個hash中4a23dfaa-9d98-4f4c-9c6a-8966b28e1a95:31元素是否存在,如果存在則說明是重入鎖,累加重入次數,重置key的失效時間為30s,結束。否則說明anyLock已經被其他線程獲取,這里直接返回anyLock的失效時間。該方法是一個基於Future的異步方法。這里類似於JS通過Promise來實現異步操作的模式。在onComplete中執行了一個BiConsumer,這個函數會啟動失效檢查:
private void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            renewExpiration();
        }
    }

上面代碼會將該線程放入到一個concurrentmap中,並執行renewExpiration方法。

private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

  上面的方法會生成一個timertask來檢查concurrentmap中的key是否存在,如果存在說明該線程還沒有釋放掉鎖,則會更新鎖的過期時間,該方法以一種異步遞歸的方式循環執行。
返回到lock方法,如果返回的ttl>0,則會進入while循環中一直嘗試獲取,達到了阻塞的目的。

Redisson還有許多的功能,比如分布式任務調度,Redisson事務,spring cache整合等。

 
 
 參考文章:
https://www.jianshu.com/p/e9b26c743cae
https://www.bookstack.cn/read/redisson-wiki-zh/spilt.5.7.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%9B%86%E5%90%88.md
https://www.bookstack.cn/read/redisson-wiki-zh/spilt.1.7.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%9B%86%E5%90%88.md


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM