單個進程 logstash 可以實現對數據的讀取、解析和輸出處理。但是在生產環境中,從每台應用服務器運行 logstash 進程並將數據直接發送到 Elasticsearch 里,顯然不是第一選擇:第一,過多的客戶端連接對 Elasticsearch 是一種額外的壓力;第二,網絡抖動會影響到 logstash 進程,進而影響生產應用;第三,運維人員未必願意在生產服務器上部署 Java,或者讓 logstash 跟業務代碼爭奪 Java 資源。
所以,在實際運用中,logstash 進程會被分為兩個不同的角色。運行在應用服務器上的,盡量減輕運行壓力,只做讀取和轉發,這個角色叫做 shipper;運行在獨立服務器上,完成數據解析處理,負責寫入 Elasticsearch 的角色,叫 indexer。
Kafka 是一個高吞吐量的分布式發布訂閱日志服務,具有高可用、高性能、分布式、高擴展、持久性等特性。和Redis做輕量級消息隊列不同,Kafka利用磁盤做消息隊列,所以也就無所謂消息緩沖時的磁盤問題。生產環境中還是推薦使用Kafka做消息隊列。此外,如果公司內部已經有 Kafka 服務在運行,logstash也可以快速接入,免去重復建設的麻煩。
一、Logstash搭建
詳細搭建可以參考Logstash安裝搭建(一)。
二、配置Shipper
Shipper 即為Nginx服務器上運行的 logstash 進程,logstash 通過 logstash-input-file 寫入,然后通過 logstash-output-kafka 插件將日志寫入到 kafka 集群中。
Logstash使用一個名叫 FileWatch 的 Ruby Gem 庫來監聽文件變化。這個庫支持 glob 展開文件路徑,而且會記錄在 .sincedb 的數據庫文件來跟蹤被監聽的日志文件的當前讀取位置。
.sincedb 文件中記錄了每個被監聽的文件的 inode, major number, minor number 和 pos。
Input配置實例
input { file { path => "/var/log/nginx/log_access.log" type => "nginx-access" discover_interval => 15 #logstash 每隔多久去檢查一次被監聽的 path 下是否有新文件。默認值是 15 秒。 sincedb_path => "/etc/logstash/.sincedb" #定義sincedb文件的位置 start_position => "beginning" #定義文件讀取的位置 } }
其他配置詳解:
exclude 不想被監聽的文件可以排除出去。 close_older 已經監聽的文件,若超過這個時間內沒有更新,就關閉監聽該文件的句柄。默認為:3600s,即一小時。 ignore_older 在每次檢查文件列表時,若文件的最后修改時間超過該值,則忽略該文件。默認為:86400s,即一天。 sincedb_path 定義 .sincedb 文件路徑,默認為 $HOME/.sincedb 。 sincedb_write_interval 間隔多久寫一次sincedb文件,默認15s。 stat_interval 每隔多久檢查被監聽文件狀態(是否有更新),默認為1s。 start_position logstash從什么位置開始讀取文件數據。默認為結束位置,類似 tail -f 的形式。設置為“beginning”,則從頭讀取,類似 cat ,到最后一行以后變成為 tail -f 。
Output配置實例
以下配置可以實現對 kafka producer 的基本使用。生產者更多詳細配置請查看 Kafka 官方文檔中生產者部分配置文檔。
output { kafka { bootstrap_servers => "localhost:9092" #生產者 topic_id => "nginx-access-log" #設置寫入kafka的topic compression_type => "snappy" #消息壓縮模式,默認是none,可選gzip、snappy。 } }
logstash-out-kafka 其他配置詳解:
compression_type 消息壓縮模式,默認是none,有效值為:none、gzip、snappy。 asks 消息確認模式,默認為1,有效值為:0、1、all。設置為0,生產者不等待 broker 回應;設置為1,生產者會收到 leader 寫入之后的回應;設置為all, leader 將要等待 in-sync 中所有的 replication 同步確認。 send_buffer_bytes TCP發送數據時的緩沖區的大小。
logstash-kafka 插件輸入和輸出默認 codec 為 json 格式。在輸入和輸出的時候注意下編碼格式。消息傳遞過程中 logstash 默認會為消息編碼內加入相應的時間戳和 hostname 等信息。如果不想要以上信息(一般做消息轉發的情況下),可以使用以下配置,例如:
output { kafka { codec => plain { format => "%{message}" } } }
三、搭建配置Kafka
搭建配置Kafka可以參考 Kafka集群搭建。
四、配置Indexer
是用logstash-input-kafka插件,從kafka集群中讀取數據。
Input配置示例:
input { kafka { zk_connect => "localhost:2181" #zookeeper地址 topic_id => "nginx-access-log" #kafka中topic名稱,記得創建該topic group_id => "nginx-access-log" #默認為“logstash” codec => "plain" #與Shipper端output配置項一致 consumer_threads => 1 #消費的線程數 decorate_events => true #在輸出消息的時候回輸出自身的信息,包括:消費消息的大小、topic來源以及consumer的group信息。 type => "nginx-access-log" } }
更多 logstash-input-kafka 配置可以從 logstash 官方文檔 查看。
Logstash 是一個 input | decode | filter | encode | output 的數據流。上述配置中有 codec => "plain" ,即logstash 采用轉發的形式,不會對原有信息進行編碼轉換。豐富的過濾器插件(Filter)的存在是 logstash 威力強大的重要因素,提供的不單單是過濾的功能,可以進行復雜的邏輯處理,甚至無中生有添加新的logstash事件到后續的流程中去。這里只列舉 logstash-output-elasticsearch 配置。
logstash-output-elasticsearch 配置實例:
output { elasticsearch { hosts => ["localhost:9200"] //Elasticsearch 地址,多個地址以逗號分隔。 index => "logstash-%{type}-%{+YYYY.MM.dd}" //索引命名方式,不支持大寫字母(Logstash除外) document_type => "%{type}" //文檔類型 workers => 1 flush_size => 20000 //向Elasticsearch批量發送數據的條數 idle_flush_time => 10 //向Elasticsearch批量發送數據的時間間隔,即使不滿足 flush_size 也會發送 template_overwrite => true //設置為true,將會把自定義的模板覆蓋logstash自帶模板 } }
到此就已經把Nginx上的日志轉發到Elasticsearch中。