Canal + Kafka for MySQL and Redis Data Consistency


  In production, MySQL and Redis falling out of sync is a common problem. So how can we keep MySQL and Redis consistent? Without further ado, here is the solution.

  If you are not familiar with Canal yet, start with the official documentation: https://github.com/alibaba/canal

  First, we need to enable binlog-based master-slave replication in MySQL. Canal's core trick is to pose as a MySQL replica: it connects to the master as if it were a slave node and subscribes to the master's binlog.

  Enabling the binlog

 

  1. In MySQL's my.ini/my.cnf, enable binlog-based master-slave replication:

 

 

log-bin=mysql-bin   # enable the binlog; this is the only line you must add
binlog-format=ROW   # use ROW mode so the binlog carries full row data
server_id=1         # required for MySQL replication; must not clash with Canal's slaveId

 

  After editing the file, restart the MySQL server.

  show variables like 'log_bin'; -- check whether the binlog is on: the value is OFF when disabled and ON once enabled
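
  If it is enabled, the query should return something like the following (standard mysql client output):

+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+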

 

  2. Create a canal account (or simply use your root account). After creating it, be sure to check that its privileges in the mysql.user table are set to Y (SELECT * FROM `user` WHERE user = 'canal').

 

 

DROP USER 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
FLUSH PRIVILEGES;
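
Note: MySQL 8.0 removed the GRANT ... IDENTIFIED BY syntax, so the grant above only works on 5.x. On 8.0+, create the user first (as above) and then grant without the password clause:

GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

(ALL PRIVILEGES is more than necessary; Canal's quick-start grants only SELECT, REPLICATION SLAVE and REPLICATION CLIENT.)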

 

  Integrating Kafka

  1. Kafka depends on Zookeeper, so install Zookeeper first.

 

   Rename zoo_sample.cfg to zoo.cfg.

 

   In zoo.cfg, set dataDir=E:\zkkafka\zookeeper-3.4.14\data

   Add the following environment variables:

   ZOOKEEPER_HOME: E:\zkkafka\zookeeper-3.4.14  (the Zookeeper directory)

   Path: append ";%ZOOKEEPER_HOME%\bin;" to the existing value

 

   Start Zookeeper by running zkServer.cmd.

   If the window flashes open and immediately closes, the following steps should resolve it (reference: https://blog.csdn.net/pangdongh/article/details/90208230):

   1. Append pause to the end of zkServer.cmd. The window then stays open on failure and prints the error message, which makes the cause easy to find.

   2. If the error reads -Dzookeeper.log.dir=xxx"' is not recognized as an internal or external command, operable program or batch file, replace the contents of zkServer.cmd with the following:

@echo off
REM Licensed to the Apache Software Foundation (ASF) under one or more
REM contributor license agreements.  See the NOTICE file distributed with
REM this work for additional information regarding copyright ownership.
REM The ASF licenses this file to You under the Apache License, Version 2.0
REM (the "License"); you may not use this file except in compliance with
REM the License.  You may obtain a copy of the License at
REM
REM     http://www.apache.org/licenses/LICENSE-2.0
REM
REM Unless required by applicable law or agreed to in writing, software
REM distributed under the License is distributed on an "AS IS" BASIS,
REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
REM See the License for the specific language governing permissions and
REM limitations under the License.

setlocal
call "%~dp0zkEnv.cmd"

set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
echo on
java "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*

endlocal
pause
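
   If the window now stays open without errors, Zookeeper is up. You can optionally confirm it is accepting connections with the CLI that ships in the same bin directory (2181 is the default clientPort):

zkCli.cmd -server 127.0.0.1:2181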

 

  2. Install Kafka

  Unzip kafka_2.13-2.4.0 and rename the directory to kafka.

  In server.properties, set:

  log.dirs=E:\zkkafka\kafka\logs

  Start Kafka: open a cmd prompt and change into the Kafka directory, then run the startup script:

  cd   E:\zkkafka\kafka

  .\bin\windows\kafka-server-start.bat .\config\server.properties

  If startup fails with "The system cannot find the path specified", edit kafka\bin\windows\kafka-run-class.bat and change set JAVA="%JAVA_HOME%/bin/java" to the absolute path of your JDK, for example:

  set JAVA="D:\LI\JDK\jdk1.8.0_152\bin\java"
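
  Before moving on to Canal, it is worth confirming the broker works end to end. The Kafka 2.4 tooling can talk to the broker directly; for example, list the topics and tail the topic Canal will publish to (maikt-topic, configured in the next section):

.\bin\windows\kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --list
.\bin\windows\kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic maikt-topic --from-beginning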

Updating the Canal configuration

  1. In example/instance.properties, set:

  canal.mq.topic=maikt-topic

  2. In canal.properties, set:

  # tcp, kafka, RocketMQ

  canal.serverMode = kafka

  canal.mq.servers = 127.0.0.1:9092

  3. Run startup.bat, then check that the \logs\example\example.log log file contains "start successful....".
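
  Once Canal is running in kafka mode, every binlog event is published to the topic as a flat JSON message. The listener code below relies only on the type, database, table and data fields; an abridged UPDATE message looks roughly like this (table and values are illustrative):

{
  "data": [{"id": "1", "name": "zhangsan"}],
  "database": "test",
  "table": "user",
  "type": "UPDATE",
  "isDdl": false,
  "ts": 1589373515000
}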

Integrating Kafka into a Spring Boot project
  Maven dependencies
      <!-- Spring Boot Kafka integration -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- Spring Boot web starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

   application.yml

# kafka
spring:
  kafka:
    # Kafka broker address(es); more than one can be listed
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      # default consumer group id
      group-id: kafka2
      # earliest: if a partition has a committed offset, resume from it; otherwise consume from the beginning
      # latest: if a partition has a committed offset, resume from it; otherwise consume only newly produced records
      # none: resume from committed offsets if every partition has one; throw an exception if any partition has none
      auto-offset-reset: earliest
      # key/value deserializers
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value serializers
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # producer batch size in bytes
      batch-size: 65536
      # producer buffer memory in bytes
      buffer-memory: 524288
  redis:
    host: 1.1.1.1
    port: 6379
    database: 10
    password: 123456
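
  To sanity-check this wiring independently of Canal, you can push a test message through the KafkaTemplate that Spring Boot auto-configures from the producer settings above. A throwaway sketch (the controller and its /send endpoint are purely illustrative; note that the listener below parses the record value as JSON, so send a JSON-shaped payload when testing):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaTestController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // GET /send?msg=... publishes one record to the Canal topic
    @GetMapping("/send")
    public String send(String msg) {
        kafkaTemplate.send("maikt-topic", msg);
        return "ok";
    }
}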

  Redis utility class

import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class RedisUtils {

    /**
     * The auto-configured String-based Redis template.
     */
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    public void setString(String key, String value) {
        setString(key, value, null);
    }

    /**
     * Stores a value; if timeOut (in seconds) is non-null, the key expires after it.
     */
    public void setString(String key, String value, Long timeOut) {
        if (timeOut != null) {
            // set the value and the TTL in a single atomic call
            stringRedisTemplate.opsForValue().set(key, value, timeOut, TimeUnit.SECONDS);
        } else {
            stringRedisTemplate.opsForValue().set(key, value);
        }
    }

    public String getString(String key) {
        return stringRedisTemplate.opsForValue().get(key);
    }

    /**
     * When Redis mirrors database rows like this, remember to set a TTL on the keys.
     */
    public boolean deleteKey(String key) {
        return Boolean.TRUE.equals(stringRedisTemplate.delete(key));
    }
}

  Kafka topic listener (wrapped in a component class here for completeness; the Redis sync logic is only a test sketch that you can adapt to your own needs):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

@Component
public class CanalKafkaListener {

    @Autowired
    private RedisUtils redisUtils;

    @KafkaListener(topics = "maikt-topic")
    public void receive(ConsumerRecord<?, ?> consumer) {
        System.out.println("topic: " + consumer.topic() + ", key: " + consumer.key()
                + ", partition: " + consumer.partition()
                + ", offset: " + consumer.offset() + ", value: " + consumer.value());

        String json = (String) consumer.value();
        JSONObject jsonObject = JSONObject.parseObject(json);
        String sqlType = jsonObject.getString("type");
        JSONArray data = jsonObject.getJSONArray("data");
        if (data != null) {
            JSONObject userObject = data.getJSONObject(0);
            String id = userObject.getString("id");
            String database = jsonObject.getString("database");
            String table = jsonObject.getString("table");
            // one Redis key per row: <database>_<table>_<primary key>
            String key = database + "_" + table + "_" + id;
            if ("UPDATE".equals(sqlType) || "INSERT".equals(sqlType)) {
                redisUtils.setString(key, userObject.toJSONString());
                return;
            }
            if ("DELETE".equals(sqlType)) {
                redisUtils.deleteKey(key);
            }
        }
    }
}
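
  Finally, the application needs the usual Spring Boot entry point; a minimal sketch (the class name is illustrative):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class CanalKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(CanalKafkaApplication.class, args);
    }
}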

 

This is my first article, so if anything is lacking, corrections from readers are welcome.

Source: 螞蟻課堂

 

