Azkaban作為LinkedIn開源的任務流式管理工具,在工作中很大程度上被用到。但是,由於非國人開發,對中文的支持性很不好。大多數情況下,會出現幾種亂碼現象: - 執行內置腳本生成log亂碼 - 直接command執行中文亂碼 - 中文包名亂碼等,其中對日常使用影響最大的就是日志亂碼問題。不管是調度Hive、DataX還是Java程序,只要日志拋出來中文,中文都是亂碼顯示,將日志文件拷貝出來查看文件格式為GB2312,但是Linux系統編碼和Azkaban日志編碼明明設置的都是UTF-8,很疑惑,摸索許久,決定從源碼入手開始層層解惑。
文中大部分內容從源碼一步步進入解析,有經驗的朋友可以跳至文末見具體解決方法。
根據頁面獲取日志的接口可以知道方法在 azkaban-web-server項目下package azkaban.webapp.servlet 下的方法handleAJAXAction,如下圖 請求參數是fetchExecJobLogs
對應的處理方法為 ajaxFetchJobLogs(req, resp, ret, session.getUser(), exFlow)和ajaxFetchExecFlowLogs(req, resp, ret, session.getUser(), exFlow)
進入該方法后可以發現返回的data為經過 StringEscapeUtils.escapeHtml格式化過的,這就是引發亂碼的原因之一。
改用commons-lang3下的方法可以解決這個問題,增加如下依賴后更新gradle項目,將此處StringEscapeUtils.escapeHtml(data.getData())更改為 org.apache.commons.lang3.StringEscapeUtils.escapeHtml3(data.getData())
// https://mvnrepository.com/artifact/org.apache.commons/commons-lang3
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.4'
修改后為
仔細研讀代碼可以發現其實這樣還不夠,讀取日志的時候會先判斷該任務是否正在執行,如果在執行的話就直接在服務器上暫存日志路徑讀取,否則是話就在mysql表讀取,具體代碼如下
一.本地文件讀取的話是走http請求的,對應的為azkaban-exec-server下的包package azkaban.execapp下的ExecutorServlet
進入方法handleFetchLogEvent,一個是FlowLogs 一個是JobLogs ,
進入這兩個方法后發現最終都是調用的FileIOUtils.readUtf8File()這個方法
讀取日志的核心方法就是這個了
BufferedInputStream讀取的是字節byte,因為一個漢字占兩個字節,而當中英文混合的時候,有的字符占一個字節,有的字符占兩個字節,所以如果直接讀字節,而數據比較長,沒有一次讀完的時候,很可能剛好讀到一個漢字的前一個字節,這樣,這個中文就成了亂碼,后面的數據因為沒有字節對齊,也都成了亂碼.所以我們需要用BufferedReader來讀取,
它讀到的是字符,所以不會讀到半個字符的情況,不會出現亂碼。
在這里修改方法后為一個新方法如下(編碼方式GBK根據生成的日志編碼格式設定)

public static LogData readUtf8FileEncode(final File file, final int fileOffset, final int length) throws IOException { final char[] chars = new char[length]; final FileInputStream fileStream = new FileInputStream(file); final long skipped = fileStream.skip(fileOffset); if (skipped < fileOffset) { fileStream.close(); return new LogData(fileOffset, 0, ""); } BufferedReader reader = null; int read = 0; try { reader = new BufferedReader(new InputStreamReader(fileStream, "GBK")); read = reader.read(chars); } finally { IOUtils.closeQuietly(reader); } if (read <= 0) { return new LogData(fileOffset, 0, ""); } final byte[] buffer = new String(chars).getBytes(); final Pair<Integer, Integer> utf8Range = getUtf8Range(buffer, 0, read); final String outputString = new String(buffer, utf8Range.getFirst(), utf8Range.getSecond(), StandardCharsets.UTF_8); return new LogData(fileOffset + utf8Range.getFirst(), utf8Range.getSecond(), outputString); }
在原來引用的地方替換為新的即可,如下圖。
二.對於運行完成的,日志會被寫入數據庫,而且是經過壓縮的,方法為 package azkaban.executor下的ExecutionLogsDao 搜索找到fetchLogs,在方法體中實例化了一個FetchLogsHandler
在FetchLogsHandler中核心方法為handle,在這個方法中可以發現這塊會有解壓縮的過程,開始懷疑是不是這塊出現問題,測試后否定了這一猜想。
這里有解壓,那在入庫之前肯定會有壓縮過程,再找到入庫的方法(如何觸發將本地文件寫入到數據庫在這里不討論)uploadLogFile(ExecutionLogsDao方法中)
進入方法體后可以發現和之前類似,也是采用BufferedInputStream讀取文件
修改后的方法為如下(編碼方式GBK根據生成的日志編碼格式設定)

private void uploadLogFileEncode(final DatabaseTransOperator transOperator, final int execId, final String name, final int attempt, final File[] files, final EncodingType encType) throws SQLException { // 50K buffer... if logs are greater than this, we chunk. // However, we better prevent large log files from being uploaded somehow
final char[] buffer = new char[50 * 1024]; int pos = 0; int length = buffer.length; int startByte = 0; try { for (int i = 0; i < files.length; ++i) { final File file = files[i]; // BufferedInputStream讀取的是字節byte,因為一個漢字占兩個字節,而當中英文混合的時候,有的字符占一個字節, // 有的字符占兩個字節,所以如果直接讀字節,而數據比較長,沒有一次讀完的時候,很可能剛好讀到一個漢字的前一個字節, // 這樣,這個中文就成了亂碼,后面的數據因為沒有字節對齊,也都成了亂碼.所以我們需要用BufferedReader來讀取, // 它讀到的是字符,所以不會讀到半個字符的情況,不會出現亂碼
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "GBK")); try { int size = reader.read(buffer, pos, length); while (size >= 0) { if (pos + size == buffer.length) { // Flush here.
uploadLogPart(transOperator, execId, name, attempt, startByte, startByte + buffer.length, encType, buffer, buffer.length); pos = 0; length = buffer.length; startByte += buffer.length; } else { // Usually end of file.
pos += size; length = buffer.length - pos; } size = reader.read(buffer, pos, length); } } finally { IOUtils.closeQuietly(reader); } } // Final commit of buffer.
if (pos > 0) { uploadLogPart(transOperator, execId, name, attempt, startByte, startByte + pos, encType, buffer, pos); } } catch (final SQLException e) { logger.error("Error writing log part.", e); throw new SQLException("Error writing log part", e); } catch (final IOException e) { logger.error("Error chunking.", e); throw new SQLException("Error chunking", e); } } private void uploadLogPart(final DatabaseTransOperator transOperator, final int execId, final String name, final int attempt, final int startByte, final int endByte, final EncodingType encType, final char[] buffer, final int length) throws SQLException, IOException { final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs "
+ "(exec_id, name, attempt, enc_type, start_byte, end_byte, "
+ "log, upload_time) VALUES (?,?,?,?,?,?,?,?)"; byte[] buf = new String(buffer).getBytes(); if (encType == EncodingType.GZIP) { buf = GZIPUtils.gzipBytes(buf, 0, buf.length); } else if (length < buffer.length) { buf = new String(buffer, 0, length).getBytes(); } transOperator.update(INSERT_EXECUTION_LOGS, execId, name, attempt, encType.getNumVal(), startByte, startByte + length, buf, DateTime.now().getMillis());
以上代碼直接放在ExecutionLogsDao內即可,修改引用的地方
至此日志顯示中文亂碼問題即可解決。以上為代碼分析過程,操作過程可直接總結如下:
1.添加commons-lang3依賴
// https://mvnrepository.com/artifact/org.apache.commons/commons-lang3
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.4'
2.azkaban-web-server -->package azkaban.webapp.servlet-->ExecutorServlet-->將方法ajaxFetchExecFlowLogs和ajaxFetchJobLogs中的StringEscapeUtils.escapeHtml替換為org.apache.commons.lang3.StringEscapeUtils.escapeHtml3
3.azkaban-common-->package azkaban.utils-->FileIOUtils-->增加方法readUtf8FileEncode (文中第一段代碼)
4.azkaban-common-->package azkaban.execapp-->FlowRunnerManager-->分別將readFlowLogs和readJobLogs中readUtf8File替換為readUtf8FileEncode
5.azkaban-common-->package azkaban.executor-->ExecutionLogsDao-->增加方法uploadLogFileEncode和uploadLogPart(重載)(文中第二段代碼),將uploadLogFile(在70行左右的這個方法public void uploadLogFile(final int execId, final String name, final int attempt, final File... files))中的uploadLogFile替換為uploadLogFileEncode
代碼修改就是這些,我們可以發現代碼中解析文件流都是采用UTF-8,如果生成的日志是其他編碼就會出現亂碼的情況,所以需要在讀取文件的地方指定日志文件編碼(還有一種思路就是Log4j寫日志時候就指定編碼,嘗試沒有成功就采用了現在的方式),來張前后對比圖(對於已經寫入數據庫的日志,中文亂碼問題無法解決,因為寫入數據庫前讀取文件的方法不支持中文,修改代碼之后入數據庫的日志可以支持中文,另外代碼中的GBK可以根據日志編碼更換)