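If you have not yet read the earlier posts in the Flume-ng source code analysis series on the startup process, the Channel component and the Sink component, you can follow the links below:
Flume-ng Source Code Analysis: The Startup Process
Flume-ng Source Code Analysis: The Channel Component
Flume-ng Source Code Analysis: The Sink Component
In the previous three posts we took a first look at Flume's startup process and at the Channel and Sink components. Now let's turn to the remaining one of the agent's three core components: the Source.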
1 Source
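The Source is the component through which messages enter an agent. Let's look at how it passes events to the channel, and at its characteristics.
As usual, we start with the code: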
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Source extends LifecycleAware, NamedComponent {
    public void setChannelProcessor(ChannelProcessor channelProcessor);
    public ChannelProcessor getChannelProcessor();
}
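The interface declares just two methods to implement, setChannelProcessor and getChannelProcessor, which already hints that a source hands its events to the channel through a ChannelProcessor rather than writing to the channel directly.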
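To make the setter-injection idea concrete, here is a minimal sketch that mirrors the shape of the Source interface without depending on Flume. All names (MiniSource, MiniAbstractSource, MiniChannelProcessor, ListChannel, GreetingSource) are hypothetical; events are plain strings, and transactions, interceptors and selectors are left out. The point is only that a concrete source never sees the channel: it is handed a processor and talks to it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for Flume types (not Flume's API).
interface MiniChannel {
    void put(String event);
}

// A channel that just remembers what it was given.
class ListChannel implements MiniChannel {
    final List<String> events = new ArrayList<>();

    @Override
    public void put(String event) {
        events.add(event);
    }
}

// Plays the role of ChannelProcessor: the only path from a source to a channel.
class MiniChannelProcessor {
    private final MiniChannel channel;

    MiniChannelProcessor(MiniChannel channel) {
        this.channel = channel;
    }

    void processEvent(String event) {
        channel.put(event);
    }
}

// Same two methods as Source: the processor is injected from outside.
interface MiniSource {
    void setChannelProcessor(MiniChannelProcessor channelProcessor);

    MiniChannelProcessor getChannelProcessor();
}

// Base class that only stores the injected processor.
abstract class MiniAbstractSource implements MiniSource {
    private MiniChannelProcessor channelProcessor;

    @Override
    public void setChannelProcessor(MiniChannelProcessor channelProcessor) {
        this.channelProcessor = channelProcessor;
    }

    @Override
    public MiniChannelProcessor getChannelProcessor() {
        return channelProcessor;
    }
}

// A concrete source forwards every event through its processor.
class GreetingSource extends MiniAbstractSource {
    void emit(String name) {
        getChannelProcessor().processEvent("hello " + name);
    }
}
```

Whoever builds the source (in Flume, the configuration provider) decides which processor, and therefore which channels, it writes to; the source code itself stays channel-agnostic.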
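First, a word on the types of Source. Flume divides sources into two categories according to how data reaches them. Sources such as HTTP, netcat and exec are event-driven (EventDrivenSource): they receive data through their own listeners or threads and push it onward as it arrives. Sources such as Kafka and JMS are pollable (PollableSource): they expose a process() method that has to be called repeatedly to pull data in.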
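The difference between the two styles is mostly about who drives the loop. The sketch below is a simplified model under that assumption; PushSource, PullSource and the Status enum are hypothetical names, the Deque stands in for a broker such as Kafka, and the Consumer stands in for the ChannelProcessor. The push source is called by an external callback, while the pull source does nothing until someone calls process().

```java
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical, simplified model of the two source styles; these are not Flume classes.
enum Status { READY, BACKOFF }

// Event-driven style: something outside any runner loop (a socket handler,
// a child-process reader) calls onData(), and the event is forwarded at once.
class PushSource {
    private final Consumer<String> sink;

    PushSource(Consumer<String> sink) {
        this.sink = sink;
    }

    void onData(String line) {
        sink.accept(line);
    }
}

// Pollable style: the source owns no loop. A runner keeps calling process(),
// which fetches up to batchSize items and reports BACKOFF when there was nothing.
class PullSource {
    private final Deque<String> upstream; // stands in for a broker to poll
    private final Consumer<String> sink;
    private final int batchSize;

    PullSource(Deque<String> upstream, Consumer<String> sink, int batchSize) {
        this.upstream = upstream;
        this.sink = sink;
        this.batchSize = batchSize;
    }

    Status process() {
        int fetched = 0;
        while (fetched < batchSize && !upstream.isEmpty()) {
            sink.accept(upstream.poll());
            fetched++;
        }
        return fetched == 0 ? Status.BACKOFF : Status.READY;
    }
}
```

With three queued items and a batch size of 2, process() returns READY twice and then BACKOFF; that BACKOFF signal is what a polling runner uses to decide when to sleep.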
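As we saw in the post on the startup process, Application first starts a SourceRunner, and the SourceRunner in turn starts the source. Since there are two types of source, there are also two kinds of SourceRunner, EventDrivenSourceRunner and PollableSourceRunner. Let's look at their start() methods: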
EventDrivenSourceRunner
public class EventDrivenSourceRunner extends SourceRunner {
    …
    @Override
    public void start() {
        Source source = getSource();
        ChannelProcessor cp = source.getChannelProcessor();
        cp.initialize();
        source.start();
        lifecycleState = LifecycleState.START;
    }
    …
}
PollableSourceRunner
public class PollableSourceRunner extends SourceRunner {
    …
    @Override
    public void start() {
        PollableSource source = (PollableSource) getSource();
        ChannelProcessor cp = source.getChannelProcessor();
        cp.initialize();
        source.start();
        // Unlike the event-driven runner, start a dedicated thread that polls the source.
        runner = new PollingRunner();
        runner.source = source;
        runner.counterGroup = counterGroup;
        runner.shouldStop = shouldStop;
        runnerThread = new Thread(runner);
        runnerThread.setName(getClass().getSimpleName() + "-" +
            source.getClass().getSimpleName() + "-" + source.getName());
        runnerThread.start();
        lifecycleState = LifecycleState.START;
    }
    …
    public static class PollingRunner implements Runnable {
        private PollableSource source;
        private AtomicBoolean shouldStop;
        private CounterGroup counterGroup;

        @Override
        public void run() {
            logger.debug("Polling runner starting. Source:{}", source);
            while (!shouldStop.get()) {
                counterGroup.incrementAndGet("runner.polls");
                try {
                    if (source.process().equals(PollableSource.Status.BACKOFF)) {
                        // Nothing fetched: sleep n * increment (n = consecutive backoffs), capped at the max.
                        counterGroup.incrementAndGet("runner.backoffs");
                        Thread.sleep(Math.min(
                            counterGroup.incrementAndGet("runner.backoffs.consecutive")
                            * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
                    } else {
                        // A successful poll resets the consecutive-backoff counter.
                        counterGroup.set("runner.backoffs.consecutive", 0L);
                    }
                } catch (InterruptedException e) {
                    logger.info("Source runner interrupted. Exiting");
                    counterGroup.incrementAndGet("runner.interruptions");
                } catch (EventDeliveryException e) {
                    logger.error("Unable to deliver event. Exception follows.", e);
                    counterGroup.incrementAndGet("runner.deliveryErrors");
                } catch (Exception e) {
                    // Unexpected failure: log it and wait for the maximum backoff interval.
                    counterGroup.incrementAndGet("runner.errors");
                    logger.error("Unhandled exception, logging and sleeping for " +
                        source.getMaxBackOffSleepInterval() + "ms", e);
                    try {
                        Thread.sleep(source.getMaxBackOffSleepInterval());
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
        }
    }
}
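To make the backoff behaviour of the loop above concrete, here is a simplified, runnable model of PollingRunner.run(). PolledSource and PollingLoop are hypothetical names; counters and logging are dropped, and sleeping is injected as a LongConsumer so the schedule can be observed without actually waiting. It assumes, as in the code above, that the n-th consecutive BACKOFF sleeps for min(n * increment, max).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;

// Hypothetical, trimmed-down PollableSource: only what the loop needs.
interface PolledSource {
    enum Status { READY, BACKOFF }

    Status process() throws Exception;

    long getBackOffSleepIncrement();

    long getMaxBackOffSleepInterval();
}

// Sketch of PollingRunner.run(): sleeping is injected so tests need not wait.
class PollingLoop implements Runnable {
    private final PolledSource source;
    private final AtomicBoolean shouldStop;
    private final LongConsumer sleeper;
    private long consecutiveBackoffs = 0;

    PollingLoop(PolledSource source, AtomicBoolean shouldStop, LongConsumer sleeper) {
        this.source = source;
        this.shouldStop = shouldStop;
        this.sleeper = sleeper;
    }

    @Override
    public void run() {
        while (!shouldStop.get()) {
            try {
                if (source.process() == PolledSource.Status.BACKOFF) {
                    consecutiveBackoffs++;
                    // Linear growth with a cap: min(n * increment, maxInterval).
                    sleeper.accept(Math.min(consecutiveBackoffs * source.getBackOffSleepIncrement(),
                            source.getMaxBackOffSleepInterval()));
                } else {
                    // Any successful poll resets the streak.
                    consecutiveBackoffs = 0;
                }
            } catch (Exception e) {
                // Simplification: every failure waits for the max interval. The original
                // does this only for unexpected exceptions; EventDeliveryException and
                // InterruptedException are just logged and counted there.
                sleeper.accept(source.getMaxBackOffSleepInterval());
            }
        }
    }
}
```

With an increment of 1000 ms and a maximum of 5000 ms, three BACKOFFs in a row sleep 1000, 2000 and 3000 ms; a READY resets the streak, and a longer run of BACKOFFs climbs back up and then stays at 5000 ms. Linear growth with a cap keeps an idle source from hammering its upstream while still reacting within a bounded delay once data reappears.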
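Both PollableSourceRunner and EventDrivenSourceRunner ultimately call start() on the source they wrap. The difference is that PollableSourceRunner also starts a PollingRunner thread that calls source.process() in a loop until shouldStop is set; whenever process() returns BACKOFF, the thread sleeps for the number of consecutive backoffs times getBackOffSleepIncrement(), capped at getMaxBackOffSleepInterval(). EventDrivenSourceRunner starts no thread of its own, because an event-driven source produces events from its own listeners or threads. Both runners also call initialize() on the source's ChannelProcessor before starting it, which raises a question: where does that ChannelProcessor come from? For the answer we have to go back to AbstractConfigurationProvider, where loadSources() contains the following code: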
ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig);
ChannelProcessor channelProcessor = new ChannelProcessor(selector);
Configurables.configure(channelProcessor, config);
source.setChannelProcessor(channelProcessor);
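The snippet shows that a ChannelProcessor is built around a ChannelSelector, which is what decides which of the source's channels an event goes to. Flume's built-in selector types are replicating (the default) and multiplexing. The sketch below models only that routing decision; Evt, Chan, Selector, ReplicatingSel, MultiplexingSel and Processor are hypothetical names, and the real ChannelProcessor additionally runs interceptors, wraps each put in a channel transaction and treats required and optional channels differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified types; no interceptors, transactions or optional channels.
class Evt {
    final Map<String, String> headers;
    final String body;

    Evt(Map<String, String> headers, String body) {
        this.headers = headers;
        this.body = body;
    }
}

class Chan {
    final String name;
    final List<Evt> stored = new ArrayList<>();

    Chan(String name) {
        this.name = name;
    }
}

interface Selector {
    List<Chan> getChannels(Evt event);
}

// Replicating: every event is copied to all channels of the source.
class ReplicatingSel implements Selector {
    private final List<Chan> channels;

    ReplicatingSel(List<Chan> channels) {
        this.channels = channels;
    }

    @Override
    public List<Chan> getChannels(Evt event) {
        return channels;
    }
}

// Multiplexing: the value of one header picks the target channels,
// falling back to a default list when the value is unmapped or missing.
class MultiplexingSel implements Selector {
    private final String header;
    private final Map<String, List<Chan>> mapping;
    private final List<Chan> defaults;

    MultiplexingSel(String header, Map<String, List<Chan>> mapping, List<Chan> defaults) {
        this.header = header;
        this.mapping = mapping;
        this.defaults = defaults;
    }

    @Override
    public List<Chan> getChannels(Evt event) {
        List<Chan> targets = mapping.get(event.headers.get(header));
        return targets != null ? targets : defaults;
    }
}

// Like new ChannelProcessor(selector): the processor only knows its selector
// and asks it, per event, which channels to write to.
class Processor {
    private final Selector selector;

    Processor(Selector selector) {
        this.selector = selector;
    }

    void processEvent(Evt event) {
        for (Chan channel : selector.getChannels(event)) {
            channel.stored.add(event);
        }
    }
}
```

Because the routing lives in the selector, the same source code can fan events out to every channel or route them by header purely through configuration.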
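At this point we have covered the Source startup flow. Next, using AvroSource as an example, let's see where a source actually calls ChannelProcessor to insert events into the channel.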
2 AvroSource
public class AvroSource extends AbstractSource implements EventDrivenSource,
        Configurable, AvroSourceProtocol {
    …
    @Override
    public Status append(AvroFlumeEvent avroEvent) {
        if (logger.isDebugEnabled()) {
            if (LogPrivacyUtil.allowLogRawData()) {
                logger.debug("Avro source {}: Received avro event: {}", getName(), avroEvent);
            } else {
                logger.debug("Avro source {}: Received avro event", getName());
            }
        }
        sourceCounter.incrementAppendReceivedCount();
        sourceCounter.incrementEventReceivedCount();
        // Convert the Avro RPC payload into a Flume Event (body + headers).
        Event event = EventBuilder.withBody(avroEvent.getBody().array(),
            toStringMap(avroEvent.getHeaders()));
        try {
            // Hand the event to the ChannelProcessor, which writes it to the selected channels.
            getChannelProcessor().processEvent(event);
        } catch (ChannelException ex) {
            logger.warn("Avro source " + getName() + ": Unable to process event. " +
                "Exception follows.", ex);
            return Status.FAILED;
        }
        sourceCounter.incrementAppendAcceptedCount();
        sourceCounter.incrementEventAcceptedCount();
        return Status.OK;
    }

    @Override
    public Status appendBatch(List<AvroFlumeEvent> events) {
        logger.debug("Avro source {}: Received avro event batch of {} events.",
            getName(), events.size());
        sourceCounter.incrementAppendBatchReceivedCount();
        sourceCounter.addToEventReceivedCount(events.size());
        List<Event> batch = new ArrayList<Event>();
        for (AvroFlumeEvent avroEvent : events) {
            Event event = EventBuilder.withBody(avroEvent.getBody().array(),
                toStringMap(avroEvent.getHeaders()));
            batch.add(event);
        }
        try {
            getChannelProcessor().processEventBatch(batch);
        } catch (Throwable t) {
            logger.error("Avro source " + getName() + ": Unable to process event " +
                "batch. Exception follows.", t);
            if (t instanceof Error) {
                throw (Error) t;
            }
            return Status.FAILED;
        }
        sourceCounter.incrementAppendBatchAcceptedCount();
        sourceCounter.addToEventAcceptedCount(events.size());
        return Status.OK;
    }
    …
}
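The pattern in append and appendBatch is: count the events as received, try to hand them to the channel, and count them as accepted only if that succeeded, otherwise return FAILED to the RPC caller. The sketch below imitates that pattern without Avro or Flume; RpcLikeSource, BoundedChannel and ChannelFullException are hypothetical stand-ins for AvroSource, a channel and ChannelException, and the two plain fields stand in for SourceCounter.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for ChannelException: thrown when the channel cannot take the events.
class ChannelFullException extends RuntimeException {
    ChannelFullException(String message) {
        super(message);
    }
}

// Hypothetical bounded channel; putAll is all-or-nothing, loosely imitating
// a batch that is committed in a single transaction.
class BoundedChannel {
    private final int capacity;
    final List<String> events = new ArrayList<>();

    BoundedChannel(int capacity) {
        this.capacity = capacity;
    }

    void putAll(List<String> batch) {
        if (events.size() + batch.size() > capacity) {
            throw new ChannelFullException("channel full");
        }
        events.addAll(batch);
    }
}

// Mirrors the shape of AvroSource.append/appendBatch.
class RpcLikeSource {
    enum Status { OK, FAILED }

    private final BoundedChannel channel;
    long eventReceivedCount;
    long eventAcceptedCount;

    RpcLikeSource(BoundedChannel channel) {
        this.channel = channel;
    }

    Status append(String body) {
        eventReceivedCount++;
        try {
            channel.putAll(Collections.singletonList(body));
        } catch (ChannelFullException e) {
            return Status.FAILED; // the RPC client sees the failure and may retry
        }
        eventAcceptedCount++;
        return Status.OK;
    }

    Status appendBatch(List<String> bodies) {
        eventReceivedCount += bodies.size();
        try {
            channel.putAll(bodies);
        } catch (ChannelFullException e) {
            return Status.FAILED;
        }
        eventAcceptedCount += bodies.size();
        return Status.OK;
    }
}
```

With a capacity of 3, appending "a" succeeds, a batch of three more is rejected as a whole, and a batch of two then fits. The received count ends at 6 but the accepted count at 3, which is exactly the gap the source counters are meant to expose.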
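In append we find getChannelProcessor().processEvent(event);, and in appendBatch getChannelProcessor().processEventBatch(batch);. AvroSource is event-driven: these methods are called when an Avro RPC request arrives from a client, and if delivery to the channel fails, the client gets Status.FAILED back. More generally, every Source calls ChannelProcessor to insert events at the moments dictated by its own mechanism, whether that is an external trigger (event-driven) or a polling loop (pollable).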
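This completes our study of Flume's startup process and its three core components. Within the limits of my ability, some details could not be explored in depth; I hope to find time to analyze them further in the future.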