一、kakfak-connect簡介
kafka-connet是一個工具,用來在kafka和外部數據存儲系統之間移動數據,kafka-connect可以簡單快捷地將數據從kafka導入或導出,數據范圍涵蓋了關系型數據庫、日志、度量數據、Hadoop、數據倉庫、NoSql數據存儲、ES等。
kafka-connect架構如下(圖片來源:百度):

kafka-connect有兩個核心概念:Source和Sink。Source:負責導入數據到kafka,Sink負責從kafka導出數據,它們統稱Connector,即連接器。
另外還有兩個重要概念:Task和Worker,每一個Connector都會協調一系列的task去執行任務,Connector把一項工作任務分割成許多的task,然后把task分發到各個worker進程中去執行。task不保存自己的狀態信息,而是交給特定的kafka主題去保存。
kafka-connect提供了以下特性:

即:
- 通用性:規范化其他數據系統與kafka的繼集成,簡化了連接器的開發、部署和管理
- 支持獨立模式(standalone)分布式模式(distributed)
- REST接口:使用Rest API提交和管理Connector
- 自動位移管理:自動管理位移的提交,不需要開發人員干預,降低了開發成本
- 分布式和可擴展性:Kafka Connect 基於現有的組管理協議來實現擴展Kafka Connect 集群
- 流式計算和批處理的集成
kafka中通過connect-standalone.sh和connect-distributed.sh命令來實現獨立模式和分布式模式運行的Kafka Connect,可以在kafka的/bin目錄下看到:

二、獨立模式
在獨立模式中,所有操作都是在一個進程中完成的,它比較適合測試和功能驗證的場景,但是無法充分利用kafka自身所提供的負載均衡和高容錯特性。
下面來演示一下使用獨立模式將一個文件中的內容導入到kafka中。
2.1 Source連接器用法
1、修改配置文件
- connect-standalone.properties:用於Work進程運行的配置文件
- connect-file-source.properties:Source連接器的配置文件
connect-standalone.properties內容如下(一般情況下使用默認配置即可):
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/kafka/test-connect-source.txt
topic=topic-connect
要修改:
file:該連接器數據源文件路徑
topic:設置連接器將數據導入哪個主題,如果該主題不存在則會自動創建,當然也可以自己提前創建好(推薦)
2、創建topic
我選擇手動創建topic-connect,創建命令如下:
./kafka-topics --zookeeper localhost:2181 --create --topic topic-connect --replication-factor 1 --partitions 1
創建完成后查看以下topic的創建結果:
./kafka-topics --zookeeper localhost:2181 --describe --topic topic-connect
創建結果如下:

3、啟動source連接器
./bin/connect-standalone ./libexec/config/connect-standalone.properties ./libexec/config/connect-file-source.properties
4、向test-connect-source.txt文件寫入數據
echo "hello world">>/tmp/kafka/test-connect-source.txt
5、查看結果
查看的方式有兩種,一種是通過kafka-console-consumer.sh腳本,一種是kafka-dump-log.sh腳本,前者可以實時查看效果,后者每次寫入后都要執行命令才能看到,下面演示通過這兩種方式查看的效果:
5.1 kafka-console-consumer.sh
./kafka-console-consumer --bootstrap-server localhost:9092 --topic topic-connect
結果如下:

5.2 kafka-dump-log.sh
./kafka-dump-log --files /usr/local/var/lib/kafka-logs/topic-connect-0/00000000000000000000.log --print-data-log
結果如下:

以上是Source連接器的用法,下面再來探索一下Sink連接器的用法.
2.2 Sink連接器用法
1、修改配置文件
- connect-standalone.properties:用於Work進程運行的配置文件
- connect-file-sink.properties:Sink連接器的配置文件
connect-standalone.properties內容如下(需要修改key和value的converter方式):
bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.file.filename=/tmp/connect.offsets
connect-file-sink.properties內容如下:
name=local-file-source connector.class=FileStreamSource tasks.max=1 file=/tmp/kafka/test-connect-sink.txt topic=topic-connect-sink
2、創建topic
我選擇手動創建topic-connect,創建命令如下:
./kafka-topics --zookeeper localhost:2181 --create --topic topic-connect-sink --replication-factor 1 --partitions 1
創建完成后查看以下topic的創建結果:
./kafka-topics --zookeeper localhost:2181 --describe --topic topic-connect-sink

3、啟動sink連接器
./bin/connect-standalone ./libexec/config/connect-standalone.properties ./libexec/config/connect-file-sink.properties
4、發送消息
發送消息到topic-connect-sink
./bin/kafka-console-producer --broker-list localhost:9092 --topic topic-connect-sink

5、查看sink文件
cat test-connect-sink.txt

可以看到發送到test-connect-sink 這個topic的消息成功存儲到sink文件test-connect-sink.txt中了。
三、問題
在實踐sink的過程中,我本來是想通過Source將一條消息從source文件導入kafka,同時通過Sink將該條消息從kafka中導出到sink文件,配置如下:
只要將connect-file-source.properties和connect-file-sink.properties這兩個配置文件中的topic改成相同的即可,但是執行之后卻發現:
- 當向test-connect-source.txt文件寫入消息時,並不會在test-connect-sink.txt文件中寫入,也不會在通過kafka-console-consumer.sh消費到;
- 當通過命令向topic-connect發送消息時,不會寫入test-connect-sink.txt文件,但是能通過kafka-console-consumer.sh消費到;
后來經過一下午的排查,居然發現一直改的是connect-console-source.properties和connect-console-sink.properties文件,而不是connect-file-source.properties和connect-file-properties,
真的也是被自己蠢哭的一下午,然后重新修改了配置文件,通過以下命令重啟connecter之后,在conncect-source.txt寫入,在connect-sink.txt中就能看到了。
四、REST API
可以通過Kafka Connect 提供的基於REST風格的API接口來管理連接器,默認端口號是8083,可以通過Worker進程配置文件中的rest.port參數來修改端口號。
如:
curl localhost:8083/connectors

| 方法請求類型 | REST API | 接口說明 |
| GET | / | 查看kafka集群版本信息 |
| GET | /connectors | 查看當前活躍的連接器列表,顯示連接器的名字,即配置connector配置文件中的name屬性 |
| POST | /connectors | 根據指定配置,創建一個新的連接器 |
| GET | /connectors/{name} | 查看指定連接器的信息 |
| GET | /connectors/{name}/config | 查看指定連接器的配置信息 |
| PUT | /connectors/{name}/config | 修改指定連接器的配置信息 |
| GET | /connectors/{name}/state | 查看指定連接器的狀態 |
| POST | /connectors/{name}/restart | 重啟指定的連接器 |
| PUT | /connectors/{name}/pause | 暫停指定的連接器 |
| GET | /connectors/{name}/tasks | 查詢指定連接器正在運行的task |
| POST | /connectors/name}/tasks | 修改指定連接器的Task配置 |
| GET | /connectors/{name}/tasks/{taskId}/status | 查詢指定連接器中指定Task的狀態 |
| POST | /connectors/{name}/tasks/{taskId}/restat | 重啟指定連接器中指定的Task |
| DELETE | /connectors/{name} | 刪除指定的連接器 |
大周末的,寫了一天博客,這效率太低了,出門透透氣~~~
參考文獻:
朱忠華 《深入理解Kafka核心設計與實踐原理》
