這里會介紹:
- Sentinel會使用多線程的方式實現一個類Reactor的IO模型
- Sentinel會使用心跳檢測來觀察控制台是否正常
Sentinel源碼解析系列:
1.Sentinel源碼分析—FlowRuleManager加載規則做了什么?
2. Sentinel源碼分析—Sentinel是如何進行流量統計的?
3. Sentinel源碼分析— QPS流量控制是如何實現的?
4.Sentinel源碼分析— Sentinel是如何做到降級的?
5.Sentinel源碼分析—Sentinel如何實現自適應限流?
6.Sentinel源碼分析—Sentinel是如何動態加載配置限流的?
在看我的這篇文章之前大家可以先看一下官方的這篇文章:https://github.com/alibaba/Sentinel/wiki/控制台。介紹了控制台怎么使用,以及客戶端要怎么設置才能被收集數據。
客戶端會在InitExecutor調用doInit方法中與控制台建立通信,所以我們直接看doInit方法:
InitExecutor#doInit
public static void doInit() {
//InitExecutor只會初始化一次,並且初始化失敗會退出
if (!initialized.compareAndSet(false, true)) {
return;
}
try {
//通過spi加載InitFunc子類
ServiceLoader<InitFunc> loader = ServiceLoader.load(InitFunc.class);
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
for (InitFunc initFunc : loader) {
RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName());
//給所有的initFunc排序,按@InitOrder從小到大進行排序
//然后封裝成OrderWrapper對象
insertSorted(initList, initFunc);
}
for (OrderWrapper w : initList) {
w.func.init();
RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",
w.func.getClass().getCanonicalName(), w.order));
}
} catch (Exception ex) {
RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
ex.printStackTrace();
} catch (Error error) {
RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
error.printStackTrace();
}
}
因為這里我們引入了sentinel-transport-simple-http
模塊,所以使用spi加載InitFunc的子類的時候會加載三個子類實例,分別是:CommandCenterInitFunc、HeartbeatSenderInitFunc、MetricCallbackInit。
然后會遍歷loader,根據@InitOrder的大小進行排序,並封裝成OrderWrapper放入到initList中。
所以initList里面的對象順序是:
- CommandCenterInitFunc
- HeartbeatSenderInitFunc
- MetricCallbackInit
然后遍歷initList依次調用init方法。
所以下面我們來看一下這三個實現類的init方法做了什么:
CommandCenterInitFunc
CommandCenterInitFunc#init
public void init() throws Exception {
//獲取commandCenter對象
CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();
if (commandCenter == null) {
RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
return;
}
//調用SimpleHttpCommandCenter的beforeStart方法
//用來設置CommandHandler的實現類
commandCenter.beforeStart();
commandCenter.start();
RecordLog.info("[CommandCenterInit] Starting command center: "
+ commandCenter.getClass().getCanonicalName());
}
這個方法里面的所有操作都是針對CommandCenter來進行的,所以我們先來看看CommandCenterProvider這個類。
CommandCenterProvider
static {
//初始化commandCenter對象
resolveInstance();
}
private static void resolveInstance() {
//獲取SpiOrder更大的子類實現類
CommandCenter resolveCommandCenter = SpiLoader.loadHighestPriorityInstance(CommandCenter.class);
if (resolveCommandCenter == null) {
RecordLog.warn("[CommandCenterProvider] WARN: No existing CommandCenter found");
} else {
commandCenter = resolveCommandCenter;
RecordLog.info("[CommandCenterProvider] CommandCenter resolved: " + resolveCommandCenter.getClass()
.getCanonicalName());
}
}
CommandCenterProvider會在首次初始化的時候調用resolveInstance方法。在resolveInstance方法里面會調用SpiLoader.loadHighestPriorityInstance
來獲取CommandCenter,這里獲取的是SimpleHttpCommandCenter這個實例,loadHighestPriorityInstance方法具體的實現非常簡單,我就不去分析了。
然后將commandCenter賦值SimpleHttpCommandCenter實例。
所以CommandCenterProvider.getCommandCenter()
方法返回的是SimpleHttpCommandCenter實例。
然后調用SimpleHttpCommandCenter的beforeStart方法。
SimpleHttpCommandCenter#beforeStart
public void beforeStart() throws Exception {
// Register handlers
//調用CommandHandlerProvider的namedHandlers方法
//獲取CommandHandler的spi中設置的實現類
Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
//將handlers中的數據設置到handlerMap中
registerCommands(handlers);
}
這個方法首先會調用CommandHandlerProvider的namedHandlers中獲取所有的CommandHandler實現類。
CommandHandlerProvider#namedHandlers
private final ServiceLoader<CommandHandler> serviceLoader = ServiceLoader.load(CommandHandler.class);
public Map<String, CommandHandler> namedHandlers() {
Map<String, CommandHandler> map = new HashMap<String, CommandHandler>();
for (CommandHandler handler : serviceLoader) {
//獲取實現類CommandMapping注解的name屬性
String name = parseCommandName(handler);
if (!StringUtil.isEmpty(name)) {
map.put(name, handler);
}
}
return map;
}
這個類會通過spi先加載CommandHandler的實現類,然后將實現類按注解上面的name屬性放入到map里面去。
CommandHandler的實現類是用來和控制台進行交互的處理類,負責處理。
這也是策略模式的一種應用,根據map里面的不同策略來做不同的處理,例如SendMetricCommandHandler是用來統計調用信息然后發送給控制台用的,ModifyRulesCommandHandler是用來做實時修改限流策略的處理的等等。
然后我們再回到CommandCenterInitFunc中,繼續往下走,調用commandCenter.start()
方法。
SimpleHttpCommandCenter#start
public void start() throws Exception {
//獲取當前機器的cpu線程數
int nThreads = Runtime.getRuntime().availableProcessors();
//創建一個cpu線程數大小的固定線程池,用來做業務線程池用
this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10),
new NamedThreadFactory("sentinel-command-center-service-executor"),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
CommandCenterLog.info("EventTask rejected");
throw new RejectedExecutionException();
}
});
Runnable serverInitTask = new Runnable() {
int port;
{
try {
//獲取port
port = Integer.parseInt(TransportConfig.getPort());
} catch (Exception e) {
port = DEFAULT_PORT;
}
}
@Override
public void run() {
boolean success = false;
//創建一個ServerSocket
ServerSocket serverSocket = getServerSocketFromBasePort(port);
if (serverSocket != null) {
CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
socketReference = serverSocket;
executor.submit(new ServerThread(serverSocket));
success = true;
port = serverSocket.getLocalPort();
} else {
CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
}
if (!success) {
port = PORT_UNINITIALIZED;
}
TransportConfig.setRuntimePort(port);
//關閉線程池
executor.shutdown();
}
};
new Thread(serverInitTask).start();
}
- 這個方法會創建一個固定大小的業務線程池
- 創建一個serverInitTask,里面負責建立serverSocket然后用executor去創建一個ServerThread異步執行serverSocket
- executor用完之后會在serverInitTask里面調用executor的shutdown方法去關閉線程池
其中executor是一個單線程的線程池:
private ExecutorService executor = Executors.newSingleThreadExecutor(
new NamedThreadFactory("sentinel-command-center-executor"));
ServerThread是SimpleHttpCommandCenter的內部類:
public void run() {
while (true) {
Socket socket = null;
try {
//建立連接
socket = this.serverSocket.accept();
//默認的超時時間是3s
setSocketSoTimeout(socket);
HttpEventTask eventTask = new HttpEventTask(socket);
//使用業務線程異步處理
bizExecutor.submit(eventTask);
} catch (Exception e) {
CommandCenterLog.info("Server error", e);
if (socket != null) {
try {
socket.close();
} catch (Exception e1) {
CommandCenterLog.info("Error when closing an opened socket", e1);
}
}
try {
// In case of infinite log.
Thread.sleep(10);
} catch (InterruptedException e1) {
// Indicates the task should stop.
break;
}
}
}
}
run方法會使用構造器傳入的serverSocket建立連接后設置超時時間,封裝成HttpEventTask類,然后使用上面創建的bizExecutor異步執行任務。
HttpEventTask是Runnable的實現類,所以調用bizExecutor的submit的時候會調用其中的run方法使用socket與控制台進行交互。
HttpEventTask#run
public void run() {
....
// Validate the target command.
//獲取commandName
String commandName = HttpCommandUtils.getTarget(request);
if (StringUtil.isBlank(commandName)) {
badRequest(printWriter, "Invalid command");
return;
}
// Find the matching command handler.
//根據commandName獲取處理器名字
CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
if (commandHandler != null) {
//調用處理器結果,然后返回給控制台
CommandResponse<?> response = commandHandler.handle(request);
handleResponse(response, printWriter, outputStream);
}
....
} catch (Throwable e) {
....
} finally {
....
}
}
HttpEventTask的run方法很長,但是很多都是有關輸入輸出流的,我們不關心,所以省略。只需要知道會把request請求最后轉換成一個控制台發過來的指令,然后通過SimpleHttpCommandCenter調用getHandler得到處理器,然后處理數據就行了。
所以這個整個的處理流程就是:
通過這樣的一個處理流程,然后實現了類似reactor的一個處理流程。
SimpleHttpCommandCenter#getHandler
public static CommandHandler getHandler(String commandName) {
return handlerMap.get(commandName);
}
handlerMap里面的數據是通過前面我們分析的調用beforeStart方法設置進來的。
然后通過commandName獲取對應的控制台,例如:控制台發送過來metric指令,那么就會對應的調用SendMetricCommandHandler的handle方法來處理控制台的指令。
我們來看看SendMetricCommandHandler是怎么處理返回統計數據的:
SendMetricCommandHandler#handle
public CommandResponse<String> handle(CommandRequest request) {
// Note: not thread-safe.
if (searcher == null) {
synchronized (lock) {
//獲取應用名
String appName = SentinelConfig.getAppName();
if (appName == null) {
appName = "";
}
if (searcher == null) {
//用來找metric文件,
searcher = new MetricSearcher(MetricWriter.METRIC_BASE_DIR,
MetricWriter.formMetricFileName(appName, PidUtil.getPid()));
}
}
}
//獲取請求的開始結束時間和最大的行數
String startTimeStr = request.getParam("startTime");
String endTimeStr = request.getParam("endTime");
String maxLinesStr = request.getParam("maxLines");
//用來確定資源
String identity = request.getParam("identity");
long startTime = -1;
int maxLines = 6000;
if (StringUtil.isNotBlank(startTimeStr)) {
startTime = Long.parseLong(startTimeStr);
} else {
return CommandResponse.ofSuccess("");
}
List<MetricNode> list;
try {
// Find by end time if set.
if (StringUtil.isNotBlank(endTimeStr)) {
long endTime = Long.parseLong(endTimeStr);
//根據開始結束時間找到統計數據
list = searcher.findByTimeAndResource(startTime, endTime, identity);
} else {
if (StringUtil.isNotBlank(maxLinesStr)) {
maxLines = Integer.parseInt(maxLinesStr);
}
maxLines = Math.min(maxLines, 12000);
list = searcher.find(startTime, maxLines);
}
} catch (Exception ex) {
return CommandResponse.ofFailure(new RuntimeException("Error when retrieving metrics", ex));
}
if (list == null) {
list = new ArrayList<>();
}
//如果identity為空就加入CPU負載和系統負載
if (StringUtil.isBlank(identity)) {
addCpuUsageAndLoad(list);
}
StringBuilder sb = new StringBuilder();
for (MetricNode node : list) {
sb.append(node.toThinString()).append("\n");
}
return CommandResponse.ofSuccess(sb.toString());
}
我們在1.Sentinel源碼分析—FlowRuleManager加載規則做了什么?里介紹了Metric統計信息會在MetricTimerListener的run方法中定時寫入文件中去。
所以handle方法里面主要是如何根據請求的開始結束時間,資源名來獲取磁盤的文件,然后返回磁盤的統計信息,並記錄一下當前的統計信息,防止重復發送統計數據到控制台。
HeartbeatSenderInitFunc
HeartbeatSenderInitFunc主要是用來做心跳線程使用的,定期的和控制台進行心跳連接。
HeartbeatSenderInitFunc#init
public void init() {
//獲取HeartbeatSender的實現類
HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
if (sender == null) {
RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
return;
}
//創建一個corepoolsize為2,maximumPoolSize為最大的線程池
initSchedulerIfNeeded();
//獲取心跳間隔時間,默認10s
long interval = retrieveInterval(sender);
//設置間隔心跳時間
setIntervalIfNotExists(interval);
//開啟一個定時任務,每隔interval時間發送一個心跳
scheduleHeartbeatTask(sender, interval);
}
- 首先會調用HeartbeatSenderProvider.getHeartbeatSender方法,里面會根據spi創建實例,返回一個SimpleHttpHeartbeatSender實例。
- 調用initSchedulerIfNeeded方法創建一個corepoolsize為2的線程池
- 獲取心跳間隔時間,如果沒有設置,那么是10s
- 調用scheduleHeartbeatTask方法開啟一個定時線程調用。
我們來看看scheduleHeartbeatTask方法:
HeartbeatSenderInitFunc#scheduleHeartbeatTask
private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) {
pool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
sender.sendHeartbeat();
} catch (Throwable e) {
RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
}
}
}, 5000, interval, TimeUnit.MILLISECONDS);
RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "
+ sender.getClass().getCanonicalName());
}
默認的情況,創建的這個定時任務會每隔10s調用一次SimpleHttpHeartbeatSender的sendHeartbeat方法。
SimpleHttpHeartbeatSender#sendHeartbeat
public boolean sendHeartbeat() throws Exception {
if (TransportConfig.getRuntimePort() <= 0) {
RecordLog.info("[SimpleHttpHeartbeatSender] Runtime port not initialized, won't send heartbeat");
return false;
}
//獲取控制台的ip和端口等信息
InetSocketAddress addr = getAvailableAddress();
if (addr == null) {
return false;
}
//設置http調用的ip和端口,還有訪問的url
SimpleHttpRequest request = new SimpleHttpRequest(addr, HEARTBEAT_PATH);
//獲取版本號,端口等信息
request.setParams(heartBeat.generateCurrentMessage());
try {
//發送post請求
SimpleHttpResponse response = httpClient.post(request);
if (response.getStatusCode() == OK_STATUS) {
return true;
}
} catch (Exception e) {
RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addr + " : ", e);
}
return false;
}
這個心跳檢測的方法就寫的很簡單了,通過Dcsp.sentinel.dashboard.server預先設置好的ip和端口號發送post請求到控制台,然后檢測是否返回200,如果是則說明控制台正常,否則進行異常處理。