基於netty框架的輕量級RPC實現(附源碼)


前言

  Rpc( Remote procedure call):是一種請求 - 響應協議。RPC由客戶端啟動,客戶端向已知的遠程服務器發送請求消息,以使用提供的參數執行指定的過程。遠程服務器向客戶端發送響應,應用程序繼續其進程。當服務器正在處理該調用時,客戶端被阻塞(它等待服務器在恢復執行之前完成處理),除非客戶端向服務器發送異步請求,例如XMLHttpRequest。在各種實現中存在許多變化和細微之處,導致各種不同(不兼容)的RPC協議。

  技術選型:

  1. Protostuff:它基於 Protobuf 序列化框架,面向 POJO,無需編寫 .proto 文件。
  2. Netty:基於NIO的網絡編程框架,封裝了NIO細節,使用更加方便
  3. SpringBoot:Spring 的組件的集合,內部封裝服務器,實現了自動加載

 

1.封裝請求的pojo和響應的pojo

    

public class RpcRequest {
    public RpcRequest() {
    }

    private Long id;
    /**
     * rpc name
     */
    private String className;
    /**
     * 方法名
     */
    private String methodName;
    /**
     * 參數
     */
    private HashMap<Class<?>, Object> arguments;

     //get and set ...

  

public class RpcResponse {
    public RpcResponse() {
    }

    private Long id;
    private Integer code;
    private Object result;
    private String failMsg;
    // get and set ...

 

2.server端對request進行解碼,對response進行編碼。反之client端對request進行編碼,對response進行解碼,因此需要編寫兩個編碼和解碼器,在不同端,對不同pojo進行編碼解碼

  編碼類只對屬於某個 genericClass的類進行編碼,SerializationUtil為使用Protobuffer工具封裝的一個工具類

@ChannelHandler.Sharable
public class RpcEncode extends MessageToByteEncoder {
    //client 端為 request, server 端為 response
    private Class<?> genericClass;

    public RpcEncode(Class<?> clazz) {
        this.genericClass = clazz;
    }

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext,
                          Object o, ByteBuf byteBuf) throws Exception {
        if (genericClass.isInstance(o)) {
            byte[] data = SerializationUtil.serialize(o);
            byteBuf.writeInt(data.length);
            byteBuf.writeBytes(data);
        }
    }
}

  同樣的,解碼 

public class RpcDecode extends ByteToMessageDecoder {
    private Class<?> genericClass;

    public RpcDecode(Class<?> clazz) {
        this.genericClass = clazz;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx,
                          ByteBuf in, List<Object> out) throws Exception {
        int dataLength = in.readInt();
        //一個整數4個字節
        if (dataLength < 4) {
            return;
        }
        in.markReaderIndex();
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        Object obj = SerializationUtil.deserialize(data, genericClass);
        out.add(obj);
    }

 

3. server端將數據解碼后,開始使用handler處理client的請求,handler里包含一個map,里面value是使用@RpcService后的bean,key是注解的value,通過RpcRequest的className,從map的key進行匹配,找到bean之后,通過反射執行  methodName對應的方法 和arguments的參數

  @RpcService用於標識在發布的服務類上,value為client 請求的classname,該注解繼承了@Component注解,將會被spring注冊為bean

    

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
    String value();

    String description() default "";
}

  通過@RpcService 標識的類,將被注冊為bean實例,這里將在 LrpcHandlerAutoConfiguration類中,將這些標識了該注解的bean實例找出來,傳入handler中執行client的請求方法

  

@Configurable
@Component
public class LrpcHandlerAutoConfiguration implements ApplicationContextAware {
    private ApplicationContext context;
    @Value("${lrpc.server}")
    public String port;

    @Bean
    public RpcHandler rpcHandler() {
        Map<String, Object> rpcBeans = context.getBeansWithAnnotation(RpcService.class);
        Set<String> beanNameSet = rpcBeans.keySet();
        for (String beanName : beanNameSet) {
            Object obj = rpcBeans.get(beanName);
            //rpcService注解會 把value的值傳遞給component
            RpcService annotation = obj.getClass().getDeclaredAnnotation(RpcService.class);
            //默認bean name
            if (StringUtils.isBlank(annotation.value()) || annotation.value().equals(beanName)) {
                continue;
            }
            rpcBeans.put(annotation.value(), rpcBeans.get(beanName));
            //去掉重復
            rpcBeans.remove(beanName);
        }
        return new RpcHandler(rpcBeans);
    }
//..........................

RpcHandler的構造函數,注入了一份rpcBeans的引用,當client的RpcRequest請求時,將從該rpcBeans中獲取對應的bean

@ChannelHandler.Sharable
public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {
    private static final Logger logger = Logger.getLogger(RpcHandler.class.getName());
    private Map<String, Object> rpcBeans;

    public RpcHandler(Map<String, Object> rpcBeans) {
        this.rpcBeans = rpcBeans;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
        RpcResponse rpcResponse = handle(msg);
        ctx.channel().writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE);
    }

    private RpcResponse handle(RpcRequest msg) throws InvocationTargetException {
        RpcResponse rpcResponse = new RpcResponse();
        Object obj = rpcBeans.get(msg.getClassName());
        //TODO 暫時這樣吧
        if (Objects.isNull(obj)) {
            System.out.println("未找到service");
            rpcResponse.setResult(null);
            rpcResponse.setCode(404);
            logger.warning("請求的service未找到,msg:" + msg.toString());
            return rpcResponse;
        }
        rpcResponse.setId(msg.getId());
        //解析請求,執行相應的rpc方法
        Class<?> clazz = obj.getClass();
        String methodName = msg.getMethodName();
        HashMap<Class<?>, Object> arguments = msg.getArguments();
        FastClass fastClass = FastClass.create(clazz);
        FastMethod method = fastClass.getMethod(methodName,
                arguments.keySet().toArray(new Class[arguments.size()]));
        Object result = method.invoke(obj, arguments.values().toArray());
        rpcResponse.setResult(result);
        rpcResponse.setCode(200);
        return rpcResponse;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        logger.warning(cause.toString());
    }
}

4. 啟動 LrpcServer,LrpcChannelInit 在 LrpcHandlerAutoConfiguration中進行初始化,同時注入 lrpc.server 環境變量給port參數

@Component
public class LrpcServerImpl implements LrpcServer, ApplicationListener<ApplicationReadyEvent> {
    @Autowired
    LrpcChannelInit lrpcChannelInit;

    @Override
    public void connect() {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(lrpcChannelInit)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture future = bootstrap.bind(new InetSocketAddress(lrpcChannelInit.getPort())).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        connect();
    }
}

5. 客戶端handler類 LrpClientHandler,里面持一把對象鎖,因為netty返回數據總是異步的,這里將異步轉成同步,利用 Object的wait()和notify()方法實現,LrpClientHandler這里是多例的,不存在競爭狀態,因此是線程安全的

  

public class LrpClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
    private final Object lock = new Object();
    private volatile RpcResponse rpcResponse = null;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
        rpcResponse = msg;
        synchronized (lock) {
            lock.notifyAll();
        }
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    public Object getLock() {
        return lock;
    }

    public RpcResponse getRpcResponse() {
        return rpcResponse;
    }

    public void setRpcResponse(RpcResponse rpcResponse) {
        this.rpcResponse = rpcResponse;
    }

}

6. LrpcClientChannelInit 中持有一個LrpcClientHandler的引用,在初始化該類時同時初始化LrpcClientHandler

public class LrpcClientChannelInit extends ChannelInitializer {
    private LrpClientHandler lrpClientHandler;

    public LrpcClientChannelInit() {
        lrpClientHandler = new LrpClientHandler();
    }

    @Override
    protected void initChannel(Channel ch) {
        //請求加密
        ch.pipeline().addLast(new RpcEncode(RpcRequest.class))
                .addLast(new RpcDecode(RpcResponse.class))
                .addLast(new LoggingHandler(LogLevel.INFO))
                .addLast(lrpClientHandler);
    }
    public synchronized void initHandler(LrpClientHandler lrpClientHandler){
        this.lrpClientHandler = lrpClientHandler;
    }
    public LrpClientHandler getLrpClientHandler() {
        return lrpClientHandler;
    }

    public void setLrpClientHandler(LrpClientHandler lrpClientHandler) {
        this.lrpClientHandler = lrpClientHandler;
    }
}

7. 持有執行遠程方法的host和port,execute(RpcRequest r)中連接,傳遞參數

public class LrpcExecutorImpl implements LrpcExecutor {
    private String host;
    private Integer port;

    public LrpcExecutorImpl(String host, Integer port) {
        this.host = host;
        this.port = port;
    }


    @Override
    public RpcResponse execute(RpcRequest rpcRequest) {
        LrpcClientChannelInit lrpcClientChannelInit = new LrpcClientChannelInit();
        Bootstrap b = new Bootstrap();
        EventLoopGroup group = null;
        ChannelFuture future = null;
        try {
            group = new NioEventLoopGroup();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(lrpcClientChannelInit)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                    .option(ChannelOption.SO_KEEPALIVE, true);
            future = b.connect(new InetSocketAddress(host, port)).sync();
            //TODO 連接好了直接發送消息,同步則阻塞等待通知
            future.channel().writeAndFlush(rpcRequest).sync();
            Object lock = lrpcClientChannelInit.getLrpClientHandler().getLock();
            synchronized (lock) {
                lock.wait();
            }
            RpcResponse rpcResponse = lrpcClientChannelInit.getLrpClientHandler().getRpcResponse();
            if (null != rpcResponse) {
                future.channel().closeFuture().sync();
            }
            return rpcResponse;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lrpcClientChannelInit.getLrpClientHandler().setRpcResponse(null);
            if (null != group) {
                group.shutdownGracefully();
            }
        }
        return null;
    }
//get and set ...
}

8.使用實例

     Server端發布服務

@RpcService("HelloService")
public class HelloServiceImpl implements HelloService {
    @Override
    public String say(String msg) {
        return "Hello Word!" + msg;
    }
}

    在application.properties中,注入環境變量:

#

lrpc.server=8888

    Client 配置服務地址和LrpcExecutor

   

lrpc.hello.host=xxxxxxxxxxxxxxxxxxx
lrpc.hello.port=8888
lrpc.hello.desc=hello rpc調用

  配置調用服務執行器 LrpcExecutor,保存在spring 容器bean里,可通過依賴注入進行調用

@Configuration
@Component
public class RpcConfiguration {

    @Bean("rpc.hello")
    @ConfigurationProperties(prefix = "lrpc.hello")
    public RpcServerProperties rpcClientCallProperties() {
        return new RpcServerProperties();
    }

    @Bean("helloRpcExecutor")
    LrpcExecutor lrpcExecutor(@Qualifier(value = "rpc.hello") RpcServerProperties rpcServerProperties) {
        return invoke(rpcServerProperties);
    }

    private LrpcExecutor invoke(RpcServerProperties config) {
        return new LrpcExecutorImpl(config.getHost(), config.getPort());
    }
}

  調用服務,methodName為ClassName對應類下的方法名: 

@Autowired
    @Qualifier(value = "helloRpcExecutor")
    private LrpcExecutor helloRpcExecutor;

    @GetMapping("/say")
    public String invoke(String msg) {
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setClassName("HelloService");
        rpcRequest.setMethodName("say");
        rpcRequest.setId(111L);
        HashMap<Class<?>, Object> arguments = new HashMap<>(8);
        arguments.put(String.class, "good");
        rpcRequest.setArguments(arguments);
        RpcResponse execute = helloRpcExecutor.execute(rpcRequest);
        System.out.println(execute.toString());
        return execute.toString();
    }

最后,以上為個人練手demo,未來有時間會把未完善的地方慢慢完善,最終目標是做成像dobble那樣的pj,如有好的意見或疑惑歡迎各位大佬指點(morty630@foxmail.com),附上 github完整代碼:https://github.com/To-echo/lrpc-all    (你的點贊是我的動力)

相關技術文檔  

objenesis反射工具:http://objenesis.org/details.html

Protobuf 協議:https://developers.google.com/protocol-buffers/

Protobuffer序列化工具:https://github.com/protostuff/protostuff

RPC介紹:https://en.wikipedia.org/wiki/Remote_procedure_call

Netty官網:https://netty.io/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM