canal實時同步mysql數據到redis或ElasticSearch


 

一、Canal架包下載上傳

(一)下載

官網架包地址為:https://github.com/alibaba/canal/releases/tag/canal-1.1.5-alpha-2

本人百度雲盤下載地址:

鏈接:https://pan.baidu.com/s/1MM5YGubaTW3Y2hy1tvBmPw

提取碼:jiur

(二)上傳解壓

創建canal文件夾

 

cd /usr/local
mkdir canal

 

  將下載好的canal上傳至Linux服務器 /usr/local/canal目錄下進行解壓。

 

 tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz

 

二、配置MySQL文件 

(一)修改MySQL my.cnf配置文件

1.查找MySQLLinux環境中的my.cnf

 

mysql --help|grep 'my.cnf'

 

 如圖:

2.修改my.cnf

vi /etc/my.cnf
log-bin=mysql-bin #添加這一行就ok

binlog-format=ROW #選擇row模式

server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重復

(二)重啟 MySQL

查看MySQL啟動狀態

service mysqld status(5.0版本是mysqld)
service mysql status(5.5.7版本是mysql)

重啟MySQL

service mysqld restart 
service mysql restart (5.5.7版本命令)

(三)查看MySQL binlog文件是否開啟

1.Linux中登錄MySQL

 

mysql -u 用戶名 -p
如:mysql -u root -p
輸入密碼對應的賬號密碼

 

  2.查看binlog文件是否開啟

 

show variables like 'log_%';

 

效果如下:

 

三、創建canal賬號

(一)設置canal賬號並賦權

 

drop user 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';  -- 創建canal用戶
grant all privileges on *.* to 'canal'@'%' identified by 'canal';  -- 為canal用戶賦予repication權限
flush privileges;

 

 如果出現以下情況,說明MySQL設置了密碼難度,需要修改MySQL設置,如果沒有出現則調過以下步驟

1.查看MySQL的策略

 

SHOW VARIABLES LIKE 'validate_password%'; 

 

2.設置MySQL密碼驗證強調的策略 

set global validate_password_policy=LOW;

3.設置MySQL密碼最低長度

 

 set global validate_password_length=5;

 

4.重新設置一下賬號即可

drop user 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';  -- 創建canal用戶
grant all privileges on *.* to 'canal'@'%' identified by 'canal';  -- 為canal用戶賦予repication權限
flush privileges;

如圖

 

四、構建CanalService

(一)修改instance.properties配置

 

vi /usr/local/canal/conf/example/instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal#此處是我們為mysql配置的canal用戶
canal.instance.dbPassword=canal#此處是我們為mysql配置的canal用戶的密碼
canal.mq.topic=mysql-yjlcplatform-topic #mq消息主題

(二)修改canal.properties

 vi /usr/local/canal/conf/canal.properties 
canal.serverMode = kafka
kafka.bootstrap.servers = 192.168.200.7:9092

(三)啟動canal

cd /usr/local/canal/bin
./ startup.sh

  查看canal啟動日志

 

cat /usr/local/canal/logs/example/example.log 

 

以下效果需要等一會兒

注意:

在啟動canal的時候可能會報kafka連接超時,則重新啟動kafka即可。

(四)驗證MySQLkafka是否關聯成功

下載ZooInspector工具進行驗證:

鏈接:https://pan.baidu.com/s/1SbiszPvYVfbmdDQRsdLAqg

提取碼:cyb8

 

 

五、后端項目代碼

(一)Pom配置文件

 

<dependencies>
    <!-- springBoot集成kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

</dependencies>

(二)bootstrap.yml配置文件

# kafka
spring:
  kafka:
    # kafka服務器地址(可以多個)
    bootstrap-servers: 192.168.200.7:9092
    consumer:
      # 指定一個默認的組名
      group-id: kafka2
      # earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
      # latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
      # none:topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
      auto-offset-reset: earliest
      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 緩存容量
      buffer-memory: 524288

(三)Java文件

1.啟動類

/**
 * --------------------------------------------------------------
 * FileName: AppMemberCanalClient.java
 *
 * @Description:消費端
 * @author: cyb
 * @CreateDate: 2020-09-07
 * --------------------------------------------------------------
 */
@SpringBootApplication
public class AppMemberCanalClient {
    public static void main(String[] args) {
        SpringApplication.run(AppMemberCanalClient.class);
    }
}

2.后端監聽類

package com.yjlc.kafka.client;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MembetKafkaConsumer {

    @KafkaListener(topics = "mysql-yjlcplatform-topic")
    public void receive(ConsumerRecord<?, ?> consumer) {
        System.out.println("topic名稱:" + consumer.topic() + ",key:" +
                consumer.key() + "," +
                "分區位置:" + consumer.partition()
                + ", 下標" + consumer.offset() + "," + consumer.value());
        String json = (String) consumer.value();
        JSONObject jsonObject = JSONObject.parseObject(json);
        String type = jsonObject.getString("type");

        String pkNames = jsonObject.getJSONArray("pkNames").getString(0);
        JSONArray data = jsonObject.getJSONArray("data");

        String table = jsonObject.getString("table");
        String database = jsonObject.getString("database");
        for (int i = 0; i < data.size(); i++) {
            JSONObject dataObject = data.getJSONObject(i);
            String key = database + ":" + table + ":"+dataObject.getString(pkNames);
            switch (type) {
                case "UPDATE":
                case "INSERT":
                    break;
                case "DELETE":
                    break;
            }
        }

    }
}

  以上功能親測有效。如對以上內容有疑問的可以留言討論,轉載請說明出處,本人博客地址為:https://www.cnblogs.com/chenyuanbo/

 技術在於溝通交流!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM