基於netty4,protostuff的出於學習目的的RPC框架,后續會完善功能。
背景
做微服務有不短時間了,單純RPC框架呢生產環境上之前主要使用dubbo,出於學習了解過Spring Cloud以及其它的比如Finagle,grpc,thrift。看過dubbo部分源碼,了解過RPC的基本原理,但不系統。
寫一個類似dubbo的有多難
猛的一看dubbo源碼的工程構建的話,代碼量不少,工程大大小小估計有十幾二十個,一時不知從何處下手,如果不反復debug源碼的話一時半會想了解清楚還是有一定難度的。如何面對一個龐大的工程呢?我的辦法就是化繁為簡,先看最為核心的。
如果給你看這個圖你不一定短時間知道它的內部結構。
如果給你看這張圖呢?會不會輕松很多?
那么根據RPC的基本原理,首先就只觀注如下圖的幾個部分就可以了:
- 遠程通信,dubbo支持多種遠程通信協議,先只看netty
- 編碼,遠程通信時需要將信息進行編碼以及解碼,這里采用protostuff
- 客戶端代理,RPC的主要特點就是將遠程調用本地化,實現這一目標的手段就是動態代理
以上這些,就可以實現一個基本的RPC調用了,但dubbo為什么有那么多功能呢,因為:
- 支持了強大的SPI插件機制,讓用戶方便的擴展原有功能
- 提供了多個已經實現的擴展功能,比如遠程通信除了netty還有mina,注冊中心除了推薦的ZK還是redis等
- 在服務治理上下了很大的功夫,比如服務注冊與發現,限流,多線程等
綜上所看實現一個簡易的RPC並不難,難在對於功能的抽象,擴展點的支持,性能優化等方面上。為了系統的了解這些優秀RPC框架,我想按自己的思路實現以此驗證實現一個RPC到底有多難。我並沒有完全從零開始設計然后編碼,因為我喜歡參考一些開源的項目,因為我相信有我這類想法的人一定不在少數,造輪子玩的事很多人都喜歡。
項目參考
主要思路來源於如下兩個項目,其中第一個項目是原始版本,第二個版本是另外的維護版本。我按我自己的需求進一步修改成自己想要的結果。
另外主要參考dubbo源碼
變更
出於我個人的理解做了不同程序的調整以及增強。
增加客戶端引用服務的注解
這里只是為了簡單,只實現通過注解方式獲取遠程接口的實例。先定義引用注解,isSync屬性是為了支持同步接口以及異步接口,默認為同步接口。
public @interface RpcReference {
boolean isSync() default true;
}
客戶端引用實例使用:
@RpcReference
private ProductService productService;
原理是通過BeanPostProcessor接口來實現注解字段的注入:
public class BeanPostPrcessorReference implements BeanPostProcessor {
//...
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
//...
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
try {
if (! field.isAccessible()) {
field.setAccessible(true);
}
RpcReference reference = field.getAnnotation(RpcReference.class);
if (reference != null) {
Object value=this.rpcClient.createProxy(field.getType(),reference.isSync());
if (value != null) {
field.set(bean, value);
}
}
} catch (Exception e) {
throw new BeanInitializationException("Failed to init remote service reference at filed " + field.getName() + " in class " + bean.getClass().getName(), e);
}
}
return bean;
}
}
增加服務注解的功能
版本1項目的RpcService需要指定唯一的遠程接口,感覺有限制,修改為支持多接口的遠程服務。
@RpcService
public class ProductServiceImpl implements ProductService,CommentService
重構代碼
- 針對版本1同步調用存在的問題,參考了dubbo的思路,版本1在客戶端獲取結果時有這么段代碼,它的問題在於channelRead0方法的執行有可能出現在obj.wait()方法之前,這樣有可能造成客戶端永遠獲取不到預期的結果。
@Override
public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
this.response = response;
synchronized (obj) {
obj.notifyAll();
}
}
public RpcResponse send(RpcRequest request) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
//....
ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().writeAndFlush(request).sync();
synchronized (obj) {
obj.wait();
}
if (response != null) {
future.channel().closeFuture().sync();
}
return response;
} finally {
group.shutdownGracefully();
}
}
修改后的版本,核心就是參考dubbo的做法返回一個ResponseFuture,在遠程方法回調時更新真實的返回值,最后通過get()阻塞方法獲取結果。由於我對原方案變更比較多就不貼代碼了,有興趣的可以看這:https://github.com/jiangmin168168/jim-framework
- 針對版本2在同步獲取服務端返回結果采用AbstractQueuedSynchronizer,感覺有些復雜,采用Lock替代,ReponseFuture獲取結果的邏輯:
private ReentrantLock lock = new ReentrantLock();
private Condition doneCondition=lock.newCondition();
public Object get(long timeout, TimeUnit unit) {
long start = System.currentTimeMillis();
if (!this.isDone()) {
this.lock.lock();
try{
while (!this.isDone()) {
this.doneCondition.await(2000,TimeUnit.MICROSECONDS);
if(System.currentTimeMillis()-start>timeout){
break;
}
}
}
catch (InterruptedException ex){
throw new RpcException(ex);
}
finally {
this.lock.unlock();
}
}
return this.getResultFromResponse();
}
- 針對程序關閉時資源的回收,參考了dubbo的思路,采用addShutdownHook注冊回收函數
- 增加了filter機制,這個功能對RPC是非常重要的,比如日志,異常,權限等通用功能的動態植入。
定義filter接口
public interface RpcFilter<T> {
<T> T invoke(RpcInvoker invoker, RpcInvocation invocation);
}
定義filter注解
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ActiveFilter {
String[] group() default {};
String[] value() default {};
}
server invoker
public class RpcServerInvoker extends AbstractInvoker<RpcRequest> {
private final Map<String, Object> handlerMap;
public RpcServerInvoker(Map<String, Object> handlerMap, Map<String,RpcFilter> filterMap) {
super(handlerMap,filterMap);
this.handlerMap=handlerMap;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) {
//...
}
@Override
public RpcResponse invoke(RpcInvocation invocation) {
//...
}
}
AbstractInvoker構造函數中的filterMap是通過下面方式注入。
public class RpcServerInitializer extends ChannelInitializer<SocketChannel> implements ApplicationContextAware {
//...
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> rpcFilterMap = applicationContext.getBeansWithAnnotation(ActiveFilter.class);
if (null!=rpcFilterMap) {
for (Object filterBean : rpcFilterMap.values()) {
Class<?>[] interfaces = filterBean.getClass().getInterfaces();
ActiveFilter activeFilter=filterBean.getClass().getAnnotation(ActiveFilter.class);
if(null!=activeFilter.group()&& Arrays.stream(activeFilter.group()).filter(p->p.contains(ConstantConfig.PROVIDER)).count()==0){
continue;
}
for(Class<?> clazz:interfaces) {
if(clazz.isAssignableFrom(RpcFilter.class)){
this.filterMap.put(filterBean.getClass().getName(),(RpcFilter) filterBean);
}
}
}
}
}
}
AbstractInvoker,主要有ServerInvoker以及ClientInvoker兩個子類,兩個子類分別獲取不同作用域的Filter然后構建Filter執行鏈。
public abstract class AbstractInvoker<T> extends SimpleChannelInboundHandler<T> implements RpcInvoker {
private final Map<String, Object> handlerMap;
private final Map<String,RpcFilter> filterMap;
protected AbstractInvoker(Map<String, Object> handlerMap, Map<String,RpcFilter> filterMap){
this.handlerMap = handlerMap;
this.filterMap=filterMap;
}
public RpcInvocation buildRpcInvocation(RpcRequest request){
//...
}
public RpcInvoker buildInvokerChain(final RpcInvoker invoker) {
RpcInvoker last = invoker;
List<RpcFilter> filters = Lists.newArrayList(this.filterMap.values());
if (filters.size() > 0) {
for (int i = filters.size() - 1; i >= 0; i --) {
final RpcFilter filter = filters.get(i);
final RpcInvoker next = last;
last = new RpcInvoker() {
@Override
public Object invoke(RpcInvocation invocation) {
return filter.invoke(next, invocation);
}
};
}
}
return last;
}
protected abstract void channelRead0(ChannelHandlerContext channelHandlerContext, T t);
public abstract Object invoke(RpcInvocation invocation);
}
構建filter鏈,入口點是在構建代理的邏輯中。
public class RpcProxy <T> implements InvocationHandler {
//...
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//...
RpcInvoker rpcInvoker=invoker.buildInvokerChain(invoker);
ResponseFuture response=(ResponseFuture) rpcInvoker.invoke(invoker.buildRpcInvocation(request));
if(isSync){
return response.get();
}
else {
RpcContext.getContext().setResponseFuture(response);
return null;
}
}
}
- 版本2的異步實現有點奇怪,感覺調用方式不RPC(特別是call接口,需要以字符串形式描述調用的方法)
IAsyncObjectProxy client = rpcClient.createAsync(HelloService.class);
RPCFuture helloFuture = client.call("hello", Integer.toString(i));
String result = (String) helloFuture.get(3000, TimeUnit.MILLISECONDS);
通過dubbo的方式的版本:創建代理的邏輯中根據是否同步來返回不同的值,如果是同步那么調用阻塞方法獲取實時返回的值,如果是異步直接返回null,同時將ResponseFuture放入RpcContext這個上下文變量中。
RpcInvoker rpcInvoker=invoker.buildInvokerChain(invoker);
ResponseFuture response=(ResponseFuture) rpcInvoker.invoke(invoker.buildRpcInvocation(request));
if(isSync){
return response.get();
}
else {
RpcContext.getContext().setResponseFuture(response);
return null;
}
獲取異步接口:
@RpcReference(isSync = false)
private ProductService productServiceAsync;
異步方法調用,獲取結果時需要從RpcContext中獲取,感覺調用有點復雜,特別是需要通過RpcContext來獲取,后續有更好的方案再更新。
Product responseFuture= this.productServiceAsync.getById(productId);
if(null==responseFuture){
System.out.println("async call result:product is null");
Product responseFutureResult= (Product) RpcContext.getContext().getResponseFuture().get();
if(null!=responseFutureResult){
System.out.println("async call result:"+responseFutureResult.getId());
}
}
- 調整了目錄結構以及類名
根據自己的理解,重命令了一些類名,也調整了一些目錄結構。
至此,RPC擁有了遠程通信,序列化,同步異步調用,客戶端代理,Filter等常用功能。所依賴的包也有限,要想完善RPC無非是做加法以及優化。盡管不能寫也一個超過dubbo的項目,但至少可以用自己的思路去模仿,並不是那么的不可想象。
未來添加的功能
- 服務注冊發現
- 限流/熔斷
- 服務版本
- 客戶端多線程
- ......