本文將會介紹如何使用 Flume、log4j、Kafka進行規范的日志采集。
Flume 基本概念
Flume是一個完善、強大的日志采集工具,關於它的配置,在網上有很多現成的例子和資料,這里僅做簡單說明不再詳細贅述。
Flume包含Source、Channel、Sink三個最基本的概念:
Source——日志來源,其中包括:Avro Source、Thrift Source、Exec Source、JMS Source、Spooling Directory Source、Kafka Source、NetCat Source、Sequence Generator Source、Syslog Source、HTTP Source、Stress Source、Legacy Source、Custom Source、Scribe Source以及Twitter 1% firehose Source。
Channel——日志管道,所有從Source過來的日志數據都會以隊列的形式存放在里面,它包括:Memory Channel、JDBC Channel、Kafka Channel、File Channel、Spillable Memory Channel、Pseudo Transaction Channel、Custom Channel。
Sink——日志出口,日志將通過Sink向外發射,它包括:HDFS Sink、Hive Sink、Logger Sink、Avro Sink、Thrift Sink、IRC Sink、File Roll Sink、Null Sink、HBase Sink、Async HBase Sink、Morphline Solr Sink、Elastic Search Sink、Kite Dataset Sink、Kafka Sink、Custom Sink。
基於Flume的日志采集是靈活的,我們可以看到既有Avro Sink也有Avro Source,既有Thrift Sink也有Thrift Source,這意味着我們可以將多個管道處理串聯起來,如下圖所示:
串聯的意義在於,我們可以將多個管道合並到一個管道中最終輸出到同一個Sink中去,如下圖:
上面講述了Source和Sink的作用,而Channel的作用在於處理不同的Sink,假設我們一個Source要對應多個Sink,則只需要為一個Source建立多個Channel即可,如下所示:
一個Source如果想要輸出到多個Sink中去,就需要建立多個Channel進行介入並最終輸出,通過上面這幾張圖,我們可以很好的理解Flume的運行機制,我們在這里也就點到為止,詳細的配置可以在官網或者在網上搜索到、查看到。
一般情況下,我們使用 Exec Source對log文件進行監控,這樣做確實是比較簡單,但是並不方便,我們需要在每一台要監控的服務器上部署Flume,對運維來講萬一目標日志文件發生IO異常(例如格式改變、文件名改變、文件被鎖),也是很痛苦的,因此我們最好能讓日志直接通過Socket發送出去,而不是存放在本地,這樣一來,不僅降低了目標服務器的磁盤占用,還能夠有效的防止文件IO異常,而Kafka就是一個比較好的解決方案,具體的架構如下圖所示:
由上圖可以看到,日志最終流向了兩個地方:HBase Persistence和Realtime Processor,而至於為什么不用Kafka直接與Storm進行通信的原因是為了將Sotrm邏輯和日志源通過Flume進行隔離,在Storm中對日志進行簡單的分析后,將結果扔進 Rabbit MQ 中供 WEB APP消費。
HBase Persistence就是將原始的日志記錄在HBase中以便回檔查詢,而Realtime Processor則包含了實時的日志統計以及錯誤異常郵件提醒等功能。
為了能夠准確的捕獲到異常數據,我們還需要對程序進行一些規范化的改造,例如提供統一的異常處理句柄等等。
日志輸出格式
既然打算要對日志進行統一處理,一個統一、規范的日志格式就是非常重要的,而我們以往使用的 PatternLayout 對於最終字段的切分非常的不方便,如下所示:
2016-05-08 19:32:55,572 [INFO ] [main] - [com.banksteel.log.demo.log4j.Demo.main(Demo.java:13)] 輸出信息……
2016-05-08 19:32:55,766 [DEBUG] [main] - [com.banksteel.log.demo.log4j.Demo.main(Demo.java:15)] 調試信息……
2016-05-08 19:32:55,775 [WARN ] [main] - [com.banksteel.log.demo.log4j.Demo.main(Demo.java:16)] 警告信息……
2016-05-08 19:32:55,783 [ERROR] [main] - [com.banksteel.log.demo.log4j.Demo.main(Demo.java:20)] 處理業務邏輯的時候發生一個錯誤……
java.lang.Exception: 錯誤消息啊
at com.banksteel.log.demo.log4j.Demo.main(Demo.java:18)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
如何去解析這個日志,是個非常頭疼的地方,萬一某個系統的開發人員輸出的日志不符合既定規范的 PatternLayout 就會引發異常。
為了能夠一勞永逸的解決格式問題,我們采用 JsonLayout 就能很好的規范日志輸出,例如LOG4J 2.X 版本中提供的 JsonLayout 輸出的格式如下所示:
{ "timeMillis" : 1462712870612, "thread" : "main", "level" : "FATAL", "loggerName" : "com.banksteel.log.demo.log4j2.Demo", "message" : "發生了一個可能會影響程序繼續運行下去的異常!", "thrown" : { "commonElementCount" : 0, "localizedMessage" : "錯誤消息啊", "message" : "錯誤消息啊", "name" : "java.lang.Exception", "extendedStackTrace" : [ { "class" : "com.banksteel.log.demo.log4j2.Demo", "method" : "main", "file" : "Demo.java", "line" : 20, "exact" : true, "location" : "classes/", "version" : "?" }, { "class" : "sun.reflect.NativeMethodAccessorImpl", "method" : "invoke0", "file" : "NativeMethodAccessorImpl.java", "line" : -2, "exact" : false, "location" : "?", "version" : "1.7.0_80" }, { "class" : "sun.reflect.NativeMethodAccessorImpl", "method" : "invoke", "file" : "NativeMethodAccessorImpl.java", "line" : 57, "exact" : false, "location" : "?", "version" : "1.7.0_80" }, { "class" : "sun.reflect.DelegatingMethodAccessorImpl", "method" : "invoke", "file" : "DelegatingMethodAccessorImpl.java", "line" : 43, "exact" : false, "location" : "?", "version" : "1.7.0_80" }, { "class" : "java.lang.reflect.Method", "method" : "invoke", "file" : "Method.java", "line" : 606, "exact" : false, "location" : "?", "version" : "1.7.0_80" }, { "class" : "com.intellij.rt.execution.application.AppMain", "method" : "main", "file" : "AppMain.java", "line" : 144, "exact" : true, "location" : "idea_rt.jar", "version" : "?" } ] }, "endOfBatch" : false, "loggerFqcn" : "org.apache.logging.log4j.spi.AbstractLogger", "source" : { "class" : "com.banksteel.log.demo.log4j2.Demo", "method" : "main", "file" : "Demo.java", "line" : 23 } }
我們看到,這種格式,無論用什么語言都能輕松解析了。
日志框架的Kafka集成
我們這里只用log4j 1.x 和 log4j 2.x 進行示例。
log4j 1.x 與 Kafka 集成
首先POM.xml的內容如下:
<dependencies> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.7.4</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.7.4</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.7.4</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.8.2.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.8.2.1</version> </dependency> </dependencies>
注意,我們這里使用的Kafka版本號是0.8.2.1,但是對應0.9.0.1是可以使用的並且0.9.0.1也只能用0.8.2.1才不會發生異常(具體異常可以自己嘗試一下)。
而log4j 1.x 本身是沒有 JsonLayout 可用的,因此我們需要自己實現一個類,如下所示:
package com.banksteel.log.demo.log4j; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.log4j.Layout; import org.apache.log4j.spi.LoggingEvent; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; /** * 擴展Log4j 1.x,使其支持 JsonLayout,與 log4j2.x 一樣是基於Jackson進行解析,其格式也是完全參考 Log4J 2.x實現的。 * * @author 熱血BUG男 * @version 1.0.0 * @since Created by gebug on 2016/5/8. */ public class JsonLayout extends Layout { private final ObjectMapper mapper = new ObjectMapper(); public String format(LoggingEvent loggingEvent) { String json; Map<String, Object> map = new LinkedHashMap<String, Object>(0); Map<String, Object> source = new LinkedHashMap<String, Object>(0); source.put("method", loggingEvent.getLocationInformation().getMethodName()); source.put("class", loggingEvent.getLocationInformation().getClassName()); source.put("file", loggingEvent.getLocationInformation().getFileName()); source.put("line", safeParse(loggingEvent.getLocationInformation().getLineNumber())); map.put("timeMillis", loggingEvent.getTimeStamp()); map.put("thread", loggingEvent.getThreadName()); map.put("level", loggingEvent.getLevel().toString()); map.put("loggerName", loggingEvent.getLocationInformation().getClassName()); map.put("source", source); map.put("endOfBatch", false); map.put("loggerFqcn", loggingEvent.getFQNOfLoggerClass()); map.put("message", safeToString(loggingEvent.getMessage())); map.put("thrown", formatThrowable(loggingEvent)); try { json = mapper.writeValueAsString(map); } catch (JsonProcessingException e) { return e.getMessage(); } return json; } private List<Map<String, Object>> formatThrowable(LoggingEvent le) { if (le.getThrowableInformation() == null || le.getThrowableInformation().getThrowable() == null) return null; List<Map<String, Object>> traces = new LinkedList<Map<String, Object>>(); Map<String, Object> throwableMap = new LinkedHashMap<String, Object>(0); StackTraceElement[] stackTraceElements = le.getThrowableInformation().getThrowable().getStackTrace(); for (StackTraceElement stackTraceElement : stackTraceElements) { throwableMap.put("class", stackTraceElement.getClassName()); throwableMap.put("file", stackTraceElement.getFileName()); throwableMap.put("line", stackTraceElement.getLineNumber()); throwableMap.put("method", stackTraceElement.getMethodName()); throwableMap.put("location", "?"); throwableMap.put("version", "?"); traces.add(throwableMap); } return traces; } private static String safeToString(Object obj) { if (obj == null) return null; try { return obj.toString(); } catch (Throwable t) { return "Error getting message: " + t.getMessage(); } } private static Integer safeParse(String obj) { try { return Integer.parseInt(obj.toString()); } catch (NumberFormatException t) { return null; } } public boolean ignoresThrowable() { return false; } public void activateOptions() { } }
其實並不復雜,注意其中有一些獲取不到的信息,用?代替了,保留字段的目的在於與log4j 2.x 的日志格式完全一致,配置log4j.properties如下對接 Kafka:
log4j.rootLogger=INFO,console log4j.logger.com.banksteel.log.demo.log4j=DEBUG,kafka log4j.appender.kafka=kafka.producer.KafkaLog4jAppender log4j.appender.kafka.topic=server_log log4j.appender.kafka.brokerList=Kafka-01:9092,Kafka-02:9092,Kafka-03:9092 log4j.appender.kafka.compressionType=none log4j.appender.kafka.syncSend=true log4j.appender.kafka.layout=com.banksteel.log.demo.log4j.JsonLayout
# appender console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.out log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
通過打印日志我們可以看到其輸出的最終格式如下:
{ "timeMillis": 1462713132695, "thread": "main", "level": "ERROR", "loggerName": "com.banksteel.log.demo.log4j.Demo", "source": { "method": "main", "class": "com.banksteel.log.demo.log4j.Demo", "file": "Demo.java", "line": 20 }, "endOfBatch": false, "loggerFqcn": "org.slf4j.impl.Log4jLoggerAdapter", "message": "處理業務邏輯的時候發生一個錯誤……", "thrown": [ { "class": "com.intellij.rt.execution.application.AppMain", "file": "AppMain.java", "line": 144, "method": "main", "location": "?", "version": "?" }, { "class": "com.intellij.rt.execution.application.AppMain", "file": "AppMain.java", "line": 144, "method": "main", "location": "?", "version": "?" }, { "class": "com.intellij.rt.execution.application.AppMain", "file": "AppMain.java", "line": 144, "method": "main", "location": "?", "version": "?" }, { "class": "com.intellij.rt.execution.application.AppMain", "file": "AppMain.java", "line": 144, "method": "main", "location": "?", "version": "?" }, { "class": "com.intellij.rt.execution.application.AppMain", "file": "AppMain.java", "line": 144, "method": "main", "location": "?", "version": "?" }, { "class": "com.intellij.rt.execution.application.AppMain", "file": "AppMain.java", "line": 144, "method": "main", "location": "?", "version": "?" } ] }
測試類:
package com.banksteel.log.demo.log4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author 熱血BUG男 * @version 1.0.0 * @since Created by gebug on 2016/5/8. */ public class Demo { private static final Logger logger = LoggerFactory.getLogger(Demo.class); public static void main(String[] args) { logger.info("輸出信息……"); logger.trace("隨意打印……"); logger.debug("調試信息……"); logger.warn("警告信息……"); try { throw new Exception("錯誤消息啊"); } catch (Exception e) { logger.error("處理業務邏輯的時候發生一個錯誤……", e); } } }
log4j 2.x 與 Kafka 集成
log4j 2.x 天生支持 JsonLayout,並且與 Kafka 集成方便,我們只需要按部就班的配置一下就好了,POM.xml如下:
<dependencies> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>2.5</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.5</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.7.4</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.7.4</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.7.4</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.9.0.1</version> </dependency> </dependencies>
log4j2.xml配置文件如下所示:
<?xml version="1.0" encoding="UTF-8"?> <!-- Log4j2 的配置文件 --> <Configuration status="DEBUG" strict="true" name="LOG4J2_DEMO" packages="com.banksteel.log.demo.log4j2"> <properties> <property name="logPath">log</property> </properties> <Appenders> <!--配置控制台輸出樣式--> <Console name="Console" target="SYSTEM_OUT"> <PatternLayout pattern="%highlight{%d{yyyy-MM-dd HH:mm:ss} %d{UNIX_MILLIS} [%t] %-5p %C{1.}:%L - %msg%n}"/> </Console> <!-- 配置Kafka日志主動采集,Storm會將日志解析成字段存放在HBase中。 --> <Kafka name="Kafka" topic="server_log"> <!--使用JSON傳輸日志文件--> <JsonLayout complete="true" locationInfo="true"/> <!--Kafka集群配置,需要在本機配置Hosts文件,或者通過Nginx配置--> <Property name="bootstrap.servers">Kafka-01:9092,Kafka-02:9092,Kafka-03:9092</Property> </Kafka> </Appenders> <Loggers> <Root level="DEBUG"> <!--啟用控制台輸出日志--> <AppenderRef ref="Console"/> <!--啟用Kafka采集日志--> <AppenderRef ref="Kafka"/> </Root> </Loggers> </Configuration>
這樣就Okay了,我們可以在Kafka中看到完整的輸出:
{ "timeMillis" : 1462712870591, "thread" : "main", "level" : "ERROR", "loggerName" : "com.banksteel.log.demo.log4j2.Demo", "message" : "處理業務邏輯的時候發生一個錯誤……", "thrown" : { "commonElementCount" : 0, "localizedMessage" : "錯誤消息啊", "message" : "錯誤消息啊", "name" : "java.lang.Exception", "extendedStackTrace" : [ { "class" : "com.banksteel.log.demo.log4j2.Demo", "method" : "main", "file" : "Demo.java", "line" : 20, "exact" : true, "location" : "classes/", "version" : "?" }, { "class" : "sun.reflect.NativeMethodAccessorImpl", "method" : "invoke0", "file" : "NativeMethodAccessorImpl.java", "line" : -2, "exact" : false, "location" : "?", "version" : "1.7.0_80" }, { "class" : "sun.reflect.NativeMethodAccessorImpl", "method" : "invoke", "file" : "NativeMethodAccessorImpl.java", "line" : 57, "exact" : false, "location" : "?", "version" : "1.7.0_80" }, { "class" : "sun.reflect.DelegatingMethodAccessorImpl", "method" : "invoke", "file" : "DelegatingMethodAccessorImpl.java", "line" : 43, "exact" : false, "location" : "?", "version" : "1.7.0_80" }, { "class" : "java.lang.reflect.Method", "method" : "invoke", "file" : "Method.java", "line" : 606, "exact" : false, "location" : "?", "version" : "1.7.0_80" }, { "class" : "com.intellij.rt.execution.application.AppMain", "method" : "main", "file" : "AppMain.java", "line" : 144, "exact" : true, "location" : "idea_rt.jar", "version" : "?" } ] }, "endOfBatch" : false, "loggerFqcn" : "org.apache.logging.log4j.spi.AbstractLogger", "source" : { "class" : "com.banksteel.log.demo.log4j2.Demo", "method" : "main", "file" : "Demo.java", "line" : 22 } }
為了減少日志對空間的占用,我們通常還會設置JSONLayout的compact屬性為true,這樣在kafka中獲得的日志將會排除掉空格和換行符。
最后
由於在實際開發中,我們會引入多個第三方依賴,這些依賴往往也會依賴無數的log日志框架,為了保證測試通過,請認清本文例子中的包名以及版本號,log4j 1.x 的 Json 輸出是為了完全模擬 2.x 的字段,因此部分字段用?代替,如果想要完美,請自行解決。
隨便解釋一下日志級別,以便建立規范:
log.error 錯誤信息,通常寫在 catch 中,可以使用 log.error("發生了一個錯誤",e) 來記錄詳細的異常堆棧
log.fatal 嚴重錯誤,該級別的錯誤用來記錄會導致程序異常退出的錯誤日志。
log.warn 警告
log.info 信息
log.trace 簡單輸出文字
log.debug 調試信息