主要用到了一個 JDBC importer for Elasticsearch的庫。
想要增量同步,有一些先決條件。首先數據庫中要維護一個update_time的時間戳,這個字段表示了該記錄的最后更新時間。然后用上面的那個庫,定時執行一個任務,這個任務中執行的sql就是根據時間戳判斷該記錄是否應該被更新。
這里先寫一個最簡單的例子來展示一下。
從上方插件官網中下載適合的dist包,然后解壓。進入bin目錄,可以看到一堆sh腳本。在bin目錄下創建一個test.sh:
bin=/home/csonezp/Dev/elasticsearch-jdbc-2.3.1.0/bin lib=/home/csonezp/Dev/elasticsearch-jdbc-2.3.1.0/lib echo '{ "type" : "jdbc", "statefile" : "statefile.json", "jdbc": { "url" : "jdbc:mysql://myaddr", "user" : "myuser", "password" : "mypwd", "type" : "mytype", "index": "myindex", "schedule" : "0 * * * * ?", "metrics" : { "enabled" : true }, "sql" : [ { "statement" : "select * from gd_actor_info where update_time > ?", "parameter" : [ "$metrics.lastexecutionstart" ] } ] } }' | java \ -cp "${lib}/*" \ -Dlog4j.configurationFile=${bin}/log4j2.xml \ org.xbib.tools.Runner \ org.xbib.tools.JDBCImporter
schedule現在設置成每分鍾都執行一次,是為了方便觀察行為。statefile這一句是一定要加的。$metrics.lastexecutionstart就是這個腳本的關鍵所在了,這個指的是上一次腳本執行的時間,可以通過比較這個時間和數據庫里的字段來判斷是否要更新。