前言
這一章節我們將講解高並發解決方案中的隊列。消息隊列已經逐漸成為企業IT系統內部通信的核心手段。它具有低耦合、可靠投遞、廣播、流量控制、最終一致性等一系列功能,成為異步RPC的主要手段之一。
主體概要
- 高並發の消息隊列基本介紹
- 消息隊列特性
主體內容
一、高並發の消息隊列基本介紹
1.例子
在購物商城下單后,希望購買者能收到短信或者郵件通知。有一種做法時在下單邏輯執行后調用短信發送的API,如果此時服務器響應較慢、短信客戶端出現問題等諸多原因購買者不能正常收到短信,那么此時是不斷重試呢還是直接放棄發送呢?不管選擇哪一種,在實現上都會變得復雜。
消息隊列是如何解決的呢?可以講發送短信這個過程封裝成一條消息,發送到消息隊列,消息隊列按照一定順序依次處理隊列中的消息,在某一個時刻就會處理剛才收到的發送短信的消息。消息隊列會通知一個服務去發送這個短信,順利的話這個消息剛被放進隊列就會被處理,這種情況一次性就發送成功了。如果出現了什么問題,可以再次將該消息放進消息隊列中等待處理。上面的例子中如果使用消息隊列,其好處是將發送短信這個流程與其他功能解耦,發送短信時只需要保證將這條消息發送到消息隊列就行了,然后就可以處理發送短信后的其他事情了;其次,系統設計變得簡單,不用在下單的場景下過多的考慮發送短信的問題,而是交給了消息隊列來處理這個事。而且可以保證消息一定會被發送出去,消息只要沒有發送成功會不斷被重新加入到消息隊列。如果短信服務出現問題,那么等到服務恢復了,消息隊列再發送出去即可,只是發送的不那么及時而已。
最后一點,我們假設在發送短信完成之后還要發送郵件。有了消息隊列,我們就不要做同步等待了,我們可以直接並行處理,直接下單的核心流程可以更快的結束這樣就可以增加應用的異步處理能力,減少甚至不可能出現並發現象。回顧一下我們平時在網站上輸入手機號發送驗證碼的時候,半天都收不到短信,算着短信接口時間已經超時了,其實這時后台極有可能通過消息隊列的方式發送短信,而正好碰到短信發送出了點問題,或者服務器網絡開了小差,也有可能某段時間內消息隊列里消息太多了需要處理。
2.好處
1.成功完成了一個異步解耦的過程。短信發送時只要保證放到消息隊列中就可以了,接着做后面的事情就行。一個事務只關心本質的流程,需要依賴其他事情但是不那么重要的時候,有通知即可,無需等待結果。每個成員不必受其他成員影響,可以更獨立自主,只通過一個簡單的容器來聯系。
對於我們的訂單系統,訂單最終支付成功之后可能需要給用戶發送短信積分什么的,但其實這已經不是我們系統的核心流程了。如果外部系統速度偏慢(比如短信網關速度不好),那么主流程的時間會加長很多,用戶肯定不希望點擊支付過好幾分鍾才看到結果。那么我們只需要通知短信系統“我們支付成功了”,不一定非要等待它處理完成。
3.應用場景
可以使用消息隊列的場景非常多
主要特點是異步處理,主要目的是減少請求響應時間和解耦。所以主要的使用場景就是將比較耗時而且不需要即時(同步)返回結果的操作作為消息放入消息隊列。
使用場景的話,舉個例子:
假設用戶在你的軟件中注冊,服務端收到用戶的注冊請求后,它會做這些操作:
- 校驗用戶名等信息,如果沒問題會在數據庫中添加一個用戶記錄
- 如果是用郵箱注冊會給你發送一封注冊成功的郵件,手機注冊則會發送一條短信
- 分析用戶的個人信息,以便將來向他推薦一些志同道合的人,或向那些人推薦他
- 發送給用戶一個包含操作指南的系統通知
等等……
但是對於用戶來說,注冊功能實際只需要第一步,只要服務端將他的賬戶信息存到數據庫中他便可以登錄上去做他想做的事情了。至於其他的事情,非要在這一次請求中全部完成么?值得用戶浪費時間等你處理這些對他來說無關緊要的事情么?所以實際當第一步做完后,服務端就可以把其他的操作放入對應的消息隊列中然后馬上返回用戶結果,由消息隊列異步的進行這些操作。
或者還有一種情況,同時有大量用戶注冊你的軟件,再高並發情況下注冊請求開始出現一些問題,例如郵件接口承受不住,或是分析信息時的大量計算使cpu滿載,這將會出現雖然用戶數據記錄很快的添加到數據庫中了,但是卻卡在發郵件或分析信息時的情況,導致請求的響應時間大幅增長,甚至出現超時,這就有點不划算了。面對這種情況一般也是將這些操作放入消息隊列(生產者消費者模型),消息隊列慢慢的進行處理,同時可以很快的完成注冊請求,不會影響用戶使用其他功能。
二、消息隊列特性
1.四大特性
- 業務無關:只做消息分發
- FIFO(First In First Out):先投遞先到達
- 容災:節點的動態增刪和消息的持久化
- 性能:吞吐量提升,系統內部通信效率提高
2.為什么需要消息隊列?
- 【生產】和【消費】的速度或者穩定性不一致。
3.消息隊列的好處
-
業務解耦:它是消息隊列解決的最本質的問題,所謂解耦就是一個事物之關心核心的流程,而需要依賴其他系統,但不那么重要的事情,有通知即可,無需等待結果,換句話說就是基於消息的模型關心的是通知而不是處理。比如一個l旅游平台內部有一個產品中心,產品中心對接的是主站,移動后台,旅游供應鏈等多個數據源,下游對接的是推薦系統,API系統等展示系統,當上游的數據發生變更的時候,如果不使用消息隊列,勢必不停的需要調用接口來更新數據,這就特別依賴產品中心接口的穩定性和處理能力,但是其實作為旅游的產品中心,也許只有對於旅游自建的供應鏈產品中心更新成功,才是他們關心的事情,而對於團購等外部系統,產品中心更新的成功也好,失敗也罷並不是他們的職責所在,他們只需要保證在信息變更的時候通知一下就可以了;而對於下游,可能有更新索引,更新緩存等一系列需求,對於產品中心而言,這些也不是職責所在。說白了,如果他們定時的拉取數據,也能保證數據的更新,只是實時性沒有那么強,但是如果使用接口方式去更新數據,顯然對於產品中心太過於重量級了,這時只需要發布一個產品ID變更的通知,由下游系統進行處理就更合理了。我們再舉一個例子:對於訂單系統,訂單最后支付成功之后,我們可能要給用戶發送一個短信通知,但其實這已經不是系統的核心流程了,如果外部系統速度偏慢,比如短信網關速度不好,那么主流程的時間就會加長很多,用戶肯定不希望點擊好幾分鍾之后才看到結果,那么我們只需要通知短信系統我們支付成功了,你去發個短信通知就好了,並不一定要等待它處理完成才結束。
-
最終一致性:最終一致性指的是兩個系統的狀態保持一致,要么都成功,要么都失敗。當然有個時間限制,理論上越快越好,但實際上在各種異常的情況下,可能會有一定延遲達到最終一致狀態,但最后兩個系統的狀態是一樣的。
業界有一些為“最終一致性”而生的消息隊列,如Notify(阿里)、QMQ(去哪兒)等,其設計初衷,就是為了交易系統中的高可靠通知。
以一個銀行的轉賬過程來理解最終一致性,轉賬的需求很簡單,如果A系統扣錢成功,則B系統加錢一定成功。反之則一起回滾,像什么都沒發生一樣。
然而,這個過程中存在很多可能的意外:(1)A扣錢成功,調用B加錢接口失敗。
(2)A扣錢成功,調用B加錢接口雖然成功,但獲取最終結果時網絡異常引起超時。
(3)A扣錢成功,B加錢失敗,A想回滾扣的錢,但A機器down機。
可見,想把這件看似簡單的事真正做成,真的不那么容易。所有跨JVM的一致性問題,從技術的角度講通用的解決方案是:
(1)強一致性,分布式事務,但落地太難且成本太高,這里不再具體介紹,想要了解百度一下。
(2)最終一致性,主要是用“記錄”和“補償”的方式。在做所有的不確定的事情之前,先把事情記錄下來,然后去做不確定的事情,結果可能是:成功、失敗或是不確定,“不確定”(例如超時等)可以等價為失敗。成功就可以把記錄的東西清理掉了,對於失敗和不確定,可以依靠定時任務等方式把所有失敗的事情重新搞一遍,直到成功為止。
回到剛才的例子,系統在A扣錢成功的情況下,把要給B“通知”這件事記錄在庫里(為了保證最高的可靠性可以把通知B系統加錢和扣錢成功這兩件事維護在一個本地事務里),通知成功則刪除這條記錄,通知失敗或不確定則依靠定時任務補償性地通知我們,直到我們把狀態更新成正確的為止。需要注意的是,像Kafka等消息隊列,它的設計在設計層面上具有丟失消息的可能,比如定時刷盤,會有丟失消息的可能,哪怕是只丟千分之一的消息,業務用其他手段也必須保證結果正確。 -
廣播:消息隊列的基本功能之一是進行廣播。如果沒有消息隊列,每當一個新的業務方接入,我們都要聯調一次新接口。有了消息隊列,我們只需要關心消息是否送達了隊列,至於誰希望訂閱,是下游的事情,無疑極大地減少了開發和聯調的工作量。
-
提速:假設我們還需要發送郵件,有了消息隊列就不需要同步等待,我們可以直接並行處理,而下單核心任務可以更快完成。增強業務系統的異步處理能力。甚至幾乎不可能出現並發現象。
-
削峰和流控:對於不需要實時處理的請求來說,當並發量特別大的時候,可以先在消息隊列中作緩存,然后陸續發送給對應的服務去處理。試想上下游對於事情的處理能力是不同的。比如,Web前端每秒承受上千萬的請求,並不是什么神奇的事情,只需要加多一點機器,再搭建一些LVS負載均衡設備和Nginx等即可。但數據庫的處理能力卻十分有限,即使使用SSD加分庫分表,單機的處理能力仍然在萬級。由於成本的考慮,我們不能奢求數據庫的機器數量追上前端。這種問題同樣存在於系統和系統之間,如短信系統可能由於短板效應,速度卡在網關上(每秒幾百次請求),跟前端的並發量不是一個數量級。但用戶晚上個半分鍾左右收到短信,一般是不會有太大問題的。如果沒有消息隊列,兩個系統之間通過協商、滑動窗口等復雜的方案也不是說不能實現。但系統復雜性指數級增長,勢必在上游或者下游做存儲,並且要處理定時、擁塞等一系列問題。而且每當有處理能力有差距的時候,都需要單獨開發一套邏輯來維護這套邏輯。所以,利用中間系統轉儲兩個系統的通信內容,並在下游系統有能力處理這些消息的時候,再處理這些消息,是一套相對較通用的方式。總而言之,消息隊列不是萬能的。對於需要強事務保證而且延遲敏感的,RPC是優於消息隊列的。對於一些無關痛癢,或者對於別人非常重要但是對於自己不是那么關心的事情,可以利用消息隊列去做。支持最終一致性的消息隊列,能夠用來處理延遲不那么敏感的“分布式事務”場景,而且相對於笨重的分布式事務,可能是更優的處理方式。當上下游系統處理能力存在差距的時候,利用消息隊列做一個通用的“漏斗”。在下游有能力處理的時候,再進行分發。如果下游有很多系統關心你的系統發出的通知的時候,果斷地使用消息隊列吧。
4.消息隊列舉例
(1)這里我們只針對Kafka,RabbitMQ舉例
- Kafka
- RabbitMQ
...
(2)Kafka是一個apache項目,是一個高性能,跨語言,分布式發布訂閱消息隊列系統。
結構圖
特性
- 快速持久化。以時間復雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間復雜度的訪問性能。
- 高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒100K條以上消息的傳輸。它是完全的分布式系統,它的Broker,Producer,Consumer(參考基本術語)都原生,自動支持分布式和自動實現負載均衡,它支持Hadoop數據並行加載。
- 支持Kafka Server間的消息分區,及分布式消費,同時保證每個Partition內的消息順序傳輸。
- 同時支持離線數據處理和實時數據處理。
- Scale out:支持在線水平擴展。
基本術語
- Broker:Kafka集群包含一個或多個服務器,這種服務器被稱為broker。
- Topic:每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存於何處)
- Partition:Partition是物理上的概念,每個Topic包含一個或多個Partition。
- Producer:負責發布消息到Kafka broker。
- Consumer:消息消費者,向Kafka broker讀取消息的客戶端。
- Consumer Group:每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)。
(3)接下來,我們看一下RabbitMQ。
結構圖
RabbitMQ里的基本定義
RabbitMQ Server:提供消息一條從Producer到Consumer的處理。
Exchange:一邊從發布者方接收消息,一邊把消息推送到隊列。
producer只能將消息發送給exchange。而exchange負責將消息發送到queues。Procuder Publish的Message進入了exchange,exchange會根據routingKey處理接收到的消息,判斷消息是應該推送到指定的隊列還是是多個隊列,或者是直接忽略消息。這些規則是通過交換機類型(exchange type)來定義的主要的type有direct,topic,headers,fanout。具體針對不同的場景使用不同的type。
queue也是通過這個routing keys來做的綁定。交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配,從而確定消息該分發到哪個隊列。
Queue:消息隊列。接收來自exchange的消息,然后再由consumer取出。exchange和queue可以一對一,也可以一對多,它們的關系通過routingKey來綁定。
Producer:Client A & B,生產者,消息的來源,消息必須發送給exchange。而不是直接給queue
Consumer:Client 1,2,3消費者,直接從queue中獲取消息進行消費,而不是從exchange中獲取消息進行消費。
(4)Kafka,rabbitmq使用springboot舉例,為了簡約內容,兩者同時運行測試。
windows下安裝kafka請參考:https://www.jianshu.com/p/d64798e81f3b
包架構
KafkaReceiver.java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* 接收端
*/
@Component
@Slf4j
public class KafkaReceiver {
@KafkaListener(topics={TopicConstants.TEST})
public void receive(ConsumerRecord<?,?> record){
log.info("record:{}",record);
}
}
KafkaSender.java
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.practice.mq.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
/**
* 發送端
*/
@Component
@Slf4j
public class KafkaSender {
@Resource
private KafkaTemplate<String,String> kafkaTemplate;
private Gson gson = new GsonBuilder().create();
public void send(String msg) {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMsg(msg);
message.setSendTime(new Date());
log.info("send Message:{}",message);
kafkaTemplate.send(TopicConstants.TEST,gson.toJson(message));
}
}
TopicConstants.java
public interface TopicConstants {
//定義一下我們需要使用Topic的字符串
String TEST = "test";
String MESSAGE = "message";
}
QueuesContants.java
public interface QueuesConstants {
String TEST="test";
String MESSAGE="message";
}
RabbitMQClient.java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class RabbitMQClient {
@Resource
private RabbitTemplate rabbitTemplate;
public void send(String message){
//發送到指定隊列
rabbitTemplate.convertAndSend(QueuesConstants.TEST,message);
}
}
RabbitMQServer.java
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean
public Queue queue(){
//定義好要發送的隊列
return new Queue(QueuesConstants.TEST);
}
}
Message.java
import lombok.Data;
import java.util.Date;
@Data
public class Message {
private Long id;
private String msg;
private Date sendTime;
}
MQController.java
import com.practice.mq.kafka.KafkaSender;
import com.practice.mq.rabbitmq.RabbitMQClient;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
@Controller
@RequestMapping("/mq")
public class MQController {
@Resource
private RabbitMQClient rabbitMQClient;
@Resource
private KafkaSender kafkaSender;
@RequestMapping("/send")
@ResponseBody
public String send(){
String message = "message";
rabbitMQClient.send(message);
kafkaSender.send(message);
return "success";
}
}
用到的Maven依賴(這里結合springboot)
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!--Gson-->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.7</version>
</dependency>
<!-- rabbitmq依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.properties配置
#============== kafka ===================
# 指定kafka server的地址,集群配多個,中間,逗號隔開
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=test
#=============== provider =======================
# 寫入失敗時,重試次數。當leader節點失效,一個repli節點會替代成為leader節點,此時可能出現寫入失敗,
# 當retris為0時,produce不會重復。retirs重發,此時repli節點完全成為leader節點,不會產生消息丟失。
spring.kafka.producer.retries=0
# 每次批量發送消息的數量,produce積累到一定數據,一次發送
spring.kafka.producer.batch-size=16384
# produce積累數據一次發送,緩存大小達到buffer.memory就發送數據
spring.kafka.producer.buffer-memory=33554432
#procedure要求leader在考慮完成請求之前收到的確認數,用於控制發送記錄在服務端的持久化,其值可以為如下:
#acks = 0 如果設置為零,則生產者將不會等待來自服務器的任何確認,該記錄將立即添加到套接字緩沖區並視為已發送。在這種情況下,無法保證服務器已收到記錄,並且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設置為-1。
#acks = 1 這意味着leader會將記錄寫入其本地日志,但無需等待所有副本服務器的完全確認即可做出回應,在這種情況下,如果leader在確認記錄后立即失敗,但在將數據復制到所有的副本服務器之前,則記錄將會丟失。
#acks = all 這意味着leader將等待完整的同步副本集以確認記錄,這保證了只要至少一個同步副本服務器仍然存活,記錄就不會丟失,這是最強有力的保證,這相當於acks = -1的設置。
#可以設置的值為:all, -1, 0, 1
spring.kafka.producer.acks=1
# 指定消息key和消息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
瀏覽器運行“http://127.0.0.1:8090/mq/send”
控制台打印
rabbitmq:
...
2020-04-19 02:09:12.040 INFO 31676 --- [nio-8090-exec-1] com.practice.mq.kafka.KafkaSender : send Message:Message(id=1587233352040, msg=message, sendTime=Sun Apr 19 02:09:12 GMT+08:00 2020)
2020-04-19 02:09:12.048 INFO 31676 --- [cTaskExecutor-1] com.practice.mq.rabbitmq.RabbitMQServer : message:message
2020-04-19 02:09:12.054 INFO 31676 --- [nio-8090-exec-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
kafka:
...
2020-04-19 02:09:12.070 INFO 31676 --- [nio-8090-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.0
2020-04-19 02:09:12.070 INFO 31676 --- [nio-8090-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : 3402a8361b734732
2020-04-19 02:09:12.076 INFO 31676 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: i1-NXUmvQRyaT-E27LPozQ
2020-04-19 02:09:12.106 INFO 31676 --- [ntainer#0-0-C-1] com.practice.mq.kafka.KafkaReceiver : record:ConsumerRecord(topic = test, partition = 0, offset = 5, CreateTime = 1587233352082, serialized key size = -1, serialized value size = 73, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":1587233352040,"msg":"message","sendTime":"Apr 19, 2020 2:09:12 AM"})
OK,那么關於這兩個隊列的簡單示例到此結束!