1 RabbitMQ介紹
1.1 應用場景
MQ全稱為Message Queue,即消息隊列, RabbitMQ是由erlang語言開發,基於AMQP(Advanced Message Queue 高級消息隊列協議)協議實現的消息隊列,它是一種應用程序之間的通信方法,消息隊列在分布式系統開發中應用非常廣泛。RabbitMQ官方地址:http://www.rabbitmq.com/
開發中消息隊列通常有如下應用場景:
1、任務異步處理。
將不需要同步處理的並且耗時長的操作由消息隊列通知消息接收方進行異步處理。提高了應用程序的響應時間。
2、應用程序解耦合
MQ相當於一個中介,生產方通過MQ與消費方交互,它將應用程序進行解耦合。
市場上還有哪些消息隊列?
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。
為什么使用RabbitMQ呢?
1、使得簡單,功能強大。
2、基於AMQP協議。
3、社區活躍,文檔完善。
4、高並發性能好,這主要得益於Erlang語言。
5、Spring Boot默認已集成RabbitMQ
1.2 其它相關術語
AMQP是什么 ?
總結:AMQP是一套公開的消息隊列協議,最早在2003年被提出,它旨在從協議層定義消息通信數據的標准格式,為的就是解決MQ市場上協議不統一的問題。RabbitMQ就是遵循AMQP標准協議開發的MQ服務。
JMS是什么 ?
總結:
JMS是java提供的一套消息服務API標准,其目的是為所有的java應用程序提供統一的消息通信的標准,類似java的jdbc,只要遵循jms標准的應用程序之間都可以進行消息通信。它和AMQP有什么 不同,jms是java語言專屬的消息服務標准,它是在api層定義標准,並且只能用於java應用;而AMQP是在協議層定義的標准,是跨語言的 。
2 RabbitMQ工作原理
2.1 RabbitMQ工作原理
下圖是RabbitMQ的基本結構:
組成部分說明如下:
- Broker:消息隊列服務進程,此進程包括兩個部分:Exchange和Queue。
- Exchange:消息隊列交換機,按一定的規則將消息路由轉發到某個隊列,對消息進行過慮。
- Queue:消息隊列,存儲消息的隊列,消息到達隊列並轉發給指定的消費方。
- Producer:消息生產者,即生產方客戶端,生產方客戶端將消息發送到MQ。
- Consumer:消息消費者,即消費方客戶端,接收MQ轉發的消息。
消息發布接收流程:
—–發送消息—–
1、生產者和Broker建立TCP連接。
2、生產者和Broker建立通道。
3、生產者通過通道消息發送給Broker,由Exchange將消息進行轉發。
4、Exchange將消息轉發到指定的Queue(隊列)
—-接收消息—–
1、消費者和Broker建立TCP連接
2、消費者和Broker建立通道
3、消費者監聽指定的Queue(隊列)
4、當有消息到達Queue時Broker默認將消息推送給消費者。
5、消費者接收到消息。
2.2 RabbitMQ下載安裝
2.2.1 下載安裝
RabbitMQ由Erlang語言開發,Erlang語言用於並發及分布式系統的開發,在電信領域應用廣泛,OTP(Open Telecom Platform)作為Erlang語言的一部分,包含了很多基於Erlang開發的中間件及工具庫,安裝RabbitMQ需要安裝Erlang/OTP,並保持版本匹配,如下圖:
RabbitMQ的下載地址:http://www.rabbitmq.com/download.html
本項目使用Erlang/OTP 20.3版本和RabbitMQ3.7.3版本。
1)下載erlang
地址如下:
http://erlang.org/download/otp_win64_20.3.exe
或去老師提供的軟件包中找到 otp_win64_20.3.exe,以管理員方式運行此文件,安裝。
erlang安裝完成需要配置erlang環境變量: ERLANG_HOME=D:\Program Files\erl9.3 在path中添加%ERLANG_HOME%\bin;
2)安裝RabbitMQ
https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3
或去老師提供的軟件包中找到 rabbitmq-server-3.7.3.exe,以管理員方式運行此文件,安裝。
2.2.2啟動
安裝成功后會自動創建RabbitMQ服務並且啟動。
1)從開始菜單啟動RabbitMQ
完成在開始菜單找到RabbitMQ的菜單:
RabbitMQ Service-install :安裝服務
RabbitMQ Service-remove 刪除服務
RabbitMQ Service-start 啟動
RabbitMQ Service-stop 啟動
2)如果沒有開始菜單則進入安裝目錄下sbin目錄手動啟動:
1)安裝並運行服務
rabbitmq-service.bat install 安裝服務 rabbitmq-service.bat stop 停止服務 rabbitmq-service.bat start 啟動服務
2)安裝管理插件
安裝rabbitMQ的管理插件,方便在瀏覽器端管理RabbitMQ
管理員身份運行 rabbitmq-plugins.bat enable rabbitmq_management
3、啟動成功 登錄RabbitMQ
進入瀏覽器,輸入:http://localhost:15672
初始賬號和密碼:guest/guest
2.2.3 注意事項
1、安裝erlang和rabbitMQ以管理員身份運行。
2、當卸載重新安裝時會出現RabbitMQ服務注冊失敗,此時需要進入注冊表清理erlang
搜索RabbitMQ、ErlSrv,將對應的項全部刪除。
2.2 快速入門
按照官方教程(http://www.rabbitmq.com/getstarted.html)測試hello world:
2.2.1搭建環境
1)java client
生產者和消費者都屬於客戶端,rabbitMQ的java客戶端如下:
我們先用 rabbitMQ官方提供的java client測試,目的是對RabbitMQ的交互過程有個清晰的認識。
參考 :https://github.com/rabbitmq/rabbitmq-java-client/
2)創建maven工程
創建生產者工程和消費者工程,分別加入RabbitMQ java client的依賴。
test-rabbitmq-producer:生產者工程
test-rabbitmq-consumer:消費者工程
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version><!--此版本與spring boot 1.5.9版本匹配-->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
2.2.2 生產者
生產者操作流程如下:
1)創建連接
2)創建通道
3)聲明隊列
4)發送消息
在生產者工程下的test包中創建測試類如下:
public class Producer01 {
//隊列名稱
private static final String QUEUE = "helloworld";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = null;
Channel channel = null;
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當於一個獨立的mq服務器
//創建與RabbitMQ服務的TCP連接
connection = factory.newConnection();
//創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
channel = connection.createChannel();
/**
* 聲明隊列,如果Rabbit中沒有此隊列將自動創建
* param1:隊列名稱
* param2:是否持久化
* param3:隊列是否獨占此連接
* param4:隊列不再使用時是否自動刪除此隊列
* param5:隊列參數
*/
channel.queueDeclare(QUEUE, true, false, false, null);
String message = "helloworld小明"+System.currentTimeMillis();
/**
* 消息發布方法
* param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
* param2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列
* param3:消息包含的屬性
* param4:消息體
*/
/**
* 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定
* 默認的交換機,routingKey等於隊列名稱
*/
channel.basicPublish("", QUEUE, null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
catch(Exception ex)
{
ex.printStackTrace();
}
finally
{
if(channel != null)
{
channel.close();
}
if(connection != null)
{
connection.close();
}
}
}
}
2.2.3 消費者
消費者操作流程如下:
1)創建連接
2)創建通道
3)聲明隊列
4)監聽隊列
5)接收消息
6)ack回復
在消費者工程下的test包中創建測試類如下:
public class Consumer01 {
private static final String QUEUE = "helloworld";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//設置MabbitMQ所在服務器的ip和端口
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(QUEUE, true, false, false, null);
//定義消費方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 消費者接收消息調用此方法
* @param consumerTag 消費者的標簽,在channel.basicConsume()去指定
* @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送)
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
//交換機
String exchange = envelope.getExchange();
//路由key
String routingKey = envelope.getRoutingKey();
//消息id
long deliveryTag = envelope.getDeliveryTag();
//消息內容
String msg = new String(body,"utf-8");
System.out.println("receive message.." + msg);
}
};
/**
* 監聽隊列String queue, boolean autoAck,Consumer callback
* 參數明細
* 1、隊列名稱
* 2、是否自動回復,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動回復
* 3、消費消息的方法,消費者接收到消息后調用此方法
*/
channel.basicConsume(QUEUE, true, consumer);
}
}
3 RabbitMQ工作模式
RabbitMQ有以下幾種工作模式 :
1、Work queues
2、Publish/Subscribe
3、Routing
4、Topics
5、Header
6、RPC
3.1 Work queues
work queues與入門程序相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息。
應用場景:對於 任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。
測試:
1、使用入門程序,啟動多個消費者。
2、生產者發送多個消息。
結果:
1、一條消息只會被一個消費者接收;
2、rabbit采用輪詢的方式將消息是平均發送給消費者的;
3、消費者在處理完某條消息后,才會收到下一條消息。
3.2 Publish/subscribe
3.2.1生產者
3.2.1.1 發布訂閱模式介紹
發布訂閱模式:
1、每個消費者監聽自己的隊列。
2、生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收到消息
3.2.1.2 生產者代碼
通過一個案例講解發布訂閱模式:
用戶通知,當用戶充值成功或轉賬完成系統通知用戶,通知方式有短信、郵件多種方法 。
生產者代碼如下:
聲明Exchange_fanout_inform交換機。
聲明兩個隊列並且綁定到此交換機,綁定時不需要指定routingkey
發送消息時不需要指定routingkey
package com.pbteach.test.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer02_publish {
//隊列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//創建一個與MQ的連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當於一個獨立的mq服務器
//創建一個連接
connection = factory.newConnection();
//創建與交換機的通道,每個通道代表一個會話
channel = connection.createChannel();
//聲明交換機 String exchange, BuiltinExchangeType type
/**
* 參數明細
* 1、交換機名稱
* 2、交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
//聲明隊列
// (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
/**
* 參數明細:
* 1、隊列名稱
* 2、是否持久化
* 3、是否獨占此隊列
* 4、隊列不用是否自動刪除
* 5、參數
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
//交換機和隊列綁定String queue, String exchange, String routingKey
/**
* 參數明細
* 1、隊列名稱
* 2、交換機名稱
* 3、路由key
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
//發送消息
for (int i=0;i<10;i++){
String message = "inform to user"+i;
//向交換機發送消息 String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 參數明細
* 1、交換機名稱,不指令使用默認交換機名稱 Default Exchange
* 2、routingKey(路由key),根據key名稱將消息轉發到具體的隊列,這里填寫隊列名稱表示消息將發到此隊列
* 3、消息屬性
* 4、消息內容
*/
channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally{
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
3.2.2 消費者
3.2.2.1 消費者代碼
郵件發送消費者代碼如下:
package com.pbteach.test.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Administrator
* @version 1.0
* @create 2018-06-14 10:32
**/
public class Consumer02_subscribe_email {
//隊列名稱
private static final String QUEUE_INFORM_EMAIL = "inform_queue_email";
private static final String EXCHANGE_FANOUT_INFORM="inform_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//創建一個與MQ的連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當於一個獨立的mq服務器
//創建一個連接
Connection connection = factory.newConnection();
//創建與交換機的通道,每個通道代表一個會話
Channel channel = connection.createChannel();
//聲明交換機 String exchange, BuiltinExchangeType type
/**
* 參數明細
* 1、交換機名稱
* 2、交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
//聲明隊列
// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
/**
* 參數明細:
* 1、隊列名稱
* 2、是否持久化
* 3、是否獨占此隊列
* 4、隊列不用是否自動刪除
* 5、參數
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
//交換機和隊列綁定String queue, String exchange, String routingKey
/**
* 參數明細
* 1、隊列名稱
* 2、交換機名稱
* 3、路由key
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
//定義消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String exchange = envelope.getExchange();
//消息內容
String message = new String(body, "utf-8");
System.out.println(message);
}
};
/**
* 監聽隊列String queue, boolean autoAck,Consumer callback
* 參數明細
* 1、隊列名稱
* 2、是否自動回復,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動回復
* 3、消費消息的方法,消費者接收到消息后調用此方法
*/
channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
}
}
按照上邊的代碼,編寫郵件通知的消費代碼。
3、短信發送消費者
參考上邊的郵件發送消費者代碼編寫。
3.2.2.2 測試
打開RabbitMQ的管理界面,觀察交換機綁定情況:
使用生產者發送若干條消息,每條消息都轉發到各各隊列,每消費者都接收到了消息。
3.2.2.3 小結
1、publish/subscribe與work queues有什么區別。
區別:
1)work queues不用定義交換機,而publish/subscribe需要定義交換機。
2)publish/subscribe的生產方是面向交換機發送消息,work queues的生產方是面向隊列發送消息(底層使用默認交換機)。
3)publish/subscribe需要設置隊列和交換機的綁定,work queues不需要設置,實質上work queues會將隊列綁定到默認的交換機 。
相同點:
所以兩者實現的發布/訂閱的效果是一樣的,多個消費端監聽同一個隊列不會重復消費消息。
2、實質工作用什么 publish/subscribe還是work queues。
建議使用 publish/subscribe,發布訂閱模式比工作隊列模式更強大,並且發布訂閱模式可以指定自己專用的交換機。
3.3 Routing路由模式
3.3.1 路由模式介紹
路由模式:
1、每個消費者監聽自己的隊列,並且設置routingkey。
2、生產者將消息發給交換機,由交換機根據routingkey來轉發消息到指定的隊列。
3.3.2 生產者
聲明exchange_routing_inform交換機。
聲明兩個隊列並且綁定到此交換機,綁定時需要指定routingkey
發送消息時需要指定routingkey
package com.pbteach.test.rabbitmq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer03_routing {
//隊列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//創建一個與MQ的連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當於一個獨立的mq服務器
//創建一個連接
connection = factory.newConnection();
//創建與交換機的通道,每個通道代表一個會話
channel = connection.createChannel();
//聲明交換機 String exchange, BuiltinExchangeType type
/**
* 參數明細
* 1、交換機名稱
* 2、交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//聲明隊列
// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
/**
* 參數明細:
* 1、隊列名稱
* 2、是否持久化
* 3、是否獨占此隊列
* 4、隊列不用是否自動刪除
* 5、參數
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
//交換機和隊列綁定String queue, String exchange, String routingKey
/**
* 參數明細
* 1、隊列名稱
* 2、交換機名稱
* 3、路由key
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS);
//發送郵件消息
for (int i=0;i<10;i++){
String message = "email inform to user"+i;
//向交換機發送消息 String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 參數明細
* 1、交換機名稱,不指令使用默認交換機名稱 Default Exchange
* 2、routingKey(路由key),根據key名稱將消息轉發到具體的隊列,這里填寫隊列名稱表示消息將發到此隊列
* 3、消息屬性
* 4、消息內容
*/
channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL, null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
//發送短信消息
for (int i=0;i<10;i++){
String message = "sms inform to user"+i;
//向交換機發送消息 String exchange, String routingKey, BasicProperties props, byte[] body
channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally{
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
3.3.3 消費者
郵件發送消費者代碼:
package com.pbteach.test.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer03_routing_email {
//隊列名稱
private static final String QUEUE_INFORM_EMAIL = "inform_queue_email";
private static final String EXCHANGE_ROUTING_INFORM="inform_exchange_routing";
public static void main(String[] args) throws IOException, TimeoutException {
//創建一個與MQ的連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當於一個獨立的mq服務器
//創建一個連接
Connection connection = factory.newConnection();
//創建與交換機的通道,每個通道代表一個會話
Channel channel = connection.createChannel();
//聲明交換機 String exchange, BuiltinExchangeType type
/**
* 參數明細
* 1、交換機名稱
* 2、交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//聲明隊列
// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
/**
* 參數明細:
* 1、隊列名稱
* 2、是否持久化
* 3、是否獨占此隊列
* 4、隊列不用是否自動刪除
* 5、參數
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
//交換機和隊列綁定String queue, String exchange, String routingKey
/**
* 參數明細
* 1、隊列名稱
* 2、交換機名稱
* 3、路由key
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL);
//定義消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String exchange = envelope.getExchange();
//消息內容
String message = new String(body, "utf-8");
System.out.println(message);
}
};
/**
* 監聽隊列String queue, boolean autoAck,Consumer callback
* 參數明細
* 1、隊列名稱
* 2、是否自動回復,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動回復
* 3、消費消息的方法,消費者接收到消息后調用此方法
*/
channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
}
}
3、短信發送消費者
參考郵件發送消費者的代碼流程,編寫短信通知的代碼。
3.3.4 測試
打開RabbitMQ的管理界面,觀察交換機綁定情況:
使用生產者發送若干條消息,交換機根據routingkey轉發消息到指定的隊列。
3.4.5 小結
1、Routing模式和Publish/subscibe有啥區別?
Routing模式要求隊列在綁定交換機時要指定routingkey,消息會轉發到符合routingkey的隊列。
3.4 Topics
3.4.1統配符模式介紹
路由模式:
1、每個消費者監聽自己的隊列,並且設置帶統配符的routingkey。
2、生產者將消息發給broker,由交換機根據routingkey來轉發消息到指定的隊列。
3.4.2生產者
案例:
根據用戶的通知設置去通知用戶,設置接收Email的用戶只接收Email,設置接收sms的用戶只接收sms,設置兩種通知類型都接收的則兩種通知都有效。
生產者代碼如下:
聲明交換機,指定topic類型:
/**
* 聲明交換機
* param1:交換機名稱
* param2:交換機類型 四種交換機類型:direct、fanout、topic、headers
*/
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//Email通知
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null, message.getBytes());
//sms通知
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null, message.getBytes());
//兩種都通知
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null, message.getBytes());
完整代碼:
package com.pbteach.test.rabbitmq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer04_topics {
//隊列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//創建一個與MQ的連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當於一個獨立的mq服務器
//創建一個連接
connection = factory.newConnection();
//創建與交換機的通道,每個通道代表一個會話
channel = connection.createChannel();
//聲明交換機 String exchange, BuiltinExchangeType type
/**
* 參數明細
* 1、交換機名稱
* 2、交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//聲明隊列
/**
* 參數明細:
* 1、隊列名稱
* 2、是否持久化
* 3、是否獨占此隊列
* 4、隊列不用是否自動刪除
* 5、參數
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
//發送郵件消息
for (int i=0;i<10;i++){
String message = "email inform to user"+i;
//向交換機發送消息 String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 參數明細
* 1、交換機名稱,不指令使用默認交換機名稱 Default Exchange
* 2、routingKey(路由key),根據key名稱將消息轉發到具體的隊列,這里填寫隊列名稱表示消息將發到此隊列
* 3、消息屬性
* 4、消息內容
*/
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
//發送短信消息
for (int i=0;i<10;i++){
String message = "sms inform to user"+i;
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
//發送短信和郵件消息
for (int i=0;i<10;i++){
String message = "sms and email inform to user"+i;
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally{
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
3.4.3 消費端
隊列綁定交換機指定通配符:
統配符規則:
中間以“.”分隔。
符號#可以匹配多個詞,符號*可以匹配一個詞語。
//聲明隊列
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
//聲明交換機
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//綁定email通知隊列
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,"inform.#.email.#");
//綁定sms通知隊列
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,"inform.#.sms.#");
3.4.4 測試
使用生產者發送若干條消息,交換機根據routingkey統配符匹配並轉發消息到指定的隊列。
3.4.5 小結
1、本案例的需求使用Routing工作模式能否實現?
使用Routing模式也可以實現本案例,共設置三個 routingkey,分別是email、sms、all,email隊列綁定email和all,sms隊列綁定sms和all,這樣就可以實現上邊案例的功能,實現過程比topics復雜。
Topic模式更多加強大,它可以實現Routing、publish/subscirbe模式的功能。
3.5 其它模式
3.5.1 Header模式
header模式與routing不同的地方在於,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配隊列。
案例:
根據用戶的通知設置去通知用戶,設置接收Email的用戶只接收Email,設置接收sms的用戶只接收sms,設置兩種通知類型都接收的則兩種通知都有效。
代碼:
1)生產者
隊列與交換機綁定的代碼與之前不同,如下:
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_type", "email");
Map<String, Object> headers_sms = new Hashtable<String, Object>();
headers_sms.put("inform_type", "sms");
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
通知:
String message = "email inform to user"+i;
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("inform_type", "email");//匹配email通知消費者綁定的header
//headers.put("inform_type", "sms");//匹配sms通知消費者綁定的header
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
properties.headers(headers);
//Email通知
channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());
2)發送郵件消費者
channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_email", "email");
//交換機和隊列綁定
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
//指定消費隊列
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
3)測試
3.5.2 RPC
RPC即客戶端遠程調用服務端的方法 ,使用MQ可以實現RPC的異步調用,基於Direct交換機實現,流程如下:
1、客戶端即是生產者就是消費者,向RPC請求隊列發送RPC調用消息,同時監聽RPC響應隊列。
2、服務端監聽RPC請求隊列的消息,收到消息后執行服務端的方法,得到方法返回的結果
3、服務端將RPC方法 的結果發送到RPC響應隊列
4、客戶端(RPC調用方)監聽RPC響應隊列,接收到RPC調用結果。
4 Spring整合RibbitMQ
4.1 搭建環境
4.1.1 添加依賴
我們選擇基於Spring-Rabbit去操作RabbitMQ
https://github.com/spring-projects/spring-amqp
使用spring-boot-starter-amqp會自動添加spring-rabbit依賴,如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
4.1.2 配置application.yml
配置連接rabbitmq的參數
server:
port: 44000
spring:
application:
name: test-rabbitmq-producer
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
passowrd: guest
virtualHost: /
4.1.3 配置交換機和隊列
定義RabbitConfig類,配置Exchange、Queue、及綁定交換機。
本例配置Topic交換機。
package com.pbteach.test.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
/**
* 交換機配置
* ExchangeBuilder提供了fanout、direct、topic、header交換機類型的配置
* @return the exchange
*/
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM() {
//durable(true)持久化,消息隊列重啟后交換機仍然存在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
//聲明隊列
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS() {
Queue queue = new Queue(QUEUE_INFORM_SMS);
return queue;
}
//聲明隊列
@Bean(QUEUE_INFORM_EMAIL)
public Queue QUEUE_INFORM_EMAIL() {
Queue queue = new Queue(QUEUE_INFORM_EMAIL);
return queue;
}
/** channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#");
* 綁定隊列到交換機 .
*
* @param queue the queue
* @param exchange the exchange
* @return the binding
*/
@Bean
public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();
}
@Bean
public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();
}
}
4.3 生產者
使用RarbbitTemplate發送消息
package com.pbteach.test.rabbitmq;
import com.pbteach.test.rabbitmq.config.RabbitmqConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class Producer05_topics_springboot {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testSendByTopics(){
for (int i=0;i<5;i++){
String message = "sms email inform to user"+i;
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.sms.email",message);
System.out.println("Send Message is:'" + message + "'");
}
}
}
4.4消費者
4.4.1 消費者代碼
創建消費端工程,添加依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
使用@RabbitListener注解監聽隊列。
package com.pbteach.test.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import com.pbteach.test.rabbitmq.config.RabbitmqConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ReceiveHandler {
//監聽email隊列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
public void receive_email(String msg,Message message,Channel channel){
System.out.println(msg);
}
//監聽sms隊列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
public void receive_sms(String msg,Message message,Channel channel){
System.out.println(msg);
}
}
4.4.2 測試