使用 amazonriver將postgres 數據實時同步 至elasticsearch (基於pg的邏輯復制功能)


amazonriver

 

一、    簡介

amazonriver 是一個將postgresql的增、刪、改實時同步到es或kafka的服務(基於pg的邏輯復制功能),go語言實現,哈羅單車開源的一個項目。

git地址https://github.com/hellobike/amazonriver

也有java版本https://github.com/hellobike/tunnel(需依賴zookeeper)

版本支持

  • Postgresql 9.4 or later
  • Kafka 0.8 or later
  • ElasticSearch 5.x

 

amazonriver 利用pg內部的邏輯復制功能,通過在pg創建邏輯復制槽,接收數據庫的邏輯變更,通過解析test_decoding特定格式的消息,得到邏輯數據,與es的通訊是發布訂閱的方式,

效果要比觸發器好的多,並且配置簡易

 

 

安裝

下載最新版本

https://github.com/hellobike/amazonriver/releases

 

 

壓縮包中amazonriver是可執行文件,放到/opt/amazonriver目錄下

 

二、    配置

amazonriver 配置

在該目錄下新增一個配置文件 config.json

 

 

config.json配置內容為

{

    "pg_dump_path": "",

"subscribes": [{

       #歷史數據導入的功能,測試發現無法使用

        "dump": false,

              #若有多個amazonriver實例,這里要配置成不一樣的

        "slotName": "slot_for_es1",

        "pgConnConf": {

            #pg數據庫配置

            "host": "10.168.4.91",

            "port": 5321,

            "database": "xzdssituation",

            "schema": "public",

            "user": "postgres",

            "password": "postgres"

        },

        "rules": [

            {

                #表名

                "table": "ds_command",

                 #表的主鍵

                "pks": ["command_id"],

                 #es中的主鍵

                "esid": ["command_id"],

                "index": "ds_command",

                 #固定為data, 禁止以下划線開頭

                "type": "data"

            },

            {

                "table": "ds_alarm",

                "pks": ["command_id"],

                "esid": ["command_id"],

                "index": "ds_alarm",

                "type": "data"

            }

        ],

        "esConf": {

            #elasticsearch 地址

            "addrs": "http://10.168.4.60:9200/",

            "user": "",

            "password": ""

        },

         #失敗重試0為不重試,-1會一直重試直到成功

        "retry": -1

}],

# 填":8080"會開啟端口,供普羅米修斯監控,這里不配置

    "prometheus_address": ""

}

 

PostgreSQL 配置

1.修改/app/pgsql_data/postgresql.conf

wal_level = 'logical';
max_replication_slots = 5; #該值要大於1

修改后需要重啟才能生效

2.創建有replication權限的用戶

CREATE ROLE syn_rep LOGIN  ENCRYPTED PASSWORD 'postgres' REPLICATION;
GRANT CONNECT ON DATABASE xzdssituation to syn_rep;
COMMIT;

3.修改白名單配置

在 /app/pgsql_data/pg_hba.conf 中增加配置:

 host replication syn_rep all md5

修改后需要reload才能生效

4.重啟pg

su – postgres

./stop-pgsql.sh

./start-pgsql.sh

三、      啟停

先給amazonriver加可執行權限

chmod +x /opt/amazonriver/amazonriver

啟動(指定配置 和 日志)

已寫成腳本:/opt/amazonriver/start.sh

nohup /opt/amazonriver/amazonriver -config /opt/amazonriver/config.json -level debug > /opt/amazonriver/logfile.log 2>&1 & echo $! > /opt/amazonriver/pid

停止

已寫成腳本:/opt/amazonriver/stop.sh

kill `cat /opt/amazonriver/pid`

四、          日志查看

tail -200f /opt/amazonriver/logfile.log

 

附錄:

/app/pgsql/bin/pg_ctl 控制pg啟停

pg_ctl start [-w] [-s] [-D datadir] [-l filename] [-o options] [-p path]
pg_ctl stop [-W] [-s] [-D datadir] [-m s[mart] | f[ast] | i[mmediate] ]
pg_ctl restart [-w] [-s] [-D datadir] [-m s[mart] | f[ast] | i[mmediate] ] [-o options]
pg_ctl reload [-s] [-D datadir]
pg_ctl status [-D datadir]
pg_ctl kill [signal_name] [process_id]
pg_ctl register [-N servicename] [-U username] [-P password] [-D datadir] [-w] [-o options]
pg_ctl unregister [-N servicename]

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM