一.分布式延時任務
傳統做法是將延時任務插入數據庫,使用定時去掃描,比對任務是否到期,到期則執行並設置任務狀態為完成。這種做法在分布式環境下還需要對定時掃描做特殊處理(加分布式鎖)避免任務被重復執行。
然而使用RabbitMQ實現延時任務可以天然解決分布式環境下重復執行的問題(利用mq中消息只會被一個消費者消費這一特性可以讓延時任務只會被一個消費者執行)。基於RabbitMQ做延時任務的核心是利用RabbitMQ的消息到期轉發特性。發送消息時設置消息到期時間,等消息到期未被消費時會將消息轉發到一個新的隊列,新隊列的消費者收到消息后再處理,利用這種時間差特性實現任務的延時觸發。
二.准備RabbitMQ並設置延時任務用到的相關隊列
1.安裝erlang和RabbitMQ(注意erlang與RabbitMQ的版本對應關系)
2.開啟rabbitmq_management
打開RabbitMQ Command Prompt輸入命令:rabbitmq-plugins enable rabbitmq_management
3.創建兩個Exchange
創建一個Exchange用於添加延時任務,相關配置如下
再創建一個Exchange用於接收到期的延時任務,相關配置如下
4.創建兩個Queue
創建第一個Queue,用於添加延時任務,相關配置如下
上面配置創建了一個隊列q1,設置到期消息被轉移的目的地Exchange(dlx)和Route key(dlx_rk)
接下來配置q1綁定的Exchange為ExQ1,Route key為send
再創建第二個Queue,用於接收隊列q1中到期被轉移的任務,相關配置如下
並綁定到Exchange:dlx,Route key:dlx_rk
通過上面兩個Exchange和兩個Queue的配置,讓RabbitMQ支持q1中的消息到期后轉移到q2中。所以業務上我們只用將延時任務發送到q1,讓任務到期觸發執行的業務代碼去監聽(消費)q2。這樣基本上就實現了分布式環境下延時任務的創建以及到期調度觸發執行。
三.具體代碼實現
1.創建簡單maven項目,添加如下依賴
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.58</version> </dependency> </dependencies>
2.封裝用到的RabbitMQ操作
1 import com.rabbitmq.client.*; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 /** 7 * 1.連接RabbitMQ 8 * 2.添加延時任務 9 * 3.消費延時任務 10 */ 11 public class RabbitMQUtil { 12 13 private static Connection conn; 14 private static Channel channel; 15 16 /** 17 * 初始化RabbitMQ連接與channel 18 */ 19 static { 20 ConnectionFactory factory = new ConnectionFactory(); 21 factory.setUsername("guest"); 22 factory.setPassword("guest"); 23 factory.setVirtualHost("/"); 24 factory.setHost("localhost"); 25 factory.setPort(5672); 26 27 try { 28 conn = factory.newConnection(); 29 channel = conn.createChannel(); 30 } catch (IOException e) { 31 System.out.println("獲取RabbitMQ連接失敗"); 32 } catch (TimeoutException e) { 33 System.out.println("獲取RabbitMQ連接超時"); 34 } 35 } 36 37 // public static void close() throws IOException, TimeoutException { 38 // if (Objects.nonNull(channel)) 39 // channel.close(); 40 // if (Objects.nonNull(conn)) 41 // conn.close(); 42 // } 43 44 /** 45 * 向指定exchange下route key發送延時任務 46 * @param msg 延時任務JSON bytes 47 * @param exchangeName 48 * @param routingKey 49 * @param expiration 延時時間 50 */ 51 public static void addTask(byte[] msg, String exchangeName, String routingKey, int expiration) { 52 try { 53 channel.basicPublish(exchangeName, routingKey, 54 new AMQP.BasicProperties.Builder() 55 .expiration(String.valueOf(expiration)) 56 .build(), msg); 57 } catch (IOException e) { 58 e.printStackTrace(); 59 } 60 } 61 62 /** 63 * 消費指定queue的消息(延時任務) 64 * @param queueName 65 * @param handler 任務處理器 66 * @param consumerTag 消費者標簽(多個消費者同時消息同一queue時可以使用consumerTag作區分) 67 */ 68 public static void bindConsumer(String queueName, DemoTaskHandler handler, String consumerTag) { 69 try { 70 channel.basicConsume(queueName, false, consumerTag, 71 new DefaultConsumer(channel) { 72 @Override 73 public void handleDelivery(String consumerTag, 74 Envelope envelope, 75 AMQP.BasicProperties properties, 76 byte[] body) 77 throws IOException { 78 long deliveryTag = envelope.getDeliveryTag(); 79 // (process the message components here ...) 80 handler.execute(body, consumerTag); 81 channel.basicAck(deliveryTag, false); // 應答,告知queue成功收到消息 82 } 83 }); 84 } catch (IOException e) { 85 e.printStackTrace(); 86 } 87 } 88 89 }
3.模擬延時任務POJO
1 import java.io.Serializable; 2 3 public class DemoTask implements Serializable { 4 5 private int id; 6 7 public int getId() { 8 return id; 9 } 10 11 public void setId(int id) { 12 this.id = id; 13 } 14 }
4.延時任務處理器
1 import com.alibaba.fastjson.JSON; 2 3 public class DemoTaskHandler { 4 5 public void execute(byte[] body, String consumerTag) { 6 DemoTask task = JSON.parseObject(new String(body), DemoTask.class); 7 System.out.println(consumerTag + "收到延時任務id:" + task.getId() + " 並處理完畢"); 8 } 9 }
5.設計一個主程序往q1隊列發送延時任務
1 import com.alibaba.fastjson.JSON; 2 3 import java.util.Scanner; 4 5 public class Producer { 6 7 public static void main(String[] args) { 8 // 添加延時任務 9 System.out.println("按下鍵盤添加延時任務"); 10 Scanner sc = new Scanner(System.in); 11 int i = 1; 12 while (sc.hasNextLine()) { 13 sc.nextLine(); 14 DemoTask bo = new DemoTask(); 15 bo.setId(i++); 16 RabbitMQUtil.addTask(JSON.toJSONString(bo).getBytes(), 17 "ExQ1", 18 "send", 19 10000); 20 System.out.println("成功添加一個延時任務"); 21 } 22 } 23 24 }
6.創建兩個消費者(處理延時任務的業務)消費延時任務,模擬分布式環境
1 public class Consumer1 { 2 3 public static void main(String[] args) { 4 // 模擬分布式環境,處理到期的延時任務 5 RabbitMQUtil.bindConsumer("q2", 6 new DemoTaskHandler(), 7 "consumer1"); 8 9 } 10 11 }
1 public class Consumer2 { 2 3 public static void main(String[] args) { 4 // 模擬分布式環境,處理到期的延時任務 5 RabbitMQUtil.bindConsumer("q2", 6 new DemoTaskHandler(), 7 "consumer2"); 8 } 9 }
7.運行Producer,Consumer1,Consumer2觀察結果
通過觀察發現,每次發送一個延時任務后,過10秒會被consumer1或者consumer2消費,以上就基本實現了分布式延時任務調度。