一、HTTP在微服務場景下的問題
對於SpringCloud微服務架構,每一個SpringBoot項目都是一個服務,各個服務對外暴露REST接口,通過HTTP協議彼此調用。

傳統的HTTP采用的是重文本傳輸,傳輸報文分為3部分:起始行、首部、主體。
-
報文的第一行就是起始行,在請求報文中用來說明要做些什么,在響應報文中說明出現了什么情況。
-
首部字段起始行后面有零個或多個首部字段。每個首部字段都包含一個名字和一個值,為了便於解析,兩者之間用冒號(:)來分隔。首部以一個空行結束,添加一個首部字段和添加新行一樣簡單。
-
空行之后就是可選的報文主體了,其中包含了所有類型的數據。請求主體中包括了要發送給Web 服務器的數據;響應主體中裝載了要返回給客戶端的數據。起始行和首部都是文本形式且都是結構化的,而主體則不同,主體中可以包含任意的二進制數據(比如圖片、視頻、音軌、軟件程序)。當然,主體中也可以包含文本。
使用HTTP做微服務間的信息傳輸協議會面臨以下問題:
- HTTP/1.1屬於無狀態協議,對於一些附加頭信息只能采取非壓縮方式傳輸,增大了服務器交互開銷。
- HTTP/1.1是基於請求-響應模式的,屬於一元操作,所以用戶每發送一個請求才能得到一個響應,未收到響應前不能夠發送其他請求。
- HTTP/1.1基於TCP完成,需要三次握手才能保證可靠連接,會非常耗時。
二、RSocket協議
RSocket協議是由Facebook、Netifi和Privota等公司開發的一個新的通訊協議,該協議采用二進制點對點的數據傳輸,主要用於分布式架構中,是一種基於Reactive Streams規范標准實現的新的網絡通信第七層(應用層)協議。
RSocket協議具有多路復用(Multiplexed)、雙向流(Bidirectional Streaming)、流控(Flow Control)、連接恢復(Socket Resumption)、異步消息傳遞(Asynchronous Message Passing)、傳輸層解耦和(Transport independent)等特點。
RSocket官方站點:https://rsocket.io/

2.1 多路復用
在HTTP/3.0標准以前所有的HTTP協議都是基於TCP協議實現的,所以在HTTP/1.0協議版本中每一次用戶的請求對服務器端都需要創建有一個新的TCP連接(3次握手與4次揮手),而為了解決TCP性能的問題,在HTTP/1.1協議版本中提出了TCP連接復用的支持,但是此時的連接復用在每次只允許有一個用戶的請求進行處理,而當該請求處理完成后才允許其他請求繼續使用此TCP連接進行請求處理,這樣一來如果某一個請求的處理操作非常耗時,則會導致后續請求處理性能下降。
所以為了進一步解決請求處理性能的問題,在HTTP/2.0中對連接操作進行了進一步改進,允許一個TCP連接同時實現多個客戶端的請求處理,這樣即便某一個請求操作耗時,但是也不會影響到整體的處理性能,如圖所示。但是基於TCP協議實現的HTTP協議始終會存在有性能問題,所以在HTTP/3.0協議版本中使用QUIC作為新的傳輸層協議,QUIC基於UDP協議實現,同時也自帶多路復用結構。
QUIC(Quick UDP lnternet Connection)是谷歌制定的一種基於UDP的低時延的互聯網傳輸層協議。在2016年11月國際互聯網工程任務組(IETF)召開了第一次QUIC工作組會議,受到了業界的廣泛關注。這也意味着QUIC開始了它的標准化過程,成為新一代傳輸層協議。
QUIC很好地解決了當今傳輸層和應用層面臨的各種需求,包括處理更多的連接,安全性,和低延遲。QUIC融合了包括TCP、TLS、HTTP/2.0等協議的特性。

在HTTP/2.0協議中重點的問題是解決了TCP連接多路復用的問題,但是在HTTP協議中一切的數據都是以文本的形式進行傳輸,所以在實際開發中就會存在有數據傳輸過大以及傳輸結構受限的問題,而RSocket是一個二進制協議,可以方便的進行各種數據的傳輸,同時沒有數據格式的限制,用戶也可以根據自身的需要進行壓縮處理。
在RSocket中將消息體分為數據(data)和元數據(metadata)兩個組成部分,這樣可以保證在高速數據傳輸下依然可以對外暴露少量元數據給其他服務使用。

2.2 雙向流
RSocket實現了雙向流通訊支持,利用雙向流可以實現服務端與客戶端之間的通訊處理這樣在請求與響應的處理過程中,客戶端可以向服務器端發送請求,服務器端也可以向客戶端發送請求。

RSocket四種數據交互模式:
- Request-And-Response:請求/響應,類似於HTTP的通信特點,提供異步通信與多路復用支持;
- Request-Response-Stream:請求/流式響應,一個請求對應多個流式的響應,例如:獲取視頻列表或產品列表,
- Fire-And-Forget:異步觸發,不需要響應,可以用於進行日志記錄;
- Channel(bi-directional streams):雙向異步通訊,消息流在服務端與客戶端兩個方向上異步流動;
2.3 流控
在分布式的項目開發環境之中,如果說生產者生產的數據過快,就會導致消費者無法及時進行處理,最終就有可能出現內存與CPU的占用率增高,從而出現服務端或客戶端無響應的狀況,而如果沒有進行良好的實現控制,那么就有可能會由於雪崩問題而導致整個應用集群的癱瘓,如圖所示。為了避免這樣的情況出現,就需要有一套流控機制來協調生產者與消費者之間的處理速度。

在RSocket中提供了Stream Leve|流量控制,由於RSocket作為一個應用層協議,所以采取的並不是基於字節的網絡層實現流控,而是基於應用層幀數的流量控制(控制生產者生產的消息數量)
2.4 連接恢復
由於移動網絡的興起,所以在網絡連接的穩定性上就出現了較大的挑戰,當網絡出現故障后應及時的進行連接恢復,在RSocket中提供有連接恢復(Connection Resumption)功能,同時為了簡化用戶的處理操作,在連接恢復成功后用戶不會有任何的感知,而在連接恢復失敗時才會通過onError事件觸發相應的回調函數,這樣在進行Stream時可以保持響應,同時減少重復數據信息的傳輸,因為在多路復用的結構中如果重復傳輸則意味着網絡壓力的增加。
RSocket中提供的"SocketResumption"恢復機制,恢復實現的核心原理在於重新建立網絡連接后不從頭處理用戶請求,客戶端和服務端需要能夠在連接中斷后的一段時間內自動的保存該Connection上的Stream狀態,而在連接恢復后,客戶端會將此狀態信息發送給服務器端,服務器端會進行灰復判斷,如果成功恢復則繼續之前的Stream操作。

2.5 異步消息傳遞
RSocket的協議在進行數據傳輸時采用的是異步消息傳遞的形式,所傳輸的內容為Frame(應用層幀,例如:FrameHeader、RESUME等),同時在RSocket傳輸中並不像HTTP協議那樣包含有明確的目標訪問路徑,所有的訪問全部由路由模塊負責實現。
RSocket協議在數據傳輸時氵肖息使用幀來進彳寸裝的,每個幀可能是請求內容、響應內容或與協議相關的數據信息,而一個應用消息可能被切分為多個不同的片段以保存在一個幀中(TCP中的粘包與拆包)。
2.6 傳輸層解耦和
RSocket協議是一個應用層的面向連接協議,不依賴於傳輸層協議,所以可以由用戶自由的選擇不同的應用場景,例如:在進行數據中心構建時可以使用TCP處理,而在進行瀏覽器異步交互時,可以使用WebSocket處理,在進行HTTP服務時可以使用HTTP/2.0處理。
三、RSocket基礎開發
創建springboot項目rsocket-base,導入依賴:
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-rsocket</artifactid>
</dependency>
定義RSocket處理類,該類需要實現RSocket接口。
package com.it.rsocket.server.handle;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.util.DefaultPayload;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class MessageRSocketHandler implements RSocket {
@Override
public Mono<void> fireAndForget(Payload payload) { // 無響應
// 一般這種無響應的操作可用於日志記錄的模式上
// Payload表示所有附加的數據,對於RSocket來講,所有的數據通訊都通過此結構傳輸
String message = payload.getDataUtf8(); // 獲取數據
log.info("【fireAndForget】接受請求數據:{}", message);
return Mono.empty(); // 返回空消息
}
@Override
public Mono<payload> requestResponse(Payload payload) { // 傳統模式,有數據響應
String message = payload.getDataUtf8();
log.info("【requestResponse】接受請求數據:{}", message);
return Mono.just(DefaultPayload.create("【ECHO】" + message)); // 數據響應
}
@Override
public Flux<payload> requestStream(Payload payload) { // 處理數據流
String message = payload.getDataUtf8();
log.info("【requestStream】接受請求數據:{}", message);
return Flux.fromStream(message.chars()// 接收字符串轉為int流數據
.mapToObj(c -> Character.toUpperCase((char) c)) // 字符編碼轉大寫
.map(Object::toString) // 調用toString
.map(DefaultPayload::create)); // 創建Payload附加數據
}
@Override
public Flux<payload> requestChannel(Publisher<payload> payloads) { // 雙向流
return Flux.from(payloads).map(Payload::getDataUtf8).map(msg -> {
log.info("【requestChannel】接受請求數據:{}", msg);
return msg; // 返回發送數據內容
}).map(DefaultPayload::create);
}
}
以上實現了RSocket核心的處理機制,但是在實際的開發之中,需要將這個處理的核心與客戶端請求綁定在一起的,所以這個時候就需要創建一個連接器。
package com.it.rsocket.server.acceptor;
import com.it.server.rsocket.handle.MessageRSocketHandler;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import reactor.core.publisher.Mono;
public class MessageRSocketAcceptor implements SocketAcceptor {
@Override
public Mono<rsocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
return Mono.just(new MessageRSocketHandler());
}
}
由於沒有引入SpringBoot容器管理,需要手工的來實現服務的啟動與關閉操作,創建一個專屬工具類。
package com.it.rsocket.server;
import com.it.rsocket.server.acceptor.MessageRSocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.server.TcpServerTransport;
import reactor.core.Disposable;
public class MessageServer { // 實現RSocket狀態控制
private static Disposable disposable; // 用於釋放任務
public static void start() { // 啟動服務
RSocketServer server = RSocketServer.create(); // 創建RSocket服務器
server.acceptor(new MessageRSocketAcceptor()); // 創建連接器
server.payloadDecoder(PayloadDecoder.ZERO_COPY); // 采用零拷貝技術
disposable = server.bind(TcpServerTransport.create(8080)).subscribe(); // 開啟訂閱
}
public static void stop() {
disposable.dispose(); // 釋放連接
}
}
編寫測試類。
package com.it.rsocket;
import com.it.rsocket.server.MessageServer;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import org.junit.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.TestMethodOrder;
import reactor.core.publisher.Flux;
import java.time.Duration;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) // 手工配置方法執行順序
public class TestMessageServer {
private static RSocket rSocket;
private static Flux<payload> getRequestPayload() { // 傳遞的數據
return Flux.just("RSocket", "SpringCloud", "Redis", "Netty")
.delayElements(Duration.ofSeconds(1))
.map(DefaultPayload::create);
}
@Test
public static void testFireAndForget() {
getRequestPayload().flatMap(payload -> rSocket.fireAndForget(payload))
.blockLast(Duration.ofMillis(1));
}
@BeforeAll // 測試前執行
public void setUpClient() {
MessageServer.start(); // 啟動服務器
rSocket = RSocketConnector.connectWith(TcpClientTransport.create(8080)).block(); // 客戶端連接
}
@AfterAll // 測試后執行
public void stopServer() {
MessageServer.stop(); // 關閉服務器
}
}
```</payload></rsocket></payload></payload></payload></payload></void>
