曹工說mini-dubbo(1)--為了實踐動態代理,我寫了個簡單的rpc框架


相關背景及資源:

之前本來一直在寫spring源碼解析這塊,如下,aop部分剛好寫完。以前零散看過一些文章,知道rpc調用基本就是使用動態代理,比如rmi,dubbo,feign調用等。自己也就想着試一下,於是有了mini-dubbo這個東西,暫時也不能稱為一個框架,因為還不是生產級的,目前只是實現了一部分小功能,也沒有監控,也沒有xxx,反正就是缺的比較多。

曹工說Spring Boot源碼(22)-- 你說我Spring Aop依賴AspectJ,我依賴它什么了

我就說下,里面用到的知識點吧,有興趣的,可以克隆源碼下來看看:

  1. 動態代理
  2. 服務注冊和消費,使用redis作為注冊中心,其中使用了redisson作為redis客戶端,其中涉及到BeanFactoryPostProcessor的使用
  3. 因為傳輸層使用netty和mina,是異步的,但是上層又需要等待結果,所以用到了同步轉異步
  4. spring的xml解析,bean definition注冊,spring 擴展xml 命名空間
  5. 自定義的spi的相關知識
  6. 分層思想,從dubbo借鑒了其分層,但是mini-dubbo要少幾層,因為我暫時不是很清楚dubbo的每一層的具體職責,所以我按我自己理解分的層。上層依賴下層,只通過下層的接口,查找下層接口時,直接在spring容器中查找bean即可,類似於spring mvc的設計。當下層有多個實現時,通過類似spi機制來指定具體要使用的下層實現。
  7. 基於第5點,所以本框架非常容易替換各層的實現,只要自己自定義一個spring bean,實現對應的接口,然后在spi文件中指定本實現的類名即可。
  8. netty和mina的tcp粘包拆包工作。

概要

代碼我放在了如下位置:

https://gitee.com/ckl111/mini-dubbo

我介紹下代碼的整體結構:

服務端聚合工程比較簡單,目前也沒時間去仔細弄,包含了如下module:

    <modules>
        <!--業務層api-->
        <module>../mini-dubbo-api</module>
        <!--業務層,服務端demo-->
        <module>../mini-dubbo-server</module>

        <!--配置層,解析xml的工作,在本層完成-->
        <module>../mini-dubbo-core</module>

        <module>../mini-dubbo-common</module>
    </modules>

目前的大部分實現,是在客戶端,包含了如下module:

<modules>
       <!--業務層api-->
   	   <module>../mini-dubbo-api</module>
       <!--業務層,測試demo-->
       <module>../mini-dubbo-client</module>

       <!--配置層,解析xml的工作,在本層完成-->
       <module>../mini-dubbo-core</module>

       <module>../mini-dubbo-common</module>

       <!--注冊中心層-->
       <module>../mini-dubbo-registry-layer</module>
       <!--集群層,完成事情:負載均衡策略,集群容錯策略等-->
       <module>../mini-dubbo-cluster-layer</module>
       <!--信息交換層,主要完成同步轉異步的操作,因為下層的mina和netty為異步,本層同步等待結果-->
       <module>../mini-dubbo-exchange-layer</module>

       <!--傳輸層如使用netty實現,則需包含如下module-->
       <module>../mini-dubbo-transport-layer-netty</module>
       <!--傳輸層如使用mina實現,則需包含如下module-->
       <module>../mini-dubbo-transport-layer-mina</module>
</modules>

其中,模塊間的依賴關系如下:

業務模塊,一般只需要依賴mini-dubbo-core模塊,mini-dubbo-core主要依賴了如下模塊:

為什么這么划分,因為mini-dubbo-core模塊,其實主要是完成解析業務模塊(比如client)中的xml,根據其xml配置,注冊對應的bean到spring 容器中,而具體的bean實現,就是放在各個模塊的,比如,xml里配置netty作為傳輸層實現,那么mini-dubbo-core就得解析為mini-dubbo-transport-layer-netty中的一個實現類作為bean,注冊到spring容器,供上層使用。

目前的分層,只是暫時的,后續可能會略有調整。

一次客戶端調用的大體思路

  1. 業務module中,配置xml,示例如下:

    <dubbo:registry address="redis://127.0.0.1:6379"/>
    
    <dubbo:reference id="gpsLocationUpdateService" interface="dubbo.learn.IGpsLocationUpdateService"/>
    
    <context:component-scan base-package="dubbo"></context:component-scan>
    

    其中的dubbo:reference就代表了一個遠端的服務,業務代碼中可以自動注入該接口,當調用該接口時,實際就會發起rpc調用。

    熟悉的同學已經知道了,這塊肯定是生成了一個動態代理。

  2. 繼續之前,我們看看dubbo的十層架構:

    可以看到,我們這邊是比dubbo少了幾層,首先proxy,目前直接用了jdk動態代理,沒有其他技術,所以就沒有抽出一層;然后monitor層,現在肯定是沒有的,這部分其實才是一個框架的重頭戲,但是我也不會前端,所以這塊估計暫時沒有;接下來是protocol層,我暫時不太清楚dubbo的設計,所以就沒弄這層。

  3. 知道了分層結構后,我們可以回到第一點,即動態代理那里,我們的動態代理,只依賴下層的接口。目前,各層之間的接口,放在mini-dubbo-common模塊中,定義如下:

    • 注冊中心層,負責接收上層傳來的調用參數等上下文,並返回結果

      /**
       * 注冊中心層的rpc調用者
       * 1:接收上層傳下來的業務參數,並返回結果
       *
       * 本層:會根據不同實現,去相應的注冊中心,獲取匹配的服務提供者列表,傳輸給下一層
       */
      public interface RegistryLayerRpcInvoker {
      
          Object invoke(RpcContext rpcContext);
      }
      
    • 集群層,接收上層注冊中心層傳來的服務提供者列表和rpc調用上下文,並返回最終結果

      public interface ClusterLayerRpcInvoker {
      
          /**
           * 由注冊中心層提供對應service的服務提供者列表,本方法可以根據負載均衡策略,進行篩選
           * @param providerList
           * @param rpcContext
           * @return
           */
          Object invoke(List<ProviderHostAndPort> providerList, RpcContext rpcContext);
      }
      
    • exchange層,上層集群層,會替我們選好某一台具體的服務提供者,然后讓我們去調用,本層完成同步轉異步

      public interface ExchangeLayerRpcInvoker {
      
          /**
           *
           * @param providerHostAndPort 要調用的服務提供者的地址
           * @param rpcContext   rpc上下文,包含了要調用的參數等
           * @return  rpc調用的結果
           */
          Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext);
      }
      
    • 傳輸層,本層目前有兩個簡單實現,netty和mina。

      /**
       *
       * 本層為傳輸層,上層為exchange層。
       * 上層exchange,目前有一個默認實現,主要是完成同步轉異步的操作。
       * 上層將具體的傳輸工作交給底層的傳輸層,比如netty和mina,然后在一個future上等待傳輸層完成工作
       *
       * 本層會完成實際的發送工作和接收返回響應的工作
       */
      public interface TransportLayerRpcInvoker {
      
          /**
           *
           * @param providerHostAndPort 要調用的服務提供者的地址
           * @param rpcContext   rpc上下文,包含了要調用的參數等
           * @return  rpc調用的結果
           */
          Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext);
      }
      

      其中,我們的最上邊的動態代理層,只依賴於下層,其中,示例代碼如下:

      
          @Override
          public Object invoke(Object proxy, Method method, Object[] args) {
              // 1.從spring容器中,獲取下層的實現bean;如果有多個,則根據spi文件中指定的為准
              RegistryLayerRpcInvoker registryLayerRpcInvoker =
                      SpiServiceLoader.loadService(RegistryLayerRpcInvoker.class);
      
              RpcContext rpcContext = new RpcContext();
              rpcContext.setProxy(proxy);
              rpcContext.setMethod(method);
              rpcContext.setArgs(args);
              rpcContext.setServiceName(method.getDeclaringClass().getName());
      	   // 2.調用下層
              Object o = registryLayerRpcInvoker.invoke(rpcContext);
              return o;
          }
      

      這里1處,可以看到,我們通過SpiServiceLoader.loadService(RegistryLayerRpcInvoker.class)去獲取具體的下層實現,這是我們自定義的一個工具類,其內部實現一會再說。

      2處調用下層實現,獲取結果。

  4. registry,注冊中心層的實現

    @Service
    public class RedisRegistryRpcInvoker implements RegistryLayerRpcInvoker {
    
        @Autowired
        private RedisRegistry redisRegistry;
    
    
        @Override
        public Object invoke(RpcContext rpcContext) {
            //1.獲取集群層實現
            ClusterLayerRpcInvoker clusterLayerRpcInvoker = SpiServiceLoader.loadService(ClusterLayerRpcInvoker.class);
            //2.從redis中,根據服務名,獲取服務提供者列表
            List<ProviderHostAndPort> list = redisRegistry.getServiceProviderList(rpcContext.getServiceName());
            if (CollectionUtils.isEmpty(list)) {
                throw new RuntimeException();
            }
    	    //2.調用集群層實現,獲取結果
            Object o = clusterLayerRpcInvoker.invoke(list, rpcContext);
            return o;
        }
    }
    
  5. 集群層實現,本層我也不算懂,模仿dubbo實現了一下。

    主要實現了以下兩種:

    • Failover,出現失敗,立即重試其他服務器。可以設置重試次數。
    • Failfast,請求失敗以后,返回異常結果,不進行重試。

    以failover為例:

    @Slf4j
    @Service
    public class FailoverClusterLayerRpcInvoker implements ClusterLayerRpcInvoker {
    
        @Autowired
        private LoadBalancePolicy loadBalancePolicy;
    
        @Override
        public Object invoke(List<ProviderHostAndPort> providerList, RpcContext rpcContext) {
            ExchangeLayerRpcInvoker exchangeLayerRpcInvoker =
                    SpiServiceLoader.loadService(ExchangeLayerRpcInvoker.class);
    
            int retryTimes = 3;
            for (int i = 0; i < retryTimes; i++) {
                // 1.根據負載均衡策略,選擇1台服務提供者
                ProviderHostAndPort providerHostAndPort = loadBalancePolicy.selectOne(providerList);
                try {
                    // 調用下層,獲取結果
                    Object o = exchangeLayerRpcInvoker.invoke(providerHostAndPort, rpcContext);
                    return o;
                } catch (Exception e) {
                    log.error("fail to invoke {},exception:{},will try another",
                            providerHostAndPort,e);
                    // 2.如果調用失敗,進入下一次循環
                    continue;
                }
            }
    
            throw new RuntimeException("fail times extend");
        }
    }
    

    其中,一共會嘗試3次,每次的邏輯:根據負載均衡策略,選擇1台去調用;如果有問題,則換一台。

    調用下層時,獲取了下層的接口:ExchangeLayerRpcInvoker

  6. exchange層,這層完成同步轉異步的操作,目前只有一個實現:

    @Service
    public class Sync2AsyncExchangeImpl implements ExchangeLayerRpcInvoker {
    
        public static ConcurrentHashMap<String, CompletableFuture<Object>> requestId2futureMap =
                new ConcurrentHashMap<>();
    
    
        @Override
        public Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext) {
            String requestId = UUID.randomUUID().toString();
            rpcContext.setRequestId(requestId);
            rpcContext.setRequestId2futureMap(requestId2futureMap);
    
            CompletableFuture<Object> completableFuture = new CompletableFuture<>();
            requestId2futureMap.put(requestId, completableFuture);
    
    
            /**
             * 交給具體的底層去解決
             */
            TransportLayerRpcInvoker  transportLayerRpcInvoker =
                    SpiServiceLoader.loadService(TransportLayerRpcInvoker .class);
    
            transportLayerRpcInvoker.invoke(providerHostAndPort, rpcContext);
    
            Object s = null;
            try {
                s = completableFuture.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
    
            return s;
        }
    }
    

    這層大家可以簡單理解為:主線程調用傳輸層之前,生成一個id和一個completablefuture,放到一個全局map,然后將id傳給下層,然后在completablefuture上阻塞;下層拿到id后,在消息里傳輸;服務端再將id傳輸回來,然后客戶端拿着id找到completablefuture,並喚醒主線程。

  7. 信息傳輸層,以netty為例,具體的netty相關的知識,大家就得自己先學習一下:

    簡單步驟如下:

    
    //1.初始化客戶端連接
    public void initChannel() {
        Bootstrap b = configBootStrap();
        ChannelFuture future = null;
        try {
            future = b.connect(providerHostAndPort.getHost(), providerHostAndPort.getPort()).sync();
            if (future.isSuccess()) {
                channel = future.channel();
                return;
            }
        } catch (InterruptedException e) {
            ...
        }
    
        throw new RuntimeException();
    }
    
    private Bootstrap configBootStrap() {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast("lengthFieldPrepender", new LengthFieldPrepender(2));
                        p.addLast("lengthFieldBasedFrameDecoder",
                                new LengthFieldBasedFrameDecoder(
                                        65536, 0,
                                        2, 0, 2));
                        p.addLast("decoder", new StringDecoder());
                        p.addLast("encoder", new StringEncoder());
                        p.addLast(new ClientHandler());
    
                    }//攔截器設置
                });
        return b;
    }
    

    使用連接的channle,發送數據:

    public void sendMessage(String messageContent) {
        synchronized (lockObj) {
            if (channel == null) {
                initChannel();
            }
        }
        ChannelFuture channelFuture = channel.writeAndFlush(messageContent);
        channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future) throws Exception {
                System.out.println("發送請求消息成功");
            }
        });
    }
    
  8. netty接收到服務端相應后,根據requestId來獲取future,喚醒上層線程

    @Slf4j
    public class ClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext cx) {
            log.info("channelActive,local address:{},remote address:{}",
                    cx.channel().localAddress(),cx.channel().remoteAddress());
        }
    
        /**
         * 讀取信息
         *
         * @param ctx 渠道連接對象
         * @param msg 信息
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ResponseVO responseVO = JSONObject.parseObject((String) msg, ResponseVO.class);
            String requestId = responseVO.getRequestId();
    
            //1.獲取future
            CompletableFuture<Object> completableFuture = Netty4ClientRpcInvoker.requestId2futureMap
                    .get(requestId);
            //2.將結果塞進future,在此future上阻塞的線程被喚醒
            completableFuture.complete(responseVO.getContent());
            log.info("client channelRead,thread:{}", Thread.currentThread());
            log.info("客戶端端讀寫遠程地址是-----------"
                    + ctx.channel().remoteAddress() + "信息是:" + msg.toString());
    
        }
    }
    

如何根據spi進行切換

之前我們提到了可以根據spi,隨意切換實現,比如我們想使用mina來傳輸的話:

這里的spi的原理也很簡單:

dubbo.learn.common.spi.SpiServiceLoader#loadService
public static <T> T loadService(Class<T> clazz) {
    //先查找緩存
    Object cached = spiName2ServiceMap.get(clazz.getName());
    if (cached != null) {
        return (T) cached;
    }
	//2.從spring容器獲取該class的全部實現bean
    Map<String, T> map = applicationContext.getBeansOfType(clazz);
    if (CollectionUtils.isEmpty(map)) {
        return null;
    }
	
    if (map.size() == 1) {
        Object o = map.values().iterator().next();
        return clazz.cast(o);
    }
	//讀取spi文件,獲取用戶指定的實現
    String s = SpiParser.getSpiForSpecifiedService(clazz);
    if (StringUtils.isEmpty(s)) {
        log.error("發現多個服務實現bean:{},且在spi中未指定要使用的bean",map);
        throw new RuntimeException();
    }
	// 根據用戶spi中的實現,來返回相應的bean
    Object specifiedServiceInSpiFile = map.values().stream().filter(v -> Objects.equals(v.getClass().getName(), s))
            .findFirst().orElse(null);
    if (specifiedServiceInSpiFile == null) {
        log.error("spi中指定的服務在bean集合中未找到。" +
                "發現多個服務實現bean:{},在spi中指定的服務為:{}",map,s);
        throw new RuntimeException();
    }

    spiName2ServiceMap.put(clazz.getName(),specifiedServiceInSpiFile);
    return (T) specifiedServiceInSpiFile;
}

總結

里面細節比較多,最近工作比較忙,所以,大家可以先把代碼弄下來,直接自己運行下,依賴的就只有一個redis而已。

后續我會接着優化該框架,歡迎大家加進來,一起開發;如果覺得還不錯,就star一下吧。
源碼路徑:
https://gitee.com/ckl111/mini-dubbo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM