閱讀這篇文章之前,建議先閱讀和這篇文章關聯的內容。
[1]詳細剖析分布式微服務架構下網絡通信的底層實現原理(圖解)
[2][年薪60W的技巧]工作了5年,你真的理解Netty以及為什么要用嗎?(深度干貨)
[4]BAT面試必問細節:關於Netty中的ByteBuf詳解
[5]通過大量實戰案例分解Netty中是如何解決拆包黏包問題的?
[6]基於Netty實現自定義消息通信協議(協議設計及解析應用實戰)
在前面的內容中,我們已經由淺入深的理解了Netty的基礎知識和實現原理,相信大家已經對Netty有了一個較為全面的理解。那么接下來,我們通過一個手寫RPC通信的實戰案例來帶大家了解Netty的實際應用。
為什么要選擇RPC來作為實戰呢?因為Netty本身就是解決通信問題,而在實際應用中,RPC協議框架是我們接觸得最多的一種,所以這個實戰能讓大家了解到Netty實際應用之外,還能理解RPC的底層原理。
什么是RPC
RPC全稱為(Remote Procedure Call),是一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的協議,簡單理解就是讓開發者能夠像調用本地服務一樣調用遠程服務。
既然是協議,那么它必然有協議的規范,如圖6-1所示。
為了達到“讓開發者能夠像調用本地服務那樣調用遠程服務”的目的,RPC協議需像圖6-1那樣實現遠程交互。
- 客戶端調用遠程服務時,必須要通過本地動態代理模塊來屏蔽網絡通信的細節,所以動態代理模塊需要負責將請求參數、方法等數據組裝成數據包發送到目標服務器
- 這個數據包在發送時,還需要遵循約定的消息協議以及序列化協議,最終轉化為二進制數據流傳輸
- 服務端收到數據包后,先按照約定的消息協議解碼,得到請求信息。
- 服務端再根據請求信息路由調用到目標服務,獲得結果並返回給客戶端。
業內主流的RPC框架
凡是滿足RPC協議的框架,我們成為RPC框架,在實際開發中,我們可以使用開源且相對成熟的RPC框架解決微服務架構下的遠程通信問題,常見的rpc框架:
- Thrift:thrift是一個軟件框架,用來進行可擴展且跨語言的服務的開發。它結合了功能強大的軟件堆棧和代碼生成引擎,以構建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 這些編程語言間無縫結合的、高效的服務。
- Dubbo:Dubbo是一個分布式服務框架,以及SOA治理方案。其功能主要包括:高性能NIO通訊及多協議集成,服務動態尋址與路由,軟負載均衡與容錯,依賴分析與降級等。 Dubbo是阿里巴巴內部的SOA服務化治理方案的核心框架,Dubbo自2011年開源后,已被許多非阿里系公司使用。
手寫RPC注意要點
基於上文中對於RPC協議的理解,如果我們自己去實現,需要考慮哪些技術呢? 其實基於圖6-1的整個流程應該有一個大概的理解。
- 通信協議,RPC框架對性能的要求非常高,所以通信協議應該是越簡單越好,這樣可以減少編解碼帶來的性能損耗,大部分主流的RPC框架會直接選擇TCP、HTTP協議。
- 序列化和反序列化,數據要進行網絡傳輸,需要對數據進行序列化和反序列化,前面我們說過,所謂的序列化和反序列化是不把對象轉化成二進制流以及將二進制流轉化成對象的過程。在序列化框架選擇上,我們一般會選擇高效且通用的算法,比如FastJson、Protobuf、Hessian等。這些序列化技術都要比原生的序列化操作更加高效,壓縮比也較高。
- 動態代理, 客戶端調用遠程服務時,需要通過動態代理來屏蔽網絡通信細節。而動態代理又是在運行過程中生成的,所以動態代理類的生成速度、字節碼大小都會影響到RPC整體框架的性能和資源消耗。常見的動態代理技術: Javassist、Cglib、JDK的動態代理等。
基於Netty手寫實現RPC
理解了RPC協議后,我們基於Netty來實現一個RPC通信框架。
代碼詳見附件 netty-rpc-example
需要引入的jar包:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.72</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
模塊依賴關系:
-
provider依賴 netty-rpc-protocol和netty-rpc-api
-
cosumer依賴 netty-rpc-protocol和netty-rpc-api
netty-rpc-api模塊
IUserService
public interface IUserService {
String saveUser(String name);
}
netty-rpc-provider模塊
UserServiceImpl
@Service
@Slf4j
public class UserServiceImpl implements IUserService {
@Override
public String saveUser(String name) {
log.info("begin saveUser:"+name);
return "Save User Success!";
}
}
NettyRpcProviderMain
注意,在當前步驟中,描述了case的部分,暫時先不用加,后續再加上
@ComponentScan(basePackages = {"com.example.spring","com.example.service"}) //case1(后續再加上)
@SpringBootApplication
public class NettyRpcProviderMain {
public static void main(String[] args) throws Exception {
SpringApplication.run(NettyRpcProviderMain.class, args);
new NettyServer("127.0.0.1",8080).startNettyServer(); //case2(后續再加上)
}
}
netty-rpc-protocol
開始寫通信協議模塊,這個模塊主要做幾個事情
- 定義消息協議
- 定義序列化反序列化方法
- 建立netty通信
定義消息協議
之前我們講過自定義消息協議,我們在這里可以按照下面這個協議格式來定義好。
/*
+----------------------------------------------+
| 魔數 2byte | 序列化算法 1byte | 請求類型 1byte |
+----------------------------------------------+
| 消息 ID 8byte | 數據長度 4byte |
+----------------------------------------------+
*/
Header
@AllArgsConstructor
@Data
public class Header implements Serializable {
/*
+----------------------------------------------+
| 魔數 2byte | 序列化算法 1byte | 請求類型 1byte |
+----------------------------------------------+
| 消息 ID 8byte | 數據長度 4byte |
+----------------------------------------------+
*/
private short magic; //魔數-用來驗證報文的身份(2個字節)
private byte serialType; //序列化類型(1個字節)
private byte reqType; //操作類型(1個字節)
private long requestId; //請求id(8個字節)
private int length; //數據長度(4個字節)
}
RpcRequest
@Data
public class RpcRequest implements Serializable {
private String className;
private String methodName;
private Object[] params;
private Class<?>[] parameterTypes;
}
RpcResponse
@Data
public class RpcResponse implements Serializable {
private Object data;
private String msg;
}
RpcProtocol
@Data
public class RpcProtocol<T> implements Serializable {
private Header header;
private T content;
}
定義相關常量
上述消息協議定義中,涉及到幾個枚舉相關的類,定義如下
ReqType
消息類型
public enum ReqType {
REQUEST((byte)1),
RESPONSE((byte)2),
HEARTBEAT((byte)3);
private byte code;
private ReqType(byte code) {
this.code=code;
}
public byte code(){
return this.code;
}
public static ReqType findByCode(int code) {
for (ReqType msgType : ReqType.values()) {
if (msgType.code() == code) {
return msgType;
}
}
return null;
}
}
SerialType
序列化類型
public enum SerialType {
JSON_SERIAL((byte)0),
JAVA_SERIAL((byte)1);
private byte code;
SerialType(byte code) {
this.code=code;
}
public byte code(){
return this.code;
}
}
RpcConstant
public class RpcConstant {
//header部分的總字節數
public final static int HEAD_TOTAL_LEN=16;
//魔數
public final static short MAGIC=0xca;
}
定義序列化相關實現
這里演示兩種,一種是JSON方式,另一種是Java原生的方式
ISerializer
public interface ISerializer {
<T> byte[] serialize(T obj);
<T> T deserialize(byte[] data,Class<T> clazz);
byte getType();
}
JavaSerializer
public class JavaSerializer implements ISerializer{
@Override
public <T> byte[] serialize(T obj) {
ByteArrayOutputStream byteArrayOutputStream=
new ByteArrayOutputStream();
try {
ObjectOutputStream outputStream=
new ObjectOutputStream(byteArrayOutputStream);
outputStream.writeObject(obj);
return byteArrayOutputStream.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return new byte[0];
}
@Override
public <T> T deserialize(byte[] data, Class<T> clazz) {
ByteArrayInputStream byteArrayInputStream=new ByteArrayInputStream(data);
try {
ObjectInputStream objectInputStream=
new ObjectInputStream(byteArrayInputStream);
return (T) objectInputStream.readObject();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return null;
}
@Override
public byte getType() {
return SerialType.JAVA_SERIAL.code();
}
}
JsonSerializer
public class JsonSerializer implements ISerializer{
@Override
public <T> byte[] serialize(T obj) {
return JSON.toJSONString(obj).getBytes();
}
@Override
public <T> T deserialize(byte[] data, Class<T> clazz) {
return JSON.parseObject(new String(data),clazz);
}
@Override
public byte getType() {
return SerialType.JSON_SERIAL.code();
}
}
SerializerManager
實現對序列化機制的管理
public class SerializerManager {
private final static ConcurrentHashMap<Byte, ISerializer> serializers=new ConcurrentHashMap<Byte, ISerializer>();
static {
ISerializer jsonSerializer=new JsonSerializer();
ISerializer javaSerializer=new JavaSerializer();
serializers.put(jsonSerializer.getType(),jsonSerializer);
serializers.put(javaSerializer.getType(),javaSerializer);
}
public static ISerializer getSerializer(byte key){
ISerializer serializer=serializers.get(key);
if(serializer==null){
return new JavaSerializer();
}
return serializer;
}
}
定義編碼和解碼實現
由於自定義了消息協議,所以 需要自己實現編碼和解碼,代碼如下
RpcDecoder
@Slf4j
public class RpcDecoder extends ByteToMessageDecoder {
/*
+----------------------------------------------+
| 魔數 2byte | 序列化算法 1byte | 請求類型 1byte |
+----------------------------------------------+
| 消息 ID 8byte | 數據長度 4byte |
+----------------------------------------------+
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
log.info("==========begin RpcDecoder ==============");
if(in.readableBytes()< RpcConstant.HEAD_TOTAL_LEN){
//消息長度不夠,不需要解析
return;
}
in.markReaderIndex();//標記一個讀取數據的索引,后續用來重置。
short magic=in.readShort(); //讀取magic
if(magic!=RpcConstant.MAGIC){
throw new IllegalArgumentException("Illegal request parameter 'magic',"+magic);
}
byte serialType=in.readByte(); //讀取序列化算法類型
byte reqType=in.readByte(); //請求類型
long requestId=in.readLong(); //請求消息id
int dataLength=in.readInt(); //請求數據長度
//可讀區域的字節數小於實際數據長度
if(in.readableBytes()<dataLength){
in.resetReaderIndex();
return;
}
//讀取消息內容
byte[] content=new byte[dataLength];
in.readBytes(content);
//構建header頭信息
Header header=new Header(magic,serialType,reqType,requestId,dataLength);
ISerializer serializer=SerializerManager.getSerializer(serialType);
ReqType rt=ReqType.findByCode(reqType);
switch(rt){
case REQUEST:
RpcRequest request=serializer.deserialize(content, RpcRequest.class);
RpcProtocol<RpcRequest> reqProtocol=new RpcProtocol<>();
reqProtocol.setHeader(header);
reqProtocol.setContent(request);
out.add(reqProtocol);
break;
case RESPONSE:
RpcResponse response=serializer.deserialize(content,RpcResponse.class);
RpcProtocol<RpcResponse> resProtocol=new RpcProtocol<>();
resProtocol.setHeader(header);
resProtocol.setContent(response);
out.add(resProtocol);
break;
case HEARTBEAT:
break;
default:
break;
}
}
}
RpcEncoder
@Slf4j
public class RpcEncoder extends MessageToByteEncoder<RpcProtocol<Object>> {
/*
+----------------------------------------------+
| 魔數 2byte | 序列化算法 1byte | 請求類型 1byte |
+----------------------------------------------+
| 消息 ID 8byte | 數據長度 4byte |
+----------------------------------------------+
*/
@Override
protected void encode(ChannelHandlerContext ctx, RpcProtocol<Object> msg, ByteBuf out) throws Exception {
log.info("=============begin RpcEncoder============");
Header header=msg.getHeader();
out.writeShort(header.getMagic()); //寫入魔數
out.writeByte(header.getSerialType()); //寫入序列化類型
out.writeByte(header.getReqType());//寫入請求類型
out.writeLong(header.getRequestId()); //寫入請求id
ISerializer serializer= SerializerManager.getSerializer(header.getSerialType());
byte[] data=serializer.serialize(msg.getContent()); //序列化
header.setLength(data.length);
out.writeInt(data.length); //寫入消息長度
out.writeBytes(data);
}
}
NettyServer
實現NettyServer構建。
@Slf4j
public class NettyServer{
private String serverAddress; //地址
private int serverPort; //端口
public NettyServer(String serverAddress, int serverPort) {
this.serverAddress = serverAddress;
this.serverPort = serverPort;
}
public void startNettyServer() throws Exception {
log.info("begin start Netty Server");
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workGroup=new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new RpcServerInitializer());
ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, this.serverPort).sync();
log.info("Server started Success on Port:{}", this.serverPort);
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
log.error("Rpc Server Exception",e);
}finally {
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
RpcServerInitializer
public class RpcServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,12,4,0,0))
.addLast(new RpcDecoder())
.addLast(new RpcEncoder())
.addLast(new RpcServerHandler());
}
}
RpcServerHandler
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {
RpcProtocol resProtocol=new RpcProtocol<>();
Header header=msg.getHeader();
header.setReqType(ReqType.RESPONSE.code());
Object result=invoke(msg.getContent());
resProtocol.setHeader(header);
RpcResponse response=new RpcResponse();
response.setData(result);
response.setMsg("success");
resProtocol.setContent(response);
ctx.writeAndFlush(resProtocol);
}
private Object invoke(RpcRequest request){
try {
Class<?> clazz=Class.forName(request.getClassName());
Object bean= SpringBeansManager.getBean(clazz); //獲取實例對象(CASE)
Method declaredMethod=clazz.getDeclaredMethod(request.getMethodName(),request.getParameterTypes());
return declaredMethod.invoke(bean,request.getParams());
} catch (ClassNotFoundException | NoSuchMethodException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
return null;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
SpringBeansManager
@Component
public class SpringBeansManager implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringBeansManager.applicationContext=applicationContext;
}
public static <T> T getBean(Class<T> clazz){
return applicationContext.getBean(clazz);
}
}
需要注意,這個類的構建好之后,需要在netty-rpc-provider模塊的main方法中增加compone-scan進行掃描
@ComponentScan(basePackages = {"com.example.spring","com.example.service"}) //修改這里
@SpringBootApplication
public class NettyRpcProviderMain {
public static void main(String[] args) throws Exception {
SpringApplication.run(NettyRpcProviderMain.class, args);
new NettyServer("127.0.0.1",8080).startNettyServer(); // 修改這里
}
}
netty-rpc-consumer
接下來開始實現消費端
RpcClientProxy
public class RpcClientProxy {
public <T> T clientProxy(final Class<T> interfaceCls,final String host,final int port){
return (T) Proxy.newProxyInstance
(interfaceCls.getClassLoader(),
new Class<?>[]{interfaceCls},
new RpcInvokerProxy(host,port));
}
}
RpcInvokerProxy
@Slf4j
public class RpcInvokerProxy implements InvocationHandler {
private String serviceAddress;
private int servicePort;
public RpcInvokerProxy(String serviceAddress, int servicePort) {
this.serviceAddress = serviceAddress;
this.servicePort = servicePort;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
log.info("begin invoke target server");
//組裝參數
RpcProtocol<RpcRequest> protocol=new RpcProtocol<>();
long requestId= RequestHolder.REQUEST_ID.incrementAndGet();
Header header=new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(), ReqType.REQUEST.code(),requestId,0);
protocol.setHeader(header);
RpcRequest request=new RpcRequest();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParams(args);
protocol.setContent(request);
//發送請求
NettyClient nettyClient=new NettyClient(serviceAddress,servicePort);
//構建異步數據處理
RpcFuture<RpcResponse> future=new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()));
RequestHolder.REQUEST_MAP.put(requestId,future);
nettyClient.sendRequest(protocol);
return future.getPromise().get().getData();
}
}
定義客戶端連接
在netty-rpc-protocol這個模塊的protocol包路徑下,創建NettyClient
@Slf4j
public class NettyClient {
private final Bootstrap bootstrap;
private final EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
private String serviceAddress;
private int servicePort;
public NettyClient(String serviceAddress,int servicePort){
log.info("begin init NettyClient");
bootstrap=new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new RpcClientInitializer());
this.serviceAddress=serviceAddress;
this.servicePort=servicePort;
}
public void sendRequest(RpcProtocol<RpcRequest> protocol) throws InterruptedException {
ChannelFuture future=bootstrap.connect(this.serviceAddress,this.servicePort).sync();
future.addListener(listener->{
if(future.isSuccess()){
log.info("connect rpc server {} success.",this.serviceAddress);
}else{
log.error("connect rpc server {} failed .",this.serviceAddress);
future.cause().printStackTrace();
eventLoopGroup.shutdownGracefully();
}
});
log.info("begin transfer data");
future.channel().writeAndFlush(protocol);
}
}
RpcClientInitializer
@Slf4j
public class RpcClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.info("begin initChannel");
ch.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,12,4,0,0))
.addLast(new LoggingHandler())
.addLast(new RpcEncoder())
.addLast(new RpcDecoder())
.addLast(new RpcClientHandler());
}
}
RpcClientHandler
需要注意,Netty的通信過程是基於入站出站分離的,所以在獲取結果時,我們需要借助一個Future對象來完成。
@Slf4j
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcResponse>> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcResponse> msg) throws Exception {
log.info("receive rpc server result");
long requestId=msg.getHeader().getRequestId();
RpcFuture<RpcResponse> future=RequestHolder.REQUEST_MAP.remove(requestId);
future.getPromise().setSuccess(msg.getContent()); //返回結果
}
}
Future的實現
在netty-rpc-protocol模塊中添加rpcFuture實現
RpcFuture
@Data
public class RpcFuture<T> {
//Promise是可寫的 Future, Future自身並沒有寫操作相關的接口,
// Netty通過 Promise對 Future進行擴展,用於設置IO操作的結果
private Promise<T> promise;
public RpcFuture(Promise<T> promise) {
this.promise = promise;
}
}
RequestHolder
保存requestid和future的對應結果
public class RequestHolder {
public static final AtomicLong REQUEST_ID=new AtomicLong();
public static final Map<Long,RpcFuture> REQUEST_MAP=new ConcurrentHashMap<>();
}
需要源碼的同學,請關注公眾號[跟着Mic學架構],回復關鍵字[rpc],即可獲得
版權聲明:本博客所有文章除特別聲明外,均采用 CC BY-NC-SA 4.0 許可協議。轉載請注明來自
Mic帶你學架構
!
如果本篇文章對您有幫助,還請幫忙點個關注和贊,您的堅持是我不斷創作的動力。歡迎關注「跟着Mic學架構」公眾號公眾號獲取更多技術干貨!