rabbitmq延遲隊列
rabbitmq實現延遲隊列用了rabbitmq-delayed-message-exchange
插件,需要提前安裝,並啟用。
原理
其原理是通過Exchange來實現延遲功能,即在Exchange中根據各個message的x-delay
頭設置延遲時間,時間到達后才發送到對應的queue,進而被queue消費。
實現
其實現方法為:
-
正常我們申明一個Exchange只需要指定其類型(direct,fanout,topic等)即可,而聲明延遲Exchange需要指定type為
x-delayed-message
,並通過參數x-delay-type
指定其Exchange的類型(direct,fanout,topic等)。其實現如下:Map<String, Object> params = new HashMap<>(); params.put("x-delay-type", "direct"); channel.exchangeDeclare(exchangeName, "x-delayed-message", false, false, params);
-
聲明好Exchange之后,綁定任意隊列即可
-
發送消息的時候需要額外添加header,
x-delay
,用於設置延遲時間,單位:ms。實現如下:int delayMs = 5000; String msg1 = "delay message " + delayMs; Map<String, Object> headers = new HashMap<>(); headers.put("x-delay", delayMs); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build(); channel.basicPublish(exchangeName, "", props, msg1.getBytes("utf-8"));
性能影響
普通的Exchange收到message后直接推到queue,而延遲隊列需要判斷是否到達延遲時間,不到延遲時間的需要保存在表中,時間到了再撈出來推送,這些判斷和操作導致效率不如普通的Exchange,所以如果不需要的話,就不要用延遲隊列。