簡略介紹:
Logstash是elastic技術棧中的一員。它是一個數據采集引擎,可以從數據庫采集數據到es中。我們可以通過設置自增id主鍵或者時間來控制數據的自動同步,這個id或者時間就是用於給logstash進行識別哪些數據需要同步的。
-
使用ID的方式,假設表中有1000條數據,Logstatsh第一次同步完畢后會記錄id為1000。以后表中新增數據時,主鍵id自動累加。Logstatsh的定時任務在掃描表數據時發現最大的id和自身記錄的不一樣,就會再次做增量同步。
-
使用時間的方式,假設表中有1000條數據,每一條數據都有一個時間(創建時間或更新時間)。在初次同步完畢以后,Logstatsh會記錄最大的那個時間值。定時任務在掃描時會判斷表中的數據時間值是否超過了自身所記錄的時間,如果超過了就做增量更新。
使用ID的弊端:修改數據無法同步到es。包括做邏輯刪除。無論采用哪一種方式,物理刪除是不會同步到es的。
也可以在操作數據庫的同時,手動對es索引進行修改。不使用Logstatsh。
安裝與配置Logstatsh:
https://www.elastic.co/cn/downloads/past-releases/logstash-6-4-3,注意Logstatsh版本和es版本相一致。
解壓縮后的目錄結構:
bin CONTRIBUTORS Gemfile lib logs logstash-core-plugin-api NOTICE.TXT tools x-pack config data Gemfile.lock LICENSE.txt logstash-core modules vendor
示例腳本——抓取數據庫中的數據並推送到Elasticsearch:
新建文件夾,存放數據同步的配置文件:
mkdir sync
1.配置文件——啟動Logstash時使用

新建配置文件 vim logstash-db-sync.conf #添加如下內容 input { jdbc { # 設置 MySql/MariaDB 數據庫url以及數據庫名稱 jdbc_connection_string => "jdbc:mysql://192.168.1.6:3306/foodie-shop-dev?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true" # 用戶名和密碼 jdbc_user => "root" jdbc_password => "root" # 數據庫驅動所在位置,可以是絕對路徑或者相對路徑 jdbc_driver_library => "/usr/local/logstash-6.4.3/sync/mysql-connector-java-5.1.41.jar" # 驅動類名 jdbc_driver_class => "com.mysql.jdbc.Driver" # 開啟分頁 jdbc_paging_enabled => "true" # 分頁每頁數量,可以自定義 jdbc_page_size => "10000" # 執行的sql文件路徑 statement_filepath => "/usr/local/logstash-6.4.3/sync/foodie-items.sql" # 設置定時任務間隔 含義:分、時、天、月、年,全部為*默認含義為每分鍾跑一次任務 schedule => "* * * * *" # 索引類型 #受重寫模板的影響,最終數據同步后type的值要以實際為准。(7.x版本后沒這個問題,都是_type) type => "_doc" # 是否開啟記錄上次追蹤的結果,也就是上次更新的時間,這個會記錄到 last_run_metadata_path 的文件 use_column_value => true # 記錄上一次追蹤的結果值 last_run_metadata_path => "/usr/local/logstash-6.4.3/sync/track_time" # 如果 use_column_value 為true, 配置本參數追蹤的 column 名,可以是自增id或者時間 tracking_column => "updated_time" # tracking_column 對應字段的類型 tracking_column_type => "timestamp" # 是否清除 last_run_metadata_path 的記錄,true則每次都從頭開始查詢所有的數據庫記錄 clean_run => false # 數據庫字段名稱大寫轉小寫 lowercase_column_names => false } } output { elasticsearch { # es地址 hosts => ["192.168.1.187:9200"] # 同步的索引名 index => "foodie-items" # 設置_docID和數據相同 document_id => "%{id}" # 設置中文分詞器 # 定義模板名稱 template_name => "myik" # 模板所在位置 template => "/usr/local/logstash-6.4.3/sync/logstash-ik.json" # 重寫模板 template_overwrite => true # 默認為true,false關閉logstash自動管理模板功能,如果自定義模板,則設置為false manage_template => false } # 日志輸出 stdout { codec => json_lines } }
2.修改Logstash的mappings模板,配置中文分詞器

#新建json文件,重寫模板 vim logstash-ik.json; { "order": 0, "version": 1, "index_patterns": ["*"], "settings": { "index": { "refresh_interval": "5s" } }, "mappings": { "_default_": { "dynamic_templates": [ { "message_field": { "path_match": "message", "match_mapping_type": "string", "mapping": { "type": "text", "norms": false } } }, { "string_fields": { "match": "*", "match_mapping_type": "string", "mapping": { "type": "text", "norms": false, "analyzer": "ik_max_word", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } } ], "properties": { "@timestamp": { "type": "date" }, "@version": { "type": "keyword" }, "geoip": { "dynamic": true, "properties": { "ip": { "type": "ip" }, "location": { "type": "geo_point" }, "latitude": { "type": "half_float" }, "longitude": { "type": "half_float" } } } } } }, "aliases": {} }
啟動logstatsh:
./logstash -f /usr/local/logstash-6.4.3/sync/logstash-db-sync.conf
過程較慢,耐心等待。