前提
前置文章:
前一篇文章相對簡略地介紹了RPC
服務端的編寫,而這篇博文最要介紹客戶端(Client
)的實現。RPC
調用一般是面向契約編程的,而Client
的核心功能就是:把契約接口方法的調用抽象為使用Netty
向RPC
服務端通過私有協議發送一個請求。這里最底層的實現依賴於動態代理,因此動態代理是動態實現接口的最簡單方式(如果字節碼研究得比較深入,可以通過字節碼編程實現接口)。需要的依賴如下:
JDK1.8+
Netty:4.1.44.Final
SpringBoot:2.2.2.RELEASE
動態代理的簡單使用
一般可以通過JDK
動態代理或者Cglib
的字節碼增強來實現此功能,為了簡單起見,不引入額外的依賴,這里選用JDK
動態代理。這里重新搬出前面提到的契約接口HelloService
:
public interface HelloService {
String sayHello(String name);
}
接下來需要通過動態代理為此接口添加一個實現:
public class TestDynamicProxy {
public static void main(String[] args) throws Exception {
Class<HelloService> interfaceKlass = HelloService.class;
InvocationHandler handler = new HelloServiceImpl(interfaceKlass);
HelloService helloService = (HelloService)
Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, handler);
System.out.println(helloService.sayHello("throwable"));
}
@RequiredArgsConstructor
private static class HelloServiceImpl implements InvocationHandler {
private final Class<?> interfaceKlass;
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 這里應該根據方法的返回值類型去決定返回結果
return String.format("[%s#%s]方法被調用,參數列表:%s", interfaceKlass.getName(), method.getName(),
JSON.toJSONString(args));
}
}
}
// 控制台輸出結果
[club.throwable.contract.HelloService#sayHello]方法被調用,參數列表:["throwable"]
這里可以確認兩點:
InvocationHandler
實現后會對被代理接口生成一個動態實現類。- 動態實現類(接口)方法被調用的時候,實際上是調用
InvocationHandler
對應實例的invoke()
方法,傳入的參數就是當前方法調用的元數據。
Client端代碼實現
Client
端需要通過動態代理為契約接口生成一個動態實現類,然后提取契約接口調用方法時候所能提供的元數據,通過這些元數據和Netty
客戶端的支持(例如Netty
的Channel
)基於私有RPC
協議組裝請求信息並且發送請求。這里先定義一個請求參數提取器接口RequestArgumentExtractor
:
@Data
public class RequestArgumentExtractInput {
private Class<?> interfaceKlass;
private Method method;
}
@Data
public class RequestArgumentExtractOutput {
private String interfaceName;
private String methodName;
private List<String> methodArgumentSignatures;
}
// 接口
public interface RequestArgumentExtractor {
RequestArgumentExtractOutput extract(RequestArgumentExtractInput input);
}
簡單實現一下,解析結果添加到緩存中,實現類DefaultRequestArgumentExtractor
代碼如下:
public class DefaultRequestArgumentExtractor implements RequestArgumentExtractor {
private final ConcurrentMap<CacheKey, RequestArgumentExtractOutput> cache = Maps.newConcurrentMap();
@Override
public RequestArgumentExtractOutput extract(RequestArgumentExtractInput input) {
Class<?> interfaceKlass = input.getInterfaceKlass();
Method method = input.getMethod();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
return cache.computeIfAbsent(new CacheKey(interfaceKlass.getName(), methodName,
Lists.newArrayList(parameterTypes)), x -> {
RequestArgumentExtractOutput output = new RequestArgumentExtractOutput();
output.setInterfaceName(interfaceKlass.getName());
List<String> methodArgumentSignatures = Lists.newArrayList();
for (Class<?> klass : parameterTypes) {
methodArgumentSignatures.add(klass.getName());
}
output.setMethodArgumentSignatures(methodArgumentSignatures);
output.setMethodName(methodName);
return output;
});
}
@RequiredArgsConstructor
private static class CacheKey {
private final String interfaceName;
private final String methodName;
private final List<Class<?>> parameterTypes;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CacheKey cacheKey = (CacheKey) o;
return Objects.equals(interfaceName, cacheKey.interfaceName) &&
Objects.equals(methodName, cacheKey.methodName) &&
Objects.equals(parameterTypes, cacheKey.parameterTypes);
}
@Override
public int hashCode() {
return Objects.hash(interfaceName, methodName, parameterTypes);
}
}
}
在不考慮重連、斷連等情況下,新增一個類ClientChannelHolder
用於保存Netty
客戶端的Channel
實例:
public class ClientChannelHolder {
public static final AtomicReference<Channel> CHANNEL_REFERENCE = new AtomicReference<>();
}
接着新增一個契約動態代理工廠(工具類)ContractProxyFactory
,用於為契約接口生成代理類實例:
public class ContractProxyFactory {
private static final RequestArgumentExtractor EXTRACTOR = new DefaultRequestArgumentExtractor();
private static final ConcurrentMap<Class<?>, Object> CACHE = Maps.newConcurrentMap();
@SuppressWarnings("unchecked")
public static <T> T ofProxy(Class<T> interfaceKlass) {
// 緩存契約接口的代理類實例
return (T) CACHE.computeIfAbsent(interfaceKlass, x ->
Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, (target, method, args) -> {
RequestArgumentExtractInput input = new RequestArgumentExtractInput();
input.setInterfaceKlass(interfaceKlass);
input.setMethod(method);
RequestArgumentExtractOutput output = EXTRACTOR.extract(input);
// 封裝請求參數
RequestMessagePacket packet = new RequestMessagePacket();
packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER);
packet.setVersion(ProtocolConstant.VERSION);
packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber());
packet.setMessageType(MessageType.REQUEST);
packet.setInterfaceName(output.getInterfaceName());
packet.setMethodName(output.getMethodName());
packet.setMethodArgumentSignatures(output.getMethodArgumentSignatures().toArray(new String[0]));
packet.setMethodArguments(args);
Channel channel = ClientChannelHolder.CHANNEL_REFERENCE.get();
// 發起請求
channel.writeAndFlush(packet);
// 這里方法返回值需要進行同步處理,相對復雜,后面專門開一篇文章講解,暫時統一返回字符串
// 如果契約接口的返回值類型不是字符串,這里方法返回后會拋出異常
return String.format("[%s#%s]調用成功,發送了[%s]到NettyServer[%s]", output.getInterfaceName(),
output.getMethodName(), JSON.toJSONString(packet), channel.remoteAddress());
}));
}
}
最后編寫客戶端ClientApplication
的代碼:
@Slf4j
public class ClientApplication {
public static void main(String[] args) throws Exception {
int port = 9092;
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
bootstrap.option(ChannelOption.TCP_NODELAY, Boolean.TRUE);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X));
ch.pipeline().addLast(new ResponseMessagePacketDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<ResponseMessagePacket>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception {
Object targetPayload = packet.getPayload();
if (targetPayload instanceof ByteBuf) {
ByteBuf byteBuf = (ByteBuf) targetPayload;
int readableByteLength = byteBuf.readableBytes();
byte[] bytes = new byte[readableByteLength];
byteBuf.readBytes(bytes);
targetPayload = FastJsonSerializer.X.decode(bytes, String.class);
byteBuf.release();
}
packet.setPayload(targetPayload);
log.info("接收到來自服務端的響應消息,消息內容:{}", JSON.toJSONString(packet));
}
});
}
});
ChannelFuture future = bootstrap.connect("localhost", port).sync();
// 保存Channel實例,暫時不考慮斷連重連
ClientChannelHolder.CHANNEL_REFERENCE.set(future.channel());
// 構造契約接口代理類實例
HelloService helloService = ContractProxyFactory.ofProxy(HelloService.class);
String result = helloService.sayHello("throwable");
log.info(result);
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
先啟動《基於Netty和SpringBoot實現一個輕量級RPC框架-Server篇》一文中的ServerApplication
,再啟動ClientApplication
,控制台輸出如下:
// 服務端日志
2020-01-16 22:34:51 [main] INFO c.throwable.server.ServerApplication - 啟動NettyServer[9092]成功...
2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO club.throwable.server.ServerHandler - 服務端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 11, cap: 11/144)])
2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO club.throwable.server.ServerHandler - 查找目標實現方法成功,目標類:club.throwable.server.contract.DefaultHelloService,宿主類:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello
2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO club.throwable.server.ServerHandler - 服務端輸出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}
// 客戶端日志
2020-01-16 22:36:35 [main] INFO c.throwable.client.ClientApplication - [club.throwable.contract.HelloService#sayHello]調用成功,發送了[{"attachments":{},"interfaceName":"club.throwable.contract.HelloService","magicNumber":10086,"messageType":"REQUEST","methodArgumentSignatures":["java.lang.String"],"methodArguments":["throwable"],"methodName":"sayHello","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}]到NettyServer[localhost/127.0.0.1:9092]
2020-01-16 22:36:35 [nioEventLoopGroup-2-1] INFO c.throwable.client.ClientApplication - 接收到來自服務端的響應消息,消息內容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}
小結
Client
端主要負責契約接口調用轉換為發送RPC
協議請求這一步,核心技術就是動態代理,在不進行模塊封裝優化的前提下實現是相對簡單的。這里其實Client
端還有一個比較大的技術難題沒有解決,上面例子中客戶端日志輸出如果眼尖的伙伴會發現,Client
端發送RPC
請求的線程(main
線程)和Client
端接收Server
端RPC
響應處理的線程(nioEventLoopGroup-2-1
線程)並不相同,這一點是Netty
處理網絡請求之所以能夠如此高效的根源(簡單來說就是請求和響應是異步的,兩個流程本來是互不感知的)。但是更多情況下,我們希望外部請求是同步的,希望發送RPC
請求的線程得到響應結果再返回(這里請求和響應有可能依然是異步流程)。下一篇文章會詳細分析一下如果對請求-響應做同步化處理。
Demo
項目地址:
(c-2-d e-a-20200116)
技術公眾號(《Throwable文摘》),不定期推送筆者原創技術文章(絕不抄襲或者轉載):
娛樂公眾號(《天天沙雕》),甄選奇趣沙雕圖文和視頻不定期推送,緩解生活工作壓力: