一、需求切入點
在公司做的一個系統業務需要有個定時提醒的功能(數據在mysql中),要求提醒的時間差精准到分鍾
解決方案有:
- 使用定時器,每分鍾執行一次,查符合提醒的數據,發起提醒(數據庫連接與系統的負載都承受不住的!!)
- 將待提醒數據提前查出存進redis中,根據提醒時間設置過期時間,做redis的過期監聽,監聽到過期的數據再做業務處理(優點 : 不用實時查數據庫,一定程度上減少系統壓力 缺點: 一旦系統重啟或者系統出現異常,可能導致一些過期的數據沒有監聽到,造成數據沒有推送)
- 使用一個延時隊列,利用redis的zset(sort set,有序不重復集合,關聯分數score進行排序),將提醒時間作為分數,提取符合條件的score對應的集合發起提醒(本文所述也是圍繞這個方案)
二、延時隊列的基本操作流程- 基本流程圖
- 代碼實現
生產者,只關心數據進隊列
- 基本流程圖
public class MessageProvider {
// 延時隊列的服務 通過這個bean來統一管理數據
private final DelayingQueueService delayingQueueService;
private static String APPOINTMENT_REMIND = "APPOINTMENT_REMIND";
/**
* 往隊列中添加消息
* @param messageContent
*/
public void sendMessage(String messageContent, long delay) {
try {
//業務代碼 ……
// 將分裝好的數據寫進隊列
delayingQueueService.push(queueMessage);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 撤回消息 業務延伸
* @param members
*/
public void withdrawMessage(Long members){
delayingQueueService.removeByMembersId(members);
}
}
消費者,只關心需要消費的數據
// 從延伸隊列拉取符合消費的數據
List<QueueMessage> msgList = delayingQueueService.pull();
```
msgList.stream().forEach(msg -> {
// 拿出已經到期的預約提示 發起提醒
if (current >= msg.getDelayTime()) {
try {
// 進行業務消費 ……
//成功消費后移除消息
delayingQueueService.remove(msg);
}
```
延時隊列實現
public class DelayingQueueService {
private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build();
//key:membersId value:message
private static ConcurrentHashMap <Long,String> membersMap= new ConcurrentHashMap<>();
private final StringRedisTemplate redisTemplate;
/**
* 可以不同業務用不同的key
*/
public static final String QUEUE_NAME = "message:queue";
/**
* 鎖key
*/
public static final String LOCK_KEY="message_lock_key";
/**
* 插入消息
*
* @param queueMessage
* @return
*/
@SneakyThrows
public Boolean push(QueueMessage queueMessage) {
String messageStr = mapper.writeValueAsString(queueMessage);
Boolean addFlag = redisTemplate.opsForZSet().add(QUEUE_NAME, messageStr, queueMessage.getDelayTime());
membersMap.put(membersId,messageStr);
return addFlag;
}
/**
* 移除消息
*
* @param queueMessage
* @return
*/
@SneakyThrows
public Boolean remove(QueueMessage queueMessage) {
Long remove = redisTemplate.opsForZSet().remove(QUEUE_NAME, mapper.writeValueAsString(queueMessage));
if(remove>0){
membersMap.remove(membersId);
}
return remove > 0 ? true : false;
}
/**
* 拉取最新需要
* 被消費的消息
* rangeByScore 根據score范圍獲取 0-當前時間戳可以拉取當前時間及以前的需要被消費的消息
*
* @return
*/
public List<QueueMessage> pull() {
List<QueueMessage> msgList =new ArrayList<>();
try {
Set<String> strings = redisTemplate.opsForZSet().rangeByScore(QUEUE_NAME, 0, System.currentTimeMillis());
if (strings == null) {
return null;
}
msgList = strings.stream().map(msg -> {
QueueMessage message = null;
try {
message = mapper.readValue(msg, QueueMessage.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return message;
}).collect(Collectors.toList());
} catch (Exception e) {
log.error(e.toString());
}
return msgList;
}
//獲得鎖
public Boolean getLock(){
boolean lock = false;
//獲得鎖
lock = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY,QUEUE_NAME+"is locking !",30, TimeUnit.SECONDS);
return lock;
}
public void releaseLock(){
redisTemplate.delete(LOCK_KEY);
}
}
三、結束語
本文所述的方法也是存在一些小的缺點,比如,數據的正常操作依賴於第三方組件,如果redis掛掉了,這個服務就down掉了,實現延時隊列的方法有很多種,基於業務與系統本身的情況,兼容利弊去做一些取舍,以達到最好的效果