組件整合之Redisson


Redisson簡介

Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上

快速入門

pom配置

<!--redisson-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.15.0</version>
</dependency>

Redission配置

程序化配置

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

@Configuration
public class RedissonConfig {

    private static final String DEFAULT_DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";


    /**
     * 單機模式自動裝配
     * @return
     */
    @Bean
    public RedissonClient redissonSingle(){
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://localhost:6379")
                .setConnectionMinimumIdleSize(10);
        config.setCodec(new JsonJacksonCodec(objectMapper()));
        RedissonClient redissonClient = Redisson.create(config);
        return redissonClient;
    }

    /**
     * 主從模式
     * @return
     */
    @Bean
    public RedissonClient masterSlave(){
        Config config = new Config();
        config.useMasterSlaveServers()
                //可以用"rediss://"來啟用SSL連接
                .setMasterAddress("redis://127.0.0.1:6379")
                .addSlaveAddress("redis://127.0.0.1:6389", "redis://127.0.0.1:6332", "redis://127.0.0.1:6419")
                .addSlaveAddress("redis://127.0.0.1:6399");
        return Redisson.create(config);
    }

    /**
     * 哨兵模式自動裝配
     * @return
     */
    @Bean
    public RedissonClient redissonSentinel(){
        Config config = new Config();
        List<String> newNodes = new ArrayList();
        newNodes.add("redis://192.168.1.10:6379");
        newNodes.add("redis://192.168.1.11:16379");
        newNodes.add("redis://192.168.1.12:16379");
        config.useSentinelServers()
                .addSentinelAddress(newNodes.toArray(new String[0]))
                .setMasterName("mymaster")
                .setMasterConnectionPoolSize(250)
                .setMasterConnectionMinimumIdleSize(20)
                .setSlaveConnectionPoolSize(250)
                .setSlaveConnectionMinimumIdleSize(20);
        config.setCodec(new JsonJacksonCodec(objectMapper()));
        return Redisson.create(config);
    }

    /**
     * 集群模式自動裝配
     * @return
     */
    @Bean
    public RedissonClient redissionClusterNodes(){
        Config config = new Config();
        List<String> newNodes = new ArrayList();
        newNodes.add("redis://127.0.0.1:6380");
        newNodes.add("redis://127.0.0.1:6381");
        newNodes.add("redis://127.0.0.1:6382");
        config.useClusterServers()
                .addNodeAddress(newNodes.toArray(new String[0]));
        config.setCodec(new JsonJacksonCodec(objectMapper()));
        return Redisson.create(config);
    }

    public ObjectMapper objectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        //大小寫不敏感
        mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        mapper.setDateFormat(new SimpleDateFormat(DEFAULT_DATE_TIME_FORMAT));
        JavaTimeModule javaTimeModule = new JavaTimeModule();
        javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern(DEFAULT_DATE_TIME_FORMAT)));
        javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern(DEFAULT_DATE_TIME_FORMAT)));
        mapper.registerModule(javaTimeModule);
        return mapper;
    }
}

文件方式配置

 Redisson也可以通過用戶提供的JSON或YAML格式的文本文件來配置:

(1)可以通過調用Config.fromJSON方法並指定一個File實例來實現讀取JSON格式的配置

Config config = Config.fromJSON(new File("config-file.json"));
RedissonClient redisson = Redisson.create(config);

config-file.json示例:

{
   "clusterServersConfig":{
      "idleConnectionTimeout":10000,
      "pingTimeout":1000,
      "connectTimeout":10000,
      "timeout":3000,
      "retryAttempts":3,
      "retryInterval":1500,
      "reconnectionTimeout":3000,
      "failedAttempts":3,
      "password":null,
      "subscriptionsPerConnection":5,
      "clientName":null,
      "loadBalancer":{
         "class":"org.redisson.connection.balancer.RoundRobinLoadBalancer"
      },
      "slaveSubscriptionConnectionMinimumIdleSize":1,
      "slaveSubscriptionConnectionPoolSize":50,
      "slaveConnectionMinimumIdleSize":32,
      "slaveConnectionPoolSize":64,
      "masterConnectionMinimumIdleSize":32,
      "masterConnectionPoolSize":64,
      "readMode":"SLAVE",
      "nodeAddresses":[
         "redis://127.0.0.1:7004",
         "redis://127.0.0.1:7001",
         "redis://127.0.0.1:7000"
      ],
      "scanInterval":1000
   },
   "threads":0,
   "nettyThreads": 0,
   "codec":{
      "class":"org.redisson.codec.JsonJacksonCodec"
   },
   "transportMode":"NIO"
}

