Flume之核心架構深入解析


 

我們一起來了解Source、Channel和Sink的全鏈路過程。

一、Flume架構分析

這個圖中核心的組件是:

Source,ChannelProcessor,Channel,Sink。他們的關系結構如下:

Source  {
    ChannelProcessor  {
             Channel  ch1
             Channel  ch2
             …
    }
} 
Sink  {
   Channel  ch; 
} 
SinkGroup {
   Channel ch;
   Sink s1;
   Sink s2;
   …
}

二、各組件詳細介紹

1、Source組件

Source是數據源的總稱,我們往往設定好源后,數據將源源不斷的被抓取或者被推送。

常見的數據源有:ExecSource,KafkaSource,HttpSource,NetcatSource,JmsSource,AvroSource等等。

所有的數據源統一實現一個接口類如下:

@InterfaceAudience.Public @InterfaceStability.Stable public interface Source extends LifecycleAware, NamedComponent { /** * Specifies which channel processor will handle this source's events. * * @param channelProcessor */ public void setChannelProcessor(ChannelProcessor channelProcessor); /** * Returns the channel processor that will handle this source's events. */ public ChannelProcessor getChannelProcessor(); } 

Source提供了兩種機制: PollableSource(輪詢拉取)和EventDrivenSource(事件驅動):

上圖展示的Source繼承關系類圖。

通過類圖我們可以看到NetcatSource,ExecSource和HttpSource屬於事件驅動模型。KafkaSource,SequenceGeneratorSource和JmsSource屬於輪詢拉取模型。

Source接口繼承了LifecycleAware接口,它的的所有邏輯的實現在接口的start和stop方法中進行。

下圖是類關系方法圖:

Source接口定義的是最終的實現過程,比如通過日志抓取日志,這個抓取的過程和實際操作就是在對應的Source實現中,比如:ExecSource。那么這些Source實現由誰來驅動的呢?現在我們將介紹SourceRunner類。看一下類繼承結構圖:

我們看一下PollableSourceRunner和EventDrivenSourceRunner的具體實現:

//PollableSourceRunner: public void start() { PollableSource source = (PollableSource) getSource(); ChannelProcessor cp = source.getChannelProcessor(); cp.initialize(); source.start(); runner = new PollingRunner(); runner.source = source; //Source實現類就在這里被賦與。 runner.counterGroup = counterGroup; runner.shouldStop = shouldStop; runnerThread = new Thread(runner); runnerThread.setName(getClass().getSimpleName() + "-" + source.getClass().getSimpleName() + "-" + source.getName()); runnerThread.start(); lifecycleState = LifecycleState.START; } //EventDrivenSourceRunner: @Override public void start() { Source source = getSource(); ChannelProcessor cp = source.getChannelProcessor(); cp.initialize(); source.start(); lifecycleState = LifecycleState.START; } 

注:其實所有的Source實現類內部都維護着線程,執行source.start()其實就是啟動了相應的線程。

剛才我們看代碼,代碼中一直都在展示channelProcessor這個類,同時最上面架構設計圖里面也提到了這個類,那它到底是干什么呢,下面我們就對其分解。

2、Channel組件

Channel用於連接Source和Sink,Source將日志信息發送到Channel,Sink從Channel消費日志信息;Channel是中轉日志信息的一個臨時存儲,保存有Source組件傳遞過來的日志信息。

先看代碼如下:

ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration(); ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig); ChannelProcessor channelProcessor = new ChannelProcessor(selector); Configurables.configure(channelProcessor, config); source.setChannelProcessor(channelProcessor); 

ChannelSelectorFactory.create方法實現如下:

public static ChannelSelector create(List<Channel> channels, ChannelSelectorConfiguration conf) { String type = ChannelSelectorType.REPLICATING.toString(); if (conf != null){ type = conf.getType(); } ChannelSelector selector = getSelectorForType(type); selector.setChannels(channels); Configurables.configure(selector, conf); return selector; } 

其中我們看一下ChannelSelectorType這個枚舉類,包括了幾種類型:

public enum ChannelSelectorType { /** * Place holder for custom channel selectors not part of this enumeration. */ OTHER(null), /** * 復用通道選擇器 */ REPLICATING("org.apache.flume.channel.ReplicatingChannelSelector"), /** * 多路通道選擇器 */ MULTIPLEXING("org.apache.flume.channel.MultiplexingChannelSelector"); } 

ChannelSelector的類結構圖如下所示:

注:RelicatingChannelSelector和MultiplexingChannelSelector是二個通道選擇器,第一個是復用型通道選擇器,也就是的默認的方式,會把接收到的消息發送給其他每個channel。第二個是多路通道選擇器,這個會根據消息header中的參數進行通道選擇。

說完通道選擇器,正式來解釋Channel是什么,先看一個接口類:

public interface Channel extends LifecycleAware, NamedComponent { public void put(Event event) throws ChannelException; public Event take() throws ChannelException; public Transaction getTransaction(); } 

注:put方法是用來發送消息,take方法是獲取消息,transaction是用於事務操作。

類結構圖如下:

3、Sink組件

Sink負責取出Channel中的消息數據,進行相應的存儲文件系統,數據庫,或者提交到遠程服務器。

Sink在設置存儲數據時,可以向文件系統中,數據庫中,hadoop中儲數據,在日志數據較少時,可以將數據存儲在文件系中,並且設定一定的時間間隔保存數據。在日志數據較多時,可以將相應的日志數據存儲到Hadoop中,便於日后進行相應的數據分析。

Sink接口類內容如下:

public interface Sink extends LifecycleAware, NamedComponent { public void setChannel(Channel channel); public Channel getChannel(); public Status process() throws EventDeliveryException; public static enum Status { READY, BACKOFF } } 

Sink是通過如下代碼進行的創建:

Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType()); 

DefaultSinkFactory.create方法如下:

public Sink create(String name, String type) throws FlumeException { Preconditions.checkNotNull(name, "name"); Preconditions.checkNotNull(type, "type"); logger.info("Creating instance of sink: {}, type: {}", name, type); Class<? extends Sink> sinkClass = getClass(type); try { Sink sink = sinkClass.newInstance(); sink.setName(name); return sink; } catch (Exception ex) { System.out.println(ex); throw new FlumeException("Unable to create sink: " + name + ", type: " + type + ", class: " + sinkClass.getName(), ex); } } 

注:Sink是通過SinkFactory工廠來創建,提供了DefaultSinkFactory默認工廠,程序會查找org.apache.flume.conf.sink.SinkType這個枚舉類找到相應的Sink處理類,比如:org.apache.flume.sink.LoggerSink,如果沒找到對應的處理類,直接通過Class.forName(className)進行直接查找實例化實現類。

Sink的類結構圖如下:

與ChannelProcessor處理類對應的是SinkProcessor,由SinkProcessorFactory工廠類負責創建,SinkProcessor的類型由一個枚舉類提供,看下面代碼:

public enum SinkProcessorType { /** * Place holder for custom sinks not part of this enumeration. */ OTHER(null), /** * 故障轉移 processor * * @see org.apache.flume.sink.FailoverSinkProcessor */ FAILOVER("org.apache.flume.sink.FailoverSinkProcessor"), /** * 默認processor * * @see org.apache.flume.sink.DefaultSinkProcessor */ DEFAULT("org.apache.flume.sink.DefaultSinkProcessor"), /** * 負載processor * * @see org.apache.flume.sink.LoadBalancingSinkProcessor */ LOAD_BALANCE("org.apache.flume.sink.LoadBalancingSinkProcessor"); private final String processorClassName; private SinkProcessorType(String processorClassName) { this.processorClassName = processorClassName; } public String getSinkProcessorClassName() { return processorClassName; } } 

SinkProcessor的類結構圖如下:

說明:

1、FailoverSinkProcessor是故障轉移處理器,當sink從通道拿數據信息時出錯進行的相關處理,代碼如下:

public Status process() throws EventDeliveryException { // 經過了冷卻時間,再次發起重試 Long now = System.currentTimeMillis(); while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) { //從失敗隊列中獲取sink節點 FailedSink cur = failedSinks.poll(); Status s; try { //調用相應sink進行處理,比如將channel的數據讀取存放到文件中, //這個存放文件的動作就在process中進行。 s = cur.getSink().process(); if (s == Status.READY) { //如果處理成功,則放到存活隊列中 liveSinks.put(cur.getPriority(), cur.getSink()); activeSink = liveSinks.get(liveSinks.lastKey()); logger.debug("Sink {} was recovered from the fail list", cur.getSink().getName()); } else { // if it's a backoff it needn't be penalized. //如果處理失敗,則繼續放到失敗隊列中 failedSinks.add(cur); } return s; } catch (Exception e) { cur.incFails(); failedSinks.add(cur); } } Status ret = null; while(activeSink != null) { try { ret = activeSink.process(); return ret; } catch (Exception e) { logger.warn("Sink {} failed and has been sent to failover list", activeSink.getName(), e); activeSink = moveActiveToDeadAndGetNext(); } } 

2、LoadBalancingSinkProcessor是負載Sink處理器

首先我們和ChannelProcessor一樣,我們也要重點說明一下SinkSelector這個選擇器。

先看一下SinkSelector.configure方法的部分代碼:

if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) { selector = new RoundRobinSinkSelector(shouldBackOff); } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) { selector = new RandomOrderSinkSelector(shouldBackOff); } else { try { @SuppressWarnings("unchecked") Class<? extends SinkSelector> klass = (Class<? extends SinkSelector>) Class.forName(selectorTypeName); selector = klass.newInstance(); } catch (Exception ex) { throw new FlumeException("Unable to instantiate sink selector: " + selectorTypeName, ex); } } 

結合上面的代碼,再看類結構圖如下:

注:RoundRobinSinkSelector是輪詢選擇器,RandomOrderSinkSelector是隨機分配選擇器。

最后我們以KafkaSink為例看一下Sink里面的具體實現:

public Status process() throws EventDeliveryException { Status result = Status.READY; Channel channel = getChannel(); Transaction transaction = null; Event event = null; String eventTopic = null; String eventKey = null; try { long processedEvents = 0; transaction = channel.getTransaction(); transaction.begin(); messageList.clear(); for (; processedEvents < batchSize; processedEvents += 1) { event = channel.take(); if (event == null) { // no events available in channel break; } byte[] eventBody = event.getBody(); Map<String, String> headers = event.getHeaders(); if ((eventTopic = headers.get(TOPIC_HDR)) == null) { eventTopic = topic; } eventKey = headers.get(KEY_HDR); if (logger.isDebugEnabled()) { logger.debug("{Event} " + eventTopic + " : " + eventKey + " : " + new String(eventBody, "UTF-8")); logger.debug("event #{}", processedEvents); } // create a message and add to buffer KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]> (eventTopic, eventKey, eventBody); messageList.add(data); } // publish batch and commit. if (processedEvents > 0) { long startTime = System.nanoTime(); producer.send(messageList); long endTime = System.nanoTime(); counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000)); counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size())); } transaction.commit(); } catch (Exception ex) { String errorMsg = "Failed to publish events"; logger.error("Failed to publish events", ex); result = Status.BACKOFF; if (transaction != null) { try { transaction.rollback(); counter.incrementRollbackCount(); } catch (Exception e) { logger.error("Transaction rollback failed", e); throw Throwables.propagate(e); } } throw new EventDeliveryException(errorMsg, ex); } finally { if (transaction != null) { transaction.close(); } } return result; } 

注:方法從channel中不斷的獲取數據,然后通過Kafka的producer生產者將消息發送到Kafka里面。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM