介紹flink在本地運行和on yarn運行時的日志配置。
很多現代框架都是用門面模式進行日志輸出,例如使用Slf4j中的接口輸出日志,具體實現類需要由log4j,log4j2,logback等日志框架進行實現。
Flink 中的日志記錄就是使用slf4j日志接口實現的。
Slf4j簡要說明
slf4j全名Simple Logging Facade for Java,為java提供的簡單日志Facade。Facade門面說白了就是接口。它允許用戶以自己的喜好,在工程中通過slf4j接入不同的日志系統。slf4j入口就是眾多接口的集合,它不負責具體的日志實現,只在編譯時負責尋找合適的日志系統進行綁定。具體有哪些接口,全部都定義在slf4j-api中。查看slf4j-api源碼就可以發現,里面除了public final class LoggerFactory類之外,都是接口定義。因此slf4j-api本質就是一個接口定義。要想使用slf4j日志門面,需要引入以下依賴:
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
這個包只有日志的接口,並沒有實現,所以如果要使用就得再給它提供一個實現了些接口的日志框架包,比如:log4j,log4j2,logback等日志框架包,但是這些日志實現又不能通過接口直接調用,實現上他們根本就和slf4j-api不一致,因此slf4j和日志框架之間又增加了一層橋接器來轉換各日志實現包的使用,比如slf4j-log4j12,log4j-slf4j-impl等。
接下來從本地運行和on yarn部署來說明下日志的使用配置方式。
Flink本地idea運行的日志配置
在我們編寫Flink代碼的時候,官方推薦的最佳實踐也是使用Slf4j。Slf4j 的 logger 通過調用 LoggerFactory
的getLogger()
方法創建,然后使用logger對象輸出日志。
接下來,我們就使用slf4j來打印日志。
使用slf4j
依賴
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
實現代碼
package com.upupfeng;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author mawf
*/
public class Main {
// 創建Logger對象
private static final Logger log = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) throws Exception {
// 打印日志
log.info("-----------------> start");
}
}
光有上述的代碼,運行起來是打印不出來日志的。還需要我們導入對應的日志實現的依賴(log4j或log4j2),以及進行相關的配置。
接下里再分別說一下log4j和log4j2的依賴和配置。
log4j1
pom依賴
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
log4j.properties配置
log4j.rootLogger=info,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=INFO
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n
添加完pom和配置后,就可以使用log4j輸出日志了。
log4j2
pom依賴
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.9.1</version>
</dependency>
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5">
<Properties>
<property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
<property name="LOG_LEVEL" value="INFO" />
</Properties>
<appenders>
<console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="${LOG_PATTERN}"/>
<ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
</console>
</appenders>
<loggers>
<root level="${LOG_LEVEL}">
<appender-ref ref="Console"/>
</root>
</loggers>
</configuration>
添加完pom和配置后,就可以使用log4j2輸出日志了。
在實際使用中,還是推薦使用log4j2。其他的不說,占位符就比log4j的自己拼字符串好很多了。
Flink on Yarn的日志配置
說明
Flink有多種部署運行方式,我這里采用的是on yarn的運行方式,以per-job的形式提交任務到yarn。
# 以這種方式提交
flink run \
-d \
-m yarn-cluster \
我使用的flink版本是1.11.3。
官方的二進制運行包中就自帶了slf4j和log4j2的jar包:
flink-dist_2.11-1.11.3.jar
log4j-1.2-api-2.12.1.jar
log4j-api-2.12.1.jar
log4j-core-2.12.1.jar
log4j-slf4j-impl-2.12.1.jar
flink-dist包中包含了slf4j的包,帶log4j的是log4j2的包。
如果我們要使用log4j2的話,就必須保證我們自己打的jar包中沒有log的相關依賴,不然會出現各種奇怪的問題。這點很重要,我最開始沒有排除,遇到了很多問題。
maven打包時排除log依賴
我這里是使用shade插件打包,然后排除依賴的。
shade插件的配置如下:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<artifactSet>
<excludes>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
<exclude>ch.qos.logback:*</exclude>
</excludes>
</artifactSet>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
日志的配置
Flink附帶了默認的日志配置文件:
log4j-cli.properties
:由 Flink 命令行客戶端使用(例如flink run
)(不包括在集群上執行的代碼)。這個文件是我們使用flink run
提交任務時,任務提交到集群前打印的日志所需的配置。log4j-session.properties
:Flink 命令行客戶端在啟動 YARN 或 Kubernetes session 時使用(yarn-session.sh
,kubernetes-session.sh
)。log4j.properties
:作為 JobManager/TaskManager 日志配置使用(standalone 和 YARN 兩種模式下皆使用)
所以使用flink run
這種方式提交任務,會自動去FLINK_HOME
下的conf目錄下找log4j.properties
的文件作為jobmanager和taskmanager的日志配置。
官方說使用 -Dlog4j.configurationFile=
參數可以傳遞日志文件,但是我試了幾次都不可以。
所以如果要對日志級別、模板進行修改的話,就直接改這個log4j.properties
文件就好了。
滾動日志的配置
默認的日志配置文件不是滾動的,所以日志文件很大的話,會暫用較多的資源,我們需要修改為滾動日志。
滾動日志的配置內容
rootLogger.level = INFO
rootLogger.appenderRef.rolling.ref = RollingFileAppender
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.upupfeng.name = com.upupfeng
logger.upupfeng.level = INFO
logger.upupfeng.additivity = false
logger.upupfeng.appenderRef.rolling.ref = RollingFileAppender
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
效果
提交任務后,就會生成如下的滾動日志了。
-rw-r--r-- 1 yarn yarn 30112705 Mar 6 09:49 taskmanager.log
-rw-r--r-- 1 yarn yarn 104857827 Mar 6 09:48 taskmanager.log.1
-rw-r--r-- 1 yarn yarn 104857687 Mar 6 09:49 taskmanager.log.10
-rw-r--r-- 1 yarn yarn 104857649 Mar 6 09:48 taskmanager.log.2
-rw-r--r-- 1 yarn yarn 104857692 Mar 6 09:48 taskmanager.log.3
-rw-r--r-- 1 yarn yarn 104857693 Mar 6 09:48 taskmanager.log.4
-rw-r--r-- 1 yarn yarn 104857831 Mar 6 09:49 taskmanager.log.5
-rw-r--r-- 1 yarn yarn 104857707 Mar 6 09:49 taskmanager.log.6
-rw-r--r-- 1 yarn yarn 104857649 Mar 6 09:49 taskmanager.log.7
-rw-r--r-- 1 yarn yarn 104857659 Mar 6 09:49 taskmanager.log.8
-rw-r--r-- 1 yarn yarn 104857646 Mar 6 09:49 taskmanager.log.9
參考
Flink原理:Flink中的日志框架配置 https://www.codenong.com/cs106739594/
Flink官網:如何使用日志記錄 https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/advanced/logging.html
slf4j-api和slf4j-simple https://blog.csdn.net/u011179993/article/details/52490013
slf4j-api、slf4j-log4j12、log4j之間關系 https://www.cnblogs.com/lujiango/p/8573411.html