目錄
Kafka Connect Develop Details 開發詳解
不得不吐槽下Kafka的官方文檔寫的不夠詳細,可能他們需要干的事情太多了,無暇顧及。
看本文之前請先閱讀:Kafka Connect Details 詳解 。
Core Concepts 核心概念
Connecttors
Kafka Connect分2種,一種是Source Connector,一種是Sink Connector。
Source將數據從第三方系統導入到Kafka;Sink將Kafka中的數據導出到第三方系統。
Connector主要是接收配置文件,描述如何將數據進行傳輸。
並且將工作(job)分解成任務(task)分配給Task去執行。
Tasks
Task也分2種,SourceTask和SinkTask。
主要工作就是將Connector分配給的數據集,進行處理傳遞。
SourceTask使用的pull拉的方式,SinkTask使用的push推的方式。
它們處理的都是Record的子類,包含topics,partition,offset,key和value信息。
Task是無狀態的,不需要處理offset等信息,這些是由Connector來處理的。
Workers
Connector和Task是邏輯單元,一個Worker對應一個進程,包含Connector和Tasks的。
在分布式模式下,是在Worker層面進行failover的處理的。
具體實現
只需要實現2個抽象類,一個是Connector,一個是Task。
Source Connector的就是SourceConnector和SourceTask。
Sink Connector的就是SinkConnector和SinkTask。
具體代碼怎么寫,可以直接抄file包目錄下的FileStreamSourceTask和FileStreamSinkTask,
或者參看我的Github項目:maxwell-sink。
其實Kafka Connect本質就是包了一層的producer和consumer。
新建項目,添加依賴
1.新建maven項目,添加依賴:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>connect-api</artifactId> <version>0.10.1.1</version> <!--<version>0.11.0.0</version>--> </dependency>
這里的版本最好保持和線上Kafka Brokers集群的版本一致。
這里要提下的是0.11.0.0版本的Kafka實現了exactly once消息投遞語義,還是非常強大的。
-
添加maven assembly plugin
因為我們最終要將實現的Connector作為插件打包成jar,部署到Kafka Broker機器上。
要注意的是:connector的依賴,不要kafka的運行時jar的版本有沖突。 - 繼承並實現抽象類
- 打包,部署
詳情請參看:Kafka Connect Deploy 部署
注意事項
這里我說幾點要注意的:
- Sink Connector中不需要自己提交offset,但是要保證put方法中得到的records集合的處理,要么全部成功,要么全部失敗
@Override public void put(final Collection<SinkRecord> records) { }
特別是在做JDBC的batch的時候,有可能將一次records分成多個batch,必須要保證在所有batch執行完后,
再設置connection.commit(),而不是每次batch之后就commit了。 -
順序性
maxwell使用的主鍵hash策略,所以能夠保證相同primary key值的記錄的所有增刪改產生的消息都hash到相同的partition上去,
而且我們知道一個partiton只會被分配到一個task上去(也就是一個消費者),所以不會有亂序的問題。 - version()方法
不太重要,反正我直接用的”1.0.0”。
參考
8.3 Connector Development Guide
Kafka Concepts