Flume High Availability + Resumable Collection (斷點續傳)


Flume High-Availability Cluster

To do a good job, one must first sharpen one's tools.
Thanks to the following bloggers:
https://www.cnblogs.com/qingyunzong/p/8994494.html
https://blog.csdn.net/peng_0129/article/details/80793440
https://blog.csdn.net/suojie123/article/details/86577935
https://blog.csdn.net/kelong_xhu/article/details/42677045
https://blog.csdn.net/johnnychu/article/details/82780521
Flume overview
Official site: http://flume.apache.org/
From the official site:
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
[Flume architecture diagram]

Flume, a real-time log collection system developed at Cloudera, is widely recognized and used in industry. By generation it splits into two versions: Flume OG (0.9.4 and earlier) and Flume NG. The refactored Flume NG was brought under the Apache umbrella and renamed Apache Flume; what is widely used today is Flume NG, i.e. Apache Flume.
Note that Flume currently only runs on Linux.
Flume characteristics:
Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transporting massive amounts of log data. It supports customizable data senders in the logging pipeline for collecting data, and it can perform simple processing on the data and write it to a variety of receivers (such as plain text, HDFS, HBase, and so on).
  Flume's data flow is carried end to end by events. An Event is Flume's basic unit of data: it carries the log payload (as a byte array) together with a set of headers. Events are generated from data outside the Agent by a Source; when the Source captures an event it applies a specific format and then pushes the event into one or more Channels. You can think of a Channel as a buffer that holds events until a Sink has finished processing them. The Sink is responsible for persisting the log or forwarding the event on to the Source of another agent.
 (1) Flume reliability
  When a node fails, logs can still be delivered to other nodes without being lost. Flume offers three levels of reliability guarantees, from strongest to weakest: end-to-end (the receiving agent first writes the event to disk and deletes it only after delivery succeeds; if delivery fails the event can be re-sent), store-on-failure (the strategy Scribe also uses: when the receiver crashes, data is written locally and sending resumes after recovery), and best-effort (data is sent to the receiver without any acknowledgement).
 (2) Flume recoverability
  Recoverability also relies on the Channel. FileChannel is recommended: events are persisted on the local file system (at the cost of lower performance).

Core concepts:
Client: produces the data; runs in its own thread.
Event: a unit of data made up of headers and a body. (Events can be log records, Avro objects, etc.)
Flow: an abstraction of the movement of Events from an origin to a destination.
Agent: an independent Flume process containing the Source, Channel, and Sink components. (An agent runs Flume inside a JVM; each machine runs one agent, but a single agent can contain multiple sources and sinks.)
Source: the data-collection component. (A source collects data from a Client and hands it to a Channel.)
Channel: a temporary store that relays Events, holding the Events handed over by the Source component. (A Channel connects sources and sinks; it behaves somewhat like a queue.)
Sink: reads Events from the Channel, removes them, and passes each Event on to the next Agent in the flow pipeline, if there is one. (A sink collects data from a Channel and runs in its own thread.)

Architecture:
The core of a running Flume deployment is the Agent: it is Flume's smallest independent unit of execution, and one agent is one JVM. An agent is a complete data-collection tool with three core components: source, channel, and sink. The generic naming scheme used in every agent configuration file is sketched below.
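All the examples in this post use Flume's properties-file naming scheme, so it helps to see the skeleton once up front. This is essentially the official quick-start agent (netcat source, memory channel, logger sink); the names a1/r1/c1/k1 are arbitrary labels:

# declare the components of the agent named a1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# each component is then configured under <agentName>.<componentType>.<componentName>.*
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# a source can feed one or more channels; a sink reads from exactly one channel
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1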
Flume Sources:

1. Avro Source
Listens on an Avro port and receives events from Avro client streams.
Properties
[Required]
channels: (not elaborated)
type: the component type name; must be avro
bind: the hostname or IP address to listen on
port: the port number to bind to
[Optional]
threads: maximum number of worker threads to spawn
interceptors: space-separated list of interceptors

a1.sources.s1.channels = c1
a1.sources.s1.type = avro
a1.sources.s1.bind = 192.168.123.102
a1.sources.s1.port = 6666


2. Thrift Source: essentially the same as the Avro source

a1.sources.s1.channels = c1
a1.sources.s1.type = thrift
a1.sources.s1.bind = 192.168.123.102
a1.sources.s1.port = 6666

 


3. Exec Source
An Exec Source is configured with a Unix (Linux) command and continuously produces data from that command's output. If the process exits, the Exec Source exits with it and produces no further data.
channels: (not elaborated)
type: exec
command: the shell command to invoke

a1.sources.s1.type = exec 
a1.sources.s1.command = tail -F /home/hadoop/logs/test.log 
a1.sources.s1.channels = c1


4. JMS Source
Reads data from a JMS system (queue or topic).
channels: (not elaborated)
type: jms
initialContextFactory: e.g. org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory: the connection factory to look up
providerURL: the JMS provider URL
destinationName: destination name
destinationType: destination type (queue or topic)
Example:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE

 

5. Spooling Directory Source:
The Spooling Directory Source watches a configured directory for newly added files and reads the data out of those files.
Files copied into the spool directory must not be opened and edited afterwards, and the spool directory must not contain sub-directories. (This gives near-real-time monitoring of log files; a safe hand-off pattern is sketched after the example below.)

channels:
type:spooldir

 

Example:

a1.channels = ch-1
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
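Because files must be complete and unmodified once they land in the spool directory, a common hand-off pattern is to write the file somewhere else first and then move it in atomically (the paths below are illustrative):

# finish writing the file outside the spool directory, then mv it in (atomic on the same filesystem)
mv /data/staging/access-$(date +%s).log /var/log/apache/flumeSpool/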

 

Flume Channels:
Channels are the repositories where events are staged on an agent; the Source adds events and the Sink removes them.
1. Memory Channel
Events are stored in an in-memory queue with a configurable maximum size. This is ideal for flows that need high throughput and are prepared to lose data if the agent fails.
Official example:

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000


2. File Channel
type: file
checkpointDir: directory where checkpoints are stored
dataDirs: comma-separated list of directories for storing log data
Example:

 

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

3. JDBC Channel
Events are stored in a persistent store backed by a database. The JDBC channel currently supports embedded Derby. It is a durable channel, ideal for flows where recoverability is important.
Official example:

a1.channels = c1
a1.channels.c1.type = jdbc


4. Kafka Channel
Events are stored in a Kafka cluster. Kafka provides high availability and replication, so if an agent or a Kafka broker crashes, the events are immediately available to other sinks.
The Kafka channel can be used in several scenarios:
With a Flume source and sink - it provides a reliable and highly available channel for events.
With a Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic.
With a Flume sink but no source - it is a low-latency, fault-tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase, or Solr.
type: org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers: the broker list of the Kafka cluster (list of brokers in the Kafka cluster)
kafka.topic: the Kafka topic the channel uses (default: flume-channel)
Example:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer

 


The channel types above are the most commonly used, especially 1 (memory) and 2 (file); there are more types, which you can learn about on the official site.

Flume Sinks:
1. Avro Sink
Flume events sent to this sink are turned into Avro events and sent to the configured hostname/port; events are pulled from the configured channel in batches of the configured batch size.
channel: (not elaborated)
type: avro
hostname: the hostname or IP address to send events to
port: the port to send events to
Official example (sends events to port 4545 on host 10.10.10.10):

 

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

 

2. HDFS Sink
This sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files, with optional compression for both file types. Files can be rolled periodically based on elapsed time, data size, or number of events, and data can be bucketed/partitioned by attributes such as the timestamp or the machine where the event originated.
channel: (not elaborated)
type: hdfs
hdfs.path: the HDFS directory path, e.g. hdfs://namenode/flume/webdata/
hdfs.filePrefix: name prefix for files created by Flume in the HDFS directory (default FlumeData)
hdfs.fileSuffix: suffix to append to the file name
Official example:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

 

The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp 11:54:34 AM, June 12, 2012 will cause the hdfs path to become /flume/events/2012-06-12/1150/00.

3. File Roll Sink
Stores events on the local file system.
channel: (not elaborated)
type: file_roll
sink.directory: the directory where output files will be stored
Official example:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

 

4. Hive Sink:
This sink streams events containing delimited text or JSON data directly into a Hive table or partition. Events are written using Hive transactions; as soon as a set of events is committed to Hive, they immediately become visible to Hive queries.

channel: (not elaborated)
type: hive
hive.metastore: Hive metastore URI (e.g. thrift://a.b.com:9083)
hive.database: Hive database name
hive.table: Hive table name
serializer: responsible for parsing fields out of the event and mapping them to columns of the Hive table; the choice of serializer depends on the format of the data in the event. Supported serializers: DELIMITED and JSON

 


hive.partition: comma-separated list of partition values identifying the partition to write to

Notes when using Hive as the sink:
1. The Hive table must be clustered by (bucketed).
2. The Hive table must be stored as ORC (see: 大數據:Hive - ORC 文件存儲格式 - ^_TONY_^ - 博客園).
3. The Hive column names used in the Flume configuration must all be lower case.
4. The Hive table must be transactional, and partitioned or bucketed.
5. Dependencies: the following two jars must be placed into Flume's lib directory (a copy command is sketched below):
hive-hcatalog-core-2.1.1-cdh6.1.0.jar
hive-hcatalog-streaming-2.1.1-cdh6.1.0.jar
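A sketch of that copy step, assuming the jars live under the usual HCatalog directory of your Hive installation (paths and versions depend on the distribution):

# copy the HCatalog core and streaming jars into Flume's lib directory (paths/versions are illustrative)
cp $HIVE_HOME/hcatalog/share/hcatalog/hive-hcatalog-core-*.jar $FLUME_HOME/lib/
cp $HIVE_HOME/hcatalog/share/hcatalog/hive-hcatalog-streaming-*.jar $FLUME_HOME/lib/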

Hive table DDL example (from the official docs):

create table weblogs ( id int , msg string )
partitioned by (continent string, country string, time string)
clustered by (id) into 5 buckets
stored as orc;


Flume sink configuration example (from the official docs):

a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1

a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
# table columns (an empty entry skips the corresponding input field)
a1.sinks.k1.serializer.fieldnames =id,,msg
# number of events committed to the Hive table per batch
a1.sinks.k1.batchSize=10000


Hive table example:

create table flume_user(
user_id int
,user_name string
,age int
)
clustered by (user_id) into 2 buckets
stored as orc
tblproperties("transactional"='true');

set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
select * from flume_user;

 

Flume Hive sink configuration example:

a1.sources=r1
a1.channels=c1
a1.sinks=s1

a1.sources.r1.type=netcat
a1.sources.r1.bind=master
a1.sources.r1.port=44444

a1.sinks.s1.type=hive
a1.sinks.s1.hive.metastore=thrift://master:9083
a1.sinks.s1.hive.database=bd14
a1.sinks.s1.hive.table=flume_user
a1.sinks.s1.serializer=DELIMITED
a1.sinks.s1.serializer.delimiter="\t"
a1.sinks.s1.serializer.serdeSeparator='\t'
a1.sinks.s1.serializer.fieldnames=user_id,user_name,age

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

 


5. HBaseSink / HBase2Sink (two different sink types; their type settings are shown together in the example below)
【https://cloud.tencent.com/developer/article/1025431:利用Flume 匯入數據到HBase:Flume-hbase-sink 使用方法詳解】
This sink writes data to HBase.
channel: (not elaborated)
type: hbase (hbase2 for HBase2Sink)
table: the name of the table in HBase to write to
columnFamily: the column family in HBase to write to
zookeeperQuorum: the quorum spec, i.e. the value of the hbase.zookeeper.quorum property in hbase-site.xml

Official example:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
# for HBase2Sink use: a1.sinks.k1.type = hbase2
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# for HBase2Sink the serializer comes from the hbase2 package, e.g. org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.k1.channel = c1

 

6. ElasticSearchSink
This sink writes data to an Elasticsearch cluster.
channel: (not elaborated)
type: org.apache.flume.sink.elasticsearch.ElasticSearchSink
hostNames: comma-separated list of host:port pairs, e.g. ES-cluster:9300
indexName: index name
indexType: index type
clusterName: name of the ElasticSearch cluster to connect to

7. Kafka Sink
This is a Flume sink implementation that publishes data to a Kafka topic.
type: org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers: list of brokers the Kafka sink will connect to in order to obtain the list of topic partitions; this can be a partial list of brokers, but at least two are recommended for HA. The format is a comma-separated list of hostname:port.
kafka.topic: the Kafka topic the messages will be published to
Official example:

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

 

8. Custom Sink
A custom sink is your own implementation of the Sink interface. When the Flume agent starts, the custom sink class and its dependencies must be on the agent's classpath.
Official example:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1


There are more sink types as well, such as HTTP Sink, Kite Dataset Sink, MorphlineSolrSink, and AsyncHBaseSink.

Flume deployment topologies:
1. Single flow

Configuration example (taken from the official docs):

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

2. Multi-agent flow: multiple agents chained in sequence

Data is picked up with a tail command and sent to an avro port;
another node configures an avro source to relay the data and forward it to external storage.

#tail-avro.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/log/test.log
a1.sources.r1.channels = c1

# Describe the sink
# The avro sink binds to another machine's service address, not this host:
# it acts as an avro client that sends events to the shizhan02 machine
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = shizhan02
a1.sinks.k1.port = 4141
a1.sinks.k1.batch-size = 2

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel
a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

 


Receive data on the avro port and sink it to a logger (runs on shizhan02):

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# the avro source here is the receiving service; it binds to the local host
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

3. Consolidation (fan-in): multiple agents converge on one agent

Agent configuration (copy this agent configuration onto each of the source servers):
# a1 is the agent name

a1.sources=source1
a1.sinks=sink1
a1.channels=c1

# configure source1
a1.sources.source1.type = spooldir
a1.sources.source1.spoolDir=/data/hadoop/spooldata
a1.sources.source1.channels=c1
a1.sources.source1.fileHeader = true
a1.sources.source1.interceptors =i1 i2 i3
a1.sources.source1.interceptors.i1.type = timestamp
a1.sources.source1.interceptors.i2.type = host
a1.sources.source1.interceptors.i2.hostHeader = hostname
a1.sources.source1.interceptors.i3.type = static
a1.sources.source1.interceptors.i3.key = app
a1.sources.source1.interceptors.i3.value = AOTA
# configure sink1
a1.sinks.sink1.type =avro
a1.sinks.sink1.channel = c1
a1.sinks.sink1.hostname=172.26.50.24
a1.sinks.sink1.port=10001
# configure channel1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


Collector configuration:
# a1 is the agent name
a1.sources=source1
a1.sinks=sink1
a1.channels=c1

# configure source1
a1.sources.source1.type = avro
a1.sources.source1.channels = c1
a1.sources.source1.bind = 172.26.50.24
a1.sources.source1.port = 10001
#a1.sources.source1.type = spooldir
#a1.sources.source1.spoolDir=/data/hadoop/tmp/flume
#a1.sources.source1.channels=c1
#a1.sources.source1.fileHeader = true
#a1.sources.source1.interceptors =i1
#a1.sources.source1.interceptors.i1.type = timestamp
# configure sink1
#a1.sinks.sink1.type =avro
#a1.sinks.sink1.hostname=172.26.50.24
#a1.sinks.sink1.port=10001
a1.sinks.sink1.channel = c1
a1.sinks.sink1.type = hdfs
a1.sinks.sink1.hdfs.path = hdfs://172.26.50.24:9000/log_in/flume/%Y%m%d
a1.sinks.sink1.hdfs.filePrefix = %{app}_%{hostname}_%y%m%d
a1.sinks.sink1.hdfs.fileSuffix = .log
a1.sinks.sink1.hdfs.rollSize = 1048576
a1.sinks.sink1.hdfs.rollCount = 0
a1.sinks.sink1.hdfs.batchSize = 1500
a1.sinks.sink1.hdfs.round = true
a1.sinks.sink1.hdfs.roundUnit = minute
a1.sinks.sink1.hdfs.threadsPoolSize = 25
a1.sinks.sink1.hdfs.useLocalTimeStamp = true
a1.sinks.sink1.hdfs.minBlockReplicas = 1
a1.sinks.sink1.hdfs.fileType = SequenceFile
a1.sinks.sink1.hdfs.writeFormat = Text
a1.sinks.sink1.hdfs.rollInterval = 0

# configure channel1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

 

Open question: will the merged log data have ordering problems??

4. Fan-out: multi-level flow, one agent feeding multiple channels
Multiplexing: when logs from syslog, java, nginx, tomcat and so on flow into one agent as a mixed stream, the agent can split the mixed stream apart and give each kind of log its own transport channel.
Replicating: events from the source are sent to every configured channel.
Selector: selector.type decides how events are routed across the channels:
Replicating - copy each event to all channels
Multiplexing - route by header value
Custom - user-defined routing (scheme to be investigated)

Multiplexing configuration (the event header decides which channel each event is routed to):

# files named 1_txt go to the local file_roll sink (via c1); files named 2_txt go to the logger sink (via c2)
a1.sources = s1 
a1.channels = c1 c2
a1.sinks = k1 k2

a1.sources.s1.type = spooldir
a1.sources.s1.channels = c1 c2
a1.sources.s1.spoolDir = /home/wangfutai/a/flume/logs
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = myselect
# add a multiplexing channel selector
a1.sources.s1.selector.type= multiplexing
# routing rule: selector.header names the header to inspect;
# the token after selector.mapping. is matched against that header's value in each event,
# e.g. an event read from file 2_txt carries the header myselect=2_txt
a1.sources.s1.selector.header = myselect
a1.sources.s1.selector.mapping.1_txt= c1
a1.sources.s1.selector.mapping.2_txt= c2
a1.sources.s1.selector.default= c1 c2

# configure c1 (memory channel)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

# configure c2 (file channel on local disk)
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /home/wangfutai/a/flume/checkpoint
a1.channels.c2.dataDirs = /home/wangfutai/a/flume/data

a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /home/wangfutai/a/flume/netlogs
# local roll interval: a new output file is started every 60 s
a1.sinks.k1.sink.rollInterval = 60
a1.sinks.k1.sink.pathManager.prefix = network

# configure sinks
a1.sinks.k2.type = logger
# note: a sink can be bound to only one channel
a1.sinks.k2.channel = c2

 

Replicating configuration (the same source's data is copied to every channel)
agent:

# configuration with 2 channels and 2 sinks
# Name the components on this agent
a1.sources = s1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.s1.type = netcat
a1.sources.s1.bind = 192.168.191.201
a1.sources.s1.port = 44444

# fan-out rule: the source replicates events to multiple channels
a1.sources.s1.selector.type = replicating
a1.sources.s1.channels = c1 c2

# Use a channel which buffers events in memory
# define channel c1
a1.channels.c1.type = memory

# define channel c2
a1.channels.c2.type = memory

# Describe the sink
# settings for sink k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.191.202
a1.sinks.k1.port = 44445

# settings for sink k2
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 192.168.191.203
a1.sinks.k2.port = 44445

 


Configuration for the two collectors:

a1.sources=s1
a1.channels=c1
a1.sinks=k1

# the avro source listens on this host's port 44445
a1.sources.s1.channels=c1
a1.sources.s1.type=avro

# the IP address or hostname to bind to
a1.sources.s1.bind=192.168.191.202
# the port to listen on
a1.sources.s1.port=44445

#channel
a1.channels.c1.type=memory

# sink: write the data out to the logger
a1.sinks.k1.channel=c1
a1.sinks.k1.type=logger


a1.sources=s1
a1.channels=c1
a1.sinks=k1

# the avro source listens on this host's port 44445
a1.sources.s1.channels=c1
a1.sources.s1.type=avro

# the IP address or hostname to bind to
a1.sources.s1.bind=192.168.191.203
# the port to listen on
a1.sources.s1.port=44445

#channel
a1.channels.c1.type=memory

# sink: write the data out to the logger
a1.sinks.k1.channel=c1
a1.sinks.k1.type=logger

 

Custom selector (to be investigated):
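For reference, a custom channel selector is your own implementation of org.apache.flume.ChannelSelector: its fully-qualified class name goes into selector.type and the class must be on the agent's classpath. A minimal sketch (org.example.MyChannelSelector is a hypothetical class name):

# plug in a user-written channel selector by its fully-qualified class name
a1.sources.r1.selector.type = org.example.MyChannelSelector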

5. Load balancing (high availability)
Agent1 acts as a routing node: it balances the events staged in its Channel across multiple Sink components, each of which connects to a separate downstream Agent. When one of those downstream agents goes out of service, agent1 automatically switches (governed by maxTimeOut) to another sink in the same sink group, so no data is lost.

Agent configuration (introduces sink groups):

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# round-robin load balancing across the sink group
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
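Besides load_balance, the sink processor also supports a failover mode, which is often what "high availability" means in practice: sinks are given priorities and all traffic goes to the highest-priority live sink. A minimal sketch, reusing k1/k2 from the example above:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# failover instead of round-robin: the highest-priority live sink gets all traffic
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
# maximum back-off period (in ms) for a failed sink before it is retried
a1.sinkgroups.g1.processor.maxpenalty = 10000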

 


Collector configurations (both collectors serve the same sink group):
collector1

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1


collector2 configuration:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

 


Installing Flume:
1. Download
http://mirrors.hust.edu.cn/apache/
http://flume.apache.org/download.html
2. Install
Flume depends on Hadoop and ZooKeeper only at the jar level; the Hadoop and ZooKeeper services do not have to be running when Flume starts.
Upload the tarball to the server and unpack it:
tar -zxvf apache-flume-1.8.0-bin.tar.gz -C apps/
Create a symlink:
ln -s apache-flume-1.8.0-bin/ flume
Edit the environment file:
cp flume-env.sh.template flume-env.sh
export JAVA_HOME=/usr/local/jdk1.8.0_73
Configure environment variables:
vi ~/.bashrc
export FLUME_HOME=/home/hadoop/apps/flume
export PATH=$PATH:$FLUME_HOME/bin
Apply the changes:
source ~/.bashrc
Verify (print the version):
flume-ng version
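Once installed, an agent is started by pointing flume-ng at a configuration file; a typical invocation, assuming a file example.conf that defines an agent named a1 (as in the single-flow example above):

flume-ng agent --conf $FLUME_HOME/conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console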

 

Flume resumable collection (斷點續傳)

https://blog.csdn.net/Abysscarry/article/details/89420560
1. The official, out-of-the-box approach
Requirement: Flume 1.7.0 or later.
From the official docs:
Taildir Source
Note
This source is provided as a preview feature. It does not work on Windows.
Watch the specified files, and tail them in nearly real-time once detected new lines appended to the each files. If the new lines are being written, this source will retry reading them in wait for the completion of the write.
This source is reliable and will not miss data even when the tailing files rotate. It periodically writes the last read position of each files on the given position file in JSON format. If Flume is stopped or down for some reason, it can restart tailing from the position written on the existing position file.
In other use case, this source can also start tailing from the arbitrary position for each files using the given position file. When there is no position file on the specified path, it will start tailing from the first line of each files by default.
Files will be consumed in order of their modification time. File with the oldest modification time will be consumed first.
This source does not rename or delete or do any modifications to the file being tailed. Currently this source does not support tailing binary files. It reads text files line by line.

Roughly: this new source keeps a real-time record of the read position of every collected file; on each restart it reads the previous position from that record and resumes reading from there, which solves the resume-after-interruption problem.
Let's look at the configuration first.

# Configuration example from the official docs (no sink is shown)
a1.sources = r1
a1.channels = c1

a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
# space-separated list of file groups; each file group is a set of files to tail
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
# headers.<filegroup>.<key>: adds this key/value as a header to every event read from file group f1
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
# whether to add a header storing the absolute path of the tailed file
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000

A complete configuration example:

pro.sources = s1
pro.channels = c1
pro.sinks = k1

pro.sources.s1.type = TAILDIR
# file where the read positions are persisted
pro.sources.s1.positionFile = /home/dev/flume/flume-1.8.0/log/taildir_position.json
pro.sources.s1.filegroups = f1
pro.sources.s1.filegroups.f1 = /home/dev/log/moercredit/logstash.log
pro.sources.s1.headers.f1.headerKey1 = aaa
pro.sources.s1.fileHeader = true

pro.channels.c1.type = memory
pro.channels.c1.capacity = 1000
pro.channels.c1.transactionCapacity = 100

pro.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
pro.sinks.k1.kafka.topic = moercredit_log_test
pro.sinks.k1.kafka.bootstrap.servers = cdh1:9092,cdh2:9092,cdh3:9092
pro.sinks.k1.kafka.flumeBatchSize = 20
pro.sinks.k1.kafka.producer.acks = 1
pro.sinks.k1.kafka.producer.linger.ms = 1
pro.sinks.k1.kafka.producer.compression.type = snappy

pro.sources.s1.channels = c1
pro.sinks.k1.channel = c1

The content of pro.sources.s1.positionFile = /home/dev/flume/flume-1.8.0/log/taildir_position.json
is simply a JSON array with one element per tailed file; each element has three attributes: inode (the file's unique identification number),
pos (the last read position in the file, i.e. its byte offset), and
file (the absolute path of the tailed file).
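For illustration only (the inode and pos values are made up), the position file looks roughly like this:

[{"inode":2496272,"pos":12345,"file":"/home/dev/log/moercredit/logstash.log"}]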

With the configuration above, as long as the tailed file itself does not change (it is not recreated, etc.), consumption resumes from the last position and events are not consumed twice. If the tailed file is renamed and a new file is created in its place, the newly written content is still picked up. However, if the Flume process dies while the file is being recreated and written to, resuming cannot be guaranteed.
?? Some blog posts report that this plays poorly with log4j, because log4j rolls its log files automatically and renames the new files ?? (https://blog.csdn.net/mcj1314bb/article/details/78850748 is a post that patches the source to handle renamed files.)

2. The tail-command approach
First, let's get tail's -f vs -F options straight (embarrassingly, I did not know this before):
tail -f stops producing output once the file it was following is replaced;
tail -F keeps following the file by name, so it continues producing output after the file is replaced - this is what makes resuming possible.
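A quick way to see the difference (the log path is just an example):

# -f follows the already-open file: after rotation recreates the file, output stops
tail -f /var/log/app.log
# -F follows the file by name: it re-opens the file after rotation and keeps printing new lines
tail -F /var/log/app.log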
All that needs to change is the command in the Exec Source configuration:
tail -n +$(tail -n1 /root/log) -F /root/data/nginx.log | awk 'ARGIND==1{i=$0;next}{i++;if($0~/^tail/){i=0};print $0;print i >> "/root/log";fflush("")}' /root/log
The rough idea: the position (line number) reached in the file being read is written in real time to a designated file; when Flume starts, it first fetches the line number that was read last time from that file and then resumes consuming from that line.
(Just passing through: my shell skills are genuinely weak, and I don't fully understand this one-liner either.)
3. The power-user version (not personally tested)
https://baijiahao.baidu.com/po/feed/share?wfr=spider&for=pc&context=%7B%22sourceFrom%22%3A%22bjh%22%2C%22nid%22%3A%22news_3433179683779105534%22%7D
Modify the Flume source code yourself! This is essentially the official approach above, plus modifying and recompiling the Taildir Source component.
Below is my own write-up (most of it, of course, comes from other people).
1. Download the apache-flume-1.7.0 source code.
2. Modify the Taildir Source component classes under
apache-flume-1.7.0-src\flume-ng-sources\flume-taildir-source\src\main\java\org\apache\flume\source\taildir
in the ReliableTaildirEventReader.java class.
2.1. Modify the updateTailFiles(skipToEnd) method called in the ReliableTaildirEventReader constructor.
Change the following code:

if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
    // skipToEnd: when no read position has been recorded, whether to skip to the end of the file (default false)
    long startPos = skipToEnd ? f.length() : 0;
    // build a TailFile from the concrete file f
    tf = openFile(f, headers, inode, startPos);
}

Change it to:

if (tf == null)  // the absolute-path check is removed

A bit of background:

Unix/Linux systems do not use file names internally; they identify files by inode number. To the system, a file name is just a convenient alias for the inode number (the inode is each file's unique identifier, and it does not change even when the file name does). On the surface a user opens a file by name, but internally this happens in three steps:

First, the system finds the inode number corresponding to the file name;

then it uses the inode number to fetch the inode information;

finally, based on the inode information, it locates the blocks holding the file data and reads them.

So even if a new file is given the same name, its inode number is different, which is why the old file cannot be found through the absolute path alone.

Because inode numbers are decoupled from file names, Unix/Linux shows some distinctive behaviour:

  1. Sometimes a file name contains special characters and cannot be deleted normally; deleting the inode directly removes the file.

  2. Moving or renaming a file only changes its name; the inode number is unaffected.

  3. Once a file has been opened, the system identifies it by inode number and no longer cares about the file name.
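You can see this for yourself with ls -i (the paths are just examples):

ls -i /var/log/test.log
# prints the inode number, e.g.  2496272 /var/log/test.log
mv /var/log/test.log /var/log/test.log.1
ls -i /var/log/test.log.1
# same inode number, new name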

2.2. Modify the loadPositionFile(positionFilePath) method called in the ReliableTaildirEventReader constructor.

Change the following code:

if (tf != null && tf.updatePos(path, inode, pos)) {
    // update the inode -> TailFile mapping
    tailFiles.put(inode, tf);

to the following (the path argument of updatePos is replaced with tf.getPath(), forcing the path before and after the rename to be treated as the same file):

if (tf != null && tf.updatePos(tf.getPath(), inode, pos)) {

2.3. Build and package Flume

Run mvn package on the modules above to compile the source and produce the taildir source jar.

Copy the resulting jar into Flume's lib directory.
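A sketch of the build-and-install commands, assuming the apache-flume-1.7.0-src layout; the exact artifact name may differ in your build:

cd apache-flume-1.7.0-src/flume-ng-sources/flume-taildir-source
mvn package -DskipTests
# copy the rebuilt taildir source jar into Flume's lib directory (replacing the stock one)
cp target/flume-taildir-source-*.jar $FLUME_HOME/lib/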

 

 

 

