RabbitMQ入門案例
Rabbit 模式
實現步驟
- 構建一個 maven工程
- 導入 rabbitmq的依賴
- 啟動 rabbitmq-server服務
- 定義生產者
- 定義消費者
- 觀察消息的在 rabbitmq-server服務中的進程
初步實現
前期准備
1.構建項目
2.導入依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
簡單模型

在上圖的模型中,有以下概念:
- 生產者,也就是要發送消息的程序
- 消費者:消息的接受者,會一直等待消息到來。
- 消息隊列:圖中紅色部分。類似一個郵箱,可以緩存消息;生產者向其中投遞消息,消費者從其中取出消息。
所有的中間件技術都是基於
tcp/ip
協議基礎之上構建新型的協議規范,只不過rabbitmq
遵循的是amqp
實現步驟:
- 創建連接工程
- 創建連接 connection
- 通過連接獲取通道 Channel
- 通過通道創建交換機,聲明隊列,綁定關系,路由key,發送消息,和接收消息
- 准備消息內容
- 發送消息給隊列 queue
- 關閉連接
- 關閉通道
生產者
public class Producer {
public static void main(String[] args) {
//1. 創建連接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
//這里要使用自己的IP地址
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//2. 創建連接 connection
connection = connectionFactory.newConnection("生產者");
//3. 通過連接獲取通道 Channel
channel = connection.createChannel();
//4. 通過通道創建交換機,聲明隊列,綁定關系,路由key,發送消息
String quequeName = "queuel";
/**
* @params1 隊列的名稱
* @params2 是否要持久化 durable-false
* @params3 排他性,是否是獨占獨立
* @params4 是否自動刪除,隨着最后一個消費者消息完畢以后是否把隊列自動刪除
* @params5 攜帶的附屬參數
*/
channel.queueDeclare(quequeName,false,false,false,null);
//5. 准備消息內容
String message = "Hello,Consumer";
//6. 發送消息給隊列 queue
channel.basicPublish("",quequeName,null,message.getBytes());
System.out.println("消息發送成功");
} catch (Exception e) {
e.printStackTrace();
}finally {
//7. 關閉連接
if (channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8. 關閉通道
if (connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
消費者
public class Consumer {
public static void main(String[] args) {
//1. 創建連接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
//這里要使用自己的IP地址
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//2. 創建連接 connection
connection = connectionFactory.newConnection("消費者");
//3. 通過連接獲取通道 Channel
channel = connection.createChannel();
//4. 通過通道創建交換機,聲明隊列,綁定關系,路由key,發送消息,和接收消息
String quequeName = "queue1";
channel.queueDeclare(quequeName,false,false,false,null);
//5.監聽消息
DefaultConsumer consumer = new DefaultConsumer(channel){
/*
consumerTag:消息者標簽,channel.basicConsume可以指定
envelope:消息包內容,可從中獲取消息id,消息routing key,交換機,消息和重裝標記(收到消息失敗后是否需要重新發送)
properties:消息屬性
body;消息
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:"+ envelope.getRoutingKey());
//交換機
System.out.println("交換機為:"+ envelope.getExchange());
//消息id
System.out.println("消息id為:"+ envelope.getDeliveryTag());
//收到的消息
System.out.println("接收到的消息:"+ new String(body,"UTF-8"));
System.out.println("");
System.out.println("======================================================");
System.out.println("");
}
};
channel.basicConsume("queue1", true, consumer);
} catch (Exception e) {
e.printStackTrace();
}finally {
//6. 不關閉資源,一直監聽
}
}
}

AMQP
概念介紹
AMQP 一個提供統一消息服務的應用層標准高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。
AMQP是一個二進制協議,擁有一些現代化特點:
多信道
、協商式
,異步
,安全
,擴平台
,中立
,高效
。RabbitMQ 是 AMQP協議 的 Erlang的實現。
概念 | 說明 |
---|---|
連接 Connection | 一個網絡連接,例如:TCP/IP套接字連接。 |
會話 Session | 端點之間的命名對話。在一個會話上下文中,保證“恰好傳遞一次”。 |
信道 Channel | 多路復用連接中的一條獨立的雙向數據流通道。為會話提供物理傳輸介質。 |
客戶端 Client | AMQP連接或者會話的發起者。AMQP是非對稱的,客戶端生產和消費消息,服務器存儲和路由這些消息。 |
服務節點Broker | 消息中間件的服務節點。一般情況下可以將一個RabbitMQ Broker看作一台RabbitMQ 服務器。 |
端點 | AMQP對話的任意一方。一個AMQP連接包括兩個端點(一個是客戶端,一個是服務器)。 |
消費者 Consumer | 一個從消息隊列里請求消息的客戶端程序。 |
生產者 Producer | 一個向交換機發布消息的客戶端應用程序。 |
RabbitMQ運轉流程
以 入門案例 為例
生產者發送消息
- 生產者創建連接(Connection),開啟一個信道(Channel),連接到RabbitMQ Broker;
- 聲明隊列、設置屬性;如是否排它,是否持久化,是否自動刪除;
- 將路由鍵(空字符串)與隊列綁定起來;
- 發送消息至RabbitMQ Broker;
- 關閉信道;
- 關閉連接;
消費者接收消息
- 消費者創建連接(Connection),開啟一個信道(Channel),連接到RabbitMQ Broker
- 向Broker 請求消費相應隊列中的消息,設置相應的回調函數;
- 等待Broker回應閉關投遞響應隊列中的消息,消費者接收消息;
- 確認(ack,自動確認)接收到的消息;
- RabbitMQ從隊列中刪除相應已經被確認的消息;
- 關閉信道;
- 關閉連接;
生產者流轉過程解析
- 客戶端與代理服務器Broker建立連接。調用
newConnection()
方法 , 會進一步封裝Protocol Header 0-9-1
的報文頭發送給Broker
,以此通知Broker
本次交互采用的是AMQP 0-9-1
協議,緊接着Broker
返回Connection.Start
來建立連接,在連接的過程中涉及Connection.Start/.Start-OK
、Connection.Tune/.Tune-Ok
,Connection.Open/ .Open-Ok
這6 個命令的交互。 - 客戶端調用
connection.createChannel
方法。此方法開啟信道,其包裝的channel.open
命令發送給Broker
, 等待channel.basicPublish
方法,對應的AMQP命令為Basic.Publish
, 這個命令包含了content Header
和content Body()
。content Header 包含了消息體的屬性,例如:投遞模式,優先級等,content Body 包含了消息體本身。 - 客戶端發送完消息需要關閉資源時,涉及到
Channel.Close和Channl.Close-Ok
與Connetion.Close和Connection.Close-Ok
的命令交互。
消費者流轉過程解析
- 消費者客戶端與代理服務器Broker建立連接。會調用
newConnection()
方法,這個方法會進一步封裝Protocol Header 0-9-1
的報文頭發送給Broker ,以此通知Broker 本次交互采用的是AMQP 0-9-1
協議,緊接着Broker 返回Connection.Start
來建立連接,在連接的過程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok
這6 個命令的交互。 - 消費者客戶端調用
connection.createChannel
方法。和生產者客戶端一樣,協議涉及Channel . Open/Open-Ok
命令。 - 在真正消費之前,消費者客戶端需要向Broker 發送
Basic.Consume
命令(即調用channel.basicConsume
方法〉將Channel 置為接收模式,之后Broker 回執Basic . Consume - Ok
以告訴消費者客戶端准備好消費消息。 - Broker 向消費者客戶端推送(Push) 消息,即
Basic.Deliver
命令,這個命令和Basic.Publish
命令一樣會攜帶Content Header 和Content Body。
- 消費者接收到消息並正確消費之后,向Broker 發送確認,即
Basic.Ack
命令。 - 客戶端發送完消息需要關閉資源時,涉及到
Channel.Close和Channl.Close-Ok 與Connetion.Close和Connection.Close-Ok
的命令交互。
個人博客為:
MoYu's HomePage