FLUME安裝&環境(二):拉取MySQL數據庫數據到Kafka


Flume安裝成功,環境變量配置成功后,開始進行agent配置文件設置。

 

1.agent配置文件(mysql+flume+Kafka)

#利用Flume將MySQL表數據准實時抽取到Kafka

a1.channels = c1

a1.sinks = k1

a1.sources = s1

 

#sources(mysql)

a1.sources.s1.type = org.keedio.flume.source.SQLSource

a1.sources.s1.channels = c1

a1.sources.s1.connection.url = jdbc:mysql://192.168.121.4:3306/alarm

a1.sources.s1.user = root

a1.sources.s1.password = root

a1.sources.s1.table = alarm_query

a1.sources.s1.columns.to.select = *

a1.sources.s1.incremental.column.name = id

a1.sources.s1.incremental.value = 0

a1.sources.s1.run.query.delay=5000

#source狀態寫入路徑(必須存在且可寫入)

a1.sources.s1.status.file.path = /opt/apps/flume-1.6.0-cdh5.14.4-bin

a1.sources.s1.status.file.name = sqlsource.status

 

#channels(memory)

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

#sinks(kafka)

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

# kfk29,kfk30,kfk31對應主機hosts配置的Kafka主機

a1.sinks.k1.brokerList= D-QP-Safe-4:9092, D-QP-Safe-5:9092, D-QP-Safe-6:9092

a1.sinks.k1.topic=qpdy

a1.sinks.k1.requiredAcks = 1

a1.sinks.k1.batchSize = 2

a1.sinks.k1.channel = c1

 

2.配置准備

2.1創建flume狀態寫入的文件夾和文件

mkdir  /var/lib/flume

vi s1.status

給文件寫入的權力 chmod 777 s1.status

 

2.2將flume內存空間設置增大(開始時沒有進行設置,結果報了內存溢出的錯誤)

在flume啟動腳本flume-ng中,修改JAVA_OPTS="-Xmx20m"為JAVA_OPTS="-Xmx10240m"

此處將堆內存的閾值跳轉到了10G,實際生產環境中可以根據具體的硬件情況作出調整

2.3添加主機對應的kafka主機

(flume.conf配置文件需要添加主機對應的Kafka主機,否則無法找到對應的sink)

# vim /etc/hosts

#添加主機對應的kafka主機

192.168.241.229    D-QP-Safe-4

192.168.241.230    D-QP-Safe-5

192.168.241.231    D-QP-Safe-6

2.4向flume安裝目標的/lib目錄下添加啟動mysql,Kafka等的jar包

3.啟動flume

要在flume的安裝目錄的bin目錄下啟動

#啟動命令

flume-ng agent -c /opt/apps/flume-1.6.0-cdh5.14.4-bin/conf -f /opt/apps/flume-1.6.0-cdh5.14.4-bin/conf/flume.conf -n a1 -Dflume.root.logger=INFO,console

 

a1為配置的agent名,-c和-f后是flume的安裝路徑(必須一致才能啟動成功)

 

4.查看flume進程

ps -aux | grep flume

如果存在多個進程必須將多余進程kill

為了避免一個個的kill,我們需要提取flume的進程號:

ps -aux | grep flume | awk '{print $2}'

然后全部刪除

ps -aux | grep flume | awk '{print $2}' | xargs kill

以上,拉取mysql數據庫數據到Kafka就配置好了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM