1.概述
最近,有同學留言咨詢Kafka連接器的相關內容,今天筆者給大家分享一下Kafka連接器建立數據管道的相關內容。
2.內容
Kafka連接器是一種用於Kafka系統和其他系統之間進行功能擴展、數據傳輸的工具。通過Kafka連接器能夠簡單、快速的將大量數據集移入到Kafka系統,或者從Kafka系統中移出,例如Kafka連接器可以低延時的將數據庫或者應用服務器中的指標數據收集到Kafka系統主題中。
另外,Kafka連接器可以通過作業導出的方式,將Kafka系統主題傳輸到二次存儲和查詢系統中,或者傳輸到批處理系統中進行離線分析。
2.1 使用場景
Kafka連接器通常用來構建數據管道,一般來說有兩種使用場景。
1. 開始和結束的端點
第一種,將Kafka系統作為數據管道的開始和結束的端點。例如,將Kafka系統主題中的數據移出到HBase數據庫,或者把Oracle數據庫中的數據移入到Kafka系統。
2. 數據傳輸的中間介質
第二種,把Kafka系統作為一個中間傳輸介質。例如,為了把海量日志數據存儲到ElasticSearch中,可以先把這些日志數據傳輸到Kafka系統中,然后再從Kafka系統中將這些數據移出到ElasticSearch進行存儲。
ElasticSearch是一個基於Lucene(Lucene是一款高性能、可擴展的信息檢索工具庫)實現的存儲介質。它提供了一個分布式多用戶能力的全文搜索引擎,基RESTful(一種軟件架構風格、設計風格,但是並非標准,只是提供了一組設計原則和約束條件)接口實現。
Kafka連接器的存在,給數據管道帶來很重要的價值。例如,Kafka連接器可以作為數據管道各個數據階段的緩沖區,有效的將消費者實例和生產者實例進行解耦。
Kafka系統解除耦合的能力、系統的安全性、數據處理的效率等方面均表現不俗,因而使用Kafka連接器來構建數據管道是一個最佳的選舉。
2.2 特性和優勢
Kafka連接器包含一些重要的特性,並且給數據管道提供了一個成熟穩定的框架。同時,Kafka連接器還提供了一些簡單易用的工具庫,大大降低的開發人員的研發成本。
1. 特性
Kafka連接器具體包含的特性如下。
- 通用框架:Kafka連接器制定了一種標准,用來約束Kafka系統與其他系統集成,簡化了Kafka連接器的開發、部署和管理;
- 單機模式和分布式模式:Kafka連接器支持兩種模式,既能擴展到支持大型集群的服務管理,也可以縮小到開發、測試等小規模的集群;
- REST接口:使用REST API來提交請求並管理Kafka集群;
- 自動管理偏移量:通過連接器的少量信息,Kafka連接器可以自動管理偏移量;
- 分布式和可擴展:Kafka連接器是建立在現有的組管理協議上,通過添加更多的連接器實例來水平擴展,實現分布式服務;
- 數據流和批量集成:利用Kafka系統已有的能力,Kafka連接器是橋接數據流和批處理系統的一種理想的解決方案。
2. 優勢
在Kafka連接器中有兩個核心的概念,它們分別是Source和Sink。其中Source負責將數據導入到Kafka系統,而Sink則負責將數據從Kafka系統中進行導出。
Source和Sink在實現數據導入和導出的過程被稱之連接器,即Source連接器和Sink連接器。這兩種連接器提供了對業務層面數據讀取和寫入的抽象接口,簡化了生命周期的管理工作。
在處理數據時,Source連接器和Sink連接器會初始化各自的任務,並將數據結構進行標准化的封裝。在實際應用場景中,不同的業務中的數據格式是不一樣的,因此,Kafka連接器通過注冊數據結構,來解決數據格式驗證和兼容性問題。
當數據源發生變化時,Kafka連接器會生成新的數據結構,通過不同的處理策略來完成對數據格式的兼容。
2.3 核心概念
在Kafka連接器中存在幾個核心的概念,它們分別是連接器實例(Connectors)、任務數(Tasks)、事件線程數(Workers)、轉換器(Converters)。
1. 連機器實例
在Kafka連接器中,連接器實例決定了消息數據的流向,即消息數據從何處復制,以及將復制的消息數據寫入到何處。
一個連接器實例負責Kafka系統與其他系統之間的邏輯處理,連接器實例通常以JAR包的形式存在,通過實現Kafka系統應用接口來完成。
2. 任務數
在分布式模式下,每一個連接器實例可以將一個作業切分成多個任務(Task),然后再將任務分發到各個事件線程(Worker)中去執行。任務不會保存當前的狀態信息,通常由特定的Kafka主題來保存,例如指定具體屬性offset.storage.topic和status.storage.topic的值來保存。
在分布式模式中,會存在任務均衡的概念。當一個連接器實例首次提交到Kafka集群,所有的事件線程都會做一個任務均衡的操作,來保證每一個事件線程都運行差不多數量的任務,避免所有任務集中到某一個事件線程。
3. 事件線程
在Kafka系統中,連接器實例和任務數都是邏輯層面的,需要有具體的線程來執行。在Kafka連接器中,事件線程就是用來執行具體的任務,事件線程包含兩種,分別是單機模式和分布式模式。
4. 轉換器
轉換器會將字節數據轉換成Kafka連接器內部的格式,同時,也能將Kafka連接器內部存儲的數據格式轉換成字節數據。
3.操作連接器
連接器作為Kafka的一部分,隨着Kafka系統一起發布,所以無需獨立安裝。在大數據應用場景下,建議在每台物理機上安裝一個Kafka。根據實際需求,可以在一部分物理機上啟動Kafka實例(即代理節點Broker),在另一部分物理機上啟動連接器。
在Kafka系統中,Kafka連接器最終是以一個常駐進程的形式運行在后台服務中,它提供了一個用來管理連接器實例的REST API。默認情況下,服務端口地址是8083。
提示:
Representational State Transfer,簡稱REST,即表現層狀態轉移。REST是所有Web應用程序都應用遵守的一種規范。符合REST設計規范的應用接口,即REST API。
在Kafka連接器中,REST API支持獲取、寫入、創建等接口,具體內容如下圖所示:
在Kafka系統中,Kafka連接器目前支持兩種運行模式,它們分別是單機模式和分布式模式。
3.1 單擊模式導入
在單機模式下,所有的事件線程都在一個單進程中運行。單機模式使用起來更加簡單,特別是在開發和定位分析問題的時候,使用單機模式會比較適合。
(1)編輯單機模式配置文件。
在單機模式下,主題的偏移量是存儲在/tmp/connect.offsets目錄下,在$KAFKA_HOME/config目錄下有一個connect-standalone.properties文件,通過設置offset.storage.file.filename屬性值來改變存儲路徑。
每次Kafka連接器啟動時,通過加載$KAFKA_HOME/config/connect-file-source.properties配置文件中的name屬性來獲取主題的偏移量,然后執行后續的讀寫操作。
# 設置連接器名稱 name=local-file-source # 指定連接器類 connector.class=FileStreamSource # 設置最大任務數 tasks.max=1 # 指定讀取的文件 file=/tmp/test.txt # 指定主題名 topic=connect_test
(2)在即將讀取的文件中,添加數據,具體操作命令如下。
# 新建一個test.txt文件並添加數據
[hadoop@dn1 ~]$ vi /tmp/test.txt
# 添加內容如下 kafka hadoop kafka-connect # 保存並退出
在使用Kafka文件連接器時,連接器實例會監聽配置的數據文件,如果文件中有數據更新,例如:追加新的消息數據。連接器實例會及時處理新增的消息數據。
(3)啟動Kafka連接器單機模式的命令與啟動Kafka代理節點類似,具體操作命令如下。
# 啟動一個單機模式的連接器 [hadoop@dn1 bin]$ ./connect-standalone.sh ../config/connect-standalone.properties\ ../config/connect-file-source.properties
(4)使用Kafka系統命令查看導入到主題(connect_test)中的數據,具體操作命令如下。
# 使用Kafka命令查看 [hadoop@dn1 bin]$ ./kafka-console-consumer.sh --zookeeper dn1:2181 --topic connect_test\ --from-beginning
3.2 分布式模式導入
在分布式模式中,Kafka連接器會自動均衡每個事件線程所處理的任務數。允許用戶動態的增加或者減少,在執行任務、修改配置、以及提交偏移量時能夠得到容錯保障。
在分布式模式中,Kafka連接器會在主題中存儲偏移量、配置、以及任務狀態。建議手動創建存儲偏移量的主題,可以按需設置主題分區數和副本數。
需要注意的是,除了配置一些通用的屬性之外,還需要配置以下幾個重要的屬性。
- group.id(默認值connect-cluster):連接器組唯一名稱,切記不能和消費者組名稱沖突;
- config.storage.topic(默認值connect-configs):用來存儲連接器實例和任務配置,需要注意的是,該主題應該以單分區多副本的形式存在,建議手動創建,如果自動創建可能會存在多個分區;
- offset.storage.topic(默認值connect-offsets):用來存儲偏移量,該主題應該以多分區多副本的形式存在;
- status.storage.topic(默認值connect-status):用來存儲任務狀態,該主題建議以多分區多副本的形式存在。
在分布式模式中,Kafka連接器配置文件不能使用命令行,需要使用REST API來執行創建、修改和銷毀Kafka連接器操作。
(1)編輯分布式模式配置文件(connect-distributed.properties)
# 設置Kafka集群地址 bootstrap.servers=dn1:9092,dn2:9092,dn3:9092 # 設置連接器唯一組名稱 group.id=connect-cluster # 指定鍵值對JSON轉換器類 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # 啟用鍵值對轉換器 key.converter.schemas.enable=true value.converter.schemas.enable=true # 設置內部鍵值對轉換器, 例如偏移量、配置等 internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false # 設置偏移量存儲主題 offset.storage.topic=connect_offsets # 設置配置存儲主題 config.storage.topic=connect_configs # 設置任務狀態存儲主題 status.storage.topic=connect_status # 設置偏移量持久化時間間隔 offset.flush.interval.ms=10000
(2)創建偏移量、配置、以及任務狀態主題,具體操作命令如下。
# 創建配置主題 kafka-topics.sh --create --zookeeper dn1:2181 --replication-factor 3 --partitions 1\ --topic connect_configs # 創建偏移量主題 kafka-topics.sh --create --zookeeper dn1:2181 --replication-factor 3 --partitions 6\ --topic connect_offsets # 創建任務狀態主題 kafka-topics.sh --create --zookeeper dn1:2181 --replication-factor 3 --partitions 6\ --topic connect_status
(3)啟動分布式模式連接器,具體操作命令如下。
# 啟動分布式模式連接器 [hadoop@dn1 bin]$ ./connect-distributed.sh ../config/connect-distributed.properties
(4)執行REST API命令查看當前Kafka連接器的版本號,具體操作命令如下。
# 查看連接器版本號 [hadoop@dn1 ~]$ curl http://dn1:8083/
(5)查看當前已安裝的連接器插件,通過瀏覽器訪問http://dn1:8083/connector-plugins地址來查看
(6)創建一個新的連接器實例,具體操作命令如下。
# 創建一個新的連接器實例 [hadoop@dn1 ~]$ curl 'http://dn1:8083/connectors' -X POST -i –H\ "Content-Type:application/json" -d '{"name":"distributed-console-source","config":\ {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",\ "tasks.max":"1","topic":"distributed_connect_test",\ "file":"/tmp/distributed_test.txt"}}'
然后在瀏覽器訪問http://dn1:8083/connectors地址查看當前成功創建的連接器實例名稱,如下圖所示:
(7)查看使用分布式模式導入到主題(distributed_connect_test)中的數據,具體操作命令如下。
# 在文件/tmp/distributed_test.txt中添加消息數據 [hadoop@dn1 ~]$ vi /tmp/distributed_test.txt # 添加如下內容(這條注釋不要寫入到distributed_test.txt文件中) distributed_kafka kafka_connection kafka hadoop # 然后保存並退出(這條注釋不要寫入到distributed_test.txt文件中) # 使用Kafka系統命令,查看主題distributed_connect_test中的數據 [hadoop@dn1 ~]$ kafka-console-consumer.sh --zookeeper dn1:2181 –topic\ distributed_connect_test --from-beginning
4.總結
Kafka 連接器可以從DB存儲或應用程序服務器收集數據到Topic,使數據可用於低延遲的流處理。導出作業可以將數據從Topic傳輸到二次存儲和查詢系統,或者傳遞到批處理系統以便進行離線分析。
5.結束語
這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!
另外,博主出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那里點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。關注下面公眾號,根據提示,可免費獲取書籍的教學視頻。