一、HTTP在微服务场景下的问题
对于SpringCloud微服务架构,每一个SpringBoot项目都是一个服务,各个服务对外暴露REST接口,通过HTTP协议彼此调用。
传统的HTTP采用的是重文本传输,传输报文分为3部分:起始行、首部、主体。
-
报文的第一行就是起始行,在请求报文中用来说明要做些什么,在响应报文中说明出现了什么情况。
-
首部字段起始行后面有零个或多个首部字段。每个首部字段都包含一个名字和一个值,为了便于解析,两者之间用冒号(:)来分隔。首部以一个空行结束,添加一个首部字段和添加新行一样简单。
-
空行之后就是可选的报文主体了,其中包含了所有类型的数据。请求主体中包括了要发送给Web 服务器的数据;响应主体中装载了要返回给客户端的数据。起始行和首部都是文本形式且都是结构化的,而主体则不同,主体中可以包含任意的二进制数据(比如图片、视频、音轨、软件程序)。当然,主体中也可以包含文本。
使用HTTP做微服务间的信息传输协议会面临以下问题:
- HTTP/1.1属于无状态协议,对于一些附加头信息只能采取非压缩方式传输,增大了服务器交互开销。
- HTTP/1.1是基于请求-响应模式的,属于一元操作,所以用户每发送一个请求才能得到一个响应,未收到响应前不能够发送其他请求。
- HTTP/1.1基于TCP完成,需要三次握手才能保证可靠连接,会非常耗时。
二、RSocket协议
RSocket协议是由Facebook、Netifi和Privota等公司开发的一个新的通讯协议,该协议采用二进制点对点的数据传输,主要用于分布式架构中,是一种基于Reactive Streams规范标准实现的新的网络通信第七层(应用层)协议。
RSocket协议具有多路复用(Multiplexed)、双向流(Bidirectional Streaming)、流控(Flow Control)、连接恢复(Socket Resumption)、异步消息传递(Asynchronous Message Passing)、传输层解耦和(Transport independent)等特点。
RSocket官方站点:https://rsocket.io/
2.1 多路复用
在HTTP/3.0标准以前所有的HTTP协议都是基于TCP协议实现的,所以在HTTP/1.0协议版本中每一次用户的请求对服务器端都需要创建有一个新的TCP连接(3次握手与4次挥手),而为了解决TCP性能的问题,在HTTP/1.1协议版本中提出了TCP连接复用的支持,但是此时的连接复用在每次只允许有一个用户的请求进行处理,而当该请求处理完成后才允许其他请求继续使用此TCP连接进行请求处理,这样一来如果某一个请求的处理操作非常耗时,则会导致后续请求处理性能下降。
所以为了进一步解决请求处理性能的问题,在HTTP/2.0中对连接操作进行了进一步改进,允许一个TCP连接同时实现多个客户端的请求处理,这样即便某一个请求操作耗时,但是也不会影响到整体的处理性能,如图所示。但是基于TCP协议实现的HTTP协议始终会存在有性能问题,所以在HTTP/3.0协议版本中使用QUIC作为新的传输层协议,QUIC基于UDP协议实现,同时也自带多路复用结构。
QUIC(Quick UDP lnternet Connection)是谷歌制定的一种基于UDP的低时延的互联网传输层协议。在2016年11月国际互联网工程任务组(IETF)召开了第一次QUIC工作组会议,受到了业界的广泛关注。这也意味着QUIC开始了它的标准化过程,成为新一代传输层协议。
QUIC很好地解决了当今传输层和应用层面临的各种需求,包括处理更多的连接,安全性,和低延迟。QUIC融合了包括TCP、TLS、HTTP/2.0等协议的特性。
在HTTP/2.0协议中重点的问题是解决了TCP连接多路复用的问题,但是在HTTP协议中一切的数据都是以文本的形式进行传输,所以在实际开发中就会存在有数据传输过大以及传输结构受限的问题,而RSocket是一个二进制协议,可以方便的进行各种数据的传输,同时没有数据格式的限制,用户也可以根据自身的需要进行压缩处理。
在RSocket中将消息体分为数据(data)和元数据(metadata)两个组成部分,这样可以保证在高速数据传输下依然可以对外暴露少量元数据给其他服务使用。
2.2 双向流
RSocket实现了双向流通讯支持,利用双向流可以实现服务端与客户端之间的通讯处理这样在请求与响应的处理过程中,客户端可以向服务器端发送请求,服务器端也可以向客户端发送请求。
RSocket四种数据交互模式:
- Request-And-Response:请求/响应,类似于HTTP的通信特点,提供异步通信与多路复用支持;
- Request-Response-Stream:请求/流式响应,一个请求对应多个流式的响应,例如:获取视频列表或产品列表,
- Fire-And-Forget:异步触发,不需要响应,可以用于进行日志记录;
- Channel(bi-directional streams):双向异步通讯,消息流在服务端与客户端两个方向上异步流动;
2.3 流控
在分布式的项目开发环境之中,如果说生产者生产的数据过快,就会导致消费者无法及时进行处理,最终就有可能出现内存与CPU的占用率增高,从而出现服务端或客户端无响应的状况,而如果没有进行良好的实现控制,那么就有可能会由于雪崩问题而导致整个应用集群的瘫痪,如图所示。为了避免这样的情况出现,就需要有一套流控机制来协调生产者与消费者之间的处理速度。
在RSocket中提供了Stream Leve|流量控制,由于RSocket作为一个应用层协议,所以采取的并不是基于字节的网络层实现流控,而是基于应用层帧数的流量控制(控制生产者生产的消息数量)
2.4 连接恢复
由于移动网络的兴起,所以在网络连接的稳定性上就出现了较大的挑战,当网络出现故障后应及时的进行连接恢复,在RSocket中提供有连接恢复(Connection Resumption)功能,同时为了简化用户的处理操作,在连接恢复成功后用户不会有任何的感知,而在连接恢复失败时才会通过onError事件触发相应的回调函数,这样在进行Stream时可以保持响应,同时减少重复数据信息的传输,因为在多路复用的结构中如果重复传输则意味着网络压力的增加。
RSocket中提供的"SocketResumption"恢复机制,恢复实现的核心原理在于重新建立网络连接后不从头处理用户请求,客户端和服务端需要能够在连接中断后的一段时间内自动的保存该Connection上的Stream状态,而在连接恢复后,客户端会将此状态信息发送给服务器端,服务器端会进行灰复判断,如果成功恢复则继续之前的Stream操作。
2.5 异步消息传递
RSocket的协议在进行数据传输时采用的是异步消息传递的形式,所传输的内容为Frame(应用层帧,例如:FrameHeader、RESUME等),同时在RSocket传输中并不像HTTP协议那样包含有明确的目标访问路径,所有的访问全部由路由模块负责实现。
RSocket协议在数据传输时氵肖息使用帧来进彳寸装的,每个帧可能是请求内容、响应内容或与协议相关的数据信息,而一个应用消息可能被切分为多个不同的片段以保存在一个帧中(TCP中的粘包与拆包)。
2.6 传输层解耦和
RSocket协议是一个应用层的面向连接协议,不依赖于传输层协议,所以可以由用户自由的选择不同的应用场景,例如:在进行数据中心构建时可以使用TCP处理,而在进行浏览器异步交互时,可以使用WebSocket处理,在进行HTTP服务时可以使用HTTP/2.0处理。
三、RSocket基础开发
创建springboot项目rsocket-base,导入依赖:
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-rsocket</artifactid>
</dependency>
定义RSocket处理类,该类需要实现RSocket接口。
package com.it.rsocket.server.handle;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.util.DefaultPayload;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class MessageRSocketHandler implements RSocket {
@Override
public Mono<void> fireAndForget(Payload payload) { // 无响应
// 一般这种无响应的操作可用于日志记录的模式上
// Payload表示所有附加的数据,对于RSocket来讲,所有的数据通讯都通过此结构传输
String message = payload.getDataUtf8(); // 获取数据
log.info("【fireAndForget】接受请求数据:{}", message);
return Mono.empty(); // 返回空消息
}
@Override
public Mono<payload> requestResponse(Payload payload) { // 传统模式,有数据响应
String message = payload.getDataUtf8();
log.info("【requestResponse】接受请求数据:{}", message);
return Mono.just(DefaultPayload.create("【ECHO】" + message)); // 数据响应
}
@Override
public Flux<payload> requestStream(Payload payload) { // 处理数据流
String message = payload.getDataUtf8();
log.info("【requestStream】接受请求数据:{}", message);
return Flux.fromStream(message.chars()// 接收字符串转为int流数据
.mapToObj(c -> Character.toUpperCase((char) c)) // 字符编码转大写
.map(Object::toString) // 调用toString
.map(DefaultPayload::create)); // 创建Payload附加数据
}
@Override
public Flux<payload> requestChannel(Publisher<payload> payloads) { // 双向流
return Flux.from(payloads).map(Payload::getDataUtf8).map(msg -> {
log.info("【requestChannel】接受请求数据:{}", msg);
return msg; // 返回发送数据内容
}).map(DefaultPayload::create);
}
}
以上实现了RSocket核心的处理机制,但是在实际的开发之中,需要将这个处理的核心与客户端请求绑定在一起的,所以这个时候就需要创建一个连接器。
package com.it.rsocket.server.acceptor;
import com.it.server.rsocket.handle.MessageRSocketHandler;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import reactor.core.publisher.Mono;
public class MessageRSocketAcceptor implements SocketAcceptor {
@Override
public Mono<rsocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
return Mono.just(new MessageRSocketHandler());
}
}
由于没有引入SpringBoot容器管理,需要手工的来实现服务的启动与关闭操作,创建一个专属工具类。
package com.it.rsocket.server;
import com.it.rsocket.server.acceptor.MessageRSocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.server.TcpServerTransport;
import reactor.core.Disposable;
public class MessageServer { // 实现RSocket状态控制
private static Disposable disposable; // 用于释放任务
public static void start() { // 启动服务
RSocketServer server = RSocketServer.create(); // 创建RSocket服务器
server.acceptor(new MessageRSocketAcceptor()); // 创建连接器
server.payloadDecoder(PayloadDecoder.ZERO_COPY); // 采用零拷贝技术
disposable = server.bind(TcpServerTransport.create(8080)).subscribe(); // 开启订阅
}
public static void stop() {
disposable.dispose(); // 释放连接
}
}
编写测试类。
package com.it.rsocket;
import com.it.rsocket.server.MessageServer;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import org.junit.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.TestMethodOrder;
import reactor.core.publisher.Flux;
import java.time.Duration;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) // 手工配置方法执行顺序
public class TestMessageServer {
private static RSocket rSocket;
private static Flux<payload> getRequestPayload() { // 传递的数据
return Flux.just("RSocket", "SpringCloud", "Redis", "Netty")
.delayElements(Duration.ofSeconds(1))
.map(DefaultPayload::create);
}
@Test
public static void testFireAndForget() {
getRequestPayload().flatMap(payload -> rSocket.fireAndForget(payload))
.blockLast(Duration.ofMillis(1));
}
@BeforeAll // 测试前执行
public void setUpClient() {
MessageServer.start(); // 启动服务器
rSocket = RSocketConnector.connectWith(TcpClientTransport.create(8080)).block(); // 客户端连接
}
@AfterAll // 测试后执行
public void stopServer() {
MessageServer.stop(); // 关闭服务器
}
}
```</payload></rsocket></payload></payload></payload></payload></void>