gpkafka 從 Kafka 消費數據加載到 Greenplum


1、在Greenplum數據庫中創建目標表

2、Kafka創建Topic並向Topic寫一些消息,格式:{"org_id":"2B79D272-016A-11EB-88A7-000C29496EB0","org_name":"測試單位"}

3、配置yaml文件

DATABASE: gpkafka_test
USER: root
PASSWORD: 123456
HOST: 10.10.14.206
PORT: 5432
KAFKA:
   INPUT:
     SOURCE:
        BROKERS: 10.10.14.238:9092
        TOPIC: gp-kafka-test
     COLUMNS:
        - NAME: jdata
          TYPE: json
     FORMAT: json
     ERROR_LIMIT: 10
   OUTPUT:
     TABLE: t_base_org
     MAPPING:
        - NAME: org_id
          EXPRESSION: (jdata->>'org_id')::varchar
        - NAME: org_name
          EXPRESSION: (jdata->>'org_name')::varchar
   COMMIT:
     MAX_ROW: 5                     #多少條一提交
     MINIMAL_INTERVAL: 10000        #等待多少時間一提交(毫秒)

4、啟動服務

gpkafka load gp_kafka_test.yaml --gpfdist-host 10.10.14.206

增加 --quit-at-eof 參數 gpkafka load 會在消費完topic中的消息后退出,不加該參數會無限等待消息來消費

 注:--gpfdist-host 為master服務器的IP或機器名

 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM