Elasticsearch mysql 增量同步


主要用到了一個 JDBC importer for Elasticsearch的庫。

 

 

想要增量同步,有一些先決條件。首先數據庫中要維護一個update_time的時間戳,這個字段表示了該記錄的最后更新時間。然后用上面的那個庫,定時執行一個任務,這個任務中執行的sql就是根據時間戳判斷該記錄是否應該被更新。

 

這里先寫一個最簡單的例子來展示一下。

 

從上方插件官網中下載適合的dist包,然后解壓。進入bin目錄,可以看到一堆sh腳本。在bin目錄下創建一個test.sh:

bin=/home/csonezp/Dev/elasticsearch-jdbc-2.3.1.0/bin
lib=/home/csonezp/Dev/elasticsearch-jdbc-2.3.1.0/lib

echo '{
    "type" : "jdbc",
    "statefile" : "statefile.json",
   "jdbc": {
        "url" : "jdbc:mysql://myaddr",
        "user" : "myuser",
        "password" : "mypwd",
        "type" : "mytype",
        "index": "myindex",
        "schedule" : "0 * * * * ?",
        "metrics" : {
            "enabled" : true
        },
        
       "sql" : [
            {
                "statement" : "select * from gd_actor_info where update_time > ?",
                "parameter" : [ "$metrics.lastexecutionstart" ]
            }
        ]
      
    }
}' | java \
       -cp "${lib}/*" \
       -Dlog4j.configurationFile=${bin}/log4j2.xml \
       org.xbib.tools.Runner \
       org.xbib.tools.JDBCImporter

schedule現在設置成每分鍾都執行一次,是為了方便觀察行為。statefile這一句是一定要加的。$metrics.lastexecutionstart就是這個腳本的關鍵所在了,這個指的是上一次腳本執行的時間,可以通過比較這個時間和數據庫里的字段來判斷是否要更新。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM