1. Source in http mode, sink in logger mode: print the data to the console.
The conf file is as follows:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# Receive data sent over HTTP
a1.sources.r1.type = http
# Hostname or IP address of the host running Flume
a1.sources.r1.bind = hadoop-master
# Port to listen on
a1.sources.r1.port = 9000
#a1.sources.r1.fileHeader = true
# Describe the sink
# Print the events to the console
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start Flume with:
bin/flume-ng agent -c conf -f conf/http.conf -n a1 -Dflume.root.logger=INFO,console
Output like the following indicates that Flume started successfully:
895 (lifecycleSupervisor-1-3) [INFO -org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
Open another terminal and send data via HTTP POST:
curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:9000
hadoop-master is the hostname bound in the Flume config file, and 9000 is the bound port.
The window running Flume then shows the following:
2018-06-12 08:24:04,472 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{timestampe=1234567, host=master} body: 62 61 64 6F 75 20 66 6C 75 6D 65 badou flume }
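The HTTP source's default JSONHandler accepts a JSON array, so several events can be posted in a single request; a small illustrative example against the same host and port (the event bodies below are just sample text, not from the original log):
# the two bodies are sample values
curl -X POST -d '[{"headers":{"host":"master"},"body":"first event"},{"headers":{"host":"master"},"body":"second event"}]' hadoop-master:9000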
2. Source in netcat mode (TCP), sink in logger mode: print the data to the console.
The conf file is as follows:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
# Hostname or IP address to bind
a1.sources.r1.bind = hadoop-master
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start Flume:
bin/flume-ng agent -c conf -f conf/netcat.conf -n a1 -Dflume.root.logger=INFO,console
Then, in another terminal, send data with telnet. The command is:
telnet hadoop-master 44444
[root@hadoop-master ~]# telnet hadoop-master 44444
Trying 192.168.194.6...
Connected to hadoop-master.
Escape character is '^]'.
The output above indicates a successful connection to Flume. Then type:
12213213213
OK
12321313
OK
Flume then logs the corresponding events:
2018-06-12 08:38:51,129 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 32 31 33 32 31 33 32 31 33 0D 12213213213. }
2018-06-12 08:38:51,130 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 33 32 31 33 31 33 0D 12321313. }
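If telnet is not available, the same test can be done with netcat itself; a minimal sketch, assuming the nc utility is installed:
# "hello flume" is just sample input
echo "hello flume" | nc hadoop-master 44444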
3. Source in netcat/http mode, sink in hdfs mode: store the data in HDFS.
The conf file, named hdfs.conf, is as follows:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop-master
a1.sources.r1.port = 44444
# Filter out events whose body consists only of digits
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^[0-9]*$
a1.sources.r1.interceptors.i1.excludeEvents = true
# Describe the sink
#a1.sinks.k1.type = logger
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
# Directory in HDFS where the files are stored
a1.sinks.k1.hdfs.path = hdfs:/flume/events
# Prefix of the generated file names
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# File format; DataStream writes the incoming events as plain text
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Create the target directory in HDFS:
hadoop fs -mkdir -p /flume/events
Start Flume:
bin/flume-ng agent -c conf -f conf/hdfs.conf -n a1 -Dflume.root.logger=INFO,console
Send data to Flume via telnet:
telnet hadoop-master 44444
Then type:
aaaaaaaa
bbbbbbb
ccccccccc
dddddddddd
List the files with hadoop fs -ls /flume/events/; the /flume/events directory in HDFS now contains the following files:
-rw-r--r-- 3 root supergroup 16 2018-06-05 06:02 /flume/events/events-.1528203709070
-rw-r--r-- 3 root supergroup 5 2018-06-05 06:02 /flume/events/events-.1528203755556
-rw-r--r-- 3 root supergroup 11 2018-06-05 06:03 /flume/events/events-.1528203755557
-rw-r--r-- 3 root supergroup 26 2018-06-13 07:28 /flume/events/events-.1528900112215
-rw-r--r-- 3 root supergroup 209 2018-06-13 07:29 /flume/events/events-.1528900112216
-rw-r--r-- 3 root supergroup 72 2018-06-13 07:29 /flume/events/events-.1528900112217
View the contents of events-.1528900112216 with hadoop fs -cat /flume/events/events-.1528900112216:
aaaaaaaaaaaaaaaaa
bbbbbbbbbbbbbbbb
ccccccccccccccccccc
dddddddddddddddd
eeeeeeeeeeeeeeeeeee
fffffffffffffffffffffff
gggggggggggggggggg
hhhhhhhhhhhhhhhhhhhhhhh
iiiiiiiiiiiiiiiiiii
jjjjjjjjjjjjjjjjjjj
For http mode, simply change netcat to http in hdfs.conf; instead of telnet, send the data with:
curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:44444
The file in HDFS then contains the body sent by the command above: badou flume.
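The regex_filter interceptor configured above drops events whose body is purely numeric, which can be checked in http mode with a pair of illustrative requests (both bodies are made-up sample values):
# dropped by the interceptor: the body matches ^[0-9]*$
curl -X POST -d '[{"headers":{},"body":"123456"}]' hadoop-master:44444
# passes the filter and is written to HDFS
curl -X POST -d '[{"headers":{},"body":"badou text"}]' hadoop-master:44444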
4. Source in netcat/http mode, sink in hive mode: store the data in Hive, partitioned.
The conf file, named hive.conf, is as follows:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop-master
a1.sources.r1.port = 44444
# Describe the sink
#a1.sinks.k1.type = logger
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore=thrift://hadoop-master:9083
# Hive database name
a1.sinks.k1.hive.database = default
a1.sinks.k1.hive.table=flume_user1
a1.sinks.k1.serializer=DELIMITED
# With a netcat source the partition value can only be set statically, because netcat
# carries no field that the sink could reference; here the age partition is fixed to 3.
a1.sinks.k1.hive.partition = 3
# With an http/JSON source the partition value can be set dynamically, because the age
# value can be carried in the event headers:
#a1.sinks.k1.hive.partition = %{age}
a1.sinks.k1.serializer.delimiter=" "
a1.sinks.k1.serializer.serdeSeparator=' '
a1.sinks.k1.serializer.fieldnames=user_id,user_name
a1.sinks.k1.hive.txnsPerBatchAsk = 10
a1.sinks.k1.hive.batchSize = 1500
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Create the table in Hive (the table name must match hive.table above):
create table flume_user1(
user_id int
,user_name string
)
partitioned by(age int)
clustered by (user_id) into 2 buckets
stored as orc;
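The Hive sink writes through the Hive streaming API, which normally requires an ORC, bucketed, transactional table. If the streaming writes are rejected, the table likely needs the transactional property as well, i.e. (assuming a Hive version with ACID support) the last line of the DDL becomes:
stored as orc tblproperties('transactional'='true');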
Add the following to hive-site.xml:
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hive</value>
<description>password to use against metastore database</description>
</property>
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>1</value>
</property>
Copy the JARs from the hcatalog/share/hcatalog directory under the Hive installation into Flume's lib directory.
Run Flume:
bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console
Open another window and start the metastore service:
hive --service metastore &
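To confirm that the metastore Thrift service is listening on the port referenced by hive.metastore in hive.conf (9083), a quick check, assuming netstat is available:
netstat -an | grep 9083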
Open another client and connect to Flume with telnet:
telnet hadoop-master 44444
Then type:
1 1
3 3
Hive then shows the following two rows:
flume_user1.user_id flume_user1.user_name flume_user1.age
1 1 3
3 3 3
age is the value 3 set statically in hive.conf.
Now switch the Flume source to http mode so that the Hive partition value can be passed dynamically.
In hive.conf, change
a1.sources.r1.type = netcat to a1.sources.r1.type = http, and
a1.sinks.k1.hive.partition=3 to a1.sinks.k1.hive.partition=%{age}.
Then start Flume:
bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console
In another window, send data to Flume over HTTP:
curl -X POST -d '[{"headers":{"age":"109"},"body":"11 ligongong"}]' hadoop-master:44444
Hive then shows the following data:
flume_user1.user_id flume_user1.user_name flume_user1.age
11 ligongong 109
This shows that when data is sent to Hive over HTTP, the partition field travels in the event headers, while the other fields travel in the body, separated from each other by the delimiter defined in hive.conf.
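Several rows can be posted in one request, each going to the partition named in its own headers; an illustrative sketch (the user values below are made up):
# the two rows are sample values
curl -X POST -d '[{"headers":{"age":"20"},"body":"12 zhangsan"},{"headers":{"age":"30"},"body":"13 lisi"}]' hadoop-master:44444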
5. Use avro mode and print the data to the console.
Data is passed between different agents through the avro source and sink.
Two servers are used to demonstrate avro: hadoop-master and hadoop-slave2.
hadoop-master runs agent2, whose sink is avro and whose destination hostname is hadoop-slave2. The Flume conf file on hadoop-master, named push.conf, is as follows:
#Name the components on this agent
a2.sources= r1
a2.sinks= k1
a2.channels= c1
#Describe/configure the source
a2.sources.r1.type= netcat
a2.sources.r1.bind= hadoop-master
a2.sources.r1.port = 44444
a2.sources.r1.channels= c1
#Use a channel which buffers events in memory
a2.channels.c1.type= memory
a2.channels.c1.keep-alive= 10
a2.channels.c1.capacity= 100000
a2.channels.c1.transactionCapacity= 100000
#Describe the sink
# The sink type is avro
a2.sinks.k1.type= avro
a2.sinks.k1.channel= c1
# Destination host the sink sends data to
a2.sinks.k1.hostname= hadoop-slave2
# Destination port on that host
a2.sinks.k1.port= 44444
hadoop-slave2 runs agent1, whose source is avro. The Flume config, named pull.conf, is as follows:
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= avro
a1.sources.r1.channels= c1
a1.sources.r1.bind= hadoop-slave2
a1.sources.r1.port= 44444
#Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel = c1
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.keep-alive= 10
a1.channels.c1.capacity= 100000
a1.channels.c1.transactionCapacity= 100000
Start Flume on hadoop-slave2 first, then on hadoop-master. The order matters; otherwise an error like the following is thrown: org.apache.flume.FlumeException: java.net.SocketException: Unresolved address
Start Flume on hadoop-slave2:
bin/flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console
Start Flume on hadoop-master:
bin/flume-ng agent -c conf -f conf/push.conf -n a2 -Dflume.root.logger=INFO,console
Open another window and connect to hadoop-master with telnet:
telnet hadoop-master 44444
Then send 11111aaaa.
The console on hadoop-slave2 then shows the 11111aaaa sent above:
2018-06-14 06:43:00,686 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 31 31 31 31 61 61 61 61 0D 11111aaaa. }
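Besides another agent, the avro source on hadoop-slave2 can also be fed with the avro-client tool bundled with Flume; a minimal sketch, assuming a test file exists at the path shown:
# /tmp/test.log is a hypothetical sample file
bin/flume-ng avro-client -H hadoop-slave2 -p 44444 -F /tmp/test.log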
6. Send data to Kafka through Flume, then store the data in HDFS and Hive via Kafka.
First configure Kafka; for Kafka setup see:
https://blog.csdn.net/zxy987872674/article/details/72466504
Start ZooKeeper on each of hadoop-master, hadoop-slave1 and hadoop-slave2.
Then start Kafka; from the Kafka installation directory run:
./bin/kafka-server-start.sh config/server.properties &
Create the topic in Kafka:
bin/kafka-topics.sh --create --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --replication-factor 1 --partitions 2 --topic flume_kafka
List the topics in Kafka:
bin/kafka-topics.sh --list --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181
Start a Kafka console consumer:
./kafka-console-consumer.sh --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --topic flume_kafka
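Before wiring Flume in, the topic can be sanity-checked with the console producer; anything typed there should appear in the consumer window (broker address as used in the Flume config below):
bin/kafka-console-producer.sh --broker-list hadoop-master:9092 --topic flume_kafka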
Configure the Flume conf file: set the source type to exec, the sink to org.apache.flume.sink.kafka.KafkaSink, and the Kafka topic to the flume_kafka topic created above. The full config is as follows:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# Set the source type to exec, i.e. run a command and read its output
a1.sources.r1.type = exec
# Command for the source to execute
a1.sources.r1.command = tail -f /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt
# Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address and port
a1.sinks.k1.brokerList=hadoop-master:9092
# Kafka topic
a1.sinks.k1.topic=flume_kafka
# Serializer class
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
# use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Start Flume:
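Assuming the configuration above is saved as conf/kafka.conf, the agent is launched the same way as in the earlier examples:
# conf/kafka.conf is an assumed file name for the config above
bin/flume-ng agent -c conf -f conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console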
Whenever new data is appended to /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt, Flume loads it into Kafka, where it is consumed by the console consumer started above.
Inspecting /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt shows data such as:
131,dry pasta
131,dry pasta
132,beauty
133,muscles joints pain relief
133,muscles joints pain relief
133,muscles joints pain relief
133,muscles joints pain relief
134,specialty wines champagnes
134,specialty wines champagnes
134,specialty wines champagnes