1、在Greenplum數據庫中創建目標表
2、Kafka創建Topic並向Topic寫一些消息,格式:{"org_id":"2B79D272-016A-11EB-88A7-000C29496EB0","org_name":"測試單位"}
3、配置yaml文件
DATABASE: gpkafka_test
USER: root
PASSWORD: 123456
HOST: 10.10.14.206
PORT: 5432
KAFKA:
INPUT:
SOURCE:
BROKERS: 10.10.14.238:9092
TOPIC: gp-kafka-test
COLUMNS:
- NAME: jdata
TYPE: json
FORMAT: json
ERROR_LIMIT: 10
OUTPUT:
TABLE: t_base_org
MAPPING:
- NAME: org_id
EXPRESSION: (jdata->>'org_id')::varchar
- NAME: org_name
EXPRESSION: (jdata->>'org_name')::varchar
COMMIT:
MAX_ROW: 5 #多少條一提交
MINIMAL_INTERVAL: 10000 #等待多少時間一提交(毫秒)
4、啟動服務
gpkafka load gp_kafka_test.yaml --gpfdist-host 10.10.14.206
增加 --quit-at-eof 參數 gpkafka load 會在消費完topic中的消息后退出,不加該參數會無限等待消息來消費
注:--gpfdist-host 為master服務器的IP或機器名