背景:
項目業務上需要實現延時發送消息的需求。最開始想到的就是消息中間件,公司統一用的RocketMq,於是開始整。。。但是,業務需求要求的延時消息時間是可自由指定的,但是公司居然用的是開源的RocketMq,開源的只支持18個固定級別的延時,
我們這里不重點說RocketMq,有興趣的自己查吧。結論就是開源的RocketMq沒法實現現在的需求,要不就用阿里雲的,公司也不願出那份錢吧。哈哈哈。於是想別的方法。
辦法二。那就只能定時輪詢配合實現了,查數據庫的話,會增加數據庫的壓力,效率也不好。於是使用Redis配合定時實現延時消費。
Redis實現代碼
使用zSet數據結構
生產者代碼:
//key:redis,zSet集合key
//msg:key的value值,存儲消息對象
//execTime:執行時間,時間戳
public void producerRedisDelayMsg(String key, Object msg, long execTime) { redisService.zSetAdd(key, msg, execTime); }
調用的zSetAdd方法實際就是對redis的操作,重點就是時間戳為score這個值,集合就是通過這個值進行排序的
public boolean zSetAdd(String key, Object value, long score) { boolean re; try { re = redisTemplate.opsForZSet().add(key, value, score); return re; } catch (Exception e) { return false; } }
至此,生產者就寫完了。接下來是消費者的實現
//key:key值,就是生產者的那個key值
//startTime,endTime:操作時間范圍,時間戳
public void consumerRedisDelayMsg(String key, long startTime, long endTime) {
//查詢符合時間條件下的集合
Set<Object> set = redisService.zSetRangeByScore(key, startTime, endTime);
if (CollectionUtils.isNotEmpty(set)) {
//具體業務操作
//移除集合
redisService.zSetRemoveRangeByScore(key, startTime, endTime);
}
}
具體看一下redis的兩個操作
zSetRangeByScore,實際就是查詢key集合下min-max范圍內的數據
public Set<Object> zSetRangeByScore(String key, double min, double max) { try { Set<Object> value = redisTemplate.opsForZSet().rangeByScore(key, min, max); return value; } catch (Exception e) { return null; } }
zSetRemoveRangeByScore,實際就是移除key集合下min-max范圍內的數據
public Long zSetRemoveRangeByScore(String key, double min, double max) { try { Long value = redisTemplate.opsForZSet().removeRangeByScore(key, min, max); return value; } catch (Exception e) { return null; } }
至此,有關redis的操作就結束了。定時的代碼,我們這里就不寫了,1秒輪詢一次。
最后總結一下,整體的流程。定時每秒輪詢一次,通過consumerRedisDelayMsg方法,首先查詢符合時間范圍內的集合數據,查詢出來進行相應的業務實現,然后將查出來的集合數據移除。這里有個重要的問題,就是時間范圍,開始值一般確定為0即可,結束值為當前系統時間。
生產者方法producerRedisDelayMsg比較重要的就是對execTime參數的理解,執行時間,即為redis集合中的score值,排序依據。
至此可以實現定時,Redis延時消費。