Kafka Connect Usage Guide

1. Overview

Kafka Connect is a scalable, reliable tool for streaming data between Kafka and other systems. In short, it lets you quickly and easily import data into and export data out of Kafka through Connectors. It can ingest entire databases or collect messages from all of your application servers into Kafka topics. Kafka Connect's features include:

1. A common framework for Kafka connectors: Kafka Connect standardizes the integration of Kafka with other data systems, simplifying development, deployment, and management.

2. Distributed and standalone modes: it scales up to a centrally managed service supporting an entire organization, and scales down to development, testing, and small production deployments.

3. REST interface: submit and manage Connectors in a Kafka Connect cluster through a REST API.

4. Automatic offset management: with only a little information from a Connector, Connect manages the offset commit process automatically.

5. Distributed and scalable by default: Kafka Connect builds on the existing group management protocol, so more workers can be added to scale out a Connect cluster.

6. Streaming/batch integration: leveraging Kafka's existing capabilities, Connect is an ideal solution for bridging streaming and batch data systems.

The Kafka version used for these Connect tests is 0.9.0.0.

2. Standalone Mode

The standalone mode command has the following format:

bin/connect-standalone.sh config/connect-standalone.properties Connector1.properties [Connector2.properties ...]
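
For example, to run the file source and sink connectors configured in this section in a single standalone worker (run from the Kafka installation directory, using the config files described below):

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties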

 

My configuration for each of the files mentioned above is described below.

1. connect-standalone.sh is the script that runs standalone mode.

#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties"
fi
if [ -z "$KAFKA_JMX_OPTS" ]; then
  export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9901"
fi
if [ -z "$KAFKA_HEAP_OPTS" ]; then
 export KAFKA_HEAP_OPTS="-Xmx1024M"
fi
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.connect.cli.ConnectStandalone "$@"

 

Here you can set the JVM heap size for the Connect worker:

if [ -z "$KAFKA_HEAP_OPTS" ]; then
 export KAFKA_HEAP_OPTS="-Xmx1024M"
fi

You can also set the JMX options (note: JMX, for remote monitoring):

if [ -z "$KAFKA_JMX_OPTS" ]; then
  export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9901"
fi
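
With JMX enabled this way, a JMX client can attach to the worker to inspect JVM and Connect metrics, for example (the host name below is a placeholder for wherever the worker runs):

jconsole <worker-host>:9901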

2. Configuration of connect-standalone.properties:

 
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=10.253.129.237:9092,10.253.129.238:9092,10.253.129.239:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/datafs/20181106/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
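
Note that with schemas.enable=true the JsonConverter wraps each record in an envelope that carries the schema next to the payload, so a simple string value such as "hello" lands in the topic roughly as:

{"schema":{"type":"string","optional":false},"payload":"hello"}

Set key.converter.schemas.enable / value.converter.schemas.enable to false if you want plain JSON without the envelope.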
Pay attention to the broker list (bootstrap.servers) here; the remaining settings are all documented in the Kafka reference:

http://kafka.apache.org/090/documentation.html#connectconfigs

 

3. Configuration of connect-file-source.properties:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name=test_source2
connector.class=FileStreamSource
tasks.max=2
file=/datafs/20181106/json2/log.out
topic=TEST_MANAGER5

 

Note the file path and topic settings.
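
A quick sanity check for the source connector is to append a line to the input file and read it back from the topic with the console consumer (a sketch for the 0.9 consumer; the ZooKeeper address is an assumption, substitute your own):

echo "hello connect" >> /datafs/20181106/json2/log.out
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic TEST_MANAGER5 --from-beginning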

 

4. Configuration of connect-file-sink.properties:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name=test_sink1
connector.class=FileStreamSink
#connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/datafs/20181106/a.out
topics=TEST_MANAGER5
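
With both connectors running in one worker, every line appended to the source file should eventually be written to the sink's output file, which you can watch with:

tail -f /datafs/20181106/a.out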

 

3. Distributed Mode

The command format is:

bin/connect-distributed.sh config/connect-distributed.properties

The difference is in the class that is started and in the configuration parameters, which define how the Kafka Connect processes coordinate: where configurations are stored, how work is assigned, and where offsets and task statuses are kept. In distributed mode, Kafka Connect stores offsets, configurations, and task statuses in Kafka topics. It is recommended to create the offset, config, and status topics manually so that you can choose the number of partitions and replicas yourself. If the topics do not exist when Kafka Connect starts, they are auto-created with default partition and replication settings, which may not be what you need (Kafka knows nothing about your workload and can only apply defaults). The following configuration parameters are especially critical and should be set before starting the cluster (a topic-creation sketch follows the list):
group.id (default connect-cluster): a unique name for the Connect cluster group; note that this must not conflict with consumer group IDs.
config.storage.topic (default connect-configs): the topic used to store Connector and task configurations; note that this should be a single-partition, replicated topic. You need to create this topic manually to guarantee that it has exactly one partition (auto-created topics may end up with several).
offset.storage.topic (default connect-offsets): the topic used to store offsets; this topic should have many partitions and be replicated.
status.storage.topic (default connect-status): the topic used to store task statuses; this topic can have multiple partitions and replicas.
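
For illustration, the three topics could be pre-created along these lines (a sketch for a 0.9 cluster; the ZooKeeper address, the partition counts for the offset and status topics, and the replication factor of 3 are assumptions to adjust for your cluster):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-configs --partitions 1 --replication-factor 3
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-offsets --partitions 50 --replication-factor 3
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-status --partitions 10 --replication-factor 3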
Note that in distributed mode, Connector configurations cannot be passed on the command line. Use the REST API described below to create, modify, and destroy Connectors, with a JSON body such as:

{"name":"test","config":{"topic":"TEST_MANAGER","connector.class":"FileStreamSource","tasks.max":"2","file":"/datafs/log1.out"}}
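
This body can be submitted to a worker with curl, for example (assuming a worker listening on the default port 8083 on localhost; adjust the host to your deployment):

curl -X POST -H "Content-Type: application/json" --data '{"name":"test","config":{"topic":"TEST_MANAGER","connector.class":"FileStreamSource","tasks.max":"2","file":"/datafs/log1.out"}}' http://localhost:8083/connectors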

 

4. REST API

Since Kafka Connect is intended to run as a service, it also provides a REST API for managing Connectors. By default this service listens on port 8083. The currently supported endpoints are:
GET /connectors: return the list of active Connectors
POST /connectors: create a new Connector; the request body is a JSON object containing a string name field and an object config field holding the Connector's configuration parameters.
GET /connectors/{name}: get information about the specified Connector
GET /connectors/{name}/config: get the configuration parameters of the specified Connector
PUT /connectors/{name}/config: update the configuration parameters of the specified Connector
GET /connectors/{name}/status: get the current status of the Connector, including whether it is running, failed, paused, etc.
GET /connectors/{name}/tasks: get the list of tasks currently running for the Connector
GET /connectors/{name}/tasks/{taskId}/status: get the current status of a task, including whether it is running, failed, paused, etc.
PUT /connectors/{name}/pause: pause the Connector and its tasks, stopping message processing until the Connector is resumed
PUT /connectors/{name}/resume: resume a paused Connector (does nothing if the Connector is not paused)
POST /connectors/{name}/restart: restart the Connector (typically because it has failed)
POST /connectors/{name}/tasks/{taskId}/restart: restart an individual task (typically because it has failed)
DELETE /connectors/{name}: delete the Connector, halting all tasks and deleting its configuration
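
A few example calls with curl (again assuming a worker on localhost:8083 and the Connector named test created earlier):

curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/test/status
curl -X PUT http://localhost:8083/connectors/test/pause
curl -X PUT http://localhost:8083/connectors/test/resume
curl -X DELETE http://localhost:8083/connectors/test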

 

Kafka Connect also provides a REST API for getting information about Connector plugins:
GET /connector-plugins: return a list of Connector plugins installed in the Kafka Connect cluster. Note that the API only checks for Connectors on the worker that handles the request, which means you may see inconsistent results, especially during a rolling upgrade when new Connector jars are being added.
PUT /connector-plugins/{connector-type}/config/validate: validate the provided configuration values against the configuration definition, performing per-setting validation and returning suggested values and error messages.
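
For example, a file source configuration could be validated like this (a sketch assuming a worker on localhost:8083; the connector type in the URL is the connector class name):

curl -X PUT -H "Content-Type: application/json" --data '{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"TEST_MANAGER","file":"/datafs/log1.out"}' http://localhost:8083/connector-plugins/FileStreamSourceConnector/config/validate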

