https://www.jianshu.com/p/1f7889273845?from=timeline&isappinstalled=0
一 背景
1 binlog定義
binlog基本定義:二進制日志,也成為二進制日志,記錄對數據發生或潛在發生更改的SQL語句,並以二進制的形式保存在磁盤中。
作用:MySQL的作用類似於Oracle的歸檔日志,可以用來查看數據庫的變更歷史(具體的時間點所有的SQL操作)、數據庫增量備份和恢復(增量備份和基於時間點的恢復)、Mysql的復制(主主數據庫的復制、主從數據庫的復制)。
2 開啟binlog
找到mysql的配置文件,linux下一般為my.cnf在/etc 下,window下一般為my.ini
在[mysqld]下添加
log-bin=mysql-bin
binlog_format="ROW"
添加完成后重啟mysql
mysql> show binary logs;
會顯示如下:
+------------------+-----------+ | Log_name | File_size | +------------------+-----------+ | mysql-bin.000001 | 732 | +------------------+-----------+
3 binlog格式
mysql的binlog有多種格式
a Statement:每一條會修改數據的sql都會記錄在binlog中
b Row:不記錄sql語句上下文相關信息,僅保存哪條記錄被修改
c Mixed:是以上兩種level的混合使用,一般的語句修改使用statment格式保存binlog,如一些函數,statement無法完成主從復制的操作,則采用row格式保存binlog
注:我們的binlog-access只支持row格式的解析
二 binlog-accessor
由於我們的項目中需要實時獲取mysql中某些字段的修改,考慮到添加觸發器或者在代碼層面監聽修改過大,因此最終決定通過監聽myslq的binlog來完成。
調研了一些現有的方案后,最終基於open-replicator實現了一套binlog的監聽及解析程序。
1 open-replicator
open-replicator是一個開源的binlog解析框架。
它的主要原理是將自己偽裝成一台mysql的備庫從而從主庫獲取binlog數據。
比如刪除mysql中的一條數據,open-replicator會返回:
DeleteRowsEventV2[header=BinlogEventV4HeaderImpl[timestamp=1488177443000,eventType=32,serverId=1,eventLength=72,nextPosition=1653,flags=0,timestampOfReceipt=1488177443997],tableId=116,reserved=1,extraInfoLength=2,extraInfo=<null>,columnCount=5,usedColumns=11111,rows=[Row[columns=[13, 0, 0, 0, 100]]]]
這個返回結果基本和binlog的格式完全一樣,但對於我們實際的使用中,有許多不方便的地方。
比如:tableId是mysql內部使用的,如果對外使用,我們需要將tableId翻譯為tableName。還有row的值,只描述了原始值,並沒有描述列的字段名。鑒於此,我們需要對open-replicator做諸多的加工。
2 加工數據
我們只關注binlog中的4種event類型
a tableMapEvent,該event主要描述tableId和tableName的對應
b insertEvent,該event描述insert事件
c updateEvent,該event描述update事件
d deleteEvent,該event描述delete事件
加工分為兩個截斷
a 通過tableId獲取tableName(解析tableMapEvent)
b 獲取每個字段的列名,主要功過調用 desc tableName 得到
加工后的輸出結果為一個bean:
比如上條的刪除事件,加工后返回的結果為:
RowDiffModel(timestamp=1488177443000, tableName=lx_charge.user_fund, pkColumnName=[], pk=[], type=3, diffColumns=[user_id, invest, extend, rebate, balance], preValue={extend=0, balance=100, user_id=13, rebate=0, invest=0}, newValue={})
3 訂閱數據
我們將加工后的binlog發送到rabbitmq的一個topic中,所有的需求放訂閱需要的數據即可。這里貼一個訂閱的示例:
@Service public class RowDiffRawMessageConsumerPool { private static final String EXCHANGE = "db-diff"; private static final String ROUTING = "row-diff"; private static final String QUEUE = "row-diff-raw"; @Autowired ConnectionFactory connectionFactory; private ThreadPoolConsumer<RowDiffModel> threadPoolConsumer; @PostConstruct public void init() { MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory); MessageProcess<RowDiffModel> messageProcess = message -> { System.out.println("received: " + message); return new DetailRes(true, ""); }; threadPoolConsumer = new ThreadPoolConsumer.ThreadPoolConsumerBuilder<RowDiffModel>() .setThreadCount(Constants.CONSUMER_THREAD_COUNT).setIntervalMils(Constants.INTERVAL_MILS) .setExchange(EXCHANGE).setRoutingKey(ROUTING).setQueue(QUEUE).setType("topic") .setMQAccessBuilder(mqAccessBuilder).setMessageProcess(messageProcess) .build(); } public void start() throws IOException { threadPoolConsumer.start(); } public void stop() { threadPoolConsumer.stop(); } }
在本例中,將所有的binlog直接打印。
關於rabbitmq的使用請參考
4 高可用性
任何一個項目都需要考慮高可用性,尤其是一些偏底層的模塊。在binlog-access中,我們從兩方面考慮高可用性
a mysql的可用性。我們需要考慮mysql掛掉,網絡異常的情況。我們對原始的open-replicator做了一個加強,重寫了它的start方法,保證在各種情況下的自動重試
@Override public void start() { new Thread(() -> { while (!stop) { try { if (!isRunning()) { if (this.transport != null || this.binlogParser != null) { this.stopQuietly(0, TimeUnit.SECONDS); this.transport = null; this.binlogParser = null; } BinlogMeta binlogMeta = binlogMetaBuilder.getBinlogMeta(); setBinlogFileName(binlogMeta.getBinlogName()); setBinlogPosition(binlogMeta.getPos()); log.info(binlogMeta.toString()); super.start(); } } catch (Exception e) { e.printStackTrace(); } finally { try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); }
b 多機器部署。為了避免單點效應,我們需要將binlog-access支持多機部署。這里引入redis來保證不會發送重復數據到topic中,主要通過日志偏移去重:
@Log public DetailRes send(long pos, List<RowDiffModel> rowDiffModels) { if (redisCache.cacheIfAbsent("binlog:" + pos, Constants.TIMESTAMP_VALID_TIME)) { DetailRes detailRes = new DetailRes(true, ""); for (RowDiffModel rowDiffModel : rowDiffModels) { if (detailRes.isSuccess()) { String dbName = rowDiffModel.getTableName().split("\\.")[0].toLowerCase(); if (dbSet.isEmpty()) { detailRes = messageSender.send(rowDiffModel); } else { if (dbSet.contains(dbName)) { detailRes = messageSender.send(rowDiffModel); } } } else { break; } } return detailRes; } else { return new DetailRes(true, ""); } }
關於redis的使用,請參考
5 項目依賴
a open-replicator
<dependency> <groupId>com.flipkart</groupId> <artifactId>open-replicator</artifactId> <version>1.0.8</version> </dependency>
b rabbitmq-access
<dependency> <groupId>com.littlersmall.rabbitmq-access</groupId> <artifactId>rabbitmq-access</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
注:該模塊需要自己打包成jar包導入項目或者deploy在自己的代碼庫中
c redis-access
<dependency> <groupId>com.littlersmall.redis-access</groupId> <artifactId>redis-access</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
注:同上
三 binlog-access的使用
1 准備好所依賴的jar包(或deploy在自己的代碼庫中,rabbitmq-access & redis-access)
2 安裝好rabbitmq和redis
3 確定所監聽的mysql開啟了binlog,且binlog的格式為ROW
4 配置文件(resources/application.properties),如下
#db
db.host=127.0.0.1
db.port=3306
db.username=root
db.password=root
db.url=jdbc:mysql://${db.host}:${db.port}/?useUnicode=true&characterEncoding=utf8
#rabbitmq
rabbit.ip=127.0.0.1
rabbit.port=5672
rabbit.user_name=guest
rabbit.password=guest
#redis
redis.ip=127.0.0.1
redis.port=6379
#監聽的庫','分割,例如: diff.db=user,info,不配置則表示監聽全部庫
diff.db=
5 權限配置。需要確保mysql賬戶擁有備庫的全部權限+所有表的讀權限
6 項目啟動:java -jar binlog-access.jar
項目代碼見
路過的麻煩點個星星,謝謝(__)
