flume實時采集mysql數據到kafka中並輸出


環境說明

  • centos7(運行於vbox虛擬機)
  • flume1.9.0(flume-ng-sql-source插件版本1.5.3)
  • jdk1.8
  • kafka(版本忘了后續更新)
  • zookeeper(版本忘了后續更新)
  • mysql5.7.24
  • xshell

准備工作

flume安裝

暫略,后續更新

flume簡介

Apache Flume是一個分布式的、可靠的、可用的系統,用於有效地收集、聚合和將大量日志數據從許多不同的源移動到一個集中的數據存儲。在大數據生態圈中,flume經常用於完成數據采集的工作。

其實時性很高,延遲大約1-2s,可以做到准實時。

又因為mysql是程序員常用的數據庫,所以以flume實時采集mysql數據庫為例子。要了解flume如何采集數據,首先要初探其架構:

Flume 運行的核心是 Agent。Flume以agent為最小的獨立運行單位。一個agent就是一個JVM。它是一個完整的數據收集工具,含有三個核心組件,分別是

source、 channel、 sink。通過這些組件, Event 可以從一個地方流向另一個地方,如下圖所示。

三大組件

source

Source是數據的收集端,負責將數據捕獲后進行特殊的格式化,將數據封裝到事件(event) 里,然后將事件推入Channel中。

Flume提供了各種source的實現,包括Avro Source、Exce Source、Spooling Directory Source、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source等。如果內置的Source無法滿足需要, Flume還支持自定義Source。

可以看到原生flume的source並不支持sql source,所以我們需要添加插件,后續將提到如何添加。

channel

Channel是連接Source和Sink的組件,大家可以將它看做一個數據的緩沖區(數據隊列),它可以將事件暫存到內存中也可以持久化到本地磁盤上, 直到Sink處理完該事件。

Flume對於Channel,則提供了Memory Channel、JDBC Chanel、File Channel,etc。

  • MemoryChannel可以實現高速的吞吐,但是無法保證數據的完整性。
  • MemoryRecoverChannel在官方文檔的建議上已經建義使用FileChannel來替換。
  • FileChannel保證數據的完整性與一致性。在具體配置不現的FileChannel時,建議FileChannel設置的目錄和程序日志文件保存的目錄設成不同的磁盤,以便提高效率。

sink

Flume Sink取出Channel中的數據,進行相應的存儲文件系統,數據庫,或者提交到遠程服務器。

Flume也提供了各種sink的實現,包括HDFS sink、Logger sink、Avro sink、File Roll sink、Null sink、HBase sink,etc。

Flume Sink在設置存儲數據時,可以向文件系統中,數據庫中,hadoop中儲數據,在日志數據較少時,可以將數據存儲在文件系中,並且設定一定的時間間隔保存數據。在日志數據較多時,可以將相應的日志數據存儲到Hadoop中,便於日后進行相應的數據分析。

這個例子中,我使用了kafka作為sink

下載flume-ng-sql-source插件

這里下載flume-ng-sql-source,最新版本是1.5.3。

下載完后解壓,我通過idea運行程序,使用maven打包為jar包,改名為flume-ng-sql-source-1.5.3.jar

編譯完的jar包要放在放到FLUME_HOME/lib下,FLUME_HOME是自己linux下flume的文件夾,比如我的是 /opt/install/flume

kafka安裝

我們使用flume將數據采集到kafka, 並啟動一個kafak的消費監控,就能看到實時數據了

jdk1.8安裝

暫略,后續更新

zookeeper安裝

暫略,后續更新

kafka安裝

暫略,后續更新

mysql5.7.24安裝

暫略,后續更新

flume抽取mysql數據到kafka實戰

新建一個數據庫和表

在完成上述的安裝工作后就可以開始着手實現demo了

首先我們要抓取mysql的數據,那么必然需要一個數據庫和表,並且要記住這個數據庫和表的名字,之后這些信息要寫入flume的配置文件。

創建數據庫:

create database test

創建表:

create table fk(
id int UNSIGNED AUTO_INCREMENT,
name VARCHAR(100) NOT NULL,
PRIMARY KEY ( id )
);

新增配置文件(重要)

cd 到flume的conf文件夾中,新增一個文件mysql-flume.conf

[root@localhost ~]# cd /opt/install/flume
[root@localhost flume]# ls
bin        conf      doap_Flume.rdf  lib      NOTICE     RELEASE-NOTES  tools
CHANGELOG  DEVNOTES  docs            LICENSE  README.md  status
[root@localhost flume]# cd conf
[root@localhost conf]# ls
flume-conf.properties.template  log4j.properties
flume-env.ps1.template          mysql-connector-java-5.1.35
flume-env.sh                    mysql-connector-java-5.1.35.tar.gz
flume-env.sh.template           mysql-flume.conf

注:mysql-flume.conf本來是沒有的,是我生成的,具體配置如下所示

在這個文件中寫入:

a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://youhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
# Hibernate Database connection properties
a1.sources.src-1.hibernate.connection.user = root
a1.sources.src-1.hibernate.connection.password = xxxxxxxx
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay=5000
a1.sources.src-1.status.file.path = /opt/install/flume/status
a1.sources.src-1.status.file.name = sqlSource.status
# Custom query
a1.sources.src-1.start.from = 0
a1.sources.src-1.custom.query = select `id`, `name` from fk
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10

################################################################
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactionCapacity = 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000

################################################################
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = testTopic
a1.sinks.k1.brokerList = 10.100.4.6:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

這是我的文件,其中一些隱私信息已被我用其他字符串替代了,在寫mysql-flume.conf時你可以復制上面的一段代碼。下面是這段代碼的詳細注釋,你可以更加帶注釋版本的代碼來修改自己的conf文件

# a1表示agent的名稱
# source是a1的輸入源
# channels是緩沖區
# sinks是a1輸出目的地,本例子sinks使用了kafka
a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
# 連接mysql的一系列操作,youhost改為你虛擬機的ip地址,可以通過ifconfig或者ip addr查看
# url中要加入?useUnicode=true&characterEncoding=utf-8&useSSL=false,否則有可能連接失敗
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://youhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
# Hibernate Database connection properties
# mysql賬號,一般都是root
a1.sources.src-1.hibernate.connection.user = root
# 填入你的mysql密碼
a1.sources.src-1.hibernate.connection.password = xxxxxxxx
a1.sources.src-1.hibernate.connection.autocommit = true
# mysql驅動
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
# 驅動版本過低會無法使用,驅動安裝下文會提及
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay=5000
# 存放status文件
a1.sources.src-1.status.file.path = /opt/install/flume/status
a1.sources.src-1.status.file.name = sqlSource.status
# Custom query
a1.sources.src-1.start.from = 0
# 填寫需要采集的數據表信息,你也可以使用下面的方法:
# agent.sources.sql-source.table =table_name
# agent.sources.sql-source.columns.to.select = *
a1.sources.src-1.custom.query = select `id`, `name` from fk
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10

################################################################
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactionCapacity = 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000

################################################################
# 使用kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 這個項目中你創建的或使用的topic名字
a1.sinks.k1.topic = testTopic
# kafka集群,broker列表,由於我沒有使用集群所以只有一個
# 如果你搭建了集群,代碼如下:agent.sinks.k1.brokerList = kafka-node1:9092,kafka-node2:9092,kafka-node3:9092
a1.sinks.k1.brokerList = 10.100.4.6:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

添加mysql驅動到flume的lib目錄下

wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz

tar xzf mysql-connector-java-5.1.35.tar.gz

cp mysql-connector-java-5.1.35-bin.jar /你flume的位置/lib/

啟動zookeeper

啟動kafka前要啟動zookeeper

cd 到zookeeper的bin目錄下

啟動:

./zkServer.sh start

等待運行

./zkCli.sh

啟動kafka

xshell中打開一個新窗口,cd到kafka目錄下,啟動kafka

bin/kafka-server-start.sh config/server.properties &

新建一個topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic

注1:testTopic就是你使用的topic名稱,這個和上文mysql-flume.conf里的內容是對應的。

注2:可以使用bin/kafka-topics.sh --list --zookeeper localhost:2181來查看已創建的topic。

啟動flume

xshell中打開一個新窗口,cd到flume目錄下,啟動flume

bin/flume-ng agent -n a1 -c conf -f conf/mysql-flume.conf -Dflume.root.logger=INFO,console

等待他運行,同時我們可以打開一個新窗口連接數據庫,使用我們新建的test數據庫和fk表。

實時采集數據

flume會實時采集數據到kafka中,我們可以啟動一個kafak的消費監控,用於查看mysql的實時數據

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning

這時就可以查看數據了,kafka會打印mysql中的數據

然后我們更改數據庫中的一條數據,新讀取到的數據也會變更

before:

after:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM