Redisson簡介
Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上。
快速入門
pom配置
<!--redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.15.0</version>
</dependency>
Redission配置
程序化配置
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
@Configuration
public class RedissonConfig {
private static final String DEFAULT_DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
/**
* 單機模式自動裝配
* @return
*/
@Bean
public RedissonClient redissonSingle(){
Config config = new Config();
config.useSingleServer()
.setAddress("redis://localhost:6379")
.setConnectionMinimumIdleSize(10);
config.setCodec(new JsonJacksonCodec(objectMapper()));
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
/**
* 主從模式
* @return
*/
@Bean
public RedissonClient masterSlave(){
Config config = new Config();
config.useMasterSlaveServers()
//可以用"rediss://"來啟用SSL連接
.setMasterAddress("redis://127.0.0.1:6379")
.addSlaveAddress("redis://127.0.0.1:6389", "redis://127.0.0.1:6332", "redis://127.0.0.1:6419")
.addSlaveAddress("redis://127.0.0.1:6399");
return Redisson.create(config);
}
/**
* 哨兵模式自動裝配
* @return
*/
@Bean
public RedissonClient redissonSentinel(){
Config config = new Config();
List<String> newNodes = new ArrayList();
newNodes.add("redis://192.168.1.10:6379");
newNodes.add("redis://192.168.1.11:16379");
newNodes.add("redis://192.168.1.12:16379");
config.useSentinelServers()
.addSentinelAddress(newNodes.toArray(new String[0]))
.setMasterName("mymaster")
.setMasterConnectionPoolSize(250)
.setMasterConnectionMinimumIdleSize(20)
.setSlaveConnectionPoolSize(250)
.setSlaveConnectionMinimumIdleSize(20);
config.setCodec(new JsonJacksonCodec(objectMapper()));
return Redisson.create(config);
}
/**
* 集群模式自動裝配
* @return
*/
@Bean
public RedissonClient redissionClusterNodes(){
Config config = new Config();
List<String> newNodes = new ArrayList();
newNodes.add("redis://127.0.0.1:6380");
newNodes.add("redis://127.0.0.1:6381");
newNodes.add("redis://127.0.0.1:6382");
config.useClusterServers()
.addNodeAddress(newNodes.toArray(new String[0]));
config.setCodec(new JsonJacksonCodec(objectMapper()));
return Redisson.create(config);
}
public ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
//大小寫不敏感
mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.setDateFormat(new SimpleDateFormat(DEFAULT_DATE_TIME_FORMAT));
JavaTimeModule javaTimeModule = new JavaTimeModule();
javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern(DEFAULT_DATE_TIME_FORMAT)));
javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern(DEFAULT_DATE_TIME_FORMAT)));
mapper.registerModule(javaTimeModule);
return mapper;
}
}
文件方式配置
Redisson也可以通過用戶提供的JSON或YAML格式的文本文件來配置:
(1)可以通過調用Config.fromJSON方法並指定一個File實例來實現讀取JSON格式的配置
Config config = Config.fromJSON(new File("config-file.json"));
RedissonClient redisson = Redisson.create(config);
config-file.json示例:
{
"clusterServersConfig":{
"idleConnectionTimeout":10000,
"pingTimeout":1000,
"connectTimeout":10000,
"timeout":3000,
"retryAttempts":3,
"retryInterval":1500,
"reconnectionTimeout":3000,
"failedAttempts":3,
"password":null,
"subscriptionsPerConnection":5,
"clientName":null,
"loadBalancer":{
"class":"org.redisson.connection.balancer.RoundRobinLoadBalancer"
},
"slaveSubscriptionConnectionMinimumIdleSize":1,
"slaveSubscriptionConnectionPoolSize":50,
"slaveConnectionMinimumIdleSize":32,
"slaveConnectionPoolSize":64,
"masterConnectionMinimumIdleSize":32,
"masterConnectionPoolSize":64,
"readMode":"SLAVE",
"nodeAddresses":[
"redis://127.0.0.1:7004",
"redis://127.0.0.1:7001",
"redis://127.0.0.1:7000"
],
"scanInterval":1000
},
"threads":0,
"nettyThreads": 0,
"codec":{
"class":"org.redisson.codec.JsonJacksonCodec"
},
"transportMode":"NIO"
}
(2)通過調用config.fromYAML方法並指定一個File實例來實現讀取YAML格式的配置
Config config = Config.fromYAML(new File("config-file.yaml"));
RedissonClient redisson = Redisson.create(config);
config-file.yaml示例:
singleServerConfig:
#連接空閑超時,單位:毫秒
idleConnectionTimeout: 10000
pingTimeout: 1000
#連接超時,單位:毫秒
connectTimeout: 10000
#命令等待超時,單位:毫秒
timeout: 3000
#命令失敗重試次數
retryAttempts: 3
#命令重試發送時間間隔,單位:毫秒
retryInterval: 1500
#重新連接時間間隔,單位:毫秒
reconnectionTimeout: 3000
#執行失敗最大次數
failedAttempts: 3
#單個連接最大訂閱數量
subscriptionsPerConnection: 5
#客戶端名稱
clientName: null
#地址
address: "redis://192.168.1.16:6379"
#數據庫編號
database: 0
#密碼
password: xiaokong
#發布和訂閱連接的最小空閑連接數
subscriptionConnectionMinimumIdleSize: 1
#發布和訂閱連接池大小
subscriptionConnectionPoolSize: 50
#最小空閑連接數
connectionMinimumIdleSize: 32
#連接池大小
connectionPoolSize: 64
#是否啟用DNS監測
dnsMonitoring: false
#DNS監測時間間隔,單位:毫秒
dnsMonitoringInterval: 5000
threads: 0
nettyThreads: 0
codec: !<org.redisson.codec.JsonJacksonCodec> {}
transportMode : "NIO"
Config類
Redisson程序化的配置方法是通過構建Config對象實例來實現的。
以下是org.redisson.Config類的配置參數,它適用於所有Redis組態模式(單機,集群和哨兵)。
codec(編碼)
默認值是:org.redisson.codec.JsonJacksonCodec
Redisson的對象編碼類是用於將對象進行序列化和反序列化,以實現對該對象在Redis里的讀取和存儲。Redisson提供了以下幾種的對象編碼應用,以供大家選擇:
編碼類名稱 | 說明 |
org.redisson.codec.JsonJacksonCodec | Jackson JSON 編碼 默認編碼 |
org.redisson.codec.KryoCodec | Kryo 二進制對象序列化編碼 |
org.redisson.codec.SerializationCodec | JDK序列化編碼 |
org.redisson.codec.FstCodec | FST 10倍於JDK序列化性能而且100%兼容的編碼 |
org.redisson.client.codec.JsonJacksonMapCodec | 基於Jackson的映射類使用的編碼。可用於避免序列化類的信息,以及用於解決使用byte[]遇到的問題。 |
org.redisson.client.codec.ByteArrayCodec | 字節數組編碼 |
org.redisson.codec.CompositeCodec | 用來組合多種不同編碼在一起 |
屬性配置
屬性名 | 屬性簡介 | 默認值 | 詳細說明 |
threads | 線程池數量 | 當前處理核數量 * 2 | 這個線程池數量被所有RTopic對象監聽器,RRemoteService調用者和RExecutorService任務共同共享。 |
nettyThreads | Netty線程池數量 | 當前處理核數量 * 2 | 這個線程池數量是在一個Redisson實例內,被其創建的所有分布式數據類型和服務,以及底層客戶端所一同共享的線程池里保存的線程數量。 |
executor | 線程池 | 單獨提供一個用來執行所有RTopic對象監聽器,RRemoteService調用者和RExecutorService任務的線程池(ExecutorService)實例。 | |
eventLoopGroup | 用於特別指定一個EventLoopGroup. EventLoopGroup是用來處理所有通過Netty與Redis服務之間的連接發送和接受的消息。 每一個Redisson都會在默認情況下自己創建管理一個EventLoopGroup實例。 因此,如果在同一個JVM里面可能存在多個Redisson實例的情況下,采取這個配置實現多個Redisson實例共享一個EventLoopGroup的目的。 只有io.netty.channel.epoll.EpollEventLoopGroup或io.netty.channel.nio.NioEventLoopGroup才是允許的類型。 |
||
transportMode | 傳輸模式 | TransportMode.NIO | 可選參數: |
lockWatchdogTimeout | 監控鎖的看門狗超時。單位:毫秒 |
30000 | 監控鎖的看門狗超時時間單位為毫秒。該參數只適用於分布式鎖的加鎖請求中未明確使用leaseTimeout參數的情況。 如果該看門口未使用lockWatchdogTimeout去重新調整一個分布式鎖的lockWatchdogTimeout超時,那么這個鎖將變為失效狀態。 這個參數可以用來避免由Redisson客戶端節點宕機或其他原因造成死鎖的情況。 |
keepPubSubOrder | 保持訂閱發布順序 | true | 通過該參數來修改是否按訂閱發布消息的接收順序出來消息,如果選否將對消息實行並行處理,該參數只適用於訂閱發布消息的情況。 |
performanceMode | 高性能模式 | HIGHER_THROUGHPUT |
用來指定高性能引擎的行為。由於該變量值的選用與使用場景息息相關(NORMAL除外)我們建議對每個參數值都進行嘗試。 該參數僅限於Redisson PRO版本。 可選模式: HIGHER_THROUGHPUT - 將高性能引擎切換到 高通量 模式。 LOWER_LATENCY_AUTO - 將高性能引擎切換到 低延時 模式並自動探測最佳設定。 LOWER_LATENCY_MODE_1 - 將高性能引擎切換到 低延時 模式並調整到預設模式1。 LOWER_LATENCY_MODE_2 - 將高性能引擎切換到 低延時 模式並調整到預設模式2。 NORMAL - 將高性能引擎切換到 普通 模式 |
ClusterServersConfig類
ClusterServersConfig clusterConfig = config.useClusterServers();
nodeAddresses(添加節點地址)
可以通過host:port的格式來添加Redis集群節點的地址。多個節點可以一次性批量添加。
scanInterval(集群掃描間隔時間)
默認值: 1000,對Redis集群節點狀態掃描的時間間隔。單位是毫秒。
slots(分片數量)
默認值: 231用於指定數據分片過程中的分片數量。支持數據分片/框架結構有:集(Set)、映射(Map)、BitSet、Bloom filter, Spring Cache和Hibernate Cache等.
readMode(讀取操作的負載均衡模式)
默認值: SLAVE(只在從服務節點里讀取)注:在從服務節點里讀取的數據說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。設置讀取操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里讀取。MASTER - 只在主服務節點里讀取。MASTER_SLAVE - 在主從服務節點里都可以讀取。
subscriptionMode(訂閱操作的負載均衡模式)
默認值:SLAVE(只在從服務節點里訂閱),設置訂閱操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里訂閱。MASTER - 只在主服務節點里訂閱。
loadBalancer(負載均衡算法類的選擇)
默認值: org.redisson.connection.balancer.RoundRobinLoadBalancer
在多Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer - 隨機調度算法
subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)
默認值:1。多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的最小保持連接數(長連接)。Redisson內部經常通過發布和訂閱來實現許多功能。長期保持一定數量的發布訂閱連接是必須的。
subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)
默認值:50。多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的連接池最大容量。連接池的連接數量自動彈性伸縮。
slaveConnectionMinimumIdleSize(從節點最小空閑連接數)
默認值:32。多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時讀取反映速度。
slaveConnectionPoolSize(從節點連接池大小)
默認值:64。多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)連接的連接池最大容量。連接池的連接數量自動彈性伸縮。
masterConnectionMinimumIdleSize(主節點最小空閑連接數)
默認值:32。多節點的環境里,每個 主節點的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時寫入反應速度。
masterConnectionPoolSize(主節點連接池大小)
默認值:64。多主節點的環境里,每個 主節點的連接池最大容量。連接池的連接數量自動彈性伸縮。
idleConnectionTimeout(連接空閑超時,單位:毫秒)
默認值:10000。如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,並從連接池里去掉。時間單位是毫秒。
connectTimeout(連接超時,單位:毫秒)
默認值:10000。同任何節點建立連接時的等待超時。時間單位是毫秒。
timeout(命令等待超時,單位:毫秒)
默認值:3000。等待節點回復命令的時間。該時間從命令發送成功時開始計時。
retryAttempts(命令失敗重試次數)
默認值:3。如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時。
retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:1500。在一條命令發送失敗以后,等待重試發送的時間間隔。時間單位是毫秒。
reconnectionTimeout(重新連接時間間隔,單位:毫秒)
默認值:3000。當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔。時間單位是毫秒。
failedAttempts(執行失敗最大次數)
默認值:3。在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試。
password(密碼)
默認值:null。用於節點身份驗證的密碼。
subscriptionsPerConnection(單個連接最大訂閱數量)
默認值:5。每個連接的最大訂閱數量。
clientName(客戶端名稱)
默認值:null。在Redis節點里顯示的客戶端名稱。
sslEnableEndpointIdentification(啟用SSL終端識別)
默認值:true。開啟SSL終端識別能力。
sslProvider(SSL實現方式)
默認值:JDK。確定采用哪種方式(JDK或OPENSSL)來實現SSL連接。
sslTruststore(SSL信任證書庫路徑)
默認值:null。指定SSL信任證書庫的路徑。
sslTruststorePassword(SSL信任證書庫密碼)
默認值:null。指定SSL信任證書庫的密碼。
sslKeystore(SSL鑰匙庫路徑)
默認值:null。指定SSL鑰匙庫的路徑。
sslKeystorePassword(SSL鑰匙庫密碼)
默認值:null。指定SSL鑰匙庫的密碼。
SingleServerConfig類
SingleServerConfig singleConfig = config.useSingleServer();
address(節點地址)
可以通過host:port的格式來指定節點地址。
subscriptionConnectionMinimumIdleSize(發布和訂閱連接的最小空閑連接數)
默認值:1。用於發布和訂閱連接的最小保持連接數(長連接)。Redisson內部經常通過發布和訂閱來實現許多功能。長期保持一定數量的發布訂閱連接是必須的。
subscriptionConnectionPoolSize(發布和訂閱連接池大小)
默認值:50。用於發布和訂閱連接的連接池最大容量。連接池的連接數量自動彈性伸縮。
connectionMinimumIdleSize(最小空閑連接數)
默認值:32。最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時寫入反應速度。
connectionPoolSize(連接池大小)
默認值:64。連接池最大容量。連接池的連接數量自動彈性伸縮。
dnsMonitoring(是否啟用DNS監測)
默認值:false。在啟用該功能以后,Redisson將會監測DNS的變化情況。
dnsMonitoringInterval(DNS監測時間間隔,單位:毫秒)
默認值:5000。監測DNS的變化情況的時間間隔。
idleConnectionTimeout(連接空閑超時,單位:毫秒)
默認值:10000。如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,並從連接池里去掉。時間單位是毫秒。
connectTimeout(連接超時,單位:毫秒)
默認值:10000。同節點建立連接時的等待超時。時間單位是毫秒。
timeout(命令等待超時,單位:毫秒)
默認值:3000。等待節點回復命令的時間。該時間從命令發送成功時開始計時。
retryAttempts(命令失敗重試次數)
默認值:3。如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時。
retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:1500。在一條命令發送失敗以后,等待重試發送的時間間隔。時間單位是毫秒。
reconnectionTimeout(重新連接時間間隔,單位:毫秒)
默認值:3000。當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔。時間單位是毫秒。
failedAttempts(執行失敗最大次數)
默認值:3。在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試。
database(數據庫編號)
默認值:0。嘗試連接的數據庫編號。
password(密碼)
默認值:null。用於節點身份驗證的密碼。
subscriptionsPerConnection(單個連接最大訂閱數量)
默認值:5。每個連接的最大訂閱數量。
clientName(客戶端名稱)
默認值:null。在Redis節點里顯示的客戶端名稱。
sslEnableEndpointIdentification(啟用SSL終端識別)
默認值:true。開啟SSL終端識別能力。
sslProvider(SSL實現方式)
默認值:JDK。確定采用哪種方式(JDK或OPENSSL)來實現SSL連接。
sslTruststore(SSL信任證書庫路徑)
默認值:null。指定SSL信任證書庫的路徑。
sslTruststorePassword(SSL信任證書庫密碼)
默認值:null。指定SSL信任證書庫的密碼。
sslKeystore(SSL鑰匙庫路徑)
默認值:null。指定SSL鑰匙庫的路徑。
sslKeystorePassword(SSL鑰匙庫密碼)
默認值:null。指定SSL鑰匙庫的密碼。
SentinelServersConfig類
SentinelServersConfig sentinelConfig = config.useSentinelServers();
dnsMonitoringInterval(DNS監控間隔,單位:毫秒)
默認值:5000。用來指定檢查節點DNS變化的時間間隔。使用的時候應該確保JVM里的DNS數據的緩存時間保持在足夠低的范圍才有意義。用-1來禁用該功能。
masterName(主服務器的名稱)
主服務器的名稱是哨兵進程中用來監測主從服務切換情況的。
addSentinelAddress(添加哨兵節點地址)
可以通過host:port的格式來指定哨兵節點的地址。多個節點可以一次性批量添加。
readMode(讀取操作的負載均衡模式)
默認值: SLAVE(只在從服務節點里讀取)注:在從服務節點里讀取的數據說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。
設置讀取操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里讀取。MASTER - 只在主服務節點里讀取。MASTER_SLAVE - 在主從服務節點里都可以讀取。
subscriptionMode(訂閱操作的負載均衡模式)
默認值:SLAVE(只在從服務節點里訂閱)。設置訂閱操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里訂閱。MASTER - 只在主服務節點里訂閱。
loadBalancer(負載均衡算法類的選擇)
默認值: org.redisson.connection.balancer.RoundRobinLoadBalancer
在使用多個Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer - 隨機調度算法
subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)
默認值:1。多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的最小保持連接數(長連接)。Redisson內部經常通過發布和訂閱來實現許多功能。長期保持一定數量的發布訂閱連接是必須的。
subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)
默認值:50。多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的連接池最大容量。連接池的連接數量自動彈性伸縮。
slaveConnectionMinimumIdleSize(從節點最小空閑連接數)
默認值:32。多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時讀取反映速度。
slaveConnectionPoolSize(從節點連接池大小)
默認值:64。多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)連接的連接池最大容量。連接池的連接數量自動彈性伸縮。
masterConnectionMinimumIdleSize(主節點最小空閑連接數)
默認值:32。多從節點的環境里,每個 主節點的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時寫入反應速度。
masterConnectionPoolSize(主節點連接池大小)
默認值:64。主節點的連接池最大容量。連接池的連接數量自動彈性伸縮。
idleConnectionTimeout(連接空閑超時,單位:毫秒)
默認值:10000。如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,並從連接池里去掉。時間單位是毫秒。
connectTimeout(連接超時,單位:毫秒)
默認值:10000。同任何節點建立連接時的等待超時。時間單位是毫秒。
timeout(命令等待超時,單位:毫秒)
默認值:3000。等待節點回復命令的時間。該時間從命令發送成功時開始計時。
retryAttempts(命令失敗重試次數)
默認值:3。如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時。
retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:1500。在一條命令發送失敗以后,等待重試發送的時間間隔。時間單位是毫秒。
reconnectionTimeout(重新連接時間間隔,單位:毫秒)
默認值:3000。當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔。時間單位是毫秒。
failedAttempts(執行失敗最大次數)
默認值:3。在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試。
database(數據庫編號)
默認值:0。嘗試連接的數據庫編號。
password(密碼)
默認值:null。用於節點身份驗證的密碼。
subscriptionsPerConnection(單個連接最大訂閱數量)
默認值:5。每個連接的最大訂閱數量。
clientName(客戶端名稱)
默認值:null。在Redis節點里顯示的客戶端名稱。
sslEnableEndpointIdentification(啟用SSL終端識別)
默認值:true。開啟SSL終端識別能力。
sslProvider(SSL實現方式)
默認值:JDK。確定采用哪種方式(JDK或OPENSSL)來實現SSL連接。
sslTruststore(SSL信任證書庫路徑)
默認值:null。指定SSL信任證書庫的路徑。
sslTruststorePassword(SSL信任證書庫密碼)
默認值:null。指定SSL信任證書庫的密碼。
sslKeystore(SSL鑰匙庫路徑)
默認值:null。指定SSL鑰匙庫的路徑。
sslKeystorePassword(SSL鑰匙庫密碼)
默認值:null。指定SSL鑰匙庫的密碼。
MasterSlaveServersConfig類
MasterSlaveServersConfig masterSlaveConfig = config.useMasterSlaveServers();
dnsMonitoringInterval(DNS監控間隔,單位:毫秒)
默認值:5000。用來指定檢查節點DNS變化的時間間隔。使用的時候應該確保JVM里的DNS數據的緩存時間保持在足夠低的范圍才有意義。用-1來禁用該功能。
masterAddress(主節點地址)
可以通過host:port的格式來指定主節點地址。
addSlaveAddress(添加從主節點地址)
可以通過host:port的格式來指定從節點的地址。多個節點可以一次性批量添加。
readMode(讀取操作的負載均衡模式)
默認值: SLAVE(只在從服務節點里讀取)。注:在從服務節點里讀取的數據說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。
設置讀取操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里讀取。MASTER - 只在主服務節點里讀取。MASTER_SLAVE - 在主從服務節點里都可以讀取。
subscriptionMode(訂閱操作的負載均衡模式)
默認值:SLAVE(只在從服務節點里訂閱)。設置訂閱操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里訂閱。MASTER - 只在主服務節點里訂閱。
loadBalancer(負載均衡算法類的選擇)
默認值: org.redisson.connection.balancer.RoundRobinLoadBalancer
在使用多個Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer - 隨機調度算法
subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)
默認值:1。多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的最小保持連接數(長連接)。Redisson內部經常通過發布和訂閱來實現許多功能。長期保持一定數量的發布訂閱連接是必須的。
subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)
默認值:50。多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的連接池最大容量。連接池的連接數量自動彈性伸縮。
slaveConnectionMinimumIdleSize(從節點最小空閑連接數)
默認值:32。多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時讀取反映速度。
slaveConnectionPoolSize(從節點連接池大小)
默認值:64。多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)連接的連接池最大容量。連接池的連接數量自動彈性伸縮。
masterConnectionMinimumIdleSize(主節點最小空閑連接數)
默認值:32。多從節點的環境里,每個 主節點的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時寫入反應速度。
masterConnectionPoolSize(主節點連接池大小)
默認值:64。主節點的連接池最大容量。連接池的連接數量自動彈性伸縮。
idleConnectionTimeout(連接空閑超時,單位:毫秒)
默認值:10000。如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,並從連接池里去掉。時間單位是毫秒。
connectTimeout(連接超時,單位:毫秒)
默認值:10000。同任何節點建立連接時的等待超時。時間單位是毫秒。
timeout(命令等待超時,單位:毫秒)
默認值:3000。等待節點回復命令的時間。該時間從命令發送成功時開始計時。
retryAttempts(命令失敗重試次數)
默認值:3。如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時。
retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:1500。在一條命令發送失敗以后,等待重試發送的時間間隔。時間單位是毫秒。
reconnectionTimeout(重新連接時間間隔,單位:毫秒)
默認值:3000。當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔。時間單位是毫秒。
failedAttempts(執行失敗最大次數)
默認值:3。在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試。
database(數據庫編號)
默認值:0。嘗試連接的數據庫編號。
password(密碼)
默認值:null。用於節點身份驗證的密碼。
subscriptionsPerConnection(單個連接最大訂閱數量)
默認值:5。每個連接的最大訂閱數量。
clientName(客戶端名稱)
默認值:null。在Redis節點里顯示的客戶端名稱。
sslEnableEndpointIdentification(啟用SSL終端識別)
默認值:true。開啟SSL終端識別能力。
sslProvider(SSL實現方式)
默認值:JDK。確定采用哪種方式(JDK或OPENSSL)來實現SSL連接。
sslTruststore(SSL信任證書庫路徑)
默認值:null。指定SSL信任證書庫的路徑。
sslTruststorePassword(SSL信任證書庫密碼)
默認值:null
指定SSL信任證書庫的密碼。
sslKeystore(SSL鑰匙庫路徑)
默認值:null。指定SSL鑰匙庫的路徑。
sslKeystorePassword(SSL鑰匙庫密碼)
默認值:null。指定SSL鑰匙庫的密碼。
測試類
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={RootConfig.class, WebConfig.class}) //如果是Java Config配置的,指定相關的配置類
@WebAppConfiguration
public class RedissionTest {
@Autowired
private RedissonClient redissionClusterNodes;
@Test
public void testConnect(){
RBucket<String> stringRBucket = redissionClusterNodes.getBucket("test");
//設值
stringRBucket.set("1");
System.out.println(stringRBucket.get());
}
}
Redisson 操作API
分布式對象
分布式Object
通過泛型指定對象類型
RBucket<Integer> bucket = redissionClusterNodes.getBucket("test");
bucket.set(1);
Integer obj = bucket.get();
bucket.trySet(3);
bucket.compareAndSet(4, 5);
bucket.getAndSet(6);
分布式BitSet
BitSet是一個bit數據集,類似bit數組,和Set接口沒啥關系。對於存儲一些需要按位操作的數據是很理想的數據結構。對應於Java中 Java.util.BitSet。
RBucket<Integer> bucket = redissionClusterNodes.getBucket("test");
bucket.set(1);
Integer obj = bucket.get();
bucket.trySet(3);
bucket.compareAndSet(4, 5);
bucket.getAndSet(6);
分布式Lock
RLock lock = redissionClusterNodes.getLock("anyLock");
// Most familiar locking method
lock.lock();
// Lock time-to-live support
// releases lock automatically after 10 seconds
// if unlock method not invoked
lock.lock(10, TimeUnit.SECONDS);
// Wait for 100 seconds and automatically unlock it after 10 seconds
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
//...
lock.unlock();
一般我們在程序中的使用方式都是:
RLock lock = redissionClusterNodes.getLock("anyLock");
lock.lock();
try {
...
} finally {
lock.unlock();
}
分布式ReadWriteLock
RLock lock1 = redissionClusterNodes.getLock("lock1");
RLock lock2 = redissionClusterNodes.getLock("lock2");
RLock lock3 = redissionClusterNodes.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
lock.lock();
分布式ReadWriteLock
RReadWriteLock rwlock = redissionClusterNodes.getReadWriteLock("anyRWLock");
// Most familiar locking method
rwlock.readLock().lock();
// or
rwlock.writeLock().lock();
// Lock time-to-live support
// releases lock automatically after 10 seconds
// if unlock method not invoked
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// or
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// Wait for 100 seconds and automatically unlock it after 10 seconds
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// or
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
分布式Semaphore
RSemaphore semaphore = redissionClusterNodes.getSemaphore("semaphore");
semaphore.acquire();
semaphore.acquire(23);
semaphore.tryAcquire();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
分布式AtomicLong
RAtomicLong atomicLong = redissionClusterNodes.getAtomicLong("myAtomicLong");
atomicLong.set(3);
atomicLong.incrementAndGet();
atomicLong.get();
分布式AtomicDouble
RAtomicDouble atomicDouble = redissionClusterNodes.getAtomicDouble("myAtomicDouble");
atomicDouble.set(2.81);
atomicDouble.addAndGet(4.11);
atomicDouble.get();
分布式CountDownLatch
RCountDownLatch latch = redissionClusterNodes.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// in other thread or other JVM
RCountDownLatch latch2 = redissionClusterNodes.getCountDownLatch("anyCountDownLatch");
latch2.countDown();
Topic
RTopic topic = redissionClusterNodes.getTopic("mytopic");
topic.addListener(String.class, new MessageListener<String>() {
@Override
public void onMessage(CharSequence channel, String s) {
System.out.println("channel=" + channel); //mytopic
System.out.println("接收到的消息:" + s);
}
});
// in other thread or JVM
RTopic topic2 = redissionClusterNodes.getTopic("mytopic");
long clientsReceivedMessage = topic.publish("1234567");
System.out.println(clientsReceivedMessage);
Topic patttern
// subscribe to all topics by `topic1.*` pattern
RPatternTopic topic1 = redissionClusterNodes.getPatternTopic("topic1.*");
topic1.addListener(String.class, new PatternMessageListener<String>() {
@Override
public void onMessage(CharSequence pattern, CharSequence channel, String msg) {
System.out.println("pattern=" + pattern); //topic1.*
System.out.println("channel=" + channel); //topic1.a
System.out.println("msg=" + msg); //1234567
}
});
RTopic topic2 = redissionClusterNodes.getTopic("topic1.a");
long clientsReceivedMessage = topic2.publish("1234567");
System.out.println(clientsReceivedMessage);
分布式集合
分布式Map
除此之外,還支持Multimap。
RMap<String, String> map = redissionClusterNodes.getMap("anyMap");
map.put("name", "hello");
//如果不存在,那么會向map中添加該鍵值對,並返回null。
//如果已經存在,那么不會覆蓋已有的值,直接返回已經存在的值。
String currentValue = map.putIfAbsent("name", "harvey");
System.out.println("currentValue=" + currentValue);
//刪除鍵的值,返回相應的值
String obj = map.remove("name");
System.out.println("obj=" + obj);
map.fastPut("name", "test");
map.fastRemove("name");
//異步設值,馬上取無法取到設置的值
Future<String> putAsyncFuture = map.putAsync("alia", "cool");
System.out.println("asyncFuture value = " + putAsyncFuture.get());
//快速異步設值,只返回true或false,如果已存在返回的是false
RFuture<Boolean> rFuture = map.fastPutAsync("alia4", "mixue");
System.out.println("asyncFuture value boolean = " + rFuture.get());
map.fastPutAsync("321", "6666666");
map.fastRemoveAsync("321");
Map eviction
現在Redis沒有過期清空Map中的某個entry的功能,只能是清空Map所有的entry。Redission提供了這種功能。
RMapCache<Object, Object> mapCache = redissionClusterNodes.getMapCache("anyMap");
// ttl = 10 minutes,
mapCache.put("key1", "hello", 10, TimeUnit.MINUTES);
// ttl = 10 minutes, maxIdleTime = 10 seconds
mapCache.put("key1", "hello", 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);
// ttl = 3 seconds
mapCache.putIfAbsent("key2", "hello", 3, TimeUnit.SECONDS);
// ttl = 40 seconds, maxIdleTime = 10 seconds
mapCache.putIfAbsent("key2", "hello", 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
分布式Set
RSet<String> set = redissionClusterNodes.getSet("anySet");
set.add("11");
Set<String> stringSetting = set.readAll();
stringSetting.forEach(s->{
System.out.println(s);
});
set.remove("11");
除此之外還有,還支持Set eviction, SortedSet, ScoredSortedSet, LexSortedSet。
分布式List
RList<String> list = redissionClusterNodes.getList("anyList");
list.add("11");
List<String> stringList = list.readAll();
System.out.println(stringList);
list.get(0);
list.remove("11");
分布式Blocking Queue
RBlockingQueue<String> queue = redissionClusterNodes.getBlockingQueue("anyQueue");
queue.offer("111");
//查看首個元素,不會移除首個元素,如果隊列是空的就返回null
String obj = queue.peek();
//查看首個元素,不會移除首個元素,如果隊列是空的就拋出異常NoSuchElementException
String obj2 = queue.element();
//將首個元素從隊列中彈出,如果隊列是空的,就返回null
String someObj = queue.poll();
String ob = queue.poll(10, TimeUnit.MINUTES);
除此之外,還支持Queue, Deque, Blocking Deque。
其他功能
執行批量命令
//future列表
LinkedList<RFuture<Object>> futures = new LinkedList<>();
//結果集
LinkedList<Object> result = new LinkedList<>();
RBatch batch = redissionClusterNodes.createBatch();
for (int i = 0; i < 100; i++) {
RBucketAsync<Object> bucket = batch.getBucket("key" + i);
RFuture<Object> async = bucket.getAsync();
futures.add(async);
}
//批量執行
batch.execute();
while (futures.size() > 0) {
RFuture<Object> first = futures.removeFirst();
//獲取當前值,未完成時值為null,使用isDone方法區分value不存在還是任務未完成
Object o = first.getNow();
if (o != null) {
result.add(o);
} else {
if (!first.isDone()) {
futures.addLast(first);
}
}
}