概念
什么是消息
- 消息是指在兩個獨立的系統間傳遞的數據。這兩個系統可以是兩台計算機,也可以是兩個進程。
- 消息是平台無關和語言無關的!
什么是隊列
- 隊列是一種數據結構,內部是用數組或鏈表實現的,
- 隊列的特點是只能隊尾放入,隊頭取出,即先入先出【FIFO】
- 隊列的操作有入隊和出隊
也就是你有一個程序在產生內容然后入隊(生產者)
另一個程序讀取內容,內容出隊(消費者)
什么是消息隊列
- 簡單的理解就是:在消息的傳輸過程中使用隊列作為保存消息的容器。
隊列是在消息的傳輸過程中的通道,是保存消息的容器,
根據不同的情形,可以有先進先出,優先級隊列等區別 。
為什么要使用消息隊列呢
解耦
消息隊列能夠將業務邏輯解耦,調用方只需要下達命令而不用等待整個邏輯執行完畢!
比如說:注冊的時候需要調用三個服務,這三個服務可以各自獨立放在三個服務器中,執行到哪一步直接發送消息即可實現異步調用。注冊的效率就快多了
調用郵件服務:發送帶有驗證鏈接的注冊郵件,
調用第三方驗證服務:驗證身份證信息真假,
調用用戶的服務:對用戶進行注冊。
同步轉異步
可以把同步的處理變成異步進行處理
將消息寫入消息隊列,非必要的業務邏輯以異步的方式運行,加快響應速度
下完訂單直接返回給用戶結果,只需要耗時50ms,然后再通知MQ做后續的事情。
削峰
在高並發場景下【平滑短時間內大量的服務請求】
分流:將突發大量請求轉換為后端能承受的隊列請求。
什么時候使用消息隊列呢
關注下游執行執行結果,用RPC/REST
不關注下游執行結果,用MQ,不用RPC/REST
對於需要強事務保證而且延遲敏感的,RPC是優於消息隊列的。
比如:
你的服務器一秒能處理100個訂單,但秒殺活動1秒進來1000個訂單,持續10秒,在后端能力無法增加的情況下,
你可以用消息隊列將總共10000個請求壓在隊列里,后台consumer按原有能力處理,100秒后處理完所有請求(而不是直接宕機丟失訂單數據)。
注意
mq關心的是“通知”,而非“處理
簡單的說:MQ只能保證消息按照順序通知給consumer,不能保證consumer處理邏輯,比如:是不是按照順序執行。
假設有三個消息: M1(發短信),M2(發郵件),M3(站內推送)
在隊列中的順序為:M3,M2,M1 MQ能保證消息在消費的時候是按照這個順序,
但是不能保證consumer,必須先發送站內推送,再發郵件,最后發短信,
因為這三個consumer接受到消息執行的業務時間很可能不相同的。
安裝Rabbit MQ
安裝ErLang
Erlang(['ə:læŋ])是一種通用的面向並發的編程語言,它由瑞典電信設備制造商愛立信所轄的CS-Lab開發,目的是創造一種可以應對大規模並發活動的編程語言和運行環境。
rpm --import https://packages.erlang-solutions.com/rpm/erlang_solutions.asc
vi /etc/yum.repos.d/xxx (xxx是目錄中的任意一個已有的yum列表文件)
在文件中增加下述內容:
[erlang-solutions]
name=Centos $releasever - $basearch - Erlang Solutions
baseurl=https://packages.erlang-solutions.com/rpm/centos/$releasever/$basearch
gpgcheck=1
gpgkey=https://packages.erlang-solutions.com/rpm/erlang_solutions.asc
enabled=1
生成yum緩存信息
yum makecache
安裝ErLang
yum -y install erlang
檢查安裝結果,查看ErLang版本
erl -version
安裝Rabbit Mq
-
wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el6.noarch.rpm --no-check-certificate
-
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
-
yum -y install rabbitmq-server-3.6.6-1.el6.noarch.rpm
報錯可以參考:
安裝Rabbit MQ
啟動 Rabbit MQ
配置為守護進程隨系統自動啟動,root權限下執行:
chkconfig rabbitmq-server on
啟動RabbitMQ服務
service rabbitmq-server start
檢查RabbitMQ服務狀態
service rabbitmq-server status
安裝RabbitMQ的WEB管理界面
rabbitmq-plugins enable rabbitmq_management
設置RabbitMQ用戶及授予權限
創建賬號
rabbitmqctl add_user test 123456
設置用戶角色
rabbitmqctl set_user_tags test administrator
設置用戶權限
rabbitmqctl set_permissions -p "/" test "." "." ".*"
設置完成后可以查看當前用戶和角色(需要開啟服務)
rabbitmqctl list_users
瀏覽器訪問WEB管理界面
http://rabbitmq-server-ip:15672
rabbitmq-server-ip就是RabbitMQ按照所在物理機的IP。
RabbitMQ提供的WEB管理界面端口為15672
RabbitMQ的原理
原理圖
Message
有兩部分: Header和Body。
Header是由Producer添加上的各種屬性的集合,
這些屬性有控制Message是否可被緩存,接收的queue是哪個,優先級是多少等。
Body是真正需要傳送的數據,它是對Broker不可見的二進制數據流,在傳輸過程中不應該受到影響。
(在rabbitMQ中,存儲消息可以是任意的java類型的對象,必須實現序列化(serializable))
Publisher 消息的生產者
也是一個向交換器發布消息的客戶端應用程序
Consumer 消息的消費者
表示一個從消息隊列中取得消息的客戶端應用程序。
Exchange 交換器。
用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
三種常用的交換器類型
direct(發布與訂閱 完全匹配)
fanout(廣播)
topic(主題,規則匹配)
Routing-key 路由鍵
RabbitMQ決定消息該投遞到哪個隊列的規則。
隊列通過路由鍵綁定到交換器。
消息發送到MQ服務器時,消息將擁有一個路由鍵,即便是空的,RabbitMQ也會將其和綁定使用的路由鍵進行匹配。
如果相匹配,消息將會投遞到該隊列。
如果不匹配,消息將會進入黑洞。
Binding 綁定
用於【消息隊列】和【交換器】之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
Queue 消息隊列。
用來保存消息直到發送給消費者。
它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。
消息一直在隊列里面,等待消費者鏈接到這個隊列將其取走。
Connection
指rabbit服務器和服務建立的TCP鏈接。
Channel
信道,是TCP里面的虛擬鏈接。一條TCP連接上可以創建多條信道。
TCP一旦打開,就會創建AMQP信道。無論是發布消息、接收消息、訂閱隊列,這些動作都是通過信道完成的
Virtual Host
表示一組交換器,消息隊列和相關對象。
個vhost本質上就是一個mini版的RabbitMQ服務器,擁有自己的隊列、交換器、綁定和權限機制。
類似一個mysql里面有N個數據庫一樣。
Borker
表示消息隊列服務器實體。就是RabbitMQ整體應用。
交換器和隊列的關系
交換器是通過路由鍵和隊列綁定在一起的,如果消息擁有的路由鍵跟隊列和交換器的路由鍵匹配,那么消息就會被路由到該綁定的隊列中。
也就是說,消息到隊列的過程中,消息首先會經過交換器,接下來交換器在通過路由鍵匹配分發消息到具體的隊列中。
路由鍵可以理解為匹配的規則。
RabbitMQ為什么需要信道?為什么不是TCP直接通信?
TCP的創建和銷毀開銷特別大。
創建需要3次握手,銷毀需要4次分手。
如果不用信道,那應用程序就會以TCP鏈接Rabbit,高峰時每秒成千上萬條鏈接會造成資源巨大的浪費,而且操作系統每秒處理TCP鏈接數也是有限制的,必定造成性能瓶頸。
信道的原理是一條線程一條通道,多條線程多條通道同用一條TCP鏈接。一條TCP鏈接可以容納無限的信道,即使每秒成千上萬的請求也不會成為性能的瓶頸。
大致流程
consumer注冊隊列監聽器到Broker(RabbitMQ)
Consumer首先注冊一個隊列監聽器,來監聽隊列的狀態,當隊列狀態變化時消費消息,
注冊隊列監聽的時候需要提供:
- Exchange(交換器)信息:
交換類型(Dircet直連 ,Topic主題 ,Fanout廣播),交換器名稱,是否自動刪除等 - Queue(隊列)信息,
名稱,是否自動刪除等 - 以及Routing Key(路由鍵)信息。
自定義的一個key值,這個值是連接Exchange和Queue的標識。
producer 發送消息到隊列
producer 發送消息給RabbitMQ,需要在消息頭中指定Exchange(交換器)信息,Routing Key(路由鍵)信息
Broker(RabbitMQ) 匹配
RebbitMQ通過Producer指定的Exchange名稱找到交換器,然后通過指定的Routing key找到對應的隊列,將消息放入隊列中。
隊列狀態發生變化,Consumer就會通過監聽器得到消息並消費。
consumer做一個集群是如何消費消息的
假設我的一個短信發送服務,為了保證短信發送的穩定,做了一個短信發送服務的集群,這個時候MQ的消息是如何被消費的。
Exchange
它的作用:用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
Exchange是通過Routing Key來匹配對應的Queue的。
我們要知道在RabbitMQ中Exchange的類型以及Queue,還有Routing key都是由consumer端提供的,
producer只是提供Exchange和Routing key,broker根據producer提供的Exchange名字找到對應的交換器,然后再
根據路由鍵去匹配對應的隊列,放入消息到隊列中。
有好幾種類型的Exchange:
Direct類型的Exchange的Routing key就是全匹配。
Topic類型的Exchange的Routing key就是部分匹配或者是模糊匹配。
Fanout類型的Exchange的Routing key就是放棄匹配。
匹配肯定都是限制在同一個Exchange中的,也就是相同的Exchange進行匹配。
消息的可靠性處理
消息持久化
保證消息在MQ中不丟失。
消息丟失的情況
- consumer未啟動,而producer發送了消息,則消息會丟失。
- 當所有的consumer宕機的時候,queue會auto-delete,消息仍舊會丟失
消息確認機制
必要性
consumer收到消息,在消費的過程中程序出現異常或者網絡中斷,如果沒有ack的話,MQ就把消息刪除了,就造成了數據丟失。
過程
RabbitMQ把消息推送給Consumer,RabbitMQ就會把這個消息進行鎖定,在鎖定狀態的消息不會被重復推送也就是二次消費。
其他consumer可以繼續消費下一個消息,當消息的consumer確認消費完成之后發送一個ack給RabbitMQ,RabbitMQ會將這個消息刪除。
如果超過一定時間RabbitMQ沒有收到consumer的ack,就會把這個消息進行解鎖,重新放入隊列頭,保證消息的順序。
內存泄露的可能
如果Consumer沒有處理消息確認,將導致嚴重后果。
假設所有的Consumer都沒有正常反饋確認信息,並退出監聽狀態,那么這些消息則會永久保存,並處於鎖定狀態,直到消息被正常消費為止。
而消息的Producer繼續持續發送消息到RabbitMQ,那么消息將會堆積,持續占用RabbitMQ所在服務器的內存,導致“內存泄漏”問題。
解決方案:
- 配置消息的重試次數。
通過全局配置文件,開啟消息消費重試機制,配置重試次數。
當RabbitMQ未收到Consumer的確認反饋時,會根據配置來決定重試推送消息的次數,當重試次數使用完畢,無論是否收到確認反饋,RabbitMQ都會刪除消息,避免內存泄漏的可能。
在consumer端具體配置如下:
#開啟重試
spring.rabbitmq.listener.retry.enabled=true
#重試次數,默認為3次
spring.rabbitmq.listener.retry.max-attempts=5
- 編碼異常處理
通過編碼處理異常的方式,保證消息確認機制正常執行。
如:catch代碼塊中,將未處理成功的消息,重新發送給MQ。
如:catch代碼中,本地邏輯的重試(使用定時線程池重復執行任務3次。)
如:catch代碼中,將異常消息存儲到DB,然后使用定時任務去清除消息。
重復消費
- 假設RabbitMQ等待ack的超時時間為1s,而consumer消費消息需要2s,那么這個消息就會出現ack等待超時,重新放入隊列,這就出現了重復消費。
- consumer收到消息之后中斷了Connection,消息也會被重新放入隊列中,也會出現重復消費。
- 假設consumer端處理消息的時候出現了系統異常,無法發送確認機制。
【解決方法】
- 測試consumer的執行時長,並合理限定MQ的ack超時時長。
- 為消息添加版本或者時間戳,或者根據業務id進行判重。
如果不強制要求不能出現重復消費,最好還是不要判斷。
RabbitMQ默認是開啟消息確認的,不建議關閉。
Direct 交換器
就是點對點(point to point)實現【發布/訂閱】標准的交換器。這里的交換器就是(Exchange)。
業務場景
producer端的代碼實現
pom依賴
繼承spring-boot-starter-parent
引入rabbitMq:spring-boot-starter-amqp
rabbitMQ的依賴。rabbitmq已經被spring-boot做了整合訪問實現。
spring cloud也對springboot做了整合邏輯。所以rabbitmq的依賴可以在spring cloud中直接使用。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bjsxt</groupId>
<artifactId>rabbitmq-direct-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>rabbitmq-direct-producer</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.13.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- rabbitMQ的依賴。rabbitmq已經被spring-boot做了整合訪問實現。
spring cloud也對springboot做了整合邏輯。所以rabbitmq的依賴可以在spring cloud中直接使用。
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
配置RabbitMQ
spring.application.name=direct-producer
server.port=8082
# 必要配置
# 配置rabbitmq鏈接相關信息。key都是固定的。是springboot要求的。
# rabbitmq安裝位置
spring.rabbitmq.host=192.168.1.122
# rabbitmq的端口
spring.rabbitmq.port=5672
# rabbitmq的用戶名
spring.rabbitmq.username=test
# rabbitmq的用戶密碼
spring.rabbitmq.password=123456
創建消息載體對象
- 對象必須實現序列化接口。
這里把getter和setter方法省略了。
/**
* 消息內容載體,在rabbitmq中,存儲的消息可以是任意的java類型的對象。
* 強制要求,作為消息數據載體的類型,必須是Serializable的。
* 如果消息數據載體類型未實現Serializable,在收發消息的時候,都會有異常發生。
*/
public class LogMessage implements Serializable {
private Long id;
private String msg;
private String logLevel;
private String serviceType;
private Date createTime;
private Long userId;
public LogMessage() {
super();
}
public LogMessage(Long id, String msg, String logLevel, String serviceType, Date createTime, Long userId) {
super();
this.id = id;
this.msg = msg;
this.logLevel = logLevel;
this.serviceType = serviceType;
this.createTime = createTime;
this.userId = userId;
}
@Override
public String toString() {
return "LogMessage [id=" + id + ", msg=" + msg + ", logLevel=" + logLevel + ", serviceType=" + serviceType
+ ", createTime=" + createTime + ", userId=" + userId + "]";
}
}
編寫測試類
使用spring boot提供的【AmqpTemplate】接口RabbitMQ的默認實現R【RabbitTemplate】對象發送消息。
其中convertAndSend方法可以發送消息:
這個方法是將傳入的普通java對象,轉換為rabbitmq中需要的message類型對象,並發送消息到rabbitmq中。
參數一:交換器名稱。 類型是String
參數二:路由鍵。 類型是String
參數三:消息,是要發送的消息內容對象。類型是Object
/**
* Direct交換器
* Producer測試。
* 注意:
* 在rabbitmq中,consumer都是listener監聽模式消費消息的。
* 一般來說,在開發的時候,都是先啟動consumer,確定有什么exchange、queue、routing-key。
* 然后再啟動producer發送消息。
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes=SpringbootServerApplication.class)
public class QueueTest {
@Autowired
private AmqpTemplate rabbitAmqpTemplate;
/*
* 測試消息隊列
*/
@Test
public void testSendInfo()throws Exception{
Long id = 1L;
while(true){
Thread.sleep(1000);
final LogMessage logMessage = new LogMessage(id, "test log", "info", "訂單服務", new Date(), id);
this.rabbitAmqpTemplate.convertAndSend("log.direct", "log.error.routing.key", logMessage);
id++;
}
}
/*
* 測試消息隊列
*/
@Test
public void testSendError()throws Exception{
Long id = 1L;
while(true){
Thread.sleep(1000);
final LogMessage logMessage = new LogMessage(id, "test log", "info", "訂單服務", new Date(), id);
this.rabbitAmqpTemplate.convertAndSend("log.direct", "log.info.routing.key", logMessage);
id++;
}
}
}
consumer端的實現
pom
和producer端一樣
info級別的日志消費代碼的編寫
@Component
@RabbitListener(
bindings=@QueueBinding(
value=@Queue(value="log.error",autoDelete="false"),
exchange=@Exchange(value="log.direct",type=ExchangeTypes.DIRECT),
key="log.error.routing.key"
)
)
public class ErrorReceiver {
/**
* 消費消息的方法。采用消息隊列監聽機制
* @RabbitHandler - 代表當前方法是監聽隊列狀態的方法,就是隊列狀態發生變化后,執行的消費消息的方法。
* 方法參數。就是處理的消息的數據載體類型。
*/
@RabbitHandler
public void process(LogMessage msg){
System.out.println("Error..........receiver: "+msg);
}
}
@RabbitListener
可以注解類和方法,
注解類:當表當前類的對象是一個rabbit listener。監聽邏輯明確,可以由更好的方法定義規范。 必須配合@RabbitHandler才能實現rabbit消息消費能力。
注解方法:代表當前方法是一個rabbit listener處理邏輯。方便開發,一個類中可以定義若干個listener邏輯。方法定義規范可能不合理。
代表當前類型是一個rabbitmq的監聽器。
bindings:綁定隊列
@QueueBinding
@RabbitListener.bindings屬性的類型。綁定一個隊列。
value:綁定隊列, Queue類型。
exchange:配置交換器, Exchange類型。
key:路由鍵,字符串類型。
@Queue - 隊列。
value:隊列名稱
autoDelete:是否是一個臨時隊列(也就是所有的consumer關閉后是否刪除隊列)
true : 刪除
false:如果queue中有消息未消費,無論是否有consumer,都保存queue。
@Exchange - 交換器
value:為交換器起個名稱
type:指定具體的交換器類型
@RabbitHandler
代表當前方法是監聽隊列狀態的方法,就是隊列狀態發生變化后,執行的消費消息的方法。
Error級別的日志消費代碼編寫
@Component
@RabbitListener(
bindings=@QueueBinding(
value=@Queue(value="log.info",autoDelete="false"),
exchange=@Exchange(value="log.direct",type=ExchangeTypes.DIRECT),
key="log.info.routing.key"
)
)
public class InfoReceiver {
@RabbitHandler
public void process(LogMessage msg){
System.out.println("Info........receiver: "+msg);
}
}
Topic 交換器
場景
現在有用戶服務,訂單服務,商品服務三個服務,每個服務都會有日志,日志都分info,error等級別,可以使用MQ實現日志的收集。
使用Direct交換器,就需要定義至少六個隊列。
如果使用Topic交換器可以簡化consumer端的開發:
實現
- pom的依賴和上面一樣。
- consumer端主要修改了Exchange的類型以及對應的Routing key的規則
consumer端
處理Error日志的消費者
@Component
@RabbitListener(
bindings=@QueueBinding(
value=@Queue(value="${mq.config.queue.error}",autoDelete="true"),
exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC),
key="*.log.error"
)
)
public class ErrorReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("......Error........receiver: "+msg);
}
}
處理Info日志的消費者
@Component
@RabbitListener(
bindings=@QueueBinding(
value=@Queue(value="${mq.config.queue.info}",autoDelete="true"),
exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC),
key="*.log.info"
)
)
public class InfoReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("......Info........receiver: "+msg);
}
}
producer端
商品發送日志信息
@Component
public class ProductSender {
@Autowired
private AmqpTemplate rabbitAmqpTemplate;
/*
* 發送消息的方法
*/
public void send(String msg){
//向消息隊列發送消息
//參數一:交換器名稱。
//參數二:路由鍵
//參數三:消息
this.rabbitAmqpTemplate.convertAndSend("log.topic","product.log.info", "product.log.info....."+msg);
this.rabbitAmqpTemplate.convertAndSend("log.topic","product.log.error", "product.log.error....."+msg);
}
}
用戶發送信息
@Component
public class UserSender {
@Autowired
private AmqpTemplate rabbitAmqpTemplate;
/*
* 發送消息的方法
*/
public void send(String msg){
//向消息隊列發送消息
//參數一:交換器名稱。
//參數二:路由鍵
//參數三:消息
this.rabbitAmqpTemplate.convertAndSend("log.topic","user.log.info", "user.log.info....."+msg);
this.rabbitAmqpTemplate.convertAndSend("log.topic","user.log.error", "user.log.error....."+msg);
}
}
訂單代碼一樣,省略。
Fanout 交換器
這個更簡單,直接在producer和consumer端不需要配置Routing key就行了。