Flume-ng Source Code Analysis: The Source Component


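If you haven't yet read the earlier posts in the Flume-ng source code analysis series on the startup flow, the Channel component, and the Sink component, see the links below:
Flume-ng Source Code Analysis: The Startup Flow
Flume-ng Source Code Analysis: The Channel Component
Flume-ng Source Code Analysis: The Sink Component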

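In the previous three posts we took a first look at Flume's startup flow, the Channel component, and the Sink component. Now let's look at the remaining one of the agent's three core components: the Source.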

1 Source

The Source is the component through which messages enter the agent. Let's see how it passes events to the channel, and what its main characteristics are.

As before, let's start with the code:

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Source extends LifecycleAware, NamedComponent {
  public void setChannelProcessor(ChannelProcessor channelProcessor);
  public ChannelProcessor getChannelProcessor();
}

The interface defines only two methods to implement, setChannelProcessor and getChannelProcessor. That already suggests that a source hands its events to the channel through a ChannelProcessor.
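To make that contract concrete, here is a minimal, self-contained sketch. It is not Flume code: `SimpleChannelProcessor`, `MiniSource`, and `LineSource` are hypothetical names, and events are plain strings. It only illustrates the idea that a source never talks to a channel directly. Whoever builds the source injects a processor through the setter, and the source later fetches it through the getter and hands events over.

```java
import java.util.ArrayList;
import java.util.List;

public class SourceContractSketch {

    // Hypothetical stand-in for ChannelProcessor: it just buffers events in memory.
    static class SimpleChannelProcessor {
        private final List<String> delivered = new ArrayList<>();

        void processEvent(String event) {
            delivered.add(event);
        }

        List<String> delivered() {
            return delivered;
        }
    }

    // Mirrors the two methods of Flume's Source interface (lifecycle methods omitted).
    interface MiniSource {
        void setChannelProcessor(SimpleChannelProcessor channelProcessor);
        SimpleChannelProcessor getChannelProcessor();
    }

    // A toy source that forwards each received line through the injected processor.
    static class LineSource implements MiniSource {
        private SimpleChannelProcessor channelProcessor;

        @Override
        public void setChannelProcessor(SimpleChannelProcessor channelProcessor) {
            this.channelProcessor = channelProcessor;
        }

        @Override
        public SimpleChannelProcessor getChannelProcessor() {
            return channelProcessor;
        }

        void onLine(String line) {
            // The source only knows the processor, never the channels behind it.
            getChannelProcessor().processEvent(line);
        }
    }

    public static void main(String[] args) {
        LineSource source = new LineSource();
        // In Flume this injection is done by the configuration provider.
        source.setChannelProcessor(new SimpleChannelProcessor());
        source.onLine("hello");
        source.onLine("world");
        System.out.println(source.getChannelProcessor().delivered()); // [hello, world]
    }
}
```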

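First, a word on Source types. Flume divides sources into two kinds according to how the data arrives. Sources such as HTTP, netcat, and exec are event-driven (EventDrivenSource): they are triggered when data shows up, for example when a request comes in. Sources such as Kafka and JMS are pollable (PollableSource): the framework calls them repeatedly and they pull the data themselves.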
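The difference in who drives the flow can be sketched as follows. This is a simplified model with hypothetical class names (`PushSource`, `PullSource`). Only the `READY`/`BACKOFF` values are taken from Flume's `PollableSource.Status`, and a `Deque` stands in for an upstream system such as a Kafka consumer. The push source is called by the data producer. The pull source is called by a runner and reports `BACKOFF` when there was nothing to read.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class SourceKindsSketch {

    // Same two values as PollableSource.Status in Flume.
    enum Status { READY, BACKOFF }

    // Event-driven: an external callback (e.g. a network handler) pushes data in.
    static class PushSource {
        private final Consumer<String> sink;

        PushSource(Consumer<String> sink) {
            this.sink = sink;
        }

        // Invoked by the data producer whenever a message arrives.
        void onMessage(String msg) {
            sink.accept(msg);
        }
    }

    // Pollable: a runner repeatedly calls process(), and the source pulls data itself.
    static class PullSource {
        private final Deque<String> upstream; // hypothetical stand-in for e.g. a Kafka consumer
        private final Consumer<String> sink;

        PullSource(Deque<String> upstream, Consumer<String> sink) {
            this.upstream = upstream;
            this.sink = sink;
        }

        Status process() {
            String msg = upstream.poll();
            if (msg == null) {
                return Status.BACKOFF; // nothing to read: tell the runner to back off
            }
            sink.accept(msg);
            return Status.READY;
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        new PushSource(received::add).onMessage("http-request");

        PullSource pull = new PullSource(new ArrayDeque<>(Arrays.asList("kafka-record")), received::add);
        System.out.println(pull.process()); // READY
        System.out.println(pull.process()); // BACKOFF
        System.out.println(received);       // [http-request, kafka-record]
    }
}
```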

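As we saw in the startup flow, Application starts a SourceRunner, and the SourceRunner in turn starts the source. Since there are two kinds of source, there are also two kinds of SourceRunner: EventDrivenSourceRunner and PollableSourceRunner. Let's look at their start() methods: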

EventDrivenSourceRunner

public class EventDrivenSourceRunner extends SourceRunner {
  …
  @Override
  public void start() {
    Source source = getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();
    lifecycleState = LifecycleState.START;
  }
  …
}

PollableSourceRunner

public class PollableSourceRunner extends SourceRunner {

  …
  @Override
  public void start() {
    PollableSource source = (PollableSource) getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();

    runner = new PollingRunner();

    runner.source = source;
    runner.counterGroup = counterGroup;
    runner.shouldStop = shouldStop;

    runnerThread = new Thread(runner);
    runnerThread.setName(getClass().getSimpleName() + "-" + 
        source.getClass().getSimpleName() + "-" + source.getName());
    runnerThread.start();

    lifecycleState = LifecycleState.START;
  }

  …
  public static class PollingRunner implements Runnable {

    private PollableSource source;
    private AtomicBoolean shouldStop;
    private CounterGroup counterGroup;

    @Override
    public void run() {
      logger.debug("Polling runner starting. Source:{}", source);

      while (!shouldStop.get()) {
        counterGroup.incrementAndGet("runner.polls");

        try {
          if (source.process().equals(PollableSource.Status.BACKOFF)) {
            counterGroup.incrementAndGet("runner.backoffs");

            Thread.sleep(Math.min(
                counterGroup.incrementAndGet("runner.backoffs.consecutive")
                * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
          } else {
            counterGroup.set("runner.backoffs.consecutive", 0L);
          }
        } catch (InterruptedException e) {
          logger.info("Source runner interrupted. Exiting");
          counterGroup.incrementAndGet("runner.interruptions");
        } catch (EventDeliveryException e) {
          logger.error("Unable to deliver event. Exception follows.", e);
          counterGroup.incrementAndGet("runner.deliveryErrors");
        } catch (Exception e) {
          counterGroup.incrementAndGet("runner.errors");
          logger.error("Unhandled exception, logging and sleeping for " +
              source.getMaxBackOffSleepInterval() + "ms", e);
          try {
            Thread.sleep(source.getMaxBackOffSleepInterval());
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
        }
      }

      logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
    }

  }

}
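The backoff arithmetic in PollingRunner is easy to misread, so here it is isolated as a pure function plus a replay loop. The method names `sleepMillis` and `simulate` are hypothetical, and the values 1000 ms (increment) and 5000 ms (cap) are only example inputs. The formula and the reset-on-success behavior follow the loop above: each consecutive `BACKOFF` grows the sleep linearly until it hits the cap, and any successful poll resets the counter to zero.

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSketch {

    // Same formula as PollingRunner: min(consecutiveBackoffs * increment, maxInterval).
    static long sleepMillis(long consecutiveBackoffs, long increment, long maxInterval) {
        return Math.min(consecutiveBackoffs * increment, maxInterval);
    }

    // Replays a sequence of process() results (true = BACKOFF) and returns the sleep
    // chosen after each one; 0 means the runner would poll again immediately.
    static List<Long> simulate(boolean[] backoffs, long increment, long maxInterval) {
        List<Long> sleeps = new ArrayList<>();
        long consecutive = 0;
        for (boolean backoff : backoffs) {
            if (backoff) {
                consecutive++; // like incrementAndGet("runner.backoffs.consecutive")
                sleeps.add(sleepMillis(consecutive, increment, maxInterval));
            } else {
                consecutive = 0; // like set("runner.backoffs.consecutive", 0L)
                sleeps.add(0L);
            }
        }
        return sleeps;
    }

    public static void main(String[] args) {
        boolean[] results = {true, true, true, true, true, true, false, true};
        System.out.println(simulate(results, 1000, 5000));
        // [1000, 2000, 3000, 4000, 5000, 5000, 0, 1000]
    }
}
```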

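Both PollableSourceRunner and EventDrivenSourceRunner first initialize the source's ChannelProcessor and then call the source's start(). PollableSourceRunner additionally starts a PollingRunner thread that calls source.process() in a loop until shouldStop is set. Each time process() returns BACKOFF, the thread sleeps for the number of consecutive backoffs multiplied by getBackOffSleepIncrement(), capped at getMaxBackOffSleepInterval(). Seeing the ChannelProcessor here naturally raises a question: where does it come from? To answer that we go back to AbstractConfigurationProvider and look at its loadSources() method, where we find the following code: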

ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig);
ChannelProcessor channelProcessor = new ChannelProcessor(selector);
Configurables.configure(channelProcessor, config);
source.setChannelProcessor(channelProcessor);
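The snippet builds a ChannelSelector from the source's channels, wraps it in a ChannelProcessor, and injects the processor into the source. A rough sketch of what that wiring buys is below. All class names are hypothetical. The `getRequiredChannels` method is named after the method of the same name on Flume's `ChannelSelector`, and the selector behaves like a replicating selector that sends every event to every channel. This sketch deliberately omits what the real ChannelProcessor also does, such as running interceptors and wrapping each put in a channel transaction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ProcessorWiringSketch {

    // Hypothetical in-memory channel.
    static class MiniChannel {
        final String name;
        final List<String> events = new ArrayList<>();

        MiniChannel(String name) {
            this.name = name;
        }

        void put(String event) {
            events.add(event);
        }
    }

    // Hypothetical selector: like a replicating selector, every event goes to every channel.
    static class ReplicatingSelector {
        private final List<MiniChannel> channels;

        ReplicatingSelector(List<MiniChannel> channels) {
            this.channels = channels;
        }

        List<MiniChannel> getRequiredChannels(String event) {
            return channels;
        }
    }

    // Hypothetical processor: asks the selector where the event goes, then puts it there.
    // (No interceptors and no transactions, unlike Flume's real ChannelProcessor.)
    static class MiniChannelProcessor {
        private final ReplicatingSelector selector;

        MiniChannelProcessor(ReplicatingSelector selector) {
            this.selector = selector;
        }

        void processEvent(String event) {
            for (MiniChannel ch : selector.getRequiredChannels(event)) {
                ch.put(event);
            }
        }
    }

    public static void main(String[] args) {
        MiniChannel c1 = new MiniChannel("c1");
        MiniChannel c2 = new MiniChannel("c2");
        // Same construction order as loadSources(): selector first, then the processor,
        // which would then be handed to source.setChannelProcessor(...).
        MiniChannelProcessor processor =
            new MiniChannelProcessor(new ReplicatingSelector(Arrays.asList(c1, c2)));
        processor.processEvent("e1");
        System.out.println(c1.events + " " + c2.events); // [e1] [e1]
    }
}
```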

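At this point we have a basic picture of how a Source is started. Next, using AvroSource as an example, let's see where a source calls the ChannelProcessor's methods to insert events.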

2 AvroSource

public class AvroSource extends AbstractSource implements EventDrivenSource,
    Configurable, AvroSourceProtocol {
  …
  @Override
  public Status append(AvroFlumeEvent avroEvent) {
    if (logger.isDebugEnabled()) {
      if (LogPrivacyUtil.allowLogRawData()) {
        logger.debug("Avro source {}: Received avro event: {}", getName(), avroEvent);
      } else {
        logger.debug("Avro source {}: Received avro event", getName());
      }
    }

    sourceCounter.incrementAppendReceivedCount();
    sourceCounter.incrementEventReceivedCount();

    Event event = EventBuilder.withBody(avroEvent.getBody().array(),
        toStringMap(avroEvent.getHeaders()));

    try {
      getChannelProcessor().processEvent(event);
    } catch (ChannelException ex) {
      logger.warn("Avro source " + getName() + ": Unable to process event. " +
          "Exception follows.", ex);
      return Status.FAILED;
    }

    sourceCounter.incrementAppendAcceptedCount();
    sourceCounter.incrementEventAcceptedCount();

    return Status.OK;
  }

  @Override
  public Status appendBatch(List<AvroFlumeEvent> events) {
    logger.debug("Avro source {}: Received avro event batch of {} events.",
        getName(), events.size());
    sourceCounter.incrementAppendBatchReceivedCount();
    sourceCounter.addToEventReceivedCount(events.size());

    List<Event> batch = new ArrayList<Event>();

    for (AvroFlumeEvent avroEvent : events) {
      Event event = EventBuilder.withBody(avroEvent.getBody().array(),
          toStringMap(avroEvent.getHeaders()));

      batch.add(event);
    }

    try {
      getChannelProcessor().processEventBatch(batch);
    } catch (Throwable t) {
      logger.error("Avro source " + getName() + ": Unable to process event " +
          "batch. Exception follows.", t);
      if (t instanceof Error) {
        throw (Error) t;
      }
      return Status.FAILED;
    }

    sourceCounter.incrementAppendBatchAcceptedCount();
    sourceCounter.addToEventAcceptedCount(events.size());

    return Status.OK;
  }
  …
}

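In append() we can see the call getChannelProcessor().processEvent(event);, and appendBatch() likewise calls getChannelProcessor().processEventBatch(batch);. More generally, each Source calls the ChannelProcessor to insert events into the channel at the moments dictated by its own mechanism, whether that is being triggered (event-driven) or pulling data (pollable).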
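One detail of append() that is worth copying in a custom source is the ordering of the counters and the status. The received count is bumped as soon as the event arrives, but the accepted count is bumped only after the ChannelProcessor call succeeds. If the channel rejects the event, FAILED is returned so the client knows to retry. The sketch below reproduces just that pattern. `CountingSource` and `ChannelFullException` are hypothetical (the latter stands in for ChannelException), and the `OK`/`FAILED` values mirror two of the statuses returned by AvroSource.append().

```java
import java.util.function.Consumer;

public class AppendSketch {

    // Mirrors two of the status values returned by AvroSource.append().
    enum Status { OK, FAILED }

    // Hypothetical stand-in for Flume's ChannelException.
    static class ChannelFullException extends RuntimeException {
        ChannelFullException(String msg) {
            super(msg);
        }
    }

    static class CountingSource {
        long received; // like sourceCounter.incrementEventReceivedCount()
        long accepted; // like sourceCounter.incrementEventAcceptedCount()
        private final Consumer<String> processor; // plays the role of getChannelProcessor()

        CountingSource(Consumer<String> processor) {
            this.processor = processor;
        }

        Status append(String body) {
            received++; // counted as soon as the event arrives
            try {
                processor.accept(body); // cf. getChannelProcessor().processEvent(event)
            } catch (ChannelFullException ex) {
                return Status.FAILED; // the client is told and may retry
            }
            accepted++; // counted only once the channel has taken the event
            return Status.OK;
        }
    }

    public static void main(String[] args) {
        int[] capacity = {1}; // the fake channel accepts exactly one event
        CountingSource source = new CountingSource(body -> {
            if (capacity[0]-- <= 0) {
                throw new ChannelFullException("channel full");
            }
        });
        System.out.println(source.append("a")); // OK
        System.out.println(source.append("b")); // FAILED
        System.out.println(source.received + "/" + source.accepted); // 2/1
    }
}
```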

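This completes our study of Flume's startup flow and its three core components. Given my limited ability, some details could not be explored in depth, and I hope to find time to analyze them further in the future.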

