Collecting MySQL data into Kafka in real time with Flume


1. Preparing the flume-ng-sql-source MySQL driver for Flume

Download the source from the following link (the latest release at the time of writing is 1.5.3):
https://github.com/keedio/flume-ng-sql-source
Unpack the archive and cd into the source directory to build it.

Building it directly may fail on the tests, so skip them:

mvn package -DskipTests
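
After the build succeeds, the resulting jar has to be copied into Flume's lib directory. A minimal sketch, assuming FLUME_HOME points at your Flume installation and that the build produced flume-ng-sql-source-1.5.3.jar under target/ (adjust the names to what was actually built; for the CDH Flume the lib directory is /opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib instead):

# Copy the freshly built source jar into Flume's lib directory (jar name and path are assumptions)
cp target/flume-ng-sql-source-1.5.3.jar $FLUME_HOME/lib/

# The MySQL JDBC driver is also needed in the same directory (5.1.48 as an example, see below)
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz
tar -zxvf mysql-connector-java-5.1.48.tar.gz
# Exact jar name inside the tarball may vary, hence the wildcard
cp mysql-connector-java-5.1.48/mysql-connector-java-5.1.48*.jar $FLUME_HOME/lib/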

2. Integrating Flume with Kafka and MySQL

agent.sources = sql-source
agent.sinks = k1
agent.channels = ch
# This source reads from MySQL; it comes from https://github.com/keedio/flume-ng-sql-source
# and has to be built yourself. After building, put flume-ng-sql-source-1.x.x.jar into FLUME_HOME/lib.
# For the CDH (Cloudera Manager) Flume, put it into /opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib instead.
agent.sources.sql-source.type= org.keedio.flume.source.SQLSource
# URL to connect to database (currently only mysql is supported)
# The ?useUnicode=true&characterEncoding=utf-8&useSSL=false parameters need to be appended
agent.sources.sql-source.hibernate.connection.url=jdbc:mysql://hostname:3306/yinqing?useUnicode=true&characterEncoding=utf-8&useSSL=false

# Database connection properties
agent.sources.sql-source.hibernate.connection.user=root
agent.sources.sql-source.hibernate.connection.password = password

agent.sources.sql-source.hibernate.dialect = org.hibernate.dialect.MySQLDialect
# mysql-connector-java-X-bin.jar also needs to be placed in FLUME_HOME/lib
# (for the CDH Flume: /opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib).
# Version 5.1.48 is used here (in theory any MySQL 5.x connector works):
# wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz
# Note: a MySQL driver that is too old fails with:
# org.hibernate.exception.JDBCConnectionException: Error calling DriverManager#getConnection
agent.sources.sql-source.hibernate.driver_class = com.mysql.jdbc.Driver
agent.sources.sql-source.hibernate.connection.autocommit = true
# Name of the table to collect from
agent.sources.sql-source.table = table_name
agent.sources.sql-source.columns.to.select = * 
# Query delay: the query is run once every configured number of milliseconds
agent.sources.sql-source.run.query.delay=10000
# The status file stores the last row read, because the source reads incrementally
agent.sources.sql-source.status.file.path = /var/lib/flume-ng
agent.sources.sql-source.status.file.name = sql-source.status

# Kafka sink configuration. This is a cluster setup, so the addresses and ports of the
# ZooKeeper and Kafka clusters are needed; see the Kafka configuration notes later if unsure.
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.topic = yinqing
agent.sinks.k1.brokerList = kafka-node1:9092,kafka-node2:9092,kafka-node3:9092
agent.sinks.k1.batchSize = 200
agent.sinks.k1.requiredAcks = 1
agent.sinks.k1.serializer.class = kafka.serializer.StringEncoder
# Use the ZooKeeper port from your own setup; mine is 2180, the default is usually 2181
agent.sinks.k1.zookeeperConnect = zookeeper-node1:2180,zookeeper-node2:2180,zookeeper-node3:2180
agent.channels.ch.type = memory
agent.channels.ch.capacity = 10000
agent.channels.ch.transactionCapacity = 10000
agent.channels.ch.keep-alive = 20

agent.sources.sql-source.channels = ch
agent.sinks.k1.channel = ch
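
Before starting the agent, make sure the status-file directory exists and is writable by the user running Flume, and that the target topic exists on the Kafka cluster (unless automatic topic creation is enabled). A rough sketch using the values from the configuration above; hostnames, ports and partition/replication counts are only examples, and the kafka-topics flags depend on your Kafka version (older CDH-style distributions take --zookeeper):

# Directory holding the incremental-read status file (status.file.path above)
mkdir -p /var/lib/flume-ng

# Create the target topic; adjust partitions/replication to your cluster
kafka-topics --create --zookeeper zookeeper-node1:2180 --replication-factor 3 --partitions 3 --topic yinqing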

Notes on the conf file

Parameter explanation:

--conf conf/ : the directory where Flume's configuration files live (-c for short)

--name agent : the name of the agent, which must match the agent name used in the conf file (-n for short)

--conf-file ./mysql-flume-kafka.conf : the configuration file Flume reads for this run (-f for short)

-Dflume.root.logger=INFO,console : -D overrides the flume.root.logger property at runtime, printing logs to the console at the INFO level. Available levels include debug, info, warn, and error.

 

# Start the Flume agent
flume-ng agent -n agent -c ./ -f ./mysql-flume-kafka.conf -Dflume.root.logger=DEBUG,console

 

Also pay attention to the version you download from https://github.com/keedio/flume-ng-sql-source; with a mismatched version, some properties in the conf file will not be recognized and Flume will report errors.

 

Start a Kafka console consumer to monitor the topic:

kafka-console-consumer --bootstrap-server kafka-node1:9092 --from-beginning --topic yinqing

Now insert rows into the monitored MySQL table and they will appear in the consumer. Only newly inserted rows are captured; updates to existing rows are not detected yet.
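
For a quick end-to-end check, insert a test row into the monitored table and watch it show up in the console consumer. A sketch only: the database name yinqing and the columns id/name below are illustrative assumptions, so substitute the table you actually configured.

# Insert a test row into the monitored table (column names are hypothetical)
mysql -h hostname -u root -p -e "INSERT INTO yinqing.table_name (id, name) VALUES (1, 'test');"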

