可以給你提供思路
也可以讓你學到Netty相關的知識
當然,這只是一種實現方式
需求
看下圖,其實這個項目就是為了做這樣一件事。
有一個公共服務ServerA,它提供了一個名為getUserName的服務。
現在有多個類似ServerB的Web應用服務器。
當客戶想通過ServerB要請求getUserName服務時,由於ServerB服務中因為沒有UserService的實現類,導致不能正常提供服務的問題。
預期結果
可以看到,在Client項目中,UserService沒有實現類,但是返回了正常的結果。
項目結構
整個項目分為三個部分,Server端、Client端以及一個公共jar。
下圖正是整個項目的目錄結構圖。
公共部分
公共部分的存在是因為我將服務器端和客戶端寫在了一個項目中,為了不讓代碼重復警告,所以提出來的一個公共模塊。主要是幾個實體類和一些序列化工具類。
-
MsgPackDecoder
- 該類是一個MsgPack編碼器,主要作用將Object對象序列化成byte數組。
-
MsgPackEncoder
- 該類是一個MsgPack解碼器,主要作用將byte數組反序列化成Object對象。
-
ObjectCodec
-
該類是繼承了MessageToMessageCodec<ByteBuf, Object>。是自定義序列化類主要有兩個方法,encode和decode
@Override protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) { // 調用工具類的序列化方法 byte[] data = ObjectSerializerUtils.serilizer(msg); ByteBuf buf = Unpooled.buffer(); buf.writeBytes(data); out.add(buf); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) { // 調用工具類的反序列化方法 byte[] bytes = new byte[msg.readableBytes()]; msg.readBytes(bytes); Object deSerilizer = ObjectSerializerUtils.deSerilizer(bytes); out.add(deSerilizer); }
-
-
ObjectSerializerUtils
- 序列化工具類。序列化方法可以自己實現
-
MethodInvokeMeta
- 重點類。這個類是一個實體類。用於在網絡中傳輸的類。主要有5個字段分別記錄了一個接口的類對象,調用接口的方法名,方法的參數列表(包含參數類型,和參數列表),方法的返回值類型。
- 在客戶端中,這個類將調用方所要調用的方法封裝。
- 在服務端中,這個類主要用於服務器反射調用方法。
- 當然,也可以用String來記錄這些元信息。
-
NullWritable
- 這個類主要用於在網絡中傳輸null。當返回值為null時,服務端會返回NullWritable對象。客戶端接收到NullWritable時進行null處理。
-
User
- 實體對象。測試用例
客戶端
上面的目錄結構圖也有提到,客戶端中只有UserService接口,所以客戶端中如果不做處理是不能正常運行的。
客戶端中核心類有以下7個,其中與Netty相關的核心類與服務端一樣有3個
- ClientChannelHandlerAdapter
- CustomChannelInitializerClient
- NettyClient
- RpcProxyFactoryBean
- NettyBeanScanner
- PackageClassUtils
- WrapMethodUtils
NettyClient
這個類是Netty客戶端的啟動類,這個類中與Netty服務端進行通信
CustomChannelInitializerClient
這個類是用於初始化管道事件的類。主要添加了TCP粘包問題解決方案和自定義編解碼器工具
ClientChannelHandlerAdapter
這個類是Netty客戶端的處理類,主要通過這個類將調用信息寫給Netty服務端。
NettyBeanScanner
這個類實現了BeanFactoryPostProcessor接口。BeanFactoryPostProcessor是Spring初始化Bean時對外暴露的擴展點。
Spring IoC容器允許BeanFactoryPostProcessor在容器實例化任何bean之前讀取bean的定義(配置元數據),並可以修改它。同時可以定義多個BeanFactoryPostProcessor,通過設置'order'屬性來確定各個BeanFactoryPostProcessor執行順序。
在postProcessBeanFactory方法中,調用PackageClassUtils.resolver方法,將UserService.class注冊到SpringBean工廠。
/**
* 注冊Bean到Spring的bean工廠
*/
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
this.beanFactory = (DefaultListableBeanFactory) beanFactory;
// 加載遠程服務的接口
List<String> resolverClass = PackageClassUtils.resolver(basePackage);
for (String clazz : resolverClass) {
String simpleName;
if (clazz.lastIndexOf('.') != -1) {
simpleName = clazz.substring(clazz.lastIndexOf('.') + 1);
} else {
simpleName = clazz;
}
BeanDefinitionBuilder gd = BeanDefinitionBuilder.genericBeanDefinition(RpcProxyFactoryBean.class);
gd.addPropertyValue("interfaceClass", clazz);
gd.addPropertyReference("nettyClient", clientName);
this.beanFactory.registerBeanDefinition(simpleName, gd.getRawBeanDefinition());
}
}
RpcProxyFactoryBean
重點類,這個類繼承了AbstractFactoryBean
使用jdk動態代理的方式創建代理對象。
Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, this);
在invoke方法中,
調用WrapMethodUtils工具類中的方法,將代理對象方法封裝成MethodInvokeMeta對象。
然后通過NettyClient傳輸給NettyServer端,進行RPC調用,並將結果返回。
至此,客戶端的核心類介紹完畢。
服務端
服務端主要的核心類有三個
- ServerChannelHandlerAdapter
- RequestDispatcher
- NettyServer
NettyServer
這個類主要有兩個方法,一個是啟動Netty服務的方法,一個是關閉服務器的方法。核心代碼如下:
/**
* 啟動netty服務的方法
*/
public void start() {
// 服務器監聽端口號
int port = serverConfig.getPort();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// BACKLOG用於構造服務端套接字ServerSocket對象,
// 標識當服務器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度。
// 如果未設置或所設置的值小於1,Java將使用默認值50
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO));
try {
// 設置事件處理
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 自定義長度解碼器解決TCP黏包問題
// maxFrameLength 最大包字節大小,超出報異常
// lengthFieldOffset 長度字段的偏差
// lengthFieldLength 長度字段占的字節數
// lengthAdjustment 添加到長度字段的補償值
// initialBytesToStrip 從解碼幀中第一次去除的字節數
pipeline.addLast(new LengthFieldBasedFrameDecoder(65535
, 0, 2, 0, 2));
// LengthFieldPrepender編碼器,它可以計算當前待發送消息的二進制字節長度,將該長度添加到ByteBuf的緩沖區頭中
pipeline.addLast(new LengthFieldPrepender(2));
// 序列化工具
pipeline.addLast(new ObjectCodec());
pipeline.addLast(channelHandlerAdapter);
}
});
ChannelFuture f = serverBootstrap.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
ServerChannelHandlerAdapter
這個類主要重寫了ChannelRead方法,在ChannelRead方法中調用了RequestDispatcher類中的dispatcher方法來處理消息。
RequestDispatcher
這個類的作用為,將ChannelRead方法中讀到的數據(也可以說命令),通過反射來調用,執行對應方法,將執行后的結果寫回通道,供客戶端使用。
/**
* 分發命令
*
* @param channelHandlerContext channelHandlerContext
* @param invokeMeta invokeMeta
*/
public void dispatcher(final ChannelHandlerContext channelHandlerContext, final MethodInvokeMeta invokeMeta) {
ChannelFuture f = null;
try {
// 獲取客戶端准備調用的接口類
Class<?> interfaceClass = invokeMeta.getInterfaceClass();
// 獲取准備調用的方法名稱
String name = invokeMeta.getMethodName();
// 獲取方法對應的參數列表
Object[] args = invokeMeta.getArgs();
// 獲取參數類型
Class<?>[] parameterTypes = invokeMeta.getParameterTypes();
// 通過Spring獲取對象
Object targetObject = app.getBean(interfaceClass);
// 獲取待調用方法
Method method = targetObject.getClass().getMethod(name, parameterTypes);
// 執行方法
Object obj = method.invoke(targetObject, args);
if (obj == null) {
// 如果方法結果為空,將NULL結果寫給客戶端
f = channelHandlerContext.writeAndFlush(NullWritable.nullWritable());
} else {
// 寫給客戶端結果
f = channelHandlerContext.writeAndFlush(obj);
}
// 釋放通道,不是關閉連接
f.addListener(ChannelFutureListener.CLOSE);
} catch (Exception e) {
// 出現異常后的處理
f = channelHandlerContext.writeAndFlush(e.getMessage());
} finally {
if (f != null) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
}
使用方法
- 啟動ServerApplication
- 啟動ClientApplication
- 打開Chrome瀏覽器,輸入:http://localhost:8080/client/get/name 或者 http://localhost:8080/client/get/info 即可看到結果。
如果想整合到現有項目中,請直接留言或者聯系作者,此次並沒有提供集成版本,但如果此篇文章已理解,那么自己可以手動的去集成到自己的項目中。基於Netty的RPC簡易實現
注:本文著作權歸作者,由demo大師代發,拒絕轉載,轉載需要作者授權