(2)通過調用config.fromYAML方法並指定一個File實例來實現讀取YAML格式的配置

Config config = Config.fromYAML(new File("config-file.yaml"));
RedissonClient redisson = Redisson.create(config);

config-file.yaml示例:

singleServerConfig:
  #連接空閑超時,單位:毫秒
  idleConnectionTimeout: 10000
  pingTimeout: 1000
  #連接超時,單位:毫秒
  connectTimeout: 10000
  #命令等待超時,單位:毫秒
  timeout: 3000
  #命令失敗重試次數
  retryAttempts: 3
  #命令重試發送時間間隔,單位:毫秒
  retryInterval: 1500
  #重新連接時間間隔,單位:毫秒
  reconnectionTimeout: 3000
  #執行失敗最大次數
  failedAttempts: 3
  #單個連接最大訂閱數量
  subscriptionsPerConnection: 5
  #客戶端名稱
  clientName: null
  #地址
  address: "redis://192.168.1.16:6379"
  #數據庫編號
  database: 0
  #密碼
  password: xiaokong
  #發布和訂閱連接的最小空閑連接數
  subscriptionConnectionMinimumIdleSize: 1 
  #發布和訂閱連接池大小
  subscriptionConnectionPoolSize: 50
  #最小空閑連接數
  connectionMinimumIdleSize: 32
  #連接池大小
  connectionPoolSize: 64
  #是否啟用DNS監測
  dnsMonitoring: false
  #DNS監測時間間隔,單位:毫秒
  dnsMonitoringInterval: 5000
threads: 0
nettyThreads: 0
codec: !<org.redisson.codec.JsonJacksonCodec> {}
transportMode : "NIO"

Config類

Redisson程序化的配置方法是通過構建Config對象實例來實現的。

以下是org.redisson.Config類的配置參數,它適用於所有Redis組態模式(單機,集群和哨兵)。

codec(編碼)

默認值是:org.redisson.codec.JsonJacksonCodec

Redisson的對象編碼類是用於將對象進行序列化和反序列化,以實現對該對象在Redis里的讀取和存儲。Redisson提供了以下幾種的對象編碼應用,以供大家選擇:

編碼類名稱 說明
org.redisson.codec.JsonJacksonCodec Jackson JSON 編碼 默認編碼
org.redisson.codec.KryoCodec Kryo 二進制對象序列化編碼
org.redisson.codec.SerializationCodec JDK序列化編碼
org.redisson.codec.FstCodec FST 10倍於JDK序列化性能而且100%兼容的編碼
org.redisson.client.codec.JsonJacksonMapCodec 基於Jackson的映射類使用的編碼。可用於避免序列化類的信息,以及用於解決使用byte[]遇到的問題。
org.redisson.client.codec.ByteArrayCodec 字節數組編碼
org.redisson.codec.CompositeCodec 用來組合多種不同編碼在一起

屬性配置

屬性名 屬性簡介 默認值 詳細說明
threads  線程池數量 當前處理核數量 * 2  這個線程池數量被所有RTopic對象監聽器,RRemoteService調用者和RExecutorService任務共同共享。 
nettyThreads  Netty線程池數量  當前處理核數量 * 2  

這個線程池數量是在一個Redisson實例內,被其創建的所有分布式數據類型和服務,以及底層客戶端所一同共享的線程池里保存的線程數量。 

executor  線程池    單獨提供一個用來執行所有RTopic對象監聽器,RRemoteService調用者和RExecutorService任務的線程池(ExecutorService)實例。 
eventLoopGroup     

用於特別指定一個EventLoopGroup. EventLoopGroup是用來處理所有通過Netty與Redis服務之間的連接發送和接受的消息。

每一個Redisson都會在默認情況下自己創建管理一個EventLoopGroup實例。

因此,如果在同一個JVM里面可能存在多個Redisson實例的情況下,采取這個配置實現多個Redisson實例共享一個EventLoopGroup的目的。

只有io.netty.channel.epoll.EpollEventLoopGroup或io.netty.channel.nio.NioEventLoopGroup才是允許的類型。

 
transportMode  傳輸模式  TransportMode.NIO 

可選參數:
TransportMode.NIO
TransportMode.EPOLL - 需要依賴里有netty-transport-native-epoll包(Linux)
TransportMode.KQUEUE - 需要依賴里有 netty-transport-native-kqueue包(macOS)

 
lockWatchdogTimeout 

監控鎖的看門狗超時。單位:毫秒

 
30000 

監控鎖的看門狗超時時間單位為毫秒。該參數只適用於分布式鎖的加鎖請求中未明確使用leaseTimeout參數的情況。

如果該看門口未使用lockWatchdogTimeout去重新調整一個分布式鎖的lockWatchdogTimeout超時,那么這個鎖將變為失效狀態。

這個參數可以用來避免由Redisson客戶端節點宕機或其他原因造成死鎖的情況。 

keepPubSubOrder  保持訂閱發布順序  true  通過該參數來修改是否按訂閱發布消息的接收順序出來消息,如果選否將對消息實行並行處理,該參數只適用於訂閱發布消息的情況。 
performanceMode  高性能模式 

HIGHER_THROUGHPUT

 

用來指定高性能引擎的行為。由於該變量值的選用與使用場景息息相關(NORMAL除外)我們建議對每個參數值都進行嘗試。

該參數僅限於Redisson PRO版本。

可選模式:

HIGHER_THROUGHPUT - 將高性能引擎切換到 高通量 模式。

LOWER_LATENCY_AUTO - 將高性能引擎切換到 低延時 模式並自動探測最佳設定。

LOWER_LATENCY_MODE_1 - 將高性能引擎切換到 低延時 模式並調整到預設模式1。 LOWER_LATENCY_MODE_2 - 將高性能引擎切換到 低延時 模式並調整到預設模式2。

NORMAL - 將高性能引擎切換到 普通 模式

ClusterServersConfig類

ClusterServersConfig clusterConfig = config.useClusterServers();

nodeAddresses(添加節點地址)
可以通過host:port的格式來添加Redis集群節點的地址。多個節點可以一次性批量添加。

scanInterval(集群掃描間隔時間)
默認值: 1000,對Redis集群節點狀態掃描的時間間隔。單位是毫秒。

slots(分片數量)
默認值: 231用於指定數據分片過程中的分片數量。支持數據分片/框架結構有:集(Set)、映射(Map)、BitSet、Bloom filter, Spring Cache和Hibernate Cache等.

readMode(讀取操作的負載均衡模式)
默認值: SLAVE(只在從服務節點里讀取)注:在從服務節點里讀取的數據說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。設置讀取操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里讀取。MASTER - 只在主服務節點里讀取。MASTER_SLAVE - 在主從服務節點里都可以讀取。

subscriptionMode(訂閱操作的負載均衡模式)
默認值:SLAVE(只在從服務節點里訂閱),設置訂閱操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里訂閱。MASTER - 只在主服務節點里訂閱。

loadBalancer(負載均衡算法類的選擇)
默認值: org.redisson.connection.balancer.RoundRobinLoadBalancer

在多Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer - 隨機調度算法

subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)
默認值:1。多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的最小保持連接數(長連接)。Redisson內部經常通過發布和訂閱來實現許多功能。長期保持一定數量的發布訂閱連接是必須的。

subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)
默認值:50。多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的連接池最大容量。連接池的連接數量自動彈性伸縮。

slaveConnectionMinimumIdleSize(從節點最小空閑連接數)
默認值:32。多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時讀取反映速度。

slaveConnectionPoolSize(從節點連接池大小)
默認值:64。多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)連接的連接池最大容量。連接池的連接數量自動彈性伸縮。

masterConnectionMinimumIdleSize(主節點最小空閑連接數)
默認值:32。多節點的環境里,每個 主節點的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時寫入反應速度。

masterConnectionPoolSize(主節點連接池大小)
默認值:64。多主節點的環境里,每個 主節點的連接池最大容量。連接池的連接數量自動彈性伸縮。

idleConnectionTimeout(連接空閑超時,單位:毫秒)
默認值:10000。如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,並從連接池里去掉。時間單位是毫秒。

connectTimeout(連接超時,單位:毫秒)
默認值:10000。同任何節點建立連接時的等待超時。時間單位是毫秒。

timeout(命令等待超時,單位:毫秒)
默認值:3000。等待節點回復命令的時間。該時間從命令發送成功時開始計時。

retryAttempts(命令失敗重試次數)
默認值:3。如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時。

retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:1500。在一條命令發送失敗以后,等待重試發送的時間間隔。時間單位是毫秒。

reconnectionTimeout(重新連接時間間隔,單位:毫秒)
默認值:3000。當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔。時間單位是毫秒。

failedAttempts(執行失敗最大次數)
默認值:3。在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試。

password(密碼)
默認值:null。用於節點身份驗證的密碼。

subscriptionsPerConnection(單個連接最大訂閱數量)
默認值:5。每個連接的最大訂閱數量。

clientName(客戶端名稱)
默認值:null。在Redis節點里顯示的客戶端名稱。

sslEnableEndpointIdentification(啟用SSL終端識別)
默認值:true。開啟SSL終端識別能力。

sslProvider(SSL實現方式)
默認值:JDK。確定采用哪種方式(JDK或OPENSSL)來實現SSL連接。

sslTruststore(SSL信任證書庫路徑)
默認值:null。指定SSL信任證書庫的路徑。

sslTruststorePassword(SSL信任證書庫密碼)
默認值:null。指定SSL信任證書庫的密碼。

sslKeystore(SSL鑰匙庫路徑)
默認值:null。指定SSL鑰匙庫的路徑。

sslKeystorePassword(SSL鑰匙庫密碼)
默認值:null。指定SSL鑰匙庫的密碼。

SingleServerConfig類

SingleServerConfig singleConfig = config.useSingleServer();

address(節點地址)
可以通過host:port的格式來指定節點地址。

subscriptionConnectionMinimumIdleSize(發布和訂閱連接的最小空閑連接數)
默認值:1。用於發布和訂閱連接的最小保持連接數(長連接)。Redisson內部經常通過發布和訂閱來實現許多功能。長期保持一定數量的發布訂閱連接是必須的。

subscriptionConnectionPoolSize(發布和訂閱連接池大小)
默認值:50。用於發布和訂閱連接的連接池最大容量。連接池的連接數量自動彈性伸縮。

connectionMinimumIdleSize(最小空閑連接數)
默認值:32。最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時寫入反應速度。

connectionPoolSize(連接池大小)
默認值:64。連接池最大容量。連接池的連接數量自動彈性伸縮。

dnsMonitoring(是否啟用DNS監測)
默認值:false。在啟用該功能以后,Redisson將會監測DNS的變化情況。

dnsMonitoringInterval(DNS監測時間間隔,單位:毫秒)
默認值:5000。監測DNS的變化情況的時間間隔。

idleConnectionTimeout(連接空閑超時,單位:毫秒)
默認值:10000。如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,並從連接池里去掉。時間單位是毫秒。

connectTimeout(連接超時,單位:毫秒)
默認值:10000。同節點建立連接時的等待超時。時間單位是毫秒。

timeout(命令等待超時,單位:毫秒)
默認值:3000。等待節點回復命令的時間。該時間從命令發送成功時開始計時。

retryAttempts(命令失敗重試次數)
默認值:3。如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時。

retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:1500。在一條命令發送失敗以后,等待重試發送的時間間隔。時間單位是毫秒。

reconnectionTimeout(重新連接時間間隔,單位:毫秒)
默認值:3000。當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔。時間單位是毫秒。

failedAttempts(執行失敗最大次數)
默認值:3。在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試。

database(數據庫編號)
默認值:0。嘗試連接的數據庫編號。

password(密碼)
默認值:null。用於節點身份驗證的密碼。

subscriptionsPerConnection(單個連接最大訂閱數量)
默認值:5。每個連接的最大訂閱數量。

clientName(客戶端名稱)
默認值:null。在Redis節點里顯示的客戶端名稱。

sslEnableEndpointIdentification(啟用SSL終端識別)
默認值:true。開啟SSL終端識別能力。

sslProvider(SSL實現方式)
默認值:JDK。確定采用哪種方式(JDK或OPENSSL)來實現SSL連接。

sslTruststore(SSL信任證書庫路徑)
默認值:null。指定SSL信任證書庫的路徑。

sslTruststorePassword(SSL信任證書庫密碼)
默認值:null。指定SSL信任證書庫的密碼。

sslKeystore(SSL鑰匙庫路徑)
默認值:null。指定SSL鑰匙庫的路徑。

sslKeystorePassword(SSL鑰匙庫密碼)
默認值:null。指定SSL鑰匙庫的密碼。

SentinelServersConfig類

SentinelServersConfig sentinelConfig = config.useSentinelServers();

dnsMonitoringInterval(DNS監控間隔,單位:毫秒)
默認值:5000。用來指定檢查節點DNS變化的時間間隔。使用的時候應該確保JVM里的DNS數據的緩存時間保持在足夠低的范圍才有意義。用-1來禁用該功能。

masterName(主服務器的名稱)
主服務器的名稱是哨兵進程中用來監測主從服務切換情況的。

addSentinelAddress(添加哨兵節點地址)
可以通過host:port的格式來指定哨兵節點的地址。多個節點可以一次性批量添加。

readMode(讀取操作的負載均衡模式)
默認值: SLAVE(只在從服務節點里讀取)注:在從服務節點里讀取的數據說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。

設置讀取操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里讀取。MASTER - 只在主服務節點里讀取。MASTER_SLAVE - 在主從服務節點里都可以讀取。

subscriptionMode(訂閱操作的負載均衡模式)
默認值:SLAVE(只在從服務節點里訂閱)。設置訂閱操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里訂閱。MASTER - 只在主服務節點里訂閱。

loadBalancer(負載均衡算法類的選擇)
默認值: org.redisson.connection.balancer.RoundRobinLoadBalancer

在使用多個Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer - 隨機調度算法

subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)
默認值:1。多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的最小保持連接數(長連接)。Redisson內部經常通過發布和訂閱來實現許多功能。長期保持一定數量的發布訂閱連接是必須的。

subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)
默認值:50。多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的連接池最大容量。連接池的連接數量自動彈性伸縮。

slaveConnectionMinimumIdleSize(從節點最小空閑連接數)
默認值:32。多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時讀取反映速度。

slaveConnectionPoolSize(從節點連接池大小)
默認值:64。多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)連接的連接池最大容量。連接池的連接數量自動彈性伸縮。

masterConnectionMinimumIdleSize(主節點最小空閑連接數)
默認值:32。多從節點的環境里,每個 主節點的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時寫入反應速度。

masterConnectionPoolSize(主節點連接池大小)
默認值:64。主節點的連接池最大容量。連接池的連接數量自動彈性伸縮。

idleConnectionTimeout(連接空閑超時,單位:毫秒)
默認值:10000。如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,並從連接池里去掉。時間單位是毫秒。

connectTimeout(連接超時,單位:毫秒)
默認值:10000。同任何節點建立連接時的等待超時。時間單位是毫秒。

timeout(命令等待超時,單位:毫秒)
默認值:3000。等待節點回復命令的時間。該時間從命令發送成功時開始計時。

retryAttempts(命令失敗重試次數)
默認值:3。如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時。

retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:1500。在一條命令發送失敗以后,等待重試發送的時間間隔。時間單位是毫秒。

reconnectionTimeout(重新連接時間間隔,單位:毫秒)
默認值:3000。當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔。時間單位是毫秒。

failedAttempts(執行失敗最大次數)
默認值:3。在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試。

database(數據庫編號)
默認值:0。嘗試連接的數據庫編號。

password(密碼)
默認值:null。用於節點身份驗證的密碼。

subscriptionsPerConnection(單個連接最大訂閱數量)
默認值:5。每個連接的最大訂閱數量。

clientName(客戶端名稱)
默認值:null。在Redis節點里顯示的客戶端名稱。

sslEnableEndpointIdentification(啟用SSL終端識別)
默認值:true。開啟SSL終端識別能力。

sslProvider(SSL實現方式)
默認值:JDK。確定采用哪種方式(JDK或OPENSSL)來實現SSL連接。

sslTruststore(SSL信任證書庫路徑)
默認值:null。指定SSL信任證書庫的路徑。

sslTruststorePassword(SSL信任證書庫密碼)
默認值:null。指定SSL信任證書庫的密碼。

sslKeystore(SSL鑰匙庫路徑)
默認值:null。指定SSL鑰匙庫的路徑。

sslKeystorePassword(SSL鑰匙庫密碼)
默認值:null。指定SSL鑰匙庫的密碼。

MasterSlaveServersConfig類

MasterSlaveServersConfig masterSlaveConfig = config.useMasterSlaveServers();

dnsMonitoringInterval(DNS監控間隔,單位:毫秒)
默認值:5000。用來指定檢查節點DNS變化的時間間隔。使用的時候應該確保JVM里的DNS數據的緩存時間保持在足夠低的范圍才有意義。用-1來禁用該功能。

masterAddress(主節點地址)
可以通過host:port的格式來指定主節點地址。

addSlaveAddress(添加從主節點地址)
可以通過host:port的格式來指定從節點的地址。多個節點可以一次性批量添加。

readMode(讀取操作的負載均衡模式)
默認值: SLAVE(只在從服務節點里讀取)。注:在從服務節點里讀取的數據說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。

設置讀取操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里讀取。MASTER - 只在主服務節點里讀取。MASTER_SLAVE - 在主從服務節點里都可以讀取。

subscriptionMode(訂閱操作的負載均衡模式)
默認值:SLAVE(只在從服務節點里訂閱)。設置訂閱操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里訂閱。MASTER - 只在主服務節點里訂閱。

loadBalancer(負載均衡算法類的選擇)
默認值: org.redisson.connection.balancer.RoundRobinLoadBalancer

在使用多個Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer - 隨機調度算法

subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)
默認值:1。多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的最小保持連接數(長連接)。Redisson內部經常通過發布和訂閱來實現許多功能。長期保持一定數量的發布訂閱連接是必須的。

subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)
默認值:50。多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的連接池最大容量。連接池的連接數量自動彈性伸縮。

slaveConnectionMinimumIdleSize(從節點最小空閑連接數)
默認值:32。多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時讀取反映速度。

slaveConnectionPoolSize(從節點連接池大小)
默認值:64。多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)連接的連接池最大容量。連接池的連接數量自動彈性伸縮。

masterConnectionMinimumIdleSize(主節點最小空閑連接數)
默認值:32。多從節點的環境里,每個 主節點的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時寫入反應速度。

masterConnectionPoolSize(主節點連接池大小)
默認值:64。主節點的連接池最大容量。連接池的連接數量自動彈性伸縮。

idleConnectionTimeout(連接空閑超時,單位:毫秒)
默認值:10000。如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,並從連接池里去掉。時間單位是毫秒。

connectTimeout(連接超時,單位:毫秒)
默認值:10000。同任何節點建立連接時的等待超時。時間單位是毫秒。

timeout(命令等待超時,單位:毫秒)
默認值:3000。等待節點回復命令的時間。該時間從命令發送成功時開始計時。

retryAttempts(命令失敗重試次數)
默認值:3。如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時。

retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:1500。在一條命令發送失敗以后,等待重試發送的時間間隔。時間單位是毫秒。

reconnectionTimeout(重新連接時間間隔,單位:毫秒)
默認值:3000。當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔。時間單位是毫秒。

failedAttempts(執行失敗最大次數)
默認值:3。在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試。

database(數據庫編號)
默認值:0。嘗試連接的數據庫編號。

password(密碼)
默認值:null。用於節點身份驗證的密碼。

subscriptionsPerConnection(單個連接最大訂閱數量)
默認值:5。每個連接的最大訂閱數量。

clientName(客戶端名稱)
默認值:null。在Redis節點里顯示的客戶端名稱。

sslEnableEndpointIdentification(啟用SSL終端識別)
默認值:true。開啟SSL終端識別能力。

sslProvider(SSL實現方式)
默認值:JDK。確定采用哪種方式(JDK或OPENSSL)來實現SSL連接。

sslTruststore(SSL信任證書庫路徑)
默認值:null。指定SSL信任證書庫的路徑。

sslTruststorePassword(SSL信任證書庫密碼)
默認值:null

指定SSL信任證書庫的密碼。

sslKeystore(SSL鑰匙庫路徑)
默認值:null。指定SSL鑰匙庫的路徑。

sslKeystorePassword(SSL鑰匙庫密碼)
默認值:null。指定SSL鑰匙庫的密碼。

測試類

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={RootConfig.class, WebConfig.class}) //如果是Java Config配置的,指定相關的配置類
@WebAppConfiguration
public class RedissionTest {

    @Autowired
    private RedissonClient redissionClusterNodes;

    @Test
    public void testConnect(){
        RBucket<String> stringRBucket = redissionClusterNodes.getBucket("test");
        //設值
        stringRBucket.set("1");
        System.out.println(stringRBucket.get());
    }
}

Redisson 操作API

分布式對象

分布式Object

通過泛型指定對象類型

RBucket<Integer> bucket = redissionClusterNodes.getBucket("test");
bucket.set(1);
Integer obj = bucket.get();
bucket.trySet(3);
bucket.compareAndSet(4, 5);
bucket.getAndSet(6);

分布式BitSet

BitSet是一個bit數據集,類似bit數組,和Set接口沒啥關系。對於存儲一些需要按位操作的數據是很理想的數據結構。對應於Java中 Java.util.BitSet。

RBucket<Integer> bucket = redissionClusterNodes.getBucket("test");
bucket.set(1);
Integer obj = bucket.get();
bucket.trySet(3);
bucket.compareAndSet(4, 5);
bucket.getAndSet(6);

分布式Lock

RLock lock = redissionClusterNodes.getLock("anyLock");
// Most familiar locking method
lock.lock();
// Lock time-to-live support
// releases lock automatically after 10 seconds
// if unlock method not invoked
lock.lock(10, TimeUnit.SECONDS);
// Wait for 100 seconds and automatically unlock it after 10 seconds
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
//...
lock.unlock();

一般我們在程序中的使用方式都是:

RLock lock = redissionClusterNodes.getLock("anyLock");
lock.lock();
try {
    ...
} finally {
    lock.unlock();
}

有時 redission分布式鎖釋放異常問題

分布式ReadWriteLock

RLock lock1 = redissionClusterNodes.getLock("lock1");
RLock lock2 = redissionClusterNodes.getLock("lock2");
RLock lock3 = redissionClusterNodes.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
lock.lock();

分布式ReadWriteLock

RReadWriteLock rwlock = redissionClusterNodes.getReadWriteLock("anyRWLock");
// Most familiar locking method
rwlock.readLock().lock();
// or
rwlock.writeLock().lock();
// Lock time-to-live support
// releases lock automatically after 10 seconds
// if unlock method not invoked
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// or
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// Wait for 100 seconds and automatically unlock it after 10 seconds
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// or
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);

分布式Semaphore

RSemaphore semaphore = redissionClusterNodes.getSemaphore("semaphore");
semaphore.acquire();
semaphore.acquire(23);
semaphore.tryAcquire();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();

分布式AtomicLong

RAtomicLong atomicLong = redissionClusterNodes.getAtomicLong("myAtomicLong");
atomicLong.set(3);
atomicLong.incrementAndGet();
atomicLong.get();

分布式AtomicDouble

RAtomicDouble atomicDouble = redissionClusterNodes.getAtomicDouble("myAtomicDouble");
atomicDouble.set(2.81);
atomicDouble.addAndGet(4.11);
atomicDouble.get();

分布式CountDownLatch

RCountDownLatch latch = redissionClusterNodes.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// in other thread or other JVM
RCountDownLatch latch2 = redissionClusterNodes.getCountDownLatch("anyCountDownLatch");
latch2.countDown();

Topic

RTopic topic = redissionClusterNodes.getTopic("mytopic");
topic.addListener(String.class, new MessageListener<String>() {
    @Override
    public void onMessage(CharSequence channel, String s) {
        System.out.println("channel=" + channel); //mytopic
        System.out.println("接收到的消息:" + s);
    }
});
// in other thread or JVM
RTopic topic2 = redissionClusterNodes.getTopic("mytopic");
long clientsReceivedMessage = topic.publish("1234567");
System.out.println(clientsReceivedMessage);

Topic patttern

// subscribe to all topics by `topic1.*` pattern
RPatternTopic topic1 = redissionClusterNodes.getPatternTopic("topic1.*");
topic1.addListener(String.class, new PatternMessageListener<String>() {
    @Override
    public void onMessage(CharSequence pattern, CharSequence channel, String msg) {
        System.out.println("pattern=" + pattern); //topic1.*
        System.out.println("channel=" + channel); //topic1.a
        System.out.println("msg=" + msg); //1234567
    }
});
RTopic topic2 = redissionClusterNodes.getTopic("topic1.a");
long clientsReceivedMessage = topic2.publish("1234567");
System.out.println(clientsReceivedMessage);

分布式集合

分布式Map

除此之外,還支持Multimap。

RMap<String, String> map = redissionClusterNodes.getMap("anyMap");
map.put("name", "hello");
//如果不存在,那么會向map中添加該鍵值對,並返回null。
//如果已經存在,那么不會覆蓋已有的值,直接返回已經存在的值。
String currentValue = map.putIfAbsent("name", "harvey");
System.out.println("currentValue=" + currentValue);

//刪除鍵的值,返回相應的值
String obj = map.remove("name");
System.out.println("obj=" + obj);

map.fastPut("name", "test");
map.fastRemove("name");

//異步設值,馬上取無法取到設置的值
Future<String> putAsyncFuture = map.putAsync("alia", "cool");
System.out.println("asyncFuture value = " + putAsyncFuture.get());
//快速異步設值,只返回true或false,如果已存在返回的是false
RFuture<Boolean> rFuture = map.fastPutAsync("alia4", "mixue");
System.out.println("asyncFuture value boolean = " + rFuture.get());

map.fastPutAsync("321", "6666666");
map.fastRemoveAsync("321");

Map eviction

現在Redis沒有過期清空Map中的某個entry的功能,只能是清空Map所有的entry。Redission提供了這種功能。

RMapCache<Object, Object> mapCache = redissionClusterNodes.getMapCache("anyMap");
// ttl = 10 minutes,
mapCache.put("key1", "hello", 10, TimeUnit.MINUTES);
// ttl = 10 minutes, maxIdleTime = 10 seconds
mapCache.put("key1", "hello", 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);
// ttl = 3 seconds
mapCache.putIfAbsent("key2", "hello", 3, TimeUnit.SECONDS);
// ttl = 40 seconds, maxIdleTime = 10 seconds
mapCache.putIfAbsent("key2", "hello", 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);

分布式Set

RSet<String> set = redissionClusterNodes.getSet("anySet");
set.add("11");
Set<String> stringSetting = set.readAll();
stringSetting.forEach(s->{
    System.out.println(s);
});
set.remove("11");

除此之外還有,還支持Set eviction, SortedSet, ScoredSortedSet, LexSortedSet。

分布式List

RList<String> list = redissionClusterNodes.getList("anyList");
list.add("11");
List<String> stringList = list.readAll();
System.out.println(stringList);
list.get(0);
list.remove("11");

分布式Blocking Queue

RBlockingQueue<String> queue = redissionClusterNodes.getBlockingQueue("anyQueue");
queue.offer("111");
//查看首個元素,不會移除首個元素,如果隊列是空的就返回null
String obj = queue.peek();
//查看首個元素,不會移除首個元素,如果隊列是空的就拋出異常NoSuchElementException
String obj2 = queue.element();
//將首個元素從隊列中彈出,如果隊列是空的,就返回null
String someObj = queue.poll();
String ob = queue.poll(10, TimeUnit.MINUTES);

除此之外,還支持Queue, Deque, Blocking Deque。

其他功能

執行批量命令

//future列表
LinkedList<RFuture<Object>> futures = new LinkedList<>();
//結果集
LinkedList<Object> result = new LinkedList<>();
RBatch batch = redissionClusterNodes.createBatch();
for (int i = 0; i < 100; i++) {
    RBucketAsync<Object> bucket = batch.getBucket("key" + i);
    RFuture<Object> async = bucket.getAsync();
    futures.add(async);
}
//批量執行
batch.execute();
while (futures.size() > 0) {
    RFuture<Object> first = futures.removeFirst();
    //獲取當前值,未完成時值為null,使用isDone方法區分value不存在還是任務未完成
    Object o = first.getNow();
    if (o != null) {
        result.add(o);
    } else {
        if (!first.isDone()) {
            futures.addLast(first);
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM