消息包含兩部分:1、有效載荷(payload) - 你想要傳輸的數據。2、標簽(lable) - 描述有效載荷的相關信息,包含具體的交換器、消息的接受興趣方等。
rabbitmq的基礎流程如下:
RabbitMQ的客戶端和服務端通過channel與RabbitMQ服務器進行通信。
Channel(信道):程序和RabbitMQ之間的連接是通過channel,channel是基於TCP協議之上的,
一個TCP連接可以有多個channel,可以比喻成如下:一根大電纜里面有很多根小的電線,大電纜就是一個TCP連接,
而小電線就是channel了,而為什么要這么設計呢?因為系統創建和銷毀TCP連接是很重量級的操作,
基於一個TCP上進行多個不沖突的通信操作可以大大增加效率。
AMQP的消息路由必須包含3個部分:交換器(Exchange)、隊列(Queue)、綁定(Bind)。
1、Queue(隊列):用來存放消息的地方,等待消費者來獲取。消費者可以通過以下2種方式從queue里接收消息。
⑴、通過AMQP的basic.consume命令訂閱。這樣會將channel置為接收模式,直到取消對該queue的訂閱為止。訂閱了消息后,消費者在消費(或者拒絕)最近接收的那條消息后,就能從隊列中(有可用的)自動接收下一條信息。此訂閱模式生產者把消息發布到queue后就會被消費消費者自動接收到。
⑵、通過AMQP的basic.get命令獲取下一條消息。該模式一次只獲取一條信息,然后就取消了訂閱,如果需要繼續獲取下一條需要再次發送此命令。注意:不應該在for循環里使用basic.get來替代basic.consume,這樣會影響Rabbit的性能。
當多個消費者訂閱同一個隊列時,rabbit會采取循環的方式發送給消費者,每條消息只發給一個訂閱的消費者,消費者收到信息后需要發送basic.ack來確認收到,或者將auto_ack設置為true來自動確認。如果消費者未回復確認信息,rabbit將不會再對此消費者發送消息了。當然你可以使用basic.reject命令來拒絕rabbit發來的消息,拒絕的同時將該消息從隊列里刪除。
流程如下:
說了這么多,隊列該如何創建呢?使用queue.declare命令來創建隊列,需要注意的是在同一條Channel(信道)上只能訂閱一個隊列,如果在Channel上已經訂閱了一個隊列的情況需要訂閱另外一個隊列,唯一的處理方法就是取消訂閱之前的隊列並將Channel置為【傳輸模式】然后再訂閱新的。
該命令主要用到的參數有:
/** * Declare a queue * @param queue 隊列的名稱,如果為空將生成隨機的名稱 * @param durable 是否持久化的隊列 (服務重啟后隊列和消息都還在) * @param exclusive 是否是私有(獨占)的隊列 (限制在此連接) * @param autoDelete 是否當沒人使用隊列時自動刪除 * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
Exchange(交換器)與綁定:服務器根據路由鍵(規則)將消息路由到指定的隊列。Exchange與queue通過綁定操作來進行關聯。
交換器共有4種類型:direct、fanout、topic和headers。
direct exchange:如果路由鍵匹配的話,消息會被投遞到對應隊列。服務器必須實現direct類型的交換器,包含一個空白字符串名稱的默認交換器。
如下的代碼將使用到默認對應exchange類型的exchange
channel.basicPublish("",ROUTING_KEY, null, msg.getBytes());
代碼中使用了默認的direct交換器,路由鍵為隊列的名稱。
當默認的direct交換器無法滿足需求時可以使用exchange.declare命令設置合適的參數來創建一個direct交換器。
別人對此交換器的解釋:
Direct Exchange – 處理路由鍵。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配。這是一個完整的匹配。
如果一個隊列綁定到該交換機上要求路由鍵 “dog”,則只有被標記為“dog”的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog。

fanout exchange:不處理路由鍵。你只需要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每台子網內的主機都獲得了一份復制的消息。Fanout交換機轉發消息是最快的。
topic exchange:通過模式匹配分析消息的routing-key(路由鍵)屬性。它將routing-key和binding-key的字符串切分成單詞。這些單詞之間用點隔開。它同樣也會識別兩個通配符:#匹配0個或者多個單詞,*匹配一個單詞。例如,binding key:*.stock.#匹配routing key:usd.stcok和eur.stock.db,但是不匹配stock.nana
注意:如果將消息發送給未綁定/不符合路由規則的Queue,則消息會進入“黑洞”(Rabbit的一個特殊的隊列,老版本的就是直接丟失了)里。
RabbitMQ系統自帶的幾個Exchange: