log4j2異步日志解讀(一)AsyncAppender


 

log4j、logback、log4j2 歷史和關系,我們就在這里不展開講了。直接上干貨,log4j2突出於其他日志的優勢,異步日志實現。

看一個東西,首先看官網文檔 ,因為前面文章已經講解了disruptor源碼,本文主要展開說說異步日志AsyncAppender和AsyncLogger(基於disruptor實現)。

AsyncLogger筆者下文展開講

 

一、AsyncAppender

 

我們先來看看AsyncApperder核心,就是logger將數據通過append方法放入到阻塞隊列中,隨后后台線程從隊列中取出數據然后進行后續的操作。

那這樣看來,就很簡單了,一個append()方法,一個后台線程執行就是我們要看的核心代碼了。圍繞我們要看的類AsyncAppender,來看看類關系圖。

一、放入隊列

主要實現就是logger將數據通過append方法放入到阻塞隊列中。

//AsyncAppender.java
    /**
     * Actual writing occurs here.
     *
     * @param logEvent The LogEvent.
     */
    @Override
    public void append(final LogEvent logEvent) {
        if (!isStarted()) {
            throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
        }
        //創建Log4jLogEvent的對象memento
        final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
        InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
        //transfer(memento)將event放入隊列
        //默認ArrayBlockingQueueFactory 大小1024
        if (!transfer(memento)) {
            if (blocking) {
                if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031
                    // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock
                    AsyncQueueFullMessageUtil.logWarningToStatusLogger();
                    logMessageInCurrentThread(logEvent);
                } else {
                    // delegate to the event router (which may discard, enqueue and block, or log in current thread)
                 
                    final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
                    route.logMessage(this, memento);
                }
            } else {
                error("Appender " + getName() + " is unable to write primary appenders. queue is full");
                logToErrorAppenderIfNecessary(false, memento);
            }
        }
    }

    private boolean transfer(final LogEvent memento) {
        return queue instanceof TransferQueue
            ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
            : queue.offer(memento);
    }

如流程圖所示,首先會判斷用戶是否設置了blocking選項,默認是true,如果設置為false,則Appender直接會ToErrorAppender,如果用戶沒有配置或者配置為true,則會按照一定的策略來處理這些消息。策略可以分為2種,他們分別為:

1、DefaultAsyncQueueFullPolicy---等待隊列,轉為同步操作策略

public class DefaultAsyncQueueFullPolicy implements AsyncQueueFullPolicy {
    @Override
    public EventRoute getRoute(final long backgroundThreadId, final Level level) {

        // LOG4J2-471: prevent deadlock when RingBuffer is full and object
        // being logged calls Logger.log() from its toString() method
        if (Thread.currentThread().getId() == backgroundThreadId) {
            return EventRoute.SYNCHRONOUS;
        }
        return EventRoute.ENQUEUE;
    }


2、DiscardingAsyncQueueFullPolicy---按照日志等級拋棄日志策略

//DiscardingAsyncQueueFullPolicy.java
    @Override
    public EventRoute getRoute(final long backgroundThreadId, final Level level) {
        if (level.isLessSpecificThan(thresholdLevel)) {
            if (discardCount.getAndIncrement() == 0) {
                LOGGER.warn("Async queue is full, discarding event with level {}. " +
                        "This message will only appear once; future events from {} " +
                        "are silently discarded until queue capacity becomes available.",
                        level, thresholdLevel);
            }
            return EventRoute.DISCARD;
        }
        return super.getRoute(backgroundThreadId, level);
    }

二、后台線程執行后續操作。

主要就是后台線程從隊列中取出數據然后進行后續的操作。

//AsyncAppender.java
private class AsyncThread extends Log4jThread {

        private volatile boolean shutdown = false;
        private final List<AppenderControl> appenders;
        private final BlockingQueue<LogEvent> queue;

        public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
            super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
            this.appenders = appenders;
            this.queue = queue;
            setDaemon(true);
        }

        @Override
        public void run() {
            while (!shutdown) {
                LogEvent event;
                try {
                    event = queue.take();
                    if (event == SHUTDOWN_LOG_EVENT) {
                        shutdown = true;
                        continue;
                    }
                } catch (final InterruptedException ex) {
                    break; // LOG4J2-830
                }
                event.setEndOfBatch(queue.isEmpty());
                final boolean success = callAppenders(event);
                if (!success && errorAppender != null) {
                    try {
                        errorAppender.callAppender(event);
                    } catch (final Exception ex) {
                        // Silently accept the error.
                    }
                }
            }
            // Process any remaining items in the queue.
            LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
                queue.size());
            int count = 0;
            int ignored = 0;
            while (!queue.isEmpty()) {
                try {
                    final LogEvent event = queue.take();
                    if (event instanceof Log4jLogEvent) {
                        final Log4jLogEvent logEvent = (Log4jLogEvent) event;
                        logEvent.setEndOfBatch(queue.isEmpty());
                        callAppenders(logEvent);
                        count++;
                    } else {
                        ignored++;
                        LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
                    }
                } catch (final InterruptedException ex) {
                    // May have been interrupted to shut down.
                    // Here we ignore interrupts and try to process all remaining events.
                }
            }
            LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
                + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
        }

    ...
}

該線程會一直嘗試從阻塞隊列中獲取LogEvent,如果獲取成功,調用AppenderRef所引用Appender的append方法。我們也可以看到,AsyncAppender實際上主要是類似於中轉,日志異步化,當消息放入阻塞隊列,返回成功,這樣能夠大幅提高日志記錄的吞吐。用戶可以在權衡性能與日志收集質量上進行權衡配置策略(設置blocking選項),當然也可以設置不同類型的阻塞隊列已到達更好的日志記錄吞吐。

AsyncAppender配置參數  

https://logging.apache.org/log4j/2.x/manual/appenders.html#AsyncAppender

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM