log4j、logback、log4j2 歷史和關系,我們就在這里不展開講了。直接上干貨,log4j2突出於其他日志的優勢,異步日志實現。
看一個東西,首先看官網文檔 ,因為前面文章已經講解了disruptor源碼,本文主要展開說說異步日志AsyncAppender和AsyncLogger(基於disruptor實現)。
一、AsyncAppender
我們先來看看AsyncApperder核心,就是logger將數據通過append方法放入到阻塞隊列中,隨后后台線程從隊列中取出數據然后進行后續的操作。
那這樣看來,就很簡單了,一個append()方法,一個后台線程執行就是我們要看的核心代碼了。圍繞我們要看的類AsyncAppender,來看看類關系圖。
一、放入隊列
主要實現就是logger將數據通過append方法放入到阻塞隊列中。
//AsyncAppender.java /** * Actual writing occurs here. * * @param logEvent The LogEvent. */ @Override public void append(final LogEvent logEvent) { if (!isStarted()) { throw new IllegalStateException("AsyncAppender " + getName() + " is not active"); } //創建Log4jLogEvent的對象memento final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation); InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage()); //transfer(memento)將event放入隊列 //默認ArrayBlockingQueueFactory 大小1024 if (!transfer(memento)) { if (blocking) { if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031 // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock AsyncQueueFullMessageUtil.logWarningToStatusLogger(); logMessageInCurrentThread(logEvent); } else { // delegate to the event router (which may discard, enqueue and block, or log in current thread) final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel()); route.logMessage(this, memento); } } else { error("Appender " + getName() + " is unable to write primary appenders. queue is full"); logToErrorAppenderIfNecessary(false, memento); } } } private boolean transfer(final LogEvent memento) { return queue instanceof TransferQueue ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento) : queue.offer(memento); }
如流程圖所示,首先會判斷用戶是否設置了blocking選項,默認是true,如果設置為false,則Appender直接會ToErrorAppender,如果用戶沒有配置或者配置為true,則會按照一定的策略來處理這些消息。策略可以分為2種,他們分別為:
1、DefaultAsyncQueueFullPolicy---等待隊列,轉為同步操作策略
public class DefaultAsyncQueueFullPolicy implements AsyncQueueFullPolicy { @Override public EventRoute getRoute(final long backgroundThreadId, final Level level) { // LOG4J2-471: prevent deadlock when RingBuffer is full and object // being logged calls Logger.log() from its toString() method if (Thread.currentThread().getId() == backgroundThreadId) { return EventRoute.SYNCHRONOUS; } return EventRoute.ENQUEUE; }
2、DiscardingAsyncQueueFullPolicy---按照日志等級拋棄日志策略
//DiscardingAsyncQueueFullPolicy.java @Override public EventRoute getRoute(final long backgroundThreadId, final Level level) { if (level.isLessSpecificThan(thresholdLevel)) { if (discardCount.getAndIncrement() == 0) { LOGGER.warn("Async queue is full, discarding event with level {}. " + "This message will only appear once; future events from {} " + "are silently discarded until queue capacity becomes available.", level, thresholdLevel); } return EventRoute.DISCARD; } return super.getRoute(backgroundThreadId, level); }
二、后台線程執行后續操作。
主要就是后台線程從隊列中取出數據然后進行后續的操作。
//AsyncAppender.java private class AsyncThread extends Log4jThread { private volatile boolean shutdown = false; private final List<AppenderControl> appenders; private final BlockingQueue<LogEvent> queue; public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) { super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement()); this.appenders = appenders; this.queue = queue; setDaemon(true); } @Override public void run() { while (!shutdown) { LogEvent event; try { event = queue.take(); if (event == SHUTDOWN_LOG_EVENT) { shutdown = true; continue; } } catch (final InterruptedException ex) { break; // LOG4J2-830 } event.setEndOfBatch(queue.isEmpty()); final boolean success = callAppenders(event); if (!success && errorAppender != null) { try { errorAppender.callAppender(event); } catch (final Exception ex) { // Silently accept the error. } } } // Process any remaining items in the queue. LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.", queue.size()); int count = 0; int ignored = 0; while (!queue.isEmpty()) { try { final LogEvent event = queue.take(); if (event instanceof Log4jLogEvent) { final Log4jLogEvent logEvent = (Log4jLogEvent) event; logEvent.setEndOfBatch(queue.isEmpty()); callAppenders(logEvent); count++; } else { ignored++; LOGGER.trace("Ignoring event of class {}", event.getClass().getName()); } } catch (final InterruptedException ex) { // May have been interrupted to shut down. // Here we ignore interrupts and try to process all remaining events. } } LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. " + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored); } ... }
該線程會一直嘗試從阻塞隊列中獲取LogEvent,如果獲取成功,調用AppenderRef所引用Appender的append方法。我們也可以看到,AsyncAppender實際上主要是類似於中轉,日志異步化,當消息放入阻塞隊列,返回成功,這樣能夠大幅提高日志記錄的吞吐。用戶可以在權衡性能與日志收集質量上進行權衡配置策略(設置blocking選項),當然也可以設置不同類型的阻塞隊列已到達更好的日志記錄吞吐。
AsyncAppender配置參數
https://logging.apache.org/log4j/2.x/manual/appenders.html#AsyncAppender