通過Bottledwater同步PostgreSQL中的數據變化到Kafka消息隊列


當我們遇到需要捕獲數據庫中數據變化的時候,總是會想到通過消息隊列來實現該需求,通過把數據變化發布到消息隊列,來完成系統上下游的解耦。關心這些數據變化的應用可以從消息隊列上獲取這些數據。

Bottledwater-pg是針對PostgreSQL數據庫的一種消息生產者,可以將PostgreSQL數據庫的數據寫入confluent Kafka,從而實時的分享給消息訂閱者。支持PostgreSQL 9.4以及以上版本,支持全量快照,以及持續解析數據WAL日志中的增量數據並寫入Kafka。每一張數據庫表為一個topic。數據在使用decode從WAL取出后,使用Avro將數據格式化(通常格式化為JSON)再寫入Kafka。

Bottledwater-pg有docker、源碼編譯、Ubuntu三種使用方式,本文以源碼編譯方式說明如何部署。

一. 環境說明

  • 源端

IP:10.19.100.23

操作系統:RHEL6.3

數據庫:PostgreSQL-9.4.11

  • 目的端

IP:10.19.100.21

操作系統:RHEL6.3

Kafka:2.10-0.10.2.0

二. 源端配置(10.19.100.23)

Bottledwater-pg依賴以下軟件包:

  • avro-c > =1.8.0
  • jansson
  • libcurl
  • librdkafka > =0.9.1

Bottledwater-pg可選以下軟件包:

  • libsnappy
  • boost

另外編譯要求較高的cmake版本,操作系統自帶的cmake會出現編譯錯誤,本文使用:

  • cmake-3.8.0

2.0 卸載系統自帶PG包

root用戶操作)

 RedHat根據安裝模式可能自帶8.4版本的PostgreSQL,請務必卸載,否則會對編譯造成影響

# rpm -qa|grep postgres
postgresql-libs-8.4.11-1.el6_2.x86_64
postgresql-devel-8.4.11-1.el6_2.x86_64
postgresql-8.4.11-1.el6_2.x86_64

# rpm -e postgresql-devel-8.4.11-1.el6_2.x86_64
# rpm -e postgresql-8.4.11-1.el6_2.x86_64
# rpm -e postgresql-libs-8.4.11-1.el6_2.x86_64

2.1 編譯安裝cmake

root用戶操作)

# cd /opt
# tar zxvf cmake-3.8.0.tar.gz
# cd cmake-3.8.0
# ./bootstrap
# make
# make install

# cmake -version
cmake version 3.8.0
CMake suite maintained and supported by Kitware (kitware.com/cmake).

2.2 編譯安裝jansson

root用戶操作)

# cd /opt
# tar jxvf jansson-2.9.tar.bz2
# cd jansson-2.9
# ./configure
# make
# make install

# ls /usr/local/lib/pkgconfig
jansson.pc

#export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH

2.3 編譯安裝avro

root用戶操作)

# cd /opt
# yum install -y xz-*
# yum install -y zlib-devel.x86_64

# tar zxvf avro-src-1.8.1.tar.gz
# cd avro-src-1.8.1/lang/c
# mkdir build
# cd build
# cmake .. -DCMAKE_INSTALL_PREFIX=/opt/avro -DCMAKE_BUILD_TYPE=Release -DTHREADSAFE=true
# make
# make test
# make install

導入庫文件

# vi /etc/ld.so.conf
/opt/avro/lib

# ldconfig

配置臨時環境變量

# export LD_LIBRARY_PATH=/opt/avro/lib:$LD_LIBRARY_PATH

# export PKG_CONFIG_PATH=/opt/avro/lib/pkgconfig:/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH

2.4 安裝libcurl

root用戶操作)

# yum install -y libcurl-devel.x86_64

2.5 編譯安裝librdkafka

root用戶操作)

# cd /opt
# unzip librdkafka-master.zip
# cd librdkafka-master
# ./configure
# make
# make install

# ls /usr/local/lib/pkgconfig
jansson.pc  rdkafka.pc  rdkafka++.pc

2.6 添加引用庫

root用戶操作)

# vi /etc/ld.so.conf.d/bottledwater.conf
/opt/avro/lib
/usr/local/lib
/PostgreSQL/9.4/lib

# ldconfig

2.7 編譯安裝bottledwater-pg

root用戶操作)

# chown -R postgres:postgres /opt/

postgres用戶操作)

配置環境變量

$ vi ~/.bash_profile

# .bash_profile
# Get the aliases and functions

if [ -f ~/.bashrc ]; then
        . ~/.bashrc
fi

# User specific environment and startup programs

export PG_HOME=/PostgreSQL/9.4
export LD_LIBRARY_PATH=/opt/avro/lib:$LD_LIBRARY_PATH
export PKG_CONFIG_PATH=/opt/avro/lib/pkgconfig:/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH

PATH=$PG_HOME/bin:$PATH:$HOME/bin
export PATH

准備安裝包

$ unzip bottledwater-pg-master.zip
$ cd bottledwater-pg-master

這里不知道是操作系統環境的問題還是開源軟件本身的問題,需要修改源碼包里的2處Makefile才能通過編譯。

  • 修改client/Makefile
PG_CFLAGS = -I$(shell pg_config --includedir) -I$(shell pg_config --includedir-server) -I$(shell pg_config --pkgincludedir)
  • 修改kafka/Makefile
PG_CFLAGS = -I$(shell pg_config --includedir) -I$(shell pg_config --includedir-server) -I$(shell pg_config --pkgincludedir)

LDFLAGS=-L/usr/lib64 $(CURL_LDFLAGS) $(PG_LDFLAGS) $(KAFKA_LDFLAGS) $(AVRO_LDFLAGS) $(JSON_LDFLAGS)

編譯並安裝bottledwater-pg

$ make
$ make install

安裝完成后會自動在PostgreSQL數據庫擴展包目錄下生成擴展庫文件和擴展庫控制文件。

$ ls /PostgreSQL/9.4/lib/postgresql/bottledwater*
/PostgreSQL/9.4/lib/postgresql/bottledwater.so

$ ls /PostgreSQL/9.4/share/postgresql/extension/bottledwater*
/PostgreSQL/9.4/share/postgresql/extension/bottledwater--0.1.sql
/PostgreSQL/9.4/share/postgresql/extension/bottledwater.control

2.7 數據庫配置

試驗環境中PostgreSQL已有數據庫mas,用戶mas,密碼mas。

Mas庫中有一張表:

mastest(col1 numeric primary key,col2 character varying(10));
  • 修改數據庫配置文件
$ vi /PostgreSQL/9.4/data/postgresql.conf

max_worker_processes = 8        # 至少為8
wal_level = logical             # 至少為logical,可以更高
max_wal_senders = 8             # 至少為8
wal_keep_segments = 256         # 至少為256
max_replication_slots = 4       # 至少為4
  • 修改數據庫白名單配置文件

  *這部分權限可能過大,可以精簡

$ vi /PostgreSQL/9.4/data/pg_hba.conf

local   replication     all                                     trust
host    replication     all             127.0.0.1/32            trust
host    replication     all             0.0.0.0/0               md5
host    all             postgres        10.19.100.23/32         trust
  • 重啟數據庫並對要監控的庫創建bottledwater擴展
$ pg_ctl restart -D /PostgreSQL/9.4/data/ -l /PostgreSQL/9.4/data/pglog.log -m fast

$ psql -U postgres -d mas -c "create extension bottledwater;"
Password for user postgres:

CREATE EXTENSION

三. 測試同步

  • 在源端(10.19.100.23)啟動bottledwater

 

$ cd /opt/bottledwater-pg-master/kafka

$ ./bottledwater --postgres=postgres://postgres:123456@10.19.100.23:5432/mas --broker=10.19.100.21:9092 -f json

[INFO] Writing messages to Kafka in JSON format
[INFO] Created replication slot "bottledwater", capturing consistent snapshot "00000718-1".
INFO:  bottledwater_export: Table mas.mastest is keyed by index mastest_pkey
[INFO] Snapshot complete, streaming changes from 0/1749F58.
  • 向源端數據庫寫入數據
mas=> insert into mastest values(1,'A');
INSERT 0 1

mas=> insert into mastest values(2,'B');
INSERT 0 1

mas=> update mastest set col2='C' where col1=2;
UPDATE 1

mas=> delete from mastest where col1=1;
DELETE 1
  • 目的端查看消息事件
# bin/kafka-topics.sh --list --zookeeper localhost:2181
mas.mastest

# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mas.mastest --from-beginning --property print.key=true
{"col1": {"double": 1.0}}       {"col1": {"double": 1.0}, "col2": {"string": "A"}}
{"col1": {"double": 2.0}}       {"col1": {"double": 2.0}, "col2": {"string": "B"}}
{"col1": {"double": 2.0}}       {"col1": {"double": 2.0}, "col2": {"string": "C"}}
{"col1": {"double": 1.0}}       null

說明:

1. 目的端Kafka配置文件中,Listener的配置不能用默認值localhost

2. 每一張PostgreSQL中的表都會被做為一個topic,每個topic是自動創建的不需要人工干預,即使后期新建的表也是如此。

3. 消息事件以“主鍵列 + 變更后的所有列”作為消息內容

4. 主從復制模式下,slave不能啟動bottledwater,因為備庫不產生wal日志


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM