使用任務隊列一個優點是能夠輕易地並行處理任務。當處理大量積壓的任務,只要增加“Worker”,通過這個方式,能夠實現輕易的縮放。
Round-robin dispatching:
默認地,RabbitMQ會逐一地向下一個“Consumer”發放消息,每一個“Consumer”會得到數目相同的消息。
這種發放消息的方式叫Round-ronbin dispaching。
Message acknowledgment:
當“Consumer”接受到一個消息並作長時間處理時,有可能發生意外狀況,如運行“Consumer”的機器突然關閉,這時這個消息所要執行的任務可能沒有得到正確處理。
我們不希望有任務丟失或者沒有正確處理,RabbitMQ支持Message acknowledgment來解決這個問題。
當“Consumer”接收到消息、處理任務完成之后,會發送帶有這個消息標示符的ack,來告訴RabbitMQ這個消息接收到並處理完成。
RabbitMQ會一直等到處理某個消息的“Consumer”的鏈接失去之后,才確定這個消息沒有正確處理,從而RabbitMQ重發這個消息。
Message acknowledgment是默認關閉的。
初始化“Consumer”時有個auto參數,如果設置為true,這個Consumer在收到消息之后會馬上返回ack。
我們的應用應該是在消息的任務處理完之后再ack,因此初始化“Consumer”時這個參數應該置為false。實例代碼:
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//我們的任務處理
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
忘記ack是一個容易犯並且后果很嚴重的錯誤。RabbitMQ會侵占越來越多的內存,因為它不會釋放沒有被ack的消息。
可以使用rabbitmqctl去debug這個錯誤。實例:
rabbitmqctl list_queues name message_rady message_unacknowleded
Message durability:
Message acknowledgment解決了如果“Consumer”異常之后,任務得到保證。但是這並不能保證服務器異常之后任務能不丟失。Message durability正是用來解決這個問題。
為了實現Message durability,我們應該做兩個設置:消息隊列和消息本身。
聲明消息隊列時應該將durable參數置為true,消息的properties做設置,一個實例:
//channel設置
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
//publish的消息properties設置
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
這里有一個有意思的提示,RabbitMQ不能重復聲明隊列名稱相同但是屬性不一樣的隊列。屬性如durable。
關於message持久性的note:
即使做了上述的幾步來確保消息和任務不丟失,但是這樣依然有幾率丟失消息。
RabbitMQ沒有為每一個消息做fsync(2)(不知道這個是什么),消息可能只被保存在cache中,而沒有真正寫入硬盤,這時服務器突然異常就可能產生丟失。即使這個概率發生很小,但是如果需要更強大的保障,考慮用事件去包裝發布消息的過程。
Fair dispatch:
RabbitMQ默認只會用Round-robin的方式盲目地將消息派發給下一個“Consumer”,它不顧及“Consumer”還有多少個消息沒有ack。
為解決這個問題,我們可以使用basicQos方法來限制“Consumer”沒有ack的消息數目。實例:
int prefetchCount = 1;
channel.basicQos(prefetchCount);
如果這樣做也有個問題,如果消息無法派發(“Consumer”的unack消息太多),消息多的可能使消息隊列溢出。
exchange:
在實際的情況下,消息發送者不會直接將Message發入queue,而是發入exchange,exchange能通過指定的規則將message放入指定的queue中(public/subscribe模型)。exchange有direct,topic,headers和fanout等幾種類型。
聲明exchange的方法如下:
channel.exchangeDeclare(String name, String type);
fanout是一種廣播模式,將消息派發到所有的綁定到該exchange的queque中。
查看rabbitMQ中的exchange可以使用rabbitmqctl list_exchanges,查看exchange和queue的綁定關系可以使用rabbitmqctl list_bindings
basicPublish()方法中,如果exchange置為"",將使用默認的exchange,這時將消息放入名字為routingKey參數的值的隊列。
channel.queueDeclare()不帶參數的方法將生產一個non-durable(服務器重啟后,這個queue不將存在), exclusive(這個queue只能被這個鏈接使用), autodelete(這個queue不再使用的時候會被刪除)並且名字隨機生成的queue,這種queue非常適合做廣播接收queue
direct exchange:
這種exchange的routing-key只有一個單詞,exchange根據這個key將消息派發到對應的queue中。
topic exchange:
topic exchange的routing-key不能是任意,它必須是一個一些詞的數組,用"."隔開,每個詞可以是任意的,但通常是代表某些含義的。這個詞的數組不能超過255個字節。
"*"可以代表任意一個詞
"#"可以代表任意數目的詞(0個或更多)。
topic exchange是非常強大的,它幾乎能實現其他exchange的routing規則。
自己的note:
一個exchange能用n條routing-key規則去指向一個queue。
如果一個Message符合n條規則去往同一個queue,這個Message只會被投放到這個queue一次。
RPC:
Remote Procedure Call(遠程過程調度)。Client向遠程的計算機請求一個耗時較長的任務,並等待其返回結果。
RPC盡管是非常常見的一種模式,但是它有許多不足之處。RPC會帶來一些不可預知的錯誤並增加了調試的復雜性。開發人員可能會困惑哪個是遠程調用哪個是本地調用,或者說哪個RPC是耗時較長的調用。
開發RPC時,記住以下的建議:
1)將遠程調用和本地調用清楚的區別
2)為系統編寫文檔,將組件的依賴關系清楚的分離。
3)着手准備處理意外事件,要考慮到遠程調用的服務器意外關閉之后該如何處理。
用RabbitMQ做RPC是非常容易的,客戶端將Message發送到服務器,在Message里應該帶上一個回調queue的地址,服務器通過這個地址將結果返回給客戶端。
Message一共有14種屬性,常見的有這么幾個:
1)deliveryMode:標識這個Message是持久性的還是短暫性,如果是持久性並且保持這個Message的queue是durable的,服務器重啟之后這個消息如果未被Consumer處理,就會恢復這個Message並加入到queue中。
2)contentType: 標識消息內容的MIME,例如JSON用application/json
3)replayTo: 標識回調的queue的地址
4)correlationId:用於request和response的關聯,確保消息的請求和響應的同一性。