Rabbitmq 定時任務


寫的很棒,轉載一下:Rabbitmq 延遲隊列實現定時任務,這才是正確的方式! - Java技術棧 - 博客園

 

場景

開發中經常需要用到定時任務,對於商城來說,定時任務尤其多,比如優惠券定時過期、訂單定時關閉、微信支付2小時未支付關閉訂單等等,都需要用到定時任務,但是定時任務本身有一個問題。

一般來說我們都是通過定時輪詢查詢數據庫來判斷是否有任務需要執行,也就是說不管怎么樣,我們需要先查詢數據庫,而且有些任務對時間准確要求比較高的,需要每秒查詢一次,對於系統小倒是無所謂,如果系統本身就大而且數據也多的情況下,這就不大現實了,所以需要其他方式的,當然實現的方式有多種多樣的,比如Redis實現定時隊列、基於優先級隊列的JDK延遲隊列、時間輪等。

因為我們項目中本身就使用到了Rabbitmq,所以基於方便開發和維護的原則,我們使用了Rabbitmq延遲隊列來實現定時任務,不知道rabbitmq是什么的和不知道springboot怎么集成Rabbitmq的可以查看公眾號Java技術棧之前的文章Spring boot集成RabbitMQ。

Spring Boot 基礎教程和示例代碼:https://github.com/javastacks/spring-boot-best-practice

Rabbitmq延遲隊列

Rabbitmq本身是沒有延遲隊列的,只能通過Rabbitmq本身隊列的特性來實現,想要Rabbitmq實現延遲隊列,需要使用Rabbitmq的死信交換機(Exchange)和消息的存活時間TTL(Time To Live)

死信交換機

一個消息在滿足如下條件下,會進死信交換機,記住這里是交換機而不是隊列,一個交換機可以對應很多隊列。

一個消息被Consumer拒收了,並且reject方法的參數里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。

上面的消息的TTL到了,消息過期了。

隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。

死信交換機就是普通的交換機,只是因為我們把過期的消息扔進去,所以叫死信交換機,並不是說死信交換機是某種特定的交換機

消息TTL(消息存活時間)

消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連着的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。如果隊列設置了,消息也設置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。這里單講單個消息的TTL,因為它才是實現延遲任務的關鍵。

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties(); properties.setExpiration("60000");
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);

可以通過設置消息的expiration字段或者x-message-ttl屬性來設置時間,兩者是一樣的效果。只是expiration字段是字符串參數,所以要寫個int類型的字符串:當上面的消息扔到隊列中后,過了60秒,如果沒有被消費,它就死了。不會被消費者消費到。這個消息后面的,沒有“死掉”的消息對頂上來,被消費者消費。死信在隊列中並不會被刪除和釋放,它會被統計到隊列的消息數中去

處理流程圖

創建交換機(Exchanges)和隊列(Queues)

創建死信交換機

如圖所示,就是創建一個普通的交換機,這里為了方便區分,把交換機的名字取為:delay

創建自動過期消息隊列

這個隊列的主要作用是讓消息定時過期的,比如我們需要2小時候關閉訂單,我們就需要把消息放進這個隊列里面,把消息過期時間設置為2小時

創建一個一個名為delay_queue1的自動過期的隊列,當然圖片上面的參數並不會讓消息自動過期,因為我們並沒有設置x-message-ttl參數,如果整個隊列的消息有消息都是相同的,可以設置,這里為了靈活,所以並沒有設置,另外兩個參數x-dead-letter-exchange代表消息過期后,消息要進入的交換機,這里配置的是delay,也就是死信交換機,x-dead-letter-routing-key是配置消息過期后,進入死信交換機的routing-key,跟發送消息的routing-key一個道理,根據這個key將消息放入不同的隊列

創建消息處理隊列

這個隊列才是真正處理消息的隊列,所有進入這個隊列的消息都會被處理

消息隊列的名字為delay_queue2

消息隊列綁定到交換機

進入交換機詳情頁面,將創建的2個隊列(delayqueue1和delayqueue2)綁定到交換機上面

自動過期消息隊列的routing key 設置為delay

綁定delayqueue2

delay*queue2 的key要設置為創建自動過期的隊列的x-dead-letter-routing-key參數,這樣當消息過期的時候就可以自動把消息放入delay_queue2這個隊列中了

綁定后的管理頁面如下圖:

當然這個綁定也可以使用代碼來實現,只是為了直觀表現,所以本文使用的管理平台來操作

發送消息

String msg = "hello word";  MessageProperties messageProperties = newMessageProperties();          messageProperties.setExpiration("6000");        messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());        Message message = newMessage(msg.getBytes(), messageProperties);        rabbitTemplate.convertAndSend("delay", "delay",message);

主要的代碼就是

messageProperties.setExpiration("6000");

設置了讓消息6秒后過期

注意:因為要讓消息自動過期,所以一定不能設置delay_queue1的監聽,不能讓這個隊列里面的消息被接受到,否則消息一旦被消費,就不存在過期了

接收消息

接收消息配置好delay_queue2的監聽就好了

package wang.raye.rabbitmq.demo1;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DelayQueue {
 /** 消息交換機的名字*/
 public static final String EXCHANGE = "delay";
 /** 隊列key1*/
 public static final String ROUTINGKEY1 = "delay";
 /** 隊列key2*/
 public static final String ROUTINGKEY2 = "delay_key";

 /** * 配置鏈接信息 * @return */
 @Bean
 public ConnectionFactory connectionFactory() {
  CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);

  connectionFactory.setUsername("kberp");
  connectionFactory.setPassword("kberp");
  connectionFactory.setVirtualHost("/");
  connectionFactory.setPublisherConfirms(true); // 必須要設置
  return connectionFactory;
 }

 /** * 配置消息交換機 * 針對消費者配置 FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念 HeadersExchange :通過添加屬性key-value匹配 DirectExchange:按照routingkey分發到指定隊列 TopicExchange:多關鍵字匹配 */
    @Bean
    public DirectExchange defaultExchange() {
     return new DirectExchange(EXCHANGE, true, false);
    }

    /** * 配置消息隊列2 * 針對消費者配置 * @return */
    @Bean
    public Queue queue() {
       return new Queue("delay_queue2", true); //隊列持久

    }
    /** * 將消息隊列2與交換機綁定 * 針對消費者配置 * @return */
    @Bean
    @Autowired
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);
    }

    /** * 接受消息的監聽,這個監聽會接受消息隊列1的消息 * 針對消費者配置 * @return */
    @Bean
    @Autowired
    public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(queue());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認
        container.setMessageListener(new ChannelAwareMessageListener() {

   public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
    byte[] body = message.getBody();
                System.out.println("delay_queue2 收到消息 : " + new String(body));
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費

   }

        });
        return container;
    }


}

在消息監聽中處理需要定時處理的任務就好了,因為Rabbitmq能發送消息,所以可以把任務特征碼發過來,比如關閉訂單就把訂單id發過來,這樣就避免了需要查詢一下那些訂單需要關閉而加重MySQL負擔了,畢竟一旦訂單量大的話,查詢本身也是一件很費IO的事情

總結

基於Rabbitmq實現定時任務,就是將消息設置一個過期時間,放入一個沒有讀取的隊列中,讓消息過期后自動轉入另外一個隊列中,監控這個隊列消息的監聽處來處理定時任務具體的操作。

寫的很棒,轉載一下:Rabbitmq 延遲隊列實現定時任務,這才是正確的方式! - Java技術棧 - 博客園

場景

開發中經常需要用到定時任務,對於商城來說,定時任務尤其多,比如優惠券定時過期、訂單定時關閉、微信支付2小時未支付關閉訂單等等,都需要用到定時任務,但是定時任務本身有一個問題。

一般來說我們都是通過定時輪詢查詢數據庫來判斷是否有任務需要執行,也就是說不管怎么樣,我們需要先查詢數據庫,而且有些任務對時間准確要求比較高的,需要每秒查詢一次,對於系統小倒是無所謂,如果系統本身就大而且數據也多的情況下,這就不大現實了,所以需要其他方式的,當然實現的方式有多種多樣的,比如Redis實現定時隊列、基於優先級隊列的JDK延遲隊列、時間輪等。

因為我們項目中本身就使用到了Rabbitmq,所以基於方便開發和維護的原則,我們使用了Rabbitmq延遲隊列來實現定時任務,不知道rabbitmq是什么的和不知道springboot怎么集成Rabbitmq的可以查看公眾號Java技術棧之前的文章Spring boot集成RabbitMQ。

Spring Boot 基礎教程和示例代碼:GitHub - javastacks/spring-boot-best-practice: Spring Boot 最佳實踐,包括自動配置、核心原理、源碼分析、國際化支持、調試、日志集成、熱部署等。

Rabbitmq延遲隊列

Rabbitmq本身是沒有延遲隊列的,只能通過Rabbitmq本身隊列的特性來實現,想要Rabbitmq實現延遲隊列,需要使用Rabbitmq的死信交換機(Exchange)和消息的存活時間TTL(Time To Live)

死信交換機

一個消息在滿足如下條件下,會進死信交換機,記住這里是交換機而不是隊列,一個交換機可以對應很多隊列。

一個消息被Consumer拒收了,並且reject方法的參數里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。

上面的消息的TTL到了,消息過期了。

隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。

死信交換機就是普通的交換機,只是因為我們把過期的消息扔進去,所以叫死信交換機,並不是說死信交換機是某種特定的交換機

消息TTL(消息存活時間)

消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連着的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。如果隊列設置了,消息也設置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。這里單講單個消息的TTL,因為它才是實現延遲任務的關鍵。

  1.  
    byte[] messageBodyBytes = "Hello, world!".getBytes();
  2.  
    AMQP.BasicProperties properties = new AMQP.BasicProperties();
  3.  
    properties.setExpiration( "60000");
  4.  
    channel.basicPublish( "my-exchange", "queue-key", properties, messageBodyBytes);

可以通過設置消息的expiration字段或者x-message-ttl屬性來設置時間,兩者是一樣的效果。只是expiration字段是字符串參數,所以要寫個int類型的字符串:當上面的消息扔到隊列中后,過了60秒,如果沒有被消費,它就死了。不會被消費者消費到。這個消息后面的,沒有“死掉”的消息對頂上來,被消費者消費。死信在隊列中並不會被刪除和釋放,它會被統計到隊列的消息數中去

處理流程圖

創建交換機(Exchanges)和隊列(Queues)

創建死信交換機

如圖所示,就是創建一個普通的交換機,這里為了方便區分,把交換機的名字取為:delay

創建自動過期消息隊列

這個隊列的主要作用是讓消息定時過期的,比如我們需要2小時候關閉訂單,我們就需要把消息放進這個隊列里面,把消息過期時間設置為2小時

創建一個一個名為delay_queue1的自動過期的隊列,當然圖片上面的參數並不會讓消息自動過期,因為我們並沒有設置x-message-ttl參數,如果整個隊列的消息有消息都是相同的,可以設置,這里為了靈活,所以並沒有設置,另外兩個參數x-dead-letter-exchange代表消息過期后,消息要進入的交換機,這里配置的是delay,也就是死信交換機,x-dead-letter-routing-key是配置消息過期后,進入死信交換機的routing-key,跟發送消息的routing-key一個道理,根據這個key將消息放入不同的隊列

創建消息處理隊列

這個隊列才是真正處理消息的隊列,所有進入這個隊列的消息都會被處理

消息隊列的名字為delay_queue2

消息隊列綁定到交換機

進入交換機詳情頁面,將創建的2個隊列(delayqueue1和delayqueue2)綁定到交換機上面

自動過期消息隊列的routing key 設置為delay

綁定delayqueue2

delay*queue2 的key要設置為創建自動過期的隊列的x-dead-letter-routing-key參數,這樣當消息過期的時候就可以自動把消息放入delay_queue2這個隊列中了

綁定后的管理頁面如下圖:

當然這個綁定也可以使用代碼來實現,只是為了直觀表現,所以本文使用的管理平台來操作

發送消息

String msg = "hello word";  MessageProperties messageProperties = newMessageProperties();          messageProperties.setExpiration("6000");        messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());        Message message = newMessage(msg.getBytes(), messageProperties);        rabbitTemplate.convertAndSend("delay", "delay",message);

主要的代碼就是

messageProperties.setExpiration("6000");

設置了讓消息6秒后過期

注意:因為要讓消息自動過期,所以一定不能設置delay_queue1的監聽,不能讓這個隊列里面的消息被接受到,否則消息一旦被消費,就不存在過期了

接收消息

接收消息配置好delay_queue2的監聽就好了

  1.  
    package wang.raye.rabbitmq.demo1;
  2.  
     
  3.  
    import org.springframework.amqp.core.AcknowledgeMode;
  4.  
    import org.springframework.amqp.core.Binding;
  5.  
    import org.springframework.amqp.core.BindingBuilder;
  6.  
    import org.springframework.amqp.core.DirectExchange;
  7.  
    import org.springframework.amqp.core.Message;
  8.  
    import org.springframework.amqp.core.Queue;
  9.  
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  10.  
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  11.  
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
  12.  
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  13.  
    import org.springframework.beans.factory.annotation.Autowired;
  14.  
    import org.springframework.context.annotation.Bean;
  15.  
    import org.springframework.context.annotation.Configuration;
  16.  
     
  17.  
    @Configuration
  18.  
    public class DelayQueue {
  19.  
    /** 消息交換機的名字*/
  20.  
    public static final String EXCHANGE = "delay";
  21.  
    /** 隊列key1*/
  22.  
    public static final String ROUTINGKEY1 = "delay";
  23.  
    /** 隊列key2*/
  24.  
    public static final String ROUTINGKEY2 = "delay_key";
  25.  
     
  26.  
    /**
  27.  
    * 配置鏈接信息
  28.  
    * @return
  29.  
    */
  30.  
    @Bean
  31.  
    public ConnectionFactory connectionFactory() {
  32.  
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory( "120.76.237.8", 5672);
  33.  
     
  34.  
    connectionFactory.setUsername( "kberp");
  35.  
    connectionFactory.setPassword( "kberp");
  36.  
    connectionFactory.setVirtualHost( "/");
  37.  
    connectionFactory.setPublisherConfirms( true); // 必須要設置
  38.  
    return connectionFactory;
  39.  
    }
  40.  
     
  41.  
    /**
  42.  
    * 配置消息交換機
  43.  
    * 針對消費者配置
  44.  
    FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念
  45.  
    HeadersExchange :通過添加屬性key-value匹配
  46.  
    DirectExchange:按照routingkey分發到指定隊列
  47.  
    TopicExchange:多關鍵字匹配
  48.  
    */
  49.  
    @Bean
  50.  
    public DirectExchange defaultExchange() {
  51.  
    return new DirectExchange(EXCHANGE, true, false);
  52.  
    }
  53.  
     
  54.  
    /**
  55.  
    * 配置消息隊列2
  56.  
    * 針對消費者配置
  57.  
    * @return
  58.  
    */
  59.  
    @Bean
  60.  
    public Queue queue() {
  61.  
    return new Queue( "delay_queue2", true); //隊列持久
  62.  
     
  63.  
    }
  64.  
    /**
  65.  
    * 將消息隊列2與交換機綁定
  66.  
    * 針對消費者配置
  67.  
    * @return
  68.  
    */
  69.  
    @Bean
  70.  
    @Autowired
  71.  
    public Binding binding() {
  72.  
    return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);
  73.  
    }
  74.  
     
  75.  
    /**
  76.  
    * 接受消息的監聽,這個監聽會接受消息隊列1的消息
  77.  
    * 針對消費者配置
  78.  
    * @return
  79.  
    */
  80.  
    @Bean
  81.  
    @Autowired
  82.  
    public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {
  83.  
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
  84.  
    container.setQueues(queue());
  85.  
    container.setExposeListenerChannel( true);
  86.  
    container.setMaxConcurrentConsumers( 1);
  87.  
    container.setConcurrentConsumers( 1);
  88.  
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認
  89.  
    container.setMessageListener( new ChannelAwareMessageListener() {
  90.  
     
  91.  
    public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
  92.  
    byte[] body = message.getBody();
  93.  
    System.out.println( "delay_queue2 收到消息 : " + new String(body));
  94.  
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費
  95.  
     
  96.  
    }
  97.  
     
  98.  
    });
  99.  
    return container;
  100.  
    }
  101.  
     
  102.  
     
  103.  
    }

在消息監聽中處理需要定時處理的任務就好了,因為Rabbitmq能發送消息,所以可以把任務特征碼發過來,比如關閉訂單就把訂單id發過來,這樣就避免了需要查詢一下那些訂單需要關閉而加重MySQL負擔了,畢竟一旦訂單量大的話,查詢本身也是一件很費IO的事情

總結

基於Rabbitmq實現定時任務,就是將消息設置一個過期時間,放入一個沒有讀取的隊列中,讓消息過期后自動轉入另外一個隊列中,監控這個隊列消息的監聽處來處理定時任務具體的操作。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM