1、使用datax工具將postgresql或者greenplum數據庫中的數據同步到elasticsearch中。DataX目前已經有了比較全面的插件體系,主流的RDBMS數據庫、NOSQL、大數據計算系統都已經接入,目前支持數據如下圖:
類型 | 數據源 | Reader(讀) | Writer(寫) | 文檔 |
---|---|---|---|---|
RDBMS 關系型數據庫 | MySQL | √ | √ | 讀 、寫 |
Oracle | √ | √ | 讀 、寫 | |
SQLServer | √ | √ | 讀 、寫 | |
PostgreSQL | √ | √ | 讀 、寫 | |
DRDS | √ | √ | 讀 、寫 | |
通用RDBMS(支持所有關系型數據庫) | √ | √ | 讀 、寫 | |
阿里雲數倉數據存儲 | ODPS | √ | √ | 讀 、寫 |
ADS | √ | 寫 | ||
OSS | √ | √ | 讀 、寫 | |
OCS | √ | √ | 讀 、寫 | |
NoSQL數據存儲 | OTS | √ | √ | 讀 、寫 |
Hbase0.94 | √ | √ | 讀 、寫 | |
Hbase1.1 | √ | √ | 讀 、寫 | |
Phoenix4.x | √ | √ | 讀 、寫 | |
Phoenix5.x | √ | √ | 讀 、寫 | |
MongoDB | √ | √ | 讀 、寫 | |
Hive | √ | √ | 讀 、寫 | |
Cassandra | √ | √ | 讀 、寫 | |
無結構化數據存儲 | TxtFile | √ | √ | 讀 、寫 |
FTP | √ | √ | 讀 、寫 | |
HDFS | √ | √ | 讀 、寫 | |
Elasticsearch | √ | 寫 | ||
時間序列數據庫 | OpenTSDB | √ | 讀 | |
TSDB | √ | √ | 讀 、寫 |
可以看到Elasticsearch只支持寫,但是不支持讀的,如果支持從Elasticsearch讀出來,寫到postgresql或者greenplum也是很好的哦!
2、datax的安裝,配置就不寫了,之前搞過,現在需要搞一個postgresql或者greenplum寫到elasticsearch的json,需要注意的是需要安裝一個postgresqlreader讀插件的,將插件放到datax\datax\plugin\reader中。
然后,需要搞一個elasticsearchwriter寫插件,將elasticsearchwriter插件放在datax\datax\plugin\writer中。
1 { 2 "job": { 3 "setting": { 4 "speed": { 5 "byte": 8388608, 6 "channel": 3 7 }, 8 "errorLimit": { 9 "record": 0, 10 "percentage": 0.02 11 } 12 }, 13 "content": [{ 14 "reader": { 15 "name": "postgresqlreader", 16 "parameter": { 17 "username": "數據庫賬號", 18 "password": "數據庫密碼", 19 "connection": [ 20 { 21 "querySql": [ 22 "SELECT "update_time",\"UserName\",\"Age\" from core_data.up_lic_data where \"Id\" > 1 and \"ID\" < 10000 limit 1000 ;" 23 ], 24 "jdbcUrl": ["jdbc:postgresql://數據庫ip地址:數據庫端口號/數據庫名稱"] 25 } 26 ] 27 } 28 }, 29 "writer": { 30 "name": "elasticsearchwriter", 31 "parameter": { 32 "endpoint": "http://es主節點ip地址:es端口號", 33 "index": "user_info", // 索引名稱 34 "type": "doc", // 文檔類型 35 "cleanup": false, 36 "dynamic": false, 37 "settings": { 38 "index": { 39 "number_of_shards": 5, // 分片數量 40 "number_of_replicas": 1 // 副本數量 41 } 42 }, 43 "batchSize": 10000, 44 "splitter": ",", 45 "column": [ 46 { 47 "name": "update_time", 48 "type": "keyword" 49 },{ 50 "name": "UserName", 51 "type": "keyword" 52 },{ 53 "name": "Age", 54 "type": "keyword" 55 } 56 ] 57 } 58 } 59 }] 60 } 61 }