Kafka Connect Develop Details 開發詳解


Kafka Connect Develop Details 開發詳解

不得不吐槽下Kafka的官方文檔寫的不夠詳細,可能他們需要干的事情太多了,無暇顧及。
看本文之前請先閱讀:Kafka Connect Details 詳解 。

Core Concepts 核心概念

Connecttors

Kafka Connect分2種,一種是Source Connector,一種是Sink Connector。
Source將數據從第三方系統導入到Kafka;Sink將Kafka中的數據導出到第三方系統。

Connector主要是接收配置文件,描述如何將數據進行傳輸。
並且將工作(job)分解成任務(task)分配給Task去執行。

Tasks

Task也分2種,SourceTask和SinkTask。
主要工作就是將Connector分配給的數據集,進行處理傳遞。
SourceTask使用的pull拉的方式,SinkTask使用的push推的方式。
它們處理的都是Record的子類,包含topics,partition,offset,key和value信息。
Task是無狀態的,不需要處理offset等信息,這些是由Connector來處理的。

Workers

Connector和Task是邏輯單元,一個Worker對應一個進程,包含Connector和Tasks的。
在分布式模式下,是在Worker層面進行failover的處理的。

具體實現

只需要實現2個抽象類,一個是Connector,一個是Task。
Source Connector的就是SourceConnector和SourceTask。
Sink Connector的就是SinkConnector和SinkTask。

具體代碼怎么寫,可以直接抄file包目錄下的FileStreamSourceTask和FileStreamSinkTask,
或者參看我的Github項目:maxwell-sink。
其實Kafka Connect本質就是包了一層的producer和consumer。

新建項目,添加依賴

1.新建maven項目,添加依賴:

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>connect-api</artifactId> <version>0.10.1.1</version> <!--<version>0.11.0.0</version>--> </dependency>

這里的版本最好保持和線上Kafka Brokers集群的版本一致。
這里要提下的是0.11.0.0版本的Kafka實現了exactly once消息投遞語義,還是非常強大的。

  1. 添加maven assembly plugin
    因為我們最終要將實現的Connector作為插件打包成jar,部署到Kafka Broker機器上。
    要注意的是:connector的依賴,不要kafka的運行時jar的版本有沖突。

  2. 繼承並實現抽象類
  3. 打包,部署
    詳情請參看:Kafka Connect Deploy 部署

注意事項

這里我說幾點要注意的:

  1. Sink Connector中不需要自己提交offset,但是要保證put方法中得到的records集合的處理,要么全部成功,要么全部失敗
    @Override public void put(final Collection<SinkRecord> records) { }

    特別是在做JDBC的batch的時候,有可能將一次records分成多個batch,必須要保證在所有batch執行完后,
    再設置connection.commit(),而不是每次batch之后就commit了。

  2. 順序性
    maxwell使用的主鍵hash策略,所以能夠保證相同primary key值的記錄的所有增刪改產生的消息都hash到相同的partition上去,
    而且我們知道一個partiton只會被分配到一個task上去(也就是一個消費者),所以不會有亂序的問題。

  3. version()方法
    不太重要,反正我直接用的”1.0.0”。

參考

8.3 Connector Development Guide
Kafka Concepts

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM