通過DataX同步數據至Elasticsearch
使用總結
- Long值導入時精度丟失,字段類型使用text
- splitPk使用ID流水號時,導入無進度0%
- 因Id取最小值遞加至最大值,范圍間隔大空查詢較多
- 將數據源的查詢時間 拆細
- ES日期字段創建需指定格式 yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis
- 日期數據導入時,text寫入為日期格式,long寫入為時間戳(數據有精度錯誤)
- 注意時區問題 寫入時指定時區 或 對UTC時間戳進行轉換
- 指定:“2019-03-12T12:12:12.123+0800”
- 轉換:東八區時間戳 = 3600000*8 + UTC時間戳
執行命令
python /datax/bin/datax.py --jvm=-Xmx4g job.json
配置樣例
job.json
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
...
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://xxx:9999",
"accessId": "xxxx",
"accessKey": "xxxx",
"index": "test-1",
"type": "default",
"cleanup": true,
"settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
"discovery": false,
"batchSize": 1000,
"splitter": ",",
"column": [
{"name": "pk", "type": "id"},
{ "name": "col_ip","type": "ip" },
{ "name": "col_double","type": "double" },
{ "name": "col_long","type": "long" },
{ "name": "col_integer","type": "integer" },
{ "name": "col_keyword", "type": "keyword" },
{ "name": "col_text", "type": "text", "analyzer": "ik_max_word"},
{ "name": "col_geo_point", "type": "geo_point" },
{ "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
{ "name": "col_nested1", "type": "nested" },
{ "name": "col_nested2", "type": "nested" },
{ "name": "col_object1", "type": "object" },
{ "name": "col_object2", "type": "object" },
{ "name": "col_integer_array", "type":"integer", "array":true},
{ "name": "col_geo_shape", "type":"geo_shape", "tree": "quadtree", "precision": "10m"}
]
}
}
}
]
}
}
參數說明
-
endpoint
-
描述:ElasticSearch的連接地址
-
必選:是
-
默認值:無
-
accessId
-
描述:http auth中的user
-
必選:否
-
默認值:空
-
accessKey
-
描述:http auth中的password
-
必選:否
-
默認值:空
-
index
-
描述:elasticsearch中的index名
-
必選:是
-
默認值:無
-
type
-
描述:elasticsearch中index的type名
-
必選:否
-
默認值:index名
-
cleanup
-
描述:是否刪除原表
-
必選:否
-
默認值:false
-
batchSize
-
描述:每次批量數據的條數
-
必選:否
-
默認值:1000
-
trySize
-
描述:失敗后重試的次數
-
必選:否
-
默認值:30
-
timeout
-
描述:客戶端超時時間
-
必選:否
-
默認值:600000
-
discovery
-
描述:啟用節點發現將(輪詢)並定期更新客戶機中的服務器列表。
-
必選:否
-
默認值:false
-
compression
-
描述:http請求,開啟壓縮
-
必選:否
-
默認值:true
-
multiThread
-
描述:http請求,是否有多線程
-
必選:否
-
默認值:true
-
ignoreWriteError
-
描述:忽略寫入錯誤,不重試,繼續寫入
-
必選:否
-
默認值:false
-
ignoreParseError
-
描述:忽略解析數據格式錯誤,繼續寫入
-
必選:否
-
默認值:true
-
alias
-
描述:數據導入完成后寫入別名
-
必選:否
-
默認值:無
-
aliasMode
-
描述:數據導入完成后增加別名的模式,append(增加模式), exclusive(只留這一個)
-
必選:否
-
默認值:append
-
settings
-
描述:創建index時候的settings, 與elasticsearch官方相同
-
必選:否
-
默認值:無
-
splitter
-
描述:如果插入數據是array,就使用指定分隔符
-
必選:否
-
默認值:-,-
-
column
-
描述:elasticsearch所支持的字段類型,樣例中包含了全部
-
必選:是
-
dynamic
-
描述: 不使用datax的mappings,使用es自己的自動mappings
-
必選: 否
-
默認值: false
參考
https://github.com/alibaba/DataX
https://github.com/alibaba/DataX/blob/master/introduction.md
https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md
DataX
DataX 是阿里巴巴集團內被廣泛使用的離線數據同步工具/平台,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各種異構數據源之間高效的數據同步功能。
Features
DataX本身作為數據同步框架,將不同數據源的同步抽象為從源頭數據源讀取數據的Reader插件,以及向目標端寫入數據的Writer插件,理論上DataX框架可以支持任意數據源類型的數據同步工作。同時DataX插件體系作為一套生態系統, 每接入一套新數據源該新加入的數據源即可實現和現有的數據源互通。
DataX詳細介紹
請參考:DataX-Introduction
Quick Start
Download DataX下載地址
請點擊:Quick Start
Support Data Channels
DataX目前已經有了比較全面的插件體系,主流的RDBMS數據庫、NOSQL、大數據計算系統都已經接入,目前支持數據如下圖,詳情請點擊:DataX數據源參考指南
類型 | 數據源 | Reader(讀) | Writer(寫) | 文檔 |
---|---|---|---|---|
RDBMS 關系型數據庫 | MySQL | √ | √ | 讀 、寫 |
Oracle | √ | √ | 讀 、寫 | |
SQLServer | √ | √ | 讀 、寫 | |
PostgreSQL | √ | √ | 讀 、寫 | |
DRDS | √ | √ | 讀 、寫 | |
通用RDBMS(支持所有關系型數據庫) | √ | √ | 讀 、寫 | |
阿里雲數倉數據存儲 | ODPS | √ | √ | 讀 、寫 |
ADS | √ | 寫 | ||
OSS | √ | √ | 讀 、寫 | |
OCS | √ | √ | 讀 、寫 | |
NoSQL數據存儲 | OTS | √ | √ | 讀 、寫 |
Hbase0.94 | √ | √ | 讀 、寫 | |
Hbase1.1 | √ | √ | 讀 、寫 | |
Phoenix4.x | √ | √ | 讀 、寫 | |
Phoenix5.x | √ | √ | 讀 、寫 | |
MongoDB | √ | √ | 讀 、寫 | |
Hive | √ | √ | 讀 、寫 | |
Cassandra | √ | √ | 讀 、寫 | |
無結構化數據存儲 | TxtFile | √ | √ | 讀 、寫 |
FTP | √ | √ | 讀 、寫 | |
HDFS | √ | √ | 讀 、寫 | |
Elasticsearch | √ | 寫 | ||
時間序列數據庫 | OpenTSDB | √ | 讀 | |
TSDB | √ | √ | 讀 、寫 |