mysql增量同步到greenplum


采用工具:maxwell+Kafka+bireme

maxwell:maxwell能實時解析MySQL的binlog,並輸出json格式的數據發送到Kafka(還支持其它的消息中間件),具體參見:maxwell官網

Kafka: 一種消息中間件,在該方案中主要用於消息中轉,具體參見Kafka官網

bireme:支持Greenplum的數據增量同步工具,在寫入Greenplum的過程中,由於采用Copy模式,所以性能較高,具體參見bireme官網

大致原理就是:利用maxwell把mysql binlog解析成json,然后用kafka創建topic,然后用bireme消費,從而達到增量,增量的前提是先把數據全量同步一次,然后再增量。

全量同步初始化個人推薦dbswitch工具作者項目地址,個人測試使用體驗最佳,可以自動創建表結構,同步速度也很快。

操作步驟:

1.下載並搭建Kafka服務

2.下載並搭建maxwell服務,修改配置使其能夠連接MySQL並能向kafka寫入數據

3.下載並搭建bireme服務,修改配置使其能讀取kafka的數據並能向Greenplum寫入數據

kafka:

(1)下載安裝:

wget http://mirrors.hust.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xzf kafka_2.12-2.5.0.tgz -C /usr/local

(2)配置server.properties,我的簡單配置如下:

[root@szwpldb1080 config]# cat server.properties |grep -vE '^#|^$'
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=16
log.cleanup.policy=delete
log.segment.bytes=1073741824
log.retention.check.interval.ms=3000
delete.topic.enable = true
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
group.initial.rebalance.delay.ms=0
advertised.host.name=172.18.1.150

(3)制作kafka啟停腳本(提前安裝好java):

#!/usr/bin/env bash
# chkconfig: 2345 20 80
#description: start and stop server
ZOOP_HOME=/usr/local/kafka_2.12-2.5.0/bin
JAVA_HOME=/usr/java/jdk1.8.0_221/
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
SVR_NAME
=kafka case $1 in start) echo "starting $ZOOP_HOME/$SVR_NAME ..." $ZOOP_HOME/kafka-server-start.sh /usr/local/kafka_2.12-2.5.0/config/server.properties > /tmp/kafka.logs.out& ;; stop) echo "stopping $PRO_HOME/$SVR_NAME ..." ps -ef|grep *.$SVR_NAME* |grep -v grep |awk '{print $2}' | sed -e "s/^/kill -9 /g" | sh - ;; restart) "$0" stop sleep 3 "$0" start ;; status) ps -ef|grep *.$SVR_NAME* ;; logs) tail -f /tmp/zookeeper.logs.out ;; *) echo "Example: server-$SVR_NAME [start|stop|restart|status|logs]" ;; esac

添加到 /etc/rc.d/init.d ,然后就可以直接service kafka xxx來管理,或者添加到systemd下面,確保服務正常啟動安裝zookerper,然后啟動kafka。

我的zookeeper簡單配置如下:

[root@szwpldb1080 config]# cat zookeeper.properties |grep -Ev '^$|^#'
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0

zookeeper啟停腳本

#!/usr/bin/env bash
# chkconfig: 2345 20 80
#description: start and stop server
ZOOP_HOME=/usr/local/kafka_2.12-2.5.0/bin
JAVA_HOME=/usr/java/jdk1.8.0_221/
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

SVR_NAME=zookeeper
case $1 in 
    start)
        echo "starting $ZOOP_HOME/$SVR_NAME ..."
       $ZOOP_HOME/zookeeper-server-start.sh /usr/local/kafka_2.12-2.5.0/config/zookeeper.properties  > /tmp/zookeeper.logs.out&
        ;;
    stop)
        echo "stopping $PRO_HOME/$SVR_NAME ..."
        ps -ef|grep *.$SVR_NAME* |grep -v grep |awk '{print $2}'  | sed -e "s/^/kill -9 /g" | sh - 
        ;;
    restart)
        "$0" stop
        sleep 3
        "$0" start
        ;;
    status)
        ps -ef|grep *.$SVR_NAME*
        ;;
    logs)
        tail -f /tmp/zookeeper.logs.out
        ;;
    *)       
        echo "Example: server-$SVR_NAME [start|stop|restart|status|logs]" ;;
esac

(4)檢查狀態:

[root@szwpldb1080 config]# jps
1762 Kafka
18521 QuorumPeerMain
30383 Jps

maxwell:

(1)docker鏡像下載

docker pull zendesk/maxwell

(2)在源端mysql建好用戶設置好權限,測試maxwell:

docker run -ti --rm zendesk/maxwell bin/maxwell --user='xxxx' --password='xxxx' --host='x.x.x.x' --producer=stdout 

看到日志輸出正常,可以放后台運行:

docker run -d --rm zendesk/maxwell bin/maxwell --user='xx' \
 --password='xx' --host='x.x.x.x' --port=3306\
 --producer=kafka --kafka.bootstrap.servers='x.x.x.x:9092'\
 --kafka_topic=syncdb --log_level=debug --output_ddl

然后創建kafka topic:

bin/kafka-topics.sh --create --topic syncdb --zookeeper localhost:2181 --partitions 1 --replication-factor 2

topic名字與maxwell創建的topic一致,並且由於maxwell可以解析所有binlog,但是bireme工具只能同步dml,因此沒有加 --output_ddl

具體maxwell參數用法可以參考: maxwell配置

檢查kafka消費情況:

[root@szwpldb1080 bin]# ./kafka-topics.sh --zookeeper localhost:2181 --describe --topic syncdb
Topic:syncdb    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: syncdb    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
[root@szwpldb1080 bin]#./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic syncdb --from-beginning

看到kafka能接收maxwell產生的json文件,表示沒問題。

bireme

安裝配置都很簡單,此處略過。

主要是修改以下2個配置文件。

config.properties
maxwell1.properties

然后監控 http://x.x.x.x:8080或者監控bireme日志就可以了。

總結:

只能同步DML語句,無法處理DDL,對比幾款開源的工具已經同步方式,我覺得我這種是最舒服的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM