利用延遲消息隊列取代定時任務


§1 RabbitMQ延遲隊列

RabbitMQ延遲隊列,主要是借助消息的TTL(Time to Live)和死信exchange(Dead Letter Exchanges)來實現。

涉及到2個隊列,一個用於發送消息,一個用於消息過期后的轉發目標隊列。

 

本例中, 定義2組exchange和queue。

agentpayquery1exchange		agentpayquery1queue(routingkey為delay)
agentpayquery2exchange		agentpayquery2queue(routingkey為delay)
agentpayquery1queue是緩沖隊列,消息過期路由到agentpayquery2queue

 

 

 

§2 生產者

生產者配置:

<!-- 連接服務配置 -->
<rabbit:connection-factory
        id="connectionFactoryProducer"
        addresses="${mq.ip}"    //192.168.40.40:5672
        username="${username}"
        password="${password}"
        channel-cache-size="${cache.size}"
        publisher-confirms="${publisher.confirms}"
        publisher-returns="${publisher.returns}"
        virtual-host="/"
/>

<!--通過指定下面的admin信息,當前producer中的exchange和queue會在rabbitmq服務器上自動生成 -->
<rabbit:admin connection-factory="connectionFactory"/>

<!--========================出款查詢 延遲隊列配置 begin =========================-->
<rabbit:queue id="agentpayquery2queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery2queue"/>
<rabbit:direct-exchange name="agentpayquery2exchange" durable="true" auto-delete="false" id="agentpayquery2exchange">
    <rabbit:bindings>
        <rabbit:binding queue="agentpayquery2queue" key="delay" />
    </rabbit:bindings>
</rabbit:direct-exchange>


<rabbit:queue id="agentpayquery1queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery1queue" >
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="agentpayquery2exchange"/>
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:direct-exchange name="agentpayquery1exchange" durable="true" auto-delete="false" id="agentpayquery1exchange">
    <rabbit:bindings>
        <rabbit:binding queue="agentpayquery1queue" key="delay" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<!--定義RabbitTemplate實例-->
<rabbit:template id="agentpayQueryMsgTemplate"
                 exchange="agentpayquery1exchange"  routing-key="delay"
                 connection-factory="connectionFactoryProducer" message-converter="mqMessageConverter"
                 mandatory="true"
                 confirm-callback="publisherConfirmsReturns" return-callback="publisherConfirmsReturns"/>
<!--========================出款查詢 延遲隊列配置 end =========================-->
 

 

 

生產者消息入隊(方法有待重構,見后文說明):

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class AgentpayQueryProducer {

    private static final Logger log = LogManager.getLogger(AgentpayQueryProducer.class.getSimpleName());

    @Autowired
    private RabbitTemplate agentpayQueryMsgTemplate;

    public void sendDelay(String message, int delaySeconds) {
        String expiration = String.valueOf(delaySeconds * 1000);
        agentpayQueryMsgTemplate.convertAndSend((Object) message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message)
                    throws AmqpException {
                message.getMessageProperties().setExpiration(expiration);
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                log.info("出款查詢數據入隊:{}", new String(message.getBody()));
                return message;
            }
        });
    }
}

 

 

§3消費者

消費端的配置無他:

<!-- 連接服務配置  channel-cache-size="25" -->
<rabbit:connection-factory id="connectionFactory"
                           addresses="${mq.ip}"
                           username="${username}"
                           password="${password}" />

<bean id="agentpayQueryConsumer" class="com.emaxcard.rpc.payment.service.impl.batchpay.AgentpayQueryConsumer" />

<!-- queue litener  觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象-->
<rabbit:queue id="agentpayquery2queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery2queue" />

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"

                           max-concurrency="20"
                           concurrency="10"
                           prefetch="10">
    <rabbit:listener ref="agentpayQueryConsumer" queues="agentpayquery2queue" />
</rabbit:listener-container>

 

消息消費:

import com.alibaba.fastjson.JSON;
import com.emaxcard.enums.BatchPayStatus;
import com.emaxcard.exceptions.ResponseException;
import com.emaxcard.payment.vo.PaymentRecord;
import com.emaxcard.rpc.payment.model.PaymentRecordModel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;

public class AgentpayQueryConsumer implements MessageListener {

    private static final Logger log = LogManager.getLogger();

    @Autowired
    QueryGatewayService queryGatewayService;
    @Autowired
    AgentpayQueryProducer agentpayQueryProducer;

    @Override
    public void onMessage(Message message) {
        String mqMsg = new String(message.getBody());
        log.info("出款查詢數據出隊:{}", mqMsg);
        PaymentRecord paymentRecordModel;
        try {
            paymentRecordModel = JSON.parseObject(mqMsg, PaymentRecord.class);
        } catch (Exception ex) {
            log.info("消息格式不是PaymentRecordModel,結束。");
            return;
        }

        try {
            BatchPayStatus payStatus = queryGatewayService.queryGateway(paymentRecordModel);

            // 非終態,繼續放入延遲隊列
            if (BatchPayStatus.SUCCESS != payStatus && BatchPayStatus.FAILED != payStatus) {
                if (BatchPayStatus.NOTEXIST == payStatus) {
                    log.info("查詢結果是{},不再處理", payStatus);
                } else {
                    agentpayQueryProducer.sendDelay(mqMsg, 10);
                }
            }
        } catch (Exception ex) {
            if (ex instanceof ResponseException) {
                log.info("轉賬查詢{},paymentId{},處理錯誤:{}",
                        paymentRecordModel.getTransNo(), paymentRecordModel.getPaymentId(), ex.getMessage());
            } else {
                log.error("處理消息異常:", ex);
            }
        }

    }
}

 

 

§4 使用延遲隊列要注意

隊列的數據結構是一種線性鏈表,遵從FIFO(First-In-First-Out)的存取方式。所以:

  1.  即使一個消息比在同一隊列中的其他消息提前過期,提前過期的也不會優先進入死信隊列,它們還是按照入隊的順序進入死信隊列。即:如果第一個入隊消息的TTL是1小時,那么死信隊列的消費者也許等1小時才能收到第一個消息。

    官方文檔:“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”  只有當過期的消息到了隊列的頂端(隊首),才會被真正的丟棄或者進入死信隊列。

    所以在考慮使用RabbitMQ來實現延遲任務隊列的時候,需要確保業務上每個任務的延遲時間是一致的。如果遇到不同的任務類型需要不同的延時的話,需要為每一種不同延遲時間的消息建立單獨的消息隊列。

  2. 當緩沖隊列里一旦出現未設置過期時間的消息,那么就會造成整個隊列堵塞,消息無法轉入死信隊列。通過日志可以看到,打印出來的都是 BlockingQueueConsumer。為防止這種問題的發生,需給隊列設置默認的ttl,spring配置是<entry key="x-message-ttl" value="60000" value-type="java.lang.Long"/>(過期時間是1分鍾)。這樣,如果消息有ttl,就按照消息自己的ttl走,否則1分鍾后自動轉入死信隊列。

    這其實涉及到延遲隊列的正確使用方式了。正如上一點提到的,一個延遲隊列里的消息不應該有各自的ttl,而應該統一走隊列本身的ttl。所以,定義延遲隊列時,要配置ttl,同時,在消息入隊時,也不需要指定消息的過期時間了。

    所以上述AgentpayQueryProducer提供的那個方法,可以去掉第2個參數delaySeconds。further more,因為每個隊列只針對某一類型的消息,那么,應明確第一個參數的類型,而非泛泛的String message,這里重構為PaymentRecord。即最終方法簽名是:public void sendDelay(PaymentRecord paymentRecord)

 

 

 

Get messages Ack Mode選擇“Ack message requeue false”,可以將消息消費掉

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM