注: 環境: skylin-linux
Flume的下載方式:
wget http://www.apache.org/dyn/closer.lua/flume/1.6.0/apache-flume-1.6.0-bin.tar.
下載完成之后,使用tar進行解壓
tar -zvxf apache-flume-1.6..0-bin.tar.
進入flume的conf配置包中,使用命令touch flume.conf,然后cp flume-conf.properties.template flume.conf
使vim/gedit flume.conf 編輯配置文件,需要說明的的是,Flume conf文件用的是Java版的property文件的key-value鍵值對模式.
在Flume配置文件中,我們需要
1. 需要命名當前使用的Agent的名稱.
2. 命名Agent下的source的名字.
3. 命名Agent下的channal的名字.
4. 命名Agent下的sink的名字.
5. 將source和sink通過channal綁定起來.
一般來說,在Flume中會存在着多個Agent,所以我們需要給它們分別取一個名字來區分它們,注意名字不要相同,名字保持唯一!
例如:
#Agent取名為 agent_name #source 取名為 source_name ,一次類推 agent_name.source = source_name agent_name.channels = channel_name agent_name.sinks = sink_name
上圖對應的是單個Agent,單個sink,單個channel情況,如下圖
如果我們需要在一個Agent上配置n個sink,m個channel(n>1, m>1),
那么只需要這樣配置即可:
#Agent取名為 agent_name #source 取名為 source_name ,一次類推 agent_name.source = source_name ,source_name1 agent_name.channels = channel_name,channel_name1 agent_name.sinks = sink_name,sink_name1
上面的配置就表示一個Agent中有兩個 source,sink,channel的情況,如圖所示
以上是對多sink,channel,source情況,對於 多個Agent,只需要給每個Agent取一個獨一無二的名字即可!
Flume支持各種各樣的sources,sinks,channels,它們支持的類型如下:
Sources | Channels | Sinks |
---|---|---|
|
|
|
以上的類型,你可以根據自己的需求來搭配組合使用,當然如果你願意,你可以為所欲為的搭配.比如我們使用Avro source類型,采用Memory channel,使用HDFS sink存儲,那我們的配置可以接着上的配置這樣寫
#Agent取名為 agent_name #source 取名為 source_name ,一次類推 agent_name.source = Avro agent_name.channels = MemoryChannel agent_name.sinks = HDFS
當你命名好Agent的組成部分后,你還需要對Agent的組成sources , sinks, channles去一一描述. 下面我們來逐一的細說;
Source的配置
注: 需要特別說明,在Agent中對於存在的N(N>1)個source,其中的每一個source都需要單獨進行配置,首先我們需要對source的type進行設置,然后在對每一個type進行對應的屬性設置.其通用的模式如下:
agent_name.sources. source_name.type = value agent_name.sources. source_name.property2 = value agent_name.sources. source_name.property3 = value
具體的例子,比如我們Source選用的是Avro模式
#Agent取名為 agent_name #source 取名為 source_name ,一次類推 agent_name.source = Avro agent_name.channels = MemoryChannel agent_name.sinks = HDFS #——————————sourcec配置——————————————# agent_name.source.Avro.type = avro agent_name.source.Avro.bind = localhost agent_name.source.Avro.port = 9696 #將source綁定到MemoryChannel管道上 agent_name.source.Avro.channels = MemoryChannel
Channels的配置
Flume在source和sink配間提供各種管道(channels)來傳遞數據.因而和source一樣,它也需要配置屬性,同source一樣,對於N(N>0)個channels,
需要單個對它們注意設置屬性,它們的通用模板為:
agent_name.channels.channel_name.type = value agent_name.channels.channel_name. property2 = value agent_name.channels.channel_name. property3 = value
具體的例子,假如我們選用memory channel類型,那么我先要配置管道的類型
agent_name.channels.MemoryChannel.type = memory
但是我們現在只是設置好了管道自個兒屬性,我們還需要將其和sink,source鏈接起來,也就是綁定,綁定設置如下,我們可以分別寫在source,sink處,也可以集中寫在channel處
agent_name.sources.Avro.channels = MemoryChannel
agent_name.sinks.HDFS.channels = MemoryCHannel
Sink的配置
sink的配置和Source配置類似,它的通用格式:
agent_name.sinks. sink_name.type = value agent_name.sinks. sink_name.property2 = value agent_name.sinks. sink_name.property3 = value
具體例子,比如我們設置Sink類型為HDFS ,那么我們的配置單就如下:
agent_name.sinks.HDFS.type = hdfs
agent_name.sinks.HDFS.path = HDFS‘s path
以上就是對Flume的配置文件詳細介紹,下面在補全一張完整的配置圖:
# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case called 'agent' #define agent agent.sources = seqGenSrc agent.channels = memoryChannel agent.sinks = loggerSink kafkaSink # # For each one of the sources, the type is defined #默認模式 agent.sources.seqGenSrc.type = seq / netcat / avro agent.sources.seqGenSrc.type = avro agent.sources.seqGenSrc.bind = localhost agent.sources.seqGenSrc.port = 9696 #####數據來源#### #agent.sources.seqGenSrc.coommand = tail -F /home/gongxijun/Qunar/data/data.log # The channel can be defined as follows. agent.sources.seqGenSrc.channels = memoryChannel #+++++++++++++++定義sink+++++++++++++++++++++# # Each sink's type must be defined agent.sinks.loggerSink.type = logger agent.sinks.loggerSink.type = hbase agent.sinks.loggerSink.channel = memoryChannel #表名 agent.sinks.loggerSink.table = flume #列名 agent.sinks.loggerSink.columnFamily= gxjun agent.sinks.loggerSink.serializer = org.apache.flume.sink.hbase.MyHbaseEventSerializer #agent.sinks.loggerSink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer agent.sinks.loggerSink.zookeeperQuorum=localhost:2181 agent.sinks.loggerSink.znodeParent= /hbase #Specify the channel the sink should use agent.sinks.loggerSink.channel = memoryChannel # Each channel's type is defined. #memory agent.channels.memoryChannel.type = memory agent.channels.memortChhannel.keep-alive = 10 # Other config values specific to each type of channel(sink or source) # can be defined as well # In this case, it specifies the capacity of the memory channel #agent.channels.memoryChannel.checkpointDir = /home/gongxijun/Qunar/data #agent.channels.memoryChannel.dataDirs = /home/gongxijun/Qunar/data , /home/gongxijun/Qunar/tmpData agent.channels.memoryChannel.capacity = 10000000 agent.channels.memoryChannel.transactionCapacity = 10000 #define the sink2 kafka #+++++++++++++++定義sink+++++++++++++++++++++# # Each sink's type must be defined agent.sinks.kafkaSink.type = logger agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafkaSink.channel = memoryChannel #agent.sinks.kafkaSink.server=localhost:9092 agent.sinks.kafkaSink.topic= kafka-topic agent.sinks.kafkaSink.batchSize = 20 agent.sinks.kafkaSink.brokerList = localhost:9092 #Specify the channel the sink should use agent.sinks.kafkaSink.channel = memoryChannel
該配置類型如下如所示:
參考資料:
http://www.tutorialspoint.com/apache_flume/apache_flume_configuration.htm
作者: 龔細軍
引用請注明出處:http://www.cnblogs.com/gongxijun/p/5661037.html