一、應用場景
在需求開發過程中,我們經常會遇到一些類似下面的場景:
1)外賣訂單超過15分鍾未支付,自動取消
2)使用搶票軟件訂到車票后,1小時內未支付,自動取消
3)待處理申請超時1天,通知審核人員經理,超時2天通知審核人員總監
4)客戶預定自如房子后,24小時內未支付,房源自動釋放
那么針對這類場景的需求應該如果實現呢,我們最先想到的一般是啟個定時任務,來掃描數據庫里符合條件的數據,並對其進行更新操作。一般來說spring-quartz 、elasticjob 就可以實現,甚至自己寫個 Timer 也可以。但是這種方式有個弊端,就是需要不停的掃描數據庫,如果數據量比較大,並且任務執行間隔時間比較短,對數據庫會有一定的壓力。另外定時任務的執行間隔時間的粒度也不太好設置,設置長會影響時效性,設置太短又會增加服務壓力。我們來看一下有沒有更好的實現方式。
二、JDK 延時隊列實現
DelayQueue 是 JDK 中java.util.concurrent包下的一種無界阻塞隊列,底層是優先隊列PriorityQueue。對於放到隊列中的任務,可以按照到期時間進行排序,只需要取已經到期的元素處理即可。
具體的步驟是,要放入隊列的元素需要實現 Delayed 接口並實現 getDelay 方法來計算到期時間,compare方法來對比到期時間以進行排序。一個簡單的使用例子如下:
package com.lyqiang.delay.jdk; import java.time.LocalDateTime; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class TestDelayQueue { public static void main(String[] args) throws InterruptedException { // 新建3個任務,並依次設置超時時間為 20s 10s 30s DelayTask d1 = new DelayTask(1, System.currentTimeMillis() + 20000L); DelayTask d2 = new DelayTask(2, System.currentTimeMillis() + 10000L); DelayTask d3 = new DelayTask(3, System.currentTimeMillis() + 30000L); DelayQueue<DelayTask> queue = new DelayQueue<>(); queue.add(d1); queue.add(d2); queue.add(d3); int size = queue.size(); System.out.println("當前時間是:" + LocalDateTime.now()); // 從延時隊列中獲取元素, 將輸出 d2 、d1 、d3 for (int i = 0; i < size; i++) { System.out.println(queue.take() + " ------ " + LocalDateTime.now()); } } } class DelayTask implements Delayed { private Integer taskId; private long exeTime; DelayTask(Integer taskId, long exeTime) { this.taskId = taskId; this.exeTime = exeTime; } @Override public long getDelay(TimeUnit unit) { return exeTime - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { DelayTask t = (DelayTask) o; if (this.exeTime - t.exeTime <= 0) { return -1; } else { return 1; } } @Override public String toString() { return "DelayTask{" + "taskId=" + taskId + ", exeTime=" + exeTime + '}'; } }
使用 DelayQueue, 只需要有一個線程不斷從隊列中獲取數據即可,它的優點是不用引入第三方依賴,實現也很簡單,缺點也很明顯,它是內存存儲,對分布式支持不友好,如果發生單點故障,可能會造成數據丟失,無界隊列還存在 OOM 的風險。
三、時間輪算法實現
1996 年 George Varghese 和 Tony Lauck 的論文《Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility》中提出了一種時間輪管理 Timeout 事件的方式。其設計非常巧妙,並且類似時鍾的運行,如下圖的原始時間輪有 8 個格子,假定指針經過每個格子花費時間是 1 個時間單位,當前指針指向 0,一個 17 個時間單位后超時的任務則需要運轉 2 圈再通過一個格子后被執行,放在相同格子的任務會形成一個鏈表。
Netty 包里提供了一種時間輪的實現——HashedWheelTimer,其底層使用了數組+鏈表的數據結構,使用方式如下:
<!-- https://mvnrepository.com/artifact/io.netty/netty-common --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-common</artifactId> <version>4.1.48.Final</version> </dependency>
代碼如下:
package com.lyqiang.delay.wheeltimer; import io.netty.util.HashedWheelTimer; import java.time.LocalDateTime; import java.util.concurrent.TimeUnit; public class WheelTimerTest { public static void main(String[] args) { //設置每個格子是 100ms, 總共 256 個格子 HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 256); //加入三個任務,依次設置超時時間是 10s 5s 20s System.out.println("加入一個任務,ID = 1, time= " + LocalDateTime.now()); hashedWheelTimer.newTimeout(timeout -> { System.out.println("執行一個任務,ID = 1, time= " + LocalDateTime.now()); }, 10, TimeUnit.SECONDS); System.out.println("加入一個任務,ID = 2, time= " + LocalDateTime.now()); hashedWheelTimer.newTimeout(timeout -> { System.out.println("執行一個任務,ID = 2, time= " + LocalDateTime.now()); }, 5, TimeUnit.SECONDS); System.out.println("加入一個任務,ID = 3, time= " + LocalDateTime.now()); hashedWheelTimer.newTimeout(timeout -> { System.out.println("執行一個任務,ID = 3, time= " + LocalDateTime.now()); }, 20, TimeUnit.SECONDS); System.out.println("等待任務執行==========="); } }
四、Redis 實現延遲任務
使用 Redis 實現延遲任務的方法大體可分為兩類:通過 zset 數據判斷的方式,和通過鍵空間通知的方式。
1、Redis ZSet 實現
Redis 里有 5 種數據結構,最常用的是 String 和 Hash,而 ZSet 是一種支持按 score 排序的數據結構,每個元素都會關聯一個 double 類型的分數,Redis 通過分數來為集合中的成員進行從小到大的排序,借助這個特性我們可以把超時時間作為 score 來將任務進行排序。
使用 zadd key score member 命令向 redis 中放入任務,超時時間作為 score, 任務 ID 作為 member, 使用 zrange key start stop withscores 命令從 redis 中讀取任務,使用 zrem key member 命令從 redis 中刪除任務。代碼如下:
package com.lyqiang.delay.redis; import java.time.LocalDateTime; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class TestRedisDelay { public static void main(String[] args) { TaskProducer taskProducer = new TaskProducer(); //創建 3個任務,並設置超時間為 10s 5s 20s taskProducer.produce(1, System.currentTimeMillis() + 10000); taskProducer.produce(2, System.currentTimeMillis() + 5000); taskProducer.produce(3, System.currentTimeMillis() + 20000); System.out.println("等待任務執行==========="); //消費端從redis中消費任務 TaskConsumer taskConsumer = new TaskConsumer(); taskConsumer.consumer(); } } class TaskProducer { public void produce(Integer taskId, long exeTime) { System.out.println("加入任務, taskId: " + taskId + ", exeTime: " + exeTime + ", 當前時間:" + LocalDateTime.now()); RedisOps.getJedis().zadd(RedisOps.key, exeTime, String.valueOf(taskId)); } } class TaskConsumer { public void consumer() { Executors.newSingleThreadExecutor().submit(new Runnable() { @Override public void run() { while (true) { Set<String> taskIdSet = RedisOps.getJedis().zrangeByScore(RedisOps.key, 0, System.currentTimeMillis(), 0, 1); if (taskIdSet == null || taskIdSet.isEmpty()) { //System.out.println("沒有任務"); } else { taskIdSet.forEach(id -> { long result = RedisOps.getJedis().zrem(RedisOps.key, id); if (result == 1L) { System.out.println("從延時隊列中獲取到任務,taskId:" + id + " , 當前時間:" + LocalDateTime.now()); } }); } try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }); } }
2、Redis 鍵過期通知
默認情況下 Redis 服務器端是不開啟鍵過期通知的,需要我們通過 config set notify-keyspace-events Ex 的命令手動開啟,開啟鍵過期通知后,我們就可以拿到每個鍵值過期的事件,我們利用這個機制實現了給每個人開啟一個定時任務的功能,實現代碼如下:
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; import utils.JedisUtils; public class TaskExample { public static final String _TOPIC = "__keyevent@0__:expired"; // 訂閱頻道名稱 public static void main(String[] args) { Jedis jedis = JedisUtils.getJedis(); // 執行定時任務 doTask(jedis); } /** * 訂閱過期消息,執行定時任務 * @param jedis Redis 客戶端 */ public static void doTask(Jedis jedis) { // 訂閱過期消息 jedis.psubscribe(new JedisPubSub() { @Override public void onPMessage(String pattern, String channel, String message) { // 接收到消息,執行定時任務 System.out.println("收到消息:" + message); } }, _TOPIC); } }
使用 Redis 可以將數據持久化到磁盤,規避了數據丟失的風險,並且支持分布式,避免了單點故障。
3、Redisson中的DelayedQueue
Redisson將Redis ZSet 實現封裝了一層,更方便的使用了,示例如下:
maven依賴:
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.8.1</version> </dependency>
代理示例:
@Data @NoArgsConstructor @AllArgsConstructor public class Employer { private String name; private int age; private String wife; private Double salary; private String putTime; public void setPutTime() { this.putTime = new SimpleDateFormat("hh:mm:ss").format(new Date()); } } import org.redisson.Redisson; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import java.util.concurrent.TimeUnit; //延遲消息生產者 public class RedisPutInQueue { public static void main(String args[]) { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); RedissonClient redissonClient = Redisson.create(config); RBlockingQueue<Employer> blockingFairQueue = redissonClient.getBlockingQueue("delay_queue"); RDelayedQueue<Employer> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); for (int i = 0; i < 10; i++) { try { //模擬間隔投遞消息 Thread.sleep(1 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } //一分鍾以后將消息發送到指定隊列 //延遲隊列包含callCdr 1分鍾,然后將其傳輸到blockingFairQueue中 //在1分鍾后就可以在blockingFairQueue 中獲取callCdr了 Employer callCdr = new Employer(); callCdr.setSalary(345.6); callCdr.setPutTime(); delayedQueue.offer(callCdr, 1, TimeUnit.MINUTES); System.out.println("callCdr ==================> " + callCdr); } } } import org.redisson.Redisson; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import java.text.SimpleDateFormat; import java.util.Date; //延遲消息消費者 public class RedisOutFromQueue { public static void main(String args[]) { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); RedissonClient redissonClient = Redisson.create(config); RBlockingQueue<Employer> blockingFairQueue = redissonClient.getBlockingQueue("delay_queue"); RDelayedQueue<Employer> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); while (true) { Employer callCdr = null; try { callCdr = blockingFairQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("訂單取消時間:" + new SimpleDateFormat("hh:mm:ss").format(new Date()) + "==訂單生成時間" + callCdr.getPutTime()); } } }
五、MQ 延時隊列實現
以 RabbitMQ 為例,它本身並沒有直接支持延時隊列的功能,但是通過一些特性,我們可以達到實現延時隊列的效果。
RabbitMQ 可以為 Queue 設置 TTL,,到了過期時間沒有被消費的消息將變為死信——Dead Letter。我們還可以為Queue 設置死信轉發 x-dead-letter-exchange,過期的消息可以被路由到另一個 Exchange。下圖說明了這個流程,生產者通過不同的 RoutingKey 發送不同過期時間的消息,多個隊列分別消費並產生死信后被路由到 exe-dead-exchange,再有一些隊列綁定到這個 exchange,從而進行不同業務邏輯的消費。
實現參考:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.0</version> </dependency>
代碼:
public class Main { static ConnectionFactory connectionFactory; static Connection connection; static { connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); try { connection = connectionFactory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { producer(); Thread thread = new Thread(() -> { try { consume(); } catch (Exception e) { e.printStackTrace(); } }); thread.start(); } private static void producer() throws Exception { Channel channel = connection.createChannel();//創建一個channel,不管是生產數據,還是消費數據,都是通過channel去操作的 channel.exchangeDeclare("orderExchange", "direct", true);//定義一個交換機,路由類型為direct,所有的訂單會塞給此交換機 channel.exchangeDeclare("orderDelayExchange", "direct", true);//定義一個交換機,路由類型為direct,延遲的訂單會塞給此交換機 HashMap<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", "orderDelayExchange");//申明死信交換機是名稱為orderDelayExchange的交換機 channel.queueDeclare("order_queue", true, false, false, arguments);//定義一個名稱為order_queue的隊列,綁定上面定義的參數,這樣就告訴rabbit此隊列延遲的消息,發送給orderDelayExchange交換機 channel.queueDeclare("order_delay_queue", true, false, false, null);//定義一個名稱為order_delay_queue的隊列 channel.queueBind("order_queue", "orderExchange", "delay");//order_queue和orderExchange綁定,路由為delay。路由也為delay的消息會通過orderExchange進入到order_queue隊列 channel.queueBind("order_delay_queue", "orderDelayExchange", "delay");//order_delay_queue和orderDelayExchange綁定 AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("15000");//設置消息TTL(消息生存時間) builder.deliveryMode(2);//設置消息持久化 AMQP.BasicProperties properties = builder.build(); Thread productThread = new Thread(() -> { for (int i = 0; i < 20; i++) { String order = "order" + i; try { channel.basicPublish("orderExchange", "delay", properties, order.getBytes());//通過channel,向orderExchange交換機發送路由為delay的消息,這樣就可以進入到order_queue隊列 String str = "現在時間是" + new Date().toString() + " " + order + " 的消息產生了"; System.out.println(str); } catch (IOException e) { e.printStackTrace(); } } try { channel.close(); } catch (Exception ex) { } }); productThread.start(); } private static void consume() throws Exception { Channel channel = connection.createChannel();//創建一個channel,不管是生產數據,還是消費數據,都是通過channel去操作的 //消費名稱為order_delay_queue的隊列,且關閉自動應答,需要手動應答 channel.basicConsume("order_delay_queue", false, new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag();//消息的標記,應答的時候需要傳入這個參數 String str = "現在時間是" + new Date().toString() + " " + new String(body) + " 的消息消費了"; System.out.println(str); channel.basicAck(deliveryTag, false);//手動應答,代表這個消息處理完成了 } }); } }
六、總結
通過上面不同實現方式的比較,可以很明顯的看出各個方案的優缺點,在分布式系統中我們會優先考慮使用 Redis 和 MQ 的實現方式。
在需求開發中實現一個功能的方式多種多樣,需要我們進行多維度的比較,才能選擇出合理的、可靠的、高效的並且適合自己業務的解決方案。