Kafka: Connect


轉自:http://www.cnblogs.com/f1194361820/p/6108025.html

 

Kafka Connect 簡介

    Kafka Connect 是一個可以在Kafka與其他系統之間提供可靠的、易於擴展的數據流處理工具。使用它能夠使得數據進出Kafka變得很簡單。Kafka Connect有如下特性:

·是一個通用的構造kafka connector的框架

·有單機、分布式兩種模式。開發時建議使用單機模式,生產環境下使用分布式模式。

·提供restful的管理connector的API。

·自動化的offset管理。Kafka Connect自動的管理offset提交。

·分布式、可擴展。采用與concumer group中對partition rebalance同樣的機制來管理在worker group中的connector、task。

·流/批處理的集成。

接下來會對Kafka Connect做一個全面的分析,來幫助了解上述特性。

 

 

 

1、基本組件:

1.1 Worker

 

 

 

 

Worker用於調度source task、sink task 來處理數據的進出Kafka。一個Worker中包括多個Connector。一個Connector與多個task關聯。當啟動(或者加入)一個Connector時,與之關聯的Tasks會被創建並提交到executor去執行;停止(或移除)一個Connector時,與之關聯的Tasks會被停止並移除。也就是說Worker管理Connector插件的插、拔,Task的啟、停。

 

 

1.2 Connector

Connector,可以看做是Kafka Connect的插件,用來與其他系統進行集成。有兩類:SourceConnector、SinkConnector。如果要實現一個Connector,那么必須繼承這兩個類中的一個,這是硬性要求。Connector 與Task關聯,Worker對Connector的啟、停、插、拔,其實最終反映在對Task啟、停、插、拔上。

Connector主要為創建Task實例提供配置,可以提供connector本身的配置,也可以提供task的配置。

 

 

1.3 Task

Task是由Worker中的executor來執行的。在Worker中包括了所有的WorkerTask。WorkerTask有兩類:WorkerSourceTask、WorkSinkTask。

在運行過程中,WorkerSourceTask不斷的做如下調用:

1)調用SourceTask.poll()方法取得數據SourceRecords,

2)使用相應的Converter將SourceRecords轉換為ProducerRecords,

3)並由KafkaProducer#send推給Kafka Broker

 

 

 

 

在運行過程中,WorkerSinkTask不斷的做如下調用:

1)調用KafkaConsumer#poll方法從Kafka Broker拉取ConsumerRecords,

2)使用相應的Converter將ConsumerRecords轉換為SinkRecords,

3)調用SinkTask#flush方法來執行自定義的處理

4)提交 offset到Kafka Broker。

 

 

 

 

不難看出,只有系統運行正常,SinkTask、SourceTask都會被一次又一次的調用。即一個Task實例會占用一個線程。如果一個為Connector配置了task數量大於1,就會有多個tasks實例(這多個Task實例是同一個Java類的多個實例)與該Connector關聯,那么這多個tasks在運行時必然是並發執行的。這就要求SourceTask#poll()、SinkTask#flush()在運行時能夠保證線程安全性。

另外從WorkerSinkTask#flush如果出現異常,是不會提交offset的。此外,還會調用consumer#seek找到上一次提交的offset,seek的作用是:讓下一次使用consumer#poll方法時,從上一次提交的offset開始poll。這樣就可以保證數據在consume過程中不會丟失。

 

1.3.1 Kafka Connect與Kafka Producer、Kafka Consumer關系

Kafka Connect必然是要依賴於KafkaProducer、KafkaConsumer的。這種依賴關系到底是怎樣的呢?從上面WorkerSinkTask、WorkerSourceTask的執行過程來看,他們是直接與KafkaConsumer、KafkaProducer直接關聯的。經過源碼的查看可以了解到:

在一個Worker內,所有的WorkerSourceTask共享同一個KafkaProducer對象來發送record到Kafka,這種設計也符合KafkaProducer的使用原則;每一個WorkerSinkTask擁有一個私有的KafkaConsumer,也就是說不存在KafkaConsumer共享使用的情況。

    既然一個WorkerSinkTask獨自使用一個KafkaConsumer,那么一個WorkerSinkTask就可以認為是一個獨立的KafkaConsumer。那么如果一個WorkerSinkTask因某種原因啟動、加入、重啟、結束、中斷、停止,就等於是說一個KafkaConsumer加入、退出consumer group,那么該consumer group 的Coordinator Broker就會發起rebalance,使得該KafkaConsumer被分配到的partiion分配到group 內其他的KafkaConsumer上。

 

Kafka-Connect中,如何對KafkaConsumer進行分組呢?

從WorkerSinkTask的相關源碼來看:

private KafkaConsumer<byte[], byte[]> createConsumer() {

        // Include any unknown worker configs so consumer configs can be set globally on the worker

        // and through to the task

        Map<String, Object> props = new HashMap<>();

 

        props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-" + id.connector());

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,

                Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

 

        props.putAll(workerConfig.originalsWithPrefix("consumer."));

 

        KafkaConsumer<byte[], byte[]> newConsumer;

        try {

            newConsumer = new KafkaConsumer<>(props);

        } catch (Throwable t) {

            throw new ConnectException("Failed to create consumer", t);

        }

 

        return newConsumer;

    }

 

是把與同一個Connector關聯的Task會分到一個Consumer Group中。如果要自定義並且 組名默認是connector的名字。如果要自定義Consumer的屬性,可以在worker 的配置文件中使用consumer.前綴的方式來完成。

通過上述代碼也可以看出,默認情況下,關閉了KafkaConsumer的自動提交offset,改為由Kafka Connect來完成offset的提交。

 

1.4 Converter

WorkerSinkTask、WorkerSourceTask的執行過程中,會使用Key的Converter、Value的Converter對Key、Value進行轉換。

 

 

這樣做的原因:

作為一個Connector框架,在進行producer#send、consumer#pool時,框架本身並不知道用戶的程序中,如何進行序列化、反序列化操作。並且進入Kafka的數據類別不同,所需要的序列化、反序列化工具也不同。

為了解決這一問題,Kafka Connect中引入了Converter、Schema。

 

WorkerSoureTask中,將SourceRecord中的key、value取出來,經過Converter轉換(序列化)成byte[],轉成ProducerRecord。

for (final SourceRecord record : toSend) {

            byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());

            byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());

            final ProducerRecord<byte[], byte[]> producerRecord = newProducerRecord<>(record.topic(), record.kafkaPartition(), key, value);

}

 

WorkerSinkTask中,將consumer#poll到的ConsumerRecord轉換(反序列化)成SinkRecord:

private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {

        for (ConsumerRecord<byte[], byte[]> msg : msgs) {

            log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());

            SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());

            SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());

            messageBatch.add(

                    new SinkRecord(msg.topic(), msg.partition(),

                            keyAndSchema.schema(), keyAndSchema.value(),

                            valueAndSchema.schema(), valueAndSchema.value(),

                            msg.offset())

            );

        }

    }

 

 

2、Rebalance

    Kafka Connect支持單機、分布式(集群)兩種模式。在生產環境下,通常會使用分布式模式。

    通常情況下業務系統集群的數據進入到kafka,然后有Consumer集群進行數據消費處理。如果使用Kafka Connect,集群方案就變得很簡單並且擴展性很好。

 

    根據前面所了解的內容來看,一個Connector關聯到一類Task,可以指定該類Task的數目,也就是Consumer的數目。也就是說一個Connector關聯到一個Consumer Group上。如果Worker是單機模式,那么這個Consumer Group內的多個Consumer就是在同一進程內。如果進程掛掉,也就是整個Consumer Group全部掛掉。

在使用集群模式(Worker Cluster)時,這個問題如果不解決,所謂的集群就沒有任何意義。所以在Worker Cluster模式時,重點要解決上述問題。

    通過之前的Consumer、Broker的學習,知道Broker會為每一個Consumer Group提供一個協處理器。在Broker Coordinator、 Consumer Leader的配合下,當有Consumer 加入或退出Consumer Group時,會對partition進行rebalance。使得partition相對均勻的分配給各個Consumer上,以此來避免出現熱點(partition分配給少數的Consumer上)問題。

 

    Kafka Connect在解決這個問題時,也采用了同樣的方案:即使用Broker Coordinator 配合 Worker Leader的方式,對connector、task進行rebalance。這樣一來,使connector、task相對均勻的分配到同一個Worker Group中的不同的Worker上。如此便可以解決上述問題了。

 

一個Worker Leader分配connector、task時的處理是:

private Map<String, ByteBuffer> performTaskAssignment(String leaderId,long maxOffset, Map<String, ConnectProtocol.WorkerState> allConfigs) {

        Map<String, List<String>> connectorAssignments = new HashMap<>();

        Map<String, List<ConnectorTaskId>> taskAssignments = newHashMap<>();

 

        // Perform round-robin task assignment

        CircularIterator<String> memberIt = new CircularIterator<>(sorted(allConfigs.keySet()));

        for (String connectorId : sorted(configSnapshot.connectors())) {

            String connectorAssignedTo = memberIt.next();

            log.trace("Assigning connector {} to {}", connectorId, connectorAssignedTo);

            List<String> memberConnectors = connectorAssignments.get(connectorAssignedTo);

            if (memberConnectors == null) {

                memberConnectors = new ArrayList<>();

                connectorAssignments.put(connectorAssignedTo, memberConnectors);

            }

            memberConnectors.add(connectorId);

 

            for (ConnectorTaskId taskId :sorted(configSnapshot.tasks(connectorId))) {

                String taskAssignedTo = memberIt.next();

                log.trace("Assigning task {} to {}", taskId, taskAssignedTo);

                List<ConnectorTaskId> memberTasks = taskAssignments.get(taskAssignedTo);

                if (memberTasks == null) {

                    memberTasks = new ArrayList<>();

                    taskAssignments.put(taskAssignedTo, memberTasks);

                }

                memberTasks.add(taskId);

            }

        }

 

        this.leaderState = new LeaderState(allConfigs, connectorAssignments, taskAssignments);

 

        return fillAssignmentsAndSerialize(allConfigs.keySet(), ConnectProtocol.Assignment.NO_ERROR,

                leaderId, allConfigs.get(leaderId).url(), maxOffset, connectorAssignments, taskAssignments);

    }

 

下面這張圖展示了某個Worker失敗情況下,對Task Rebalance的情況:

 

 

一個Connector 1設置了5個task:

第一列表示所有的Worker都正常工作情況下 Task的分配。

第二列表示Worker 2進程出現故障, task2、task3也就掛掉了。

第三列表示對Task進行rebalance后,task2、task3被rebalance到其他的Worker名下了。

 

3、Storage

3.1 內置Storage

    在Kafka Connect中,內置了3種Storages。這三種配置都會(不論是Standalone,還是Distributed)有一份基於內存的Storage。下面只是針對內存Storage以外的方式進行說明:

·status’s storage

存儲各個connector、task的狀態信息。在Worker分布式的情況下,存儲在Kafka 的一個Topic中,topic的名字由worker配置項status.storage.topic來指定。

Status 有5種:

1) UNASSIGNED:connector或者task是否被分配到了個Worker上。

2) RUNNING: connector或者task是否正在運行。

3) PAUSED: connector或者task是否已暫停運行。

4) FAILED: connector或者task是否失敗(通常是拋出異常)。

5) DESTROYED:connector或者task是否已被刪除。

在運行時,會定時更新狀態信息。

 

·offset’s storage

在Kafka Connect中,WorkerSourceTask  用於從某個源拉數據,然后使用kafkaProducer#send推送到Kafka中。如果一個Connector 配置了多個Task,那么這多個Task應該是從不同的源中取數據。

其中這里說的源可以是多個File,一個或者多個Topic,可以是RDBMS中的多張表,可以是多個Queue等等。

為了記錄從哪個源讀取數據時讀取到了哪個位置,當WorkerSourceTask中斷重啟時,能夠繼續從這個位置讀取。Kafka Connect 為 SourceRecord設計了 兩個字段:sourcePartition, sourceOffset:

 public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                        String topic, Integer partition, Schema valueSchema, Object value) {
        this(sourcePartition, sourceOffset, topic, partition, null, null, valueSchema, value);
    }

sourcePartition :用於表示從哪個Source中讀取數據。所以它可以是一個File名、某個topic的partition、一個table名 等等。

sourceOffset:用於表示讀取到哪個位置。可以它可以是一個File中哪一行、某個topic的partition的哪個offset、table中的第幾行 等等。

 

因為sourcePartition, sourceOffset是可以自定義的,所以它具體的數據類型,並不知道。為了能夠方便的進行序列化、反序列化,可以配置internal.key.converter (用於序列化、反序列化sourcePartition), interval.value.converter (用於序列化、反序列化sourceOffset)。

Kafka Connect會 會把sourcePartition, sourceOffset 以 一個key-value 方式(key 是 sourcePartition, value 是sourceOffset)存儲起來。單機模式、分布式模式下,都會存儲這個offset的。

只是在Standalone模式下,是以本地File方式存儲。分布式模式下,是把sourcePartition, sourceOffset做為一條 record存儲在Kafka中的一個topic中。Topic的名稱由worker配置項offset.storage.topic來指定。

這個offset與Broker中的Log的offset 沒有任何的關系。

 

·config’s storage

    在Kafka connect中,每一個Connector,以及與之關聯的Task都會有一些配置信息。在rebalance后,還是需要用到這些配置的。為了使得Worker Group內共享配置,也需要對connector、task的配置進行存儲。

    分布式模式下,會在一個topic中存儲。Topic的名稱由worker配置項 config.storage.topic來指定。

 

4、Connector 管理

4.1 Herder

    在前面的基本組件中,可以了解到,Worker可以對Connector進行啟、停、插、拔。為了更加方便的管理Connector,Kafka-Connect中在Worker之外提供了一個工具Herder(翻譯為漢語:牧人,其實也就是管理者的意思)。

 

 

使用Herder可以插、拔Connector,可以啟、停Task,可以修改Connector的配置,獲取connector、task的狀態。

    針對Standalone、Distributed模式,實現的Herder也是不盡相同的。一些修改性質的操作都是只有Worker Leader能夠進行的。所以一個Worker Group中的某個Worker的管理者Herder接收到一個修改的請求時,會將該請求路由(通過Restful請求)到Worker Leader。

    在使用Kafka Connect時,如果是將Kafka Connect內嵌到業務系統中時,我們是可以通過Herder 對象來對Kafka Connect中的組件進行管理操作的。

但是很多情況下,並不是將Kafka Connect內嵌到我們的系統中的,而是使用它的CLI獨立的啟動一個或者多個進程的。這種情況下,如何來管理Kafka Connect的組件呢?

 

 

4.2 Restful

    為了方便管理,另外也專門提供了一種基於Restful的方式來管理connector、task。不論是Standalone還是Distributed模式都支持Restful方式管理。

    Restful管理方式,會在Kafka-Connect進程內部內嵌一個基於Jetty的Web Server。默認的端口是8083。

    下面會列出當前Kafka-Connector 版本0.10.1.0支持的Restful請求:

 

  • GET /connectors – 獲取當前活動的 connectors列表
  • POST /connectors – 創建一個新的Connector。請求體是一個JSON對象,包括connector的名稱,和配置。

{

name:”conn-2”,

config:{

     // connector的配置項。

}

}

  • GET /connectors/{name} – 獲取指定的connector的信息。
  • GET /connectors/{name}/config -  獲取指定的connector的配置項。
  • PUT /connectors/{name}/config – 修改指定的connector的配置項。
  • GET /connectors/{name}/status – 獲取指定的connector的當前狀態。包括: 1)connector的status, 2)分配在哪個worker上, 3)如果失敗的話,錯誤信息,4)與之關聯的所有任務的狀態。
  • GET /connectors/{name}/tasks – 獲取指定connector的任務列表。
  • GET /connectors/{name}/tasks/{taskid}/status – 獲取指定的connector的指定task的狀態信息。包括:1)該任務的status,2)該任務分配在哪個worker上,3)如果有失敗,錯誤信息。
  • PUT /connectors/{name}/pause – 暫停某個Connector。暫停某個connector時,與之關聯的task都會暫停。
  • PUT /connectors/{name}/resume – 繼續某個Connector。如果這個connector根本就沒有暫停,那就什么也不用做。
  • POST /connectors/{name}/restart – 重啟一個Connector。通常在一Connector失敗時。
  • POST /connectors/{name}/tasks/{taskId}/restart – 重啟一個task。
  • DELETE /connectors/{name} –刪除一個Connector。刪除Connector時,會停止它關聯的task,並刪除相關的配置信息。

Kafka Connect 也為獲取Connector Plugin的信息提供了 REST API :

  • GET /connector-plugins- 獲取在改Kafka-Connect集群中安裝的Connector Plugin列表。需要注意的是,該API只能獲取到正在Worker中正常運行狀態的connector。某些connector是看不到的,特別是在進行升級時,例如add一個connector jar包。
  • PUT/connector-plugins/{connector-type}/config/validate   驗證connector的配置。

 

4.3 通用Connector升級

    目前已提供了很多通用的Connector插件。

地址:https://www.confluent.io/product/connectors/

 

    在使用這些插件時,如果需要升級connector,需要按照下面步驟進行:

1) 下載新版的connector

2) 停止所有的Kafka connect workers。

3)根據connector plugin 的安裝說明來安裝。

4)啟動workers

5)如果采用分布式模式,啟動connector。

 

5、Configuration

5.1 Worker

Worker 可以在Standalone模式運行,也可以在Distributed模式下運行,兩種情況下有不同的配置,所以這里需要對他們加以區分:

5.1.1 Common Configuration

·bootstrap.server

因為Worker必然要有至少一個KafkaProducer實例(分布式時有兩個),所以需要至少配置一個Kafka broker的host:port對。如果是Broker集群模式,也沒有必要將所有的broker配置上。格式是:host1:port1,host2:port2,host3:port3...

 

·key.converter, value.converter

    配置record中的key、value 所采用的converter類名。比較流行的Converter有:JSON、Avro。

 

·internal.key.converter, internal.value.converter

配置connect-offset topic (也即 offset storage’s topic)中的record的key、value所采用的converter類名。比較流行的Converter有JSON、Avro。

 

·rest.host.name, rest.port

Rest Server的要綁定的IP和port。其中port模式是8083。

 

·session.time.out

每一個worker會周期性的發送一個heartbeat到Broker,以告訴Broker自己還活着。如果超過session.timeout.ms 還沒有發送heartbeat給Broker,Worker就會斷開相關的連接。那么Broker就不能收到heartbeat,就會導致Broker 認為該Worker已經掛掉了。然后Broker就會觸發Rebalance,來重新分配connector、task。這個值必須配置的 group.min.session.timeout.ms 與 group.max.session.timeout.ms之間。

默認值:10000,即10s。

 

·heartbeat.interval.ms

心跳間隔。心跳用於keep-alive Worker與Broker之間的連接。這個值必須低於session.timeout.ms。建議設置的值不要高於session.timeout.ms的1/3。

默認值:3000,即3s 。

 

. rebalance.timeout.ms

一旦一個rebalance開始了,一個Worker加入到group的時間。在此期間,運行在該Worker上的所有的任務需要flush掉那些還沒有處理完畢的數據,並且要提交offset。

如果超過了這個時間,offset commit就會失敗。

默認值 60000,即60s 。

 

·connection.max.idle.ms

連接最大空閑時間。模式值:540000,即9 min。

 

·client.id

指定一個認為可讀的worker id,只是用於跟蹤。

 

 

5.1.2 Standalone Worker Configuration

·offset.storage.file.filename

存儲connector offset 的文件。

 

5.1.3 Distributed Worker Configuration

·group.id

指定worker屬於哪個worker group。

 

·config.storage.topic, offset.storage.topic, status.storage.topic

在一個Worker Group內,  Workers 能夠發現彼此並共享connector、Task相關的config、offset、status信息。這些共享信息分別存儲在一個topic內。這3個配置項就是用來指定topic名稱的。在同一個Worker Group下的每一個worker的配置文件中,這三項必須以一致的。

 

創建這三個topic時的注意事項:

1)Offset存儲的topic 需要有很多個partation並且每一個partition至少三個replicas。

2)config存儲的topic 只創建一個partition,並且至少3個replicas。

3)status存儲的topic需要創建多個partition並且每一個partition至少3個replicas。

 要了解更多信息,可以參考前面Storage部分。

 

如果想要了解更多關於Kafka-Connect的知識,可以參考:http://docs.confluent.io/current

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM