前提
前置文章:
Github Page
:《基於Netty和SpringBoot實現一個輕量級RPC框架-協議篇》Coding Page
:《基於Netty和SpringBoot實現一個輕量級RPC框架-協議篇》
在前置的《基於Netty和SpringBoot實現一個輕量級RPC框架-協議篇》一文中已經定義了一個相對簡單的RPC
私有協議,並且實現了對應的編碼和解碼模塊。這篇文章基於協議篇,完成Server
端代碼調用的編寫。考慮到目前相對主流的IOC
容器是Spring
,這里選用了spring-boot-starter
(非MVC
容器,只是單純管理Bean
),依賴JDK1.8+
。
思路
首先RPC
私有協議定義了Client
端會傳過來四個和服務調用息息相關的字符:接口全類名interfaceName
、方法名methodName
、方法參數簽名字符串數組methodArgumentSignatures
(可選,這個參數不是必須傳入的)以及方法參數數組methodArguments
(可選,空方法列表的時候不需要傳入參數)。主要流程如下:
- 把
Server
端的所有服務端(實現)類交由IOC
容器托管。 Client
端發起RPC
請求。- 通過前面提到的最多四個參數,從
Server
服務實例的IOC
容器中匹配出吻合度最高的一個方法java.lang.reflect.Method
實例、該方法實例的宿主類以及宿主類對應的Bean
實例,如果這一步匹配的目標方法超過1個或者為0個,可以直接返回異常信息。 - 把前一步得到的
Method
實例、宿主類Bean
實例,結合方法參數數組methodArguments
進行反射調用,得到調用結果。 Server
端把響應結果封裝到payload
通過私有協議發送回Client
端。
Server端代碼實現
為了暫時方便起見,部分數組入參被重新封裝為ArrayList
,實際上編寫RPC
框架的時候應該優先考慮性能問題,像JDK
提供的集合類庫等等應該盡可能少用(以ArrayList
為例,擴容的時候存在底層Object[]
拷貝,造成性能損失和額外的內存消耗),極盡可能使用基本類型和數組。
先定義方法匹配器MethodMatcher
相關的類:
public interface MethodMatcher {
/**
* 查找一個匹配度最高的方法信息
*
* @param input input
* @return output
*/
MethodMatchOutput selectOneBestMatchMethod(MethodMatchInput input);
}
// 輸入值
@EqualsAndHashCode
@Data
public class MethodMatchInput {
private String interfaceName;
private String methodName;
private List<String> methodArgumentSignatures;
private int methodArgumentArraySize;
}
// 輸出值
@Data
public class MethodMatchOutput {
/**
* 目標方法實例
*/
private Method targetMethod;
/**
* 目標實現類 - 這個有可能是被Cglib增強過的類型,是宿主類的子類,如果沒有被Cglib增強過,那么它就是宿主類
*/
private Class<?> targetClass;
/**
* 宿主類
*/
private Class<?> targetUserClass;
/**
* 宿主類Bean實例
*/
private Object target;
/**
* 方法參數類型列表
*/
private List<Class<?>> parameterTypes;
}
目標方法匹配的邏輯大致如下:
- 方法名稱和方法實例的宿主類型一定作為匹配條件的一部分。
- 如果傳入了參數簽名列表,優先使用參數簽名列表類型進行匹配。
- 如果沒有傳入參數簽名列表,那么使用參數的數量進行匹配。
- 如果參數簽名列表和參數列表都沒有傳入,那么只能通過方法名稱和方法實例的宿主類型匹配。
- 考慮到方法匹配解析的過程相對耗時,需要把結果緩存起來。
分析至此,可以基於反射,編寫一個抽象的方法匹配器BaseMethodMatcher
,然后把獲取宿主類信息的功能委托到子類:
public class MethodMatchException extends RuntimeException {
public MethodMatchException(String message) {
super(message);
}
public MethodMatchException(String message, Throwable cause) {
super(message, cause);
}
public MethodMatchException(Throwable cause) {
super(cause);
}
}
@Data
public class HostClassMethodInfo {
private Class<?> hostClass;
private Class<?> hostUserClass;
private Object hostTarget;
}
@Slf4j
abstract class BaseMethodMatcher implements MethodMatcher {
private final ConcurrentMap<MethodMatchInput, MethodMatchOutput> cache = Maps.newConcurrentMap();
@Override
public MethodMatchOutput selectOneBestMatchMethod(MethodMatchInput input) {
return cache.computeIfAbsent(input, in -> {
try {
MethodMatchOutput output = new MethodMatchOutput();
Class<?> interfaceClass = Class.forName(in.getInterfaceName());
// 獲取宿主類信息
HostClassMethodInfo info = findHostClassMethodInfo(interfaceClass);
List<Method> targetMethods = Lists.newArrayList();
ReflectionUtils.doWithMethods(info.getHostUserClass(), targetMethods::add, method -> {
String methodName = method.getName();
Class<?> declaringClass = method.getDeclaringClass();
List<Class<?>> inputParameterTypes = Optional.ofNullable(in.getMethodArgumentSignatures())
.map(mas -> {
List<Class<?>> list = Lists.newArrayList();
mas.forEach(ma -> list.add(ClassUtils.resolveClassName(ma, null)));
return list;
}).orElse(Lists.newArrayList());
output.setParameterTypes(inputParameterTypes);
// 如果傳入了參數簽名列表,優先使用參數簽名列表類型進行匹配
if (!inputParameterTypes.isEmpty()) {
List<Class<?>> parameterTypes = Lists.newArrayList(method.getParameterTypes());
return Objects.equals(methodName, in.getMethodName()) &&
Objects.equals(info.getHostUserClass(), declaringClass) &&
Objects.equals(parameterTypes, inputParameterTypes);
}
// 如果沒有傳入參數簽名列表,那么使用參數的數量進行匹配
if (in.getMethodArgumentArraySize() > 0) {
List<Class<?>> parameterTypes = Lists.newArrayList(method.getParameterTypes());
return Objects.equals(methodName, in.getMethodName()) &&
Objects.equals(info.getHostUserClass(), declaringClass) &&
in.getMethodArgumentArraySize() == parameterTypes.size();
}
// 如果參數簽名列表和參數列表都沒有傳入,那么只能通過方法名稱和方法實例的宿主類型匹配
return Objects.equals(methodName, in.getMethodName()) &&
Objects.equals(info.getHostUserClass(), declaringClass);
});
if (targetMethods.size() != 1) {
throw new MethodMatchException(String.format("查找到目標方法數量不等於1,interface:%s,method:%s",
in.getInterfaceName(), in.getMethodName()));
}
Method targetMethod = targetMethods.get(0);
output.setTargetClass(info.getHostClass());
output.setTargetMethod(targetMethod);
output.setTargetUserClass(info.getHostUserClass());
output.setTarget(info.getHostTarget());
return output;
} catch (Exception e) {
log.error("查找匹配度最高的方法失敗,輸入參數:{}", JSON.toJSONString(in), e);
if (e instanceof MethodMatchException) {
throw (MethodMatchException) e;
} else {
throw new MethodMatchException(e);
}
}
});
}
/**
* 獲取宿主類的信息
*
* @param interfaceClass interfaceClass
* @return HostClassMethodInfo
*/
abstract HostClassMethodInfo findHostClassMethodInfo(Class<?> interfaceClass);
}
接着,通過接口類型獲取宿主類的功能就委托給Spring
實現,從IOC
容器中獲取,定義SpringMethodMatcher
:
@Component
public class SpringMethodMatcher extends BaseMethodMatcher implements BeanFactoryAware {
private DefaultListableBeanFactory beanFactory;
@Override
public void setBeanFactory(@NonNull BeanFactory beanFactory) throws BeansException {
this.beanFactory = (DefaultListableBeanFactory) beanFactory;
}
@Override
HostClassMethodInfo findHostClassMethodInfo(Class<?> interfaceClass) {
HostClassMethodInfo info = new HostClassMethodInfo();
// 從容器中通過接口類型獲取對應的實現,實現必須只有一個
Object bean = beanFactory.getBean(interfaceClass);
info.setHostTarget(bean);
info.setHostClass(bean.getClass());
info.setHostUserClass(ClassUtils.getUserClass(bean.getClass()));
return info;
}
}
至此,目標方法匹配的模塊已經編寫完畢,接下來需要處理方法參數列表的反序列化。編寫協議的時候,筆者把方法參數列表methodArguments
存放在Object
數組中,傳輸的時候序列化為byte
數組,經過協議解析之后,方法參數列表的實際類型為ByteBuf
數組(這是因為Netty
中的字節容器就是ByteBuf
),那么需要考慮把ByteBuf
數組轉換為目標方法的參數類型實例。主要步驟如下:
- 如果方法參數列表為空,那么什么都不用做,也就是調用了無參數的方法。
- 如果方法參數列表不為空同時方法參數類型列表不為空,優先選用方法參數類型列表進行轉換。
- 如果方法參數列表不為空同時方法參數類型列表為空,則使用
Method#getParameterTypes()
得到的方法參數列表類型進行轉換。
定義一個方法參數轉換器接口MethodArgumentConverter
:
public interface MethodArgumentConverter {
ArgumentConvertOutput convert(ArgumentConvertInput input);
}
@Data
public class ArgumentConvertInput {
/**
* 目標方法
*/
private Method method;
/**
* 方法參數類型列表
*/
private List<Class<?>> parameterTypes;
/**
* 方法參數列表
*/
private List<Object> arguments;
}
@Data
public class ArgumentConvertOutput {
private Object[] arguments;
}
方法參數轉換器的默認實現如下:
@Slf4j
@Component
public class DefaultMethodArgumentConverter implements MethodArgumentConverter {
private final Serializer serializer = FastJsonSerializer.X;
@Override
public ArgumentConvertOutput convert(ArgumentConvertInput input) {
ArgumentConvertOutput output = new ArgumentConvertOutput();
try {
if (null == input.getArguments() || input.getArguments().isEmpty()) {
output.setArguments(new Object[0]);
return output;
}
List<Class<?>> inputParameterTypes = input.getParameterTypes();
int size = inputParameterTypes.size();
if (size > 0) {
Object[] arguments = new Object[size];
for (int i = 0; i < size; i++) {
ByteBuf byteBuf = (ByteBuf) input.getArguments().get(i);
int readableBytes = byteBuf.readableBytes();
byte[] bytes = new byte[readableBytes];
byteBuf.readBytes(bytes);
arguments[i] = serializer.decode(bytes, inputParameterTypes.get(i));
byteBuf.release();
}
output.setArguments(arguments);
return output;
}
Class<?>[] parameterTypes = input.getMethod().getParameterTypes();
int len = parameterTypes.length;
Object[] arguments = new Object[len];
for (int i = 0; i < len; i++) {
ByteBuf byteBuf = (ByteBuf) input.getArguments().get(i);
int readableBytes = byteBuf.readableBytes();
byte[] bytes = new byte[readableBytes];
byteBuf.readBytes(bytes);
arguments[i] = serializer.decode(bytes, parameterTypes[i]);
byteBuf.release();
}
output.setArguments(arguments);
return output;
} catch (Exception e) {
throw new ArgumentConvertException(e);
}
}
}
所有前置工作都完成了,現在編寫一個Server
端的入站處理器ServerHandler
,暫時不做代碼邏輯優化,只做實現,把反射調用的模塊直接在此類中編寫:
@Component
@Slf4j
public class ServerHandler extends SimpleChannelInboundHandler<RequestMessagePacket> {
@Autowired
private MethodMatcher methodMatcher;
@Autowired
private MethodArgumentConverter methodArgumentConverter;
@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestMessagePacket packet) throws Exception {
log.info("服務端接收到:{}", packet);
MethodMatchInput input = new MethodMatchInput();
input.setInterfaceName(packet.getInterfaceName());
input.setMethodArgumentSignatures(Optional.ofNullable(packet.getMethodArgumentSignatures())
.map(Lists::newArrayList).orElse(Lists.newArrayList()));
input.setMethodName(packet.getMethodName());
Object[] methodArguments = packet.getMethodArguments();
input.setMethodArgumentArraySize(null != methodArguments ? methodArguments.length : 0);
MethodMatchOutput output = methodMatcher.selectOneBestMatchMethod(input);
log.info("查找目標實現方法成功,目標類:{},宿主類:{},宿主方法:{}",
output.getTargetClass().getCanonicalName(),
output.getTargetUserClass().getCanonicalName(),
output.getTargetMethod().getName()
);
Method targetMethod = output.getTargetMethod();
ArgumentConvertInput convertInput = new ArgumentConvertInput();
convertInput.setArguments(input.getMethodArgumentArraySize() > 0 ? Lists.newArrayList(methodArguments) : Lists.newArrayList());
convertInput.setMethod(output.getTargetMethod());
convertInput.setParameterTypes(output.getParameterTypes());
ArgumentConvertOutput convertOutput = methodArgumentConverter.convert(convertInput);
ReflectionUtils.makeAccessible(targetMethod);
// 反射調用
Object result = targetMethod.invoke(output.getTarget(), convertOutput.getArguments());
ResponseMessagePacket response = new ResponseMessagePacket();
response.setMagicNumber(packet.getMagicNumber());
response.setVersion(packet.getVersion());
response.setSerialNumber(packet.getSerialNumber());
response.setAttachments(packet.getAttachments());
response.setMessageType(MessageType.RESPONSE);
response.setErrorCode(200L);
response.setMessage("Success");
response.setPayload(JSON.toJSONString(result));
log.info("服務端輸出:{}", JSON.toJSONString(response));
ctx.writeAndFlush(response);
}
}
編寫一個Server
的啟動類ServerApplication
,在Spring
容器啟動之后,啟動Netty
服務:
@SpringBootApplication(scanBasePackages = "club.throwable.server")
@Slf4j
public class ServerApplication implements CommandLineRunner {
@Value("${netty.port:9092}")
private Integer nettyPort;
@Autowired
private ServerHandler serverHandler;
public static void main(String[] args) throws Exception {
SpringApplication.run(ServerApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
int port = nettyPort;
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new RequestMessagePacketDecoder());
ch.pipeline().addLast(new ResponseMessagePacketEncoder(FastJsonSerializer.X));
ch.pipeline().addLast(serverHandler);
}
});
ChannelFuture future = bootstrap.bind(port).sync();
log.info("啟動NettyServer[{}]成功...", port);
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
最后,編寫契約包和契約實現:
- ch0-custom-rpc-protocol 項目根目錄
- club.throwable
- utils 工具類
- protocol 協議
- exception 異常
- contract 契約
- HelloService 契約接口
- server 服務端
- contract
- DefaultHelloService 契約接口實現
public interface HelloService {
String sayHello(String name);
}
// 實現
@Service
public class DefaultHelloService implements HelloService {
@Override
public String sayHello(String name) {
return String.format("%s say hello!", name);
}
}
先啟動服務端ServerApplication
,再啟動上一節提到的TestProtocolClient
,輸出結果:
// 服務端日志
2020-01-15 00:05:57.898 INFO 14420 --- [ main] club.throwable.server.ServerApplication : 啟動NettyServer[9092]成功...
2020-01-15 00:06:05.980 INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler : 服務端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 6/139)])
2020-01-15 00:06:07.448 INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler : 查找目標實現方法成功,目標類:club.throwable.server.contract.DefaultHelloService,宿主類:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello
2020-01-15 00:06:07.521 INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler : 服務端輸出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"doge say hello!\"","serialNumber":"65f01b8e89bb479b8a36a60bd6519617","version":1}
// 客戶端日志
00:06:05.891 [main] INFO club.throwable.protocol.TestProtocolClient - 啟動NettyClient[9092]成功...
...省略...
00:06:13.197 [nioEventLoopGroup-2-1] INFO club.throwable.protocol.TestProtocolClient - 接收到來自服務端的響應消息,消息內容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"doge say hello!\"","serialNumber":"65f01b8e89bb479b8a36a60bd6519617","version":1}
可見RPC
調用成功。
小結
編寫RPC
的Server
端技巧在於處理目標方法和宿主類的查找,在轉換方法參數的時候,需要考慮簡化處理和提高效率,剩下的就是做好異常處理和模塊封裝。限於篇幅,后面會先分析Client
端的處理,再分析心跳處理、服務端優化、甚至是對接注冊中心等等,在Netty
、SpringBoot
等優秀框架的加持下編寫一個RPC
框架其實並不困難,困難的是性能優化和生態圈的支持。
Demo
項目地址:
(本文完 c-1-d e-a-20200115)
技術公眾號(《Throwable文摘》),不定期推送筆者原創技術文章(絕不抄襲或者轉載):
娛樂公眾號(《天天沙雕》),甄選奇趣沙雕圖文和視頻不定期推送,緩解生活工作壓力: