前言
本文講述的只是主要是 RabbitMQ
的入門知識,學習本文主要可以掌握以下知識點:
- MQ 的發展史
- AMQP 協議
- Rabbit MQ 的安裝
- Rabbit MQ 在 Java API 中的使用
- RabbitMQ 與 SpringBoot 的集成
MQ 的誕生歷史
大部分技術的剛產生時適用范圍都是特定的。比如互聯網的產生,剛開始出現的通信協議各個產商之間是無法兼容的,隨着歷史的發展,產生了業內的通信標准tcp/ip
協議,而MQ
也是一樣,第一款 MQ
類軟件是由一個在美國印度人 Vivek Ranadive
創辦的一家公司 Teknekron
,並實現了世界上第一個消息中間件 The Information Bus(TIB)
。
隨着第一款MQ
類軟件TIB
的誕生,各大廠商立刻跟進,百花爭鳴,涌現了一批MQ
類軟件,比如IBM
開發的IBM Wesphere
,微軟開發的MSMQ
等等,但是正因為標准不統一,這就給我們使用者帶來了很大的不便,每次切換MQ
時都需要重復去實現不同的協議和不同API的調用。
2001
年,Java
語言的老東家Sun
公司發布了一個JMS
規范,其在各大產商的MQ
上進行了統一封裝,使用者如果需要使用不同的MQ
只需要選擇不同的驅動就可以了(和我們使用數據庫驅動一個道理)。JMS
規范雖然統一了標准,但是JMS
規范卻有一個很大的缺陷就是它是和Java
語言進行綁定的,所以依然沒有從根本上解決問題。
2004
年,AMQP
規范出現了,真正做到了跨語言和跨平台,自此MQ
迎來了發展的黃金時代。
2007
年,Rabbit
公司基於AMQP
規范開發出了一款消息隊列RabbmitMQ
。很快的,RabbitMQ
就得到了大家的喜愛,被用戶廣泛使用。
什么是 MQ
MQ
即:Message Queue
,稱之為消息隊列或者消息中間件。MQ
的本質是:使用高效可靠的消息傳遞機制來進行與平台無關的數據傳遞,並基於數據通信來進行分布式系統的集成。也就是說MQ
主要是用來解決消息的通信問題,其主要有以下三個特點:
- 1、
MQ
是一個獨立運行的服務。通過生產者來發送消息,使用消費者來接收消費。 - 2、內部采用了隊列來進行消息存儲,一般采用的均是先進先出(
FIFO
)隊列。 - 3、具有發布訂閱的模型,消費者可以根據需要來獲取自己想要的消息。
為什么需要 MQ
以Java
語言為例,JDK
本身就提供了許多不同類型的隊列,那么為什么還需要用MQ
呢?這是因為:
- 1、跨語言。各大編程語言內部實現的隊列是和語言綁定的,而且是單機的,在分布式環境下無法很好的工作,所以我們需要可以單獨部署不依賴於語言的
MQ
。 - 2、異步解耦。消息隊列可以實現異步通信,這樣發送消息方只需要關心消息是否發送成功,而接受消息方只需要關心怎么處理隊列中的消息,實現了消費和生產者的解耦。
- 3、流量削峰。因為消息隊列是先進先出,所以如果把需要消費的消息放進隊列,那么消費者就可以避免被瞬間大流量擊垮,而是可以從容的根據自己的能力從隊列中取出消息進行消費。
RabbitMQ
RabbitMQ
中的 Rabbit
是兔子的意思,就是形容跑的和兔子一樣快。其是一款輕量級的,支持多種消息傳遞協議的高可用的消息隊列。RabbitMQ
是由 Erlang
語言編寫的,而 Erlang
語言就是一款天生適合高並發的語言。
RabbitMQ 的優勢和特性
RabbitMQ
作為一款非常流行的消息中間件,其有着非常豐富的特性和優勢:
- 高可靠性:
RabbitMQ
提供了持久化、發送應答、發布確認等功能來保證其可靠性。 - 靈活的路由:通過不同的交換機(Exchange)來實現了消息的靈活路由。
- 集群與擴展性:多個節點可以組成一個邏輯上的服務器,支持負載。
- 高可用性:通過鏡像隊列實現了隊列中數據的復制,保證了在極端情況下部分節點出現
crash
整個集群仍然可用。 - 支持多種協議:
RabbitMQ
最初是為了支持AMQP
協議而開發的,所以AMQP
是其核心協議,但是其也支持其他如:STOMP
,MOTT
,HTTP
等協議。 - 支持多客戶端:
RabbitMQ
幾乎支持所有常用語言客戶端,如:Java
,Python
,Ruby
,Go
等。 - 豐富的插件系統:支持各種豐富的插件擴展,同時也支持自定義插件。比如最常用的
RabbitMQ
后台管理系統就是以插件的形式實現的。
AMQP 模型
AMQP 全稱是:Advanced Message Queuing Protocol。RabitMQ
最核心的協議就是基於 AMQP
模型的 AMQP
協議,AMQP
模型目前最新的版本是 1.0
版本,但是目前官方推薦使用者的最佳版本仍是基於 0.9.1
版本的 AMQP
模型,0.9.1
版本在 RabbitMQ
官網中也將其稱之為 AMQP 0-9-1
模型。
AMQP 0-9-1
(高級消息隊列協議)是一種消息傳遞協議,它允許符合標准的客戶端應用程序與符合標准的消息傳遞中間件代理進行通信。消息傳遞代理(Broker)從發布者(Publisher,即發布消息的應用程序,也稱為生產者:Producter)接收消息,並將其路由到使用者(消費者:Consumer,即處理消息的應用程序)。
AMQP 0-9-1
模型的核心思想為:消息被發布到交換處,通常被比作郵局或郵箱。然后,交換機使用稱為綁定的規則將消息副本分發到隊列。然后,代理將消息傳遞給訂閱了隊列的使用者,或者使用者根據需要從隊列中獲取/提取消息。
下圖就是一個 AMQP
模型簡圖,理解了這幅圖,那么就基本理解了 RabbitMQ
的工作模式。
Producer 和 Consumer
Producer
即生產者,一般指的是應用程序客戶端,生產者會產生消息發送給 RabbitMQ
,等待消費者進行處理。
Consumer
即消費者,消費者會從特定的隊列中取出消息,進行消費。當消息傳遞給消費者時,消費者會自動通知 Broker
,Broker
只有在收到關於該消息的通知時才會從隊列中完全刪除該消息。
Connection:我是一個 TCP 長連接
生產者發送消息和消費者接收消息之前都必須要和 Broker
建立一個 tcp
長連接,才能進行通信。
Channel:我是被虛構出來的
消息隊列的作用之一就是用來做削峰,所以消息隊列在高並發場景可能會有大量的生產者和消費者,那么假如每一個生產者在發送消息時或者每一個消費者在消費消息時都需要不斷的創建和銷毀 tcp
連接,那么這對 Broker
會是一個很大的消耗,為了降低這個 tcp
連接的創建頻率,AMQP
模型引入了 Channel
(通道或者信道)。
Channel
是一個虛擬的的連接,可以被認為是“輕量級的連接,其共享同一個 tcp
連接”。在同一個 tcp
長連接里面可以通過創建和銷毀不同的 Channel
來減少了創建和銷毀 tcp
連接的頻率,從而大大減少了資源的消耗。
客戶端(生產者/消費者)執行的每個協議操作都發生在通道上。特定 Channel
上的通信完全獨立於另一個 Channel
上的通信,因此每個協議方法還攜帶一個Channel ID
(又稱通道號)。
Channel
只存在於連接的上下文中,不會獨立存在,所以當一個 tcp
連接被關閉時,其中所有 Channel
也都會被關閉。
Channel
是線程不安全的,所以對於使用多個線程/進程進行處理的應用程序,需要為每個線程/進程創建一個 Channel
,而不是共享同一個 Channel
。
Broker:我只是一個普通的代理商
Broker
直接翻譯成中文就是:中介/代理,所以如果我們使用的是 RabbitMQ
,那么這個 Broker
就是指的 RabbitMQ
服務端。
Exchange:我只是做一個消息的映射
Echange
即交換機,因為要實現生產者和消費者的多對多關系,所以只有一個隊列是無法滿足要求的,那么如果有多個隊列,每次我們發送的消息應該存儲到哪里呢?交換機就是起到了中間角色的作用,我們發送消息到交換機上,然后通過交換機發送到對應的隊列,交換機和隊列之間需要提前綁定好對應關系,這樣消息就到了各自指定的隊列內,然后消費者就可以直接從各自負責的隊列內取出消息進行消費。
Queue:我才是真正存儲消息的地方
消息發送到 Broker
之后,通過交換機的映射,存儲到指定的 Queue
里面。
VHost:我只是一個命名空間而已
VHost
類似於命名空間,主要作用就是用來隔離數據的,比如我們由很多個業務系統都需要用到 RabbitMQ
,如果是一台服務器完全可以滿足要求,那就沒必要安裝多個 RabbitMQ
了,這時候就可以定義不同的 VHost
,不同的 VHost
就可以實現各個業務系統間的數據隔離。
RabbitMQ 的安裝
RabbitMQ
是用 Erlang
語言開發的,所以在安裝 RabbitMQ
之前,需要先安裝 Erlang
,RabbitMQ
和 Erlang
之間有版本對應關系,這個需要注意,本文以 Erlang 21.3
和 RabbitMQ3.8.4
為例進行安裝 。
- 安裝
Erlang
:
yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget //提前安裝一些依賴,個人電腦依賴不同,可根據實際情況選擇未安裝的依賴進行安裝
wget http://erlang.org/download/otp_src_21.3.tar.gz # 下載(也可以下載好傳到服務器)
tar -xvf otp_src_21.3.tar.gz //解壓
mkdir erlang //在指定目錄,如/usr/local下創建erlang目錄
cd otp_src_21.3 //切換到解壓后的目錄
./configure --prefix=/usr/local/erlang //編譯(路徑根據實際情況選擇)
make && make install //安裝
- 配置
Erlang
環境變量:
vim /etc/profile //編輯環境變量文件(CentOS系統默認環境變量文件,其他系統可能不一樣)
export PATH=$PATH:/usr/local/erlang/bin //在末尾加入環境變量配置(路徑根據實際情況選擇)
source /etc/profile //實時生效
- 輸入
erl
驗證Erlang
是否安裝成功。如果出現如下顯示版本號的界面則說明安裝成功(可以輸入halt().
命令進行退出):
- 安裝
RabbitMQ
:
wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.8.4/rabbitmq-server-generic-unix-3.8.4.tar.xz //下載RabbitMQ
xz -d rabbitmq-server-generic-unix-3.8.4.tar.xz //解壓
tar -xvf rabbitmq-server-generic-unix-3.8.4.tar //解壓
- 同樣的,這里需要進行環境變量配置:
vim /etc/profile //編輯環境變量文件(CentOS系統默認環境變量文件,其他系統可能不一樣)
export PATH=$PATH:/usr/local/rabbitmq_server-3.8.4/sbin //在末尾加入環境變量配置(路徑根據實際情況選擇)
source /etc/profile //實時生效
- 啟動
RabbitMQ
,默認端口為6752
:
/usr/local/rabbitmq_server-3.8.4/sbin/./rabbitmq-server -detached //在后台啟動。根據自己實際路徑選擇,或者也可以選擇service或者systemctl等命令啟動
- 如果沒有報錯則說明啟動成功,啟動之后默認會創建一個
guest/guest
賬戶,只能本地連接,所以還需要再重新創建一個用戶,並給新用戶授權(當然,我們也可以直接給guest
用戶授權):
./rabbitmqctl add_user admin 123456 //創建用戶admin
./rabbitmqctl set_user_tags admin administrator //添加標簽
./rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" //授權
RabbitMQ
默認還提供了可視化管理界面,需要手動開啟一下,默認端口為15672
:
./rabbitmq-plugins enable rabbitmq_management //啟動后台管理系統插件(禁止的話換成disable即可)
- 開啟插件之后,可以通過訪問:
http://ip:15672/
訪問后台管理系統,並進行一些參數設置,賬號密碼就是上面添加的admin/123456
。
安裝過程常見錯誤
安裝過程中可能會出現如下圖所示錯誤:
-
odbc:ODBC library - link check failed:
解決方法:執行命令
yum install unixODBC.x86_64 unixODBC-devel.x86_64
進行安裝。 -
wx:wxWidgets not found, wx will NOT be usable:
解決方法:這個屬於
APPLOICATION INFORMATION
,可以不處理。 -
fakefop to generate placeholder PDF files,documentation: fop is missing.Using fakefop to generate placeholder PDF files:
解決方法:執行命令
yum install fop.noarch
進行安裝。
利用 Java API 實現一個生產者和消費者
接下來用 Java
原生的 API
來實現一個簡單的生產者和消費者:
pom.xml
文件引入RabbitMQ
客戶端依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
- 新建一個消費者
TestRabbitConsumer
類:
package com.lonelyWolf.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
public class TestRabbitConsumer {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://admin:123456@ip:5672");
Connection conn = factory.newConnection();//建立連接
Channel channel = conn.createChannel(); //創建消息通道
channel.queueDeclare("TEST_QUEUE", false, false, false, null);//聲明隊列
System.out.println("正在等待接收消息...");
Consumer consumer = new DefaultConsumer(channel) {//創建消費者
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
System.out.println("收到消息: " + new String(body, "UTF-8") + ",當前消息ID為:" + properties.getMessageId());
System.out.println("收到自定義屬性:"+ properties.getHeaders().get("name"));
}
};
channel.basicConsume("TEST_QUEUE", true, consumer);//消費之后,回調給consumer
}
}
- 新建一個生產者
TestRabbitProducter
類:
package com.lonelyWolf.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class TestRabbitProducter {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://admin:123456@ip:5672");
Connection conn = factory.newConnection();// 建立連接
Channel channel = conn.createChannel();//創建消息通道
Map<String, Object> headers = new HashMap<String, Object>(1);
headers.put("name", "雙子孤狼");//可以自定義一些自定義的參數和消息一起發送過去
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentEncoding("UTF-8") //編碼
.headers(headers) //自定義的屬性
.messageId(String.valueOf(UUID.randomUUID()))//消息id
.build();
String msg = "Hello, RabbitMQ";//需要發送的消息
channel.queueDeclare("TEST_QUEUE", false, false, false, null); //聲明隊列
channel.basicPublish("", "TEST_QUEUE", properties, msg.getBytes());//發送消息
channel.close();
conn.close();
}
}
- 先啟動消費者,啟動之后消費者就會保持和
RabbitMQ
的連接,等待消息;然后再運行生產者,消息發送之后,消費者就可以收到消息:
利用SpringBoot 實現一個生產者和消費者
接下來再看看 SpringBoot
怎么與 RabbitMQ
集成並實現一個簡單的生產者和消費者:
- 引入依賴(我這邊
SpringBoot
用的是2.4.0
版本,所以如果用的低版本這個版本號也需要修改):
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.4.0</version>
</dependency>
- 新增以下配置文件:
spring:
rabbitmq:
host: ip
port: 5672
username: admin
password: 123456
- 新建一個配置文件
RabbitConfig
類,創建一個隊列:
package com.lonely.wolf.rabbit.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean("simpleRabbitQueue")
public Queue getFirstQueue(){
Queue queue = new Queue("SIMPLE_QUEUE");
return queue;
}
}
- 新建一個消費者
SimpleConsumer
類(注意這里監聽的名字要和上面定義的保持一致):
package com.lonely.wolf.rabbit.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@RabbitListener(queues = "SIMPLE_QUEUE")
@Component
public class SimpleConsumer {
@RabbitHandler
public void process(String msg){
System.out.println("收到消息:" + msg);
}
}
- 新建一個消息發送者
HelloRabbitController
類(發送消息的隊列名要和消費者監聽的隊列名一致,否則無法收到消息),運行之后調用對應接口,消費者類SimpleConsumer
就可以收到消息:
package com.lonely.wolf.rabbit.controller;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/hello")
public class HelloRabbitController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(value="/send")
public String clearVipInfo(@RequestParam(value = "msg",defaultValue = "no message") String msg){
rabbitTemplate.convertAndSend("SIMPLE_QUEUE",msg);
return "succ";
}
}
總結
本文主要簡單講述了 MQ
的發展歷史,並介紹了為什么要使用 MQ
及 MQ
能解決什么問題,緊接着重點介紹了 AMQP 0.9.1
模型。掌握了 AMQP
模型就基本掌握了 RabbitMQ
的工作原理,最后我們通過 JAVA API
和 SpringBoot
兩個例子介紹了如何使用 RabbitMQ
。