微服務中使用MQ——RabbitMQ


概念

什么是消息

  • 消息是指在兩個獨立的系統間傳遞的數據。這兩個系統可以是兩台計算機,也可以是兩個進程。
  • 消息是平台無關和語言無關的!

什么是隊列

  • 隊列是一種數據結構,內部是用數組或鏈表實現的,
  • 隊列的特點是只能隊尾放入,隊頭取出,即先入先出【FIFO】
  • 隊列的操作有入隊和出隊
    也就是你有一個程序在產生內容然后入隊(生產者)
    另一個程序讀取內容,內容出隊(消費者)

什么是消息隊列

  • 簡單的理解就是:在消息的傳輸過程中使用隊列作為保存消息的容器。
    隊列是在消息的傳輸過程中的通道,是保存消息的容器,
    根據不同的情形,可以有先進先出,優先級隊列等區別 。

為什么要使用消息隊列呢

解耦

消息隊列能夠將業務邏輯解耦,調用方只需要下達命令而不用等待整個邏輯執行完畢!

比如說:注冊的時候需要調用三個服務,這三個服務可以各自獨立放在三個服務器中,執行到哪一步直接發送消息即可實現異步調用。注冊的效率就快多了
調用郵件服務:發送帶有驗證鏈接的注冊郵件,
調用第三方驗證服務:驗證身份證信息真假,
調用用戶的服務:對用戶進行注冊。

同步轉異步

可以把同步的處理變成異步進行處理
將消息寫入消息隊列,非必要的業務邏輯以異步的方式運行,加快響應速度

下完訂單直接返回給用戶結果,只需要耗時50ms,然后再通知MQ做后續的事情。

削峰

在高並發場景下【平滑短時間內大量的服務請求】
分流:將突發大量請求轉換為后端能承受的隊列請求。

什么時候使用消息隊列呢

關注下游執行執行結果,用RPC/REST
不關注下游執行結果,用MQ,不用RPC/REST
對於需要強事務保證而且延遲敏感的,RPC是優於消息隊列的。
比如:
你的服務器一秒能處理100個訂單,但秒殺活動1秒進來1000個訂單,持續10秒,在后端能力無法增加的情況下,
你可以用消息隊列將總共10000個請求壓在隊列里,后台consumer按原有能力處理,100秒后處理完所有請求(而不是直接宕機丟失訂單數據)。

注意

mq關心的是“通知”,而非“處理

簡單的說:MQ只能保證消息按照順序通知給consumer,不能保證consumer處理邏輯,比如:是不是按照順序執行。
假設有三個消息: M1(發短信),M2(發郵件),M3(站內推送)
在隊列中的順序為:M3,M2,M1 MQ能保證消息在消費的時候是按照這個順序,
但是不能保證consumer,必須先發送站內推送,再發郵件,最后發短信,
因為這三個consumer接受到消息執行的業務時間很可能不相同的。

安裝Rabbit MQ

安裝ErLang

Erlang(['ə:læŋ])是一種通用的面向並發的編程語言,它由瑞典電信設備制造商愛立信所轄的CS-Lab開發,目的是創造一種可以應對大規模並發活動的編程語言和運行環境。

rpm --import https://packages.erlang-solutions.com/rpm/erlang_solutions.asc

vi /etc/yum.repos.d/xxx (xxx是目錄中的任意一個已有的yum列表文件)
在文件中增加下述內容:

[erlang-solutions]
name=Centos $releasever - $basearch - Erlang Solutions
baseurl=https://packages.erlang-solutions.com/rpm/centos/$releasever/$basearch
gpgcheck=1
gpgkey=https://packages.erlang-solutions.com/rpm/erlang_solutions.asc
enabled=1

生成yum緩存信息
yum makecache

安裝ErLang
yum -y install erlang

檢查安裝結果,查看ErLang版本
erl -version

安裝Rabbit Mq

報錯可以參考:
安裝Rabbit MQ

啟動 Rabbit MQ

配置為守護進程隨系統自動啟動,root權限下執行:
chkconfig rabbitmq-server on
啟動RabbitMQ服務
service rabbitmq-server start

檢查RabbitMQ服務狀態
service rabbitmq-server status

安裝RabbitMQ的WEB管理界面

rabbitmq-plugins enable rabbitmq_management

設置RabbitMQ用戶及授予權限

創建賬號
rabbitmqctl add_user test 123456
設置用戶角色
rabbitmqctl set_user_tags test administrator
設置用戶權限
rabbitmqctl set_permissions -p "/" test "." "." ".*"
設置完成后可以查看當前用戶和角色(需要開啟服務)
rabbitmqctl list_users

瀏覽器訪問WEB管理界面

http://rabbitmq-server-ip:15672
rabbitmq-server-ip就是RabbitMQ按照所在物理機的IP。
RabbitMQ提供的WEB管理界面端口為15672

RabbitMQ的原理

原理圖

Message

有兩部分: Header和Body。
Header是由Producer添加上的各種屬性的集合,
這些屬性有控制Message是否可被緩存,接收的queue是哪個,優先級是多少等。
Body是真正需要傳送的數據,它是對Broker不可見的二進制數據流,在傳輸過程中不應該受到影響。
(在rabbitMQ中,存儲消息可以是任意的java類型的對象,必須實現序列化(serializable))

Publisher 消息的生產者

也是一個向交換器發布消息的客戶端應用程序

Consumer 消息的消費者

表示一個從消息隊列中取得消息的客戶端應用程序。

Exchange 交換器。

用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
三種常用的交換器類型
direct(發布與訂閱 完全匹配)
fanout(廣播)
topic(主題,規則匹配)

Routing-key 路由鍵

RabbitMQ決定消息該投遞到哪個隊列的規則。
隊列通過路由鍵綁定到交換器。
消息發送到MQ服務器時,消息將擁有一個路由鍵,即便是空的,RabbitMQ也會將其和綁定使用的路由鍵進行匹配。
如果相匹配,消息將會投遞到該隊列。
如果不匹配,消息將會進入黑洞。

Binding 綁定

用於【消息隊列】和【交換器】之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。

Queue 消息隊列。

用來保存消息直到發送給消費者。
它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。
消息一直在隊列里面,等待消費者鏈接到這個隊列將其取走。

Connection

指rabbit服務器和服務建立的TCP鏈接。

Channel

信道,是TCP里面的虛擬鏈接。一條TCP連接上可以創建多條信道。
TCP一旦打開,就會創建AMQP信道。無論是發布消息、接收消息、訂閱隊列,這些動作都是通過信道完成的

Virtual Host

表示一組交換器,消息隊列和相關對象。
個vhost本質上就是一個mini版的RabbitMQ服務器,擁有自己的隊列、交換器、綁定和權限機制。
類似一個mysql里面有N個數據庫一樣。

Borker

表示消息隊列服務器實體。就是RabbitMQ整體應用。

交換器和隊列的關系

交換器是通過路由鍵和隊列綁定在一起的,如果消息擁有的路由鍵跟隊列和交換器的路由鍵匹配,那么消息就會被路由到該綁定的隊列中。
也就是說,消息到隊列的過程中,消息首先會經過交換器,接下來交換器在通過路由鍵匹配分發消息到具體的隊列中。
路由鍵可以理解為匹配的規則。

RabbitMQ為什么需要信道?為什么不是TCP直接通信?

TCP的創建和銷毀開銷特別大。
創建需要3次握手,銷毀需要4次分手。
如果不用信道,那應用程序就會以TCP鏈接Rabbit,高峰時每秒成千上萬條鏈接會造成資源巨大的浪費,而且操作系統每秒處理TCP鏈接數也是有限制的,必定造成性能瓶頸。
信道的原理是一條線程一條通道,多條線程多條通道同用一條TCP鏈接。一條TCP鏈接可以容納無限的信道,即使每秒成千上萬的請求也不會成為性能的瓶頸。

大致流程

consumer注冊隊列監聽器到Broker(RabbitMQ)

Consumer首先注冊一個隊列監聽器,來監聽隊列的狀態,當隊列狀態變化時消費消息,
注冊隊列監聽的時候需要提供:

  • Exchange(交換器)信息:
    交換類型(Dircet直連 ,Topic主題 ,Fanout廣播),交換器名稱,是否自動刪除等
  • Queue(隊列)信息,
    名稱,是否自動刪除等
  • 以及Routing Key(路由鍵)信息。
    自定義的一個key值,這個值是連接Exchange和Queue的標識。

producer 發送消息到隊列

producer 發送消息給RabbitMQ,需要在消息頭中指定Exchange(交換器)信息,Routing Key(路由鍵)信息

Broker(RabbitMQ) 匹配

RebbitMQ通過Producer指定的Exchange名稱找到交換器,然后通過指定的Routing key找到對應的隊列,將消息放入隊列中。
隊列狀態發生變化,Consumer就會通過監聽器得到消息並消費。

consumer做一個集群是如何消費消息的

假設我的一個短信發送服務,為了保證短信發送的穩定,做了一個短信發送服務的集群,這個時候MQ的消息是如何被消費的。

Exchange

它的作用:用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。

Exchange是通過Routing Key來匹配對應的Queue的。

我們要知道在RabbitMQ中Exchange的類型以及Queue,還有Routing key都是由consumer端提供的,
producer只是提供Exchange和Routing key,broker根據producer提供的Exchange名字找到對應的交換器,然后再
根據路由鍵去匹配對應的隊列,放入消息到隊列中。

有好幾種類型的Exchange:
Direct類型的Exchange的Routing key就是全匹配。
Topic類型的Exchange的Routing key就是部分匹配或者是模糊匹配。
Fanout類型的Exchange的Routing key就是放棄匹配。
匹配肯定都是限制在同一個Exchange中的,也就是相同的Exchange進行匹配。

消息的可靠性處理

消息持久化

保證消息在MQ中不丟失。

消息丟失的情況

  • consumer未啟動,而producer發送了消息,則消息會丟失。
  • 當所有的consumer宕機的時候,queue會auto-delete,消息仍舊會丟失

消息確認機制

必要性

consumer收到消息,在消費的過程中程序出現異常或者網絡中斷,如果沒有ack的話,MQ就把消息刪除了,就造成了數據丟失。

過程

RabbitMQ把消息推送給Consumer,RabbitMQ就會把這個消息進行鎖定,在鎖定狀態的消息不會被重復推送也就是二次消費。
其他consumer可以繼續消費下一個消息,當消息的consumer確認消費完成之后發送一個ack給RabbitMQ,RabbitMQ會將這個消息刪除。
如果超過一定時間RabbitMQ沒有收到consumer的ack,就會把這個消息進行解鎖,重新放入隊列頭,保證消息的順序。

內存泄露的可能

如果Consumer沒有處理消息確認,將導致嚴重后果。
假設所有的Consumer都沒有正常反饋確認信息,並退出監聽狀態,那么這些消息則會永久保存,並處於鎖定狀態,直到消息被正常消費為止。
而消息的Producer繼續持續發送消息到RabbitMQ,那么消息將會堆積,持續占用RabbitMQ所在服務器的內存,導致“內存泄漏”問題。

解決方案:

  • 配置消息的重試次數。
    通過全局配置文件,開啟消息消費重試機制,配置重試次數。
    當RabbitMQ未收到Consumer的確認反饋時,會根據配置來決定重試推送消息的次數,當重試次數使用完畢,無論是否收到確認反饋,RabbitMQ都會刪除消息,避免內存泄漏的可能。
    在consumer端具體配置如下:
#開啟重試
spring.rabbitmq.listener.retry.enabled=true
#重試次數,默認為3次
spring.rabbitmq.listener.retry.max-attempts=5
  • 編碼異常處理
    通過編碼處理異常的方式,保證消息確認機制正常執行。
    如:catch代碼塊中,將未處理成功的消息,重新發送給MQ。
    如:catch代碼中,本地邏輯的重試(使用定時線程池重復執行任務3次。)
    如:catch代碼中,將異常消息存儲到DB,然后使用定時任務去清除消息。

重復消費

  • 假設RabbitMQ等待ack的超時時間為1s,而consumer消費消息需要2s,那么這個消息就會出現ack等待超時,重新放入隊列,這就出現了重復消費。
  • consumer收到消息之后中斷了Connection,消息也會被重新放入隊列中,也會出現重復消費。
  • 假設consumer端處理消息的時候出現了系統異常,無法發送確認機制。

【解決方法】

  • 測試consumer的執行時長,並合理限定MQ的ack超時時長。
  • 為消息添加版本或者時間戳,或者根據業務id進行判重。
    如果不強制要求不能出現重復消費,最好還是不要判斷。

RabbitMQ默認是開啟消息確認的,不建議關閉。

Direct 交換器

就是點對點(point to point)實現【發布/訂閱】標准的交換器。這里的交換器就是(Exchange)。

業務場景

producer端的代碼實現

pom依賴

繼承spring-boot-starter-parent
引入rabbitMq:spring-boot-starter-amqp
rabbitMQ的依賴。rabbitmq已經被spring-boot做了整合訪問實現。
spring cloud也對springboot做了整合邏輯。所以rabbitmq的依賴可以在spring cloud中直接使用。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.bjsxt</groupId>
	<artifactId>rabbitmq-direct-producer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>rabbitmq-direct-producer</name>
	<description>Demo project for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.13.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		
		<!-- rabbitMQ的依賴。rabbitmq已經被spring-boot做了整合訪問實現。
			spring cloud也對springboot做了整合邏輯。所以rabbitmq的依賴可以在spring cloud中直接使用。
		 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

配置RabbitMQ

spring.application.name=direct-producer

server.port=8082

# 必要配置
# 配置rabbitmq鏈接相關信息。key都是固定的。是springboot要求的。
# rabbitmq安裝位置
spring.rabbitmq.host=192.168.1.122
# rabbitmq的端口
spring.rabbitmq.port=5672
# rabbitmq的用戶名
spring.rabbitmq.username=test
# rabbitmq的用戶密碼
spring.rabbitmq.password=123456

創建消息載體對象

  • 對象必須實現序列化接口。
    這里把getter和setter方法省略了。
/**
 * 消息內容載體,在rabbitmq中,存儲的消息可以是任意的java類型的對象。
 * 強制要求,作為消息數據載體的類型,必須是Serializable的。
 * 如果消息數據載體類型未實現Serializable,在收發消息的時候,都會有異常發生。
 */
public class LogMessage implements Serializable {

	private Long id;
	private String msg;
	private String logLevel;
	private String serviceType;
	private Date createTime;
	private Long userId;
	public LogMessage() {
		super();
	}
	public LogMessage(Long id, String msg, String logLevel, String serviceType, Date createTime, Long userId) {
		super();
		this.id = id;
		this.msg = msg;
		this.logLevel = logLevel;
		this.serviceType = serviceType;
		this.createTime = createTime;
		this.userId = userId;
	}
	@Override
	public String toString() {
		return "LogMessage [id=" + id + ", msg=" + msg + ", logLevel=" + logLevel + ", serviceType=" + serviceType
				+ ", createTime=" + createTime + ", userId=" + userId + "]";
	}
}

編寫測試類

使用spring boot提供的【AmqpTemplate】接口RabbitMQ的默認實現R【RabbitTemplate】對象發送消息。
其中convertAndSend方法可以發送消息:
這個方法是將傳入的普通java對象,轉換為rabbitmq中需要的message類型對象,並發送消息到rabbitmq中。
參數一:交換器名稱。 類型是String
參數二:路由鍵。 類型是String
參數三:消息,是要發送的消息內容對象。類型是Object

/**
 * Direct交換器
 * Producer測試。
 * 注意:
 * 在rabbitmq中,consumer都是listener監聽模式消費消息的。
 * 一般來說,在開發的時候,都是先啟動consumer,確定有什么exchange、queue、routing-key。
 * 然后再啟動producer發送消息。
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes=SpringbootServerApplication.class)
public class QueueTest {

	@Autowired
	private AmqpTemplate rabbitAmqpTemplate;
	/*
	 * 測試消息隊列
	 */
	@Test
	public void testSendInfo()throws Exception{
		Long id = 1L;
		while(true){
			Thread.sleep(1000);
			final LogMessage logMessage = new LogMessage(id, "test log", "info", "訂單服務", new Date(), id);
                           
			this.rabbitAmqpTemplate.convertAndSend("log.direct", "log.error.routing.key", logMessage);
			id++;
		}
	}
	/*
	 * 測試消息隊列
	 */
	@Test
	public void testSendError()throws Exception{
		Long id = 1L;
		while(true){
			Thread.sleep(1000);
			final LogMessage logMessage = new LogMessage(id, "test log", "info", "訂單服務", new Date(), id);
			this.rabbitAmqpTemplate.convertAndSend("log.direct", "log.info.routing.key", logMessage);
			id++;
		}
	}
}

consumer端的實現

pom

和producer端一樣

info級別的日志消費代碼的編寫

@Component
@RabbitListener(
			bindings=@QueueBinding(
					value=@Queue(value="log.error",autoDelete="false"),
					exchange=@Exchange(value="log.direct",type=ExchangeTypes.DIRECT),
					key="log.error.routing.key"
			)
		)
public class ErrorReceiver {

	/**
	 * 消費消息的方法。采用消息隊列監聽機制
	 * @RabbitHandler - 代表當前方法是監聽隊列狀態的方法,就是隊列狀態發生變化后,執行的消費消息的方法。
	 * 方法參數。就是處理的消息的數據載體類型。
	 */
	@RabbitHandler
	public void process(LogMessage msg){
		System.out.println("Error..........receiver: "+msg);
	}
}

@RabbitListener

 可以注解類和方法,
    注解類:當表當前類的對象是一個rabbit listener。監聽邏輯明確,可以由更好的方法定義規范。 必須配合@RabbitHandler才能實現rabbit消息消費能力。
    注解方法:代表當前方法是一個rabbit listener處理邏輯。方便開發,一個類中可以定義若干個listener邏輯。方法定義規范可能不合理。
    代表當前類型是一個rabbitmq的監聽器。
     bindings:綁定隊列

@QueueBinding

    @RabbitListener.bindings屬性的類型。綁定一個隊列。
     value:綁定隊列, Queue類型。
     exchange:配置交換器, Exchange類型。
     key:路由鍵,字符串類型。

@Queue - 隊列。

    value:隊列名稱
     autoDelete:是否是一個臨時隊列(也就是所有的consumer關閉后是否刪除隊列)
         true : 刪除
         false:如果queue中有消息未消費,無論是否有consumer,都保存queue。

@Exchange - 交換器

value:為交換器起個名稱
    type:指定具體的交換器類型

@RabbitHandler

  代表當前方法是監聽隊列狀態的方法,就是隊列狀態發生變化后,執行的消費消息的方法。

Error級別的日志消費代碼編寫

@Component
@RabbitListener(
			bindings=@QueueBinding(
					value=@Queue(value="log.info",autoDelete="false"),
					exchange=@Exchange(value="log.direct",type=ExchangeTypes.DIRECT),
					key="log.info.routing.key"
			)
		)
public class InfoReceiver {

	@RabbitHandler
	public void process(LogMessage msg){
		System.out.println("Info........receiver: "+msg);
	}
}

Topic 交換器

場景

現在有用戶服務,訂單服務,商品服務三個服務,每個服務都會有日志,日志都分info,error等級別,可以使用MQ實現日志的收集。
使用Direct交換器,就需要定義至少六個隊列。

如果使用Topic交換器可以簡化consumer端的開發:

實現

  • pom的依賴和上面一樣。
  • consumer端主要修改了Exchange的類型以及對應的Routing key的規則

consumer端

處理Error日志的消費者

@Component
@RabbitListener(
			bindings=@QueueBinding(
					value=@Queue(value="${mq.config.queue.error}",autoDelete="true"),
					exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC),
					key="*.log.error"
			)
		)
public class ErrorReceiver {

	@RabbitHandler
	public void process(String msg){
		System.out.println("......Error........receiver: "+msg);
	}
}

處理Info日志的消費者

@Component
@RabbitListener(
			bindings=@QueueBinding(
					value=@Queue(value="${mq.config.queue.info}",autoDelete="true"),
					exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC),
					key="*.log.info"
			)
		)
public class InfoReceiver {
	@RabbitHandler
	public void process(String msg){
		System.out.println("......Info........receiver: "+msg);
	}
}

producer端

商品發送日志信息

@Component
public class ProductSender {

	@Autowired
	private AmqpTemplate rabbitAmqpTemplate;
        /*
	 * 發送消息的方法
	 */
	public void send(String msg){
		//向消息隊列發送消息
		//參數一:交換器名稱。
		//參數二:路由鍵
		//參數三:消息
		this.rabbitAmqpTemplate.convertAndSend("log.topic","product.log.info", "product.log.info....."+msg);
		this.rabbitAmqpTemplate.convertAndSend("log.topic","product.log.error", "product.log.error....."+msg);
	}
}

用戶發送信息

@Component
public class UserSender {

	@Autowired
	private AmqpTemplate rabbitAmqpTemplate;
	
	/*
	 * 發送消息的方法
	 */
	public void send(String msg){
		//向消息隊列發送消息
		//參數一:交換器名稱。
		//參數二:路由鍵
		//參數三:消息
		this.rabbitAmqpTemplate.convertAndSend("log.topic","user.log.info", "user.log.info....."+msg);
		this.rabbitAmqpTemplate.convertAndSend("log.topic","user.log.error", "user.log.error....."+msg);
	}
}

訂單代碼一樣,省略。

Fanout 交換器

這個更簡單,直接在producer和consumer端不需要配置Routing key就行了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM