基於Netty和SpringBoot實現一個輕量級RPC框架-協議篇
前提
最近對網絡編程方面比較有興趣,在微服務實踐上也用到了相對主流的RPC
框架如Spring Cloud Gateway
底層也切換為Reactor-Netty
,像Redisson
底層也是使用Netty
封裝通訊協議,最近調研和准備使用的SOFARpc
也是基於Netty
封裝實現了多種協議的兼容。因此,基於Netty
造一個輪子,在SpringBoot
的加持下,實現一個輕量級的RPC
框架。這篇博文介紹的是RPC
框架協議的定義以及對應的編碼解碼處理的實現。
依賴引入
截止本文(2020-01-12
)編寫完成之時,Netty
的最新版本為4.1.44.Final
,而SpringBoot
的最新版本為2.2.2.RELEASE
,因此引入這兩個版本的依賴,加上其他工具包和序列化等等的支持,pom
文件的核心內容如下:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.61</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.1-jre</version>
</dependency>
</dependencies>
部分參數的序列化會依賴到FastJson
或者Jackson
,具體看偏好而定。
自定義協議的定義
為了提高協議傳輸的效率,需要定制一套高效的RPC
協議,設計協議所需的字段和類型。
基礎Packet字段:
字段名 | 字段類型 | 字段功能 | 備注 |
---|---|---|---|
magicNumber |
int |
魔數,類似於Java 的字節碼文件的魔數是0xcafebase |
|
version |
int |
版本號 | 預留字段,默認為1 |
serialNumber |
java.lang.String |
請求流水號 | 十分重要,每個請求的唯一標識 |
messageType |
MessageType |
消息類型 | 自定義的枚舉類型,見下面的MessageType 類 |
attachments |
Map<String, String> |
附件 | K-V 形式,類似於HTTP 協議中的Header |
// 消息枚舉類型
@RequiredArgsConstructor
public enum MessageType {
/**
* 請求
*/
REQUEST((byte) 1),
/**
* 響應
*/
RESPONSE((byte) 2),
/**
* PING
*/
PING((byte) 3),
/**
* PONG
*/
PONG((byte) 4),
/**
* NULL
*/
NULL((byte) 5),
;
@Getter
private final Byte type;
public static MessageType fromValue(byte value) {
for (MessageType type : MessageType.values()) {
if (type.getType() == value) {
return type;
}
}
throw new IllegalArgumentException(String.format("value = %s", value));
}
}
// 基礎Packet
@Data
public abstract class BaseMessagePacket implements Serializable {
/**
* 魔數
*/
private int magicNumber;
/**
* 版本號
*/
private int version;
/**
* 流水號
*/
private String serialNumber;
/**
* 消息類型
*/
private MessageType messageType;
/**
* 附件 - K-V形式
*/
private Map<String, String> attachments = new HashMap<>();
/**
* 添加附件
*/
public void addAttachment(String key, String value) {
attachments.put(key, value);
}
}
請求Packet擴展字段:
字段名 | 字段類型 | 字段功能 | 備注 |
---|---|---|---|
interfaceName |
java.lang.String |
接口全類名 | |
methodName |
java.lang.String |
方法名 | |
methodArgumentSignatures |
java.lang.String[] |
方法參數簽名字符串數組 | 存放方法參數類型全類名字符串數組 |
methodArguments |
java.lang.Object[] |
方法參數數組 | 因為未知方法參數類型,所以用Object 表示 |
@EqualsAndHashCode(callSuper = true)
@Data
public class RequestMessagePacket extends BaseMessagePacket {
/**
* 接口全類名
*/
private String interfaceName;
/**
* 方法名
*/
private String methodName;
/**
* 方法參數簽名
*/
private String[] methodArgumentSignatures;
/**
* 方法參數
*/
private Object[] methodArguments;
}
響應Packet擴展字段:
字段名 | 字段類型 | 字段功能 | 備注 |
---|---|---|---|
errorCode |
java.lang.Long |
響應碼 | |
message |
java.lang.String |
響應消息 | 如果出現異常,message 就是對應的異常信息 |
payload |
java.lang.Object |
消息載荷 | 業務處理返回的消息載荷,定義為Object 類型 |
@EqualsAndHashCode(callSuper = true)
@Data
public class ResponseMessagePacket extends BaseMessagePacket {
/**
* error code
*/
private Long errorCode;
/**
* 消息描述
*/
private String message;
/**
* 消息載荷
*/
private Object payload;
}
需要注意以下幾點:
- 非基本類型在序列化和反序列化的時候,一定注意要先寫入或者先讀取序列的長度,以
java.lang.String
類型為例:
// 序列化 - 流水號
out.writeInt(packet.getSerialNumber().length());
out.writeCharSequence(packet.getSerialNumber(), ProtocolConstant.UTF_8);
// 反序列化 - 流水號
int serialNumberLength = in.readInt();
packet.setSerialNumber(in.readCharSequence(serialNumberLength, ProtocolConstant.UTF_8).toString());
- 特殊編碼的字符串在序列化的時候,要注意字符串編碼的長度,例如
UTF-8
編碼下一個中文字符占3個字節,這一點可以抽取一個工具類專門處理字符串的序列化:
public enum ByteBufferUtils {
// 單例
X;
public void encodeUtf8CharSequence(ByteBuf byteBuf, CharSequence charSequence) {
int writerIndex = byteBuf.writerIndex();
byteBuf.writeInt(0);
int length = ByteBufUtil.writeUtf8(byteBuf, charSequence);
byteBuf.setInt(writerIndex, length);
}
}
- 方法參數數組的序列化和反序列化方案需要定制,筆者為了簡化自定義協議,定義了方法參數簽名數組,長度和方法參數數組一致,這樣做方便后面編寫服務端代碼的時候,簡化對方法參數數組進行反序列化以及宿主類目標方法的查找。注意一下
Object[]
的序列化和反序列化相對特殊,因為ByteBuf
無法處理自定義類型的寫入和讀取(這個很好理解,網絡編程就是面向0
和1
的編程):
write Object --> ByteBuf#writeInt() && ByteBuf#writeBytes()
read Object --> ByteBuf#readInt() && ByteBuf#readBytes() [<== 這個方法返回值是ByteBuf實例]
- 最后注意釋放
ByteBuf
的引用,否則有可能導致內存泄漏。
自定義協議編碼解碼實現
自定義協議編碼解碼主要包括四個部分的編碼解碼器:
- 請求
Packet
編碼器:RequestMessagePacketEncoder
,主要用於客戶端把RequestMessagePacket
實例序列化為二進制序列。 - 請求
Packet
解碼器:RequestMessagePacketDecoder
,主要用於服務端把二進制序列反序列化為RequestMessagePacket
實例。 - 響應
Packet
編碼器:ResponseMessagePacketEncoder
,主要用於服務端把ResponseMessagePacket
實例序列化為二進制序列。 - 響應
Packet
解碼器:ResponseMessagePacketDecoder
,主要用於客戶端把二進制序列反序列化為ResponseMessagePacket
實例。
畫個圖描述一下幾個組件的交互流程(省略了部分入站和出站處理器):
序列化器Serializer
的代碼如下:
public interface Serializer {
byte[] encode(Object target);
Object decode(byte[] bytes, Class<?> targetClass);
}
// FastJson實現
public enum FastJsonSerializer implements Serializer {
// 單例
X;
@Override
public byte[] encode(Object target) {
return JSON.toJSONBytes(target);
}
@Override
public Object decode(byte[] bytes, Class<?> targetClass) {
return JSON.parseObject(bytes, targetClass);
}
}
請求Packet
編碼器RequestMessagePacketEncoder
的代碼如下:
@RequiredArgsConstructor
public class RequestMessagePacketEncoder extends MessageToByteEncoder<RequestMessagePacket> {
private final Serializer serializer;
@Override
protected void encode(ChannelHandlerContext context, RequestMessagePacket packet, ByteBuf out) throws Exception {
// 魔數
out.writeInt(packet.getMagicNumber());
// 版本
out.writeInt(packet.getVersion());
// 流水號
out.writeInt(packet.getSerialNumber().length());
out.writeCharSequence(packet.getSerialNumber(), ProtocolConstant.UTF_8);
// 消息類型
out.writeByte(packet.getMessageType().getType());
// 附件size
Map<String, String> attachments = packet.getAttachments();
out.writeInt(attachments.size());
// 附件內容
attachments.forEach((k, v) -> {
out.writeInt(k.length());
out.writeCharSequence(k, ProtocolConstant.UTF_8);
out.writeInt(v.length());
out.writeCharSequence(v, ProtocolConstant.UTF_8);
});
// 接口全類名
out.writeInt(packet.getInterfaceName().length());
out.writeCharSequence(packet.getInterfaceName(), ProtocolConstant.UTF_8);
// 方法名
out.writeInt(packet.getMethodName().length());
out.writeCharSequence(packet.getMethodName(), ProtocolConstant.UTF_8);
// 方法參數簽名(String[]類型) - 非必須
if (null != packet.getMethodArgumentSignatures()) {
int len = packet.getMethodArgumentSignatures().length;
// 方法參數簽名數組長度
out.writeInt(len);
for (int i = 0; i < len; i++) {
String methodArgumentSignature = packet.getMethodArgumentSignatures()[i];
out.writeInt(methodArgumentSignature.length());
out.writeCharSequence(methodArgumentSignature, ProtocolConstant.UTF_8);
}
} else {
out.writeInt(0);
}
// 方法參數(Object[]類型) - 非必須
if (null != packet.getMethodArguments()) {
int len = packet.getMethodArguments().length;
// 方法參數數組長度
out.writeInt(len);
for (int i = 0; i < len; i++) {
byte[] bytes = serializer.encode(packet.getMethodArguments()[i]);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
} else {
out.writeInt(0);
}
}
}
請求Packet
解碼器RequestMessagePacketDecoder
的代碼如下:
@RequiredArgsConstructor
public class RequestMessagePacketDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext context, ByteBuf in, List<Object> list) throws Exception {
RequestMessagePacket packet = new RequestMessagePacket();
// 魔數
packet.setMagicNumber(in.readInt());
// 版本
packet.setVersion(in.readInt());
// 流水號
int serialNumberLength = in.readInt();
packet.setSerialNumber(in.readCharSequence(serialNumberLength, ProtocolConstant.UTF_8).toString());
// 消息類型
byte messageTypeByte = in.readByte();
packet.setMessageType(MessageType.fromValue(messageTypeByte));
// 附件
Map<String, String> attachments = Maps.newHashMap();
packet.setAttachments(attachments);
int attachmentSize = in.readInt();
if (attachmentSize > 0) {
for (int i = 0; i < attachmentSize; i++) {
int keyLength = in.readInt();
String key = in.readCharSequence(keyLength, ProtocolConstant.UTF_8).toString();
int valueLength = in.readInt();
String value = in.readCharSequence(valueLength, ProtocolConstant.UTF_8).toString();
attachments.put(key, value);
}
}
// 接口全類名
int interfaceNameLength = in.readInt();
packet.setInterfaceName(in.readCharSequence(interfaceNameLength, ProtocolConstant.UTF_8).toString());
// 方法名
int methodNameLength = in.readInt();
packet.setMethodName(in.readCharSequence(methodNameLength, ProtocolConstant.UTF_8).toString());
// 方法參數簽名
int methodArgumentSignatureArrayLength = in.readInt();
if (methodArgumentSignatureArrayLength > 0) {
String[] methodArgumentSignatures = new String[methodArgumentSignatureArrayLength];
for (int i = 0; i < methodArgumentSignatureArrayLength; i++) {
int methodArgumentSignatureLength = in.readInt();
methodArgumentSignatures[i] = in.readCharSequence(methodArgumentSignatureLength, ProtocolConstant.UTF_8).toString();
}
packet.setMethodArgumentSignatures(methodArgumentSignatures);
}
// 方法參數
int methodArgumentArrayLength = in.readInt();
if (methodArgumentArrayLength > 0) {
// 這里的Object[]實際上是ByteBuf[] - 后面需要二次加工為對應類型的實例
Object[] methodArguments = new Object[methodArgumentArrayLength];
for (int i = 0; i < methodArgumentArrayLength; i++) {
int byteLength = in.readInt();
methodArguments[i] = in.readBytes(byteLength);
}
packet.setMethodArguments(methodArguments);
}
list.add(packet);
}
}
響應Packet
編碼器ResponseMessagePacketEncoder
的代碼如下:
@RequiredArgsConstructor
public class ResponseMessagePacketEncoder extends MessageToByteEncoder<ResponseMessagePacket> {
private final Serializer serializer;
@Override
protected void encode(ChannelHandlerContext ctx, ResponseMessagePacket packet, ByteBuf out) throws Exception {
// 魔數
out.writeInt(packet.getMagicNumber());
// 版本
out.writeInt(packet.getVersion());
// 流水號
out.writeInt(packet.getSerialNumber().length());
out.writeCharSequence(packet.getSerialNumber(), ProtocolConstant.UTF_8);
// 消息類型
out.writeByte(packet.getMessageType().getType());
// 附件size
Map<String, String> attachments = packet.getAttachments();
out.writeInt(attachments.size());
// 附件內容
attachments.forEach((k, v) -> {
out.writeInt(k.length());
out.writeCharSequence(k, ProtocolConstant.UTF_8);
out.writeInt(v.length());
out.writeCharSequence(v, ProtocolConstant.UTF_8);
});
// error code
out.writeLong(packet.getErrorCode());
// message
String message = packet.getMessage();
ByteBufferUtils.X.encodeUtf8CharSequence(out, message);
// payload
byte[] bytes = serializer.encode(packet.getPayload());
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}
響應Packet
解碼器ResponseMessagePacketDecoder
的代碼如下:
public class ResponseMessagePacketDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
ResponseMessagePacket packet = new ResponseMessagePacket();
// 魔數
packet.setMagicNumber(in.readInt());
// 版本
packet.setVersion(in.readInt());
// 流水號
int serialNumberLength = in.readInt();
packet.setSerialNumber(in.readCharSequence(serialNumberLength, ProtocolConstant.UTF_8).toString());
// 消息類型
byte messageTypeByte = in.readByte();
packet.setMessageType(MessageType.fromValue(messageTypeByte));
// 附件
Map<String, String> attachments = Maps.newHashMap();
packet.setAttachments(attachments);
int attachmentSize = in.readInt();
if (attachmentSize > 0) {
for (int i = 0; i < attachmentSize; i++) {
int keyLength = in.readInt();
String key = in.readCharSequence(keyLength, ProtocolConstant.UTF_8).toString();
int valueLength = in.readInt();
String value = in.readCharSequence(valueLength, ProtocolConstant.UTF_8).toString();
attachments.put(key, value);
}
}
// error code
packet.setErrorCode(in.readLong());
// message
int messageLength = in.readInt();
packet.setMessage(in.readCharSequence(messageLength, ProtocolConstant.UTF_8).toString());
// payload - ByteBuf實例
int payloadLength = in.readInt();
packet.setPayload(in.readBytes(payloadLength));
out.add(packet);
}
}
核心的編碼解碼器已經編寫完,接着要注意一下TCP
協議二進制包發送的時候只保證了包的發送順序、確認發送以及重傳,無法保證二進制包是否完整(有些博客也稱此類場景為粘包、半包等等,其實網絡協議里面並沒有定義這些術語,估計是有人杜撰出來),因此這里采取了定長幀編碼和解碼器LengthFieldPrepender
和LengthFieldBasedFrameDecoder
,簡單來說就是在消息幀的開頭幾位定義了整個幀的長度,讀取到整個長度的消息幀才認為是一個完整的二進制報文。舉個幾個例子:
|<--------packet frame--------->|
| Length Field | Actual Content |
序號 | Length Field | Actual Content |
---|---|---|
0 | 4 | abcd |
1 | 9 | throwable |
2 | 14 | {"name":"doge"} |
編寫測試客戶端和服務端
客戶端代碼如下:
@Slf4j
public class TestProtocolClient {
public static void main(String[] args) throws Exception {
int port = 9092;
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
bootstrap.option(ChannelOption.TCP_NODELAY, Boolean.TRUE);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X));
ch.pipeline().addLast(new ResponseMessagePacketDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<ResponseMessagePacket>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception {
Object targetPayload = packet.getPayload();
if (targetPayload instanceof ByteBuf) {
ByteBuf byteBuf = (ByteBuf) targetPayload;
int readableByteLength = byteBuf.readableBytes();
byte[] bytes = new byte[readableByteLength];
byteBuf.readBytes(bytes);
targetPayload = FastJsonSerializer.X.decode(bytes, String.class);
byteBuf.release();
}
packet.setPayload(targetPayload);
log.info("接收到來自服務端的響應消息,消息內容:{}", JSON.toJSONString(packet));
}
});
}
});
ChannelFuture future = bootstrap.connect("localhost", port).sync();
log.info("啟動NettyClient[{}]成功...", port);
Channel channel = future.channel();
RequestMessagePacket packet = new RequestMessagePacket();
packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER);
packet.setVersion(ProtocolConstant.VERSION);
packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber());
packet.setMessageType(MessageType.REQUEST);
packet.setInterfaceName("club.throwable.contract.HelloService");
packet.setMethodName("sayHello");
packet.setMethodArgumentSignatures(new String[]{"java.lang.String"});
packet.setMethodArguments(new Object[]{"doge"});
channel.writeAndFlush(packet);
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
服務端代碼如下:
@Slf4j
public class TestProtocolServer {
public static void main(String[] args) throws Exception {
int port = 9092;
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new RequestMessagePacketDecoder());
ch.pipeline().addLast(new ResponseMessagePacketEncoder(FastJsonSerializer.X));
ch.pipeline().addLast(new SimpleChannelInboundHandler<RequestMessagePacket>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestMessagePacket packet) throws Exception {
log.info("接收到來自客戶端的請求消息,消息內容:{}", JSON.toJSONString(packet));
ResponseMessagePacket response = new ResponseMessagePacket();
response.setMagicNumber(packet.getMagicNumber());
response.setVersion(packet.getVersion());
response.setSerialNumber(packet.getSerialNumber());
response.setAttachments(packet.getAttachments());
response.setMessageType(MessageType.RESPONSE);
response.setErrorCode(200L);
response.setMessage("Success");
response.setPayload("{\"name\":\"throwable\"}");
ctx.writeAndFlush(response);
}
});
}
});
ChannelFuture future = bootstrap.bind(port).sync();
log.info("啟動NettyServer[{}]成功...", port);
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
這里在測試的環境中,最大的消息幀長度暫時定義為1024。先啟動服務端,再啟動客戶端,見控制台輸出如下:
// 服務端
22:29:32.596 [main] INFO club.throwable.protocol.TestProtocolServer - 啟動NettyServer[9092]成功...
...省略其他日志...
22:29:53.538 [nioEventLoopGroup-3-1] INFO club.throwable.protocol.TestProtocolServer - 接收到來自客戶端的請求消息,消息內容:{"attachments":{},"interfaceName":"club.throwable.contract.HelloService","magicNumber":10086,"messageType":"REQUEST","methodArgumentSignatures":["java.lang.String"],"methodArguments":[{"contiguous":true,"direct":true,"readOnly":false,"readable":true,"writable":false}],"methodName":"sayHello","serialNumber":"7f992c7cf9f445258601def1cac9bec0","version":1}
// 客戶端
22:31:28.360 [main] INFO club.throwable.protocol.TestProtocolClient - 啟動NettyClient[9092]成功...
...省略其他日志...
22:31:39.320 [nioEventLoopGroup-2-1] INFO club.throwable.protocol.TestProtocolClient - 接收到來自服務端的響應消息,消息內容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"{\"name\":\"throwable\"}","serialNumber":"320808e709b34edbb91ba557780b58ad","version":1}
小結
一個基於Netty
實現的簡單的自定義協議基本完成,但是要編寫一個優秀的RPC
框架,還需要做服務端的宿主類和目標方法查詢、調用,客戶端的動態代理,Netty
的NIO
模式下的同步調用改造,心跳處理,異常處理等等。后面會使用多篇文章逐個問題解決,網絡編程其實挺好玩了,就是編碼量會比較大(゜-゜)つロ
。
Demo
項目:
(e-a-20200112 c-1-d)
技術公眾號(《Throwable文摘》),不定期推送筆者原創技術文章(絕不抄襲或者轉載):
娛樂公眾號(《天天沙雕》),甄選奇趣沙雕圖文和視頻不定期推送,緩解生活工作壓力: