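Once Flume is installed and its environment variables are configured, the next step is to set up the agent configuration file.
1. Agent configuration file (MySQL + Flume + Kafka)
# Use Flume to extract MySQL table data into Kafka in near real time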
a1.channels = c1
a1.sinks = k1
a1.sources = s1
#sources(mysql)
a1.sources.s1.type = org.keedio.flume.source.SQLSource
a1.sources.s1.channels = c1
a1.sources.s1.connection.url = jdbc:mysql://192.168.121.4:3306/alarm
a1.sources.s1.user = root
a1.sources.s1.password = root
a1.sources.s1.table = alarm_query
a1.sources.s1.columns.to.select = *
a1.sources.s1.incremental.column.name = id
a1.sources.s1.incremental.value = 0
a1.sources.s1.run.query.delay=5000
# Path where the source writes its status file (the directory must exist and be writable)
a1.sources.s1.status.file.path = /opt/apps/flume-1.6.0-cdh5.14.4-bin
a1.sources.s1.status.file.name = sqlsource.status
#channels(memory)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#sinks(kafka)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# D-QP-Safe-4, D-QP-Safe-5 and D-QP-Safe-6 are the Kafka hosts mapped in /etc/hosts on this machine
a1.sinks.k1.brokerList = D-QP-Safe-4:9092,D-QP-Safe-5:9092,D-QP-Safe-6:9092
a1.sinks.k1.topic=qpdy
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 2
a1.sinks.k1.channel = c1
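2. Preparation
2.1 Create the directory and file that Flume writes the source status to
mkdir /var/lib/flume
cd /var/lib/flume
vi s1.status
Make the file writable: chmod 777 s1.status
(The directory and file name must match status.file.path and status.file.name in the agent configuration.)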
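The preparation in 2.1 can be sketched as a small shell function. This is only a sketch: the function name prepare_status_file is hypothetical, and the directory and file name passed to it are assumed to be the same values that status.file.path and status.file.name carry in the agent configuration.

```shell
# Hypothetical helper: create the status directory and file, then make the file writable.
# Usage: prepare_status_file DIR FILE
prepare_status_file() {
    status_dir="$1"
    status_file="$2"
    mkdir -p "$status_dir" || return 1            # no error if the directory already exists
    touch "$status_dir/$status_file" || return 1  # create the file without opening an editor
    chmod 777 "$status_dir/$status_file"          # same permissions as the chmod in 2.1
}
```

For the paths used in 2.1 the call would be: prepare_status_file /var/lib/flume s1.status. Mode 777 mirrors the original command; a narrower mode is probably enough when the file is owned by the user that runs the agent.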
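2.2 Increase Flume's heap size (this was not set at first, and the agent failed with an out-of-memory error)
In the Flume startup script flume-ng, change JAVA_OPTS="-Xmx20m" to JAVA_OPTS="-Xmx10240m"
This raises the maximum heap size to 10 GB; in production, adjust the value to the hardware actually available.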
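As an alternative to editing flume-ng itself, the heap size can usually be set in conf/flume-env.sh, which the flume-ng script sources from the configuration directory passed with -c. The fragment below is a sketch under that assumption; the 10240m value is simply the one used in this section.

```shell
# conf/flume-env.sh (assumed location: the conf directory passed to flume-ng with -c)
# Intended to override the default JAVA_OPTS="-Xmx20m" set inside the flume-ng script.
export JAVA_OPTS="-Xmx10240m"
```

Keeping the setting in flume-env.sh means the startup script stays unmodified when Flume is upgraded or reinstalled.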
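2.3 Map the Kafka host names on the Flume host
(The host running Flume must be able to resolve the Kafka host names used in flume.conf; otherwise the sink cannot find the brokers.)
# vim /etc/hosts
# Add the Kafka host mappings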
192.168.241.229 D-QP-Safe-4
192.168.241.230 D-QP-Safe-5
192.168.241.231 D-QP-Safe-6
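One way to confirm that the mappings are in place before starting the agent is to scan the hosts file for each broker name. The function below is a minimal sketch; check_hosts is a hypothetical name, and it only inspects the file it is given, so it does not cover names resolved through DNS.

```shell
# Hypothetical helper: report which names have an entry in a hosts-format file.
# Usage: check_hosts HOSTS_FILE NAME...
# Returns 0 when every name is present, 1 when at least one is missing.
check_hosts() {
    hosts_file="$1"
    shift
    missing=0
    for name in "$@"; do
        if awk -v n="$name" '
            { sub(/#.*/, "") }                                   # drop comments
            { for (i = 2; i <= NF; i++) if ($i == n) found = 1 } # field 1 is the IP
            END { exit(found ? 0 : 1) }
        ' "$hosts_file"; then
            echo "ok       $name"
        else
            echo "MISSING  $name"
            missing=1
        fi
    done
    return "$missing"
}
```

For the brokers in this setup the call would be: check_hosts /etc/hosts D-QP-Safe-4 D-QP-Safe-5 D-QP-Safe-6.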
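2.4 Copy the JARs needed for MySQL and Kafka into the lib directory of the Flume installation (for example, the JAR that provides org.keedio.flume.source.SQLSource and the MySQL JDBC driver)
3. Start Flume
Run the command from the bin directory of the Flume installation
# Start command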
flume-ng agent -c /opt/apps/flume-1.6.0-cdh5.14.4-bin/conf -f /opt/apps/flume-1.6.0-cdh5.14.4-bin/conf/flume.conf -n a1 -Dflume.root.logger=INFO,console
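a1 is the agent name defined in the configuration file; -c points to Flume's conf directory and -f to the agent configuration file (both paths must match the actual installation path, or the agent will not start).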
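Because the -c and -f paths have to agree with the installation directory, it can help to derive both from a single variable. The sketch below only prints the command; flume_start_cmd is a hypothetical helper, and it assumes the agent file is conf/flume.conf under the installation directory, as in the command above.

```shell
# Hypothetical helper: print the start command for an agent, deriving the
# -c and -f paths from one installation directory so they cannot drift apart.
# Usage: flume_start_cmd FLUME_HOME AGENT_NAME
flume_start_cmd() {
    flume_home="$1"
    agent_name="$2"
    printf '%s/bin/flume-ng agent -c %s/conf -f %s/conf/flume.conf -n %s -Dflume.root.logger=INFO,console\n' \
        "$flume_home" "$flume_home" "$flume_home" "$agent_name"
}

flume_start_cmd /opt/apps/flume-1.6.0-cdh5.14.4-bin a1
```

The printed line calls flume-ng by its full path, so it can be pasted into a shell from any working directory.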
4. Check the Flume processes
ps -aux | grep flume
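If more than one Flume process is running, the extra ones must be killed.
To avoid killing them one at a time, extract the Flume process IDs (grep -v grep keeps the grep command itself out of the list):
ps -aux | grep flume | grep -v grep | awk '{print $2}'
Then kill them all:
ps -aux | grep flume | grep -v grep | awk '{print $2}' | xargs kill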
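Matching on the bare word flume can also catch unrelated processes whose command line happens to mention it. A narrower filter, sketched below, matches on the main class that flume-ng launches for an agent (org.apache.flume.node.Application). The helper name flume_pids is hypothetical, and it assumes the `ps aux` column layout, where the PID is the second field.

```shell
# Hypothetical helper: read `ps aux` output on stdin and print the PID (column 2)
# of every line whose command contains the Flume agent main class.
# The pattern travels through the environment, so awk's own command line
# never contains it and awk cannot match itself.
flume_pids() {
    FLUME_MAIN="${1:-org.apache.flume.node.Application}" \
        awk 'index($0, ENVIRON["FLUME_MAIN"]) > 0 { print $2 }'
}
```

Typical use would be `ps aux | flume_pids` to list the agents and `ps aux | flume_pids | xargs kill` to stop them.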
With that, pulling data from the MySQL database into Kafka is fully configured.