2020-03-01
Component versions
Confluent Platform: 5.2.2 (https://www.confluent.io/)
Debezium: 1.0.0 (https://debezium.io/)
Kafka: 2.3.0
Downloading the Confluent installation package requires registering an account. Most components in the Platform edition are open source and free, but some components are free to use without being open source.
Setup process
1. First, download each component.
2. Build the Kafka cluster first.
3. Extract the components.
Confluent extraction command: tar -zxvf confluent-5.2.2-2.12.tar.gz -C <install path>
Debezium extraction command: tar -zxvf debezium-connector-mysql-1.0.0.Final-plugin.tar.gz -C <Confluent plugin path>
Confluent's default plugin directory is <install path>/confluent-5.2.2/share/java.
4. Modify the Confluent configuration file.
Confluent (Kafka Connect) can be started in two modes, standalone and distributed (cluster); I deployed it in distributed mode.
The configuration file path is <install path>/confluent-5.2.2/etc/schema-registry/connect-avro-distributed.properties
Pay attention to the following configuration items:
bootstrap.servers : the broker addresses of the Kafka cluster
group.id : an id/name for this Confluent (Connect) cluster (in practice this id alone does not reliably distinguish clusters; just set a value here and see the notes at the end of this article)
key.converter : same as the official Kafka setting; selects the serialization format for keys. I use Avro.
value.converter : same as the official Kafka setting; selects the serialization format for values. I use Avro.
plugin.path : the path(s) of the added plugins (relative paths sometimes fail to load, so I use an absolute path)
5. Start Confluent
<install path>/confluent-5.2.2/bin/connect-distributed <install path>/confluent-5.2.2/etc/schema-registry/connect-avro-distributed.properties
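Once the worker is up, a quick sanity check is to query its REST API (port 8083 in the configuration below). A minimal sketch, assuming the worker runs on the local host:
curl http://localhost:8083/
curl http://localhost:8083/connector-plugins
The second call should list io.debezium.connector.mysql.MySqlConnector once the Debezium plugin directory has been picked up from plugin.path.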
Below is my connect-avro-distributed.properties file; it can be used as a reference.
# Sample configuration for a distributed Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.
# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
# The group ID is a unique identifier for the set of workers that form a single Kafka Connect
# cluster
group.id=connect-cluster-binlog-realtime
# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# Internal Storage Topics.
#
# Kafka Connect distributed workers store the connector and task configurations, connector offsets,
# and connector statuses in three internal topics. These topics MUST be compacted.
# When the Kafka Connect distributed worker starts, it will check for these topics and attempt to create them
# as compacted topics if they don't yet exist, using the topic name, replication factor, and number of partitions
# as specified in these properties, and other topic-specific settings inherited from your brokers'
# auto-creation settings. If you need more control over these other topic-specific settings, you may want to
# manually create these topics before starting Kafka Connect distributed workers.
#
# The following properties set the names of these three internal topics for storing configs, offsets, and status.
config.storage.topic=realtime-connect-configs
offset.storage.topic=realtime-connect-offsets
status.storage.topic=realtime-connect-statuses
# The following properties set the replication factor for the three internal topics, defaulting to 3 for each
# and therefore requiring a minimum of 3 brokers in the cluster. The replication factor is kept at 3 here,
# matching the three-broker cluster configured above. ALWAYS use a replication factor of AT LEAST 3 for
# production environments to reduce the risk of losing connector offsets, configurations, and status.
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
# The config storage topic must have a single partition, and this cannot be changed via properties.
# Offsets for all connectors and tasks are written quite frequently and therefore the offset topic
# should be highly partitioned; by default it is created with 25 partitions, but adjust accordingly
# with the number of connector tasks deployed to a distributed worker cluster. Kafka Connect records
# the status less frequently, and so by default the topic is created with 5 partitions.
#offset.storage.partitions=25
#status.storage.partitions=5
# The offsets, status, and configurations are written to the topics using converters specified through
# the following required properties. Most users will always want to use the JSON converter without schemas.
# Offset and config data is never visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
# that will report audit data that can be displayed and analyzed in Confluent Control Center
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=0.0.0.0
rest.port=8083
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=0.0.0.0
rest.advertised.port=8083
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
# directory other than the home directory of Confluent Platform.
plugin.path=/opt/confluent-5.2.2/share/java,/opt/confluent-hup/share/confluent-hub-components
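As the comments above note, the three internal topics can also be created manually (as compacted topics) before the workers start. A hedged sketch using the kafka-topics tool shipped in the Confluent bin directory; the broker address is taken from bootstrap.servers above, and the partition counts follow the defaults described in the comments:
<install path>/confluent-5.2.2/bin/kafka-topics --bootstrap-server broker1:9092 --create --topic realtime-connect-configs --partitions 1 --replication-factor 3 --config cleanup.policy=compact
<install path>/confluent-5.2.2/bin/kafka-topics --bootstrap-server broker1:9092 --create --topic realtime-connect-offsets --partitions 25 --replication-factor 3 --config cleanup.policy=compact
<install path>/confluent-5.2.2/bin/kafka-topics --bootstrap-server broker1:9092 --create --topic realtime-connect-statuses --partitions 5 --replication-factor 3 --config cleanup.policy=compact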
Debezium source task configuration demo
{
  "name": "test-mysql-binlog-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "test_db_ip",
    "database.port": "3306",
    "database.user": "read",
    "database.password": "read",
    "database.server.id": "202003013306",
    "database.server.name": "test_server",
    "database.whitelist": "test_db",
    "table.whitelist": "test_db.test_table,test_db.test_table.(.*)",
    "event.deserialization.failure.handling.mode": "warn",
    "transforms": "unwrap,insertuuid",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.operation.header": "true",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.source.fields": "table,version,connector,name,ts_ms,db,server_id,file,pos,row",
    "transforms.insertuuid.type": "com.github.cjmatta.kafka.connect.smt.InsertUuid$Value",
    "transforms.insertuuid.uuid.field.name": "uuid",
    "database.history.kafka.bootstrap.servers": "kafka1:9092,kafka2:9092,kafka3:9092",
    "database.history.kafka.topic": "dbhistory.test_server",
    "include.schema.changes": "true",
    "snapshot.locking.mode": "none",
    "decimal.handling.mode": "double",
    "include.query": "false",
    "tombstones.on.delete": "false",
    "snapshot.mode": "schema_only",
    "database.serverTimezone": "UTC",
    "database.history.skip.unparseable.ddl": "true"
  }
}
Confluent notes
1. If Avro is used as the data transfer format, the schema-registry service must be configured and running; schema-registry listens on port 8081 by default.
Refer to the official documentation for how to start the schema-registry service and for its HTTP REST API:
https://docs.confluent.io/current/schema-registry/using.html
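For example, once schema-registry is running, the registered schemas can be inspected through its REST API. A minimal sketch, assuming it runs locally on the default port; the subject name shown assumes Debezium's default <server>.<db>.<table> topic naming combined with the AvroConverter's <topic>-value subject naming:
curl http://localhost:8081/subjects
curl http://localhost:8081/subjects/test_server.test_db.test_table-value/versions/latest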
2. About the Confluent group.id: when multiple Confluent (Connect) clusters share a single Kafka cluster, they can still be treated as the same cluster even if their group ids differ. The workaround is as follows:
config.storage.topic=realtime-connect-configs
offset.storage.topic=realtime-connect-offsets
status.storage.topic=realtime-connect-statuses
Change these topic names so that they are distinct from those of the existing Confluent cluster; that way multiple Confluent cluster instances can share a single Kafka cluster.
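For example, a second Confluent (Connect) cluster sharing the same Kafka brokers might use settings like the following (the names are only illustrative):
group.id=connect-cluster-binlog-realtime-2
config.storage.topic=realtime2-connect-configs
offset.storage.topic=realtime2-connect-offsets
status.storage.topic=realtime2-connect-statuses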