通過Logstash同步數據到ElasticSearch的兩種方式


通過Logstash同步json文件數據到ElasticSearch

最短可以設為每秒同步一次,可以說是實時的。

1,生成需要同步到 es 的 json 文件,json文件生成時建議用 utf-8 格式,和 logstash 保持一致。

2,在logstash配置目錄:/etc/logstash/conf.d 新建配置文件bs_domain.conf,並如下配置:

input中設置的文件*.json、.sincedb、file.log等,建議把權限都設置為chmod 777 filename,防止logstash沒有讀寫權限引起的錯誤。

filter過濾格式化數據時,logstash會默認添加兩個字段:host、path,如果不需要可以過濾掉。

# 1.讀取json文件
input {
    file {
		# 必選項,配置文件路徑,可定義多個,也可模糊匹配;
        path => "/home/ldy/logstash/bs_domain/*.json"
		# path => ["name1.json","name2.json", "name3.json"]
	
		# 選擇logstash開始讀取文件的位置,begining或者end
		start_position => "beginning"
	
        # 設置編碼
        codec => json { charset => "UTF-8" }

		# 可選項,Logstash多久檢查一下path下有新文件,默認15s;
        #discover_interval => 15

		# 可選項,logstash多久檢查一次被監聽文件的變化,默認1s;
        #stat_interval => 1
	
		# 可選項,記錄文件以及文件讀取信息位置的數據文件;
        #sincedb_path => "/home/ldy/logstash/bs_domain/.sincedb"

		# 可選項,logstash多久寫一次sincedb文件,默認15s;
        #sincedb_write_interval => 15

        #mode => "read"
        #file_completed_action => "log"
        #file_completed_log_path => "/var/log/logstash/bs_domain/file.log"
    }
}


# 2.過濾格式化數據階段
filter {
    mutate {
        # 刪除無效的字段
        remove_field => ["_id", "host", "path", "@version", "@timestamp"]
    }
    # 新增timestamp字段,將@timestamp時間增加8小時
    # ruby {code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"}

}


# 3.數據輸出到ES階段
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        # user => "username"
        # password => "123456"
        document_id => "%{name}"  # 用json的name字段代替es記錄的_id
        index => "bs_domain"
    }
}

3,重啟 logstash,service logstash restart。logstash 每秒檢查一次文件變化,同步文件內容。

Logstash jdbc插件實現數據增量更新到ElasticSearch

不足:最短同步間隔為一分鍾,不是實時的。

# 輸入階段
input {
    stdin {
    }
    jdbc {
      jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/new_website_stage"
      jdbc_user => "postgres"
      jdbc_password => "123456"
      
      # PG驅動,在PG官網下載,下載前查清和logstash版本對應關系
      jdbc_driver_library => "/etc/logstash/dirve/postgresql-42.2.12.jar"
      jdbc_driver_class => "org.postgresql.Driver"
      
      jdbc_paging_enabled => "true"
      jdbc_page_size => "300000"
      
      # 實現數據增量同步,sql_last_value是最后被同步記錄的update_date字段值
      statement => "select * from website_news where update_time > :sql_last_value"
	  # statement => "select * from website_news"
      use_column_value => "true"
      tracking_column => "update_time"     # 指定用於被跟蹤的列
      tracking_column_type => "timestamp"  # 被跟蹤列的類型
      
      # 每分鍾同步一次
      schedule => "* * * * *"
      type => "jdbc"
      jdbc_default_timezone =>"Asia/Shanghai"
    }
}


# 數據輸出到ES階段
output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]  # ES的IP地址及端口
        index => "website_news"      # 索引名稱
        document_id => "%{id}"       # 用id字段作為es _id
    }
    stdout {
        codec => json_lines  # JSON格式輸出
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM