PostgreSQL邏輯復制到kafka-實踐


kafka 安裝

  wget http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz

  cp kafka_2.12-2.0.1.tgz  kafka.tgz

      sudo tar xzvf kafka.tgz --directory=/opt/java/kafka --strip 1

 

啟動 kafka需要先啟動本地的 zookeeper,注意修改配置文件中zk的連接地址

 

  /opt/kafka/bin/kafka-server-start.sh   /opt/kafka/config/server.properties

 

kafkacat 是一個C語言編寫的 kafka 生產者、消費者程序。安裝過程需要一些庫可能需要手動下載。

postgres 邏輯解碼, 程序 jsoncdc

jsoncdc 依賴於rust可能需要先安裝 rust 或者可以使用 wal2json替代

編譯好之后本地目錄下有 jsoncdc.so 或者 wal2json.so

 

postgres 安裝解碼插件:

vim  $PGDATA/postgresql.conf

shared_preload_libraries = 'jsoncdc.so'

安裝完成插件

postgres 插入數據

 

生產數據到本地 kafka

/opt/bin/pgsql/pg_10/bin/pg_recvlogical -d postgres -S jsoncdc --start -o pretty-print=1 -f - | ./kafkacat/kafkacat -b 127.0.0.1:9092 -t pg

 Auto-selecting Producer mode (use -P or -C to override)

 

消費數據測試:

./kafkacat/kafkacat -b 127.0.0.1:9092 -t pg

 

 

kafka自消費實驗:

 

啟動zk,我這邊有zk服務器,因此不需要啟動:

bin/zookeeper-server-start.sh config/zookeeper.properties &

 

啟動kafka

bin/kafka-server-start.sh config/server.properties

在里面修改zk連接信息

 

創建topic

bin/kafka-topics.sh --create --zookeeper --replication-factor 1 --partitions 1 --topic test

 

創建一個叫做“test”topic,它只有一個分區,一個副本。

> bin/kafka-topics.sh --create --zookeeper 10.9.5.20:4119,10.9.5.21:4119,10.9.5.22:4119,10.9.5.24:4119,10.9.5.35:4119,10.9.5.36:4119 --replication-factor 1 --partitions 1 --topic test

 

可以通過list命令查看創建的topic:

> bin/kafka-topics.sh --list --zookeeper localhost:2181

test

 

運行producer並在控制台中輸一些消息,這些消息將被發送到服務端:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

This is a messageThis is another message

ctrl+c可以退出發送。

 

消費消息:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer.config config/server.properties --topic test --from-beginning

 

接下來想寫一個nodejs程序,將pg的變化動態回放到web中,顯示出來。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM