初識kafka-connect


一、kakfak-connect簡介

kafka-connet是一個工具,用來在kafka和外部數據存儲系統之間移動數據,kafka-connect可以簡單快捷地將數據從kafka導入或導出,數據范圍涵蓋了關系型數據庫、日志、度量數據、Hadoop、數據倉庫、NoSql數據存儲、ES等。

kafka-connect架構如下(圖片來源:百度):

kafka-connect有兩個核心概念:Source和Sink。Source:負責導入數據到kafka,Sink負責從kafka導出數據,它們統稱Connector,即連接器。

另外還有兩個重要概念:Task和Worker,每一個Connector都會協調一系列的task去執行任務,Connector把一項工作任務分割成許多的task,然后把task分發到各個worker進程中去執行。task不保存自己的狀態信息,而是交給特定的kafka主題去保存。

kafka-connect提供了以下特性:

即:

  • 通用性:規范化其他數據系統與kafka的繼集成,簡化了連接器的開發、部署和管理
  • 支持獨立模式(standalone)分布式模式(distributed)
  • REST接口:使用Rest API提交和管理Connector
  • 自動位移管理:自動管理位移的提交,不需要開發人員干預,降低了開發成本
  • 分布式和可擴展性:Kafka Connect 基於現有的組管理協議來實現擴展Kafka Connect 集群
  • 流式計算和批處理的集成

kafka中通過connect-standalone.sh和connect-distributed.sh命令來實現獨立模式和分布式模式運行的Kafka Connect,可以在kafka的/bin目錄下看到:

 

二、獨立模式

在獨立模式中,所有操作都是在一個進程中完成的,它比較適合測試和功能驗證的場景,但是無法充分利用kafka自身所提供的負載均衡和高容錯特性。

下面來演示一下使用獨立模式將一個文件中的內容導入到kafka中。

2.1 Source連接器用法

1、修改配置文件

  • connect-standalone.properties:用於Work進程運行的配置文件
  • connect-file-source.properties:Source連接器的配置文件

connect-standalone.properties內容如下(一般情況下使用默認配置即可):

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets

connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/kafka/test-connect-source.txt
topic=topic-connect

要修改:

file:該連接器數據源文件路徑

topic:設置連接器將數據導入哪個主題,如果該主題不存在則會自動創建,當然也可以自己提前創建好(推薦)

2、創建topic

我選擇手動創建topic-connect,創建命令如下:

./kafka-topics --zookeeper localhost:2181 --create --topic topic-connect --replication-factor 1 --partitions 1

創建完成后查看以下topic的創建結果:

./kafka-topics --zookeeper localhost:2181 --describe --topic topic-connect

創建結果如下:

 

 3、啟動source連接器

./bin/connect-standalone ./libexec/config/connect-standalone.properties ./libexec/config/connect-file-source.properties

4、向test-connect-source.txt文件寫入數據

echo "hello world">>/tmp/kafka/test-connect-source.txt

5、查看結果

查看的方式有兩種,一種是通過kafka-console-consumer.sh腳本,一種是kafka-dump-log.sh腳本,前者可以實時查看效果,后者每次寫入后都要執行命令才能看到,下面演示通過這兩種方式查看的效果:

5.1 kafka-console-consumer.sh
./kafka-console-consumer --bootstrap-server localhost:9092 --topic topic-connect

結果如下:

 5.2 kafka-dump-log.sh
./kafka-dump-log --files /usr/local/var/lib/kafka-logs/topic-connect-0/00000000000000000000.log --print-data-log

結果如下:

 

 

 以上是Source連接器的用法,下面再來探索一下Sink連接器的用法.

2.2 Sink連接器用法

1、修改配置文件

  • connect-standalone.properties:用於Work進程運行的配置文件
  • connect-file-sink.properties:Sink連接器的配置文件

connect-standalone.properties內容如下(需要修改key和value的converter方式):

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets

connect-file-sink.properties內容如下:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/kafka/test-connect-sink.txt
topic=topic-connect-sink

2、創建topic

我選擇手動創建topic-connect,創建命令如下:

./kafka-topics --zookeeper localhost:2181 --create --topic topic-connect-sink --replication-factor 1 --partitions 1

創建完成后查看以下topic的創建結果:

./kafka-topics --zookeeper localhost:2181 --describe --topic topic-connect-sink

 

 

 3、啟動sink連接器

./bin/connect-standalone ./libexec/config/connect-standalone.properties ./libexec/config/connect-file-sink.properties

4、發送消息

發送消息到topic-connect-sink

./bin/kafka-console-producer --broker-list localhost:9092 --topic topic-connect-sink

5、查看sink文件

cat test-connect-sink.txt

 

 可以看到發送到test-connect-sink 這個topic的消息成功存儲到sink文件test-connect-sink.txt中了。

三、問題

在實踐sink的過程中,我本來是想通過Source將一條消息從source文件導入kafka,同時通過Sink將該條消息從kafka中導出到sink文件,配置如下:

只要將connect-file-source.properties和connect-file-sink.properties這兩個配置文件中的topic改成相同的即可,但是執行之后卻發現:

  • 當向test-connect-source.txt文件寫入消息時,並不會在test-connect-sink.txt文件中寫入,也不會在通過kafka-console-consumer.sh消費到;
  • 當通過命令向topic-connect發送消息時,不會寫入test-connect-sink.txt文件,但是能通過kafka-console-consumer.sh消費到;

后來經過一下午的排查,居然發現一直改的是connect-console-source.propertiesconnect-console-sink.properties文件,而不是connect-file-source.propertiesconnect-file-properties,

真的也是被自己蠢哭的一下午,然后重新修改了配置文件,通過以下命令重啟connecter之后,在conncect-source.txt寫入,在connect-sink.txt中就能看到了。

四、REST API

可以通過Kafka Connect 提供的基於REST風格的API接口來管理連接器,默認端口號是8083,可以通過Worker進程配置文件中的rest.port參數來修改端口號。

如:

curl localhost:8083/connectors

 

方法請求類型 REST API 接口說明
GET / 查看kafka集群版本信息
GET /connectors 查看當前活躍的連接器列表,顯示連接器的名字,即配置connector配置文件中的name屬性
POST /connectors 根據指定配置,創建一個新的連接器
GET /connectors/{name} 查看指定連接器的信息
GET /connectors/{name}/config 查看指定連接器的配置信息
PUT /connectors/{name}/config 修改指定連接器的配置信息
GET /connectors/{name}/state 查看指定連接器的狀態
POST /connectors/{name}/restart 重啟指定的連接器
PUT /connectors/{name}/pause 暫停指定的連接器
GET /connectors/{name}/tasks 查詢指定連接器正在運行的task
POST /connectors/name}/tasks 修改指定連接器的Task配置
GET /connectors/{name}/tasks/{taskId}/status 查詢指定連接器中指定Task的狀態
POST /connectors/{name}/tasks/{taskId}/restat 重啟指定連接器中指定的Task
DELETE /connectors/{name} 刪除指定的連接器

大周末的,寫了一天博客,這效率太低了,出門透透氣~~~

參考文獻:

朱忠華 《深入理解Kafka核心設計與實踐原理》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM