本文分享Spring中如何實現Redis響應式交互模式。
本文將模擬一個用戶服務,並使用Redis作為數據存儲服務器。
本文涉及兩個java bean,用戶與權益
public class User {
private long id;
private String name;
// 標簽
private String label;
// 收貨地址經度
private Double deliveryAddressLon;
// 收貨地址維度
private Double deliveryAddressLat;
// 最新簽到日
private String lastSigninDay;
// 積分
private Integer score;
// 權益
private List<Rights> rights;
...
}
public class Rights {
private Long id;
private Long userId;
private String name;
...
}
啟動
引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
添加Redis配置
spring.redis.host=192.168.56.102
spring.redis.port=6379
spring.redis.password=
spring.redis.timeout=5000
SpringBoot啟動
@SpringBootApplication
public class UserServiceReactive {
public static void main(String[] args) {
new SpringApplicationBuilder(
UserServiceReactive.class)
.web(WebApplicationType.REACTIVE).run(args);
}
}
應用啟動后,Spring會自動生成ReactiveRedisTemplate(它的底層框架是Lettuce)。
ReactiveRedisTemplate與RedisTemplate使用類似,但它提供的是異步的,響應式Redis交互方式。
這里再強調一下,響應式編程是異步的,ReactiveRedisTemplate發送Redis請求后不會阻塞線程,當前線程可以去執行其他任務。
等到Redis響應數據返回后,ReactiveRedisTemplate再調度線程處理響應數據。
響應式編程可以通過優雅的方式實現異步調用以及處理異步結果,正是它的最大的意義。
序列化
ReactiveRedisTemplate默認使用的序列化是Jdk序列化,我們可以配置為json序列化
@Bean
public RedisSerializationContext redisSerializationContext() {
RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext();
builder.key(StringRedisSerializer.UTF_8);
builder.value(RedisSerializer.json());
builder.hashKey(StringRedisSerializer.UTF_8);
builder.hashValue(StringRedisSerializer.UTF_8);
return builder.build();
}
@Bean
public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
RedisSerializationContext serializationContext = redisSerializationContext();
ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext);
return reactiveRedisTemplate;
}
builder.hashValue方法指定Redis列表值的序列化方式,由於本文Redis列表值只存放字符串,所以還是設置為StringRedisSerializer.UTF_8。
基本數據類型
ReactiveRedisTemplate支持Redis字符串,散列,列表,集合,有序集合等基本的數據類型。
本文使用散列保存用戶信息,列表保存用戶權益,其他基本數據類型的使用本文不展開。
public Mono<Boolean> save(User user) {
ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash();
Mono<Boolean> userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user));
if(user.getRights() != null) {
ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList();
opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> {
logger.info("add rights:{}", l);
});
}
return userRs;
}
beanToMap方法負責將User類轉化為map。
HyperLogLog
Redis HyperLogLog結構可以統計一個集合內不同元素的數量。
使用HyperLogLog統計每天登錄的用戶量
public Mono<Long> login(User user) {
ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog();
return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId());
}
BitMap
Redis BitMap(位圖)通過一個Bit位表示某個元素對應的值或者狀態。由於Bit是計算機存儲中最小的單位,使用它進行儲存將非常節省空間。
使用BitMap記錄用戶本周是否有簽到
public void addSignInFlag(long userId) {
String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16);
redisTemplate.opsForValue().setBit(
key, userId & 0xffff , true)
.subscribe(b -> logger.info("set:{},result:{}", key, b));
}
userId高48位用於將用戶划分到不同的key,低16位作為位圖偏移參數offset。
offset參數必須大於或等於0,小於2^32(bit 映射被限制在 512 MB 之內)。
Geo
Redis Geo可以存儲地理位置信息,並對地理位置進行計算。
如查找給定范圍內的倉庫信息
public Flux getWarehouseInDist(User u, double dist) {
ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo();
Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist);
RedisGeoCommands.GeoRadiusCommandArgs args =
RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending();
return geo.radius("warehouse:address", circle, args);
}
warehouse:address
這個集合中需要先保存好倉庫地理位置信息。
ReactiveGeoOperations#radius方法可以查找集合中地理位置在給定范圍內的元素,它中還支持添加元素到集合,計算集合中兩個元素地理位置距離等操作。
Lua
ReactiveRedisTemplate也可以執行Lua腳本。
下面通過Lua腳本完成用戶簽到邏輯:如果用戶今天未簽到,允許簽到,積分加1,如果用戶今天已簽到,則拒接操作。
public Flux<String> addScore(long userId) {
DefaultRedisScript<String> script = new DefaultRedisScript<>();
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua")));
List<String> keys = new ArrayList<>();
keys.add(String.valueOf(userId));
keys.add(LocalDateTime.now().toString().substring(0, 10));
return redisTemplate.execute(script, keys);
}
signin.lua內容如下
local score=redis.call('hget','user:'..KEYS[1],'score')
local day=redis.call('hget','user:'..KEYS[1],'lastSigninDay')
if(day==KEYS[2])
then
return '0'
else
redis.call('hset','user:'..KEYS[1],'score', score+1,'lastSigninDay',KEYS[2])
return '1'
end
Stream
Redis Stream 是 Redis 5.0 版本新增加的數據類型。該類型可以實現消息隊列,並提供消息的持久化和主備復制功能,並且可以記住每一個客戶端的訪問位置,還能保證消息不丟失。
Redis借鑒了kafka的設計,一個Stream內可以存在多個消費組,一個消費組內可以存在多個消費者。
如果一個消費組內某個消費者消費了Stream中某條消息,則這消息不會被該消費組其他消費者消費到,當然,它還可以被其他消費組中某個消費者消費到。
下面定義一個Stream消費者,負責處理接收到的權益數據
@Component
public class RightsStreamConsumer implements ApplicationRunner, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class);
@Autowired
private RedisConnectionFactory redisConnectionFactory;
private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container;
// Stream隊列
private static final String STREAM_KEY = "stream:user:rights";
// 消費組
private static final String STREAM_GROUP = "user-service";
// 消費者
private static final String STREAM_CONSUMER = "consumer-1";
@Autowired
@Qualifier("reactiveRedisTemplate")
private ReactiveRedisTemplate redisTemplate;
public void run(ApplicationArguments args) throws Exception {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.batchSize(100) //一批次拉取的最大count數
.executor(Executors.newSingleThreadExecutor()) //線程池
.pollTimeout(Duration.ZERO) //阻塞式輪詢
.targetType(Rights.class) //目標類型(消息內容的類型)
.build();
// 創建一個消息監聽容器
container = StreamMessageListenerContainer.create(redisConnectionFactory, options);
// prepareStreamAndGroup查找Stream信息,如果不存在,則創建Stream
prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP)
.subscribe(stream -> {
// 為Stream創建一個消費者,並綁定處理類
container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER),
StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
new StreamMessageListener());
container.start();
});
}
@Override
public void destroy() throws Exception {
container.stop();
}
// 查找Stream信息,如果不存在,則創建Stream
private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) {
// info方法查詢Stream信息,如果該Stream不存在,底層會報錯,這時會調用onErrorResume方法。
return ops.info(stream).onErrorResume(err -> {
logger.warn("query stream err:{}", err.getMessage());
// createGroup方法創建Stream
return ops.createGroup(stream, group).flatMap(s -> ops.info(stream));
});
}
// 消息處理對象
class StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> {
public void onMessage(ObjectRecord<String, Rights> message) {
// 處理消息
RecordId id = message.getId();
Rights rights = message.getValue();
logger.info("receive id:{},rights:{}", id, rights);
redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> {
logger.info("add rights:{}", l);
});
}
}
}
下面看一下如何發送信息
public Mono<RecordId> addRights(Rights r) {
String streamKey = "stream:user:rights";//stream key
ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r);
Mono<RecordId> mono = redisTemplate.opsForStream().add(record);
return mono;
}
創建一個消息記錄對象ObjectRecord,並通過ReactiveStreamOperations發送信息記錄。
Sentinel、Cluster
ReactiveRedisTemplate也支持Redis Sentinel、Cluster集群模式,只需要調整配置即可。
Sentinel配置如下
spring.redis.sentinel.master=mymaster
spring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379
spring.redis.sentinel.password=
spring.redis.sentinel.nodes
配置的是Sentinel節點IP地址和端口,不是Redis實例節點IP地址和端口。
Cluster配置如下
spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379
spring.redis.lettuce.cluster.refresh.period=10000
spring.redis.lettuce.cluster.refresh.adaptive=true
如Redis Cluster中node2是node1的從節點,Lettuce中會緩存該信息,當node1宕機后,Redis Cluster會將node2升級為主節點。但Lettuce不會自動將請求切換到node2,因為它的緩沖沒有刷新。
開啟spring.redis.lettuce.cluster.refresh.adaptive
配置,Lettuce可以定時刷新Redis Cluster集群緩存信息,動態改變客戶端的節點情況,完成故障轉移。
暫時未發現ReactiveRedisTemplate實現pipeline,事務的方案。
官方文檔:https://docs.spring.io/spring-data/redis/docs/current/reference/html/#redis:reactive
文章完整代碼:https://gitee.com/binecy/bin-springreactive/tree/master/user-service
如果您覺得本文不錯,歡迎關注我的微信公眾號,系列文章持續更新中。您的關注是我堅持的動力!