A summary of using Flume to send data to Kafka, HDFS, and Hive, and to receive it over HTTP, netcat, and avro


1. Source is http, sink is logger, printing the data to the console.
The conf file is as follows:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
# Accept data sent to Flume over HTTP
a1.sources.r1.type = http
# Host name or IP address of the machine running Flume
a1.sources.r1.bind = hadoop-master
# Port to listen on
a1.sources.r1.port = 9000
#a1.sources.r1.fileHeader = true
 
# Describe the sink
# Print events to the console
a1.sinks.k1.type = logger
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start Flume with:
bin/flume-ng agent -c conf -f conf/http.conf -n a1 -Dflume.root.logger=INFO,console
Output like the following means Flume started successfully:
895 (lifecycleSupervisor-1-3) [INFO -org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
In another terminal, send data with an HTTP POST:
curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:9000
hadoop-master is the host name the source is bound to in the Flume config, and 9000 is the bound port.
The window running Flume then shows:
2018-06-12 08:24:04,472 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{timestampe=1234567, host=master} body: 62 61 64 6F 75 20 66 6C 75 6D 65 badou flume }
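The same POST can be issued from Python instead of curl. A minimal stdlib sketch, assuming the agent from the config above is running on hadoop-master:9000; the `headers` and `body` values mirror the curl command:

```python
import json
from urllib import request

FLUME_URL = "http://hadoop-master:9000"  # host/port from the config above

def build_payload(events):
    # Flume's HTTPSource with the default JSONHandler expects a JSON
    # array of {"headers": ..., "body": ...} objects, body as a string.
    return json.dumps(events).encode("utf-8")

def post_events(events, url=FLUME_URL):
    req = request.Request(url, data=build_payload(events),
                          headers={"Content-Type": "application/json"})
    return request.urlopen(req)

events = [{"headers": {"timestampe": "1234567", "host": "master"},
           "body": "badou flume"}]
# post_events(events)  # uncomment with the agent above running
```

Each element of the array becomes one Flume event, so several events can be posted in a single request.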
 
2. Source is netcat (TCP), sink is logger, printing the data to the console.
The conf file is as follows:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
a1.sources.r1.type = netcat
# Host name or IP address to bind to
a1.sources.r1.bind = hadoop-master
a1.sources.r1.port = 44444
 
a1.sinks.k1.type = logger
 
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start Flume:
bin/flume-ng agent -c conf -f conf/netcat.conf -n a1 -Dflume.root.logger=INFO,console
 
Then, in another terminal, send data with telnet:
telnet hadoop-master 44444
 
[root@hadoop-master ~]# telnet hadoop-master 44444
Trying 192.168.194.6...
Connected to hadoop-master.
Escape character is '^]'.
The output above means the connection to Flume succeeded. Now type some lines:
12213213213
OK
12321313
OK
Flume receives the corresponding events:
2018-06-12 08:38:51,129 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 32 31 33 32 31 33 32 31 33 0D 12213213213. }
2018-06-12 08:38:51,130 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 33 32 31 33 31 33 0D 12321313. }
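Note the trailing 0D byte in each body: telnet terminates lines with \r\n, and the netcat source splits on \n, leaving the carriage return in the event, as the log above shows. A small sketch that reproduces the LoggerSink-style hex dump:

```python
def hex_dump(body: bytes) -> str:
    # Mimic the LoggerSink body rendering seen above: hex bytes,
    # then the printable-text view (non-printables shown as ".").
    hex_part = " ".join(f"{b:02X}" for b in body)
    text = "".join(chr(b) if 32 <= b < 127 else "." for b in body)
    return f"{hex_part} {text}"

# telnet sends "12213213213\r\n"; the source strips the \n, keeps the \r
line = "12213213213\r".encode("ascii")
print(hex_dump(line))  # → 31 32 32 31 33 32 31 33 32 31 33 0D 12213213213.
```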
 
3. Source is netcat or http, sink is hdfs, storing the data in HDFS.
The conf file, named hdfs.conf, is as follows:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop-master
a1.sources.r1.port = 44444
 
# Drop events whose body consists only of digits
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^[0-9]*$
a1.sources.r1.interceptors.i1.excludeEvents = true
 
# Describe the sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
# Directory in HDFS where the files are written
a1.sinks.k1.hdfs.path = hdfs:///flume/events
# Prefix for the file names
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# File format; DataStream writes the incoming data as plain text
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
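The regex_filter interceptor above, with excludeEvents = true, drops any event whose body matches ^[0-9]*$ (digit-only or empty bodies). A sketch of that effect, not Flume's actual implementation:

```python
import re

PATTERN = re.compile(r"^[0-9]*$")  # regex from the config above

def keep_event(body: str, exclude_events: bool = True) -> bool:
    # With excludeEvents=true, matching events are dropped;
    # with false, only matching events pass through.
    matches = PATTERN.search(body) is not None
    return not matches if exclude_events else matches

bodies = ["12345", "aaaaaaaa", "abc123", ""]
kept = [b for b in bodies if keep_event(b)]
print(kept)  # → ['aaaaaaaa', 'abc123']
```

This is why the telnet test below uses letter-only lines: purely numeric input would never reach HDFS.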
 
Create the target directory in HDFS:
hadoop fs -mkdir -p /flume/events
 
啟動flume:
bin/flume-ng agent -c conf -f conf/hdfs.conf -n a1 -Dflume.root.logger=INFO,console
 
Send data to Flume via telnet:
telnet hadoop-master 44444
Then type:
aaaaaaaa
bbbbbbb
ccccccccc
dddddddddd
 
List the directory with hadoop fs -ls /flume/events; /flume/events now contains files like these:
-rw-r--r-- 3 root supergroup 16 2018-06-05 06:02 /flume/events/events-.1528203709070
-rw-r--r-- 3 root supergroup 5 2018-06-05 06:02 /flume/events/events-.1528203755556
-rw-r--r-- 3 root supergroup 11 2018-06-05 06:03 /flume/events/events-.1528203755557
-rw-r--r-- 3 root supergroup 26 2018-06-13 07:28 /flume/events/events-.1528900112215
-rw-r--r-- 3 root supergroup 209 2018-06-13 07:29 /flume/events/events-.1528900112216
-rw-r--r-- 3 root supergroup 72 2018-06-13 07:29 /flume/events/events-.1528900112217
View one of them with hadoop fs -cat /flume/events/events-.1528900112216:
aaaaaaaaaaaaaaaaa
bbbbbbbbbbbbbbbb
ccccccccccccccccccc
dddddddddddddddd
eeeeeeeeeeeeeeeeeee
fffffffffffffffffffffff
gggggggggggggggggg
hhhhhhhhhhhhhhhhhhhhhhh
iiiiiiiiiiiiiiiiiii
jjjjjjjjjjjjjjjjjjj
 
For the http variant, change the source type in hdfs.conf from netcat to http, and send data with curl instead of telnet:
curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:44444
The HDFS file then contains the posted body: badou flume.
 
4. Source is netcat or http, sink is hive, storing the data in Hive with partitioning.
The conf file, named hive.conf, is as follows:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop-master
a1.sources.r1.port = 44444

# Describe the sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://hadoop-master:9083
# Hive database name
a1.sinks.k1.hive.database = default
a1.sinks.k1.hive.table = flume_user1
a1.sinks.k1.serializer = DELIMITED
# With the netcat source the partition value can only be set statically,
# because netcat events carry no field to read it from; here the age
# partition is fixed at 3.
a1.sinks.k1.hive.partition = 3
# With an http (JSON) source the partition value can be set dynamically,
# because the age value can travel in the event headers:
#a1.sinks.k1.hive.partition = %{age}
a1.sinks.k1.serializer.delimiter = " "
a1.sinks.k1.serializer.serdeSeparator = ' '
a1.sinks.k1.serializer.fieldnames = user_id,user_name
a1.sinks.k1.hive.txnsPerBatchAsk = 10
a1.sinks.k1.hive.batchSize = 1500
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
 
Create the table in Hive:
create table flume_user1(
user_id int
,user_name string
)
partitioned by(age int)
clustered by (user_id) into 2 buckets
stored as orc;
 
Add the following to hive-site.xml:
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hive</value>
<description>password to use against metastore database</description>
</property>
 
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>1</value>
</property>
 
Copy the three JAR files under the hcatalog/share/hcatalog directory of the Hive installation into Flume's lib directory.
Run Flume:
bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console
 
Open a new window and start the metastore service:
hive --service metastore &
Open another client and connect to Flume with telnet:
telnet hadoop-master 44444
Then type:
1 1
3 3
Two rows then appear in Hive:
flume_user1.user_id flume_user1.user_name flume_user1.age
1 1 3
3 3 3
age is the value 3 that was set in hive.conf.
 
Now switch the Flume source to http and pass the partition value dynamically through the event headers.
In hive.conf change:
a1.sources.r1.type = netcat to a1.sources.r1.type = http
a1.sinks.k1.hive.partition=3 to a1.sinks.k1.hive.partition=%{age}
Then start Flume:
bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console
In a new window, send data to Flume over HTTP:
curl -X POST -d '[{"headers":{"age":"109"},"body":"11 ligongong"}]' hadoop-master:44444
Hive then shows:
flume_user1.user_id flume_user1.user_name flume_user1.age
11 ligongong 109
This shows that when data reaches Hive over HTTP, the partition field travels in the event headers while the other fields travel in the body, with columns separated by the delimiter defined in hive.conf.
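That division of labor can be sketched in Python: the partition column goes into headers (where %{age} picks it up), and the remaining columns are joined into the body using the serializer settings from hive.conf:

```python
import json

FIELDNAMES = ["user_id", "user_name"]  # from serializer.fieldnames
DELIMITER = " "                        # from serializer.delimiter

def make_event(age, **fields):
    # Partition value in headers; delimited data columns in the body.
    body = DELIMITER.join(str(fields[f]) for f in FIELDNAMES)
    return {"headers": {"age": str(age)}, "body": body}

event = make_event(109, user_id=11, user_name="ligongong")
payload = json.dumps([event])
print(payload)  # the JSON array passed to curl -d above
```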
 
5. Use avro mode to print data to the console.
Data transfer between different agents is done through avro.
This demo needs two servers, hadoop-master and hadoop-slave2.
hadoop-master runs agent2, whose sink is avro and points at hadoop-slave2. Its Flume conf file, named push.conf, is:
#Name the components on this agent
a2.sources= r1
a2.sinks= k1
a2.channels= c1
 
#Describe/configure the source
a2.sources.r1.type= netcat
a2.sources.r1.bind= hadoop-master
a2.sources.r1.port = 44444
a2.sources.r1.channels= c1
 
#Use a channel which buffers events in memory
a2.channels.c1.type= memory
a2.channels.c1.keep-alive= 10
a2.channels.c1.capacity= 100000
a2.channels.c1.transactionCapacity= 100000
 
#Describe the sink
# avro sink
a2.sinks.k1.type= avro
a2.sinks.k1.channel= c1
# Destination server to send events to
a2.sinks.k1.hostname= hadoop-slave2
# Port on the destination server
a2.sinks.k1.port= 44444
 
 
hadoop-slave2 runs agent1, whose source is avro. Its Flume conf, named pull.conf, is:
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
 
#Describe/configure the source
a1.sources.r1.type= avro
a1.sources.r1.channels= c1
a1.sources.r1.bind= hadoop-slave2
a1.sources.r1.port= 44444
 
#Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel = c1
 
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.keep-alive= 10
a1.channels.c1.capacity= 100000
a1.channels.c1.transactionCapacity= 100000
Start Flume on hadoop-slave2 first, and only then on hadoop-master. The order matters; starting them the other way round fails with: org.apache.flume.FlumeException: java.net.SocketException: Unresolved address
 
在hadoop-slave2中啟動flume:
bin/flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console
在hadoop-master中啟動flume:
bin/flume-ng agent -c conf -f conf/push.conf -n a2 -Dflume.root.logger=INFO,console
 
Open a new window and connect to hadoop-master with telnet:
telnet hadoop-master 44444
Then send 11111aaaa.
The hadoop-slave2 console then prints the 11111aaaa that was sent:
2018-06-14 06:43:00,686 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 31 31 31 31 61 61 61 61 0D 11111aaaa. }
 
 
6. Ship data to Kafka through Flume, then store it in HDFS and Hive via Kafka.
First configure Kafka; for that, see: https://blog.csdn.net/zxy987872674/article/details/72466504
Start ZooKeeper on hadoop-master, hadoop-slave1, and hadoop-slave2.
Then start Kafka; from the Kafka installation directory run:
./bin/kafka-server-start.sh config/server.properties &
Create a topic in Kafka:
bin/kafka-topics.sh --create --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --replication-factor 1 --partitions 2 --topic flume_kafka
 
List the topics in Kafka:
bin/kafka-topics.sh --list --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181
 
Start a Kafka console consumer:
./kafka-console-consumer.sh --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --topic flume_kafka
 
Configure the Flume conf file: set the source type to exec and the sink to org.apache.flume.sink.kafka.KafkaSink, using the flume_kafka topic created above. The full config:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
# The exec source runs a shell command and turns its output into events
a1.sources.r1.type = exec
# Command for the source to run
a1.sources.r1.command = tail -f /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt
 
# Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address and port
a1.sinks.k1.brokerList=hadoop-master:9092
# Kafka topic
a1.sinks.k1.topic=flume_kafka
# Serialization class
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
 
# use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000
 
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
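The exec source simply runs the command and emits each output line as an event; with tail -f, every line appended to the file becomes a Kafka message. A rough stdlib sketch of that follow-appended-lines behavior (not Flume's actual implementation; the temp file stands in for flume_exec_test.txt):

```python
import os
import tempfile

def follow_new_lines(path, offset):
    # Read lines appended since `offset`, like one poll of `tail -f`.
    # Returns (new_lines, new_offset).
    with open(path, "r") as f:
        f.seek(offset)
        lines = [ln.rstrip("\n") for ln in f.readlines()]
        return lines, f.tell()

path = os.path.join(tempfile.mkdtemp(), "flume_exec_test.txt")
with open(path, "w") as f:
    f.write("131,dry pasta\n")

lines, off = follow_new_lines(path, 0)      # initial contents
with open(path, "a") as f:
    f.write("132,beauty\n")                 # new data arrives
more, off = follow_new_lines(path, off)     # only the appended line
print(lines, more)
```

Each string in `lines`/`more` corresponds to one event that the Kafka sink would publish to the flume_kafka topic.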
 
Start Flume.
Whenever new data lands in /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt, Flume loads it into Kafka, where the consumer started above picks it up.
Inspecting /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt shows data like:
131,dry pasta
131,dry pasta
132,beauty
133,muscles joints pain relief
133,muscles joints pain relief
133,muscles joints pain relief
133,muscles joints pain relief
134,specialty wines champagnes
134,specialty wines champagnes
134,specialty wines champagnes

 

