分布式延遲消息隊列討論


很多時候我們會有延時處理一個任務的需求,比如說:

  • 2個小時后給用戶發送短信。
  • 15分鍾后關閉網絡連接。
  • 2分鍾后再次嘗試回調。

下面我們來分別探討一下幾種實現方案:

Java中的DelayQueue

Java中的DelayQueue位於java.util.concurrent包下,本質是由PriorityQueue和BlockingQueue實現的阻塞優先級隊列。

放入隊列的元素需要實現Delayed接口:

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

通過實現這個接口,來完成對隊列中元素,按照時間延遲先后排序的目的。

從隊列中取元素:

   /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    else if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

可以看到,在這段代碼里,在第一個元素的延遲時間還沒到的情況下:

  • 如果當前沒有其他線程等待,則阻塞當前線程直到延遲時間。
  • 如果有其他線程在等待,則阻塞當前線程。

向隊列中放入元素:

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

在放入元素的時候,會喚醒等待中的讀線程。

如果我們不考慮分布式運行和任務持久化的話,Java中的DelayQueue是一個很理想的方案,精巧好用。

但是如果我們需要分布式運行和任務持久化,就需要引入一些外部組件。

使用Redis實現

前文我們看到,可以通過優先級隊列來實現延遲隊列的功能。

Redis提供了很多數據結構,其中的zset是一種有序的數據結構;我們可以通過Redis中的zset來實現一個延遲隊列。

基本的方法就是使用時間戳作為元素的score存入zset。

redis> ZADD delayqueue <future_timestamp> "messsage"

獲取所有已經“就緒”的message,並且刪除message。

redis> MULTI
redis> ZRANGEBYSCORE delayqueue 0 <current_timestamp>
redis> ZREMRANGEBYSCORE delayqueue 0 <current_timestamp>
redis> EXEC

但是這個方案也有一些問題:

Redis事務雖然保證了一致性和隔離性,但是並沒有提供回滾功能。消息處理失敗是不能被恢復的,如果處理某條消息的線程崩潰或機器宕機,這條未被處理不能被自動的再次處理。

也有考慮過將分為TODO和Doing兩條隊列:

先從TODO隊列中取出任務,放入Doing中,再開始處理;如果停留在Doing隊列總過久,則重新放入TODO隊列。

但是由於Redis的事務特性,並不能做到完全可靠;並且檢查Doing超時的邏輯也略復雜。

那么有沒有一個成熟的消息隊列可以支持延遲投遞消息的功能呢?

答案當然是有的,本文的標題就是使用RabbitMQ實現DelayQueue。

使用RabbitMQ實現

這是RabbitMQ眾多隱藏的強大特性中的一個,可以輕松的降低代碼的復雜度,實現DelayQueue的功能。

我們需要兩個隊列,一個用來做主隊列,真正的投遞消息;另一個用來延遲處理消息。

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
connection = factory.newConnection();
channel = connection.createChannel();

channel.queueDeclare("MAIN_QUEUE", true, false, false, null);
channel.queueBind("MAIN_QUEUE", "amq.direct", "MAIN_QUEUE");

HashMap<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "amq.direct");
arguments.put("x-dead-letter-routing-key", "MAIN_QUEUE");
channel.queueDeclare("DELAY_QUEUE", true, false, false, arguments);

放入延遲消息:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties properties = builder.expiration(String.valueOf(task.getDelayMillis())).deliveryMode(2).build();
channel.basicPublish("", "DELAY_QUEUE", properties, SerializationUtils.serialize(task));

而關鍵點,就在於 x-dead-letter-exchange 和 x-dead-letter-routing-key 兩個參數上。這兩個參數說明了:消息過期后的處理方式 --> 投遞到我們指定的MAIN_QUEUE;然后我們只需要在MAIN_QUEUE中等待消息投遞即可。

RabbitMQ本身提供了消息持久化和沒有收到ACK的重投遞功能,這樣我們就可以實現一個高可靠的分布式延遲消息隊列了。

PS

上面講述的RabbitMQ定時任務方案有問題,RabbitMQ TTL文檔 中寫道:

Caveats

While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired. Hence resources used by such expired messages will not be freed, and they will be counted in queue statistics (e.g. the number of messages in the queue).

per-queue TTL不會有問題,因為快要過期的消息總是在隊列的前邊;但是如果使用per-message TTL的話,過期的消息有可能會在未過期的消息后邊,直到前邊的消息過期或者被消費。因為RabbitMQ保證過期的消息一定不會被消費者消費,但是不能保證消息過期就會從隊列中移除。

ActiveMQ

ActiveMQ from version 5.4 has an optional persistent scheduler built into the ActiveMQ message broker.

可以支持定時、延遲投遞、重復投遞和Cron調度。

在配置文件中,啟用<broker ... schedulerSupport="true"> 選項后即可使用。

MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long delay = 30 * 1000;
long period = 10 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
producer.send(message);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
producer.send(message);

PS

由於ActiveMQ采用的是類似於Java中DelayQueue的方式,通過先將消息排序再定時觸發的方式來實現延遲消息。在往隊列中投遞大量(10w+)定時消息之后,ActiveMQ的性能將會變得接近不可用,大量的消息擠壓得不到投遞。

其他一些可能的實現方式

  • RocketMQ 支持定時消息,但是不支持任意時間精度,支持特定的level,例如定時5s,10s,1m等。

  • 通過MySQL等數據庫記錄消息應該被投遞的時間,然后循環進行查找,並把當前時間應該投遞的消息放入普通的消息隊列。

參考資料


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM