一個輕量級分布式RPC框架--NettyRpc


1、背景

最近在搜索Netty和Zookeeper方面的文章時,看到了這篇文章《輕量級分布式 RPC 框架》,作者用Zookeeper、Netty和Spring寫了一個輕量級的分布式RPC框架。花了一些時間看了下他的代碼,寫的干凈簡單,寫的RPC框架可以算是一個簡易版的dubbo。這個RPC框架雖小,但是麻雀雖小,五臟俱全,有興趣的可以學習一下。

本人在這個簡易版的RPC上添加了如下特性:

  • 異步調用,支持Future機制,支持回調函數callback
  • 客戶端使用TCP長連接(在多次調用共享連接)
  • TCP心跳連接檢測
  • 服務端異步多線程處理RPC請求
  • 支持不同的序列化/反序列化

項目地址:https://github.com/luxiaoxun/NettyRpc

2、簡介

RPC,即 Remote Procedure Call(遠程過程調用),調用遠程計算機上的服務,就像調用本地服務一樣。RPC可以很好的解耦系統,如WebService就是一種基於Http協議的RPC。

這個RPC整體框架如下:

這個RPC框架使用的一些技術所解決的問題:

服務發布與訂閱:服務端使用Zookeeper注冊服務地址,客戶端從Zookeeper獲取可用的服務地址。

通信:使用Netty作為通信框架。

Spring:使用Spring配置服務,加載Bean,掃描注解。

動態代理:客戶端使用代理模式透明化服務調用。

消息編解碼:使用Protostuff序列化和反序列化消息。

3、服務端發布服務

使用注解標注要發布的服務

服務注解

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
    Class<?> value();
}

一個服務接口:

public interface HelloService {
    String hello(String name);
    String hello(Person person);
}

一個服務實現:使用注解標注

@RpcService(HelloService.class)
public class HelloServiceImpl implements HelloService {
    @Override
    public String hello(String name) {
        return "Hello! " + name;
    }

    @Override
    public String hello(Person person) {
        return "Hello! " + person.getFirstName() + " " + person.getLastName();
    }
}

服務在啟動的時候掃描得到所有的服務接口及其實現:

@Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
        if (MapUtils.isNotEmpty(serviceBeanMap)) {
            for (Object serviceBean : serviceBeanMap.values()) {
                String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
                handlerMap.put(interfaceName, serviceBean);
            }
        }
    }

在Zookeeper集群上注冊服務地址:

    private void createNode(ZooKeeper zk, String data) {
        try {
            byte[] bytes = data.getBytes();
            //Must be a EPHEMERAL node
            String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            logger.debug("Create zookeeper node ({} => {})", path, data);
            logger.info("Registry new service: " + data);
        } catch (KeeperException e) {
            logger.error(e.toString());
        } catch (InterruptedException ex) {
            logger.error(ex.toString());
        }
    }

這里在原文的基礎上加了AddRootNode()判斷服務父節點是否存在,如果不存在則添加一個PERSISTENT的服務父節點,這樣雖然啟動服務時多了點判斷,但是不需要手動命令添加服務父節點了。

關於Zookeeper的使用原理,可以看這里《ZooKeeper基本原理》。

4、客戶端調用服務

使用代理模式調用服務:

    @SuppressWarnings("unchecked")
    public static <T> T createService(Class<T> interfaceClass) {
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new ObjectProxy<T>(interfaceClass)
        );
    }

    public static <T> RpcService createAsyncService(Class<T> interfaceClass) {
        return new ObjectProxy<T>(interfaceClass);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (Object.class == method.getDeclaringClass()) {
            String name = method.getName();
            if ("equals".equals(name)) {
                return proxy == args[0];
            } else if ("hashCode".equals(name)) {
                return System.identityHashCode(proxy);
            } else if ("toString".equals(name)) {
                return proxy.getClass().getName() + "@" +
                        Integer.toHexString(System.identityHashCode(proxy)) +
                        ", with InvocationHandler " + this;
            } else {
                throw new IllegalStateException(String.valueOf(method));
            }
        }

        RpcRequest request = new RpcRequest();
        request.setRequestId(UUID.randomUUID().toString());
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParameters(args);
        // Debug
        logger.debug(method.getDeclaringClass().getName());
        logger.debug(method.getName());
        for (int i = 0; i < method.getParameterTypes().length; ++i) {
            logger.debug(method.getParameterTypes()[i].getName());
        }
        for (int i = 0; i < args.length; ++i) {
            logger.debug(args[i].toString());
        }

        RpcClientHandler handler = ConnectManage.getInstance().chooseHandler();
        RpcFuture rpcFuture = handler.sendRequest(request);
        return rpcFuture.get();
    }

這里每次使用代理遠程調用服務,從Zookeeper上獲取可用的服務地址,通過RpcClient send一個Request,等待該Request的Response返回。

這里原文有個比較嚴重的bug,在原文給出的簡單的Test中是很難測出來的,原文使用了obj的wait和notifyAll來等待Response返回,會出現“假死等待”的情況:一個Request發送出去后,在obj.wait()調用之前可能Response就返回了,這時候在channelRead0里已經拿到了Response並且obj.notifyAll()已經在obj.wait()之前調用了,這時候send后再obj.wait()就出現了假死等待,客戶端就一直等待在這里。使用CountDownLatch可以解決這個問題。

注意:這里每次調用的send時候才去和服務端建立連接,使用的是短連接,這種短連接在高並發時會有連接數問題,也會影響性能,優化后使用TCP長連接進行通信。

從Zookeeper上獲取服務地址:

    private void watchNode(final ZooKeeper zk) {
        try {
            List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        watchNode(zk);
                    }
                }
            });
            List<String> dataList = new ArrayList<>();
            for (String node : nodeList) {
                byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
                dataList.add(new String(bytes));
            }
            logger.debug("Node data: {}", dataList);
            this.dataList = dataList;

            logger.debug("Service discovery triggered updating connected server node.");
            //Update the service info based on the latest data
            UpdateConnectedServer();

        } catch (KeeperException | InterruptedException e) {
            logger.error("", e);
        }
    }

每次服務地址節點發生變化,都需要再次watchNode,獲取新的服務地址列表。

5、消息編碼

請求消息:RpcRequest

響應消息:RpcResponse

消息序列化和反序列化接口:Serializer

由於處理的是TCP消息,本人加了TCP的粘包處理Handler

channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,0))

消息編解碼時開始4個字節表示消息的長度,也就是消息編碼的時候,先寫消息的長度,再寫消息。

6、性能改進

1)服務端請求異步處理

Netty本身就是一個高性能的網絡框架,從網絡IO方面來說並沒有太大的問題。

從這個RPC框架本身來說,在原文的基礎上把Server端處理請求的過程改成了多線程異步:

    public void channelRead0(final ChannelHandlerContext ctx, final RpcRequest request) {
        // filter beat ping
        if (Beat.BEAT_ID.equalsIgnoreCase(request.getRequestId())) {
            logger.info("Server read beat-ping.");
            return;
        }

        serverHandlerPool.execute(new Runnable() {
            @Override
            public void run() {
                logger.info("Receive request " + request.getRequestId());
                RpcResponse response = new RpcResponse();
                response.setRequestId(request.getRequestId());
                try {
                    Object result = handle(request);
                    response.setResult(result);
                } catch (Throwable t) {
                    response.setError(t.toString());
                    logger.error("RPC Server handle request error", t);
                }
                ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        logger.info("Send response for request " + request.getRequestId());
                    }
                });
            }
        });
    }

Netty 4中的Handler處理在IO線程中,如果Handler處理中有耗時的操作(如數據庫相關),會讓IO線程等待,影響性能。

2)服務端長連接的管理

客戶端保持和服務進行長連接,不需要每次調用服務的時候進行連接,長連接的管理(通過Zookeeper獲取有效的地址)。

通過監聽Zookeeper服務節點值的變化,動態更新客戶端和服務端保持的長連接。

這個事情現在放在客戶端在做,客戶端保持了和所有可用服務的長連接,給客戶端和服務端都造成了壓力,需要解耦這個實現。

3)客戶端請求異步處理

客戶端請求異步處理的支持,不需要同步等待:發送一個異步請求,返回Feature,通過Feature的callback機制獲取結果。

RpcService client = rpcClient.createAsyncService(HelloService.class);
RpcFuture helloFuture = client.call("hello", Integer.toString(i));
String result = (String) helloFuture.get(3000, TimeUnit.MILLISECONDS);

項目持續更新中。

項目地址:https://github.com/luxiaoxun/NettyRpc

 

參考:

輕量級分布式 RPC 框架:http://my.oschina.net/huangyong/blog/361751

你應該知道的RPC原理:http://www.cnblogs.com/LBSer/p/4853234.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM