一、incubator-dolphinscheduler 中如何獲取shell類型的節點或者python類型的節點任務的日志
1、在org.apache.dolphinscheduler.server.worker.task.AbstractCommandExecutor 類中通過java.lang.ProcessBuilder 來將python 腳本生成命令進行執行,AbstractCommandExecutor中的部分源碼如下:
...... } else { //init process builder ProcessBuilder processBuilder = new ProcessBuilder(); // setting up a working directory processBuilder.directory(new File(taskExecutionContext.getExecutePath())); // merge error information to standard output stream processBuilder.redirectErrorStream(true); // setting up user to run commands command.add("sudo"); command.add("-u"); command.add(taskExecutionContext.getTenantCode()); command.add(commandInterpreter()); command.addAll(commandOptions()); command.add(commandFile); // setting commands processBuilder.command(command); process = processBuilder.start(); } ......
2、通過process.getInputStream() 來獲取命令終端輸出的日志,部分源碼如下:
private void parseProcessOutput(Process process) { String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId()); ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName); parseProcessOutputExecutorService.submit(new Runnable() { @Override public void run() { BufferedReader inReader = null; try { inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); String line; long lastFlushTime = System.currentTimeMillis(); logBuffer.add("welcome to use bigdata scheduling system..."); Thread.sleep(Constants.DEFAULT_LOG_FLUSH_INTERVAL * 2); while ((line = inReader.readLine()) != null || logBuffer.size()>0) { if(null != line){ logBuffer.add(line); } lastFlushTime = flush(lastFlushTime); } if (logBuffer.size() > 0) { Thread.sleep(Constants.DEFAULT_LOG_FLUSH_INTERVAL * 2); lastFlushTime = flush(lastFlushTime); } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { clear(); close(inReader); } } }); parseProcessOutputExecutorService.shutdown(); }
二、 Python 腳本中通過print()打印輸出的日志為啥不能及時被incubator-dolphinscheduler獲取到以及如何改進python腳本任務
在python 腳本中,很多人習慣於用print()來輸出日志,這本身也沒啥問題,而且在python 3版本中,print()本身也是自動換行輸出的,而dolphinscheduler 也是按行來讀取process的輸出的,按理應該是可以及時輸出的。
if __name__=='__main__': ......... print(xxxxxxxx) ......... print(xxxxxxxx) ......... print(xxxxxxxx)
在 Python 3中打印日志調用 print (obj) 的時候,事實上是調用了 sys.stdout.write(obj+'\n'),print ()將需要的打印內容打印到了控制台,然后追加了一個換行符,print() 會調用 sys.stdout 的 write() 方法。
一行print("hello,world") 其實等價於執行sys.stdout.write('hello,world'+'\n'),看到這里是不是就容易理解了。因為這樣會一直寫如到了緩沖區,需要等到線程退出等情況下,緩沖區的內容才會被刷出,但是我們可以通過在腳本中強制調用sys.stdout.flush() 讓其及時的刷出。
三、 直接通過參數解決
python中提供了-u 參數:force the stdout and stderr streams to be unbuffered;this option has no effect on stdin; also PYTHONUNBUFFERED=x 可以強制輸出e stdout and stderr streams