前言
RPC 框架是后端攻城獅永遠都繞不開的知識點,目前業界比較知名有 Dubbo、Spring Cloud 等。很多人都停留在了只會用的階段,作為程序猿,擁有好奇心深入學習,才能有效提高自己的競爭力。再進一層的同學,會去翻源碼,看功能是如何實現的,這是很好的開始。看源碼過一段時間容易忘記,我覺得看完源碼之后,更好的做法是自己動手開發一個出來,這樣你對框架的理解會更深。我認為,"會用"、"會讀源碼"、"會寫出來"是完全不一樣的水平。
本系列 "造輪子系列之RPC",手把手教大家如何打造自己的RPC框架。
以下是我個人寫的簡單版 RPC 框架 ccx-rpc 的源碼,歡迎 Star、Fork。水平有限,大家有更好的想法可以提出來。
Github:https://github.com/chenchuxin/ccx-rpc
Gitee:https://gitee.com/imccx/ccx-rpc
RPC 框架的結構
一個最簡單的 RPC 框架分成三個部分:注冊中心、服務端、客戶端。以下是一個最簡單的結構流程圖。

組成部分:
- 注冊中心:用於注冊和獲取服務。
- 服務端:指提供服務的一方,也叫服務提供方
Provider - 客戶端:指調用服務的一方,也叫服務消費者
Consumer
流程:
- 服務端把服務信息注冊到注冊中心,通常包含服務端地址、接口類和方法
- 客戶端從注冊中心獲取對應服務的信息
- 客戶端根據服務的信息,通過網絡調用到服務端的接口
RPC 框架的設計
上面的流程有很多細節沒有畫出來,例如:
- 服務端以什么形式注冊到注冊中心?
- 客戶端是怎么做到像調用接口一樣調用服務?
- 調用服務的網絡協議是怎樣的?
一個基本的 RPC 框架,需要包含以下部分:
- 注冊中心:注冊中心負責服務信息的注冊與查找。服務端在啟動的時候,掃描所有的服務,然后將自己的服務地址和服務名注冊到注冊中心。客戶端在調用服務之前,通過注冊中心查找到服務的地址,就可以通過服務的地址調用到服務啦。常見的注冊中心有
Zookeeper、Eureka等。 - 動態代理:客戶端調用接口,需要框架能自己根據接口去遠程調用服務,這一步是用戶無感知的。這樣一來,就需要使用到動態代理,用戶調用接口,實際上是在調用動態生成的代理類。常見的動態代理有:
JDK Proxy,CGLib,Javassist等。 - 網絡傳輸:RPC 遠程調用實際上就是網絡傳輸,所以網絡傳輸是 RPC 框架中必不可少的部分。網絡框架有
Java NIO、Netty框架等。 - 自定義協議:網絡傳輸需要制定好協議,一個良好的協議能提高傳輸的效率。
- 序列化:網絡傳輸肯定會涉及到序列化,常見的序列化有
Json、Protostuff、Kyro等。 - 負載均衡:當請求調用量大的時候,需要增加服務端的數量,一旦增加,就會涉及到符合選擇服務的問題,這就是負載均衡。常見的負載均衡策略有:輪詢、隨機、加權輪詢、加權隨機、一致性哈希等等。
- 集群容錯:當請求服務異常的時候,我們是應該直接報錯呢?還是重試?還是請求其他服務?這個就是集群容錯策略啦。
代碼實現概覽
下面我們從代碼的角度上,來看看以上幾部分是如何組織的:
服務注冊、監聽
1. 掃描服務
服務要注冊到注冊中心,第一步是需要掃描到需要注冊的接口。
我們通過 Spring 的 BeanPostProcessor#postProcessBeforeInitialization,將帶有 @RpcService 注解的接口進行發布。
@Component
public class ServiceBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
// rpc 服務發布到注冊中心
if (rpcService != null) {
RegistryFactory registryFactory = ExtensionLoader.getLoader(RegistryFactory.class).getAdaptiveExtension();
RegistryConfig registryConfig = ConfigManager.getInstant().getRegistryConfig();
Registry registry = registryFactory.getRegistry(registryConfig.toURL());
registry.register(buildServiceURL(bean, rpcService));
}
return bean;
}
}
2. 注冊中心
服務注冊最終的表現就是:把服務信息注冊到注冊中心中。
根據注冊中心的特性,可以抽出一個接口 Registry ,包含了注冊、取消注冊、查找服務的方法。
通過實現 Registry 接口,可以擴展出多種類型的注冊中心。
public interface Registry {
/**
* 向注冊中心注冊服務
*/
void register(URL url);
/**
* 向注冊中心取消注冊服務
*/
void unregister(URL url);
/**
* 查找注冊的服務
*/
List<URL> lookup(URL condition);
}
3. 監聽
RPC 的請求響應本質上是網絡請求,作為服務方,需要開啟端口監聽客戶端的請求。
Netty 是目前最流行的網絡開發框架。
@Component
public class NettyServerBootstrap {
public void start() {
ShutdownHook.addShutdownHook();
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup(
RuntimeUtil.getProcessorCount() * 2,
ThreadUtil.newNamedThreadFactory("service-handler-group", false)
);
try {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(Epoll.isAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
// 編解碼器
p.addLast(new RpcMessageEncoder());
p.addLast(new RpcMessageDecoder());
// RPC 消息處理器
p.addLast(serviceHandlerGroup, new NettyServerHandler());
}
});
// 綁定端口,同步等待綁定成功
ServiceConfig serviceConfig = ConfigManager.getInstant().getServiceConfig();
ChannelFuture channelFuture = bootstrap.bind(NetUtil.getLocalHostName(), serviceConfig.getPort()).sync();
log.info("server start success. port=" + serviceConfig.getPort());
// 等待服務端監聽端口關閉
channelFuture.channel().closeFuture().sync();
} catch (Exception ex) {
log.error("shutdown bossGroup and workerGroup");
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
客戶端發現、請求
1. 掃描
客戶端要是用 RPC 接口,首先要用 @RpcReference 注解標出。
通過 Spring 的 BeanPostProcessor#postProcessAfterInitialization 初始化 Bean 之后,生成代理類。
調用接口的時候,這個代理類,就會在背地里偷偷找到服務,並請求到結果返回。
public class ServiceBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Field[] fields = bean.getClass().getDeclaredFields();
for (Field field : fields) {
RpcReference rpcReference = field.getAnnotation(RpcReference.class);
if (rpcReference != null) {
// 生成代理對象
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcReference);
Object proxy = rpcClientProxy.getProxy(field.getType());
field.setAccessible(true);
try {
// 設置字段
field.set(bean, proxy);
} catch (IllegalAccessException e) {
log.error("field.set error. bean={}, field={}", bean.getClass(), field.getName(), e);
}
}
}
return bean;
}
}
2. 服務發現
客戶端要請求服務,首先需要找到服務對應的域名/IP 和 端口,這個過程就是服務發現。
服務發現就是從注冊中心找到對應服務的地址,上面注冊中心的接口有提供對應的方法。
public interface Registry {
// ... 省略其他代碼
/**
* 查找注冊的服務
*/
List<URL> lookup(URL condition);
}
3. 負載均衡
從注冊中心找到的地址可能是多個,那我們如何從多個地址中選擇一個地址,這就是負載均衡。
負載均衡抽象出一個接口 LoadBalance ,方法只有一個,就是選擇 select。
public interface LoadBalance {
/**
* 選擇
*
* @param candidateUrls 候選的 URL
* @param request 請求
* @return 選擇的 URL
*/
URL select(List<URL> candidateUrls, RpcRequest request);
}
使用方法如下:
// 注冊中心拿出所有服務的信息
List<URL> urls = registry.lookup(url);
// 通過負載均衡選出一個地址
URL selected = loadBalance.select(urls, request);
4. 集群容錯
當請求服務失敗之后,應該如何處理?重試?快速失敗?這個就是集群容錯策略啦。我們來簡單看一下重試策略吧。
public class RetryInvoker extends AbstractFaultTolerantInvoker {
/**
* 默認重試次數
*/
private static final Integer DEFAULT_RETRY_TIMES = 3;
@Override
protected RpcResult doInvoke(RpcRequest request, Invoker invoker, List<URL> candidateUrls, LoadBalance loadBalance) throws RpcException {
// 獲取重試次數
int retryTimes = Optional.ofNullable(clusterConfig.getRetryTimes()).orElse(DEFAULT_RETRY_TIMES);
RpcException rpcException = null;
for (int i = 0; i < retryTimes; i++) {
try {
// 執行,如果成功則返回結果,失敗繼續嘗試
RpcResult result = invoker.invoke(request);
if (result.isSuccess()) {
return result;
}
} catch (RpcException ex) {
log.error("invoke error. retry times=" + i, ex);
rpcException = ex;
}
}
if (rpcException == null) {
rpcException = new RpcException("invoker error. request=" + request);
}
throw rpcException;
}
}
網絡傳輸
1. 序列化
網絡傳輸不可獲取的就是序列化,序列化就是怎么把一個對象的狀態信息轉化為可以存儲或傳輸的形式的過程。我們常見的序列化方式有JSON、Protobuf等等。
序列化和反序列化是一對,共同組成序列化器。
public interface Serializer {
/**
* 序列化
*
* @param object 要序列化的對象
* @return 字節數組
*/
byte[] serialize(Object object);
/**
* 反序列化
*
* @param bytes 字節數組
* @param clazz 要反序列化的類
* @param <T> 類型
* @return 反序列化的對象
*/
<T> T deserialize(byte[] bytes, Class<T> clazz);
}
2. 自定義協議
網絡傳輸中,收發兩端如何正確解析請求,統一的協議是必不可少的。
在 Netty 中的表現就是編碼解碼器 codec。下面是 ccx-rpc 的自定義協議。可以簡單看一下,后面再仔細講解哈。
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
+-----+-----+-------+----+----+----+----+-----------+---------+--------+----+----+----+----+----+----+----+---+
| magic |version| full length |messageType|serialize|compress| RequestId |
+-----+-----+-------+----+----+----+----+-----------+----- ---+--------+----+----+----+----+----+----+----+---+
| |
| body |
| |
| ... ... |
+-------------------------------------------------------------------------------------------------------------+
2B magic(魔法數)
1B version(版本)
4B full length(消息長度)
1B messageType(消息類型)
1B serialize(序列化類型)
1B compress(壓縮類型)
8B requestId(請求的Id)
body(object類型數據)
總結
RPC 的組成包括: 注冊中心、動態代理、網絡傳輸、自定義協議、序列化、負載均衡、集群容錯等等。
想要深入了解,先要知道他們是怎么組合運作的,其簡單的運作都在上面提到了。
更加細節的代碼實現,將會在接下來的文章中詳細講解,希望各位賞臉一看。
