Implementing an RPC Framework on Netty: The Spring Boot Client


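The previous article covered the RPC server. The idea is simple: parse the data arriving on the Netty channel to obtain the class, method, and arguments, then invoke the local implementation through Java reflection and return the result. Nothing very complicated is involved.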
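
The server-side dispatch recapped above can be sketched with plain JDK reflection. This is only an illustration under stated assumptions, not the code from the previous article: ReflectiveDispatchSketch, GreetingService, GreetingServiceImpl, and the dispatch helper are hypothetical names, and a real server would look the target object up in its own registry.

```java
import java.lang.reflect.Method;

// Hypothetical sketch: every name here is illustrative, not taken from the article's repository.
final class ReflectiveDispatchSketch {

    interface GreetingService {
        String greet(String name);
    }

    static final class GreetingServiceImpl implements GreetingService {
        @Override
        public String greet(String name) {
            return "hello " + name;
        }
    }

    // Finds the method by name and parameter types, then invokes it on the target.
    static Object dispatch(Object target, String methodName,
                           Class<?>[] parameterTypes, Object[] parameters) {
        try {
            Method method = target.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(target, parameters);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("dispatch failed for " + methodName, e);
        }
    }

    public static void main(String[] args) {
        Object result = dispatch(new GreetingServiceImpl(), "greet",
                new Class<?>[]{String.class}, new Object[]{"netty"});
        System.out.println(result);
    }
}
```

The method name and parameter types are exactly the fields the client has to ship in each request, which is why the invoker later in this article copies them from the intercepted Method.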

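This article covers the client. Its job comes down to three things: establish a long-lived socket connection, package and send the data the server needs, and handle the returned result.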

Demo repository

https://gitee.com/syher/grave-netty

RPC implementation

As on the server side, define an annotation that scans the service interfaces.

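import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.context.annotation.Import;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import({NettyClientScannerRegistrar.class, NettyClientApplicationContextAware.class})
public @interface NettyClientScan {

    // Packages that contain the service interfaces to proxy.
    String[] basePackages();

    // Factory bean that creates the proxies; defaults to NettyFactoryBean.
    Class<? extends NettyFactoryBean> factoryBean() default NettyFactoryBean.class;
}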

  

This annotation goes on the Spring Boot application class; the basePackages parameter specifies the packages that contain the interfaces.

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@NettyClientScan(basePackages = {
        "com.braska.grave.netty.api.service"
})
public class GraveNettyClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(GraveNettyClientApplication.class, args);
    }

}

  

The NettyClientScannerRegistrar class registers the beans.

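import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ResourceLoaderAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.util.StringUtils;

public class NettyClientScannerRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {

    private ResourceLoader resourceLoader;

    // Required by ResourceLoaderAware; the original excerpt did not show this method.
    @Override
    public void setResourceLoader(ResourceLoader resourceLoader) {
        this.resourceLoader = resourceLoader;
    }

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        // Scanner that registers the service interfaces as Spring beans.
        NettyClientInterfaceScanner scanner = new NettyClientInterfaceScanner(registry);

        AnnotationAttributes annoAttrs =
                AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(NettyClientScan.class.getName()));

        // Use a custom factory bean only when the annotation overrides the default.
        Class<? extends NettyFactoryBean> nettyFactoryBeanClass = annoAttrs.getClass("factoryBean");
        if (!NettyFactoryBean.class.equals(nettyFactoryBeanClass)) {
            scanner.setNettyFactoryBean(BeanUtils.instantiateClass(nettyFactoryBeanClass));
        }

        // Collect the non-blank package names to scan.
        List<String> basePackages = new ArrayList<String>();
        for (String pkg : annoAttrs.getStringArray("basePackages")) {
            if (StringUtils.hasText(pkg)) {
                basePackages.add(pkg);
            }
        }

        scanner.doScan(StringUtils.toStringArray(basePackages));
    }
}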

  

The NettyClientInterfaceScanner class registers a JDK dynamic proxy for each interface under basePackages.

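import java.util.Set;

import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.annotation.ClassPathBeanDefinitionScanner;

public class NettyClientInterfaceScanner extends ClassPathBeanDefinitionScanner {
    private NettyFactoryBean nettyFactoryBean = new NettyFactoryBean();

    // The constructor and setter are called by NettyClientScannerRegistrar but were not shown
    // in the original excerpt. The excerpt also omits the include filter and the
    // isCandidateComponent override that the scanner needs in order to accept plain interfaces.
    public NettyClientInterfaceScanner(BeanDefinitionRegistry registry) {
        super(registry);
    }

    public void setNettyFactoryBean(NettyFactoryBean nettyFactoryBean) {
        this.nettyFactoryBean = nettyFactoryBean;
    }

    @Override
    public Set<BeanDefinitionHolder> doScan(String... basePackages) {
        Set<BeanDefinitionHolder> beanDefinitions = super.doScan(basePackages);

        if (!beanDefinitions.isEmpty()) {
            processBeanDefinitions(beanDefinitions);
        }

        return beanDefinitions;
    }

    private void processBeanDefinitions(Set<BeanDefinitionHolder> beanDefinitions) {
        for (BeanDefinitionHolder holder : beanDefinitions) {
            GenericBeanDefinition definition = (GenericBeanDefinition) holder.getBeanDefinition();
            // Pass the scanned interface name as the constructor argument of
            // NettyFactoryBean(Class<T>); Spring converts the name to a Class.
            definition.getConstructorArgumentValues().addGenericArgumentValue(definition.getBeanClassName());
            // nettyFactoryBean is the factory that produces the bean instance, not the bean itself.
            definition.setBeanClass(this.nettyFactoryBean.getClass());

            definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
        }
    }
}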

  

NettyFactoryBean 

import java.lang.reflect.Proxy;

import org.springframework.beans.factory.FactoryBean;

public class NettyFactoryBean<T> implements FactoryBean<T> {
    private Class<T> nettyInterface;

    public NettyFactoryBean() {}

    public NettyFactoryBean(Class<T> nettyInterface) {
        this.nettyInterface = nettyInterface;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T getObject() throws Exception {
        // Create the instance as a JDK dynamic proxy backed by the shared invoker.
        return (T) Proxy.newProxyInstance(nettyInterface.getClassLoader(),
                new Class<?>[]{nettyInterface}, NettyInterfaceInvoker.getInstance());
    }

    @Override
    public Class<?> getObjectType() {
        return this.nettyInterface;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }
}
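
To make the proxy step concrete without Spring or Netty, here is a minimal JDK-only sketch of what Proxy.newProxyInstance does for an interface. EchoService and describingProxy are hypothetical names, and the handler only describes the call, where the article's invoker would send it over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical sketch: EchoService and describingProxy are illustrative names.
final class DynamicProxySketch {

    interface EchoService {
        String echo(String text, int times);
    }

    // Builds a proxy whose handler describes the intercepted call instead of sending it anywhere.
    @SuppressWarnings("unchecked")
    static <T> T describingProxy(Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) ->
                method.getDeclaringClass().getSimpleName() + "#" + method.getName()
                        + Arrays.toString(args);
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        EchoService service = describingProxy(EchoService.class);
        System.out.println(service.echo("hi", 2)); // prints "EchoService#echo[hi, 2]"
    }
}
```

Every call on the returned object lands in the single handler, which is what lets one invoker serve all scanned interfaces.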

  

Now the key part: the NettyInterfaceInvoker class builds the request packet and sends it.

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;

// Assumption: JSON, JSONArray and JSONObject come from Alibaba fastjson, whose API these calls match.
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

public class NettyInterfaceInvoker implements InvocationHandler {

    private RequestSender sender;

    // Singleton implemented with a static nested holder class.
    private static class SINGLETON {
        private static final NettyInterfaceInvoker invoker = new NettyInterfaceInvoker();

        private static NettyInterfaceInvoker setSender(RequestSender sender) {
            invoker.sender = sender;
            return invoker;
        }
    }

    public static NettyInterfaceInvoker getInstance() {
        return SINGLETON.invoker;
    }

    public static NettyInterfaceInvoker setSender(RequestSender sender) {
        return SINGLETON.setSender(sender);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Build the request packet: class name, method name, arguments, and parameter types.
        Request request = new Request();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameters(args);
        request.setParameterTypes(method.getParameterTypes());
        request.setId(UUID.randomUUID().toString());
        // Send the request and wait for the response.
        Object result = sender.send(request);
        Class<?> returnType = method.getReturnType();
        // Decode the response; code 1 marks an error.
        Response response = JSON.parseObject(result.toString(), Response.class);
        if (response.getCode() == 1) {
            // Unchecked, so the proxy does not wrap it in UndeclaredThrowableException.
            throw new RuntimeException(response.getError());
        }
        Object data = response.getData();
        if (data == null) {
            return null;
        }
        if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)) {
            return data;
        } else if (Collection.class.isAssignableFrom(returnType)) {
            return JSONArray.parseArray(data.toString(), Object.class);
        } else if (Map.class.isAssignableFrom(returnType)) {
            return JSON.parseObject(data.toString(), Map.class);
        } else {
            return JSONObject.parseObject(data.toString(), returnType);
        }
    }
}
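
One detail worth keeping in mind with any JDK proxy handler: toString, hashCode, and equals called on the proxy are routed through invoke as well, so a handler that forwards everything would send them to the server too. The sketch below shows one possible guard, which is an assumption of this example and not part of the article's repository. PingService, GuardedHandler, and the remoteCalls counter are hypothetical, and the counter stands in for the real network send.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: answers java.lang.Object methods locally, forwards the rest.
final class ObjectMethodGuardSketch {

    interface PingService {
        String ping();
    }

    // Counts the calls that would have gone over the wire.
    static final AtomicInteger remoteCalls = new AtomicInteger();

    static final class GuardedHandler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            if (method.getDeclaringClass() == Object.class) {
                // Only equals, hashCode and toString are dispatched to the handler.
                switch (method.getName()) {
                    case "equals":
                        return proxy == args[0];
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default:
                        return "RpcProxy@" + Integer.toHexString(System.identityHashCode(proxy));
                }
            }
            remoteCalls.incrementAndGet(); // stand-in for sending the request
            return "pong";
        }
    }

    static PingService newProxy() {
        return (PingService) Proxy.newProxyInstance(PingService.class.getClassLoader(),
                new Class<?>[]{PingService.class}, new GuardedHandler());
    }

    public static void main(String[] args) {
        PingService service = newProxy();
        System.out.println(service);        // answered locally
        System.out.println(service.ping()); // counted as a remote call
        System.out.println(remoteCalls.get());
    }
}
```

This matters in practice because Spring, loggers, and collections call these methods on beans without the application asking for it.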

  

Next, let's see how RequestSender handles the data.

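import java.net.SocketAddress;

import io.netty.channel.Channel;

public interface RequestSender {
    // Opens a connection to the given server address.
    Channel connect(SocketAddress address) throws InterruptedException;

    // Sends the request and blocks until a result is available.
    Object send(Request request) throws InterruptedException;
}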

  

RequestSender itself is only an interface. Its implementation is:

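import java.net.SocketAddress;
import java.util.concurrent.SynchronousQueue;

// Assumptions: JSON is Alibaba fastjson and Lists is Guava; the excerpt does not show its imports.
// The Logger import is left out because the excerpt does not say which logging library is used.
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class NettyClientApplicationContextAware extends ChannelInitializer<SocketChannel>
        implements RequestSender, ApplicationContextAware, InitializingBean {
    private static final Logger logger = Logger.getLogger(NettyClientApplicationContextAware.class.getName());

    private String remoteAddress;
    private Bootstrap bootstrap;
    private EventLoopGroup group;
    private NettyChannelManager manager;
    private NettyClientHandler handler;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // The server address comes from the "remoteAddress" configuration property.
        this.remoteAddress = applicationContext.getEnvironment().getProperty("remoteAddress");
        this.bootstrap = new Bootstrap();
        this.group = new NioEventLoopGroup(1);
        this.bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(this);
        this.manager = new NettyChannelManager(this);
        this.handler = new NettyClientHandler(manager, remoteAddress);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Entry point for the socket connection.
        this.manager.refresh(Lists.newArrayList(remoteAddress));
    }

    @Override
    public Object send(Request request) throws InterruptedException {
        Channel channel = manager.take();
        if (channel != null && channel.isActive()) {
            // Block the calling thread until the handler hands over the response.
            SynchronousQueue<Object> queue = this.handler.sendRequest(request, channel);
            Object result = queue.take();
            return JSON.toJSONString(result);
        } else {
            Response res = new Response();
            res.setCode(1);
            res.setError("Not connected to the server. Please check the configuration.");
            return JSON.toJSONString(res);
        }
    }

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        // Fire an ALL_IDLE event after 30 seconds without reads or writes.
        pipeline.addLast(new IdleStateHandler(0, 0, 30));
        pipeline.addLast(new JSONEncoder());
        pipeline.addLast(new JSONDecoder());
        // Pipeline handler for responses, heartbeats, and reconnects.
        pipeline.addLast(this.handler);
    }

    @Override
    public Channel connect(SocketAddress address) throws InterruptedException {
        ChannelFuture future = bootstrap.connect(address);
        // Establish the long-lived connection; the listener retries on failure.
        future.addListener(new ConnectionListener(this.manager, this.remoteAddress));
        // Returned without waiting for the connect to finish (no future.sync()).
        return future.channel();
    }

    public void destroy() {
        this.group.shutdownGracefully();
    }
}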

  

The NettyClientHandler class handles pipeline events. Unlike the server-side handler, it extends ChannelInboundHandlerAdapter.

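import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Assumptions: JSON is Alibaba fastjson and Lists is Guava; the excerpt does not show its imports.
// The Logger import is left out because the excerpt does not say which logging library is used.
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());

    // Pending requests, keyed by request id.
    private ConcurrentHashMap<String, SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();
    private NettyChannelManager manager;
    private String remoteAddress;

    public NettyClientHandler(NettyChannelManager manager, String remoteAddress) {
        this.manager = manager;
        this.remoteAddress = remoteAddress;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
        logger.info("Disconnected from the netty server. " + address);
        ctx.channel().close();
        manager.remove(ctx.channel());
        // Reconnect one second after the connection drops.
        final EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(() -> {
            manager.refresh(Lists.newArrayList(remoteAddress));
        }, 1L, TimeUnit.SECONDS);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Handle the data returned by the server.
        Response response = JSON.parseObject(msg.toString(), Response.class);
        String requestId = response.getRequestId();
        if (requestId == null) {
            return; // nothing to correlate, for example a heartbeat reply
        }
        SynchronousQueue<Object> queue = queueMap.remove(requestId);
        if (queue != null) {
            // Hands the response to the thread waiting in send().
            queue.put(response);
        }
    }

    public SynchronousQueue<Object> sendRequest(Request request, Channel channel) {
        // Use a blocking queue to hand the response back to the caller.
        SynchronousQueue<Object> queue = new SynchronousQueue<>();
        queueMap.put(request.getId(), queue);
        channel.writeAndFlush(request);
        return queue;
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.ALL_IDLE) {
                // The channel has been idle: send a heartbeat.
                logger.info("Sending heartbeat...");
                Request request = new Request();
                request.setMethodName("heartBeat");
                ctx.channel().writeAndFlush(request);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}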
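
The handler above pairs each response with its request through a map keyed by request id, and the caller waits without a time limit. As a point of comparison, the same correlation can be sketched with CompletableFuture and a timeout using only the JDK. This is an alternative technique, not the article's implementation. PendingRequestsSketch and its methods are hypothetical names, and String payloads stand in for the Response object.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: request/response correlation by id, with a bounded wait.
final class PendingRequestsSketch {

    private final ConcurrentHashMap<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called by the sending thread before the request is written to the channel.
    CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Called by the I/O thread when a response arrives; unknown ids are ignored.
    boolean complete(String requestId, String payload) {
        CompletableFuture<String> future = pending.remove(requestId);
        return future != null && future.complete(payload);
    }

    // Waits for the response; returns null if none arrives in time, and always drops the entry.
    String await(String requestId, CompletableFuture<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(requestId);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingRequestsSketch requests = new PendingRequestsSketch();
        CompletableFuture<String> future = requests.register("req-1");
        // Simulates the I/O thread delivering the response.
        new Thread(() -> requests.complete("req-1", "ok")).start();
        System.out.println(requests.await("req-1", future, 1000));
    }
}
```

Completing a future never blocks the thread that delivers the response, and the timeout keeps a lost response from parking the caller forever.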

  

That completes the RPC client. The key pieces are the Netty bootstrap and pipeline handler, the JDK dynamic proxy, and a blocking queue.

Together with the RPC server from the previous article, this gives a complete RPC framework.

Some parts are admittedly still rough. If anything changes later, the code in the Git repository is authoritative.

