Kafka connect快速構建數據ETL通道


摘要: 作者:Syn良子 出處:http://www.cnblogs.com/cssdongl 轉載請注明出處

業余時間調研了一下Kafka connect的配置和使用,記錄一些自己的理解和心得,歡迎指正.

一.背景介紹

Kafka connect是Confluent公司(當時開發出Apache Kafka的核心團隊成員出來創立的新公司)開發的confluent platform的核心功能.

大家都知道現在數據的ETL過程經常會選擇kafka作為消息中間件應用在離線和實時的使用場景中,而kafka的數據上游和下游一直沒有一個

無縫銜接的pipeline來實現統一,比如會選擇flume或者logstash采集數據到kafka,然后kafka又通過其他方式pull或者push數據到目標存儲.

而kafka connect旨在圍繞kafka構建一個可伸縮的,可靠的數據流通道,通過kafka connect可以快速實現大量數據進出kafka從而和其

他源數據源或者目標數據源進行交互構造一個低延遲的數據pipeline.給個圖更直觀點,大家感受下.

0

二.Kafka-connect快速配置

這里Confluent官方很貼心的提供了一個集成的鏡像以便quickstart,如下鏈接

https://s3-us-west-2.amazonaws.com/confluent-files/kafka_connect_blog.ova

這是存儲在Amazon S3上的,直接點擊即可下載.這里我使用VMWare直接打開,剛開始會提示一個錯誤,不用管它直接點擊重試即可

系統加載的過程中會默認初始化虛擬機的網絡配置,這里我建議提前設置好橋接網絡,讓該虛擬機使用橋接網絡初始化.

加載成功后,登錄進入該Ubuntu系統,默認的用戶名和密碼都是:vagrant.

然后ls查看vagrant用戶目錄,查看幾個關鍵的腳本內容后,我分別介紹它們的功能

1>setup.sh:自動下載mysql,mysql jdbc driver,配置好mysql以及做為hive的metastore

2>start.sh:啟動confluent platform,kafka,hadoop,hive相關服務

3>clean_up.sh:和start.sh相反的,會關閉掉所有的服務,而且還會刪除掉所有的數據(例如hdfs namenode和 datanode的數據,其實相當於fs format了)

那么很明顯,第一步肯定是執行setup.sh,這里執行后會報錯如下

setupFailed

這里無法下載相關的軟件包,好吧,那么我們需要更新一下下載源的索引,執行如下命令

sudo apt-get update

更新完畢后再次執行setup.sh安裝好mysql,hive等服務

緊接着執行start.sh來啟動上述服務,啟動后應該有如下進程,這是一個偽分布式節點

jpsService

對了,虛擬機各個服務(例如hive,zookeeper等),配置文件和日志文件在路徑/mnt/下,組件的安裝位置位於/opt下

三.Kafka connect快速使用

配置完以后就可以准備使用kafka-connect來快速構建一個數據pipeline了,如下圖所示

wholePic

整個過程是將數據以mysql作為數據源,將數據通過kafka connect快速ETL到hive中去.注意這里圖中沒畫kafka

但是實際上是包含在kafka connect里面的,話不多說,開始使用

1>Mysql數據准備

執行如下命令

$ mysql -u root --password="mypassword"
mysql> CREATE DATABASE demo;
mysql> USE demo;
mysql> CREATE TABLE users (
    ->   id serial NOT NULL PRIMARY KEY,
    ->   name varchar(100),
    ->   email varchar(200),
    ->   department varchar(200),
    ->   modified timestamp default CURRENT_TIMESTAMP NOT NULL,
    ->   INDEX `modified_index` (`modified`)
    -> );
mysql> INSERT INTO users (name, email, department) VALUES ('alice', 'alice@abc.com', 'engineering');
mysql> INSERT INTO users (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
mysql> exit;

注意上面第一行,--password="mypassword" ,對,你沒看錯,這里虛擬機mysql的root默認密碼就是mypassword,

強迫症患者請自行更改.隨后建庫,建表,插入數據.

2>關鍵概念准備

這里我快速普及一下參考官方文檔理解的一些關鍵概念.

kafka connector:kafka connector是kafka connect的關鍵組成部分,它是一個邏輯上的job,用於在kafka和其他系統之間拷貝數據,比如

從上游系統拷貝數據到kafka,或者從kafka拷貝數據到下游系統

Tasks:每個kafka connector可以初始化一組task進行數據的拷貝

Workers:邏輯上包含kafka connector和tasks用來調度執行具體任務的進程,具體執行時分為standalone模式和distributed模式

見下圖,這個是kafka上游的數據stream過來后,定義好對應的kafka connector后,分解為一組tasks然后push數據到kafka的不同topic

kafkaConnectors

3>利用Kafka-connect攝取數據

主要是通過配置來實現從mysql攝取數據到kafka,然后按照topic來獲取數據寫入hdfs,命令如下

connect-standalone /mnt/etc/connect-avro-standalone.properties \
 /mnt/etc/mysql.properties /mnt/etc/hdfs.properties &

注意上面這些properties文件是虛擬機已經事先配置好的,可以直接執行實現數據的攝取

當前使用的kafka connect的standalone模式,當然還有distributed模式后續可以嘗試

上面的那條命令的格式是這樣:

connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...]

主要解釋一下connect-standalone后面的參數

worker.properties:就是上面提到過的worker進程的配置文件,可以定義kafka cluster的相關信息以及數據序列化的格式.

隨后的一些參數就是kafka connector的配置參數了,比如上面的mysql.properties定義了一個kafka jdbc connector,用來同步mysql數據到kafka

最后一個hdfs.properties是kafka hdfs connector的配置文件,用來消費kafka topic數據push到hdfs.

那么執行這條命令后就可以將mysql的數據通過kafka connect快速ETL到hdfs了.

最后可以通過hive創建外表映射hdfs上的數據文件,然后在hive中查看對應數據,如下

$ hive
hive> SHOW TABLES;
OK
test_jdbc_users
hive> SELECT * FROM test_jdbc_users;
OK
1 alice alice@abc.com engineering 1450305345000
2 bob   bob@abc.com   sales       1450305346000

四.Kafka connect使用總結

1>Kafka connect的使用其實就是配置不同的kafka connectors,這里大家可以把kafka作為中間組件,然后可以類比flume理解,kafka上游的

connector其實就是fllume的source從上游數據源sink到kafka,kafka的下游connector其實就是flume的source是kafka,sink到下游系統.

2>Kafka connect的數據pipeline要打通,它要求數據遵守confluent自己的一套通用的schema機制,細心的同學會發現上面jps后會有個進程名

SchemaRegistryMain,這里官方默認使用Avro格式進出Kafka,所以要留意worker.properties文件的配置信息.

3>我在使用中沒有發現Flume 相關的connector,因此很好奇它應該是沒有實現上游flume conector的屬性配置。問題應該出在Flume的數據是基

於event的,而和上面2中所說的schema定義格式沒有很好的兼容.

4>kafka connect的distributed模式應該更實用,隨后會嘗試,以及confluent所支持的實時處理流kafka streams.

參考資料:http://docs.confluent.io/2.0.0/platform.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM