log4j2用asyncRoot配置異步日志是如何使用disruptor


asyncRoot配置對應的對接disruptor類是AsyncLoggerConfigDisruptor,用Log4jContextSelector啟動參數配置全局異步的對應的對接disruptor類是AsyncLoggerDisruptor。下面分析的是AsyncLoggerConfigDisruptor

disruptor的創建與啟動需要的部件實現

AsyncLoggerConfigDisruptor.start方法用來創建並啟動disruptor實例
創建disruptor需要EventFactoryringBuffer的大小ThreadFactoryProducerType等待策略waitStrategy
創建后需要設置ExceptionHandler,設置EventHandler
發布(生產)事件的translator。

EventFactory

分是否可變(mutable字段)場景對應兩個不同的EventFactory。
不可變的factory的邏輯是:

@Override
public Log4jEventWrapper newInstance() {
    return new Log4jEventWrapper();
}

可變的factory邏輯是:

public Log4jEventWrapper newInstance() {
    return new Log4jEventWrapper(new MutableLogEvent());
}

會在 Log4jEventWrapper的構造函數中傳入MutableLogEvent實例。

ringBuffer的大小

是根據AsyncLoggerConfig.RingBufferSize配置值算出來的。
這個配置項的值最小不能小於128,默認值分兩種情況進行設定:如果啟用了ENABLE_THREADLOCALS(優化GC的一個配置項),那么默認值是4 * 1024,否則是256 * 1024
這個配置是通過System properties指定,同樣存在不同版本,配置項名稱不一致的情況,log4j2.asyncLoggerRingBufferSize (AsyncLogger.RingBufferSize)。詳細可以參見這里

ThreadFactory

主要是定制線程名:
線程名格式是:"AsyncLoggerConfig-" + FACTORY_NUMBER(自增) + "-" + threadFactoryName + "-" + THREAD_NUMBER(自增)
默認的實際示例是: Log4j2-TF-1-AsyncLoggerConfig--1,跟上面的有些差異,上面的分析錯了嗎??

ProducerType

多生產者

等待策略waitStrategy

默認是10ms的TimeoutBlockingWaitStrategy。 支持可配置SleepingWaitStrategyYieldingWaitStrategyBlockingWaitStrategyBusySpinWaitStrategyTimeoutBlockingWaitStrategy
這個配置是通過System properties指定,同樣存在不同版本,配置項名稱不一致的情況,log4j2.asyncLoggerWaitStrategy (AsyncLogger.WaitStrategy)。

ExceptionHandler

可以配置,配置項名稱是AsyncLoggerConfig.ExceptionHandler,默認是用AsyncLoggerConfigDefaultExceptionHandler,打印: AsyncLogger error handling event seq=..., value=...,並打出異常棧。

EventHandler

此處使用了 Log4jEventWrapperHandler RingBufferLogEventHandler,是disruptor的SequenceReportingEventHandler實現。

/**
 * EventHandler performs the work in a separate thread.
 */
private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> {
    private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
    private Sequence sequenceCallback;
    private int counter;

    @Override
    public void setSequenceCallback(final Sequence sequenceCallback) {
        this.sequenceCallback = sequenceCallback;
    }

    @Override
    public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch)
            throws Exception {
        event.event.setEndOfBatch(endOfBatch);
        event.loggerConfig.asyncCallAppenders(event.event);
        event.clear();

        notifyIntermediateProgress(sequence);
    }

    /**
     * Notify the BatchEventProcessor that the sequence has progressed. Without this callback the sequence would not
     * be progressed until the batch has completely finished.
     */
    private void notifyIntermediateProgress(final long sequence) {
        if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
            sequenceCallback.set(sequence);
            counter = 0;
        }
    }
}

event.loggerConfig.asyncCallAppenders(event.event); 這個會觸發日志的輸出

translator

EventFactory一樣分mutable是否可變的兩種情況。
不可變:

private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> TRANSLATOR =
        new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
    @Override
    public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
            final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
        ringBufferElement.event = logEvent;
        ringBufferElement.loggerConfig = loggerConfig;
    }
};

可變:

/**
 * Object responsible for passing on data to a RingBuffer event with a MutableLogEvent.
 */
private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> MUTABLE_TRANSLATOR =
        new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
    @Override
    public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
            final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
        ((MutableLogEvent) ringBufferElement.event).initFrom(logEvent);
        ringBufferElement.loggerConfig = loggerConfig;
    }
};

都是完成一個事情給ringBufferElement元素的event賦值或者初始化。

事件進隊列的tryEnqueue方法邏輯分析

  • prepareEvent
  1. ensureImmutable:創建LogEvent的快照,傳遞進來的原LogEvent在應用線程還會被繼續修改,所以此處要創建快照
  2. Log4jLogEvent.makeMessageImmutable:格式化消息。因為是異步日志,針對需要format的消息在此處進行格式化,否則會因為引用對象值的改變導致日志不准確
  • tryPublishEvent
  1. 發布事件對象到disruptor隊列

事件出隊列的處理

先看按下調用棧:

Daemon Thread [Log4j2-TF-1-AsyncLoggerConfig--1] (Suspended (breakpoint at line 37 in PatternFormatter))	
	PatternFormatter.format(LogEvent, StringBuilder) line: 37	
	PatternLayout$PatternSerializer.toSerializable(LogEvent, StringBuilder) line: 334	
	PatternLayout.toText(AbstractStringLayout$Serializer2, LogEvent, StringBuilder) line: 233	
	PatternLayout.encode(LogEvent, ByteBufferDestination) line: 218	
	PatternLayout.encode(Object, ByteBufferDestination) line: 58	
	ConsoleAppender(AbstractOutputStreamAppender<M>).directEncodeEvent(LogEvent) line: 177	
	ConsoleAppender(AbstractOutputStreamAppender<M>).tryAppend(LogEvent) line: 170	
	ConsoleAppender(AbstractOutputStreamAppender<M>).append(LogEvent) line: 161	
	AppenderControl.tryCallAppender(LogEvent) line: 156	
	AppenderControl.callAppender0(LogEvent) line: 129	
	AppenderControl.callAppenderPreventRecursion(LogEvent) line: 120	
	AppenderControl.callAppender(LogEvent) line: 84	
	AsyncLoggerConfig(LoggerConfig).callAppenders(LogEvent) line: 448	
	AsyncLoggerConfig.asyncCallAppenders(LogEvent) line: 129	
	AsyncLoggerConfigDisruptor$Log4jEventWrapperHandler.onEvent(AsyncLoggerConfigDisruptor$Log4jEventWrapper, long, boolean) line: 111	
	AsyncLoggerConfigDisruptor$Log4jEventWrapperHandler.onEvent(Object, long, boolean) line: 97	
	BatchEventProcessor<T>.run() line: 129	
	Log4jThread(Thread).run() line: 748	

Log4jEventWrapperHandler.onEvent 111行是event.loggerConfig.asyncCallAppenders(event.event),參見上面EventHandler,此處完成日志真正寫出。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM