Kafka-導入導出數據


從控制台寫入數據並將其寫回控制台是一個方便的起點,但您可能希望使用其他來源的數據或將數據從Kafka導出到其他系統。對於許多系統,您可以使用Kafka Connect導入或導出數據,而不是編寫自定義集成代碼。 Kafka Connect是Kafka附帶的工具,可以向Kafka導入和導出數據。它是一個可擴展的工具,可以運行 連接器,實現與外部系統交互的自定義​​邏輯。在本快速入門中,我們將了解如何使用簡單的連接器運行Kafka Connect,這些連接器將數據從文件導入Kafka主題並將數據從Kafka主題導出到文件。

首先,我們將首先創建一些種子數據進行測試:

echo -e "foo\nbar" > test.txt

修改config/connect-standalone.properties配置:

bootstrap.servers=192.168.1.5:9092

修改config/connect-file-source.properties配置:

file=/opt/app/kafka_2.12-2.3.0/config/test.txt

修改config/connect-file-sink.properties配置:

file=/opt/app/kafka_2.12-2.3.0/config/test.sink.txt

接下來,我們將啟動以獨立模式運行的兩個

連接器,這意味着它們在單個本地專用進程中運行。我們提供三個配置文件作為參數。第一個始終是Kafka Connect流程的配置,包含常見配置,例如要連接的Kafka代理和數據的序列化格式。其余配置文件均指定要創建的連接器。這些文件包括唯一的連接器名稱,要實例化的連接器類以及連接器所需的任何其他配置。

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

在啟動過程中,您將看到許多日志消息,包括一些指示正在實例化連接器的日志消息。一旦Kafka Connect進程啟動,源連接器應該開始從test.txt主題讀取行並將其生成到主題connect-test,並且接收器連接器應該開始從主題讀取消息connect-test 並將它們寫入文件test.sink.txt。我們可以通過檢查輸出文件的內容來驗證數據是否已通過整個管道傳遞:

more test.sink.txt
foo
bar

請注意,數據存儲在Kafka主題中connect-test,因此我們還可以運行控制台使用者來查看主題中的數據(或使用自定義使用者代碼來處理它):

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.5:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

連接器繼續處理數據,因此我們可以將數據添加到文件中,並看到它在管道中移動:

echo Another line>> test.txt

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.5:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"Another line"}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM