基於netty實現rpc框架-spring boot服務端


demo地址

https://gitee.com/syher/grave-netty

 

RPC介紹

首先了解一下RPC:遠程過程調用。簡單點說就是本地應用可以調用遠程服務器的接口。那么通過什么方式調用遠程接口呢?說白了RPC只是一種概念。他的調用可以基於HTTP實現,也可以基於TCP/IP實現。甚至私人定制的通訊協議。

當然,私人定制通訊協議成本過高且不具備通用性。我們不做展開討論(其實我也展不開。。。)。那為什么不使用HTTP協議呢?受限於HTTP協議層級過高,數據傳輸效率不如TCP/IP。所以RPC遠程調用一般采用TCP/IP實現。即調用socket方法。

 

RPC實現原理

1. 客戶端發起遠程服務調用。

2. 客戶端將類信息、調用方法和入參信息通過socket通道發送給服務端。

3. 服務端解析數據包,調用本地接口。

5.將執行結果通過socket返回給客戶端。

6.客戶端拿到並解析返回結果。

 

RPC實現

java如何實現一個rpc框架,其實就是按照上面的原理再做一些詳細的補充。比如通過動態代理封裝客戶端的數據包、通過反射機制實現服務端實現類的調用等等。

今天,我們先基於spring boot + netty 做rpc服務端的實現。

 

首先,做一個注解用於標識接口提供rpc調用。

1
2
3
4
5
@Target (ElementType.TYPE)
@Retention (RetentionPolicy.RUNTIME)
public  @interface  Service {
     String name()  default  "" ;
}

  

該注解用於提供服務的實現類上。

1
2
3
4
public  interface  INettyService {
 
     String getString();
}

  

其實現類:

1
2
3
4
5
6
7
8
9
package  com.braska.grave.netty.server.service;
 
@Service  // 該注解為自定義rpc服務注解
public  class  NettyService  implements  INettyService {
     @Override
     public  String getString() {
         return  "welcome to use netty rpc." ;
     }
}

  

接着,定義一個注解用來掃描指定包名下的Service注解。

1
2
3
4
5
6
7
8
@Retention (RetentionPolicy.RUNTIME)
@Target ({ElementType.TYPE})
@Documented
@Import ({NettyServerScannerRegistrar. class , NettyServerApplicationContextAware. class })
public  @interface  NettyServerScan {
 
     String[] basePackages();
}

  

該注解用於spring boot啟動類上,參數basePackages指定服務所在的包路徑。

1
2
3
4
5
6
7
8
9
10
11
@SpringBootApplication
@NettyServerScan (basePackages = {
         "com.braska.grave.netty.server.service"
})
public  class  GraveNettyServerApplication {
 
     public  static  void  main(String[] args) {
         SpringApplication.run(GraveNettyServerApplication. class , args);
     }
 
}

  

NettyServerScannerRegistrar類處理服務的spring bean注冊。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public  class  NettyServerScannerRegistrar  implements  BeanFactoryAware, ImportBeanDefinitionRegistrar, ResourceLoaderAware {
     
 
     @Override
     public  void  registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {          // 創建掃描器實例
         NettyServerInterfaceScanner scanner =  new  NettyServerInterfaceScanner(registry);
         if  ( this .resourceLoader !=  null ) {
             scanner.setResourceLoader( this .resourceLoader);
         }
 
         AnnotationAttributes annoAttrs =
                 AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(NettyServerScan. class .getName()));
 
         List<String> basePackages =  new  ArrayList<String>();
         for  (String pkg : annoAttrs.getStringArray( "basePackages" )) {
             if  (StringUtils.hasText(pkg)) {
                 basePackages.add(pkg);
             }
         }
      // 只掃描指定的注解。
         scanner.setAnnotationClass(Service. class );
         scanner.registerFilters();      // 將basePackages里面的通過@Service注解的類注冊成spring bean。
         scanner.doScan(StringUtils.toStringArray(basePackages));
     }
}

  

NettyServerApplicationContextAware類,暴露socket server端口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public  class  NettyServerApplicationContextAware  implements  ApplicationContextAware, InitializingBean {
     private  static  final  Logger logger = Logger.getLogger(NettyServerApplicationContextAware. class .getName());    // 存儲接口與實現類的映射,其中key是接口名。value是實現類的bean。
     private  Map<String, Object> serviceMap =  new  HashMap<>();    // 服務worker。包含netty socket服務端生命周期及讀寫。
     ServerWorker runner;
 
     @Override
     public  void  setApplicationContext(ApplicationContext applicationContext)  throws  BeansException {
         String address = applicationContext.getEnvironment().getProperty( "remoteAddress" );
 
         Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Service. class );
         for  (Object serviceBean : beans.values()) {
 
             Class<?> clazz = serviceBean.getClass();
 
             Class<?>[] interfaces = clazz.getInterfaces();
 
             for  (Class<?> inter : interfaces) {
                 String interfaceName = inter.getName();
                 serviceMap.put(interfaceName, serviceBean);
             }
         }      // 創建netty worker對象
         runner =  new  ServerWorker(address, serviceMap);
     }
 
     @Override
     public  void  afterPropertiesSet()  throws  Exception {      // 創建netty socketServer及通道處理器
         runner.open();
     }
}

  

ServerWorker類的open方法。

public class ServerWorker extends ChannelInitializer {
   // socket ip:port private String remoteAddress;

// 實現類的beanMap private Map<String, Object> serviceMap;
// netty channel處理器 NettyServerHandler handler;public void open() { try { int parallel = Runtime.getRuntime().availableProcessors() * 2; ServerBootstrap bootstrap = new ServerBootstrap(); this.bossGroup = new NioEventLoopGroup(); // todo 使用線程池,提高並發能力 this.workerGroup = new NioEventLoopGroup(parallel); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(this); String[] hostAndPort = this.remoteAddress.split(":"); if (hostAndPort == null || hostAndPort.length != 2) { throw new RuntimeException("remoteAddress is error."); } ChannelFuture cf = bootstrap.bind(hostAndPort[0], Integer.parseInt(hostAndPort[1])).sync(); // todo 信息寫入注冊中心 // registry.register(serverAddress); logger.info("netty 服務器啟動.監聽端口:" + hostAndPort[1]); // 等待服務端監聽端口關閉 cf.channel().closeFuture().sync(); } catch (Exception e) { logger.log(Level.SEVERE, "netty server open failed.", e); this.bossGroup.shutdownGracefully(); this.workerGroup.shutdownGracefully(); } } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 60)); pipeline.addLast(new JSONEncoder()); pipeline.addLast(new JSONDecoder()); pipeline.addLast(this.handler); } }

 

NettyServerHandler服務端channel處理器,繼承ChannelInboundHandlerAdapter。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@ChannelHandler .Sharable
public  class  NettyServerHandler  extends  ChannelInboundHandlerAdapter {
     private  Map<String, Object> serviceMap;
 
     public  NettyServerHandler(Map<String, Object> serviceMap) {
         this .serviceMap = serviceMap;
     }
 
     @Override
     public  void  channelRead(ChannelHandlerContext ctx, Object msg) {      // 解析客戶端發送過來的數據。包含類名、方法名、入參等信息。
         Request request = JSON.parseObject(msg.toString(), Request. class );
         
         Response response =  new  Response();
         response.setRequestId(request.getId());
         try  {        // 調用本地實現類
             Object res =  this .handler(request);
             response.setData(res);
         catch  (Exception e) {
             response.setCode(- 1 );
             response.setError(e.getMessage());
             logger.log(Level.SEVERE,  "請求調用失敗" , e);
         }      // 返回處理結果給客戶端
         ctx.writeAndFlush(response);
     }
 
     private  Object handler(Request request)  throws  Exception {
         String className = request.getClassName();      // 通過className從beanMap映射中找到托管給spring的bean實現類。
         Object serviceBean = serviceMap.get(className);
         String methodName = request.getMethodName();
         Object[] parameters = request.getParameters();      // 通過反射機制調用實現類。並返回調用結果。
         return  MethodUtils.invokeMethod(serviceBean, methodName, parameters);
     }
}

  

至此,rpc服務端的實現就完成了。

一路看下來,服務端的代碼實現還是比較簡單的。核心代碼只有兩個類:ServerWorker和NettyServerHandler。其余的都是對spring bean注冊的支持。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM