看了這篇你就會手寫RPC框架了


一、學習本文你能學到什么?

  • RPC的概念及運作流程
  • RPC協議及RPC框架的概念
  • Netty的基本使用
  • Java序列化及反序列化技術
  • Zookeeper的基本使用(注冊中心)
  • 自定義注解實現特殊業務邏輯
  • Java的動態代理
  • 自定義Spring Boot Starter

這里只是列出了你能從RPC框架源碼中能學到的東西,本文並不會每個知識點都點到,主要講述如何手寫一個RPC框架,更多細節需要讀者閱讀源碼,文章的下方會提供源碼鏈接哦。

二、RPC基礎知識

2.1 RPC是什么?

Remote Procedure Call(RPC):遠程過程調用。

過程是什么?
過程就是業務處理、計算任務,更直白理解,就是程序。(像調用本地方法一樣調用遠程的過程。)

RPC采用Client-Server結構,通過Request-Response消息模式實現。

2.2 RPC的流程

image-20200712085059464

  • 客戶端處理過程中調用Client stub(就像調用本地方法一樣),傳遞參數;
  • Client stub將參數編組為消息,然后通過系統調用向服務端發送消息;
  • 客戶端本地操作系統將消息從客戶端機器發送到服務端機器;
  • 服務端操作系統將接收到的數據包傳遞給Server stub;
  • Server stub解組消息為參數;
  • Server stub再調用服務端的過程,過程執行結果以反方向的相同步驟響應給客戶端。

2.3 RPC流程中需要處理的問題

  • Client stub、Server stub的開發;
  • 參數如何編組為消息,以及解組消息;
  • 消息如何發送;
  • 過程結果如何表示、異常情況如何處理;
  • 如何實現安全的訪問控制。

2.4 RPC協議是什么?

RPC調用過程中需要將參數編組為消息進行發送,接受方需要解組消息為參數,過程處理結果同樣需要經編組、解組。消息由哪些部分構成及消息的表示形式就構成了消息協議。

RPC調用過程中采用的消息協議稱為RPC協議

RPC協議規定請求、響應消息的格式

在TCP(網絡傳輸控制協議)上可選用或自定義消息協議來完成RPC消息交互

我們可以選用通用的標准協議(如:http、https),也也可根據自身的需要定義自己的消息協議。

2.5 RPC框架是什么?

封裝好參數編組、消息解組、底層網絡通信的RPC程序開發框架,帶來的便捷是可以直接在其基礎上只需要專注於過程代碼編寫。

Java領域:

  • 傳統的webservice框架:Apache CXF、Apache Axis2、Java自帶的JAX-WS等。webservice框架大多基於標准的SOAP協議。
  • 新興的微服務框架:Dubbo、spring cloud、Apache Thrift等。

三、手寫RPC

3.1 目標

我們將會寫一個簡易的RPC框架,暫且叫它leisure-rpc-spring-boot-starter,通過在項目中引入該starter,並簡單的配置一下,項目即擁有提供遠程服務的能力。

編寫自定義注解@Service,被它注解的類將會提供遠程服務。

編寫自定義注解@InjectService,使用它可注入遠程服務。

3.2 項目整體結構

image-20200722221416777

3.3 客戶端編寫

3.3.1 客戶端需要做什么?

客戶端想要調用遠程服務,必須具備服務發現的能力;在知道有哪些服務過后,還必須有服務代理來執行服務調用;客戶端想要與服務端通信,必須要有相同的消息協議;客戶端想要調用遠程服務,那么必須具備網絡請求的能力,即網絡層功能。

當然,這是客戶端所需的最基本的能力,其實還可以擴展的能力,例如負載均衡。

3.3.2 具體實現

我們先看看客戶端的代碼結構:

image-20200722230006033

基於面向接口編程的理念,不同角色都實現了定義了相應規范的接口。這里面我們沒有發現消息協議相關內容,那是因為服務端也需要消息協議,因此抽離了出來,放在公共層。

3.3.2.1 服務發現者
/**
 * 服務發現抽象類,定義服務發現規范
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
public interface ServiceDiscoverer {
    List<Service> getServices(String name);
}

/**
 * Zookeeper服務發現者,定義以Zookeeper為注冊中心的服務發現細則
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
public class ZookeeperServiceDiscoverer implements ServiceDiscoverer {

    private ZkClient zkClient;

    public ZookeeperServiceDiscoverer(String zkAddress) {
        zkClient = new ZkClient(zkAddress);
        zkClient.setZkSerializer(new ZookeeperSerializer());
    }

    /**
     * 使用Zookeeper客戶端,通過服務名獲取服務列表
     * 服務名格式:接口全路徑
     *
     * @param name 服務名
     * @return 服務列表
     */
    @Override
    public List<Service> getServices(String name) {
        String servicePath = LeisureConstant.ZK_SERVICE_PATH + LeisureConstant.PATH_DELIMITER + name + "/service";
        List<String> children = zkClient.getChildren(servicePath);
        return Optional.ofNullable(children).orElse(new ArrayList<>()).stream().map(str -> {
            String deCh = null;
            try {
                deCh = URLDecoder.decode(str, LeisureConstant.UTF_8);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            return JSON.parseObject(deCh, Service.class);
        }).collect(Collectors.toList());
    }
}

服務發現者使用Zookeeper來實現,通過ZkClient我們很容易發現已經注冊在ZK上的服務。當然我們也可以使用其他組件作為注冊中心,例如Redis。

3.3.2.2 網絡客戶端
/**
 * 網絡請求客戶端,定義網絡請求規范
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
public interface NetClient {
    byte[] sendRequest(byte[] data, Service service) throws InterruptedException;
}

/**
 * Netty網絡請求客戶端,定義通過Netty實現網絡請求的細則。
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
public class NettyNetClient implements NetClient {
    private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class);

    /**
     * 發送請求
     *
     * @param data    請求數據
     * @param service 服務信息
     * @return 響應數據
     * @throws InterruptedException 異常
     */
    @Override
    public byte[] sendRequest(byte[] data, Service service) throws InterruptedException {
        String[] addInfoArray = service.getAddress().split(":");
        String serverAddress = addInfoArray[0];
        String serverPort = addInfoArray[1];

        SendHandler sendHandler = new SendHandler(data);
        byte[] respData;
        // 配置客戶端
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(sendHandler);
                        }
                    });

            // 啟動客戶端連接
            b.connect(serverAddress, Integer.parseInt(serverPort)).sync();
            respData = (byte[]) sendHandler.rspData();
            logger.info("SendRequest get reply: {}", respData);
        } finally {
            // 釋放線程組資源
            group.shutdownGracefully();
        }

        return respData;
    }
}

/**
 * 發送處理類,定義Netty入站處理細則
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
public class SendHandler extends ChannelInboundHandlerAdapter {
    private static Logger logger = LoggerFactory.getLogger(SendHandler.class);

    private CountDownLatch cdl;
    private Object readMsg = null;
    private byte[] data;

    public SendHandler(byte[] data) {
        cdl = new CountDownLatch(1);
        this.data = data;
    }

    /**
     * 當連接服務端成功后,發送請求數據
     *
     * @param ctx 通道上下文
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        logger.info("Successful connection to server:{}", ctx);
        ByteBuf reqBuf = Unpooled.buffer(data.length);
        reqBuf.writeBytes(data);
        logger.info("Client sends message:{}", reqBuf);
        ctx.writeAndFlush(reqBuf);
    }

    /**
     * 讀取數據,數據讀取完畢釋放CD鎖
     *
     * @param ctx 上下文
     * @param msg ByteBuf
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        logger.info("Client reads message: {}", msg);
        ByteBuf msgBuf = (ByteBuf) msg;
        byte[] resp = new byte[msgBuf.readableBytes()];
        msgBuf.readBytes(resp);
        readMsg = resp;
        cdl.countDown();
    }

    /**
     * 等待讀取數據完成
     *
     * @return 響應數據
     * @throws InterruptedException 異常
     */
    public Object rspData() throws InterruptedException {
        cdl.await();
        return readMsg;
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        logger.error("Exception occurred:{}", cause.getMessage());
        ctx.close();
    }
}

在這里我們使用Netty來實現網絡請求客戶端,當然也可以使用Mina。網絡請求客戶端能連接遠程服務端,並將編組好的請求數據發送給服務端,待服務端處理好后,又將服務端的響應數據返回給客戶端。

3.3.2.3 服務代理
/**
 * 客戶端代理工廠:用於創建遠程服務代理類
 * 封裝編組請求、請求發送、編組響應等操作。
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
public class ClientProxyFactory {
    private ServiceDiscoverer serviceDiscoverer;

    private Map<String, MessageProtocol> supportMessageProtocols;

    private NetClient netClient;

    private Map<Class<?>, Object> objectCache = new HashMap<>();

    /**
     * 通過Java動態代理獲取服務代理類
     *
     * @param clazz 被代理類Class
     * @param <T>   泛型
     * @return 服務代理類
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clazz) {
        return (T) this.objectCache.computeIfAbsent(clazz,
                cls -> newProxyInstance(cls.getClassLoader(), new Class<?>[]{cls}, new ClientInvocationHandler(cls)));
    }

    // getter setter ...

    /**
     * 客戶端服務代理類invoke函數細節實現
     */
    private class ClientInvocationHandler implements InvocationHandler {
        private Class<?> clazz;

        private Random random = new Random();

        public ClientInvocationHandler(Class<?> clazz) {
            super();
            this.clazz = clazz;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Exception {

            if (method.getName().equals("toString")) {
                return proxy.getClass().toString();
            }

            if (method.getName().equals("hashCode")) {
                return 0;
            }

            // 1、獲得服務信息
            String serviceName = this.clazz.getName();
            List<Service> services = serviceDiscoverer.getServices(serviceName);

            if (services == null || services.isEmpty()) {
                throw new LeisureException("No provider available!");
            }

            // 隨機選擇一個服務提供者(軟負載均衡)
            Service service = services.get(random.nextInt(services.size()));

            // 2、構造request對象
            LeisureRequest req = new LeisureRequest();
            req.setServiceName(service.getName());
            req.setMethod(method.getName());
            req.setParameterTypes(method.getParameterTypes());
            req.setParameters(args);

            // 3、協議層編組
            // 獲得該方法對應的協議
            MessageProtocol protocol = supportMessageProtocols.get(service.getProtocol());
            // 編組請求
            byte[] data = protocol.marshallingRequest(req);

            // 4、調用網絡層發送請求
            byte[] repData = netClient.sendRequest(data, service);

            // 5解組響應消息
            LeisureResponse rsp = protocol.unmarshallingResponse(repData);

            // 6、結果處理
            if (rsp.getException() != null) {
                throw rsp.getException();
            }
            return rsp.getReturnValue();
        }
    }
}

服務代理類由客戶端代理工廠類產生,代理方式是基於Java的動態代理。在處理類ClientInvocationHandler的invoke函數中,定義了一系列的操作,包括獲取服務、選擇服務提供者、構造請求對象、編組請求對象、網絡請求客戶端發送請求、解組響應消息、異常處理等。

3.3.2.4 消息協議
/**
 * 消息協議,定義編組請求、解組請求、編組響應、解組響應規范
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
public interface MessageProtocol {

    /**
     * 編組請求
     *
     * @param req 請求信息
     * @return 請求字節數組
     * @throws Exception 編組請求異常
     */
    byte[] marshallingRequest(LeisureRequest req) throws Exception;

    /**
     * 解組請求
     *
     * @param data 請求字節數組
     * @return 請求信息
     * @throws Exception 解組請求異常
     */
    LeisureRequest unmarshallingRequest(byte[] data) throws Exception;

    /**
     * 編組響應
     *
     * @param rsp 響應信息
     * @return 響應字節數組
     * @throws Exception 編組響應異常
     */
    byte[] marshallingResponse(LeisureResponse rsp) throws Exception;

    /**
     * 解組響應
     *
     * @param data 響應字節數組
     * @return 響應信息
     * @throws Exception 解組響應異常
     */
    LeisureResponse unmarshallingResponse(byte[] data) throws Exception;
}

/**
 * Java序列化消息協議
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
public class JavaSerializeMessageProtocol implements MessageProtocol {

    private byte[] serialize(Object obj) throws Exception {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bout);
        out.writeObject(obj);
        return bout.toByteArray();
    }

    @Override
    public byte[] marshallingRequest(LeisureRequest req) throws Exception {
        return this.serialize(req);
    }

    @Override
    public LeisureRequest unmarshallingRequest(byte[] data) throws Exception {
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
        return (LeisureRequest) in.readObject();
    }

    @Override
    public byte[] marshallingResponse(LeisureResponse rsp) throws Exception {
        return this.serialize(rsp);
    }

    @Override
    public LeisureResponse unmarshallingResponse(byte[] data) throws Exception {
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
        return (LeisureResponse) in.readObject();
    }
}

消息協議主要是定義了客戶端如何編組請求解組響應,服務端如何解組請求編組響應這四個操作規范。本文提供了Java序列化與反序列化的實現,感興趣的讀者可以基於其他序列化技術實現其他消息協議(偷偷說一句:Java的序列化性能很不理想)。

3.4 服務端編寫

3.4.1 服務端需要做什么?

首先,服務端要提供遠程服務,必須具備服務注冊及暴露的能力;在這之后,還需要開啟網絡服務,供客戶端連接。有些項目可能既是服務提供者,又是服務消費者,那什么時候開啟服務,什么時候注入服務呢?這里我們引入一個RPC處理者的概念,由它來幫我們開啟服務,以及注入服務。

3.4.3 具體實現

先看看服務端的代碼結構:

image-20200722234608540

服務端做的事情也很簡單,注冊服務並暴露服務,然后開啟網絡服務;如果服務端也是消費者,則注入遠程服務。

服務注冊和服務注入依賴兩個自定義注解來實現:

  • @Service:注冊服務
  • @InjectService:注入服務

下面是他們的實現代碼:

/**
 * 被該注解標記的服務可提供遠程RPC訪問的能力
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface Service {
    String value() default "";
}

/**
 * 該注解用於注入遠程服務
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface InjectService {

}
3.4.3.1 服務注冊(暴露)
/**
 * 服務注冊器,定義服務注冊規范
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
public interface ServiceRegister {
    void register(ServiceObject so) throws Exception;
    ServiceObject getServiceObject(String name) throws Exception;
}

/**
 * 默認服務注冊器
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
public class DefaultServiceRegister implements ServiceRegister {

    private Map<String, ServiceObject> serviceMap = new HashMap<>();
    protected String protocol;
    protected Integer port;

    @Override
    public void register(ServiceObject so) throws Exception {
        if (so == null) {
            throw new IllegalArgumentException("Parameter cannot be empty.");
        }
        this.serviceMap.put(so.getName(), so);
    }

    @Override
    public ServiceObject getServiceObject(String name) {
        return this.serviceMap.get(name);
    }
}

/**
 * Zookeeper服務注冊器,提供服務注冊、服務暴露的能力
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
public class ZookeeperExportServiceRegister extends DefaultServiceRegister implements ServiceRegister {
    /**
     * Zk客戶端
     */
    private ZkClient client;
    public ZookeeperExportServiceRegister(String zkAddress, Integer port, String protocol) {
        client = new ZkClient(zkAddress);
        client.setZkSerializer(new ZookeeperSerializer());
        this.port = port;
        this.protocol = protocol;
    }

    /**
     * 服務注冊
     *
     * @param so 服務持有者
     * @throws Exception 注冊異常
     */
    @Override
    public void register(ServiceObject so) throws Exception {
        super.register(so);
        Service service = new Service();
        String host = InetAddress.getLocalHost().getHostAddress();
        String address = host + ":" + port;
        service.setAddress(address);
        service.setName(so.getClazz().getName());
        service.setProtocol(protocol);
        this.exportService(service);

    }

    /**
     * 服務暴露
     *
     * @param serviceResource 需要暴露的服務信息
     */
    private void exportService(Service serviceResource) {
        String serviceName = serviceResource.getName();
        String uri = JSON.toJSONString(serviceResource);
        try {
            uri = URLEncoder.encode(uri, UTF_8);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        String servicePath = ZK_SERVICE_PATH + PATH_DELIMITER + serviceName + "/service";
        if (!client.exists(servicePath)) {
            client.createPersistent(servicePath, true);
        }
        String uriPath = servicePath + PATH_DELIMITER + uri;
        if (client.exists(uriPath)) {
            client.delete(uriPath);
        }
        client.createEphemeral(uriPath);
    }
}

這個過程其實沒啥好說的,就是將指定ServiceObject對象序列化后保存到ZK上,供客戶端發現。同時會將服務對象緩存起來,在客戶端調用服務時,通過緩存的ServiceObject對象反射指定服務,調用方法。

3.4.3.2 網絡服務
/**
 * RPC服務端抽象類
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
public abstract class RpcServer {

    /**
     * 服務端口
     */
    protected int port;

    /**
     * 服務協議
     */
    protected String protocol;

    /**
     * 請求處理者
     */
    protected RequestHandler handler;

    public RpcServer(int port, String protocol, RequestHandler handler) {
        super();
        this.port = port;
        this.protocol = protocol;
        this.handler = handler;
    }

    /**
     * 開啟服務
     */
    public abstract void start();

    /**
     * 停止服務
     */
    public abstract void stop();
	// getter setter ...
}

/**
 * Netty RPC服務端,提供Netty網絡服務開啟、關閉的能力
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
public class NettyRpcServer extends RpcServer {
    private static Logger logger = LoggerFactory.getLogger(NettyRpcServer.class);

    private Channel channel;

    public NettyRpcServer(int port, String protocol, RequestHandler handler) {
        super(port, protocol, handler);
    }

    @Override
    public void start() {
        // 配置服務器
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new ChannelRequestHandler());
                }
            });

            // 啟動服務
            ChannelFuture f = b.bind(port).sync();
            logger.info("Server started successfully.");
            channel = f.channel();
            // 等待服務通道關閉
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 釋放線程組資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    @Override
    public void stop() {
        this.channel.close();
    }

    private class ChannelRequestHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            logger.info("Channel active:{}", ctx);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            logger.info("The server receives a message: {}", msg);
            ByteBuf msgBuf = (ByteBuf) msg;
            byte[] req = new byte[msgBuf.readableBytes()];
            msgBuf.readBytes(req);
            byte[] res = handler.handleRequest(req);
            logger.info("Send response:{}", msg);
            ByteBuf respBuf = Unpooled.buffer(res.length);
            respBuf.writeBytes(res);
            ctx.write(respBuf);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // Close the connection when an exception is raised.
            cause.printStackTrace();
            logger.error("Exception occurred:{}", cause.getMessage());
            ctx.close();
        }
    }
}

/**
 * 請求處理者,提供解組請求、編組響應等操作
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
public class RequestHandler {
    private MessageProtocol protocol;

    private ServiceRegister serviceRegister;

    public RequestHandler(MessageProtocol protocol, ServiceRegister serviceRegister) {
        super();
        this.protocol = protocol;
        this.serviceRegister = serviceRegister;
    }

    public byte[] handleRequest(byte[] data) throws Exception {
        // 1、解組消息
        LeisureRequest req = this.protocol.unmarshallingRequest(data);

        // 2、查找服務對象
        ServiceObject so = this.serviceRegister.getServiceObject(req.getServiceName());

        LeisureResponse rsp = null;

        if (so == null) {
            rsp = new LeisureResponse(LeisureStatus.NOT_FOUND);
        } else {
            // 3、反射調用對應的過程方法
            try {
                Method m = so.getClazz().getMethod(req.getMethod(), req.getParameterTypes());
                Object returnValue = m.invoke(so.getObj(), req.getParameters());
                rsp = new LeisureResponse(LeisureStatus.SUCCESS);
                rsp.setReturnValue(returnValue);
            } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException
                    | InvocationTargetException e) {
                rsp = new LeisureResponse(LeisureStatus.ERROR);
                rsp.setException(e);
            }
        }

        // 4、編組響應消息
        return this.protocol.marshallingResponse(rsp);
    }
	// getter setter ...
}

網絡服務定義了啟動服務的細則,以及如何處理客戶端發來的請求。

3.4.3.3 RPC處理者
/**
 * RPC處理者,支持服務啟動暴露、自動注入Service
 *
 * @author 東方雨傾
 * @since 1.0.0
 */
public class DefaultRpcProcessor implements ApplicationListener<ContextRefreshedEvent> {

    @Resource
    private ClientProxyFactory clientProxyFactory;

    @Resource
    private ServiceRegister serviceRegister;

    @Resource
    private RpcServer rpcServer;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (Objects.isNull(event.getApplicationContext().getParent())) {
            ApplicationContext context = event.getApplicationContext();
            // 開啟服務
            startServer(context);

            // 注入Service
            injectService(context);
        }
    }

    private void startServer(ApplicationContext context) {
        Map<String, Object> beans = context.getBeansWithAnnotation(Service.class);
        if (beans.size() != 0) {
            boolean startServerFlag = true;
            for (Object obj : beans.values()) {
                try {
                    Class<?> clazz = obj.getClass();
                    Class<?>[] interfaces = clazz.getInterfaces();
                    ServiceObject so;
                    if (interfaces.length != 1) {
                        Service service = clazz.getAnnotation(Service.class);
                        String value = service.value();
                        if (value.equals("")) {
                            startServerFlag = false;
                            throw new UnsupportedOperationException("The exposed interface is not specific with '" + obj.getClass().getName() + "'");
                        }
                        so = new ServiceObject(value, Class.forName(value), obj);
                    } else {
                        Class<?> superClass = interfaces[0];
                        so = new ServiceObject(superClass.getName(), superClass, obj);
                    }
                    serviceRegister.register(so);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (startServerFlag) {
                rpcServer.start();
            }
        }
    }

    private void injectService(ApplicationContext context) {
        String[] names = context.getBeanDefinitionNames();
        for (String name : names) {
            Class<?> clazz = context.getType(name);
            if (Objects.isNull(clazz)) continue;
            Field[] fields = clazz.getDeclaredFields();
            for (Field field : fields) {
                InjectService injectLeisure = field.getAnnotation(InjectService.class);
                if (Objects.isNull(injectLeisure)) continue;
                Class<?> fieldClass = field.getType();
                Object object = context.getBean(name);
                field.setAccessible(true);
                try {
                    field.set(object, clientProxyFactory.getProxy(fieldClass));
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

DefaultRpcProcessor實現了ApplicationListener,並監聽了ContextRefreshedEvent事件,其效果就是在Spring啟動完畢過后會收到一個事件通知,基於這個機制,就可以在這里開啟服務,以及注入服務。因為一切已經准備就緒了,所需要的資源都是OK的。

四、使用RPC框架

框架一個很重要的特性就是要使用簡單,使用該框架只需要一個條件和四個步驟即可。

4.1 一個條件

需要准備一個Zookeeper作為注冊中心,單節點即可。

4.2 步驟一

引入Maven依賴:

<dependency>
    <groupId>wang.leisure</groupId>
    <artifactId>leisure-rpc-spring-boot-starter</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>

不知道如何獲得依賴的讀者,請在源碼下載后,進入項目目錄下(pom.xml文件所在位置),執行 mvn install命令,即可在本地倉庫生成maven依賴。

4.3 步驟二

在你的項目配置文件(application.properties)中配置注冊中心地址,例如:

leisure.rpc.register-address=192.168.199.241:2181

4.4 步驟三

將你的遠程服務使用@Service注解,例如:

import wang.leisure.rpc.annotation.Service;

@Service
public class UserServiceImpl implements UserService {
    @Override
    public ApiResult<User> getUser(Long id) {
        User user = getFromDbOrCache(id);
        return ApiResult.success(user);
    }

    private User getFromDbOrCache(Long id) {
        return new User(id, "東方雨傾", 1, "https://leisure.wang");
    }
}

4.5 步驟四

使用注解@InjectService注入遠程服務,例如:

@RestController
@RequestMapping("/index/")
public class IndexController {

    @InjectService
    private UserService userService;

    /**
     * 獲取用戶信息
     * http://localhost:8080/index/getUser?id=1
     *
     * @param id 用戶id
     * @return 用戶信息
     */
    @GetMapping("getUser")
    public ApiResult<User> getUser(Long id) {
        return userService.getUser(id);
    }
}

五、源碼下載

框架源碼:leisure-rpc-spring-boot-starter

示例源碼:leisure-rpc-example

為方便讀者看到效果,筆者也簡單的編寫了一個示例項目,可以下載下來試試。如果源碼對你有一丁點的幫助,希望點個小星星支持一下哦。

六、總結

希望讀者能夠真正動手去試一試,只有實踐了才能知道里面的運作邏輯。筆者也是花了兩個星期才把代碼跟文章整理好,並不是因為這個東西難,而是因為沒時間,苦逼的程序img早上七點起床,晚上10點左右回家,確實沒啥時間搞這些,哈哈哈。如果文章對你有幫助,希望多多支持。

原文地址:https://leisure.wang/procedural-framework/framework/704.html

end
Java開發樂園


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM