DolphinScheduler源碼分析之任務日志


DolphinScheduler源碼分析之任務日志

任務日志打印在調度系統中算是一個比較重要的功能,下面就簡要分析一下其打印的邏輯和前端頁面查詢的流程。

AbstractTask

所有的任務都會繼承AbstractTask,這個抽象類有一個比較重要的字段就是logger,其實也就是一個org.slf4j.Logger對象。

也就是說所有的任務都是通過slf4j打印日志的。那這個logger是如何創建的呢?

Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                    taskInstance.getProcessDefine().getId(),
                    taskInstance.getProcessInstance().getId(),
                    taskInstance.getId()));
public static String buildTaskId(String affix, int processDefId, int processInstId, int taskId){ // - [taskAppId=TASK_79_4084_15210] return String.format(" - [taskAppId=%s-%s-%s-%s]",affix, processDefId, processInstId, taskId); } 

非常簡單,就是通過LoggerFactory.getLogger獲取的,名字是由流程定義ID、流程實例ID、任務ID拼接成的。前端查詢日志時,taskAppId其實就是logger的名稱。通過下圖可以很直觀的看到,當前任務的流程定義ID是1,流程實例ID是2,任務ID是2 logger

其實分析到這里,並沒有證明最終的進程把日志通過logger寫到文件,至少目前沒有看到相關的代碼。為了更加直觀的證明,我們選擇Shell類型的任務來分析打印日志的方式。因為它最終創建了一個shell子進程,如果要通過logger字段打印日志,一定會有相關的代碼。

ShellCommandExecutor

Shell類型的任務是通過ShellCommandExecutor去執行具體的shell腳本的。

/** * constructor * @param logHandler log handler * @param taskDir task dir * @param taskAppId task app id * @param taskInstId task instance id * @param tenantCode tenant code * @param envFile env file * @param startTime start time * @param timeout timeout * @param logger logger */ public ShellCommandExecutor(Consumer<List<String>> logHandler, String taskDir, String taskAppId, int taskInstId, String tenantCode, String envFile, Date startTime, int timeout, Logger logger) 

上面是ShellCommandExecutor的構造函數,通過注釋以及參數命名大概可以猜到,logHandler是最終打印日志的地方。下面從其賦值以及如何使用分析日志究竟是不是logger打印的。

this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskProps.getTaskDir(), taskProps.getTaskAppId(), taskProps.getTaskInstId(), taskProps.getTenantCode(), taskProps.getEnvFile(), taskProps.getTaskStartTime(), taskProps.getTaskTimeout(), logger); 

ShellCommandExecutor創建的時候,logHandler是通過ShellTask的logHandle方法賦值的。

/** * log handle * @param logs log list */ public void logHandle(List<String> logs) { // note that the "new line" is added here to facilitate log parsing logger.info(" -> {}", String.join("\n\t", logs)); } 

上面是logHandle的方法定義,很明顯就是通過logger打印日志的。

那logHandler是什么時候使用的呢?

AbstractCommandExecutor

ShellCommandExecutor繼承了AbstractCommandExecutor,在AbstractCommandExecutor.run中調用了一個非常重要的方法:parseProcessOutput

private void parseProcessOutput(Process process) { String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskAppId); ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName); parseProcessOutputExecutorService.submit(new Runnable(){ @Override public void run() { BufferedReader inReader = null; try { inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); String line; long lastFlushTime = System.currentTimeMillis(); while ((line = inReader.readLine()) != null) { logBuffer.add(line); lastFlushTime = flush(lastFlushTime); } } catch (Exception e) { logger.error(e.getMessage(),e); } finally { clear(); close(inReader); } } }); parseProcessOutputExecutorService.shutdown(); } 

parseProcessOutput這個方法就是把Process的標准輸入輸出打印到了logBuffer中,然后根據條件flush。

private long flush(long lastFlushTime) { long now = System.currentTimeMillis(); /** * when log buffer siz or flush time reach condition , then flush */ if (logBuffer.size() >= Constants.defaultLogRowsNum || now - lastFlushTime > Constants.defaultLogFlushInterval) { lastFlushTime = now; /** log handle */ logHandler.accept(logBuffer); logBuffer.clear(); } return lastFlushTime; } 

flush就是根據條件(大小、時間)把logBuffer中的內容,通過logHandler打印,其實就是通過logger打印到文件。

分析到這個地方,我們才真正清楚,任務其實就是通過slf4j打印到文件。那么問題又來了,前端是如何查詢日志文件的呢?日志文件的路徑前端是如何找到的呢?

logback.xml

既然我們知道了是slf4j在打印日志,那么配置文件在哪里呢?

在dolphinscheduler-server模塊的resources目錄下,有兩個logback.xml文件:worker_logback.xml、master_logback.xml。任務打印日志的配置應該是worker_logback.xml,在哪里指定的呢?

dolphinscheduler-daemon.sh文件中有一個關於日志的配置。

-Dlogging.config=classpath:master_logback.xml 

worker_logback

上面是worker_logback.xml,可以看到有兩個appender,其中TASKLOGFILE是我們關注的對象。它有一個比較關鍵的filter,根據logback中filter的概念來猜測,這應該就是用來區分workerlogfile這個appender的。也就是說兩個appender,會通過filter分別篩選出各自的日志進行打印。

/** * Accept or reject based on thread name * @param event event * @return FilterReply */ @Override public FilterReply decide(ILoggingEvent event) { if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME) || event.getLevel().isGreaterOrEqual(level)) { return FilterReply.ACCEPT; } return FilterReply.DENY; } 

這個filter根據日志級別和線程名過濾,符合條件的才能打印到當前appender。其實也就是只打印任務線程的日志。

當然了,還配置了Discriminator,它限定了logger的名稱符合前面的定義。

/** * logger name should be like: * Task Logger name should be like: Task-{processDefinitionId}-{processInstanceId}-{taskInstanceId} */ @Override public String getDiscriminatingValue(ILoggingEvent event) { String loggerName = event.getLoggerName() .split(Constants.EQUAL_SIGN)[1]; String prefix = LoggerUtils.TASK_LOGGER_INFO_PREFIX + "-"; if (loggerName.startsWith(prefix)) { return loggerName.substring(prefix.length(), loggerName.length() - 1).replace("-","/"); } else { return "unknown_task"; } } 

LoggerController

前面的分析我們知道,任務的日志其實就是打印到本地日志文件中,那么前端查詢的時候估計就是直接讀取日志文件然后返回。

但有一個很現實的問題,任務是隨機分布在各個worker的,如何讀取日志文件呢?

LoggerController.queryLog就是用來查詢日志的,它調用了LoggerService.queryLog

public Result queryLog(int taskInstId, int skipLineNum, int limit) { TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId); if (taskInstance == null){ return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg()); } String host = taskInstance.getHost(); if(StringUtils.isEmpty(host)){ return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg()); } Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT); LogClient logClient = new LogClient(host, Constants.RPC_PORT); String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit); result.setData(log); logger.info(log); return result; } 

LoggerService.queryLog的邏輯其實就是通過任務實例ID,查詢到了任務所在節點以及日志路徑,通過LogClient讀取日志。當然了,讀取的時候,有限定跳過的行數以及需要讀取的行數。

LogClient.rollViewLog其實就是一次rpc調用,它連接到對應host的50051端口,讀取日志。

LoggerServer

LoggerServer其實就是一個socket服務,它監聽Constants.RPC_PORT(50051)端口的連接,交給LogViewServiceGrpcImpl處理對應的rpc請求。

/** * server start * @throws IOException io exception */ public void start() throws IOException { /* The port on which the server should run */ int port = Constants.RPC_PORT; server = ServerBuilder.forPort(port) .addService(new LogViewServiceGrpcImpl()) .build() .start(); logger.info("server started, listening on port : {}" , port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { // Use stderr here since the logger may have been reset by its JVM shutdown hook. logger.info("shutting down gRPC server since JVM is shutting down"); LoggerServer.this.stop(); logger.info("server shut down"); } }); } 

rollViewLog的實現如下,其實也比較簡單,就是調用readFile讀取日志文件,然后返回。

public void rollViewLog(LogParameter request, StreamObserver<RetStrInfo> responseObserver) { logger.info("log parameter path : {} ,skip line : {}, limit : {}", request.getPath(), request.getSkipLineNum(), request.getLimit()); List<String> list = readFile(request.getPath(), request.getSkipLineNum(), request.getLimit()); StringBuilder sb = new StringBuilder(); boolean errorLineFlag = false; for (String line : list){ sb.append(line + "\r\n"); } RetStrInfo retInfoBuild = RetStrInfo.newBuilder().setMsg(sb.toString()).build(); responseObserver.onNext(retInfoBuild); responseObserver.onCompleted(); } 

總結

 

 

 

上面是一個簡單的流程圖,是worker寫入日志的流程。

 

 

 

這是一個前端讀取日志的路程,讀取日志的請求按照箭頭方向傳遞,最終由LoggerServer讀取本地日志返回給遠程的ApiServer,ApiServer返回給前端。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM