Mysql數據實時同步


 

 

企業運維的數據庫最常見的是 mysql;但是 mysql 有個缺陷:當數據量達到千萬條的時候,mysql 的相關操作會變的非常遲緩; 如果這個時候有需求需要實時展示數據;對於 mysql 來說是一種災難;而且對於 mysql 來說,同一時間還要給多個開發人員和用戶操作; 所以經過調研,將 mysql 數據實時同步到 hbase 中;
最開始使用的架構方案:

Mysql---logstash—kafka---sparkStreaming---hbase---web

Mysql—sqoop---hbase---web
但是無論使用 logsatsh 還是使用 kafka,都避免不了一個尷尬的問題: 他們在導數據過程中需要去 mysql 中做查詢操作:

比如 logstash:

比如 sqoop: 

不可避免的,都需要去 sql 中查詢出相關數據,然后才能進行同步;這樣對於 mysql 來說本身就是增加負荷操作; 所以我們真正需要考慮的問題是:有沒有什么方法,能將 mysql 數據實時同步到 hbase;但是不增加 mysql 的負擔; 答案是有的:可以使用 canal 或者 maxwell 來解析 mysql 的 binlog 日志
那么之前的架構就需要改動了:

Mysql---canal—kafka—flink—hbase—web 

第一步:開啟 mysql 的 binlog 日志 

Mysql 的 binlog 日志作用是用來記錄 mysql 內部增刪等對 mysql 數據庫有更新的內容的 記錄(對數據庫的改動),對數據庫的查詢 select 或 show 等不會被 binlog 日志記錄;主 要用於數據庫的主從復制以及增量恢復。
mysql 的 binlog 日志必須打開 log-bin 功能才能生存 binlog 日志
-rw-rw---- 1 mysql mysql 669 5 月 10 21:29 mysql-bin.000001
-rw-rw---- 1 mysql mysql 126 5 月 10 22:06 mysql-bin.000002
-rw-rw---- 1 mysql mysql 11799 5 月 15 18:17 mysql-bin.000003

(1):修改/etc/my.cnf,在里面添加如下內容 

log-bin=/var/lib/mysql/mysql-bin 【binlog 日志存放路徑】 
binlog-format=ROW 【⽇日志中會記錄成每⼀一⾏行行數據被修改的形式】
server_id=1 【指定當前機器的服務 ID(如果是集群,不能重復)】

(2):配置完畢之后,登錄 mysql,輸入如下命令: 

show variables like ‘%log_bin%’ 

出現如下形式,代表 binlog 開啟; 

第二步:安裝 canal 

Canal 介紹 canal 是阿里巴巴旗下的一款開源項目,純 Java 開發。基於數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了 MySQL(也支持 mariaDB)。
起源:早期,阿里巴巴 B2B 公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基於 trigger 的方式獲取增量變更,不過從 2010 年開始,阿里系公司開始逐步的嘗試基於數據庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱 &消費的業務,從此開啟了一段新紀元。

原理相對比較簡單:
1、canal 模擬 mysql slave 的交互協議,偽裝自己為 mysql slave,向 mysql master 發送 dump 協議
2、mysql master 收到 dump 請求,開始推送 binary log 給 slave(也就是 canal) 3、canal 解析 binary log 對象(原始為 byte 流)

使用 canal 解析 binlog,數據落地到 kafka
(1):解壓安裝包:canal.deployer-1.0.23.tar.gz
tar -zxvf canal.deployer-1.0.23.tar.gz -C /export/servers/canal 修改配置文件:
vim /export/servers/canal/conf/example/instance.properties

(2):編寫 canal 代碼 

僅僅安裝了 canal 是不夠的;canal 從架構的意義上來說相當於 mysql 的“從庫”,此時還並不能將 binlog 解析出來實時轉發到 kafka 上,因此需 要進一步開發 canal 代碼;
Canal 已經幫我們提供了示例代碼,只需要根據需求稍微更改即可;
Canal 提供的代碼:

上面的代碼中可以解析出 binlog 日志,但是沒有將數據落地到 kafka 的代碼邏輯,所以我們還需要添加將數據落地 kafka 的代碼; Maven 導入依賴:

 <groupId>com.alibaba.otter</groupId>
  
<artifactId>canal.client</artifactId>
  <version>1.0.23</version>
  </dependency>
 
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
 <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.9.0.1</version>
  </dependency>
 

測試 canal 代碼 

1、 啟動 kafka 並創建 topic
/export/servers/kafka/bin/kafka-server-start.sh /export/servers/kafka/config/server.properties >/dev/null 2>&1 & /export/servers/kafka/bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic mycanal
2、 啟動 mysql 的消費者客戶端,觀察 canal 是否解析 binlog
/export/servers/kafka/bin/kafka-console-consumer.sh --zookeeper hadoop01:2181 --from-beginning --topic mycanal 2、啟動 mysql:service mysqld start
3、啟動 canal:canal/bin/startup.sh
4、進入 mysql:mysql -u 用戶 -p 密碼;然后進行增刪改

使用 flink 將 kafka 中的數據解析成 Hbase 的 DML 操作 

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <zookeeper.version>3.4.5</zookeeper.version>
    <scala.version>2.11.5</scala.version>
    <hadoop.version>2.6.1</hadoop.version>
    <flink.version>1.5.0</flink.version>
</properties>

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
View Code

代碼:

import java.util
import java.util.Properties
import org.apache.commons.lang3.StringUtils
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Delete, Put}
import org.apache.hadoop.hbase.util.Bytes


/**
  * Created by angel;
  */
//[uname, spark, true], [upassword, 11122221, true]
case class UpdateFields(key:String , value:String)


//(fileName , fileOffset , dbName , tableName ,eventType, columns  , rowNum)
case class Canal(fileName:String ,
                 fileOffset:String,
                 dbName:String ,
                 tableName:String ,
                 eventType:String ,
                 columns:String ,
                 rowNum:String
                )
object DataExtraction {
  //1指定相關信息
  val zkCluster = "hadoop01,hadoop02,hadoop03"
  val kafkaCluster = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
  val kafkaTopicName = "canal"
  val hbasePort = "2181"
  val tableName:TableName = TableName.valueOf("canal")
  val columnFamily = "info"


  def main(args: Array[String]): Unit = {
    //2.創建流處理環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink-checkpoint/checkpoint/"))
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(2000)//定期發送
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointInterval(6000)
    System.setProperty("hadoop.home.dir", "/");
    //3.創建kafka數據流
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", kafkaCluster)
    properties.setProperty("zookeeper.connect", zkCluster)
    properties.setProperty("group.id", kafkaTopicName)
    val kafka09 = new FlinkKafkaConsumer09[String](kafkaTopicName, new SimpleStringSchema(), properties)
    //4.添加數據源addSource(kafka09)
    val text = env.addSource(kafka09).setParallelism(1)
    //5、解析kafka數據流,封裝成canal對象
    val values = text.map{
      line =>
        val values = line.split("#CS#")
        val valuesLength = values.length
        //
        val fileName = if(valuesLength > 0) values(0) else ""
        val fileOffset = if(valuesLength > 1) values(1) else ""
        val dbName = if(valuesLength > 2) values(2) else ""
        val tableName = if(valuesLength > 3) values(3) else ""
        val eventType = if(valuesLength > 4) values(4) else ""
        val columns = if(valuesLength > 5) values(5) else ""
        val rowNum = if(valuesLength > 6) values(6) else ""
        //(mysql-bin.000001,7470,test,users,[uid, 18, true, uname, spark, true, upassword, 1111, true],null,1)
        Canal(fileName , fileOffset , dbName , tableName ,eventType, columns  , rowNum)
    }


    //6、將數據落地到Hbase
    val list_columns_ = values.map{
      line =>
        //處理columns字符串
        val strColumns = line.columns
        println(s"strColumns --------> ${strColumns}")
        //[[uid, 22, true], [uname, spark, true], [upassword, 1111, true]]
        val array_columns = packaging_str_list(strColumns)
        //獲取主鍵
        val primaryKey = getPrimaryKey(array_columns)
        //拼接rowkey  DB+tableName+primaryKey
        val rowkey = line.dbName+"_"+line.tableName+"_"+primaryKey
        //獲取操作類型INSERT UPDATE DELETE
        val eventType = line.eventType
        //獲取觸發的列:inser update

        val triggerFileds: util.ArrayList[UpdateFields] = getTriggerColumns(array_columns , eventType)
//        //因為不同表直接有關聯,肯定是有重合的列,所以hbase表=line.dbName + line.tableName
//        val hbase_table = line.dbName + line.tableName
        //根據rowkey刪除數據
        if(eventType.equals("DELETE")){
          operatorDeleteHbase(rowkey , eventType)
        }else{
          if(triggerFileds.size() > 0){
            operatorHbase(rowkey , eventType , triggerFileds)
          }

        }
    }
    env.execute()

  }



  //封裝字符串列表
  def packaging_str_list(str_list:String):String ={
    val substring = str_list.substring(1 , str_list.length-1)

    substring
  }


  //獲取每個表的主鍵
  def getPrimaryKey(columns :String):String = {
    //  [uid, 1, false], [uname, abc, false], [upassword, uabc, false]
     val arrays: Array[String] = StringUtils.substringsBetween(columns , "[" , "]")
    val primaryStr: String = arrays(0)//uid, 13, true
    primaryStr.split(",")(1).trim
  }

  //獲取觸發更改的列
  def getTriggerColumns(columns :String , eventType:String): util.ArrayList[UpdateFields] ={
    val arrays: Array[String] = StringUtils.substringsBetween(columns , "[" , "]")
    val list = new util.ArrayList[UpdateFields]()
    eventType match {
      case "UPDATE" =>
        for(index <- 1 to arrays.length-1){
          val split: Array[String] = arrays(index).split(",")
          if(split(2).trim.toBoolean == true){
            list.add(UpdateFields(split(0) , split(1)))
          }
        }
        list
      case "INSERT" =>
        for(index <- 1 to arrays.length-1){
          val split: Array[String] = arrays(index).split(",")
          list.add(UpdateFields(split(0) , split(1)))
        }
        list
      case _ =>
        list

    }
  }
  //增改操作
  def operatorHbase(rowkey:String , eventType:String , triggerFileds:util.ArrayList[UpdateFields]): Unit ={
    val config = HBaseConfiguration.create();
    config.set("hbase.zookeeper.quorum", zkCluster);
    config.set("hbase.master", "hadoop01:60000");
    config.set("hbase.zookeeper.property.clientPort", hbasePort);
    config.setInt("hbase.rpc.timeout", 20000);
    config.setInt("hbase.client.operation.timeout", 30000);
    config.setInt("hbase.client.scanner.timeout.period", 200000);
    val connect = ConnectionFactory.createConnection(config);
    val admin = connect.getAdmin
    //構造表描述器
    val hTableDescriptor = new HTableDescriptor(tableName)
    //構造列族描述器
    val hColumnDescriptor = new HColumnDescriptor(columnFamily)
    hTableDescriptor.addFamily(hColumnDescriptor)
    if(!admin.tableExists(tableName)){
      admin.createTable(hTableDescriptor);
    }
    //如果表存在,則開始插入數據
    val table = connect.getTable(tableName)
    val put = new Put(Bytes.toBytes(rowkey))
    //獲取對應的列[UpdateFields(uname, spark), UpdateFields(upassword, 1111)]
    for(index <- 0 to triggerFileds.size()-1){
      val fields = triggerFileds.get(index)
      val key = fields.key
      val value = fields.value
      put.addColumn(Bytes.toBytes(columnFamily) , Bytes.toBytes(key) , Bytes.toBytes(value))
    }
    table.put(put)
  }
  //刪除操作
  def operatorDeleteHbase(rowkey:String , eventType:String): Unit ={
    val config = HBaseConfiguration.create();
    config.set("hbase.zookeeper.quorum", zkCluster);
    config.set("hbase.zookeeper.property.clientPort", hbasePort);
    config.setInt("hbase.rpc.timeout", 20000);
    config.setInt("hbase.client.operation.timeout", 30000);
    config.setInt("hbase.client.scanner.timeout.period", 200000);
    val connect = ConnectionFactory.createConnection(config);
    val admin = connect.getAdmin
    //構造表描述器
    val hTableDescriptor = new HTableDescriptor(tableName)
    //構造列族描述器
    val hColumnDescriptor = new HColumnDescriptor(columnFamily)
    hTableDescriptor.addFamily(hColumnDescriptor)
    if(admin.tableExists(tableName)){
      val table = connect.getTable(tableName)
      val delete = new Delete(Bytes.toBytes(rowkey))
      table.delete(delete)
    }
  }


}
View Code

打包scala程序

將上述的maven依賴紅色標記處修改成:

<**sourceDirectory**>**src/main/scala**</**sourceDirectory**>

<**mainClass**>scala的驅動類</**mainClass**>

運行canal代碼

java -jar canal.jar -Xms100m -Xmx100m
View Code

運行flink代碼

/opt/cdh/flink-1.5.0/bin/flink run -m yarn-cluster -yn 2  -p 1 /home/elasticsearch/flinkjar/SynDB-1.0-SNAPSHOT.jar

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM