前言
本文講述的只是主要是 RabbitMQ 的入門知識,學習本文主要可以掌握以下知識點:
- MQ 的發展史
- AMQP 協議
- Rabbit MQ 的安裝
- Rabbit MQ 在 Java API 中的使用
- RabbitMQ 與 SpringBoot 的集成
MQ 的誕生歷史
大部分技術的剛產生時適用范圍都是特定的。比如互聯網的產生,剛開始出現的通信協議各個產商之間是無法兼容的,隨着歷史的發展,產生了業內的通信標准tcp/ip協議,而MQ也是一樣,第一款 MQ 類軟件是由一個在美國印度人 Vivek Ranadive 創辦的一家公司 Teknekron,並實現了世界上第一個消息中間件 The Information Bus(TIB)。
隨着第一款MQ 類軟件TIB的誕生,各大廠商立刻跟進,百花爭鳴,涌現了一批MQ類軟件,比如IBM開發的IBM Wesphere,微軟開發的MSMQ等等,但是正因為標准不統一,這就給我們使用者帶來了很大的不便,每次切換MQ時都需要重復去實現不同的協議和不同API的調用。
2001年,Java語言的老東家Sun公司發布了一個JMS規范,其在各大產商的MQ上進行了統一封裝,使用者如果需要使用不同的MQ只需要選擇不同的驅動就可以了(和我們使用數據庫驅動一個道理)。JMS規范雖然統一了標准,但是JMS規范卻有一個很大的缺陷就是它是和Java語言進行綁定的,所以依然沒有從根本上解決問題。
2004年,AMQP規范出現了,真正做到了跨語言和跨平台,自此MQ迎來了發展的黃金時代。
2007年,Rabbit公司基於AMQP規范開發出了一款消息隊列RabbmitMQ。很快的,RabbitMQ就得到了大家的喜愛,被用戶廣泛使用。
什么是 MQ
MQ即:Message Queue,稱之為消息隊列或者消息中間件。MQ的本質是:使用高效可靠的消息傳遞機制來進行與平台無關的數據傳遞,並基於數據通信來進行分布式系統的集成。也就是說MQ主要是用來解決消息的通信問題,其主要有以下三個特點:
- 1、
MQ是一個獨立運行的服務。通過生產者來發送消息,使用消費者來接收消費。 - 2、內部采用了隊列來進行消息存儲,一般采用的均是先進先出(
FIFO)隊列。 - 3、具有發布訂閱的模型,消費者可以根據需要來獲取自己想要的消息。
為什么需要 MQ
以Java語言為例,JDK本身就提供了許多不同類型的隊列,那么為什么還需要用MQ呢?這是因為:
- 1、跨語言。各大編程語言內部實現的隊列是和語言綁定的,而且是單機的,在分布式環境下無法很好的工作,所以我們需要可以單獨部署不依賴於語言的
MQ。 - 2、異步解耦。消息隊列可以實現異步通信,這樣發送消息方只需要關心消息是否發送成功,而接受消息方只需要關心怎么處理隊列中的消息,實現了消費和生產者的解耦。
- 3、流量削峰。因為消息隊列是先進先出,所以如果把需要消費的消息放進隊列,那么消費者就可以避免被瞬間大流量擊垮,而是可以從容的根據自己的能力從隊列中取出消息進行消費。
RabbitMQ
RabbitMQ 中的 Rabbit 是兔子的意思,就是形容跑的和兔子一樣快。其是一款輕量級的,支持多種消息傳遞協議的高可用的消息隊列。RabbitMQ 是由 Erlang 語言編寫的,而 Erlang 語言就是一款天生適合高並發的語言。
RabbitMQ 的優勢和特性
RabbitMQ 作為一款非常流行的消息中間件,其有着非常豐富的特性和優勢:
- 高可靠性:
RabbitMQ提供了持久化、發送應答、發布確認等功能來保證其可靠性。 - 靈活的路由:通過不同的交換機(Exchange)來實現了消息的靈活路由。
- 集群與擴展性:多個節點可以組成一個邏輯上的服務器,支持負載。
- 高可用性:通過鏡像隊列實現了隊列中數據的復制,保證了在極端情況下部分節點出現
crash整個集群仍然可用。 - 支持多種協議:
RabbitMQ最初是為了支持AMQP協議而開發的,所以AMQP是其核心協議,但是其也支持其他如:STOMP,MOTT,HTTP等協議。 - 支持多客戶端:
RabbitMQ幾乎支持所有常用語言客戶端,如:Java,Python,Ruby,Go等。 - 豐富的插件系統:支持各種豐富的插件擴展,同時也支持自定義插件。比如最常用的
RabbitMQ后台管理系統就是以插件的形式實現的。
AMQP 模型
AMQP 全稱是:Advanced Message Queuing Protocol。RabitMQ 最核心的協議就是基於 AMQP 模型的 AMQP 協議,AMQP 模型目前最新的版本是 1.0 版本,但是目前官方推薦使用者的最佳版本仍是基於 0.9.1 版本的 AMQP 模型,0.9.1 版本在 RabbitMQ官網中也將其稱之為 AMQP 0-9-1模型。
AMQP 0-9-1(高級消息隊列協議)是一種消息傳遞協議,它允許符合標准的客戶端應用程序與符合標准的消息傳遞中間件代理進行通信。消息傳遞代理(Broker)從發布者(Publisher,即發布消息的應用程序,也稱為生產者:Producter)接收消息,並將其路由到使用者(消費者:Consumer,即處理消息的應用程序)。
AMQP 0-9-1 模型的核心思想為:消息被發布到交換處,通常被比作郵局或郵箱。然后,交換機使用稱為綁定的規則將消息副本分發到隊列。然后,代理將消息傳遞給訂閱了隊列的使用者,或者使用者根據需要從隊列中獲取/提取消息。
下圖就是一個 AMQP 模型簡圖,理解了這幅圖,那么就基本理解了 RabbitMQ 的工作模式。

Producer 和 Consumer
Producer 即生產者,一般指的是應用程序客戶端,生產者會產生消息發送給 RabbitMQ,等待消費者進行處理。
Consumer 即消費者,消費者會從特定的隊列中取出消息,進行消費。當消息傳遞給消費者時,消費者會自動通知 Broker,Broker 只有在收到關於該消息的通知時才會從隊列中完全刪除該消息。
Connection:我是一個 TCP 長連接
生產者發送消息和消費者接收消息之前都必須要和 Broker 建立一個 tcp 長連接,才能進行通信。
Channel:我是被虛構出來的
消息隊列的作用之一就是用來做削峰,所以消息隊列在高並發場景可能會有大量的生產者和消費者,那么假如每一個生產者在發送消息時或者每一個消費者在消費消息時都需要不斷的創建和銷毀 tcp 連接,那么這對 Broker 會是一個很大的消耗,為了降低這個 tcp 連接的創建頻率,AMQP 模型引入了 Channel(通道或者信道)。
Channel 是一個虛擬的的連接,可以被認為是“輕量級的連接,其共享同一個 tcp 連接”。在同一個 tcp 長連接里面可以通過創建和銷毀不同的 Channel 來減少了創建和銷毀 tcp 連接的頻率,從而大大減少了資源的消耗。
客戶端(生產者/消費者)執行的每個協議操作都發生在通道上。特定 Channel 上的通信完全獨立於另一個 Channel 上的通信,因此每個協議方法還攜帶一個Channel ID(又稱通道號)。
Channel 只存在於連接的上下文中,不會獨立存在,所以當一個 tcp 連接被關閉時,其中所有 Channel 也都會被關閉。
Channel 是線程不安全的,所以對於使用多個線程/進程進行處理的應用程序,需要為每個線程/進程創建一個 Channel,而不是共享同一個 Channel。
Broker:我只是一個普通的代理商
Broker 直接翻譯成中文就是:中介/代理,所以如果我們使用的是 RabbitMQ,那么這個 Broker 就是指的 RabbitMQ 服務端。
Exchange:我只是做一個消息的映射
Echange 即交換機,因為要實現生產者和消費者的多對多關系,所以只有一個隊列是無法滿足要求的,那么如果有多個隊列,每次我們發送的消息應該存儲到哪里呢?交換機就是起到了中間角色的作用,我們發送消息到交換機上,然后通過交換機發送到對應的隊列,交換機和隊列之間需要提前綁定好對應關系,這樣消息就到了各自指定的隊列內,然后消費者就可以直接從各自負責的隊列內取出消息進行消費。
Queue:我才是真正存儲消息的地方
消息發送到 Broker 之后,通過交換機的映射,存儲到指定的 Queue里面。
VHost:我只是一個命名空間而已
VHost 類似於命名空間,主要作用就是用來隔離數據的,比如我們由很多個業務系統都需要用到 RabbitMQ,如果是一台服務器完全可以滿足要求,那就沒必要安裝多個 RabbitMQ 了,這時候就可以定義不同的 VHost ,不同的 VHost 就可以實現各個業務系統間的數據隔離。
RabbitMQ 的安裝
RabbitMQ 是用 Erlang 語言開發的,所以在安裝 RabbitMQ 之前,需要先安裝 Erlang,RabbitMQ 和 Erlang 之間有版本對應關系,這個需要注意,本文以 Erlang 21.3 和 RabbitMQ3.8.4 為例進行安裝 。
- 安裝
Erlang:
yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget //提前安裝一些依賴,個人電腦依賴不同,可根據實際情況選擇未安裝的依賴進行安裝
wget http://erlang.org/download/otp_src_21.3.tar.gz # 下載(也可以下載好傳到服務器)
tar -xvf otp_src_21.3.tar.gz //解壓
mkdir erlang //在指定目錄,如/usr/local下創建erlang目錄
cd otp_src_21.3 //切換到解壓后的目錄
./configure --prefix=/usr/local/erlang //編譯(路徑根據實際情況選擇)
make && make install //安裝
- 配置
Erlang環境變量:
vim /etc/profile //編輯環境變量文件(CentOS系統默認環境變量文件,其他系統可能不一樣)
export PATH=$PATH:/usr/local/erlang/bin //在末尾加入環境變量配置(路徑根據實際情況選擇)
source /etc/profile //實時生效
- 輸入
erl驗證Erlang是否安裝成功。如果出現如下顯示版本號的界面則說明安裝成功(可以輸入halt().命令進行退出):

