flume原理及代碼實現


轉載標明出處:http://www.cnblogs.com/adealjason/p/6240122.html

 

最近想玩一下流計算,先看了flume的實現原理及源碼

源碼可以去apache 官網下載

下面整理下flume的原理及代碼實現:

flume是一個實時數據收集工具,hadoop的生態圈之一,主要用來在分布式環境下各服務器節點做數據收集,然后匯總到統一的數據存儲平台,flume支持多種部署架構模式,單點agent部署,分層架構模式部署,如通過一個負載均衡agent將收集的數據分發到各個子agent,然后在匯總到同一個agent上,數據傳輸到統一的數據存儲平台,再次不多廢話,flume支持的部署架構圖可以參見源碼中的doc目錄下的圖片

 

 

flume原理:

目前最新版本是Flume NG,以下基於Flume NG來說:

flume由以下幾個核心概念:

flume event:flume內部的數據單元,包含兩部分,一個頭結點,一個body結點,頭結點是一個Map<String, String>,部署的agent結點可以通過現有的Interceptor或者自定義Interceptor往消息頭里放置數據,如ip,hostname等標識消息來源於哪台服務器,event在flume內部做流轉,是數據傳輸的載體

flume source:source是flume的數據來源,flume支持多種數據來源,如taildir監控一個文件的變化,spollDir監控一個文件夾的變化,jmsSource接收jms消息等,最常用的avroSource是構成flume分層架構的基礎,source是一個接口,flume提供了多種消息接入方式,在sourceType枚舉類中都有詳細列出,特殊說明下,由於flume是面向接口編程,其中有一個Other的枚舉,是占位符,使用者可以自定義source源,只要求在flume啟動的時候可以加載到這個類即可(底層是通過反射獲取到class的實例的)

flume channel:flume是基於pipeline的模式,channel的存在豐富了flume的數據傳播途徑,channel可以再source和sink之間做緩沖,動態調節數據的收集及發送(內部有一個xxxCounter會沒接收到一個event或者發送一個event都會做記錄),緩沖source和sink之間的壓力,其二channel可以關聯多個source,如一個source可以按照配置選擇的將數據復制到各個管道,或者按照消息頭自動分發到指定的管道,一個channel可以接多個sink,這個實現了同一份數據的多發發送池,實現了數據的復用及負載均衡等功能,channel內部流轉的數據載體是event,flume channel支持多種數據緩沖實現方式,如fileChannel:用一個文件做數據緩存、memoryChannel:使用內存緩存,底層實現是一個LinkedBlockingDeque,一個雙向阻塞列表,具體可參見ChannelType

flume sink:flume的數據發送池,主要負責數據的發送,從channel接收到event,然后發送到指定的數據接收方,flume提供多種sink實現,具體可參見SinkType,常用的有:loggerSink:這個主要用於flume的部署調試,它會將接收到的event事件直接用log4j輸出出來,RollingFileSink:這個sink主要是將接收到的日志文件序列化到一個文件目錄中,所以需要配置文件的地址,切分文件的頻率等,avroSink:這個是flume分層架構中最常用的sink,一般和avroSource配對使用,avro是apache的一個子項目,用於數據的序列化,使用avroSource及avroSink時,需要在avroSource的agent節點服務器上監聽一個端口,avroSink的agent把接收到的數據發送到該ip、port上即完成了flume的分層部署,avro僅是一個數據序列化工具,底層實現由一個RpcClient的東東來將數據在這source和sink之間傳輸(可以留一下啟動日志,會自動創建一個RpcClient),當然,flume的編碼是按照面向接口來的,所以和source一樣支持自定義的sink

上述是幾個核心的概念,正式由於flume的這種設計思想及編碼風格,讓flume有很強的拓展性

 

當然僅僅有這幾個還是不可以完全讓flume運行起來的,flume提供了很多輔助類用於驅動、分發內部event及整個flume系統的運轉,基本如下:

配置領域:

AgentConfiguration:這個看名字就知道是flume的配置元素領域內的東西,是的,使用者在flume-conf.properties中配置的數據解析成AgentConfiguration,是配置文件到面向對象的一個抽象

AbstractConfigurationProvider:該類看名字就是一個抽象的配置Provider類,內部有一個很重要的方法就是:getConfiguration(),該方法中通過如下幾個private方法來加載flume的channel、source、sink、sinkGroups並將它們關聯起來

        loadChannels(agentConf, channelComponentMap);

        loadSources(agentConf, channelComponentMap, sourceRunnerMap);

        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);

flume還支持動態加載,PollingPropertiesFileConfigurationProvider(AbstractConfigurationProvider的一個具體實現)在flume啟動的時候會啟動一個線程FileWatcherRunnable,監控flume的配置文件變化,配置文件內部加載用的是google的EventBus來驅動的

驅動領域:

flume的source有如下兩個子接口:PollableSource和EventDrivenSource,前者需要自己去輪循的訪問數據源,當前是否可以加載到數據,如果有則加載進來轉換成flume的event,實現類有taildir、spollDir、jsmSource、kafkaSource等,該接口新增了一個process方法用於輪循調用,后者是一個事件驅動的Source,該接口不需要主動去訪問數據源,僅需要接收數據推動過來的event並轉換成flume的event即可,實現類有:scribeSource(該數據源用來打通Facebook的scribe數據收集工具)、AvroSource等

SourceRunner:

由於這兩個source的存在,所以所以flume提供了兩個sourceRunner來驅動source的運行,分別是PollableSourceRunner和EventDrivenSourceRunner,前者啟動時自動啟動一個PollingRunner線程用於定時輪循process方法

channelProcessor:

該類用於source到channel之間的數據發送,實現了一個source可以關聯到多個channel,簡單點如這2個接口,source的定義:setChannelProcessor(ChannelProcessor channelProcessor)指定一個ChannelProcessor ,ChannelProcessor 關聯到一個final的ChannelSelector,selector關聯到Channel:setChannels(List<Channel> channels)

ChannelProcessor:

關聯到指定的ChannelSelector,ChannelSelector提供了兩種selector方式,ReplicatingChannelSelector:將source的event復制到各個channel中,MultiplexingChannelSelector:根據頭結點的header信息自動路由到對應的Channel中

Transaction及BasicTransactionSemantics

flume的Channel內部保證一個event的發送在一個事務完成,如果發送失敗或者接收失敗則回滾,當成功時才從channel中刪除掉該event

SinkProcessor:

用過選擇要發送的sink,什么意思呢?該類有兩個實現:

LoadBalancingSinkProcessor:

負載均衡方式:提供了roud_bin算法和random算法、以及固定order算法的實現方式,將Channel中的event發送到多個sink上

FailoverSinkProcessor:

可以實現實現failover功能,具體流程類似LoadBalancingSinkProcessor,區別是FailoverSinkProcessor維護了一個PriorityQueue,用來根據權重選擇sink

SinkRunner:

該類用於驅動一個sink,啟動是內部開了一個線程PollingRunner,定時的調用SinkProcessor

上述是所有的核心概念及代碼作用,下面描述下flume的運行流程:

1.系統啟動時通過配置領域可以按照客戶定義的配置加載一個flume

2.SourceRunner和SinkProcessor同時啟動,一個往Channel中生產event,一個從Channel中消費event,內部是一個生產者消費者模式

3.通過一些輔助類,實現Channel到source及sink的多路分發及分層架構

 

下面是一個自己搭建的flume配置文件,供參考:

實現流程:

負載均衡+分發+落地到日志文件

1.負載均衡節點:

從兩個文件源讀數據,在event頭里增加數據來源標識,復制到兩個channel中,一個log打印,一個做負載均衡分發到另外兩台機器的agent上,負載均衡算法采用roud_robin

loadBalancAgent.sources = taildirSrc

loadBalancAgent.channels = memoryChannel fileChannel

loadBalancAgent.sinks = loggerSink1 loggerSink2 loggerSink3

loadBalancAgent.sinkgroups = loadBalanceGroups

## taildirSrc config

loadBalancAgent.sources.taildirSrc.type = TAILDIR

loadBalancAgent.sources.taildirSrc.positionFile = /alidata1/admin/openSystem/flumetest/log/taildir_position.json

loadBalancAgent.sources.taildirSrc.filegroups = f1 f2

loadBalancAgent.sources.taildirSrc.filegroups.f1 = /alidata1/admin/dts-server-web/dts-server.log

loadBalancAgent.sources.taildirSrc.headers.f1.headerKey1 = dts-server-log

loadBalancAgent.sources.taildirSrc.filegroups.f2 = /alidata1/admin/flume/test.log

loadBalancAgent.sources.taildirSrc.headers.f2.headerKey1 = flume-test-log

loadBalancAgent.sources.taildirSrc.fileHeader = true

## replicating channel config

loadBalancAgent.sources.taildirSrc.selector.type = replicating

loadBalancAgent.sources.taildirSrc.channels = memoryChannel fileChannel

loadBalancAgent.sources.taildirSrc.selector.optional = fileChannel

## memory chanel config

loadBalancAgent.channels.memoryChannel.type = memory

loadBalancAgent.channels.memoryChannel.capacity = 10000

loadBalancAgent.channels.memoryChannel.transactionCapacity = 10000

loadBalancAgent.channels.memoryChannel.byteCapacityBufferPercentage = 20

loadBalancAgent.channels.memoryChannel.byteCapacity = 800000

## file channel config

loadBalancAgent.channels.fileChannel.type = file

loadBalancAgent.channels.fileChannel.checkpointDir = /alidata1/admin/openSystem/flumetest/log

loadBalancAgent.channels.fileChannel.dataDirs = /alidata1/admin/openSystem/flumetest/data

## loadbalance sink processor

loadBalancAgent.sinkgroups.loadBalanceGroups.sinks = loggerSink1 loggerSink2

loadBalancAgent.sinkgroups.loadBalanceGroups.processor.type = load_balance

loadBalancAgent.sinkgroups.loadBalanceGroups.processor.backoff = true

loadBalancAgent.sinkgroups.loadBalanceGroups.processor.selector = round_robin

## loggerSink1 config

loadBalancAgent.sinks.loggerSink1.type = avro

loadBalancAgent.sinks.loggerSink1.channel = memoryChannel

loadBalancAgent.sinks.loggerSink1.hostname = 10.253.42.162

loadBalancAgent.sinks.loggerSink1.port = 4141

## loggerSink2 config

loadBalancAgent.sinks.loggerSink2.type = avro

loadBalancAgent.sinks.loggerSink2.channel = memoryChannel

loadBalancAgent.sinks.loggerSink2.hostname = 10.139.53.6

loadBalancAgent.sinks.loggerSink2.port = 4141

## loggerSink3 config

loadBalancAgent.sinks.loggerSink3.type = file_roll

loadBalancAgent.sinks.loggerSink3.channel = fileChannel

loadBalancAgent.sinks.loggerSink3.sink.rollInterval = 0

loadBalancAgent.sinks.loggerSink3.sink.directory = /alidata1/admin/openSystem/flumetest/dtsServerLog

2.負載均衡節點1

接收avroSink並落地到文件中

dispatchAgent.sources= avroSrc

dispatchAgent.channels=memoryChannel

dispatchAgent.sinks=loggerSink

## avroSrc config

dispatchAgent.sources.avroSrc.type = avro

dispatchAgent.sources.avroSrc.channels = memoryChannel

dispatchAgent.sources.avroSrc.bind = 0.0.0.0

dispatchAgent.sources.avroSrc.port = 4141

## memoryChannel config

dispatchAgent.channels.memoryChannel.type = memory

dispatchAgent.channels.memoryChannel.capacity = 10000

dispatchAgent.channels.memoryChannel.transactionCapacity = 10000

dispatchAgent.channels.memoryChannel.byteCapacityBufferPercentage = 20

dispatchAgent.channels.memoryChannel.byteCapacity = 800000

## loggerSink config

dispatchAgent.sinks.loggerSink.type = logger

dispatchAgent.sinks.loggerSink.channel = memoryChannel

3.負載均衡節點2

dispatchAgent.sources= avroSrc

dispatchAgent.channels=memoryChannel

dispatchAgent.sinks=loggerSink

## avroSrc config

dispatchAgent.sources.avroSrc.type = avro

dispatchAgent.sources.avroSrc.channels = memoryChannel

dispatchAgent.sources.avroSrc.bind = 0.0.0.0

dispatchAgent.sources.avroSrc.port = 4141

## memoryChannel config

dispatchAgent.channels.memoryChannel.type = memory

dispatchAgent.channels.memoryChannel.capacity = 10000

dispatchAgent.channels.memoryChannel.transactionCapacity = 10000

dispatchAgent.channels.memoryChannel.byteCapacityBufferPercentage = 20

dispatchAgent.channels.memoryChannel.byteCapacity = 800000

## loggerSink config

dispatchAgent.sinks.loggerSink.type = logger

dispatchAgent.sinks.loggerSink.channel = memoryChannel


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM