Flume+Kafka獲取MySQL數據


摘要

MySQL被廣泛用於海量業務的存儲數據庫,在大數據時代,我們亟需對其中的海量數據進行分析,但在MySQL之上進行大數據分析顯然是不現實的,這會影響業務系統的運行穩定。如果我們要實時地分析這些數據,則需要實時地將其復制到適合OLAP的數據系統上。本文介紹一種數據采集工具——Flume,由cloudera軟件公司於2009年被捐贈了apache軟件基金會,現已成為apache top項目之一。本文使用Flume構建一個對MySQL數據的采集,並投遞到Kafka的一個鏈路。

環境依賴

  • Java 8+
  • Flume 1.9
  • Kafka 2.7
  • ZooKeeper 3.6
  • MySQL 5.7.23
  • Git

一、下載&打包flume-ng-sql-source插件

1、下載

$ cd /data/gitproject
$ git clone git@github.com:keedio/flume-ng-sql-source.git
$ cd flume-ng-sql-source
$ vim pom.xml

#在
#<build>
#        <plugins>
#內添加如下內容,maven-dependency-plugin是一個可以將當前maven項目所有依賴的jar文件提取到指定文件夾的工具
            <plugin>
                <artifactId>maven-dependency-plugin</artifactId>
                <configuration>
                    <outputDirectory>$FLUME_HOME/lib</outputDirectory>
                    <excludeTransitive>false</excludeTransitive>
                    <stripVersion>true</stripVersion>
                </configuration>
            </plugin>
            ...

# outputDirectory是指定jar包提取路徑,表示target目錄。如果不寫的話,將在根目錄下創建lib目錄。
# excludeTransitive,表示是否不包含間接依賴的包。
# stripVersion表示復制的jar文件去掉版本信息。

# 執行如下maven命令,所有依賴的jar將提取到lib目錄:
$ mvn dependency:copy-dependencies


2、打包

通過maven打jar包:

# 編譯Java項目
$ mvn clean
$ mvn compile
$ mvn package

# 執行完畢后,在./target文件夾下將會有構建好的項目對應的jar包
root@ubuntu-master:/data/gitproject/flume-ng-sql-source/target# ll
total 9088
drwxr-xr-x 11 root root    4096 2月   8 19:53 ./
drwxr-xr-x  5 root root    4096 2月   8 20:21 ../
drwxr-xr-x  3 root root    4096 2月   8 19:53 apidocs/
drwxr-xr-x  3 root root    4096 2月   8 19:50 classes/
-rw-r--r--  1 root root 9163594 2月   8 19:52 flume-ng-sql-source-1.5.3-SNAPSHOT.jar #這個就是我們要的jar包
-rw-r--r--  1 root root   59249 2月   8 19:53 flume-ng-sql-source-1.5.3-SNAPSHOT-javadoc.jar
-rw-r--r--  1 root root   10856 2月   8 19:53 flume-ng-sql-source-1.5.3-SNAPSHOT-sources.jar
drwxr-xr-x  3 root root    4096 2月   8 19:50 generated-sources/
drwxr-xr-x  3 root root    4096 2月   8 19:52 generated-test-sources/
drwxr-xr-x  2 root root    4096 2月   8 19:53 javadoc-bundle-options/
drwxr-xr-x  2 root root    4096 2月   8 19:52 maven-archiver/
drwxr-xr-x  3 root root    4096 2月   8 19:50 maven-status/
-rw-r--r--  1 root root   17724 2月   8 19:52 original-flume-ng-sql-source-1.5.3-SNAPSHOT.jar
drwxr-xr-x  2 root root    4096 2月   8 19:52 surefire-reports/
drwxr-xr-x  3 root root    4096 2月   8 19:52 test-classes/

3、復制jar包到$FLUME_HOME/lib目錄

$ cp /data/gitproject/flume-ng-sql-source/target/flume-ng-sql-source-1.5.3-SNAPSHOT.jar $FLUME_HOME/lib/flume-ng-sql-source-1.5.3.jar

注:$FLUME_HOME是自己linux下flume的文件夾,比如我的是/usr/local/flume_1.9.0

二、新建數據庫和表

接下來我們要使用Flume采集MySQL數據庫中的相關表數據。

這里我們新建一個數據庫和表,並且要記住這個數據庫和表的名字,之后這些信息要寫入Flume的配置文件。

創建數據庫:

create database test;

創建表:

use test;
create table fk (
id int UNSIGNED AUTO_INCREMENT,
name VARCHAR(100) NOT NULL,
age int NOT NULL,
PRIMARY KEY ( id )
);

三、新增配置文件(重要)

在flume的conf文件夾中,新增一個文件mysql-flume.conf

$ cd /usr/local/flume_1.9.0/conf
$ vim mysql-flume.conf

注:mysql-flume.conf本來是沒有的,是我生成的,具體配置如下所示

# a1表示agent的名稱
# source是a1的輸入源
# channels是緩沖區
# sinks是a1輸出目的地,本例子sinks使用了kafka
a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
# 連接mysql的一系列操作,youhost改為你虛擬機的ip地址,可以通過ifconfig或者ip addr查看
# url中要加入?useUnicode=true&characterEncoding=utf-8&useSSL=false,否則有可能連接失敗
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://youhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
# Hibernate Database connection properties
# mysql賬號,一般都是root
a1.sources.src-1.hibernate.connection.user = root
# 填入你的mysql密碼
a1.sources.src-1.hibernate.connection.password = xxxxxxxx
a1.sources.src-1.hibernate.connection.autocommit = true
# mysql驅動
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
# 驅動版本過低會無法使用,驅動安裝下文會提及
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay=5000
# 存放status文件
a1.sources.src-1.status.file.path = /opt/install/flume/status
a1.sources.src-1.status.file.name = sqlSource.status
# Custom query
a1.sources.src-1.start.from = 0
# 填寫需要采集的數據表信息,你也可以使用下面的方法:
# agent.sources.sql-source.table =table_name
# agent.sources.sql-source.columns.to.select = *
a1.sources.src-1.custom.query = select `id`, `name` from fk
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10

################################################################
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactionCapacity = 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000

################################################################
# 使用kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 這個項目中你創建的或使用的topic名字
a1.sinks.k1.topic = testTopic
# kafka集群,broker列表,由於我沒有使用集群所以只有一個
# 如果你搭建了集群,代碼如下:agent.sinks.k1.brokerList = kafka-node1:9092,kafka-node2:9092,kafka-node3:9092
# Flume 1.6- 的寫法
#a1.sinks.k1.brokerList = 192.168.1.113:9092,192.168.1.114:9092,192.168.1.115:9092
# Flume 1.7+ 的寫法
a1.sinks.k1.kafka.bootstrap.servers=192.168.1.113:9092,192.168.1.114:9092,192.168.1.115:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Bind the source and sink to the channel
a1.sinks.k1.channel=ch-1
a1.sources.src-1.channels=ch-1

四、添加mysql驅動到flume的lib目錄下

$ cd /data
$ wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.49.tar.gz
$ tar -zxvf mysql-connector-java-5.1.49.tar.gz
$ cp /data/mysql-connector-java-5.1.49/mysql-connector-java-5.1.49-bin.jar $FLUME_HOME/lib/

五、啟動相關組件

安裝Zookeeper詳細請參考:JasonCengBlog/zookeeper/20210206_Linux下搭建ZooKeeper集群.md

1、啟動ZooKeeper

所有節點上啟動zkServer

$ cd $ZOOKEEPER_HOME/bin
$ zkServer.sh start

2、啟動Kafka

安裝Kafka詳細請參考:JasonCengBlog/Kafka/20210207_Linux下搭建kafka集群.md

新開一個終端窗口

#從后台啟動Kafka集群(3台都需要啟動)
$ cd $KAFKA_HOME/bin #進入到kafka的bin目錄 
$ ./kafka-server-start.sh -daemon ../config/server.properties

新建一個topic

$ cd $KAFKA_HOME
$ bin/kafka-topics.sh --create --zookeeper 192.168.1.113:2181,192.168.1.114:2181,192.168.1.115:2181 --replication-factor 1 --partitions 1 --topic testTopic
  • 注1:testTopic就是你使用的topic名稱,這個和上文mysql-flume.conf里的內容是對應的。

  • 注2:可以使用bin/kafka-topics.sh --list --zookeeper 192.168.1.113:2181,192.168.1.114:2181,192.168.1.115:2181來查看已創建的topic。

3、啟動flume

新開一個終端窗口

$ cd $FLUME_HOME
$ bin/flume-ng agent -n a1 -c conf -f conf/mysql-flume.conf -Dflume.root.logger=INFO,console

4、啟動mysql終端

新開一個終端窗口

$ mysql -u mysql -p
mysql> 

六、實時采集數據

1、啟動kafka消費者查看flume采集的MySQL數據

flume會實時采集數據到kafka中,我們可以啟動一個kafak的消費監控,用於查看mysql的實時數據。

$ cd $KAFKA_HOME
$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.113:9092,192.168.1.114:9092,192.168.1.115:9092 --topic testTopic --from-beginning

我們在MySQL數據庫中新插入(1,'xiaohong',18),(2,'xiaobai',20),這時就可以在Kafka消費者終端查看數據了,kafka會打印Flume所采集的MySQL中的數據。

2、修改MySQL數據,查看變化

我們在數據庫中新增一條數據,Kafka消費者新讀取到的Flume所采集的數據也會變更。

如我們新插入一條數據為(3,'heiheiheiheihei',22),Kafka消費者終端會打印如下:

參考

[1]flume實時采集mysql數據到kafka中並輸出[https://www.cnblogs.com/kylinxxx/p/14137607.html]
[2]Flume常見錯誤整理(持續更新ing...)[https://blog.csdn.net/dr_guo/article/details/52193881]:參考其mysql-flume.conf配置
[3]https://github.com/keedio/flume-ng-sql-source
[4]https://cwiki.apache.org/confluence/display/FLUME/Getting+Started
[5]在linux上編譯maven工程[https://www.jianshu.com/p/672be18b0f54]
[6]利用maven將項目依賴的jar提取到指定文件夾[https://blog.51cto.com/keeplearning/1225581]


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM