2020-03-01
Component Versions
Confluent Platform 5.2.2: https://www.confluent.io/
Debezium 1.0.0: https://debezium.io/
Kafka 2.3.0
Downloading the Confluent installation package requires registering an account. Most components of the Platform edition are open source and free, but some components are free to use without being open source.
Setup Process
1. Download each of the components above.
2. Set up the Kafka cluster first.
3. Extract the components.
Confluent extraction command: tar -zxvf confluent-5.2.2-2.12.tar.gz -C <install path>
Debezium extraction command: tar -zxvf debezium-connector-mysql-1.0.0.Final-plugin.tar.gz -C <Confluent plugin path>
Confluent's default plugin directory is <install path>/confluent-5.2.2/share/java.
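As a quick sanity check (this sketch assumes /opt was used as the install path), you can list the plugin directory to confirm the Debezium jars were extracted into place:
ls /opt/confluent-5.2.2/share/java/debezium-connector-mysql/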
4. Modify the Confluent configuration file.
Kafka Connect can be started in two modes, standalone and distributed (cluster); I deployed it in distributed mode.
The configuration file path is: <install path>/confluent-5.2.2/etc/schema-registry/connect-avro-distributed.properties
Pay attention to the following configuration items:
bootstrap.servers : the broker addresses of the Kafka cluster
group.id : an ID for this Connect cluster (in practice this ID alone does not reliably distinguish clusters sharing one Kafka cluster; see note 2 under "Confluent Notes" below — just set a value)
key.converter : the same as the standard Kafka Connect setting; it selects the serialization format for keys. I used Avro for transport.
value.converter : the same as the standard Kafka Connect setting; it selects the serialization format for values. I used Avro for transport.
plugin.path : the path(s) where plugins are installed (relative paths sometimes fail to load, so I switched to absolute paths)
5. Start Confluent
<install path>/confluent-5.2.2/bin/connect-distributed <install path>/confluent-5.2.2/etc/schema-registry/connect-avro-distributed.properties
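Once the worker is up, two quick curl checks (a sketch assuming the REST port 8083 configured below and run on the worker host) can confirm that the worker responds and that the Debezium plugin was loaded:
curl http://localhost:8083/
curl http://localhost:8083/connector-plugins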
The following is my connect-avro-distributed.properties file, which can serve as a configuration reference.
# Sample configuration for a distributed Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.
# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
# The group ID is a unique identifier for the set of workers that form a single Kafka Connect
# cluster
group.id=connect-cluster-binlog-realtime
# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# Internal Storage Topics.
#
# Kafka Connect distributed workers store the connector and task configurations, connector offsets,
# and connector statuses in three internal topics. These topics MUST be compacted.
# When the Kafka Connect distributed worker starts, it will check for these topics and attempt to create them
# as compacted topics if they don't yet exist, using the topic name, replication factor, and number of partitions
# as specified in these properties, and other topic-specific settings inherited from your brokers'
# auto-creation settings. If you need more control over these other topic-specific settings, you may want to
# manually create these topics before starting Kafka Connect distributed workers.
#
# The following properties set the names of these three internal topics for storing configs, offsets, and status.
config.storage.topic=realtime-connect-configs
offset.storage.topic=realtime-connect-offsets
status.storage.topic=realtime-connect-statuses
# The following properties set the replication factor for the three internal topics, which defaults to 3
# and therefore requires at least 3 brokers in the cluster. The replication factor is kept at 3 here;
# ALWAYS use a replication factor of AT LEAST 3 for production environments to reduce the risk of
# losing connector offsets, configurations, and status.
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
# The config storage topic must have a single partition, and this cannot be changed via properties.
# Offsets for all connectors and tasks are written quite frequently and therefore the offset topic
# should be highly partitioned; by default it is created with 25 partitions, but adjust accordingly
# with the number of connector tasks deployed to a distributed worker cluster. Kafka Connect records
# the status less frequently, and so by default the topic is created with 5 partitions.
#offset.storage.partitions=25
#status.storage.partitions=5
# The offsets, status, and configurations are written to the topics using converters specified through
# the following required properties. Most users will always want to use the JSON converter without schemas.
# Offset and config data is never visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
# that will report audit data that can be displayed and analyzed in Confluent Control Center
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=0.0.0.0
rest.port=8083
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=0.0.0.0
rest.advertised.port=8083
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
# directory other than the home directory of Confluent Platform.
plugin.path=/opt/confluent-5.2.2/share/java,/opt/confluent-hup/share/confluent-hub-components
Demo configuration file for a Debezium capture task:
{
  "name": "test-mysql-binlog-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "test_db_ip",
    "database.port": "3306",
    "database.user": "read",
    "database.password": "read",
    "database.server.id": "202003013306",
    "database.server.name": "test_server",
    "database.whitelist": "test_db",
    "table.whitelist": "test_db.test_table,test_db.test_table.(.*)",
    "event.deserialization.failure.handling.mode": "warn",
    "transforms": "unwrap,insertuuid",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.operation.header": "true",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.source.fields": "table,version,connector,name,ts_ms,db,server_id,file,pos,row",
    "transforms.insertuuid.type": "com.github.cjmatta.kafka.connect.smt.InsertUuid$Value",
    "transforms.insertuuid.uuid.field.name": "uuid",
    "database.history.kafka.bootstrap.servers": "kafka1:9092,kafka2:9092,kafka3:9092",
    "database.history.kafka.topic": "dbhistory.test_server",
    "include.schema.changes": "true",
    "snapshot.locking.mode": "none",
    "decimal.handling.mode": "double",
    "include.query": "false",
    "tombstones.on.delete": "false",
    "snapshot.mode": "schema_only",
    "database.serverTimezone": "UTC",
    "database.history.skip.unparseable.ddl": "true"
  }
}
Confluent Notes
1. If you use Avro format for data transfer, the schema-registry service must be configured and running before it can be used; schema-registry listens on port 8081 by default.
Refer to the official documentation for starting the schema-registry service and its HTTP REST API:
https://docs.confluent.io/current/schema-registry/using.html
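For example, two minimal checks against the Schema Registry REST API (assuming it runs on localhost:8081; the subject name below is hypothetical and follows the <topic>-value naming convention):
curl http://localhost:8081/subjects
curl http://localhost:8081/subjects/test_server.test_db.test_table-value/versions/latest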
2. Regarding the Connect group.id: when multiple Connect clusters share a single Kafka cluster, they may still be treated as the same Connect cluster even if their group IDs differ. The workaround is as follows:
config.storage.topic=realtime-connect-configs
offset.storage.topic=realtime-connect-offsets
status.storage.topic=realtime-connect-statuses
Change these three internal topic names so they are distinct from those of any existing Connect cluster; this way multiple Connect cluster instances can share the same Kafka cluster.
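A minimal sketch of what a second cluster's overrides might look like (the file name, group.id, and topic names here are made-up examples), appended to a copy of the distributed worker properties file:
cat >> connect-avro-distributed-cluster2.properties <<'EOF'
group.id=connect-cluster-binlog-realtime-2
config.storage.topic=realtime2-connect-configs
offset.storage.topic=realtime2-connect-offsets
status.storage.topic=realtime2-connect-statuses
EOF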