背景
上篇文章我簡單的介紹了自己打造的通俗簡版RPC通信框架,這篇是對簡版的增強~
如果大家對此項目還感興趣的話,可到碼雲上瞄瞄:Netty-RPC
上篇文章鏈接:《SpringBoot2+Netty打造通俗簡版RPC通信框架》
在介紹后續新增功能前,我們先回顧一下最簡單的RPC通信的流程,流程圖如下:
我們可以看到其實整個流程其實是非常的簡單的:客戶端接收前端發送的請求,封裝好請求Packet根據配置打開Netty通道進行通信,服務端接收請求Packet,解析並且根據請求信息,反射獲取實現類調用方法,得到結果並封裝好響應Packet然后返回結果給客戶端,客戶端獲取結果響應給前端。
新增功能
因為是模仿Dubbo造RPC通信框架,那么Dubbo基本的功能我們當然也必須得有啦,下面列出的是我后續新增的優化:
- 單一長連接
- 服務注解,並且帶版本號
- 增加注冊中心
- 處理Netty客戶端或服務端主動斷開異常
- 業務處理使用自定義業務線程池
詳細的代碼大家可到我的碼雲上閱讀。
詳細介紹
首先,我們先看一下帶注冊中心后的流程圖:
我們可以看到多了個Zookeeper作為注冊中心,然后就到了監聽緩存列表,和服務緩存列表。大家不理解這兩個列表不重要,下面我將繼續詳細的介紹一下我做這些功能的思路。
1、單一長連接:
首先上一下簡單的流程圖:
思路非常的簡單:就是使用內存緩存緩存起來,結構就是Map,key為IP:Port value為channel。
在沒有注冊中心時,服務端的IP和PORT是寫在配置文件里的,我們直接獲取IP信息,並且判斷在【Channel緩存列表】是否有此IP對應緩存起來的的Channel,有則獲取直接進行RPC通信,否則創建新Channel進行RPC通信,記得還要緩存起來。
而在有注冊中心時,我們需要在請求Packet中獲取需要提供的服務名,然后根據服務名在【服務緩存列表】獲取所有提供此服務的所有應用IP,然后就是判斷IP列表在【Channel緩存列表】中是否有緩存的Channel,有則獲取並直接進行RPC通信;
否則,遍歷應用IP列表,直到創建新Channel通信並且連接成功,然后將Channel緩存起來。最后,進行RPC通信然后等待獲取Result即可。
2、服務注解:
我們知道需要進行RPC通信的接口都會加上@NettyRPC注解,然后在服務端這,每次都是使用Reflections框架掃描出指定路徑下的所有類,再判斷是否有服務的實現類,有則利用反射進行方法調用。這聽起來可是相當浪費時間,哈哈。
這時候,我們可以利用Spring框架來去除掉這個掃描的動作。首先,我們提供一個@NettyRpcService注解,來標識所有RPC服務的實現類。然后我們創建一個類,實現接口ApplicationContextAware。然后我們可以利用ApplicationContext的getBeansWithAnnotation方法類根據指定注解獲取Bean,我們這里當然是指定前面的@NettyRPCService注解了,然后我們利用內存緩存來緩存起來【提供RPC服務的實現類列表】,結構為Map:key為接口名+版本號,value為bean實例。那么之后的大家應該就懂了,我們再也不必每次都使用Reflections框架了。
package com.hyf.rpc.netty.server.config;
import com.hyf.rpc.netty.anno.NettyRPCService;
import com.hyf.rpc.netty.properties.NettyProperties;
import com.hyf.rpc.netty.server.NettyServer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @author Howinfun
* @desc Netty服務提供者啟動&掃面存儲提供服務的實現類
* @date 2019/7/18
*/
@Component
public class NettyServerInitConfig implements ApplicationListener<ContextRefreshedEvent>, ApplicationContextAware {
/** 提供RPC服務的實現類 key為接口名+版本號,value為bean實例*/
public static final Map<String,Object> beanMap = new HashMap<>(10);
@Autowired
private NettyServer nettyServer;
@Autowired
private NettyProperties nettyProperties;
/**
* 當ApplicationContext初始或刷新完畢觸發
* @param event
*/
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (nettyProperties.getServerPort() != null){
nettyServer.start();
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 遍歷帶有NettyRPCService注釋的服務實現類
Map<String,Object> beans = applicationContext.getBeansWithAnnotation(NettyRPCService.class);
if (beans != null && beans.size() > 0) {
for (Object serviceBean : beans.values()) {
String interfaceName = serviceBean.getClass().getAnnotation(NettyRPCService.class).value().getName();
String version = serviceBean.getClass().getAnnotation(NettyRPCService.class).version();
beanMap.put(interfaceName+version, serviceBean);
}
}
System.out.println(beanMap.toString());
}
}
3、注冊中心:
這個上一下簡單的流程圖先:
順便看一下Zookeeper的數據結構:
我這里使用的是Zookeeper作為注冊中心。首先大家得自己安裝一個Zookeeper服務。我們做注冊中心是利用Zookeeper的監聽事件,當然了,Zookeeper原生的監聽事件是利用Watcher,而且是一次性的,所以不用。我會使用第三方框架Curator。Curator引入了Cache來實現對Zookeeper服務端事件監聽,Cache事件監聽可以理解為一個本地緩存視圖與遠程Zookeeper視圖的對比過程。Cache提供了反復注冊的功能。Cache分為兩類注冊類型:節點監聽和子節點監聽。
首先是服務端:在zookeeper的/root下創建名為配置文件中namespace(例如rpc-server)的節點,然后我們可在上面的掃描帶@NettyRPCService實現類緩存起來的同時,每一個實現類就往/root/namespace下創建一個節點,節點名為@NettyRPCService的value().getName+version(),節點內容為IPPojo(ip+提供netty服務端口號)的Json字符串。
再來客戶端:客戶端主要是要做監聽,首先是監聽(使用PathChildrenCache,可監聽子節點的增刪改)根節點/root,如果有節點新增,則表明有新應用提供服務:這時候我們又要繼續對此節點做監聽(也是使用PathChildrenCache),並且將PathChildrenCache緩存到內存緩存中【監聽緩存列表】。如果有節點刪除,則表明有應用不提供服務了:這時候我們將監聽關閉掉,並且從【監聽緩存列表】中刪除,並且從【服務緩存列表】刪除應用提供的所有服務。根節點/root下面的子節點才是真正提供的RPC服務:當新增,需要將節點信息緩存到內存緩存中【服務緩存列表】;當刪除,從【服務緩存列表】中刪除對應數據。
4、處理Netty客戶端或服務端主動異常:
客戶端:在RPCResponsPacketHandler中重寫exceptionCaught方法。首先是根據標識從【Channel緩存列表】中移除此Channel,然后根據標識從【服務緩存列表】中移除對應的所有服務,最后關閉通道ctx.channel().close()。
服務端:直接關閉通道即可,ctx.channel().close();
5、業務處理使用自定義業務線程池:
首先自定義一個線程池,根據自己需求去設計 核心線程數、最大線程數、線程保持活躍時間、隊列、拒絕策略等。然后在業務處理除直接新建一個任務(實現Runnable接口)提交到線程池處理即可。
package com.hyf.rpc.netty.common;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Howinfun
* @desc
* @date 2019/7/12
*/
@Slf4j
public class TaskThreadPool {
public static final TaskThreadPool INSTANCE = new TaskThreadPool();
private final ThreadPoolExecutor executor;
private TaskThreadPool(){
/**
* 核心線程數:10
* 最大線程數:20
* 線程保持活躍時間:60s
* 隊列:阻塞隊列,最多存放100個任務
* 拒絕策略:任務將被放棄
*/
this.executor = new ThreadPoolExecutor(10,
20,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),new ThreadPoolExecutor.CallerRunsPolicy());
}
public Future submit(Runnable task){
log.info("業務線程池執行任務中...");
Future future = executor.submit(task);
return future;
}
}
最后:
到這里就基本已經全部結束了,雖然總體做得不是很好,但是起碼自己經歷了從0到1的所謂手寫框架了,哈哈哈。同時,也將學到的Netty和Zookeeper等技術稍微實戰了一下子,算是很滿足了。接下來,得想想干點什么。。。。