- 安裝
RabbitMQ:
wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.8.4/rabbitmq-server-generic-unix-3.8.4.tar.xz //下載RabbitMQ
xz -d rabbitmq-server-generic-unix-3.8.4.tar.xz //解壓
tar -xvf rabbitmq-server-generic-unix-3.8.4.tar //解壓
- 同樣的,這里需要進行環境變量配置:
vim /etc/profile //編輯環境變量文件(CentOS系統默認環境變量文件,其他系統可能不一樣)
export PATH=$PATH:/usr/local/rabbitmq_server-3.8.4/sbin //在末尾加入環境變量配置(路徑根據實際情況選擇)
source /etc/profile //實時生效
- 啟動
RabbitMQ,默認端口為6752:
/usr/local/rabbitmq_server-3.8.4/sbin/./rabbitmq-server -detached //在后台啟動。根據自己實際路徑選擇,或者也可以選擇service或者systemctl等命令啟動
- 如果沒有報錯則說明啟動成功,啟動之后默認會創建一個
guest/guest賬戶,只能本地連接,所以還需要再重新創建一個用戶,並給新用戶授權(當然,我們也可以直接給guest用戶授權):
./rabbitmqctl add_user admin 123456 //創建用戶admin
./rabbitmqctl set_user_tags admin administrator //添加標簽
./rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" //授權
RabbitMQ默認還提供了可視化管理界面,需要手動開啟一下,默認端口為15672:
./rabbitmq-plugins enable rabbitmq_management //啟動后台管理系統插件(禁止的話換成disable即可)
- 開啟插件之后,可以通過訪問:
http://ip:15672/訪問后台管理系統,並進行一些參數設置,賬號密碼就是上面添加的admin/123456。

安裝過程常見錯誤
安裝過程中可能會出現如下圖所示錯誤:

-
odbc:ODBC library - link check failed:
解決方法:執行命令
yum install unixODBC.x86_64 unixODBC-devel.x86_64進行安裝。 -
wx:wxWidgets not found, wx will NOT be usable:
解決方法:這個屬於
APPLOICATION INFORMATION,可以不處理。 -
fakefop to generate placeholder PDF files,documentation: fop is missing.Using fakefop to generate placeholder PDF files:
解決方法:執行命令
yum install fop.noarch進行安裝。
利用 Java API 實現一個生產者和消費者
接下來用 Java 原生的 API 來實現一個簡單的生產者和消費者:
pom.xml文件引入RabbitMQ客戶端依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
- 新建一個消費者
TestRabbitConsumer類:
package com.lonelyWolf.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
public class TestRabbitConsumer {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://admin:123456@ip:5672");
Connection conn = factory.newConnection();//建立連接
Channel channel = conn.createChannel(); //創建消息通道
channel.queueDeclare("TEST_QUEUE", false, false, false, null);//聲明隊列
System.out.println("正在等待接收消息...");
Consumer consumer = new DefaultConsumer(channel) {//創建消費者
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
System.out.println("收到消息: " + new String(body, "UTF-8") + ",當前消息ID為:" + properties.getMessageId());
System.out.println("收到自定義屬性:"+ properties.getHeaders().get("name"));
}
};
channel.basicConsume("TEST_QUEUE", true, consumer);//消費之后,回調給consumer
}
}
- 新建一個生產者
TestRabbitProducter類:
package com.lonelyWolf.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class TestRabbitProducter {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://admin:123456@ip:5672");
Connection conn = factory.newConnection();// 建立連接
Channel channel = conn.createChannel();//創建消息通道
Map<String, Object> headers = new HashMap<String, Object>(1);
headers.put("name", "雙子孤狼");//可以自定義一些自定義的參數和消息一起發送過去
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentEncoding("UTF-8") //編碼
.headers(headers) //自定義的屬性
.messageId(String.valueOf(UUID.randomUUID()))//消息id
.build();
String msg = "Hello, RabbitMQ";//需要發送的消息
channel.queueDeclare("TEST_QUEUE", false, false, false, null); //聲明隊列
channel.basicPublish("", "TEST_QUEUE", properties, msg.getBytes());//發送消息
channel.close();
conn.close();
}
}
- 先啟動消費者,啟動之后消費者就會保持和
RabbitMQ的連接,等待消息;然后再運行生產者,消息發送之后,消費者就可以收到消息:

利用SpringBoot 實現一個生產者和消費者
接下來再看看 SpringBoot 怎么與 RabbitMQ 集成並實現一個簡單的生產者和消費者:
- 引入依賴(我這邊
SpringBoot用的是2.4.0版本,所以如果用的低版本這個版本號也需要修改):
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.4.0</version>
</dependency>
- 新增以下配置文件:
spring:
rabbitmq:
host: ip
port: 5672
username: admin
password: 123456
- 新建一個配置文件
RabbitConfig類,創建一個隊列:
package com.lonely.wolf.rabbit.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean("simpleRabbitQueue")
public Queue getFirstQueue(){
Queue queue = new Queue("SIMPLE_QUEUE");
return queue;
}
}
- 新建一個消費者
SimpleConsumer類(注意這里監聽的名字要和上面定義的保持一致):
package com.lonely.wolf.rabbit.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@RabbitListener(queues = "SIMPLE_QUEUE")
@Component
public class SimpleConsumer {
@RabbitHandler
public void process(String msg){
System.out.println("收到消息:" + msg);
}
}
- 新建一個消息發送者
HelloRabbitController類(發送消息的隊列名要和消費者監聽的隊列名一致,否則無法收到消息),運行之后調用對應接口,消費者類SimpleConsumer就可以收到消息:
package com.lonely.wolf.rabbit.controller;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/hello")
public class HelloRabbitController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(value="/send")
public String clearVipInfo(@RequestParam(value = "msg",defaultValue = "no message") String msg){
rabbitTemplate.convertAndSend("SIMPLE_QUEUE",msg);
return "succ";
}
}
總結
本文主要簡單講述了 MQ 的發展歷史,並介紹了為什么要使用 MQ 及 MQ 能解決什么問題,緊接着重點介紹了 AMQP 0.9.1 模型。掌握了 AMQP 模型就基本掌握了 RabbitMQ 的工作原理,最后我們通過 JAVA API 和 SpringBoot 兩個例子介紹了如何使用 RabbitMQ。
