最近在學習Flume源碼,所以想寫一份Flume源碼學習的筆記供需要的朋友一起學習參考。
1、Flume介紹
Flume是cloudera公司開源的一款分布式、可靠地進行大量日志數據采集、聚合和並轉移到存儲中;通過事務機制提供了可靠的消息傳輸支持,自帶負載均衡機制來支撐水平擴展;並且提供了一些默認組件供直接使用。
Flume目前常見的應用場景:日志--->Flume--->實時計算(如Kafka+Storm) 、日志--->Flume--->離線計算(如HDFS、HBase)、日志--->Flume--->ElasticSearch。
2、整體架構
Flume主要分為三個組件:Source、Channel、Sink;數據流如下圖所示:
1、Source負責日志流入,比如從文件、網絡、Kafka等數據源流入數據,數據流入的方式有兩種輪訓拉取和事件驅動;
2、Channel負責數據聚合/暫存,比如暫存到內存、本地文件、數據庫、Kafka等,日志數據不會在管道停留很長時間,很快會被Sink消費掉;
3、Sink負責數據轉移到存儲,比如從Channel拿到日志后直接存儲到HDFS、HBase、Kafka、ElasticSearch等,然后再有如Hadoop、Storm、ElasticSearch之類的進行數據分析或查詢。
一個Agent會同時存在這三個組件,Source和Sink都是異步執行的,相互之間不會影響。
假設我們有采集並索引Nginx訪問日志,我們可以按照如下方式部署:
1、Agent和Web Server是部署在同一台機器;
2、Source使用ExecSource並使用tail命令采集日志;
3、Channel使用MemoryChannel,因為日志數據丟點也不算什么大問題;
4、Sink使用ElasticSearchSink寫入到ElasticSearch,此處可以配置多個ElasticSearch服務器IP:PORT列表以便提升處理能力。
以上介紹了日志是如何流的,對於復雜的日志采集,我們需要對Source日志進行過濾、寫到多個Channel、對Sink進行失敗處理/負載均衡等處理,這些Flume默認都提供了支持:
1、Source采集的日志會傳入ChannelProcessor組件,其首先通過Interceptor進行日志過濾,如果接觸過Servlet的話這個概念是類似的,可以參考《Servlet3.1規范翻譯——過濾器 》 ;過濾器可以過濾掉日志,也可以修改日志內容;
2、過濾完成后接下來會交給ChannelSelector進行處理,默認提供了兩種選擇器:復制或多路復用選擇器;復制即把一個日志復制到多個Channel;而多路復用會根據配置的選擇器條件,把符合條件的路由到相應的Channel;在寫多個Channel時可能存在存在失敗的情況,對於失敗的處理有兩種:稍后重試或者忽略。重試一般采用指數級時間進行重試。
我們之前說過Source生產日志給Channel、Sink從Channel消費日志;它倆完全是異步的,因此Sink只需要監聽自己關系的Channel變化即可。
到此我們可以對Source日志進行過濾/修改,把一個消息復制/路由到多個Channel,對於Sink的話也應該存在寫失敗的情況,Flume默認提供了如下策略:
默認策略就是一個Sink,失敗了則這個事務就失敗了,會稍后重試。
Flume還提供了故障轉移策略:
Failover策略是給多個Sink定義優先級,假設其中一個失敗了,則路由到下一個優先級的Sink;Sink只要拋出一次異常就會被認為是失敗了,則從存活Sink中移除,然后指數級時間等待重試,默認是等待1s開始重試,最大等待重試時間是30s。
Flume也提供了負載均衡策略:
負載均衡算法默認提供了兩種:輪訓和隨機;其通過抽象一個類似ChannelSelector的SinkSelector進行選擇,失敗補償機制和Failover中的算法類似,但是默認是關閉失敗補償的,需要配置backoff參數為true開啟。
到此Flume涉及的一些核心組件就介紹完了,對於Source和Sink如何異步、Channel提供的事務機制等我們后續分析組件時再講。
假設我們需要采集非常多的客戶端日志並對他們進行一些緩沖或集中的處理,就可以部署一個聚合層,整體架構類似於如下:
1、首先是日志采集層,該層的Agent和應用部署在同一台機器上,負責采集如Nginx訪問日志;然后通過RPC將日志流入到收集/聚合層;在這一層應該快速的采集到日志然后流入到收集/聚合層;
2、收集/聚合層進行日志的收集或聚合,並且可以進行容錯處理,如故障轉移或負載均衡,以提升可靠性;另外可以在該層開啟文件Channel,做數據緩沖區;
3、收集/聚合層對數據進行過濾或修改然后進行存儲或處理;比如存儲到HDFS,或者流入Kafka然后通過Storm對數據進行實時處理。
到此從Flume核心組件到一般的部署架構我們就大體了解了,而涉及的一些實現細節在接下來的部分進行詳細介紹。
首先所有核心組件都會實現org.apache.flume.lifecycle.LifecycleAware接口:
- public interface LifecycleAware {
- public void start();
- public void stop();
- public LifecycleState getLifecycleState();
- }
start方法在整個Flume啟動時或者初始化組件時都會調用start方法進行組件初始化,Flume組件出現異常停止時會調用stop,getLifecycleState返回組件的生命周期狀態,有IDLE, START, STOP, ERROR四個狀態。
如果開發的組件需要配置,如設置一些屬性;可以實現org.apache.flume.conf.Configurable接口:
- public interface Configurable {
- public void configure(Context context);
- }
Flume在啟動組件之前會調用configure來初始化組件一些配置。
1、Source
Source用於采集日志數據,有兩種實現方式:輪訓拉取和事件驅動機制;Source接口如下:
- public interface Source extends LifecycleAware, NamedComponent {
- public void setChannelProcessor(ChannelProcessor channelProcessor);
- public ChannelProcessor getChannelProcessor();
- }
Source接口首先繼承了LifecycleAware接口,然后只提供了ChannelProcessor的setter和getter接口,也就是說它的的所有邏輯的實現應該在LifecycleAware接口的start和stop中實現;ChannelProcessor之前介紹過用來進行日志流的過濾和Channel的選擇及調度。
而Source是通過SourceFactory工廠創建,默認提供了DefaultSourceFactory,其首先通過Enum類型org.apache.flume.conf.source.SourceType查找默認實現,如exec,則找到org.apache.flume.source.ExecSource實現,如果找不到直接Class.forName(className)創建。
Source提供了兩種機制: PollableSource(輪訓拉取)和EventDrivenSource(事件驅動):
PollableSource默認提供了如下實現:
比如JMSSource實現使用javax.jms.MessageConsumer.receive(pollTimeout)主動去拉取消息。
EventDrivenSource默認提供了如下實現:
比如NetcatSource、HttpSource就是事件驅動,即被動等待;比如HttpSource就是內部啟動了一個內嵌的Jetty啟動了一個Servlet容器,通過FlumeHTTPServlet去接收消息。
Flume提供了SourceRunner用來啟動Source的流轉:
- public class EventDrivenSourceRunner extends SourceRunner {
- private LifecycleState lifecycleState;
- public EventDrivenSourceRunner() {
- lifecycleState = LifecycleState.IDLE; //啟動之前是空閑狀態
- }
- @Override
- public void start() {
- Source source = getSource(); //獲取Source
- ChannelProcessor cp = source.getChannelProcessor(); //Channel處理器
- cp.initialize(); //初始化Channel處理器
- source.start(); //啟動Source
- lifecycleState = LifecycleState.START; //本組件狀態改成啟動狀態
- }
- @Override
- public void stop() {
- Source source = getSource(); //先停Source
- source.stop();
- ChannelProcessor cp = source.getChannelProcessor();
- cp.close();//再停Channel處理器
- lifecycleState = LifecycleState.STOP; //本組件狀態改成停止狀態
- }
- }
從本組件也可以看出:1、首先要初始化ChannelProcessor,其實現時初始化過濾器鏈;2、接着啟動Source並更改本組件的狀態。
- public class PollableSourceRunner extends SourceRunner {
- @Override
- public void start() {
- PollableSource source = (PollableSource) getSource();
- ChannelProcessor cp = source.getChannelProcessor();
- cp.initialize();
- source.start();
- runner = new PollingRunner();
- runner.source = source;
- runner.counterGroup = counterGroup;
- runner.shouldStop = shouldStop;
- runnerThread = new Thread(runner);
- runnerThread.setName(getClass().getSimpleName() + "-" +
- source.getClass().getSimpleName() + "-" + source.getName());
- runnerThread.start();
- lifecycleState = LifecycleState.START;
- }
- }
而PollingRunner首先初始化組件,但是又啟動了一個線程PollingRunner,其作用就是輪訓拉取數據:
- @Override
- public void run() {
- while (!shouldStop.get()) { //如果沒有停止,則一直在死循環運行
- counterGroup.incrementAndGet("runner.polls");
- try {
- //調用PollableSource的process方法進行輪訓拉取,然后判斷是否遇到了失敗補償
- if (source.process().equals(PollableSource.Status.BACKOFF)) {/
- counterGroup.incrementAndGet("runner.backoffs");
- //失敗補償時暫停線程處理,等待超時時間之后重試
- Thread.sleep(Math.min(
- counterGroup.incrementAndGet("runner.backoffs.consecutive")
- * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
- } else {
- counterGroup.set("runner.backoffs.consecutive", 0L);
- }
- } catch (InterruptedException e) {
- }
- }
- }
- }
- }
Flume在啟動時會判斷Source是PollableSource還是EventDrivenSource來選擇使用PollableSourceRunner還是EventDrivenSourceRunner。
比如HttpSource實現,其通過FlumeHTTPServlet接收消息然后:
- List<Event> events = Collections.emptyList(); //create empty list
- //首先從請求中獲取Event
- events = handler.getEvents(request);
- //然后交給ChannelProcessor進行處理
- getChannelProcessor().processEventBatch(events);
到此基本的Source流程就介紹完了,其作用就是監聽日志,采集,然后交給ChannelProcessor進行處理。
2、Channel
Channel用於連接Source和Sink,Source生產日志發送到Channel,Sink從Channel消費日志;也就是說通過Channel實現了Source和Sink的解耦,可以實現多對多的關聯,和Source、Sink的異步化。
之前Source采集到日志后會交給ChannelProcessor處理,那么接下來我們先從ChannelProcessor入手,其依賴三個組件:
- private final ChannelSelector selector; //Channel選擇器
- private final InterceptorChain interceptorChain; //攔截器鏈
- private ExecutorService execService; //用於實現可選Channel的ExecutorService,默認是單線程實現
接下來看下其是如何處理Event的:
- public void processEvent(Event event) {
- event = interceptorChain.intercept(event); //首先進行攔截器鏈過濾
- if (event == null) {
- return;
- }
- List<Event> events = new ArrayList<Event>(1);
- events.add(event);
- //通過Channel選擇器獲取必須成功處理的Channel,然后事務中執行
- List<Channel> requiredChannels = selector.getRequiredChannels(event);
- for (Channel reqChannel : requiredChannels) {
- executeChannelTransaction(reqChannel, events, false);
- }
- //通過Channel選擇器獲取可選的Channel,這些Channel失敗是可以忽略,不影響其他Channel的處理
- List<Channel> optionalChannels = selector.getOptionalChannels(event);
- for (Channel optChannel : optionalChannels) {
- execService.submit(new OptionalChannelTransactionRunnable(optChannel, events));
- }
- }
另外內部還提供了批處理實現方法processEventBatch;對於內部事務實現的話可以參考executeChannelTransaction方法,整體事務機制類似於JDBC:
- private static void executeChannelTransaction(Channel channel, List<Event> batch, boolean isOptional) {
- //1、獲取Channel上的事務
- Transaction tx = channel.getTransaction();
- Preconditions.checkNotNull(tx, "Transaction object must not be null");
- try {
- //2、開啟事務
- tx.begin();
- //3、在Channel上執行批量put操作
- for (Event event : batch) {
- channel.put(event);
- }
- //4、成功后提交事務
- tx.commit();
- } catch (Throwable t) {
- //5、異常后回滾事務
- tx.rollback();
- if (t instanceof Error) {
- LOG.error("Error while writing to channel: " +
- channel, t);
- throw (Error) t;
- } else if(!isOptional) {//如果是可選的Channel,異常忽略
- throw new ChannelException("Unable to put batch on required " +
- "channel: " + channel, t);
- }
- } finally {
- //最后關閉事務
- tx.close();
- }
- }
Interceptor用於過濾Event,即傳入一個Event然后進行過濾加工,然后返回一個新的Event,接口如下:
- public interface Interceptor {
- public void initialize();
- public Event intercept(Event event);
- public List<Event> intercept(List<Event> events);
- public void close();
- }
可以看到其提供了initialize和close方法用於啟動和關閉;intercept方法用於過濾或加工Event。比如HostInterceptor攔截器用於獲取本機IP然后默認添加到Event的字段為host的Header中。
接下來就是ChannelSelector選擇器了,其通過如下方式創建:
- //獲取ChannelSelector配置,比如agent.sources.s1.selector.type = replicating
- ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
- //使用Source關聯的Channel創建,比如agent.sources.s1.channels = c1 c2
- ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig);
ChannelSelector默認提供了兩種實現:復制和多路復用:
默認實現是復制選擇器ReplicatingChannelSelector,即把接收到的消息復制到每一個Channel;多路復用選擇器MultiplexingChannelSelector會根據Event Header中的參數進行選擇,以此來選擇使用哪個Channel。
而Channel是Event中轉的地方,Source發布Event到Channel,Sink消費Channel的Event;Channel接口提供了如下接口用來實現Event流轉:
- public interface Channel extends LifecycleAware, NamedComponent {
- public void put(Event event) throws ChannelException;
- public Event take() throws ChannelException;
- public Transaction getTransaction();
- }
put用於發布Event,take用於消費Event,getTransaction用於事務支持。默認提供了如下Channel的實現:
對於Channel的實現我們后續單獨章節介紹。
3、Sink
Sink從Channel消費Event,然后進行轉移到收集/聚合層或存儲層。Sink接口如下所示:
- public interface Sink extends LifecycleAware, NamedComponent {
- public void setChannel(Channel channel);
- public Channel getChannel();
- public Status process() throws EventDeliveryException;
- public static enum Status {
- READY, BACKOFF
- }
- }
類似於Source,其首先繼承了LifecycleAware,然后提供了Channel的getter/setter方法,並提供了process方法進行消費,此方法會返回消費的狀態,READY或BACKOFF。
Sink也是通過SinkFactory工廠來創建,其也提供了DefaultSinkFactory默認工廠,比如傳入hdfs,會先查找Enum org.apache.flume.conf.sink.SinkType,然后找到相應的默認處理類org.apache.flume.sink.hdfs.HDFSEventSink,如果沒找到默認處理類,直接通過Class.forName(className)進行反射創建。
我們知道Sink還提供了分組功能,用於把多個Sink聚合為一組進行使用,內部提供了SinkGroup用來完成這個事情。此時問題來了,如何去調度多個Sink,其內部使用了SinkProcessor來完成這個事情,默認提供了故障轉移和負載均衡兩個策略。
首先SinkGroup就是聚合多個Sink為一組,然后將多個Sink傳給SinkProcessorFactory進行創建SinkProcessor,而策略是根據配置文件中配置的如agent.sinkgroups.g1.processor.type = load_balance來選擇的。
SinkProcessor提供了如下實現:
DefaultSinkProcessor:默認實現,用於單個Sink的場景使用。
FailoverSinkProcessor:故障轉移實現:
- public Status process() throws EventDeliveryException {
- Long now = System.currentTimeMillis();
- //1、首先檢查失敗隊列的頭部的Sink是否已經過了失敗補償等待時間了
- while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
- //2、如果可以使用了,則從失敗Sink隊列獲取隊列第一個Sink
- FailedSink cur = failedSinks.poll();
- Status s;
- try {
- s = cur.getSink().process(); //3、使用此Sink進行處理
- if (s == Status.READY) { //4、如果處理成功
- liveSinks.put(cur.getPriority(), cur.getSink()); //4.1、放回存活Sink隊列
- activeSink = liveSinks.get(liveSinks.lastKey());
- } else {
- failedSinks.add(cur); //4.2、如果此時不是READY,即BACKOFF期間,再次放回失敗隊列
- }
- return s;
- } catch (Exception e) {
- cur.incFails(); //5、如果遇到異常了,則增加失敗次數,並放回失敗隊列
- failedSinks.add(cur);
- }
- }
- Status ret = null;
- while(activeSink != null) { //6、此時失敗隊列中沒有Sink能處理了,那么需要使用存活Sink隊列進行處理
- try {
- ret = activeSink.process();
- return ret;
- } catch (Exception e) { //7、處理失敗進行轉移到失敗隊列
- activeSink = moveActiveToDeadAndGetNext();
- }
- }
- throw new EventDeliveryException("All sinks failed to process, " +
- "nothing left to failover to");
- }
失敗隊列是一個優先級隊列,使用refresh屬性排序,而refresh是通過如下機制計算的:
- refresh = System.currentTimeMillis()
- + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY);
其中maxPenalty是最大等待時間,默認30s,而(1 << sequentialFailures) * FAILURE_PENALTY)用於實現指數級等待時間遞增, FAILURE_PENALTY是1s。
LoadBalanceSinkProcessor:用於實現Sink的負載均衡,其通過SinkSelector進行實現,類似於ChannelSelector。LoadBalanceSinkProcessor在啟動時會根據配置,如agent.sinkgroups.g1.processor.selector = random進行選擇,默認提供了兩種選擇器:
LoadBalanceSinkProcessor使用如下機制進行負載均衡:
- public Status process() throws EventDeliveryException {
- Status status = null;
- //1、使用選擇器創建相應的迭代器,也就是用來選擇Sink的迭代器
- Iterator<Sink> sinkIterator = selector.createSinkIterator();
- while (sinkIterator.hasNext()) {
- Sink sink = sinkIterator.next();
- try {
- //2、選擇器迭代Sink進行處理,如果成功直接break掉這次處理,此次負載均衡就算完成了
- status = sink.process();
- break;
- } catch (Exception ex) {
- //3、失敗后會通知選擇器,采取相應的失敗退避補償算法進行處理
- selector.informSinkFailed(sink);
- LOGGER.warn("Sink failed to consume event. "
- + "Attempting next sink if available.", ex);
- }
- }
- if (status == null) {
- throw new EventDeliveryException("All configured sinks have failed");
- }
- return status;
- }
如上的核心就是怎么創建迭代器,如何進行失敗退避補償處理,首先我們看下RoundRobinSinkSelector實現,其內部是通過通用的RoundRobinOrderSelector選擇器實現:
- public Iterator<T> createIterator() {
- //1、獲取存活的Sink索引,
- List<Integer> activeIndices = getIndexList();
- int size = activeIndices.size();
- //2、如果上次記錄的下一個存活Sink的位置超過了size,那么從隊列頭重新開始計數
- if (nextHead >= size) {
- nextHead = 0;
- }
- //3、獲取本次使用的起始位置
- int begin = nextHead++;
- if (nextHead == activeIndices.size()) {
- nextHead = 0;
- }
- //4、從該位置開始迭代,其實現類似於環形隊列,比如整個隊列是5,起始位置是3,則按照 3、4、0、1、2的順序進行輪訓,實現了輪訓算法
- int[] indexOrder = new int[size];
- for (int i = 0; i < size; i++) {
- indexOrder[i] = activeIndices.get((begin + i) % size);
- }
- //indexOrder是迭代順序,getObjects返回相關的Sinks;
- return new SpecificOrderIterator<T>(indexOrder, getObjects());
- }
getIndexList實現如下:
- protected List<Integer> getIndexList() {
- long now = System.currentTimeMillis();
- List<Integer> indexList = new ArrayList<Integer>();
- int i = 0;
- for (T obj : stateMap.keySet()) {
- if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) {
- indexList.add(i);
- }
- i++;
- }
- return indexList;
- }
isShouldBackOff()表示是否開啟退避算法支持,如果不開啟,則認為每個Sink都是存活的,每次都會重試,通過agent.sinkgroups.g1.processor.backoff = true配置開啟,默認false;restoreTime和之前介紹的refresh一樣,是退避補償等待時間,算法類似,就不多介紹了。
那么什么時候調用Sink進行消費呢?其類似於SourceRunner,Sink提供了SinkRunner進行輪訓拉取處理,SinkRunner會輪訓調度SinkProcessor消費Channel的消息,然后調用Sink進行轉移。SinkProcessor之前介紹過,其負責消息復制/路由。
SinkRunner實現如下:
- public void start() {
- SinkProcessor policy = getPolicy();
- policy.start();
- runner = new PollingRunner();
- runner.policy = policy;
- runner.counterGroup = counterGroup;
- runner.shouldStop = new AtomicBoolean();
- runnerThread = new Thread(runner);
- runnerThread.setName("SinkRunner-PollingRunner-" +
- policy.getClass().getSimpleName());
- runnerThread.start();
- lifecycleState = LifecycleState.START;
- }
即獲取SinkProcessor然后啟動它,接着啟動輪訓線程去處理。PollingRunner線程負責輪訓消息,核心實現如下:
- public void run() {
- while (!shouldStop.get()) { //如果沒有停止
- try {
- if (policy.process().equals(Sink.Status.BACKOFF)) {//如果處理失敗了,進行退避補償處理
- counterGroup.incrementAndGet("runner.backoffs");
- Thread.sleep(Math.min(
- counterGroup.incrementAndGet("runner.backoffs.consecutive")
- * backoffSleepIncrement, maxBackoffSleep)); //暫停退避補償設定的超時時間
- } else {
- counterGroup.set("runner.backoffs.consecutive", 0L);
- }
- } catch (Exception e) {
- try {
- Thread.sleep(maxBackoffSleep); //如果遇到異常則等待最大退避時間
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
整體實現類似於PollableSourceRunner實現,整體處理都是交給SinkProcessor完成的。SinkProcessor會輪訓Sink的process方法進行處理;此處以LoggerSink為例:
- @Override
- public Status process() throws EventDeliveryException {
- Status result = Status.READY;
- Channel channel = getChannel();
- //1、獲取事務
- Transaction transaction = channel.getTransaction();
- Event event = null;
- try {
- //2、開啟事務
- transaction.begin();
- //3、從Channel獲取Event
- event = channel.take();
- if (event != null) {
- if (logger.isInfoEnabled()) {
- logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
- }
- } else {//4、如果Channel中沒有Event,則默認進入故障補償機制,即防止死循環造成CPU負載高
- result = Status.BACKOFF;
- }
- //5、成功后提交事務
- transaction.commit();
- } catch (Exception ex) {
- //6、失敗后回滾事務
- transaction.rollback();
- throw new EventDeliveryException("Failed to log event: " + event, ex);
- } finally {
- //7、關閉事務
- transaction.close();
- }
- return result;
- }
Sink中一些實現是支持批處理的,比如RollingFileSink:
- //1、開啟事務
- //2、批處理
- for (int i = 0; i < batchSize; i++) {
- event = channel.take();
- if (event != null) {
- sinkCounter.incrementEventDrainAttemptCount();
- eventAttemptCounter++;
- serializer.write(event);
- }
- }
- //3、提交/回滾事務、關閉事務
定義一個批處理大小然后在事務中執行批處理。
4、整體流程
從以上部分我們可以看出,不管是Source還是Sink都依賴Channel,那么啟動時應該先啟動Channel然后再啟動Source或Sink即可。
Flume有兩種啟動方式:使用EmbeddedAgent內嵌在Java應用中或使用Application單獨啟動一個進程,此處我們已Application分析為主。
首先進入org.apache.flume.node.Application的main方法啟動:
- //1、設置默認值啟動參數、參數是否必須的
- Options options = new Options();
- Option option = new Option("n", "name", true, "the name of this agent");
- option.setRequired(true);
- options.addOption(option);
- option = new Option("f", "conf-file", true,
- "specify a config file (required if -z missing)");
- option.setRequired(false);
- options.addOption(option);
- //2、接着解析命令行參數
- CommandLineParser parser = new GnuParser();
- CommandLine commandLine = parser.parse(options, args);
- String agentName = commandLine.getOptionValue('n');
- boolean reload = !commandLine.hasOption("no-reload-conf");
- if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
- isZkConfigured = true;
- }
- if (isZkConfigured) {
- //3、如果是通過ZooKeeper配置,則使用ZooKeeper參數啟動,此處忽略,我們以配置文件講解
- } else {
- //4、打開配置文件,如果不存在則快速失敗
- File configurationFile = new File(commandLine.getOptionValue('f'));
- if (!configurationFile.exists()) {
- throw new ParseException(
- "The specified configuration file does not exist: " + path);
- }
- List<LifecycleAware> components = Lists.newArrayList();
- if (reload) { //5、如果需要定期reload配置文件,則走如下方式
- //5.1、此處使用Guava提供的事件總線
- EventBus eventBus = new EventBus(agentName + "-event-bus");
- //5.2、讀取配置文件,使用定期輪訓拉起策略,默認30s拉取一次
- PollingPropertiesFileConfigurationProvider configurationProvider =
- new PollingPropertiesFileConfigurationProvider(
- agentName, configurationFile, eventBus, 30);
- components.add(configurationProvider);
- application = new Application(components); //5.3、向Application注冊組件
- //5.4、向事件總線注冊本應用,EventBus會自動注冊Application中使用@Subscribe聲明的方法
- eventBus.register(application);
- } else { //5、配置文件不支持定期reload
- PropertiesFileConfigurationProvider configurationProvider =
- new PropertiesFileConfigurationProvider(
- agentName, configurationFile);
- application = new Application();
- //6.2、直接使用配置文件初始化Flume組件
- application.handleConfigurationEvent(configurationProvider
- .getConfiguration());
- }
- }
- //7、啟動Flume應用
- application.start();
- //8、注冊虛擬機關閉鈎子,當虛擬機關閉時調用Application的stop方法進行終止
- final Application appReference = application;
- Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
- @Override
- public void run() {
- appReference.stop();
- }
- });
以上流程只提取了核心代碼中的一部分,比如ZK的實現直接忽略了,而Flume啟動大體流程如下:
1、讀取命令行參數;
2、讀取配置文件;
3、根據是否需要reload使用不同的策略初始化Flume;如果需要reload,則使用Guava的事件總線實現,Application的handleConfigurationEvent是事件訂閱者,PollingPropertiesFileConfigurationProvider是事件發布者,其會定期輪訓檢查文件是否變更,如果變更則重新讀取配置文件,發布配置文件事件變更,而handleConfigurationEvent會收到該配置變更重新進行初始化;
4、啟動Application,並注冊虛擬機關閉鈎子。
handleConfigurationEvent方法比較簡單,首先調用了stopAllComponents停止所有組件,接着調用startAllComponents使用配置文件初始化所有組件:
- @Subscribe
- public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
- stopAllComponents();
- startAllComponents(conf);
- }
MaterializedConfiguration存儲Flume運行時需要的組件:Source、Channel、Sink、SourceRunner、SinkRunner等,其是通過ConfigurationProvider進行初始化獲取,比如PollingPropertiesFileConfigurationProvider會讀取配置文件然后進行組件的初始化。
對於startAllComponents實現大體如下:
- //1、首先啟動Channel
- supervisor.supervise(Channels,
- new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
- //2、確保所有Channel是否都已啟動
- for(Channel ch: materializedConfiguration.getChannels().values()){
- while(ch.getLifecycleState() != LifecycleState.START
- && !supervisor.isComponentInErrorState(ch)){
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- Throwables.propagate(e);
- }
- }
- }
- //3、啟動SinkRunner
- supervisor.supervise(SinkRunners,
- new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
- //4、啟動SourceRunner
- supervisor.supervise(SourceRunner,
- new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
- //5、初始化監控服務
- this.loadMonitoring();
從如下代碼中可以看到,首先要准備好Channel,因為Source和Sink會操作它,對於Channel如果初始化失敗則整個流程是失敗的;然后啟動SinkRunner,先准備好消費者;接着啟動SourceRunner開始進行采集日志。此處我們發現有兩個單獨的組件LifecycleSupervisor和MonitorService,一個是組件守護哨兵,一個是監控服務。守護哨兵對這些組件進行守護,假設出問題了默認策略是自動重啟這些組件。
對於stopAllComponents實現大體如下:
- //1、首先停止SourceRunner
- supervisor.unsupervise(SourceRunners);
- //2、接着停止SinkRunner
- supervisor.unsupervise(SinkRunners);
- //3、然后停止Channel
- supervisor.unsupervise(Channels);
- //4、最后停止MonitorService
- monitorServer.stop();
此處可以看出,停止的順序是Source、Sink、Channel,即先停止生產,再停止消費,最后停止管道。
Application中的start方法代碼實現如下:
- public synchronized void start() {
- for(LifecycleAware component : components) {
- supervisor.supervise(component,
- new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
- }
- }
其循環Application注冊的組件,然后守護哨兵對它進行守護,默認策略是出現問題會自動重啟組件,假設我們支持reload配置文件,則之前啟動Application時注冊過PollingPropertiesFileConfigurationProvider組件,即該組件會被守護哨兵守護着,出現問題默認策略自動重啟。
而Application關閉執行了如下動作:
- public synchronized void stop() {
- supervisor.stop();
- if(monitorServer != null) {
- monitorServer.stop();
- }
- }
即關閉守護哨兵和監控服務。
到此基本的Application分析結束了,我們還有很多疑問,守護哨兵怎么實現的。
整體流程可以總結為:
1、首先初始化命令行配置;
2、接着讀取配置文件;
3、根據是否需要reload初始化配置文件中的組件;如果需要reload會使用Guava事件總線進行發布訂閱變化;
4、接着創建Application,創建守護哨兵,並先停止所有組件,接着啟動所有組件;啟動順序:Channel、SinkRunner、SourceRunner,並把這些組件注冊給守護哨兵、初始化監控服務;停止順序:SourceRunner、SinkRunner、Channel;
5、如果配置文件需要定期reload,則需要注冊Polling***ConfigurationProvider到守護哨兵;
6、最后注冊虛擬機關閉鈎子,停止守護哨兵和監控服務。
輪訓實現的SourceRunner 和SinkRunner會創建一個線程進行工作,之前已經介紹了其工作方式。接下來我們看下守護哨兵的實現。
首先創建LifecycleSupervisor:
- //1、用於存放被守護的組件
- supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
- //2、用於存放正在被監控的組件
- monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
- //3、創建監控服務線程池
- monitorService = new ScheduledThreadPoolExecutor(10,
- new ThreadFactoryBuilder().setNameFormat(
- "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
- .build());
- monitorService.setMaximumPoolSize(20);
- monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
- //4、定期清理被取消的組件
- purger = new Purger();
- //4.1、默認不進行清理
- needToPurge = false;
LifecycleSupervisor啟動時會進行如下操作:
- public synchronized void start() {
- monitorService.scheduleWithFixedDelay(purger, 2, 2, TimeUnit.HOURS);
- lifecycleState = LifecycleState.START;
- }
首先每隔兩個小時執行清理組件,然后改變狀態為啟動。而LifecycleSupervisor停止時直接停止了監控服務,然后更新守護組件狀態為STOP:
- //1、首先停止守護監控服務
- if (monitorService != null) {
- monitorService.shutdown();
- try {
- monitorService.awaitTermination(10, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- logger.error("Interrupted while waiting for monitor service to stop");
- }
- }
- //2、更新所有守護組件狀態為STOP,並調用組件的stop方法進行停止
- for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses.entrySet()) {
- if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {
- entry.getValue().status.desiredState = LifecycleState.STOP;
- entry.getKey().stop();
- }
- }
- //3、更新本組件狀態
- if (lifecycleState.equals(LifecycleState.START)) {
- lifecycleState = LifecycleState.STOP;
- }
- //4、最后的清理
- supervisedProcesses.clear();
- monitorFutures.clear();
接下來就是調用supervise進行組件守護了:
- if(this.monitorService.isShutdown() || this.monitorService.isTerminated()
- || this.monitorService.isTerminating()){
- //1、如果哨兵已停止則拋出異常,不再接收任何組件進行守護
- }
- //2、初始化守護組件
- Supervisoree process = new Supervisoree();
- process.status = new Status();
- //2.1、默認策略是失敗重啟
- process.policy = policy;
- //2.2、初始化組件默認狀態,大多數組件默認為START
- process.status.desiredState = desiredState;
- process.status.error = false;
- //3、組件監控器,用於定時獲取組件的最新狀態,或者重新啟動組件
- MonitorRunnable monitorRunnable = new MonitorRunnable();
- monitorRunnable.lifecycleAware = lifecycleAware;
- monitorRunnable.supervisoree = process;
- monitorRunnable.monitorService = monitorService;
- supervisedProcesses.put(lifecycleAware, process);
- //4、定期的去執行組件監控器,獲取組件最新狀態,或者重新啟動組件
- ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
- monitorRunnable, 0, 3, TimeUnit.SECONDS);
- monitorFutures.put(lifecycleAware, future);
- }
如果不需要守護了,則需要調用unsupervise:
- public synchronized void unsupervise(LifecycleAware lifecycleAware) {
- synchronized (lifecycleAware) {
- Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
- //1.1、設置守護組件的狀態為被丟棄
- supervisoree.status.discard = true;
- //1.2、設置組件盼望的最新生命周期狀態為STOP
- this.setDesiredState(lifecycleAware, LifecycleState.STOP);
- //1.3、停止組件
- lifecycleAware.stop();
- }
- //2、從守護組件中移除
- supervisedProcesses.remove(lifecycleAware);
- //3、取消定時監控組件服務
- monitorFutures.get(lifecycleAware).cancel(false);
- //3.1、通知Purger需要進行清理,Purger會定期的移除cancel的組件
- needToPurge = true;
- monitorFutures.remove(lifecycleAware);
- }
接下來我們再看下MonitorRunnable的實現,其負責進行組件狀態遷移或組件故障恢復:
- public void run() {
- long now = System.currentTimeMillis();
- try {
- if (supervisoree.status.firstSeen == null) {
- supervisoree.status.firstSeen = now; //1、記錄第一次狀態查看時間
- }
- supervisoree.status.lastSeen = now; //2、記錄最后一次狀態查看時間
- synchronized (lifecycleAware) {
- //3、如果守護組件被丟棄或出錯了,則直接返回
- if (supervisoree.status.discard || supervisoree.status.error) {
- return;
- }
- //4、更新最后一次查看到的狀態
- supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
- //5、如果組件的狀態和守護組件看到的狀態不一致,則以守護組件的狀態為准,然后進行初始化
- if (!lifecycleAware.getLifecycleState().equals(
- supervisoree.status.desiredState)) {
- switch (supervisoree.status.desiredState) {
- case START: //6、如果是啟動狀態,則啟動組件
- try {
- lifecycleAware.start();
- } catch (Throwable e) {
- if (e instanceof Error) {
- supervisoree.status.desiredState = LifecycleState.STOP;
- try {
- lifecycleAware.stop();
- } catch (Throwable e1) {
- supervisoree.status.error = true;
- if (e1 instanceof Error) {
- throw (Error) e1;
- }
- }
- }
- supervisoree.status.failures++;
- }
- break;
- case STOP: //7、如果是停止狀態,則停止組件
- try {
- lifecycleAware.stop();
- } catch (Throwable e) {
- if (e instanceof Error) {
- throw (Error) e;
- }
- supervisoree.status.failures++;
- }
- break;
- default:
- }
- } catch(Throwable t) {
- }
- }
- }
如上代碼進行了一些簡化,整體邏輯即定時去采集組件的狀態,如果發現守護組件和組件的狀態不一致,則可能需要進行啟動或停止。即守護監視器可以用來保證組件如能失敗后自動啟動。默認策略是總是失敗后重啟,還有一種策略是只啟動一次。
日志是系統數據的基石,對於系統的安全來說非常重要,它記錄了系統每天發生的各種各樣的事情,用戶可以通過它來檢查錯誤發生的原因,或者尋找受到攻擊時攻擊者留下的痕跡。日志主要的功能是審計和監測。它還可以實時地監測系統狀態,監測和追蹤侵入者。現在互聯網上存在的日志組件各種各樣,我們這里主要講的是Flume。
Flume 發展歷史
Cloudera 開發的分布式日志收集系統 Flume,是 hadoop 周邊組件之一。其可以實時的將分布在不同節點、機器上的日志收集到 hdfs 中。Flume 初始的發行版本目前被統稱為 Flume OG(original generation),屬於 cloudera。但隨着 FLume 功能的擴展,Flume OG 代碼工程臃腫、核心組件設計不合理、核心配置不標准等缺點暴露出來,尤其是在 Flume OG 的最后一個發行版本 0.94.0 中,日志傳輸不穩定的現象尤為嚴重,這點可以在 BigInsights 產品文檔的 troubleshooting 板塊發現。為了解決這些問題,2011 年 10 月 22 號,cloudera 完成了 Flume-728,對 Flume 進行了里程碑式的改動:重構核心組件、核心配置以及代碼架構,重構后的版本統稱為 Flume NG(next generation);改動的另一原因是將 Flume 納入 apache 旗下,cloudera Flume 改名為 Apache Flume。當然我們現在用的是Flume NG,所以不再講Flume OG的內容。
Flume定義
Flume是一個高可用,高可靠,分布式海量日志采集、聚合和傳輸系統,Flume支持在日志系統中定制各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各種數據接受方(可定制)的能力。
Flume架構介紹
Flume日志收集結構圖
Flume 的核心是把數據從數據源收集過來,再送到目的地。
為了保證輸送一定成功,在送到目的地之前,會先緩存數據,待數據真正到達目的地后,刪除自己緩存的數據。
Flume 傳輸的數據的基本單位是 Event,如果是文本文件,通常是一行記錄,這也是事務的基本單位。
Event 從 Source,流向 Channel,再到 Sink,本身為一個 byte 數組,並可攜帶 headers 信息。
Event 代表着一個數據流的最小完整單元,從外部數據源來,向外部的目的地去。
Flume 運行的核心是 Agent。它是一個完整的數據收集工具,含有三個核心組件,分別是 Source、Channel、Sink。
Source 可以接收外部源發送過來的數據。不同的 Source,可以接受不同的數據格式。比如有目錄池(spooling directory)數據源,可以監控指定文件夾中的新文件變化,如果目錄中有文件產生,就會立刻讀取其內容。
Channel 是一個存儲地,接收 Source 的輸出,直到有 Sink 消費掉 Channel 中的數據。
Channel 中的數據直到進入到下一個Channel中或者進入終端才會被刪除。
當 Sink 寫入失敗后,可以自動重啟,不會造成數據丟失,因此很可靠。
Sink 會消費 Channel 中的數據,然后送給外部源或者其他 Source。如數據可以寫入到 HDFS 或者 HBase 中。
Flume 核心概念整理
Agent Agent中包含多個sources和sinks。
Client 生產數據,運行在一個獨立的線程。
Source 從Client收集數據,傳遞給Channel。用來消費傳遞到該組件的Event。
Sink 從Channel收集數據,將Event傳遞到Flow Pipeline中的下一個Agent。
Channel 中轉Event臨時存儲,保存Source傳遞過來Event,連接 sources 和 sinks 。
Events 一個數據單元,帶有一個可選的消息頭。可以是日志記錄、 avro 對象等。
Flume 特點
flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日志數據(字節數組形式)並且攜帶有頭信息,這些Event由Agent外部的Source生成,當Source捕獲事件后會進行特定的格式化,然后Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區,它將保存事件直到Sink處理完該事件。Sink負責持久化日志或者把事件推向另一個Source。
Agent是Flume中最小的運行單位,一個Agent中由Source、Sink和Channel三個組件構成。
Event是Flume中基本數據單位,Event中包含有傳輸數據及數據頭數據包
如下圖所示:
值得注意的是,Flume提供了大量內置的Source、Channel和Sink類型。不同類型的Source,Channel和Sink可以自由組合。組合方式基於用戶設置的配置文件,非常靈活。
比如:Channel可以把事件暫存在內存里,也可以持久化到本地硬盤上。Sink可以把日志寫入HDFS, HBase,甚至是另外一個Source等等。Flume支持用戶建立多級流,也就是說,多個agent可以協同工作,並且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,這也正是NB之處。
如下圖所示:
Flume 整體架構總結
Flume架構整體上看就是 source-->channel-->sink 的三層架構,類似生成者和消費者的架構,他們之間通過queue(channel)傳輸,解耦。
Source:完成對日志數據的收集,分成 transtion 和 event 打入到channel之中。
Channel:主要提供一個隊列的功能,對source提供中的數據進行簡單的緩存。
Sink:取出Channel中的數據,進行相應的存儲文件系統,數據庫,或者提交到遠程服務器。
對現有程序改動最小的使用方式是使用是直接讀取程序原來記錄的日志文件,基本可以實現無縫接入,不需要對現有程序進行任何改動。
Flume 下載、安裝
安裝JDK
1.將下載好的JDK包解壓,比如我的解壓到 /home/liuqing/jdk1.7.0_72 目錄下
2.配置環境變量
在/etc/profile 文件中添加
- export JAVA_HOME=/home/liuqing/jdk1.7.0_72
- export PATH=$JAVA_HOME/bin:$PATH
- export CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$CLASS_PATH
3.執行source profile
4.在命令行輸入 java -version
出現:
java version "1.7.0_72"
Java(TM) SE Runtime Environment (build 1.7.0_72-b14)
Java HotSpot(TM) 64-Bit Server VM (build 24.72-b04, mixed mode)
表示安裝成功
安裝Flume
1. 從官網 http://flume.apache.org/download.html 下載最新的安裝包
2. 解壓縮,比如我的解壓到 /home/liuqing/hadoop/flume目錄
3. 修改 flume-env.sh 配置文件,主要是JAVA_HOME變量設置
JAVA_HOME=/home/liuqing/jdk1.7.0_72
4. 驗證是否安裝成功
root@ubuntu:/home/liuqing/hadoop/flume/bin# ./flume-ng version
出現:
Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
表示安裝成功
案例
案例1. 單節點 Flume配置
1. 新建配置文件,配置文件示例
- # example.conf: A single-node Flume configuration
- # agent組件名稱
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- # source 配置
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = localhost
- a1.sources.r1.port = 44444
- # sink 配置
- a1.sinks.k1.type = logger
- # 使用內存中Buffer Event Channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- # 綁定 source 和 sink 到channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
將上述配置存為:/home/liuqing/hadoop/flume/conf/example.conf
2. 然后我們就可以啟動 Flume 了:
在/home/liuqing/hadoop/flume路徑下運行:
- bin/flume-ngagent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console
其中 -c/--conf 后跟配置目錄,-f/--conf-file 后跟具體的配置文件,-n/--name 指定agent的名稱
3. 然后我們再開一個 shell 終端窗口,telnet 上配置中偵聽的端口,就可以發消息看到效果了:
- $ telnet localhost 44444
- Trying 127.0.0.1...
- Connected to localhost.localdomain (127.0.0.1).
- Escape character is '^]'.
- Hello world! <ENTER>
- OK
4.Flume 終端窗口此時會打印出如下信息,就表示成功了
- 12/06/1915:32:19 INFO source.NetcatSource: Source starting
- 12/06/1915:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
- 12/06/1915:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }
至此,咱們的第一個 Flume Agent 算是部署成功了!
案例2. 結合實際項目
參考:https://github.com/gilt/logback-flume-appender
1. 在/home/liuqing/hadoop/flume/conf/下新建配置文件 test.conf
- agent1.sources = source1
- agent1.sinks = sink1
- agent1.channels = channel1
- # Describe/configure source1
- agent1.sources.source1.type = avro
- agent1.sources.source1.bind = 0.0.0.0
- agent1.sources.source1.port = 44444
- # Describe sink1
- #日志文件按時間生成
- #agent1.sinks.sink1.type = FILE_ROLL
- #agent1.sinks.sink1.sink.directory = /home/liuqing/hadoop/flume/flume-out
- #agent1.sinks.sink1.sink.rollInterval = 1800
- #agent1.sinks.sink1.batchSize = 5
- #日志文件根據大小生成
- #生成目錄在conf文件夾下的log4j.properties可以配置
- agent1.sinks.sink1.type = logger
- # Use a channel which buffers events in memory
- agent1.channels.channel1.type = memory
- agent1.channels.channel1.capacity = 1000
- agent1.channels.channel1.transactionCapactiy = 100
- # Bind the source and sink to the channel
- agent1.sources.source1.channels = channel1
- agent1.sinks.sink1.channel = channel1
2. 項目已經配好了logback.xml 文件
在logback.xml文件中添加
- <appender name="flumeApplender"
- class="com.xxx.hd.extended.log.flume.FlumeLogstashV1Appender">
- <flumeAgents>
- 192.168.23.235:44444,
- </flumeAgents>
- <flumeProperties>
- connect-timeout=4000;
- request-timeout=8000
- </flumeProperties>
- <batchSize>2048</batchSize>
- <reportingWindow>20480</reportingWindow>
- <additionalAvroHeaders>
- myHeader=myValue
- </additionalAvroHeaders>
- <application>ProjectName</application>
- <layout class="ch.qos.logback.classic.PatternLayout">
- <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - \(%file:%line\) - %msg%n%ex</pattern>
- </layout>
- </appender>
3. 然后我們就可以啟動 Flume 了:
在/home/liuqing/hadoop/flume路徑下運行:
- bin/flume-ng agent --conf ./conf/ -f conf/lqtest.conf -n agent1
4. 現在日志會打印到/home/liuqing/hadoop/flume/logs目錄下
日志文件滿128M就會自動建一個新的