Near-real-time alerting by mailing out every event directly can easily overwhelm the company's mail service. Feeding the logs into ELK instead gives us unified log storage, which makes later inspection and tracing easy, and it also lets us analyze the volume of logs at a given level and alert on that curve.
To get logback's output into ELK, we first write the log messages to Kafka and then have Elasticsearch consume them from Kafka.
Logback has no built-in Kafka appender, so we use a third-party library, danielwegener/logback-kafka-appender, which you can find on GitHub.
Because logback-kafka-appender itself depends on other packages, the simplest approach is to pull the source down and build it locally so that all of its dependencies are bundled into a single jar.
The resulting artifact is logback-kafka-appender-0.2.0-RC2-jar-with-dependencies.jar.
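The -jar-with-dependencies suffix is what the maven-assembly-plugin produces. If the project's pom.xml does not already configure that plugin, a minimal sketch of the entry to add looks like this (the plugin coordinates and the jar-with-dependencies descriptor are standard; treat the rest as an assumption about this particular build):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <!-- bundle the appender together with all of its transitive dependencies -->
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <!-- build the fat jar automatically during mvn package -->
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>

With that in place, mvn clean package leaves the fat jar under target/.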
Copy this jar into flink/lib.
Then add the corresponding appender to the logback configuration file:
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
    <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
        <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} flink [%thread] %-5level %logger{60} - %msg</pattern>
    </encoder>
    <topic>flink_log</topic>
    <!-- we don't care how the log messages will be partitioned -->
    <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
    <!-- use async delivery. the application threads are not blocked by logging -->
    <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
    <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
    <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
    <!-- bootstrap.servers is the only mandatory producerConfig -->
    <producerConfig>bootstrap.servers=xx.xx.xx.xx:9092,xx.xx.xx.xx:9092</producerConfig>
    <!-- don't wait for a broker to ack the reception of a batch. -->
    <producerConfig>acks=0</producerConfig>
    <!-- wait up to 1000ms and collect log messages before sending them as a batch -->
    <producerConfig>linger.ms=1000</producerConfig>
    <!-- even if the producer buffer runs full, do not block the application but start to drop messages -->
    <producerConfig>max.block.ms=0</producerConfig>
    <!-- define a client-id that you use to identify yourself against the kafka broker -->
    <producerConfig>client.id=flinkonyarn-${HOSTNAME}-${CONTEXT_NAME}-logback-relaxed</producerConfig>
</appender>

<root level="INFO">
    <appender-ref ref="kafkaAppender"/>
</root>
Because our Flink jobs have metrics enabled, we keep getting WARN messages about number conversion.
These messages tell us nothing useful, so the only workaround is a dedicated logger that filters them out (its level is raised to ERROR, it is routed only to the file appender, and additivity is off so nothing reaches Kafka):
<logger name="org.apache.flink.runtime.metrics.MetricRegistryImpl" additivity="false">
    <level value="ERROR"/>
    <appender-ref ref="file"/>
</logger>
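The file appender referenced here is not part of the snippet above; it is the one already defined in Flink's stock conf/logback.xml. For reference, a minimal sketch of such an appender (the ${log.file} property is set by Flink's startup scripts; the exact pattern here is illustrative):

<appender name="file" class="ch.qos.logback.core.FileAppender">
    <!-- assumption: reuse the log file path that Flink's scripts pass in via ${log.file} -->
    <file>${log.file}</file>
    <append>false</append>
    <encoder>
        <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} - %msg%n</pattern>
    </encoder>
</appender>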
At this point every log line our Flink jobs produce ends up in Kafka.
We then consume the Kafka data, write it into Elasticsearch, and browse it through Kibana, which makes it very convenient to look through the logs.
Because our Kibana does not have alerting hooked up yet, we use Grafana to aggregate the Elasticsearch data for the final dashboards and alerting.
And with that, we're done.
References
https://github.com/danielwegener/logback-kafka-appender
https://www.jianshu.com/p/d1be3364f32d
https://www.aiprose.com/blog/26