FlinkCDC - mysql


1. 背景

FlinkSQL双流关联时,mysql流未能捕捉到Mysql表更新,所以尝试使用FlinkCDC解决此问题。

2022-3-31 21:31:39 语法使用不当导致的未能捕捉,修改后可以。但是FlinkCDC研究下也没坏处。

2. 代码

2.1 使用

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DebeziumDeserializationSchema schema = new StringDebeziumDeserializationSchema();
        DebeziumSourceFunction function = MySQLSource.builder()
                .hostname("101.71.102.255")
                .port(8081)
                .databaseList("kafka")
                .username("kafka")
                .password("Bonc@123")
                .deserializer(schema)
                .build();

        env.addSource(function).print().setParallelism(1);

        env.execute();

3. 问题及解决办法

3.1 无权限

Caused by: org.apache.kafka.connect.errors.ConnectException: Access denied; you need (at least one of) the RELOAD privilege(s) for this operation Error code: 1227; SQLSTATE: 42000.
	at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:241)
	at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:218)
	at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:857)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the RELOAD privilege(s) for this operation
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)

mysql授权

grant reload on kafka.* to kafka@'%';
flush privilleges;


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM