kafka源碼系列之mysql數據增量同步到kafka


一,架構介紹

生產中由於歷史原因web后端,mysql集群,kafka集群(或者其它消息隊列)會存在一下三種結構。

1,數據先入mysql集群,再入kafka

數據入mysql集群是不可更改的,如何再高效的將數據寫入kafka呢?

A),在表中存在自增ID的字段,然后根據ID,定期掃描表,然后將數據入kafka。

B),有時間字段的,可以按照時間字段定期掃描入kafka集群。

C),直接解析binlog日志,然后解析后的數據寫入kafka。

640?wx_fmt=png

2,web后端同時將數據寫入kafka和mysql集群

640?wx_fmt=png

3,web后端將數據先入kafka,再入mysql集群

這個方式,有很多優點,比如可以用kafka解耦,然后將數據按照離線存儲和計算,實時計算兩個模塊構建很好的大數據架構。抗高峰,便於擴展等等。

640?wx_fmt=png

二,實現步驟

1,mysql安裝准備

安裝mysql估計看這篇文章的人都沒什么問題,所以本文不具體講解了。

A),假如你單機測試請配置好server_id

B),開啟binlog,只需配置log-bin

[root@localhost ~]# cat /etc/my.cnf

[mysqld]

server_id=1

datadir=/var/lib/mysql

socket=/var/lib/mysql/mysql.sock

user=mysql

# Disabling symbolic-links is recommended to prevent assorted security risks

symbolic-links=0

log-bin=/var/lib/mysql/mysql-binlog

[mysqld_safe]

log-error=/var/log/mysqld.log

pid-file=/var/run/mysqld/mysqld.pid

?

創建測試庫和表

create database school character set utf8 collate utf8_general_ci;

?

create table student(

name varchar(20) not null comment '姓名',

sid int(10) not null primary key comment '學員',

majora varchar(50) not null default '' comment '專業',

tel varchar(11) not null unique key comment '手機號',

birthday date not null comment '出生日期'

);

2,binlog日志解析

兩種方式:

一是掃面binlog文件(有需要的話請聯系浪尖)

二是通過復制同步的方式

暫實現了第二種方式,樣例代碼如下:

MysqlBinlogParse mysqlBinlogParse=new MysqlBinlogParse(args[0],Integer.valueOf(args[1]),args[2],args[3]){
@Override
public void processDelete(String queryType, String database, String sql) {
try {
String jsonString=SqlParse.parseDeleteSql(sql);
JSONObject jsonObject=JSONObject.fromObject(jsonString);
jsonObject.accumulate("database", database);
jsonObject.accumulate("queryType", queryType);
System.out.println(sql);
System.out.println(" ");
System.out.println(" ");
System.out.println(jsonObject.toString());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

@Override
public void processInsert(String queryType, String database, String sql) {
try {
String jsonString=SqlParse.parseInsertSql(sql);
JSONObject jsonObject=JSONObject.fromObject(jsonString);
jsonObject.accumulate("database", database);
jsonObject.accumulate("queryType", queryType);
System.out.println(sql);
System.out.println(" ");
System.out.println(" ");
System.out.println(jsonObject.toString());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

@Override
public void processUpdate(String queryType, String database, String sql) {
String jsonString;
try {
jsonString=SqlParse.parseUpdateSql(sql);
JSONObject jsonObject=JSONObject.fromObject(jsonString);
jsonObject.accumulate("database", database);
jsonObject.accumulate("queryType", queryType);
System.out.println(sql);
System.out.println(" ");
System.out.println(" ");
System.out.println(jsonObject.toString());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

};
mysqlBinlogParse.setServerId(3);
mysqlBinlogParse.start();

?

?

3,sql語法解析

從原始的mysql 的binlog event中,我們能解析到的信息,主要的也就是mysql的database,query類型(INSERT,DELETE,UPDATE),具體執行的sql。我這里封裝了三個重要的方法。只暴露了這三個接口,那么我們要明白的事情是,美味的英文我們入kafka,然后流式處理的時候希望的到的是跟插入mysql后一樣格式的數據。這個時候我們就要自己做sql的解析,將query的sql解析成字段形式的數據,供流式處理。解析的格式如下:

A),INSERT

640?wx_fmt=png

B),DELETE

640?wx_fmt=png

C),UPDATE

640?wx_fmt=png

最終浪尖是將解析后的數據封裝成了json,然后我們自己寫kafka producer將消息發送到kafka,后端就可以處理了。

三,總結

最后,浪尖還是建議web后端數據最好先入消息隊列,如kafka,然后分離線和實時將數據進行解耦分流,用於實時處理和離線處理。

?

消息隊列的訂閱者可以根據需要隨時擴展,可以很好的擴展數據的使用者。

?

消息隊列的橫向擴展,增加吞吐量,做起來還是很簡單的。這個用傳統數據庫,分庫分表還是很麻煩的。

?

由於消息隊列的存在,也可以幫助我們抗高峰,避免高峰時期后端處理壓力過大導致整個業務處理宕機。

?

具體源碼球友可以在知識星球獲取。

 

歡迎大家進入知識星球,學習更多更深入的大數據知識,面試經驗,獲取更多更詳細的資料。

640?wx_fmt=jpeg


文章來源:https://blog.csdn.net/rlnLo2pNEfx9c/article/details/80682269


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM