通過DataX同步數據至Elasticsearch


使用總結

  • Long值導入時精度丟失,字段類型使用text
  • splitPk使用ID流水號時,導入無進度0%
    • 因Id取最小值遞加至最大值,范圍間隔大空查詢較多
    • 將數據源的查詢時間 拆細
  • ES日期字段創建需指定格式 yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis
  • 日期數據導入時,text寫入為日期格式,long寫入為時間戳(數據有精度錯誤)
  • 注意時區問題 寫入時指定時區 或 對UTC時間戳進行轉換
    • 指定:“2019-03-12T12:12:12.123+0800”  
    • 轉換:東八區時間戳 = 3600000*8 + UTC時間戳

執行命令 

python /datax/bin/datax.py --jvm=-Xmx4g job.json

 

配置樣例

job.json

{

  "job": {

    "setting": {

        "speed": {

            "channel": 1

        }

    },

    "content": [

      {

        "reader": {

          ...

        },

        "writer": {

          "name": "elasticsearchwriter",

          "parameter": {

            "endpoint": "http://xxx:9999",

            "accessId": "xxxx",

            "accessKey": "xxxx",

            "index": "test-1",

            "type": "default",

            "cleanup": true,

            "settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},

            "discovery": false,

            "batchSize": 1000,

            "splitter": ",",

            "column": [

              {"name": "pk", "type": "id"},

              { "name": "col_ip","type": "ip" },

              { "name": "col_double","type": "double" },

              { "name": "col_long","type": "long" },

              { "name": "col_integer","type": "integer" },

              { "name": "col_keyword", "type": "keyword" },

              { "name": "col_text", "type": "text", "analyzer": "ik_max_word"},

              { "name": "col_geo_point", "type": "geo_point" },

              { "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},

              { "name": "col_nested1", "type": "nested" },

              { "name": "col_nested2", "type": "nested" },

              { "name": "col_object1", "type": "object" },

              { "name": "col_object2", "type": "object" },

              { "name": "col_integer_array", "type":"integer", "array":true},

              { "name": "col_geo_shape", "type":"geo_shape", "tree": "quadtree", "precision": "10m"}

            ]

          }

        }

      }

    ]

  }

 

}

參數說明

 

  • endpoint

  • 描述:ElasticSearch的連接地址

  • 必選:是

  • 默認值:無

  • accessId

  • 描述:http auth中的user

  • 必選:否

  • 默認值:空

  • accessKey

  • 描述:http auth中的password

  • 必選:否

  • 默認值:空

  • index

  • 描述:elasticsearch中的index名

  • 必選:是

  • 默認值:無

  • type

  • 描述:elasticsearch中index的type名

  • 必選:否

  • 默認值:index名

  • cleanup

  • 描述:是否刪除原表

  • 必選:否

  • 默認值:false

  • batchSize

  • 描述:每次批量數據的條數

  • 必選:否

  • 默認值:1000

  • trySize

  • 描述:失敗后重試的次數

  • 必選:否

  • 默認值:30

  • timeout

  • 描述:客戶端超時時間

  • 必選:否

  • 默認值:600000

  • discovery

  • 描述:啟用節點發現將(輪詢)並定期更新客戶機中的服務器列表。

  • 必選:否

  • 默認值:false

  • compression

  • 描述:http請求,開啟壓縮

  • 必選:否

  • 默認值:true

  • multiThread

  • 描述:http請求,是否有多線程

  • 必選:否

  • 默認值:true

  • ignoreWriteError

  • 描述:忽略寫入錯誤,不重試,繼續寫入

  • 必選:否

  • 默認值:false

  • ignoreParseError

  • 描述:忽略解析數據格式錯誤,繼續寫入

  • 必選:否

  • 默認值:true

  • alias

  • 描述:數據導入完成后寫入別名

  • 必選:否

  • 默認值:無

  • aliasMode

  • 描述:數據導入完成后增加別名的模式,append(增加模式), exclusive(只留這一個)

  • 必選:否

  • 默認值:append

  • settings

  • 描述:創建index時候的settings, 與elasticsearch官方相同

  • 必選:否

  • 默認值:無

  • splitter

  • 描述:如果插入數據是array,就使用指定分隔符

  • 必選:否

  • 默認值:-,-

  • column

  • 描述:elasticsearch所支持的字段類型,樣例中包含了全部

  • 必選:是

  • dynamic

  • 描述: 不使用datax的mappings,使用es自己的自動mappings

  • 必選: 否

  • 默認值: false

 

 


參考

https://github.com/alibaba/DataX

https://github.com/alibaba/DataX/blob/master/introduction.md

https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md

DataX

DataX 是阿里巴巴集團內被廣泛使用的離線數據同步工具/平台,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各種異構數據源之間高效的數據同步功能。

Features

DataX本身作為數據同步框架,將不同數據源的同步抽象為從源頭數據源讀取數據的Reader插件,以及向目標端寫入數據的Writer插件,理論上DataX框架可以支持任意數據源類型的數據同步工作。同時DataX插件體系作為一套生態系統, 每接入一套新數據源該新加入的數據源即可實現和現有的數據源互通。

DataX詳細介紹

請參考:DataX-Introduction

Quick Start

Download DataX下載地址
請點擊:Quick Start

Support Data Channels

DataX目前已經有了比較全面的插件體系,主流的RDBMS數據庫、NOSQL、大數據計算系統都已經接入,目前支持數據如下圖,詳情請點擊:DataX數據源參考指南

類型 數據源 Reader(讀) Writer(寫) 文檔
RDBMS 關系型數據庫 MySQL  、
            Oracle         √         √      、
  SQLServer  、
  PostgreSQL  、
  DRDS  、
  通用RDBMS(支持所有關系型數據庫)  、
阿里雲數倉數據存儲 ODPS  、
  ADS  
  OSS  、
  OCS  、
NoSQL數據存儲 OTS  、
  Hbase0.94  、
  Hbase1.1  、
  Phoenix4.x  、
  Phoenix5.x  、
  MongoDB  、
  Hive  、
  Cassandra  、
無結構化數據存儲 TxtFile  、
  FTP  、
  HDFS  、
  Elasticsearch  
時間序列數據庫 OpenTSDB  
  TSDB  、

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM