通過Logstash同步json文件數據到ElasticSearch
最短可以設為每秒同步一次,可以說是實時的。
1,生成需要同步到 es 的 json 文件,json文件生成時建議用 utf-8 格式,和 logstash 保持一致。
2,在logstash配置目錄:/etc/logstash/conf.d
新建配置文件bs_domain.conf
,並如下配置:
input中設置的文件*.json、.sincedb、file.log等,建議把權限都設置為chmod 777 filename
,防止logstash沒有讀寫權限引起的錯誤。
filter過濾格式化數據時,logstash會默認添加兩個字段:host、path,如果不需要可以過濾掉。
# 1.讀取json文件
input {
file {
# 必選項,配置文件路徑,可定義多個,也可模糊匹配;
path => "/home/ldy/logstash/bs_domain/*.json"
# path => ["name1.json","name2.json", "name3.json"]
# 選擇logstash開始讀取文件的位置,begining或者end
start_position => "beginning"
# 設置編碼
codec => json { charset => "UTF-8" }
# 可選項,Logstash多久檢查一下path下有新文件,默認15s;
#discover_interval => 15
# 可選項,logstash多久檢查一次被監聽文件的變化,默認1s;
#stat_interval => 1
# 可選項,記錄文件以及文件讀取信息位置的數據文件;
#sincedb_path => "/home/ldy/logstash/bs_domain/.sincedb"
# 可選項,logstash多久寫一次sincedb文件,默認15s;
#sincedb_write_interval => 15
#mode => "read"
#file_completed_action => "log"
#file_completed_log_path => "/var/log/logstash/bs_domain/file.log"
}
}
# 2.過濾格式化數據階段
filter {
mutate {
# 刪除無效的字段
remove_field => ["_id", "host", "path", "@version", "@timestamp"]
}
# 新增timestamp字段,將@timestamp時間增加8小時
# ruby {code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"}
}
# 3.數據輸出到ES階段
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["127.0.0.1:9200"]
# user => "username"
# password => "123456"
document_id => "%{name}" # 用json的name字段代替es記錄的_id
index => "bs_domain"
}
}
3,重啟 logstash,service logstash restart
。logstash 每秒檢查一次文件變化,同步文件內容。
Logstash jdbc插件實現數據增量更新到ElasticSearch
不足:最短同步間隔為一分鍾,不是實時的。
# 輸入階段
input {
stdin {
}
jdbc {
jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/new_website_stage"
jdbc_user => "postgres"
jdbc_password => "123456"
# PG驅動,在PG官網下載,下載前查清和logstash版本對應關系
jdbc_driver_library => "/etc/logstash/dirve/postgresql-42.2.12.jar"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "300000"
# 實現數據增量同步,sql_last_value是最后被同步記錄的update_date字段值
statement => "select * from website_news where update_time > :sql_last_value"
# statement => "select * from website_news"
use_column_value => "true"
tracking_column => "update_time" # 指定用於被跟蹤的列
tracking_column_type => "timestamp" # 被跟蹤列的類型
# 每分鍾同步一次
schedule => "* * * * *"
type => "jdbc"
jdbc_default_timezone =>"Asia/Shanghai"
}
}
# 數據輸出到ES階段
output {
elasticsearch {
hosts => ["127.0.0.1:9200"] # ES的IP地址及端口
index => "website_news" # 索引名稱
document_id => "%{id}" # 用id字段作為es _id
}
stdout {
codec => json_lines # JSON格式輸出
}
}