談談在incubator-dolphinscheduler 中為啥不能及時看到python任務輸出的print日志


一、incubator-dolphinscheduler 中如何獲取shell類型的節點或者python類型的節點任務的日志

1、在org.apache.dolphinscheduler.server.worker.task.AbstractCommandExecutor 類中通過java.lang.ProcessBuilder 來將python 腳本生成命令進行執行,AbstractCommandExecutor中的部分源碼如下:

......        
} else {
            //init process builder
            ProcessBuilder processBuilder = new ProcessBuilder();
            // setting up a working directory
            processBuilder.directory(new File(taskExecutionContext.getExecutePath()));
            // merge error information to standard output stream
            processBuilder.redirectErrorStream(true);

            // setting up user to run commands
            command.add("sudo");
            command.add("-u");
            command.add(taskExecutionContext.getTenantCode());
            command.add(commandInterpreter());
            command.addAll(commandOptions());
            command.add(commandFile);

            // setting commands
            processBuilder.command(command);
            process = processBuilder.start();
        }
......

2、通過process.getInputStream() 來獲取命令終端輸出的日志,部分源碼如下:

 private void parseProcessOutput(Process process) {
        String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId());
        ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName);
        parseProcessOutputExecutorService.submit(new Runnable() {
            @Override
            public void run() {
                BufferedReader inReader = null;

                try {
                    inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
                    String line;
                    long lastFlushTime = System.currentTimeMillis();
                    logBuffer.add("welcome to use bigdata scheduling system...");
                    Thread.sleep(Constants.DEFAULT_LOG_FLUSH_INTERVAL * 2);
                    while ((line = inReader.readLine()) != null || logBuffer.size()>0) {
                        if(null != line){
                            logBuffer.add(line);
                        }
                        lastFlushTime = flush(lastFlushTime);
                    }
                    if (logBuffer.size() > 0) {
                        Thread.sleep(Constants.DEFAULT_LOG_FLUSH_INTERVAL * 2);
                        lastFlushTime = flush(lastFlushTime);
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {
                    clear();
                    close(inReader);
                }
            }
        });
        parseProcessOutputExecutorService.shutdown();
    }

二、 Python 腳本中通過print()打印輸出的日志為啥不能及時被incubator-dolphinscheduler獲取到以及如何改進python腳本任務

在python 腳本中,很多人習慣於用print()來輸出日志,這本身也沒啥問題,而且在python 3版本中,print()本身也是自動換行輸出的,而dolphinscheduler 也是按行來讀取process的輸出的,按理應該是可以及時輸出的。

 

if __name__=='__main__':
.........
    print(xxxxxxxx)
.........
    print(xxxxxxxx)
.........
    print(xxxxxxxx)

 

在 Python 3中打印日志調用 print (obj) 的時候,事實上是調用了 sys.stdout.write(obj+'\n'),print ()將需要的打印內容打印到了控制台,然后追加了一個換行符,print() 會調用 sys.stdout 的 write() 方法。

一行print("hello,world") 其實等價於執行sys.stdout.write('hello,world'+'\n'),看到這里是不是就容易理解了。因為這樣會一直寫如到了緩沖區,需要等到線程退出等情況下,緩沖區的內容才會被刷出,但是我們可以通過在腳本中強制調用sys.stdout.flush() 讓其及時的刷出。

三、 直接通過參數解決

 

 

 python中提供了-u 參數:force the stdout and stderr streams to be unbuffered;this option has no effect on stdin; also PYTHONUNBUFFERED=x  可以強制輸出e stdout and stderr streams


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM