基于netty实现rpc框架-spring boot客户端


上篇讲了RPC服务端的实现。原理就是解析netty通道数据拿到类、方法及入参等信息,然后通过java反射机制调用本地接口返回结果。没有用到很复杂的技术。

这篇我们将客户端的实现。说白了客户端的任务很简单:一是建立socket长连接。二是封装发送服务端需要的数据包。三是处理返回结果。

demo地址

https://gitee.com/syher/grave-netty

RPC实现

同样定义注解扫描service接口。

1
2
3
4
5
6
7
8
9
10
@Retention (RetentionPolicy.RUNTIME)
@Target ({ElementType.TYPE})
@Documented
@Import ({NettyClientScannerRegistrar. class , NettyClientApplicationContextAware. class })
public  @interface  NettyClientScan {
 
     String[] basePackages();
 
     Class<?  extends  NettyFactoryBean> factoryBean()  default  NettyFactoryBean. class ;
}

  

该注解用于spring boot启动类上,参数basePackages指定接口所在的包路径。

1
2
3
4
5
6
7
8
9
10
11
@SpringBootApplication
@NettyClientScan (basePackages = {
         "com.braska.grave.netty.api.service"
})
public  class  GraveNettyClientApplication {
 
     public  static  void  main(String[] args) {
         SpringApplication.run(GraveNettyClientApplication. class , args);
     }
 
}

  

NettyServerScannerRegistrar类注册bean。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public  class  NettyClientScannerRegistrar  implements  ImportBeanDefinitionRegistrar, ResourceLoaderAware {
 
     @Override
     public  void  registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
         // spring bean注册
         NettyClientInterfaceScanner scanner =  new  NettyClientInterfaceScanner(registry);
 
         AnnotationAttributes annoAttrs =
                 AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(NettyClientScan. class .getName()));
 
         Class<?  extends  NettyFactoryBean> nettyFactoryBeanClass = annoAttrs.getClass( "factoryBean" );
         if  (!NettyFactoryBean. class .equals(nettyFactoryBeanClass)) {
             scanner.setNettyFactoryBean(BeanUtils.instantiateClass(nettyFactoryBeanClass));
         }
 
         List<String> basePackages =  new  ArrayList<String>();
         for  (String pkg : annoAttrs.getStringArray( "basePackages" )) {
             if  (StringUtils.hasText(pkg)) {
                 basePackages.add(pkg);
             }
         }
 
         scanner.doScan(StringUtils.toStringArray(basePackages));
     }
}

  

NettyClientInterfaceScanner类使用jdk动态代理basePackages路径下的接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public  class  NettyClientInterfaceScanner  extends  ClassPathBeanDefinitionScanner {
     private  NettyFactoryBean nettyFactoryBean =  new  NettyFactoryBean();
 
 
     @Override
     public  Set<BeanDefinitionHolder> doScan(String... basePackages) {
         Set<BeanDefinitionHolder> beanDefinitions =  super .doScan(basePackages);
 
         if  (beanDefinitions.isEmpty()) {
         else  {
             processBeanDefinitions(beanDefinitions);
         }
 
         return  beanDefinitions;
     }
 
     private  void  processBeanDefinitions(
             Set<BeanDefinitionHolder> beanDefinitions) {
 
         GenericBeanDefinition definition;
 
         for  (BeanDefinitionHolder holder : beanDefinitions) {
 
             definition = (GenericBeanDefinition) holder.getBeanDefinition();
             // 为对象属性赋值(这一块我也还不太明白)
       definition.getConstructorArgumentValues().addGenericArgumentValue(definition.getBeanClassName());
             // 这里的nettyFactoryBean是生成Bean实例的工厂,不是Bean本身
             definition.setBeanClass( this .nettyFactoryBean.getClass());
 
             definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
         }
     }
}

  

NettyFactoryBean 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public  class  NettyFactoryBean<T>  implements  FactoryBean<T> {
     private  Class<T> nettyInterface;
 
     public  NettyFactoryBean() {}
 
     public  NettyFactoryBean(Class<T> nettyInterface) {
         this .nettyInterface = nettyInterface;
     }
 
     @Override
     public  T getObject()  throws  Exception {
         // 通过jdk动态代理创建实例
         return  (T) Proxy.newProxyInstance(nettyInterface.getClassLoader(),  new  Class[]{nettyInterface}, c.getInstance());
     }
 
     @Override
     public  Class<?> getObjectType() {
         return  this .nettyInterface;
     }
 
     @Override
     public  boolean  isSingleton() {
         return  true ;
     }
}

  

关键来了,NettyInterfaceInvoker类负责数据包封装及发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public  class  NettyInterfaceInvoker  implements  InvocationHandler {
 
     private  RequestSender sender;
     // 静态内部类做单例模式
     private  static  class  SINGLETON {
         private  static  final  NettyInterfaceInvoker invoker =  new  NettyInterfaceInvoker();
 
         private  static  NettyInterfaceInvoker setSender(RequestSender sender) {
             invoker.sender = sender;
             return  invoker;
         }
     }
 
     public  static  NettyInterfaceInvoker getInstance() {
         return  SINGLETON.invoker;
     }
 
     public  static  NettyInterfaceInvoker setSender(RequestSender sender) {
         return  SINGLETON.setSender(sender);
     }
 
     @Override
     public  Object invoke(Object proxy, Method method, Object[] args)  throws  Throwable {
         // 数据包封装,包含类名、方法名及参数等信息。
         Request request =  new  Request();
         request.setClassName(method.getDeclaringClass().getName());
         request.setMethodName(method.getName());
         request.setParameters(args);
         request.setParameterTypes(method.getParameterTypes());
         request.setId(UUID.randomUUID().toString());
         // 数据发送
         Object result = sender.send(request);
         Class<?> returnType = method.getReturnType();
         // 处理返回数据
         Response response = JSON.parseObject(result.toString(), Response. class );
         if  (response.getCode() ==  1 ) {
             throw  new  Exception(response.getError());
         }
         if  (returnType.isPrimitive() || String. class .isAssignableFrom(returnType)) {
             return  response.getData();
         else  if  (Collection. class .isAssignableFrom(returnType)) {
             return  JSONArray.parseArray(response.getData().toString(), Object. class );
         else  if  (Map. class .isAssignableFrom(returnType)) {
             return  JSON.parseObject(response.getData().toString(), Map. class );
         else  {
             Object data = response.getData();
             return  JSONObject.parseObject(data.toString(), returnType);
         }
     }
}

  

接着我们来看看RequestSender怎么处理数据的。

1
2
3
4
5
public  interface  RequestSender {
     Channel connect(SocketAddress address)  throws  InterruptedException;
 
     Object send(Request request)  throws  InterruptedException;
}

  

RequestSender本身只是一个接口。他的实现类有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public  class  NettyClientApplicationContextAware  extends  ChannelInitializer<SocketChannel>
         implements  RequestSender, ApplicationContextAware, InitializingBean {
     private  static  final  Logger logger = Logger.getLogger(NettyClientApplicationContextAware. class .getName());
 
     private  String remoteAddress;
     private  Bootstrap bootstrap;
     private  EventLoopGroup group;
     private  NettyChannelManager manager;
     private  NettyClientHandler handler;
 
     @Override
     public  void  setApplicationContext(ApplicationContext applicationContext)  throws  BeansException {
         this .remoteAddress = applicationContext.getEnvironment().getProperty( "remoteAddress" );
         this .bootstrap =  new  Bootstrap();
         this .group =  new  NioEventLoopGroup( 1 );
         this .bootstrap.group(group).
                 channel(NioSocketChannel. class ).
                 option(ChannelOption.TCP_NODELAY,  true ).
                 option(ChannelOption.SO_KEEPALIVE,  true ).
                 handler( this );
         this .manager =  new  NettyChannelManager( this );
         this .handler =  new  NettyClientHandler(manager, remoteAddress);
     }
 
     @Override
     public  void  afterPropertiesSet()  throws  Exception {
         // socket连接入口。
         this .manager.refresh(Lists.newArrayList(remoteAddress));
     }
 
     @Override
     public  Object send(Request request)  throws  InterruptedException {
         Channel channel = manager.take();
         if  (channel !=  null  && channel.isActive()) {
             SynchronousQueue<Object> queue =  this .handler.sendRequest(request, channel);
             Object result = queue.take();
             return  JSONArray.toJSONString(result);
         else  {
             Response res =  new  Response();
             res.setCode( 1 );
             res.setError( "未正确连接到服务器.请检查相关配置信息!" );
             return  JSONArray.toJSONString(res);
         }
     }
 
     @Override
     protected  void  initChannel(SocketChannel channel)  throws  Exception {
         ChannelPipeline pipeline = channel.pipeline();
         pipeline.addLast( new  IdleStateHandler( 0 0 30 ));
         pipeline.addLast( new  JSONEncoder());
         pipeline.addLast( new  JSONDecoder());
         // 管道处理器
         pipeline.addLast( this .handler);
     }
 
     @Override
     public  Channel connect(SocketAddress address)  throws  InterruptedException {
         ChannelFuture future = bootstrap.connect(address);
         // 建立长连接,提供失败重连。
         future.addListener( new  ConnectionListener( this .manager,  this .remoteAddress));
         Channel channel = future.channel(); //future.sync().channel();
         return  channel;
     }
 
     public  void  destroy() {
         this .group.shutdownGracefully();
     }
}

  

NettyClientHandler类处理管道事件。与服务端不通,这个管道处理器是继承ChannelInboundHandlerAdapter类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@ChannelHandler .Sharable
public  class  NettyClientHandler  extends  ChannelInboundHandlerAdapter {
     private  static  final  Logger logger = Logger.getLogger(NettyServerHandler. class .getName());
 
     private  ConcurrentHashMap<String, SynchronousQueue<Object>> queueMap =  new  ConcurrentHashMap<>();
     private  NettyChannelManager manager;
     private  String remoteAddress;
 
     public  NettyClientHandler(NettyChannelManager manager, String remoteAddress) {
         this .manager = manager;
         this .remoteAddress = remoteAddress;
     }
 
     @Override
     public  void  channelInactive(ChannelHandlerContext ctx) {
         InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
         logger.info( "与netty服务器断开连接."  + address);
         ctx.channel().close();
         manager.remove(ctx.channel());
         // 掉线重连
         final  EventLoop eventLoop = ctx.channel().eventLoop();
         eventLoop.schedule(() -> {
             manager.refresh(Lists.newArrayList(remoteAddress));
         }, 1L, TimeUnit.SECONDS);
     }
 
     @Override
     public  void  channelRead(ChannelHandlerContext ctx, Object msg)  throws  Exception {
         // 处理服务端返回的数据
         Response response = JSON.parseObject(msg.toString(), Response. class );
         String requestId = response.getRequestId();
         SynchronousQueue<Object> queue = queueMap.get(requestId);
         queue.put(response);
         queueMap.remove(requestId);
     }
 
     public  SynchronousQueue<Object> sendRequest(Request request, Channel channel) {
         // 使用阻塞队列处理客户端请求
         SynchronousQueue<Object> queue =  new  SynchronousQueue<>();
         queueMap.put(request.getId(), queue);
         channel.writeAndFlush(request);
         return  queue;
     }
 
     public  void  userEventTriggered(ChannelHandlerContext ctx, Object evt)  throws  Exception {
         logger.info( "发送心跳消息..." );
         if  (evt  instanceof  IdleStateEvent) {
             IdleStateEvent event = (IdleStateEvent) evt;
             if  (event.state() == IdleState.ALL_IDLE) {
                 Request request =  new  Request();
                 request.setMethodName( "heartBeat" );
                 ctx.channel().writeAndFlush(request);
             }
         else  {
             super .userEventTriggered(ctx, evt);
         }
     }
}

  

这样,RPC的客户端就写好了,其中主要涉及到的关键内容就是netty实例及管道处理器、jdk动态代理、还有一个阻塞队列。

结合上篇RPC服务端。一个完整的RPC框架就搭建完了。

当然,有些地方处理的还是比较粗糙。后续有修改以git代码为准。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM