RabbitMQ是基於AMQP的一款消息管理系統。AMQP(Advanced Message Queuing Protocol),是一個提供消息服務的應用層標准高級消息隊列協議,其中RabbitMQ就是基於這種協議的一種實現。
常見mq:
- ActiveMQ:基於JMS
- RabbitMQ:基於AMQP協議,erlang語言開發,穩定性好
- RocketMQ:基於JMS,阿里巴巴產品,目前交由Apache基金會
- Kafka:分布式消息系統,高吞吐量
1 消息模型
RabbitMq有5種常用的消息模型
1.1 基本消息模型
這是最簡單的消息模型,如下圖:

生產者將消息發送到隊列,消費者從隊列中獲取消息,隊列是存儲消息的緩沖區。
再演示代碼之前,我們先創建一個工程rabbitmq-demo,並編寫一個工具類,用於提供與mq服務創建連接
public class ConnectionUtil {
/**
* 建立與RabbitMQ的連接
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定義連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設置服務地址
factory.setHost("192.168.18.130");
//端口
factory.setPort(5672);
//設置賬號信息,用戶名、密碼、vhost
factory.setUsername("admin");
factory.setPassword("admin");
// 通過工程獲取連接
Connection connection = factory.newConnection();
return connection;
}
}
生產者發送消息
接下來是生產者發送消息,其過程包括:1.與mq服務建立連接,2.建立通道,3.聲明隊列(有相同隊列則不創建,沒有則創建),4.發送消息,代碼如下:
public class Send {
private static final String QUEUE_NAME = "basic_queue";
public static void main(String[] args) throws Exception {
//消息發送端與mq服務創建連接
Connection connection = ConnectionUtil.getConnection();
//建立通道
Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello world";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("生產者已發送:" + message);
channel.close();
connection.close();
}
}
消費者接受消息
消費者在接收消息的過程需要經歷如下幾個步驟: 1.與mqfuwu建立連接,2.建立通道,3.聲明隊列,4,接收消息,代碼如下:
public class Consumer1 {
private static final String QUEUE_NAME = "basic_queue";
public static void main(String[] args) throws Exception {
//消息消費者與mq服務建立連接
Connection connection = ConnectionUtil.getConnection();
//建立通道
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,並且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println("消費者1接收到消息:" + msg);
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消息的接收與消費使用都需要在一個匿名內部類DefaultConsumer中完成
注意:隊列需要提前聲明,如果未聲明就使用隊列,則會報錯。如果不清楚生產者和消費者誰先聲明,為了保證不報錯,生產者和消費者都聲明隊列,隊列的創建會保證冪等性,也就是說生產者和消費者都聲明同一個隊列,則只會創建一個隊列
1.2 Work Queues工作隊列模型
在基本消息模型中,一個生產者對應一個消費者,而實際生產過程中,往往消息生產會發送很多條消息,如果消費者只有一個的話效率就會很低,因此rabbitmq有另外一種消息模型,這種模型下,一個生產發送消息到隊列,允許有多個消費者接收消息,但是一條消息只會被一個消費者獲取。

生產者發送消息
與基本消息模型基本一致,這里測試循環發布20條消息:
public class Send {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 循環發布任務
for (int i = 1; i <= 20; i++) {
// 消息內容
String message = "task .. " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("生產者發送消息:" + message);
Thread.sleep(500);
}
channel.close();
connection.close();
}
}
消費者1
public class Consumer1 {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("消費者1接收到消息:" + msg);
try {
Thread.sleep(50);//模擬消費耗時
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消費者2
public class Consumer2 {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
String msg = new String(body);
System.out.println("消費者2接收到消息:" + msg);
try {
Thread.sleep(50);//模擬消費耗時
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
此時有兩個消費者監聽同一個隊列,當兩個消費者都工作時,生成者發送消息,就會按照負載均衡算法分配給不同消費者,如下圖:

1.3 訂閱模型
在之前的模型中,一條消息只能被一個消費者獲取,而在訂閱模式中,可以實現一條消息被多個消費者獲取。在這種模型下,消息傳遞過程中比之前多了一個exchange交換機,生產者不是直接發送消息到隊列,而是先發送給交換機,經由交換機分配到不同的隊列,而每個消費者都有自己的隊列:

解讀:
1、1個生產者,多個消費者
2、每一個消費者都有自己的一個隊列
3、生產者沒有將消息直接發送到隊列,而是發送到了交換機
4、每個隊列都要綁定到交換機
5、生產者發送的消息,經過交換機到達隊列,實現一個消息被多個消費者獲取的目的
X(exchange)交換機的類型有以下幾種:
Fanout:廣播,交換機將消息發送到所有與之綁定的隊列中去
Direct:定向,交換機按照指定的Routing Key發送到匹配的隊列中去
Topics:通配符,與Direct大致相同,不同在於Routing Key可以根據通配符進行匹配
注意:在發布訂閱模型中,生產者只負責發消息到交換機,至於消息該怎么發,以及發送到哪個隊列,生產者都不負責。一般由消費者創建隊列,並且綁定到交換機
訂閱模型之Fanout
在廣播模式下,消息發送的流程如下:
- 可以有多個消費者,每個消費者都有自己的隊列
- 每個隊列都要與exchange綁定
- 生產者發送消息到exchange
- exchange將消息把消息發送到所有綁定的隊列中去
- 消費者從各自的隊列中獲取消息
生產者發送消息
public class Send {
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明exchange,指定類型為fanout
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String message = "hello world";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("生產者發送消息:" + message);
channel.close();
connection.close();
}
}
消費者
public class Consumer1 {
private static final String QUEUE_NAME = "fanout_queue_1";
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//消費者聲明自己的隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 聲明exchange,指定類型為direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//消費者將隊列與交換機進行綁定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String msg = new String(body);
System.out.println("消費者1獲取到消息:" + msg);
}
});
}
}
其他消費者只需修改QUEUE_NAME即可
注意:exchange與隊列一樣都需要提前聲明,如果未聲明就使用交換機,則會報錯。如果不清楚生產者和消費者誰先聲明,為了保證不報錯,生產者和消費者都聲明交換機,同樣的,交換機的創建也會保證冪等性。
訂閱模型之Direct
在fanout模型中,生產者發布消息,所有消費者都可以獲取所有消息。在路由模式(Direct)中,可以實現不同的消息被不同的隊列消費,在Direct模式下,交換機不再將消息發送給所有綁定的隊列,而是根據Routing Key將消息發送到指定的隊列,隊列在與交換機綁定時會設定一個Routing Key,而生產者發送的消息時也需要攜帶一個Routing Key。

如圖所示,消費者C1的隊列與交換機綁定時設置的Routing Key是“error”, 而C2的隊列與交換機綁定時設置的Routing Key包括三個:“info”,“error”,“warning”,假如生產者發送一條消息到交換機,並設置消息的Routing Key為“info”,那么交換機只會將消息發送給C2的隊列。
生產者發送消息
public class Send {
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明exchange,指定類型為direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String message = "新增一個訂單";
//生產者發送消息時,設置消息的Routing Key:"insert"
channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
System.out.println("生產者發送消息:" + message);
channel.close();
connection.close();
}
}
消費者1
public class Consumer1 {
private static final String QUEUE_NAME = "direct_queue_1";
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//消費者聲明自己的隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//聲明交換機
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//消費者將隊列與交換機進行綁定,並且設置Routing Key:"insert"
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String msg = new String(body);
System.out.println("消費者1獲取到消息:" + msg);
}
});
}
}
其他消費者需要修改隊列名QUEUE_NAME和Routing Key,上述生成者發送的消息,消費者1是可以獲取到的
發布訂閱之Topics
Topic類型的Exchange與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符
Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert
通配符規則:
#:匹配一個或多個詞
*:匹配不多不少恰好1個詞
舉例:
audit.#:能夠匹配audit.irs.corporate 或者 audit.irs
audit.*:只能匹配audit.irs
Topics生產者代碼與Direct大致相同,只不過子聲明交換機時,將類型設為BuiltinExchangeType.TOPIC(topic),
消費者代碼也與Direct大致相同,也是在聲明交換機時設置類型為topic,代碼不再演示
Spring AMQP
Spring AMQP是對AMQP的一種封裝,目的是能夠讓我們更簡便的使用消息隊列,下面介紹一下Spring AMQP在Spring boot中的使用方法
依賴和配置
添加AMQP的啟動器:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.yml中添加RabbitMQ的地址:
spring:
rabbitmq:
host: 192.168.18.130
username: admin
password: admin
消費者
消費者需要定義一個類,類中定義監聽隊列的方法
@Component
public class Listener {
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "spring.test.queue", durable = "false"),
exchange = @Exchange(value = "spring.test.exchange", type = ExchangeTypes.DIRECT),
key = "insert"
)
)
public void listen(String msg){
System.out.println("消費者接受到消息:" + msg);
}
}
注解:
@Component:保證監聽類被spring掃描到
@RabbitListener:

@RabbitListener包含很多內容,在發布訂閱模式中,我們可以使用其中的“QueueBinding[] bindings”,其中QueueBinding底層如下:

其中Queue表示隊列,Exchange表示交換機,key表示Routing Key
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "spring.test.queue", durable = "false"),
exchange = @Exchange(value = "spring.test.exchange", type = ExchangeTypes.DIRECT),
key = "insert"
)
)
@Queue會創建隊列
@Exchange會創建交換機
@QueueBinding會綁定隊列和交換機
生產者發送消息
可以通過注解引入AmqpTemplate:
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
@Resource
private AmqpTemplate template;
@Test
public void testSendMsg() throws InterruptedException {
String message = "hello spring";
template.convertAndSend("spring.test.exchange", "insert", message);
System.out.println("生產者發送消息:" + message);
Thread.sleep(10000);//等待10s,讓測試方法延遲結束,防止消費者未來得及獲取消息
}
}
RabbitMQ如何防止消息丟失
1. 消息確認機制(ACK)
RabbitMQ有一個ACK機制,消費者在接收到消息后會向mq服務發送回執ACK,告知消息已被接收。這種ACK分為兩種情況:
- 自動ACK:消息一旦被接收,消費者會自動發送ACK
- 手動ACK:消息接收后,不會自動發送ACK,而是需要手動發送ACK
如果消費者沒有發送ACK,則消息會一直保留在隊列中,等待下次接收。但這里存在一個問題,就是一旦消費者發送了ACK,如果消費者后面宕機,則消息會丟失。因此自動ACK不能保證消費者在接收到消息之后能夠正常完成業務功能,因此需要在消息被充分利用之后,手動ACK確認
自動ACK,basicConsume方法中將autoAck參數設為true即可:

手動ack,在匿名內部類中,手動發送ACK:

當然,如果設置了手動ack,但又不手動發送ACK確認,消息會一直停留在隊列中,可能造成消息的重復獲取
2. 持久化
消息確認機制(ACK)能夠保證消費者不丟失消息,但假如消費者在獲取消息之前mq服務宕機,則消息也會丟失,因此要保證消息在服務端不丟失,則需要將消息進行持久化。隊列、交換機、消息都要持久化。
隊列持久化

exchange持久化

消息持久化

3. 生產者確認
生成者在發送消息過程中也可能出現錯誤或者網絡延遲燈故障,導致消息未成功發送到交換機或者隊列,或重復發送消息,為了解決這個問題,rabbitmq中有多個解決辦法:
事務:
用事務將消息發送代碼包圍起來:

Confirm模式:
如下所示,在發送代碼前執行channel.confirmSelect(),如果消息未正常發送,就會進入if代碼塊,可以進行重發也可以對失敗消息進行記錄

異步confirm方法:
顧名思義,就是生產者發送消息后不用等待服務端回饋發送狀態,可以繼續執行后面的代碼,對於失敗消息重發進行異步處理:

Spring AMQP中添加配置:
生產者確認機制,確保消息正確發送,如果發送失敗會有錯誤回執,從而觸發重試
spring:
rabbitmq:
publisher-confirms: true
