Kafka連接器建立數據管道


1.概述

最近,有同學留言咨詢Kafka連接器的相關內容,今天筆者給大家分享一下Kafka連接器建立數據管道的相關內容。

2.內容

Kafka連接器是一種用於Kafka系統和其他系統之間進行功能擴展、數據傳輸的工具。通過Kafka連接器能夠簡單、快速的將大量數據集移入到Kafka系統,或者從Kafka系統中移出,例如Kafka連接器可以低延時的將數據庫或者應用服務器中的指標數據收集到Kafka系統主題中。
另外,Kafka連接器可以通過作業導出的方式,將Kafka系統主題傳輸到二次存儲和查詢系統中,或者傳輸到批處理系統中進行離線分析。

2.1 使用場景

Kafka連接器通常用來構建數據管道,一般來說有兩種使用場景。
1. 開始和結束的端點
第一種,將Kafka系統作為數據管道的開始和結束的端點。例如,將Kafka系統主題中的數據移出到HBase數據庫,或者把Oracle數據庫中的數據移入到Kafka系統。

 

 

 

2. 數據傳輸的中間介質
第二種,把Kafka系統作為一個中間傳輸介質。例如,為了把海量日志數據存儲到ElasticSearch中,可以先把這些日志數據傳輸到Kafka系統中,然后再從Kafka系統中將這些數據移出到ElasticSearch進行存儲。

ElasticSearch是一個基於Lucene(Lucene是一款高性能、可擴展的信息檢索工具庫)實現的存儲介質。它提供了一個分布式多用戶能力的全文搜索引擎,基RESTful(一種軟件架構風格、設計風格,但是並非標准,只是提供了一組設計原則和約束條件)接口實現。

 

 

 

Kafka連接器的存在,給數據管道帶來很重要的價值。例如,Kafka連接器可以作為數據管道各個數據階段的緩沖區,有效的將消費者實例和生產者實例進行解耦。
Kafka系統解除耦合的能力、系統的安全性、數據處理的效率等方面均表現不俗,因而使用Kafka連接器來構建數據管道是一個最佳的選舉。

2.2 特性和優勢

Kafka連接器包含一些重要的特性,並且給數據管道提供了一個成熟穩定的框架。同時,Kafka連接器還提供了一些簡單易用的工具庫,大大降低的開發人員的研發成本。
1. 特性
Kafka連接器具體包含的特性如下。

  • 通用框架:Kafka連接器制定了一種標准,用來約束Kafka系統與其他系統集成,簡化了Kafka連接器的開發、部署和管理;
  • 單機模式和分布式模式:Kafka連接器支持兩種模式,既能擴展到支持大型集群的服務管理,也可以縮小到開發、測試等小規模的集群;
  • REST接口:使用REST API來提交請求並管理Kafka集群;
  • 自動管理偏移量:通過連接器的少量信息,Kafka連接器可以自動管理偏移量;
  • 分布式和可擴展:Kafka連接器是建立在現有的組管理協議上,通過添加更多的連接器實例來水平擴展,實現分布式服務;
  • 數據流和批量集成:利用Kafka系統已有的能力,Kafka連接器是橋接數據流和批處理系統的一種理想的解決方案。

2. 優勢
在Kafka連接器中有兩個核心的概念,它們分別是Source和Sink。其中Source負責將數據導入到Kafka系統,而Sink則負責將數據從Kafka系統中進行導出。
Source和Sink在實現數據導入和導出的過程被稱之連接器,即Source連接器和Sink連接器。這兩種連接器提供了對業務層面數據讀取和寫入的抽象接口,簡化了生命周期的管理工作。
在處理數據時,Source連接器和Sink連接器會初始化各自的任務,並將數據結構進行標准化的封裝。在實際應用場景中,不同的業務中的數據格式是不一樣的,因此,Kafka連接器通過注冊數據結構,來解決數據格式驗證和兼容性問題。
當數據源發生變化時,Kafka連接器會生成新的數據結構,通過不同的處理策略來完成對數據格式的兼容。

2.3 核心概念

在Kafka連接器中存在幾個核心的概念,它們分別是連接器實例(Connectors)、任務數(Tasks)、事件線程數(Workers)、轉換器(Converters)。
1. 連機器實例
在Kafka連接器中,連接器實例決定了消息數據的流向,即消息數據從何處復制,以及將復制的消息數據寫入到何處。
一個連接器實例負責Kafka系統與其他系統之間的邏輯處理,連接器實例通常以JAR包的形式存在,通過實現Kafka系統應用接口來完成。
2. 任務數
在分布式模式下,每一個連接器實例可以將一個作業切分成多個任務(Task),然后再將任務分發到各個事件線程(Worker)中去執行。任務不會保存當前的狀態信息,通常由特定的Kafka主題來保存,例如指定具體屬性offset.storage.topic和status.storage.topic的值來保存。
在分布式模式中,會存在任務均衡的概念。當一個連接器實例首次提交到Kafka集群,所有的事件線程都會做一個任務均衡的操作,來保證每一個事件線程都運行差不多數量的任務,避免所有任務集中到某一個事件線程。
3. 事件線程
在Kafka系統中,連接器實例和任務數都是邏輯層面的,需要有具體的線程來執行。在Kafka連接器中,事件線程就是用來執行具體的任務,事件線程包含兩種,分別是單機模式和分布式模式。
4. 轉換器
轉換器會將字節數據轉換成Kafka連接器內部的格式,同時,也能將Kafka連接器內部存儲的數據格式轉換成字節數據。

3.操作連接器

連接器作為Kafka的一部分,隨着Kafka系統一起發布,所以無需獨立安裝。在大數據應用場景下,建議在每台物理機上安裝一個Kafka。根據實際需求,可以在一部分物理機上啟動Kafka實例(即代理節點Broker),在另一部分物理機上啟動連接器。

在Kafka系統中,Kafka連接器最終是以一個常駐進程的形式運行在后台服務中,它提供了一個用來管理連接器實例的REST API。默認情況下,服務端口地址是8083。

提示:
Representational State Transfer,簡稱REST,即表現層狀態轉移。REST是所有Web應用程序都應用遵守的一種規范。符合REST設計規范的應用接口,即REST API。

在Kafka連接器中,REST API支持獲取、寫入、創建等接口,具體內容如下圖所示:

 

 在Kafka系統中,Kafka連接器目前支持兩種運行模式,它們分別是單機模式和分布式模式。

3.1 單擊模式導入

在單機模式下,所有的事件線程都在一個單進程中運行。單機模式使用起來更加簡單,特別是在開發和定位分析問題的時候,使用單機模式會比較適合。
(1)編輯單機模式配置文件。
在單機模式下,主題的偏移量是存儲在/tmp/connect.offsets目錄下,在$KAFKA_HOME/config目錄下有一個connect-standalone.properties文件,通過設置offset.storage.file.filename屬性值來改變存儲路徑。
每次Kafka連接器啟動時,通過加載$KAFKA_HOME/config/connect-file-source.properties配置文件中的name屬性來獲取主題的偏移量,然后執行后續的讀寫操作。

# 設置連接器名稱
name=local-file-source
# 指定連接器類
connector.class=FileStreamSource
# 設置最大任務數
tasks.max=1
# 指定讀取的文件
file=/tmp/test.txt
# 指定主題名
topic=connect_test

(2)在即將讀取的文件中,添加數據,具體操作命令如下。
# 新建一個test.txt文件並添加數據
[hadoop@dn1 ~]$ vi /tmp/test.txt

# 添加內容如下
kafka
hadoop
kafka-connect

# 保存並退出

在使用Kafka文件連接器時,連接器實例會監聽配置的數據文件,如果文件中有數據更新,例如:追加新的消息數據。連接器實例會及時處理新增的消息數據。
(3)啟動Kafka連接器單機模式的命令與啟動Kafka代理節點類似,具體操作命令如下。

# 啟動一個單機模式的連接器
[hadoop@dn1 bin]$ ./connect-standalone.sh ../config/connect-standalone.properties\
../config/connect-file-source.properties

(4)使用Kafka系統命令查看導入到主題(connect_test)中的數據,具體操作命令如下。

# 使用Kafka命令查看
[hadoop@dn1 bin]$ ./kafka-console-consumer.sh --zookeeper dn1:2181 --topic connect_test\
--from-beginning

3.2 分布式模式導入

在分布式模式中,Kafka連接器會自動均衡每個事件線程所處理的任務數。允許用戶動態的增加或者減少,在執行任務、修改配置、以及提交偏移量時能夠得到容錯保障。
在分布式模式中,Kafka連接器會在主題中存儲偏移量、配置、以及任務狀態。建議手動創建存儲偏移量的主題,可以按需設置主題分區數和副本數。
需要注意的是,除了配置一些通用的屬性之外,還需要配置以下幾個重要的屬性。

  • group.id(默認值connect-cluster):連接器組唯一名稱,切記不能和消費者組名稱沖突;
  • config.storage.topic(默認值connect-configs):用來存儲連接器實例和任務配置,需要注意的是,該主題應該以單分區多副本的形式存在,建議手動創建,如果自動創建可能會存在多個分區;
  • offset.storage.topic(默認值connect-offsets):用來存儲偏移量,該主題應該以多分區多副本的形式存在;
  • status.storage.topic(默認值connect-status):用來存儲任務狀態,該主題建議以多分區多副本的形式存在。

在分布式模式中,Kafka連接器配置文件不能使用命令行,需要使用REST API來執行創建、修改和銷毀Kafka連接器操作。
(1)編輯分布式模式配置文件(connect-distributed.properties)

# 設置Kafka集群地址
bootstrap.servers=dn1:9092,dn2:9092,dn3:9092
# 設置連接器唯一組名稱
group.id=connect-cluster
# 指定鍵值對JSON轉換器類
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 啟用鍵值對轉換器
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# 設置內部鍵值對轉換器, 例如偏移量、配置等
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# 設置偏移量存儲主題
offset.storage.topic=connect_offsets
# 設置配置存儲主題
config.storage.topic=connect_configs
# 設置任務狀態存儲主題
status.storage.topic=connect_status
# 設置偏移量持久化時間間隔
offset.flush.interval.ms=10000

(2)創建偏移量、配置、以及任務狀態主題,具體操作命令如下。

# 創建配置主題
kafka-topics.sh --create --zookeeper dn1:2181 --replication-factor 3 --partitions 1\
 --topic connect_configs
# 創建偏移量主題
kafka-topics.sh --create --zookeeper dn1:2181 --replication-factor 3 --partitions 6\
--topic connect_offsets
# 創建任務狀態主題
kafka-topics.sh --create --zookeeper dn1:2181 --replication-factor 3 --partitions 6\
 --topic connect_status

(3)啟動分布式模式連接器,具體操作命令如下。

# 啟動分布式模式連接器
[hadoop@dn1 bin]$ ./connect-distributed.sh ../config/connect-distributed.properties

(4)執行REST API命令查看當前Kafka連接器的版本號,具體操作命令如下。

# 查看連接器版本號
[hadoop@dn1 ~]$ curl http://dn1:8083/

(5)查看當前已安裝的連接器插件,通過瀏覽器訪問http://dn1:8083/connector-plugins地址來查看

 

 

(6)創建一個新的連接器實例,具體操作命令如下。

# 創建一個新的連接器實例
[hadoop@dn1 ~]$ curl 'http://dn1:8083/connectors' -X POST -i –H\
 "Content-Type:application/json" -d '{"name":"distributed-console-source","config":\
{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",\
"tasks.max":"1","topic":"distributed_connect_test",\
"file":"/tmp/distributed_test.txt"}}'

然后在瀏覽器訪問http://dn1:8083/connectors地址查看當前成功創建的連接器實例名稱,如下圖所示:

 

 

(7)查看使用分布式模式導入到主題(distributed_connect_test)中的數據,具體操作命令如下。

# 在文件/tmp/distributed_test.txt中添加消息數據
[hadoop@dn1 ~]$ vi /tmp/distributed_test.txt

# 添加如下內容(這條注釋不要寫入到distributed_test.txt文件中)
distributed_kafka
kafka_connection
kafka
hadoop

# 然后保存並退出(這條注釋不要寫入到distributed_test.txt文件中)

# 使用Kafka系統命令,查看主題distributed_connect_test中的數據
[hadoop@dn1 ~]$ kafka-console-consumer.sh --zookeeper dn1:2181 –topic\
 distributed_connect_test --from-beginning

4.總結

Kafka 連接器可以從DB存儲或應用程序服務器收集數據到Topic,使數據可用於低延遲的流處理。導出作業可以將數據從Topic傳輸到二次存儲和查詢系統,或者傳遞到批處理系統以便進行離線分析。

5.結束語

這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

另外,博主出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那里點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。關注下面公眾號,根據提示,可免費獲取書籍的教學視頻。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM