分布式延遲消息隊列實現分析與設計


介紹

延遲隊列,顧名思義它是一種帶有延遲功能的消息隊列。 那么,是在什么場景下我才需要這樣的隊列呢?

很多時候我們會有延時處理一個任務的需求,比如說:

2個小時后給用戶發送短信。
15分鍾后關閉網絡連接。
2分鍾后再次嘗試回調。

下面我們來分別探討一下幾種實現方案:

1、Java中的DelayQueue

Java中的DelayQueue位於java.util.concurrent包下,本質是由PriorityQueue和BlockingQueue實現的阻塞優先級隊列。

見《延時隊列:Java中的DelayQueue

2、使用Redis實現

前文我們看到,可以通過優先級隊列來實現延遲隊列的功能。Redis提供了很多數據結構,其中的zset是一種有序的數據結構;我們可以通過Redis中的zset來實現一個延遲隊列。

基本的方法就是使用時間戳作為元素的score存入zset。

redis> ZADD delayqueue <future_timestamp> "messsage"
獲取所有已經“就緒”的message,並且刪除message。
 
redis> MULTI
redis> ZRANGEBYSCORE delayqueue 0 <current_timestamp>
redis> ZREMRANGEBYSCORE delayqueue 0 <current_timestamp>
redis> EXEC

但是這個方案也有一些問題:

Redis事務雖然保證了一致性和隔離性,但是並沒有提供回滾功能。消息處理失敗是不能被恢復的,如果處理某條消息的線程崩潰或機器宕機,這條未被處理不能被自動的再次處理。

也有考慮過將分為TODO和Doing兩條隊列:

先從TODO隊列中取出任務,放入Doing中,再開始處理;如果停留在Doing隊列總過久,則重新放入TODO隊列。但是由於Redis的事務特性,並不能做到完全可靠;並且檢查Doing超時的邏輯也略復雜。那么有沒有一個成熟的消息隊列可以支持延遲投遞消息的功能呢?答案當然是有的,本文的標題就是使用RabbitMQ實現DelayQueue。

見《基於redis的延遲消息隊列設計

3、使用AWS上的SQS的延時隊列

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.PublishResult;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
import com.amazonaws.util.StringUtils;


SendMessageRequest sendMessageRequest = new SendMessageRequest(queueUrl, message);
            sendMessageRequest.setDelaySeconds(delaySeconds);
            SendMessageResult result = sqs.sendMessage(sendMessageRequest);

4、使用RabbitMQ實現

這是RabbitMQ眾多隱藏的強大特性中的一個,可以輕松的降低代碼的復雜度,實現DelayQueue的功能。

我們需要兩個隊列,一個用來做主隊列,真正的投遞消息;另一個用來延遲處理消息。

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
connection = factory.newConnection();
channel = connection.createChannel();
 
channel.queueDeclare("MAIN_QUEUE", true, false, false, null);
channel.queueBind("MAIN_QUEUE", "amq.direct", "MAIN_QUEUE");
 
HashMap<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "amq.direct");
arguments.put("x-dead-letter-routing-key", "MAIN_QUEUE");
channel.queueDeclare("DELAY_QUEUE", true, false, false, arguments);

 

放入延遲消息:

        public boolean send(String message, long delay) {
            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
            builder.expiration(String.valueOf(delay));
            //璁劇疆娑堟伅TTL
            builder.deliveryMode(2);
            //璁劇疆娑堟伅鎸佷箙鍖�
            AMQP.BasicProperties properties = builder.build();
            //璁劇疆娑堟伅TTL
            try {
                logger.info("=====>start send rabbit mq message:{},delay:{}",message,delay);
                channel.basicPublish(exchangeName,routingKey,properties,message.getBytes());
                logger.info("=====>end send rabbit mq message:{},delay:{}",message,delay);
                return true;
            } catch (IOException e) {
                logger.error("<=====rabbitmq producer send message error",e);
                return false;
            }
        }

 

而關鍵點,就在於 x-dead-letter-exchange 和 x-dead-letter-routing-key 兩個參數上。這兩個參數說明了:消息過期后的處理方式 --> 投遞到我們指定的MAIN_QUEUE;然后我們只需要在MAIN_QUEUE中等待消息投遞即可。

 

RabbitMQ本身提供了消息持久化和沒有收到ACK的重投遞功能,這樣我們就可以實現一個高可靠的分布式延遲消息隊列了。

PS:上面講述的RabbitMQ定時任務方案有問題,RabbitMQ TTL文檔 中寫道:

Caveats

While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired. Hence resources used by such expired messages will not be freed, and they will be counted in queue statistics (e.g. the number of messages in the queue).

per-queue TTL不會有問題,因為快要過期的消息總是在隊列的前邊;但是如果使用per-message TTL的話,過期的消息有可能會在未過期的消息后邊,直到前邊的消息過期或者被消費。因為RabbitMQ保證過期的消息一定不會被消費者消費,但是不能保證消息過期就會從隊列中移除。

5、ActiveMQ

ActiveMQ from version 5.4 has an optional persistent scheduler built into the ActiveMQ message broker.

可以支持定時、延遲投遞、重復投遞和Cron調度。

在配置文件中,啟用<broker ... schedulerSupport="true"> 選項后即可使用。

MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long delay = 30 * 1000;
long period = 10 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
producer.send(message);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
producer.send(message);

 

ActiveMQ配置項介紹:

Property                                        type                                  Description

AMQ_SCHEDULED_DELAY         false                   The time in milliseconds that a message will wait before

                                                                                being scheduled to be delivered by the broker

AMQ_SCHEDULED_DELAY         false                   消息延遲發送的延遲時間(單位毫秒)                                                                                

                                                                                

AMQ_SCHEDULED_PERIOD      false                     The time in milliseconds after the start time to wait before

                                                                                scheduling the message again

AMQ_SCHEDULED_PERIOD      false                     代理啟動后,發送消息之前的等待時間(單位毫秒).                                                                                

AMQ_SCHEDULED_REPEAT      false                     The number of times to repeat scheduling a message for delivery

AMQ_SCHEDULED_REPEAT      false                     調度消息發送的重復次數

AMQ_SCHEDULED_CRON        String                    Use a cron entry to set the schedule

AMQ_SCHEDULED_CRON        String                    使用一個cron實體設置消息發送調度

 

文章引自:http://zhangyp.net/rabbitmq-delayqueue/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM