ELK實時日志分析平台環境部署--完整記錄


 

在日常運維工作中,對於系統和業務日志的處理尤為重要。今天,在這里分享一下自己部署的ELK(+Redis)-開源實時日志分析平台的記錄過程(僅依據本人的實際操作為例說明,如有誤述,敬請指出)~

================概念介紹================
日志主要包括系統日志、應用程序日志和安全日志。系統運維和開發人員可以通過日志了解服務器軟硬件信息、檢查配置過程中的錯誤及錯誤發生的原因。經常分析日志可以了解服務器的負荷,性能安全性,從而及時采取措施糾正錯誤。

通常,日志被分散在儲存不同的設備上。如果你管理數十上百台服務器,你還在使用依次登錄每台機器的傳統方法查閱日志。這樣是不是感覺很繁瑣和效率低下。當務之急我們使用集中化的日志管理,例如:開源的syslog,將所有服務器上的日志收集匯總。

集中化管理日志后,日志的統計和檢索又成為一件比較麻煩的事情,一般我們使用grep、awk和wc等Linux命令能實現檢索和統計,但是對於要求更高的查詢、排序和統計等要求和龐大的機器數量依然使用這樣的方法難免有點力不從心。

通過我們需要對日志進行集中化管理,將所有機器上的日志信息收集、匯總到一起。完整的日志數據具有非常重要的作用:
1)信息查找。通過檢索日志信息,定位相應的bug,找出解決方案。
2)服務診斷。通過對日志信息進行統計、分析,了解服務器的負荷和服務運行狀態,找出耗時請求進行優化等等。
3)數據分析。如果是格式化的log,可以做進一步的數據分析,統計、聚合出有意義的信息,比如根據請求中的商品id,找出TOP10用戶感興趣商品。

開源實時日志分析ELK平台能夠完美的解決我們上述的問題,ELK由ElasticSearch、Logstash和Kiabana三個開源工具組成:
1)ElasticSearch是一個基於Lucene的開源分布式搜索服務器。它的特點有:分布式,零配置,自動發現,索引自動分片,索引副本機制,restful風格接口,多數據源,自動搜索負載等。它提供了一個分布式多用戶能力的全文搜索引擎,基於RESTful web接口。Elasticsearch是用Java開發的,並作為Apache許可條款下的開放源碼發布,是第二流行的企業搜索引擎。設計用於雲計算中,能夠達到實時搜索,穩定,可靠,快速,安裝使用方便。
在elasticsearch中,所有節點的數據是均等的。
2)Logstash是一個完全開源的工具,它可以對你的日志進行收集、過濾、分析,支持大量的數據獲取方法,並將其存儲供以后使用(如搜索)。說到搜索,logstash帶有一個web界面,搜索和展示所有日志。一般工作方式為c/s架構,client端安裝在需要收集日志的主機上,server端負責將收到的各節點日志進行過濾、修改等操作在一並發往elasticsearch上去。
3)Kibana 是一個基於瀏覽器頁面的Elasticsearch前端展示工具,也是一個開源和免費的工具,Kibana可以為 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以幫助您匯總、分析和搜索重要數據日志。

為什么要用到ELK?
一般我們需要進行日志分析場景是:直接在日志文件中 grep、awk 就可以獲得自己想要的信息。但在規模較大的場景中,此方法效率低下,面臨問題包括日志量太大如何歸檔、文本搜索太慢怎么辦、如何多維度查詢。需要集中化的日志管理,所有服務器上的日志收集匯總。常見解決思路是建立集中式日志收集系統,將所有節點上的日志統一收集,管理,訪問。
一般大型系統是一個分布式部署的架構,不同的服務模塊部署在不同的服務器上,問題出現時,大部分情況需要根據問題暴露的關鍵信息,定位到具體的服務器和服務模塊,構建一套集中式日志系統,可以提高定位問題的效率。

一般大型系統是一個分布式部署的架構,不同的服務模塊部署在不同的服務器上,問題出現時,大部分情況需要根據問題暴露的關鍵信息,定位到具體的服務器和服務模塊,構建一套集中式日志系統,可以提高定位問題的效率。
一個完整的集中式日志系統,需要包含以下幾個主要特點:
1)收集-能夠采集多種來源的日志數據
2)傳輸-能夠穩定的把日志數據傳輸到中央系統
3)存儲-如何存儲日志數據
4)分析-可以支持 UI 分析
5)警告-能夠提供錯誤報告,監控機制

ELK提供了一整套解決方案,並且都是開源軟件,之間互相配合使用,完美銜接,高效的滿足了很多場合的應用。目前主流的一種日志系統。

ELK工作原理展示圖:

如上圖:Logstash收集AppServer產生的Log,並存放到ElasticSearch集群中,而Kibana則從ES集群中查詢數據生成圖表,再返回給Browser。

Logstash工作原理:
Logstash事件處理有三個階段:inputs → filters → outputs。是一個接收,處理,轉發日志的工具。支持系統日志,webserver日志,錯誤日志,應用日志,總之包括所有可以拋出來的日志類型。

Input:輸入數據到logstash。

一些常用的輸入為:
file:從文件系統的文件中讀取,類似於tial -f命令
syslog:在514端口上監聽系統日志消息,並根據RFC3164標准進行解析
redis:從redis service中讀取
beats:從filebeat中讀取
Filters:數據中間處理,對數據進行操作。

一些常用的過濾器為:
grok:解析任意文本數據,Grok 是 Logstash 最重要的插件。它的主要作用就是將文本格式的字符串,轉換成為具體的結構化的數據,配合正則表達式使用。內置120多個解析語法。
mutate:對字段進行轉換。例如對字段進行刪除、替換、修改、重命名等。
drop:丟棄一部分events不進行處理。
clone:拷貝 event,這個過程中也可以添加或移除字段。
geoip:添加地理信息(為前台kibana圖形化展示使用)
Outputs:outputs是logstash處理管道的最末端組件。一個event可以在處理過程中經過多重輸出,但是一旦所有的outputs都執行結束,這個event也就完成生命周期。

一些常見的outputs為:
elasticsearch:可以高效的保存數據,並且能夠方便和簡單的進行查詢。
file:將event數據保存到文件中。
graphite:將event數據發送到圖形化組件中,一個很流行的開源存儲圖形化展示的組件。
Codecs:codecs 是基於數據流的過濾器,它可以作為input,output的一部分配置。Codecs可以幫助你輕松的分割發送過來已經被序列化的數據。

一些常見的codecs:
json:使用json格式對數據進行編碼/解碼。
multiline:將匯多個事件中數據匯總為一個單一的行。比如:java異常信息和堆棧信息。

======================ELK整體方案=======================
ELK中的三個系統分別扮演不同的角色,組成了一個整體的解決方案。Logstash是一個ETL工具,負責從每台機器抓取日志數據,對數據進行格式轉換和處理后,輸出到Elasticsearch中存儲。Elasticsearch是一個分布式搜索引擎和分析引擎,用於數據存儲,可提供實時的數據查詢。Kibana是一個數據可視化服務,根據用戶的操作從Elasticsearch中查詢數據,形成相應的分析結果,以圖表的形式展現給用戶。
ELK的安裝很簡單,可以按照"下載->修改配置文件->啟動"方法分別部署三個系統,也可以使用docker來快速部署。具體的安裝方法這里不詳細介紹,下面來看一個常見的部署方案,如下圖所示,部署思路是:
1)在每台生成日志文件的機器上,部署Logstash,作為Shipper的角色,負責從日志文件中提取數據,但是不做任何處理,直接將數據輸出到Redis隊列(list)中;
2)需要一台機器部署Logstash,作為Indexer的角色,負責從Redis中取出數據,對數據進行格式化和相關處理后,輸出到Elasticsearch中存儲;
3)部署Elasticsearch集群,當然取決於你的數據量了,數據量小的話可以使用單台服務,如果做集群的話,最好是有3個以上節點,同時還需要部署相關的監控插件;
4)部署Kibana服務,提供Web服務。

在前期部署階段,主要工作是Logstash節點和Elasticsearch集群的部署,而在后期使用階段,主要工作就是Elasticsearch集群的監控和使用Kibana來檢索、分析日志數據了,當然也可以直接編寫程序來消費Elasticsearch中的數據。

在上面的部署方案中,我們將Logstash分為Shipper和Indexer兩種角色來完成不同的工作,中間通過Redis做數據管道,為什么要這樣做?為什么不是直接在每台機器上使用Logstash提取數據、處理、存入Elasticsearch?

首先,采用這樣的架構部署,有三點優勢:第一,降低對日志所在機器的影響,這些機器上一般都部署着反向代理或應用服務,本身負載就很重了,所以盡可能的在這些機器上少做事;第二,如果有很多台機器需要做日志收集,那么讓每台機器都向Elasticsearch持續寫入數據,必然會對Elasticsearch造成壓力,因此需要對數據進行緩沖,同時,這樣的緩沖也可以一定程度的保護數據不丟失;第三,將日志數據的格式化與處理放到Indexer中統一做,可以在一處修改代碼、部署,避免需要到多台機器上去修改配置。 

其次,我們需要做的是將數據放入一個消息隊列中進行緩沖,所以Redis只是其中一個選擇,也可以是RabbitMQ、Kafka等等,在實際生產中,Redis與Kafka用的比較多。由於Redis集群一般都是通過key來做分片,無法對list類型做集群,在數據量大的時候必然不合適了,而Kafka天生就是分布式的消息隊列系統。

1)配置nginx日志格式
首先需要將nginx日志格式規范化,便於做解析處理。在nginx.conf文件中設置:

log_format main '$remote_addr "$time_iso8601" "$request" $status $body_bytes_sent "$http_user_agent" "$http_referer" "$http_x_forwarded_for" "$request_time" "$upstream_response_time" "$http_cookie" "$http_Authorization" "$http_token"';
access_log  /var/log/nginx/example.access.log  main;

2)nginx日志–>>Logstash–>>消息隊列
這部分是Logstash Shipper的工作,涉及input和output兩種插件。input部分,由於需要提取的是日志文件,一般使用file插件,該插件常用的幾個參數是:
path:指定日志文件路徑。
type:指定一個名稱,設置type后,可以在后面的filter和output中對不同的type做不同的處理,適用於需要消費多個日志文件的場景。
start_position:指定起始讀取位置,“beginning”表示從文件頭開始,“end”表示從文件尾開始(類似tail -f)。
sincedb_path:與Logstash的一個坑有關。通常Logstash會記錄每個文件已經被讀取到的位置,保存在sincedb中,如果Logstash重啟,那么對於同一個文件,會繼續從上次記錄的位置開始讀取。如果想重新從頭讀取文件,需要刪除sincedb文件,sincedb_path則是指定了該文件的路徑。為了方便,我們可以根據需要將其設置為“/dev/null”,即不保存位置信息。

input {
    file {
        type => "example_nginx_access"
        path => ["/var/log/nginx/example.access.log"]

        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

output部分,將數據輸出到消息隊列,以redis為例,需要指定redis server和list key名稱。另外,在測試階段,可以使用stdout來查看輸出信息。

# 輸出到redis
output {
    if [type] == "example_nginx_access" {
        redis {
            host => "127.0.0.1"
            port => "6379"
            data_type => "list"
            key => "logstash:example_nginx_access"
        }
      #  stdout {codec => rubydebug}
    }
}

3)消息隊列–>>Logstash–>>Elasticsearch
這部分是Logstash Indexer的工作,涉及input、filter和output三種插件。在input部分,我們通過redis插件將數據從消息隊列中取出來。在output部分,我們通過elasticsearch插件將數據寫入Elasticsearch。

# 從redis輸入數據
input {
    redis {
            host => "127.0.0.1"
            port => "6379"
            data_type => "list"
            key => "logstash:example_nginx_access"
    }
}

output {
    elasticsearch {
        index => "logstash-example-nginx-%{+YYYY.MM}"
        hosts => ["127.0.0.1:9200"]
    }
}

這里,需要重點關注filter部分,下面列舉幾個常用的插件,實際使用中根據自身需求從官方文檔中查找適合自己業務的插件並使用即可,當然也可以編寫自己的插件。
grok:是Logstash最重要的一個插件,用於將非結構化的文本數據轉化為結構化的數據。grok內部使用正則語法對文本數據進行匹配,為了降低使用復雜度,其提供了一組pattern,我們可以直接調用pattern而不需要自己寫正則表達式,參考源碼grok-patterns。grok解析文本的語法格式是%{SYNTAX:SEMANTIC},SYNTAX是pattern名稱,SEMANTIC是需要生成的字段名稱,使用工具Grok Debugger可以對解析語法進行調試。例如,在下面的配置中,我們先使用grok對輸入的原始nginx日志信息(默認以message作為字段名)進行解析,並添加新的字段request_path_with_verb(該字段的值是verb和request_path的組合),然后對request_path字段做進一步解析。
kv:用於將某個字段的值進行分解,類似於編程語言中的字符串Split。在下面的配置中,我們將request_args字段值按照“&”進行分解,分解后的字段名稱以“request_args_”作為前綴,並且丟棄重復的字段。
geoip:用於根據IP信息生成地理位置信息,默認使用自帶的一份GeoLiteCity database,也可以自己更換為最新的數據庫,但是需要數據格式需要遵循Maxmind的格式(參考GeoLite),似乎目前只能支持legacy database,數據類型必須是.dat。下載GeoLiteCity.dat.gz后解壓, 並將文件路徑配置到source中即可。
translate:用於檢測某字段的值是否符合條件,如果符合條件則將其翻譯成新的值,寫入一個新的字段,匹配pattern可以通過YAML文件來配置。例如,在下面的配置中,我們對request_api字段翻譯成更加易懂的文字描述。

filter {
    grok {
        match => {"message" => "%{IPORHOST:client_ip} \"%{TIMESTAMP_ISO8601:timestamp}\" \"%{WORD:verb} %{NOTSPACE:request_path} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response_status:int} %{NUMBER:response_body_bytes:int} \"%{DATA:user_agent}\" \"%{DATA:http_referer}\" \"%{NOTSPACE:http_x_forwarder_for}\" \"%{NUMBER:request_time:float}\" \"%{DATA:upstream_resopnse_time}\" \"%{DATA:http_cookie}\" \"%{DATA:http_authorization}\" \"%{DATA:http_token}\""}
        add_field => {"request_path_with_verb" => "%{verb} %{request_path}"}
    }

    grok {
        match => {"request_path" => "%{URIPATH:request_api}(?:\?%{NOTSPACE:request_args}|)"}
        add_field => {"request_annotation" => "%{request_api}"}
    }

    kv {
        prefix => "request_args_"
        field_split => "&"
        source => "request_args"
        allow_duplicate_values => false
    }

    geoip {
        source => "client_ip"
        database => "/home/elktest/geoip_data/GeoLiteCity.dat"
    }

   translate {
        field => request_path
        destination => request_annotation
        regex => true
        exact => true
        dictionary_path => "/home/elktest/api_annotation.yaml"
        override => true
    }
}

Elasticsearch
Elasticsearch承載了數據存儲和查詢的功能,其基礎概念和使用方法可以參考另一篇博文Elasticsearch使用總結,這里主要介紹些實際生產中的問題和方法:
1)關於集群配置,重點關注三個參數:第一,discovery.zen.ping.unicast.hosts,Elasticsearch默認使用Zen Discovery來做節點發現機制,推薦使用unicast來做通信方式,在該配置項中列舉出Master節點。第二,discovery.zen.minimum_master_nodes,該參數表示集群中可工作的具有Master節點資格的最小數量,默認值是1。為了提高集群的可用性,避免腦裂現象(所謂腦裂,就是同一個集群中的不同節點,對集群的狀態有不一致的理解。),官方推薦設置為(N/2)+1,其中N是具有Master資格的節點的數量。第三,discovery.zen.ping_timeout,表示節點在發現過程中的等待時間,默認值是3秒,可以根據自身網絡環境進行調整,一定程度上提供可用性。

discovery.zen.ping.unicast.hosts: ["master1", "master2", "master3"] 
discovery.zen.minimum_master_nodes: 2
discovery.zen.ping_timeout: 10

2)關於集群節點,第一,節點類型包括:候選Master節點、數據節點和Client節點。通過設置兩個配置項node.master和node.data為true或false,來決定將一個節點分配為什么類型的節點。第二,盡量將候選Master節點和Data節點分離開,通常Data節點負載較重,需要考慮單獨部署。
3)關於內存,Elasticsearch默認設置的內存是1GB,對於任何一個業務部署來說,這個都太小了。通過指定ES_HEAP_SIZE環境變量,可以修改其堆內存大小,服務進程在啟動時候會讀取這個變量,並相應的設置堆的大小。建議設置系統內存的一半給Elasticsearch,但是不要超過32GB。參考官方文檔。
4)關於硬盤空間,Elasticsearch默認將數據存儲在/var/lib/elasticsearch路徑下,隨着數據的增長,一定會出現硬盤空間不夠用的情形,此時就需要給機器掛載新的硬盤,並將Elasticsearch的路徑配置到新硬盤的路徑下。通過“path.data”配置項來進行設置,比如“path.data: /data1,/var/lib/elasticsearch,/data”。需要注意的是,同一分片下的數據只能寫入到一個路徑下,因此還是需要合理的規划和監控硬盤的使用。
5)關於Index的划分和分片的個數,這個需要根據數據量來做權衡了,Index可以按時間划分,比如每月一個或者每天一個,在Logstash輸出時進行配置,shard的數量也需要做好控制。
6)關於監控,筆者使用過head和marvel兩個監控插件,head免費,功能相對有限,marvel現在需要收費了。另外,不要在數據節點開啟監控插件。

Kibana
Kibana提供的是數據查詢和顯示的Web服務,有豐富的圖表樣板,能滿足大部分的數據可視化需求,這也是很多人選擇ELK的主要原因之一。UI的操作沒有什么特別需要介紹的,經常使用就會熟練,這里主要介紹經常遇到的三個問題。
a)查詢語法
在Kibana的Discover頁面中,可以輸入一個查詢條件來查詢所需的數據。查詢條件的寫法使用的是Elasticsearch的Query String語法,而不是Query DSL,參考官方文檔query-string-syntax,這里列舉其中部分常用的:
.單字段的全文檢索,比如搜索args字段中包含first的文檔,寫作 args:first;
.單字段的精確檢索,比如搜索args字段值為first的文檔,寫作 args: “first”;
.多個檢索條件的組合,使用 NOT, AND 和 OR 來組合,注意必須是大寫,比如 args:(“first” OR “second”) AND NOT agent: “third”;
.字段是否存在,_exists_:agent表示要求agent字段存在,_missing_:agent表示要求agent字段不存在;
.通配符:用 ? 表示單字母,* 表示任意個字母。
b)錯誤“Discover: Request Timeout after 30000ms”
這個錯誤經常發生在要查詢的數據量比較大的情況下,此時Elasticsearch需要較長時間才能返回,導致Kibana發生Timeout報錯。解決這個問題的方法,就是在Kibana的配置文件中修改elasticsearch.requestTimeout一項的值,然后重啟Kibana服務即可,注意單位是ms。
c)疑惑“字符串被分解了
經常碰到這樣一個問題:為什么查詢結果的字段值是正確的,可是做圖表時卻發現字段值被分解了,不是想要的結果?如下圖所示的client_agent_info字段。

得到這樣一個不正確結果的原因是使用了Analyzed字段來做圖表分析,默認情況下Elasticsearch會對字符串數據進行分析,建立倒排索引,所以如果對這么一個字段進行terms聚合,必然會得到上面所示的錯誤結果了。那么應該怎么做才對?默認情況下,Elasticsearch還會創建一個相對應的沒有被Analyzed的字段,即帶“.raw”后綴的字段,在這樣的字段上做聚合分析即可。
又會有很多人問這樣的問題:為什么我的Elasticsearch沒有自動創建帶“.raw”后綴的字段?然而在Logstash中輸出數據時,設置index名稱前綴為“logstash-”就有了這個字段。這個問題的根源是Elasticsearch的dynamic template在搗鬼,dynamic temlate用於指導Elasticsearch如何為插入的數據自動建立Schema映射關系,默認情況下,Logstash會在Elasticsearch中建立一個名為“logstash”的模板,所有前綴為“logstash-”的index都會參照這個模板來建立映射關系,在該模板中申明了要為每個字符串數據建立一個額外的帶“.raw”后綴的字段。可以向Elasticsearch來查詢你的模板,使用API:GET http://localhost:9200/_template。

以上便是對ELK日志系統的總結介紹,還有一個重要的功能沒有提到,就是如何將日志數據與自身產品業務的數據融合起來。舉個例子,在nginx日志中,通常會包含API請求訪問時攜帶的用戶Token信息,由於Token是有時效性的,我們需要及時將這些Token轉換成真實的用戶信息存儲下來。這樣的需求通常有兩種實現方式,一種是自己寫一個Logstash filter,然后在Logstash處理數據時調用;另一種是將Logstash Indexer產生的數據再次輸出到消息隊列中,由我們自己的腳本程序從消息隊列中取出數據,做相應的業務處理后,輸出到Elasticsearch中。

==================ELK環境部署==================

(0)基礎環境介紹

系統: Centos7.1
防火牆: 關閉
Sellinux: 關閉

機器環境: 兩台
elk-node1: 192.168.1.160       #master機器
elk-node2:192.168.1.161      #slave機器

注明:
master-slave模式:
master收集到日志后,會把一部分數據碎片到salve上(隨機的一部分數據);同時,master和slave又都會各自做副本,並把副本放到對方機器上,這樣就保證了數據不會丟失。
如果master宕機了,那么客戶端在日志采集配置中將elasticsearch主機指向改為slave,就可以保證ELK日志的正常采集和web展示。

==========================================================================
由於elk-node1和elk-node2兩台是虛擬機,沒有外網ip,所以訪問需要通過宿主機進行代理轉發實現。

有以下兩種轉發設置:(任選其一)

通過訪問宿主機的19200,19201端口分別轉發到elk-node1,elk-node2的9200端口
通過訪問宿主機的15601端口轉發到elk-node1的5601端口

宿主機:112.110.115.10(內網ip為192.168.1.7)  (為了不讓線上的真實ip暴露,這里任意給了一個ip做記錄)

a)通過宿主機的haproxy服務進行代理轉發,如下是宿主機上的代理配置:

[root@kvm-server conf]# pwd
/usr/local/haproxy/conf
[root@kvm-server conf]# cat haproxy.cfg
..........
..........
listen node1-9200 0.0.0.0:19200
mode tcp
option tcplog
balance roundrobin
server 192.168.1.160 192.168.1.160:9200 weight 1 check inter 1s rise 2 fall 2

listen node2-9200 0.0.0.0:19201
mode tcp
option tcplog
balance roundrobin
server 192.168.1.161 192.168.1.161:9200 weight 1 check inter 1s rise 2 fall 2

listen node1-5601 0.0.0.0:15601
mode tcp
option tcplog
balance roundrobin
server 192.168.1.160 192.168.1.160:5601 weight 1 check inter 1s rise 2 fall 2

重啟haproxy服務
[root@kvm-server conf]# /etc/init.d/haproxy restart

設置宿主機防火牆
[root@kvm-server conf]# cat /etc/sysconfig/iptables
.........
-A INPUT -p tcp -m state --state NEW -m tcp --dport 19200 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 19201 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 15601 -j ACCEPT

[root@kvm-server conf]# /etc/init.d/iptables restart

b)通過宿主機的NAT端口轉發實現

[root@kvm-server conf]# iptables -t nat -A PREROUTING -p tcp -m tcp --dport 19200 -j DNAT --to-destination 192.168.1.160:9200
[root@kvm-server conf]# iptables -t nat -A POSTROUTING -d 192.168.1.160/32 -p tcp -m tcp --sport 9200 -j SNAT --to-source 192.168.1.7
[root@kvm-server conf]# iptables -t filter -A INPUT -p tcp -m state --state NEW -m tcp --dport 19200 -j ACCEPT

[root@kvm-server conf]# iptables -t nat -A PREROUTING -p tcp -m tcp --dport 19201 -j DNAT --to-destination 192.168.1.161:9200
[root@kvm-server conf]# iptables -t nat -A POSTROUTING -d 192.168.1.161/32 -p tcp -m tcp --sport 9200 -j SNAT --to-source 192.168.1.7
[root@kvm-server conf]# iptables -t filter -A INPUT -p tcp -m state --state NEW -m tcp --dport 19201 -j ACCEPT

[root@kvm-server conf]# iptables -t nat -A PREROUTING -p tcp -m tcp --dport 15601 -j DNAT --to-destination 192.168.1.160:5601
[root@kvm-server conf]# iptables -t nat -A POSTROUTING -d 192.168.1.160/32 -p tcp -m tcp --sport 5601 -j SNAT --to-source 192.168.1.7
[root@kvm-server conf]# iptables -t filter -A INPUT -p tcp -m state --state NEW -m tcp --dport 15601 -j ACCEPT

[root@kvm-server conf]# service iptables save
[root@kvm-server conf]# service iptables restart

提醒一點:
nat端口轉發設置成功后,/etc/sysconfig/iptables文件里要注釋掉下面兩行!不然nat轉發會有問題!一般如上面在nat轉發規則設置好並save和restart防火牆之后就會自動在/etc/sysconfig/iptables文件里刪除掉下面兩行內容了。
[root@kvm-server conf]# vim /etc/sysconfig/iptables
..........
#-A INPUT -j REJECT --reject-with icmp-host-prohibited
#-A FORWARD -j REJECT --reject-with icmp-host-prohibited
[root@linux-node1 ~]# service iptables restart

=============================================================

(1)Elasticsearch安裝配置

基礎環境安裝(elk-node1和elk-node2同時操作)

1)下載並安裝GPG Key
[root@elk-node1 ~]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

2)添加yum倉庫
[root@elk-node1 ~]# vim /etc/yum.repos.d/elasticsearch.repo
[elasticsearch-2.x]
name=Elasticsearch repository for 2.x packages
baseurl=http://packages.elastic.co/elasticsearch/2.x/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1

3)安裝elasticsearch
[root@elk-node1 ~]# yum install -y elasticsearch

4)安裝相關測試軟件
#提前先下載安裝epel源:epel-release-latest-7.noarch.rpm,否則yum會報錯:No Package.....
[root@elk-node1 ~]# wget http://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
[root@elk-node1 ~]# rpm -ivh epel-release-latest-7.noarch.rpm
#安裝Redis
[root@elk-node1 ~]# yum install -y redis
#安裝Nginx
[root@elk-node1 ~]# yum install -y nginx
#安裝java
[root@elk-node1 ~]# yum install -y java

安裝完java后,檢測
[root@elk-node1 ~]# java -version
openjdk version "1.8.0_102"
OpenJDK Runtime Environment (build 1.8.0_102-b14)
OpenJDK 64-Bit Server VM (build 25.102-b14, mixed mode)

配置部署(下面先進行elk-node1的配置)

1)配置修改配置文件
[root@elk-node1 ~]# mkdir -p /data/es-data
[root@elk-node1 ~]# vim /etc/elasticsearch/elasticsearch.yml                               【將里面內容情況,配置下面內容】
cluster.name: huanqiu                            # 組名(同一個組,組名必須一致)
node.name: elk-node1                            # 節點名稱,建議和主機名一致
path.data: /data/es-data                         # 數據存放的路徑
path.logs: /var/log/elasticsearch/             # 日志存放的路徑
bootstrap.mlockall: true                         # 鎖住內存,不被使用到交換分區去
network.host: 0.0.0.0                            # 網絡設置
http.port: 9200                                    # 端口

2)啟動並查看
[root@elk-node1 ~]# chown -R elasticsearch.elasticsearch /data/
[root@elk-node1 ~]# systemctl start elasticsearch
[root@elk-node1 ~]# systemctl status elasticsearch
CGroup: /system.slice/elasticsearch.service
└─3005 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSI...

注意:上面可以看出elasticsearch設置的內存最小256m,最大1g

=====================溫馨提示:  Elasticsearch啟動出現"could not find java"===================

yum方法安裝elasticsearch, 使用"systemctl start elasticsearch"啟動服務失敗.
"systemctl status elasticsearch"查看, 發現報錯說could not find java
但是"java -version" 查看發現java已經安裝了

這是因為elasticsearch在啟動過程中, 引用的java路徑找不到

解決辦法: 在elasticsearch配置文件中定義java全路徑

[root@elk-node01 ~]# java -version
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode

[root@elk-node01 ~]# find / -name java
/var/lib/alternatives/java
/usr/share/swig/2.0.10/java
/usr/java
/usr/java/jdk1.8.0_131/bin/java
/usr/java/jdk1.8.0_131/jre/bin/java
/usr/bin/java
/etc/pki/java
/etc/pki/ca-trust/extracted/java
/etc/alternatives/java

[root@elk-node01 ~]# vim /etc/sysconfig/elasticsearch
添加JAVA_HOME環境變量的配置
JAVA_HOME=/usr/java/jdk1.8.0_131

[root@linux-node1 src]# netstat -antlp |egrep "9200|9300"
tcp6 0 0 :::9200 :::* LISTEN 3005/java
tcp6 0 0 :::9300 :::* LISTEN 3005/java

然后通過web訪問(訪問的瀏覽器最好用google瀏覽器)

http://112.110.115.10:19200/

3)通過命令的方式查看數據(在112.110.115.10宿主機或其他外網服務器上查看,如下)
[root@kvm-server src]# curl -i -XGET 'http://192.168.1.160:9200/_count?pretty' -d '{"query":{"match_all":{}}}'
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Content-Length: 95

{
"count" : 0,
"_shards" : {
"total" : 0,
"successful" : 0,
"failed" : 0
}
}

這樣感覺用命令來查看,特別的不爽。

4)接下來安裝插件,使用插件進行查看~  (下面兩個插件要在elk-node1和elk-node2上都要安裝)
4.1)安裝head插件
==================================================================
a)插件安裝方法一
[root@elk-node1 src]# /usr/share/elasticsearch/bin/plugin install mobz/elasticsearch-head

b)插件安裝方法二
首先下載head插件,下載到/usr/loca/src目錄下
下載地址:https://github.com/mobz/elasticsearch-head

======================================================
head插件包百度雲盤下載:https://pan.baidu.com/s/1boBE0qj
提取密碼:ifj7
======================================================

[root@elk-node1 src]# unzip elasticsearch-head-master.zip
[root@elk-node1 src]# ls
elasticsearch-head-master elasticsearch-head-master.zip

在/usr/share/elasticsearch/plugins目錄下創建head目錄
然后將上面下載的elasticsearch-head-master.zip解壓后的文件都移到/usr/share/elasticsearch/plugins/head下
接着重啟elasticsearch服務即可!
[root@elk-node1 src]# cd /usr/share/elasticsearch/plugins/
[root@elk-node1 plugins]# mkdir head
[root@elk-node1 plugins]# ls
head
[root@elk-node1 plugins]# cd head
[root@elk-node1 head]# cp -r /usr/local/src/elasticsearch-head-master/* ./
[root@elk-node1 head]# pwd
/usr/share/elasticsearch/plugins/head

[root@elk-node1 head]# chown -R elasticsearch:elasticsearch /usr/share/elasticsearch/plugins
[root@elk-node1 head]# ll
total 40
-rw-r--r--. 1 elasticsearch elasticsearch 104 Sep 28 01:57 elasticsearch-head.sublime-project
-rw-r--r--. 1 elasticsearch elasticsearch 2171 Sep 28 01:57 Gruntfile.js
-rw-r--r--. 1 elasticsearch elasticsearch 3482 Sep 28 01:57 grunt_fileSets.js
-rw-r--r--. 1 elasticsearch elasticsearch 1085 Sep 28 01:57 index.html
-rw-r--r--. 1 elasticsearch elasticsearch 559 Sep 28 01:57 LICENCE
-rw-r--r--. 1 elasticsearch elasticsearch 795 Sep 28 01:57 package.json
-rw-r--r--. 1 elasticsearch elasticsearch 100 Sep 28 01:57 plugin-descriptor.properties
-rw-r--r--. 1 elasticsearch elasticsearch 5211 Sep 28 01:57 README.textile
drwxr-xr-x. 5 elasticsearch elasticsearch 4096 Sep 28 01:57 _site
drwxr-xr-x. 4 elasticsearch elasticsearch 29 Sep 28 01:57 src
drwxr-xr-x. 4 elasticsearch elasticsearch 66 Sep 28 01:57 test

[root@elk-node1 _site]# systemctl restart elasticsearch
=========================================================================

插件訪問(最好提前將elk-node2節點的配置和插件都安裝后,再來進行訪問和數據插入測試)
http://112.110.115.10:19200/_plugin/head/

先插入數據實例,測試下
如下:打開”復合查詢“,在POST選項下,任意輸入如/index-demo/test,然后在下面輸入數據(注意內容之間換行的逗號不要漏掉)
數據輸入好之后(如下輸入wangshibo;hello world內容),下面點擊”驗證JSON“->”提交請求“,提交成功后,觀察右欄里出現的信息:有index,type,version等信息,failed:0(成功消息)

再查看測試實例,如下:
"復合查詢"下,選擇GET選項在/index-demo/test/后面輸入上面POST結果中的id號不輸入內容,即{}括號里為空
然后點擊”驗證JSON“->"提交請求",觀察右欄內就有了上面插入的數據了(即wangshibo,hello world)

打開"基本查詢",查看下數據,如下,即可查詢到上面插入的數據:

打開“數據瀏覽”,也能查看到插入的數據:

如下:一定要提前在elk-node2節點上也完成配置(配置內容在下面提到),否則上面插入數據后,集群狀態會呈現黃色yellow狀態,elk-node2完成配置加入到集群里后就會恢復到正常的綠色狀態。

4.2)安裝kopf監控插件
==========================================================================

a)監控插件安裝方法一

[root@elk-node1 src]# /usr/share/elasticsearch/bin/plugin install lmenezes/elasticsearch-kopf

b)監控插件安裝方法二

首先下載監控插件kopf,下載到/usr/loca/src目錄下
下載地址:https://github.com/lmenezes/elasticsearch-kopf

====================================================
kopf插件包百度雲盤下載:https://pan.baidu.com/s/1qYixSL2
提取密碼:ya4t
===================================================

[root@elk-node1 src]# unzip elasticsearch-kopf-master.zip
[root@elk-node1 src]# ls
elasticsearch-kopf-master elasticsearch-kopf-master.zip

在/usr/share/elasticsearch/plugins目錄下創建kopf目錄
然后將上面下載的elasticsearch-kopf-master.zip解壓后的文件都移到/usr/share/elasticsearch/plugins/kopf下
接着重啟elasticsearch服務即可!
[root@elk-node1 src]# cd /usr/share/elasticsearch/plugins/
[root@elk-node1 plugins]# mkdir kopf
[root@elk-node1 plugins]# cd kopf
[root@elk-node1 kopf]# cp -r /usr/local/src/elasticsearch-kopf-master/* ./
[root@elk-node1 kopf]# pwd
/usr/share/elasticsearch/plugins/kopf

[root@elk-node1 kopf]# chown -R elasticsearch:elasticsearch /usr/share/elasticsearch/plugins
[root@elk-node1 kopf]# ll
total 40
-rw-r--r--. 1 elasticsearch elasticsearch 237 Sep 28 16:28 CHANGELOG.md
drwxr-xr-x. 2 elasticsearch elasticsearch 22 Sep 28 16:28 dataset
drwxr-xr-x. 2 elasticsearch elasticsearch 73 Sep 28 16:28 docker
-rw-r--r--. 1 elasticsearch elasticsearch 4315 Sep 28 16:28 Gruntfile.js
drwxr-xr-x. 2 elasticsearch elasticsearch 4096 Sep 28 16:28 imgs
-rw-r--r--. 1 elasticsearch elasticsearch 1083 Sep 28 16:28 LICENSE
-rw-r--r--. 1 elasticsearch elasticsearch 1276 Sep 28 16:28 package.json
-rw-r--r--. 1 elasticsearch elasticsearch 102 Sep 28 16:28 plugin-descriptor.properties
-rw-r--r--. 1 elasticsearch elasticsearch 3165 Sep 28 16:28 README.md
drwxr-xr-x. 6 elasticsearch elasticsearch 4096 Sep 28 16:28 _site
drwxr-xr-x. 4 elasticsearch elasticsearch 27 Sep 28 16:28 src
drwxr-xr-x. 4 elasticsearch elasticsearch 4096 Sep 28 16:28 tests

[root@elk-node1 _site]# systemctl restart elasticsearch

============================================================================

訪問插件:(如下,同樣要提前安裝好elk-node2節點上的插件,否則訪問時會出現集群節點為黃色的yellow告警狀態)

http://112.110.115.10:19200/_plugin/kopf/#!/cluster

*************************************************************************
下面進行節點elk-node2的配置  (如上的兩個插件也在elk-node2上同樣安裝)

注釋:其實兩個的安裝配置基本上是一樣的。

[root@elk-node2 src]# mkdir -p /data/es-data
[root@elk-node2 ~]# cat /etc/elasticsearch/elasticsearch.yml
cluster.name: huanqiu
node.name: elk-node2
path.data: /data/es-data
path.logs: /var/log/elasticsearch/
bootstrap.mlockall: true
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.multicast.enabled: false
discovery.zen.ping.unicast.hosts: ["192.168.1.160", "192.168.1.161"]

# 修改權限配置
[root@elk-node2 src]# chown -R elasticsearch.elasticsearch /data/

# 啟動服務
[root@elk-node2 src]# systemctl start elasticsearch
[root@elk-node2 src]# systemctl status elasticsearch
● elasticsearch.service - Elasticsearch
Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; enabled; vendor preset: disabled)
Active: active (running) since Wed 2016-09-28 16:49:41 CST; 1 weeks 3 days ago
Docs: http://www.elastic.co
Process: 17798 ExecStartPre=/usr/share/elasticsearch/bin/elasticsearch-systemd-pre-exec (code=exited, status=0/SUCCESS)
Main PID: 17800 (java)
CGroup: /system.slice/elasticsearch.service
└─17800 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFra...

Oct 09 13:42:22 elk-node2 elasticsearch[17800]: [2016-10-09 13:42:22,295][WARN ][transport ] [elk-node2] Transport res...943817]
Oct 09 13:42:23 elk-node2 elasticsearch[17800]: [2016-10-09 13:42:23,111][WARN ][transport ] [elk-node2] Transport res...943846]
................
................

# 查看端口
[root@elk-node2 src]# netstat -antlp|egrep "9200|9300"
tcp6 0 0 :::9200 :::* LISTEN 2928/java
tcp6 0 0 :::9300 :::* LISTEN 2928/java
tcp6 0 0 127.0.0.1:48200 127.0.0.1:9300 TIME_WAIT -
tcp6 0 0 ::1:41892 ::1:9300 TIME_WAIT -
*************************************************************************

通過命令的方式查看elk-node2數據(在112.110.115.10宿主機或其他外網服務器上查看,如下)
[root@kvm-server ~]# curl -i -XGET 'http://192.168.1.161:9200/_count?pretty' -d '{"query":{"match_all":{}}}'
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Content-Length: 95

{
"count" : 1,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
}

然后通過web訪問elk-node2
http://112.110.115.10:19201/

訪問兩個插件:
http://112.110.115.10:19201/_plugin/head/
http://112.110.115.10:19201/_plugin/kopf/#!/cluster

 

 (2)Logstash安裝配置(這個在客戶機上是要安裝的。elk-node1和elk-node2都安裝)

基礎環境安裝(客戶端安裝logstash,收集到的數據寫入到elasticsearch里,就可以登陸logstash界面查看到了

1)下載並安裝GPG Key
[root@elk-node1 ~]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

2)添加yum倉庫
[root@hadoop-node1 ~]# vim /etc/yum.repos.d/logstash.repo
[logstash-2.1]
name=Logstash repository for 2.1.x packages
baseurl=http://packages.elastic.co/logstash/2.1/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1

3)安裝logstash
[root@elk-node1 ~]# yum install -y logstash

4)logstash啟動
[root@elk-node1 ~]# systemctl start elasticsearch
[root@elk-node1 ~]# systemctl status elasticsearch
● elasticsearch.service - Elasticsearch
Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; disabled; vendor preset: disabled)
Active: active (running) since Mon 2016-11-07 18:33:28 CST; 3 days ago
Docs: http://www.elastic.co
Main PID: 8275 (java)
CGroup: /system.slice/elasticsearch.service
└─8275 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFrac...
..........
..........

數據的測試

1)基本的輸入輸出
[root@elk-node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { stdout{} }'
Settings: Default filter workers: 1
Logstash startup completed
hello                                                                                     #輸入這個
2016-11-11T06:41:07.690Z elk-node1 hello                        #輸出這個
wangshibo                                                                            #輸入這個
2016-11-11T06:41:10.608Z elk-node1 wangshibo               #輸出這個

2)使用rubydebug詳細輸出
[root@elk-node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { stdout{ codec => rubydebug} }'
Settings: Default filter workers: 1
Logstash startup completed
hello                                                                                    #輸入這個
{                                                                                         #輸出下面信息
           "message" => "hello",
           "@version" => "1",
      "@timestamp" => "2016-11-11T06:44:06.711Z",
                  "host" => "elk-node1"
}
wangshibo                                                                         #輸入這個
{                                                                                       #輸出下面信息
         "message" => "wangshibo",
        "@version" => "1",
   "@timestamp" => "2016-11-11T06:44:11.270Z",
               "host" => "elk-node1"
}

3) 把內容寫到elasticsearch中
[root@elk-node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.1.160:9200"]} }'
Settings: Default filter workers: 1
Logstash startup completed                       #輸入下面的測試數據
123456
wangshibo
huanqiu
hahaha

使用rubydebug和寫到elasticsearch中的區別:其實就在於后面標准輸出的區別,前者使用codec;后者使用elasticsearch

寫到elasticsearch中在logstash中查看,如下圖:
注意:
master收集到日志后,會把一部分數據碎片到salve上(隨機的一部分數據),master和slave又都會各自做副本,並把副本放到對方機器上,這樣就保證了數據不會丟失。
如下,master收集到的數據放到了自己的第1,3分片上,其他的放到了slave的第0,2,4分片上。

4)即寫到elasticsearch中又寫在文件中一份
[root@elk-node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.1.160:9200"]} stdout{ codec => rubydebug}}'
Settings: Default filter workers: 1
Logstash startup completed
huanqiupc
{
           "message" => "huanqiupc",
          "@version" => "1",
     "@timestamp" => "2016-11-11T07:27:42.012Z",
                 "host" => "elk-node1"
}
wangshiboqun
{
         "message" => "wangshiboqun",
        "@version" => "1",
   "@timestamp" => "2016-11-11T07:27:55.396Z",
               "host" => "elk-node1"
}

以上文本可以長期保留、操作簡單、壓縮比大。下面登陸elasticsearch界面中查看;

 logstash的配置和文件的編寫

1)logstash的配置
簡單的配置方式:
[root@elk-node1 ~]# vim /etc/logstash/conf.d/01-logstash.conf
input { stdin { } }
output {
        elasticsearch { hosts => ["192.168.1.160:9200"]}
        stdout { codec => rubydebug }
}

它的執行:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/01-logstash.conf
Settings: Default filter workers: 1
Logstash startup completed
beijing                                                #輸入內容
{                                                       #輸出下面信息
             "message" => "beijing",
            "@version" => "1",
       "@timestamp" => "2016-11-11T07:41:48.401Z",
                   "host" => "elk-node1"
}

===============================================================
參考內容:
https://www.elastic.co/guide/en/logstash/current/configuration.html
https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html
===============================================================

2)收集系統日志

[root@elk-node1 ~]# vim  file.conf
input {
    file {
      path => "/var/log/messages"
      type => "system"
      start_position => "beginning"
    }
}

output {
    elasticsearch {
       hosts => ["192.168.1.160:9200"]
       index => "system-%{+YYYY.MM.dd}"
    }
}

執行上面日志信息的收集,如下,這個命令會一直在執行中,表示日志在監控收集中;如果中斷,就表示日志不在收集!所以需要放在后台執行~
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf &

登陸elasticsearch界面,查看本機系統日志的信息:

 

 

================================================================
參考內容:
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
================================================================

3)收集java日志,其中包含上面講到的日志收集

[root@elk-node1 ~]# vim  file.conf
input {
    file {
      path => "/var/log/messages"
      type => "system"
      start_position => "beginning"
    }
}

input {
    file {
       path => "/var/log/elasticsearch/huanqiu.log"
       type => "es-error" 
       start_position => "beginning"
    }
}


output {

    if [type] == "system"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-%{+YYYY.MM.dd}"
        }
    }

    if [type] == "es-error"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "es-error-%{+YYYY.MM.dd}"
        }
    }
}

注意:
如果你的日志中有type字段 那你就不能在conf文件中使用type

執行如下命令收集:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf &

登陸elasticsearch界面,查看數據:

====================================================================
參考內容:
https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html
====================================================================

有個問題:
每個報錯都給收集成一行了,不是按照一個報錯,一個事件模塊收集的。

下面將行換成事件的方式展示:

[root@elk-node1 ~]# vim multiline.conf
input {
    stdin {
       codec => multiline {
          pattern => "^\["
          negate => true
          what => "previous"
        }
    }
}
output {
    stdout {
      codec => "rubydebug"
     }  
}

執行命令:

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f multiline.conf 
Settings: Default filter workers: 1
Logstash startup completed
123
456
[123
{
    "@timestamp" => "2016-11-11T09:28:56.824Z",
       "message" => "123\n456",
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ],
          "host" => "elk-node1"
}
123]
[456]
{
    "@timestamp" => "2016-11-11T09:29:09.043Z",
       "message" => "[123\n123]",
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ],
          "host" => "elk-node1"
}

在沒有遇到[的時候,系統不會收集,只有遇見[的時候,才算是一個事件,才收集起來。
======================================================================
參考內容
https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html
======================================================================

(3)Kibana安裝配置

1)kibana的安裝:
[root@elk-node1 ~]# cd /usr/local/src
[root@elk-node1 src]# wget https://download.elastic.co/kibana/kibana/kibana-4.3.1-linux-x64.tar.gz
[root@elk-node1 src]# tar zxf kibana-4.3.1-linux-x64.tar.gz
[root@elk-node1 src]# mv kibana-4.3.1-linux-x64 /usr/local/
[root@elk-node1 src]# ln -s /usr/local/kibana-4.3.1-linux-x64/ /usr/local/kibana

2)修改配置文件:
[root@elk-node1 config]# pwd
/usr/local/kibana/config
[root@elk-node1 config]# cp kibana.yml kibana.yml.bak
[root@elk-node1 config]# vim kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.url: "http://192.168.1.160:9200"
kibana.index: ".kibana"        #注意這個.Kibana索引用來存儲數據,千萬不要刪除了它。它是將es數據通過kibana進行web展示的關鍵。這個配置后,在es的web界面里就會看到這個.kibana索引。

因為他一直運行在前台,要么選擇開一個窗口,要么選擇使用screen。
安裝並使用screen啟動kibana:
[root@elk-node1 ~]# yum -y install screen
[root@elk-node1 ~]# screen                          #這樣就另開啟了一個終端窗口
[root@elk-node1 ~]# /usr/local/kibana/bin/kibana
log [18:23:19.867] [info][status][plugin:kibana] Status changed from uninitialized to green - Ready
log [18:23:19.911] [info][status][plugin:elasticsearch] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [18:23:19.941] [info][status][plugin:kbn_vislib_vis_types] Status changed from uninitialized to green - Ready
log [18:23:19.953] [info][status][plugin:markdown_vis] Status changed from uninitialized to green - Ready
log [18:23:19.963] [info][status][plugin:metric_vis] Status changed from uninitialized to green - Ready
log [18:23:19.995] [info][status][plugin:spyModes] Status changed from uninitialized to green - Ready
log [18:23:20.004] [info][status][plugin:statusPage] Status changed from uninitialized to green - Ready
log [18:23:20.010] [info][status][plugin:table_vis] Status changed from uninitialized to green - Ready

然后按ctrl+a+d組合鍵,這樣在上面另啟的screen屏里啟動的kibana服務就一直運行在前台了....
[root@elk-node1 ~]# screen -ls
There is a screen on:
15041.pts-0.elk-node1 (Detached)
1 Socket in /var/run/screen/S-root.

(3)訪問kibana:http://112.110.115.10:15601/
如下,如果是添加上面設置的java日志收集信息,則在下面填寫es-error*;如果是添加上面設置的系統日志信息system*,以此類型(可以從logstash界面看到日志收集項)

 然后點擊上面的Discover,在Discover中查看:

查看日志登陸,需要點擊“Discover”-->"message",點擊它后面的“add”
注意:
需要右邊查看日志內容時帶什么屬性,就在左邊點擊相應屬性后面的“add”
如下圖,添加了message和path的屬性:

這樣,右邊顯示的日志內容的屬性就帶了message和path

點擊右邊日志內容屬性后面隱藏的<<,就可將內容向前縮進

添加新的日志采集項,點擊Settings->+Add New,比如添加system系統日志。注意后面的*不要忘了。

 

 

刪除kibana里的日志采集項,如下,點擊刪除圖標即可。

 

如果打開kibana查看日志,發現沒有日志內容,出現“No results found”,如下圖所示,這說明要查看的日志在當前時間沒有日志信息輸出,可以點擊右上角的時間鍾來調試日志信息的查看。

 

4)收集nginx的訪問日志

修改nginx的配置文件,分別在nginx.conf的http和server配置區域添加下面內容:

##### http 標簽中
          log_format json '{"@timestamp":"$time_iso8601",'
                           '"@version":"1",'
                           '"client":"$remote_addr",'
                           '"url":"$uri",'
                           '"status":"$status",'
                           '"domain":"$host",'
                           '"host":"$server_addr",'
                           '"size":$body_bytes_sent,'
                           '"responsetime":$request_time,'
                           '"referer": "$http_referer",'
                           '"ua": "$http_user_agent"'
'}';
##### server標簽中
            access_log /var/log/nginx/access_json.log json;

 

截圖如下:

啟動nginx服務:

[root@elk-node1 ~]# systemctl start nginx
[root@elk-node1 ~]# systemctl status nginx
● nginx.service - The nginx HTTP and reverse proxy server
   Loaded: loaded (/usr/lib/systemd/system/nginx.service; disabled; vendor preset: disabled)
   Active: active (running) since Fri 2016-11-11 19:06:55 CST; 3s ago
  Process: 15119 ExecStart=/usr/sbin/nginx (code=exited, status=0/SUCCESS)
  Process: 15116 ExecStartPre=/usr/sbin/nginx -t (code=exited, status=0/SUCCESS)
  Process: 15114 ExecStartPre=/usr/bin/rm -f /run/nginx.pid (code=exited, status=0/SUCCESS)
 Main PID: 15122 (nginx)
   CGroup: /system.slice/nginx.service
           ├─15122 nginx: master process /usr/sbin/nginx
           ├─15123 nginx: worker process
           └─15124 nginx: worker process

Nov 11 19:06:54 elk-node1 systemd[1]: Starting The nginx HTTP and reverse proxy server...
Nov 11 19:06:55 elk-node1 nginx[15116]: nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
Nov 11 19:06:55 elk-node1 nginx[15116]: nginx: configuration file /etc/nginx/nginx.conf test is successful
Nov 11 19:06:55 elk-node1 systemd[1]: Started The nginx HTTP and reverse proxy server.

編寫收集文件
這次使用json的方式收集:

[root@elk-node1 ~]# vim json.conf 
input {
   file {
      path => "/var/log/nginx/access_json.log"
      codec => "json"
   }
}

output {
   stdout {
      codec => "rubydebug"
   }
}

啟動日志收集程序:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f json.conf        #或加個&放在后台執行

訪問nginx頁面(在elk-node1的宿主機上執行訪問頁面的命令:curl http://192.168.1.160)就會出現以下內容:

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f json.conf
Settings: Default filter workers: 1
Logstash startup completed
{
      "@timestamp" => "2016-11-11T11:10:53.000Z",
        "@version" => "1",
          "client" => "192.168.1.7",
             "url" => "/index.html",
          "status" => "200",
          "domain" => "192.168.1.160",
            "host" => "192.168.1.160",
            "size" => 3700,
    "responsetime" => 0.0,
         "referer" => "-",
              "ua" => "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2",
            "path" => "/var/log/nginx/access_json.log"
}

注意:
上面的json.conf配置只是將nginx日志輸出,還沒有輸入到elasticsearch里,所以這個時候在elasticsearch界面里是采集不到nginx日志的。

需要配置一下,將nginx日志輸入到elasticsearch中,將其匯總到總文件file.conf里,如下也將nginx-log日志輸入到elasticserach里:(后續就可以只用這個匯總文件,把要追加的日志匯總到這個總文件里即可)

[root@elk-node1 ~]# cat file.conf 
input {
    file {
      path => "/var/log/messages"
      type => "system"
      start_position => "beginning"
    }

    file {
       path => "/var/log/elasticsearch/huanqiu.log"
       type => "es-error" 
       start_position => "beginning"
       codec => multiline {
           pattern => "^\["
           negate => true
           what => "previous"
       }
    }
    file {
       path => "/var/log/nginx/access_json.log"
       codec => json
       start_position => "beginning"
       type => "nginx-log"
    }
}


output {

    if [type] == "system"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-%{+YYYY.MM.dd}"
        }
    }

    if [type] == "es-error"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "es-error-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "nginx-log"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "nignx-log-%{+YYYY.MM.dd}"
        }
    }
}

可以加上--configtest參數,測試下配置文件是否有語法錯誤或配置不當的地方,這個很重要!!
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf --configtest
Configuration OK

然后接着執行logstash命令(由於上面已經將這個執行命令放到了后台,所以這里其實不用執行,也可以先kill之前的,再放后台執行),然后可以再訪問nginx界面測試下
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf &

登陸elasticsearch界面查看:

 將nginx日志整合到kibana界面里,如下:

5)收集系統日志

編寫收集文件並執行。

[root@elk-node1 ~]# cat syslog.conf
input {
    syslog {
        type => "system-syslog"
        host => "192.168.1.160"
        port => "514"
    }
}

output {
    stdout {
        codec => "rubydebug"
    }
}

對上面的采集文件進行執行:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f syslog.conf

重新開啟一個窗口,查看服務是否啟動:
[root@elk-node1 ~]# netstat -ntlp|grep 514
tcp6 0 0 192.168.1.160:514 :::* LISTEN 17842/java
[root@elk-node1 ~]# vim /etc/rsyslog.conf
#*.* @@remote-host:514                                                           【在此行下面添加如下內容】
*.* @@192.168.1.160:514

[root@elk-node1 ~]# systemctl restart rsyslog

回到原來的窗口(即上面采集文件的執行終端),就會出現數據:

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f syslog.conf
Settings: Default filter workers: 1
Logstash startup completed
{
           "message" => "Stopping System Logging Service...\n",
          "@version" => "1",
        "@timestamp" => "2016-11-13T10:35:30.000Z",
              "type" => "system-syslog",
              "host" => "192.168.1.160",
          "priority" => 30,
         "timestamp" => "Nov 13 18:35:30",
         "logsource" => "elk-node1",
           "program" => "systemd",
          "severity" => 6,
          "facility" => 3,
    "facility_label" => "system",
    "severity_label" => "Informational"
}
........
........

再次添加到總文件file.conf中:

[root@elk-node1 ~]# cat file.conf
input {
    file {
      path => "/var/log/messages"
      type => "system"
      start_position => "beginning"
    }

    file {
       path => "/var/log/elasticsearch/huanqiu.log"
       type => "es-error" 
       start_position => "beginning"
       codec => multiline {
           pattern => "^\["
           negate => true
           what => "previous"
       }
    }
    file {
       path => "/var/log/nginx/access_json.log"
       codec => json
       start_position => "beginning"
       type => "nginx-log"
    }
    syslog {
        type => "system-syslog"
        host => "192.168.1.160"
        port => "514"
    }
}


output {

    if [type] == "system"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-%{+YYYY.MM.dd}"
        }
    }

    if [type] == "es-error"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "es-error-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "nginx-log"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "nignx-log-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "system-syslog"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-syslog-%{+YYYY.MM.dd}"
        }
    }
}

執行總文件(先測試下總文件配置是否有誤,然后先kill之前在后台啟動的file.conf文件,再次執行):
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf --configtest
Configuration OK
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf &

測試:
向日志中添加數據,看elasticsearch和kibana的變化:
[root@elk-node1 ~]# logger "hehehehehehe1"
[root@elk-node1 ~]# logger "hehehehehehe2"
[root@elk-node1 ~]# logger "hehehehehehe3"
[root@elk-node1 ~]# logger "hehehehehehe4"
[root@elk-node1 ~]# logger "hehehehehehe5"

添加到kibana界面中:

 

6)TCP日志的收集

編寫日志收集文件,並執行:(有需要的話,可以將下面收集文件的配置匯總到上面的總文件file.conf里,進而輸入到elasticsearch界面里和kibana里查看)
[root@elk-node1 ~]# cat tcp.conf
input {
tcp {
host => "192.168.1.160"
port => "6666"
}
}
output {
stdout {
codec => "rubydebug"
}
}

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f tcp.conf

開啟另外一個窗口,測試一(安裝nc命令:yum install -y nc):
[root@elk-node1 ~]# nc 192.168.1.160 6666 </etc/resolv.conf

回到原來的窗口(即上面采集文件的執行終端),就會出現數據:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f tcp.conf
Settings: Default filter workers: 1
Logstash startup completed
{
        "message" => "",
       "@version" => "1",
   "@timestamp" => "2016-11-13T11:01:15.280Z",
              "host" => "192.168.1.160",
              "port" => 49743
}

測試二:
[root@elk-node1 ~]# echo "hehe" | nc 192.168.1.160 6666
[root@elk-node1 ~]# echo "hehe" > /dev/tcp/192.168.1.160/6666

回到之前的執行端口,在去查看,就會顯示出來:

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f tcp.conf 
Settings: Default filter workers: 1
Logstash startup completed
....... { "message" => "hehe", "@version" => "1", "@timestamp" => "2016-11-13T11:39:58.263Z", "host" => "192.168.1.160", "port" => 53432 } { "message" => "hehe", "@version" => "1", "@timestamp" => "2016-11-13T11:40:13.458Z", "host" => "192.168.1.160", "port" => 53457 }

7)使用filter
編寫文件:

[root@elk-node1 ~]# cat grok.conf
input {
    stdin{}
}
filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}
output {
    stdout{
        codec => "rubydebug"
    }
}

執行檢測:

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f grok.conf 
Settings: Default filter workers: 1
Logstash startup completed
55.3.244.1 GET /index.html 15824 0.043                    #輸入這個,下面就會自動形成字典的形式
{
       "message" => "55.3.244.1 GET /index.html 15824 0.043",
      "@version" => "1",
    "@timestamp" => "2016-11-13T11:45:47.882Z",
          "host" => "elk-node1",
        "client" => "55.3.244.1",
        "method" => "GET",
       "request" => "/index.html",
         "bytes" => "15824",
      "duration" => "0.043"
}

其實上面使用的那些變量在程序中都有定義:

[root@elk-node1 ~]# cd /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.2/patterns/
[root@elk-node1 patterns]# ls
aws     bro   firewalls      haproxy  junos         mcollective           mongodb  postgresql  redis
bacula  exim  grok-patterns  java     linux-syslog  mcollective-patterns  nagios   rails       ruby
[root@elk-node1 patterns]# cat grok-patterns
filter {
      # drop sleep events
    grok {
        match => { "message" =>"SELECT SLEEP" }
        add_tag => [ "sleep_drop" ]
        tag_on_failure => [] # prevent default _grokparsefailure tag on real records
      }
     if "sleep_drop" in [tags] {
        drop {}
     }
     grok {
        match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id: %{NUMBER:row_id:int}\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)\n#\s*" ]
      }
      date {
        match => [ "timestamp", "UNIX" ]
        remove_field => [ "timestamp" ]
      }
}

8)mysql慢查詢

收集文件:

[root@elk-node1 ~]# cat mysql-slow.conf
input {
    file {
        path => "/root/slow.log"
        type => "mysql-slowlog"
        codec => multiline {
            pattern => "^# User@Host"
            negate => true
            what => "previous"
        }
    }
}

filter {
      # drop sleep events
    grok {
        match => { "message" =>"SELECT SLEEP" }
        add_tag => [ "sleep_drop" ]
        tag_on_failure => [] # prevent default _grokparsefailure tag on real records
      }
     if "sleep_drop" in [tags] {
        drop {}
     }
     grok {
        match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id: %{NUMBER:row_id:int}\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)\n#\s*" ]
      }
      date {
        match => [ "timestamp", "UNIX" ]
        remove_field => [ "timestamp" ]
      }
}


output {
    stdout {
       codec =>"rubydebug"
    }
}

執行檢測:
上面需要的/root/slow.log是自己上傳的,然后自己插入數據保存后,會顯示:

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f mysql-slow.conf
Settings: Default filter workers: 1
Logstash startup completed
{
    "@timestamp" => "2016-11-14T06:53:54.100Z",
       "message" => "# Time: 161114 11:05:18",
      "@version" => "1",
          "path" => "/root/slow.log",
          "host" => "elk-node1",
          "type" => "mysql-slowlog",
          "tags" => [
        [0] "_grokparsefailure"
    ]
}
{
    "@timestamp" => "2016-11-14T06:53:54.105Z",
       "message" => "# User@Host: test[test] @  [124.65.197.154]\n# Query_time: 1.725889  Lock_time: 0.000430 Rows_sent: 0  Rows_examined: 0\nuse test_zh_o2o_db;\nSET timestamp=1479092718;\nSELECT trigger_name, event_manipulation, event_object_table, action_statement, action_timing, DEFINER FROM information_schema.triggers WHERE BINARY event_object_schema='test_zh_o2o_db' AND BINARY event_object_table='customer';\n# Time: 161114 12:10:30",
      "@version" => "1",
          "tags" => [
        [0] "multiline",
        [1] "_grokparsefailure"
    ],
          "path" => "/root/slow.log",
          "host" => "elk-node1",
          "type" => "mysql-slowlog"
}
.........
.........

======================================================================
接下來描述會遇見到的一個問題:
一旦我們的elasticsearch出現問題,就不能進行日志采集處理了!
這種情況下該怎么辦呢?

解決方案;
可以在client和elasticsearch之間添加一個中間件作為緩存,先將采集到的日志內容寫到中間件上,然后再從中間件輸入到elasticsearch中。
這就完美的解決了上述的問題了。

(4)ELK中使用redis作為中間件,緩存日志采集內容

1)redis的配置和啟動

[root@elk-node1 ~]# vim /etc/redis.conf               #修改下面兩行內容
daemonize yes
bind 192.168.1.160
[root@elk-node1 ~]# systemctl start redis
[root@elk-node1 ~]# lsof -i:6379
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
redis-ser 19474 redis 4u IPv4 1344465 0t0 TCP elk-node1:6379 (LISTEN)
[root@elk-node1 ~]# redis-cli -h 192.168.1.160
192.168.1.160:6379> info
# Server
redis_version:2.8.19
.......

2)編寫從Client端收集數據的文件

[root@elk-node1 ~]# vim redis-out.conf
input {
   stdin {}
}

output {
   redis {
      host => "192.168.1.160"
      port => "6379"
      db => "6"
      data_type => "list"
      key => "demo"
   }
}

3)執行收集數據的文件,並輸入數據hello redis 

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-out.conf
Settings: Default filter workers: 1
Logstash startup completed             #下面輸入數據hello redis
hello redis

4)在redis中查看數據

[root@elk-node1 ~]# redis-cli -h 192.168.1.160
192.168.1.160:6379> info
# Server
.......
.......
# Keyspace
db6:keys=1,expires=0,avg_ttl=0                   #在最下面一行,顯示是db6
192.168.1.160:6379> select 6
OK
192.168.1.160:6379[6]> keys *
1) "demo"
192.168.1.160:6379[6]> LINDEX demo -1
"{\"message\":\"hello redis\",\"@version\":\"1\",\"@timestamp\":\"2016-11-14T08:04:25.981Z\",\"host\":\"elk-node1\"}"

5)繼續隨便寫點數據

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-out.conf 
Settings: Default filter workers: 1
Logstash startup completed
hello redis
123456
asdf
ert
wang
shi
bo
guohuihui
as
we
r
g

asdfjkdfsak
5423wer
34rt3
6y
7uj
u
io9
sdjfhsdk890
huanqiu
huanqiuchain
hqsb
asda    

6)在redis中查看

在redis中查看長度:
[root@elk-node1 ~]# redis-cli -h 192.168.1.160
192.168.1.160:6379> info
# Server
redis_version:2.8.19
.......
.......
# Keyspace
db6:keys=1,expires=0,avg_ttl=0      #顯示是db6
192.168.1.160:6379> select 6
OK
192.168.1.160:6379[6]> keys *
1) "demo"
192.168.1.160:6379[6]> LLEN demo
(integer) 24

7)將redis中的內容寫到ES中

[root@elk-node1 ~]# vim redis-in.conf
input { 
    redis {
      host => "192.168.1.160"
      port => "6379"
      db => "6"
      data_type => "list"
      key => "demo"
   }
}

output {
    elasticsearch {
      hosts => ["192.168.1.160:9200"]
      index => "redis-in-%{+YYYY.MM.dd}"
    }
}

執行:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-in.conf --configtest
Configuration OK
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-in.conf &

在redis中查看,發現數據已被讀出:
192.168.1.160:6379[6]> LLEN demo
(integer) 0

=============================溫馨提示===========================

redis默認只有16個數據庫, 也就是說最多只能有16個db, 即db01-db15
但是key值可以設置不同, 也就是針對不同日志的key前綴可以設置不同.
比如: 
key => "nginx.log"的值最多可以設置16個db, 即db01-db15
key => "mysql.log"的值最多可以設置16個db, 即db01-db15
key => "tomcat.log"的值最多可以設置16個db, 即db01-db15

登陸elasticsearch界面查看:

 

8)接着,將收集到的所有日志寫入到redis中。這了重新定義一個添加redis緩存后的總文件shipper.conf。(可以將之前執行的總文件file.conf停掉)

[root@elk-node1 ~]# vim shipper.conf
input {
    file {
      path => "/var/log/messages"
      type => "system"
      start_position => "beginning"
    }
 
    file {
       path => "/var/log/elasticsearch/huanqiu.log"
       type => "es-error"
       start_position => "beginning"
       codec => multiline {
           pattern => "^\["
           negate => true
           what => "previous"
       }
    }
    file {
       path => "/var/log/nginx/access_json.log"
       codec => json
       start_position => "beginning"
       type => "nginx-log"
    }
    syslog {
        type => "system-syslog"
        host => "192.168.1.160"
        port => "514"
    }
 
}
 
 
output {
   if [type] == "system"{
     redis {
        host => "192.168.1.160"
        port => "6379"
        db => "6"
        data_type => "list"
        key => "system"
     }
   }
 
    if [type] == "es-error"{
      redis {
        host => "192.168.1.160"
        port => "6379"
        db => "6"
        data_type => "list"
        key => "demo"
        }
     }
    if [type] == "nginx-log"{    
       redis {
          host => "192.168.1.160"
          port => "6379"
          db => "6"
          data_type => "list"
          key => "nginx-log"
       }
    }
    if [type] == "system-syslog"{
       redis {
          host => "192.168.1.160"
          port => "6379"
          db => "6"
          data_type => "list"
          key => "system-syslog"
       }    
     }
}

執行上面的文件(提前將上面之前啟動的file.conf文件的執行給結束掉!)
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f shipper.conf --configtest
Configuration OK
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f shipper.conf
Settings: Default filter workers: 1
Logstash startup completed

在redis中查看:
[root@elk-node1 ~]# redis-cli -h 192.168.1.160
192.168.1.160:6379> info
# Server
redis_version:2.8.19
.......
.......
# Keyspace
db6:keys=1,expires=0,avg_ttl=0                      #顯示是db6
192.168.1.160:6379> select 6
OK
192.168.1.160:6379[6]> keys *
1) "demo"
2) "system"
192.168.1.160:6379[6]> keys *
1) "nginx-log"
2) "demo"
3) "system"

另開一個窗口,添加點日志:
[root@elk-node1 ~]# logger "12325423"
[root@elk-node1 ~]# logger "12325423"
[root@elk-node1 ~]# logger "12325423"
[root@elk-node1 ~]# logger "12325423"
[root@elk-node1 ~]# logger "12325423"
[root@elk-node1 ~]# logger "12325423"

又會增加日志:
192.168.1.160:6379[6]> keys *
1) "system-syslog"
2) "nginx-log"
3) "demo"
4) "system"

其實可以在任意的一台ES中將數據從redis讀取到ES中。
下面咱們在elk-node2節點,將數據從redis讀取到ES中:

編寫文件:

[root@elk-node2 ~]# cat file.conf
input {
     redis {
        type => "system"
        host => "192.168.1.160"
        port => "6379"
        db => "6"
        data_type => "list"
        key => "system"
     }

      redis {
        type => "es-error"
        host => "192.168.1.160"
        port => "6379"
        db => "6"
        data_type => "list"
        key => "es-error"
        }
       redis {
          type => "nginx-log"
          host => "192.168.1.160"
          port => "6379"
          db => "6"
          data_type => "list"
          key => "nginx-log"
       }
       redis {
          type => "system-syslog"
          host => "192.168.1.160"
          port => "6379"
          db => "6"
          data_type => "list"
          key => "system-syslog"
       }    

}


output {

    if [type] == "system"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-%{+YYYY.MM.dd}"
        }
    }

    if [type] == "es-error"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "es-error-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "nginx-log"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "nignx-log-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "system-syslog"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-syslog-%{+YYYY.MM.dd}"
        }
    }
}

執行:
[root@elk-node2 ~]# /opt/logstash/bin/logstash -f file.conf --configtest
Configuration OK
[root@elk-node2 ~]# /opt/logstash/bin/logstash -f file.conf &

去redis中檢查,發現數據已經被讀出到elasticsearch中了。
192.168.1.160:6379[6]> keys *
(empty list or set)

同時登陸logstash和kibana看,發現可以正常收集到日志了

可以執行這個 去查看nginx日志
[root@elk-node1 ~]# ab -n10000 -c1 http://192.168.1.160/

也可以啟動多個redis寫到ES中,具體根據自己的實際情況而定。

==============logstash配置java環境===============
由於新版的ELK環境要求java1.8,但是有些服務器由於業務代碼自身限制只能用java6或java7。
這種情況下,要安裝Logstash,就只能單獨配置Logstas自己使用的java環境了。

操作如下:
0) 使用rpm包安裝logstash

1)安裝java8,參考:http://www.cnblogs.com/kevingrace/p/7607442.html

2)在/etc/sysconfig/logstash文件結尾添加下面兩行內容:
[root@cx-app01 ~]# vim /etc/sysconfig/logstash
.......
JAVA_CMD=/usr/local/jdk1.8.0_172/bin 
JAVA_HOME=/usr/local/jdk1.8.0_172

3)在/opt/logstash/bin/logstash.lib.sh文件添加下面一行內容:
[root@cx-app02 ~]# vim /opt/logstash/bin/logstash.lib.sh
.......
export JAVA_HOME=/usr/local/jdk1.8.0_172

4) 然后使用logstash收集日志,就不會報java環境錯誤了。

==================配置范例===================

如下的配置范例:
192.168.10.44為elk的master節點,同時也是redis節點
  
[root@client-node01 opt]# pwd
/opt
[root@client-node01 opt]# cat redis-in.conf
input {
    file {
       path => "/usr/local/tomcat8/logs/catalina.out"
       type => "tomcat8-logs"
       start_position => "beginning"
       codec => multiline {
           pattern => "^\["           //表示收集以"["開頭的日志信息
           negate => true
           what => "previous"
       }
    }
}
  
output {
    if [type] == "tomcat8-logs"{
       redis {
          host => "192.168.10.44"
          port => "6379"
          db => "1"
          data_type => "list"
          key => "tomcat8-logs"
       } 
     }
}
  
[root@client-node01 opt]# cat redis-input.conf
input {
  file {
        path => "/var/log/messages"
        type => "systemlog"
        start_position => "beginning"
        stat_interval => "2"
  }
}
  
output {
  if [type] == "systemlog" {
        redis {
                data_type => "list"
                host => "192.168.10.44"
                db => "2"
                port => "6379"
                key => "systemlog"
        }
  }
  
}
  
[root@client-node01 opt]# cat file.conf
input {
     redis {
        type => "tomcat8-logs"
        host => "192.168.10.44"
        port => "6379"
        db => "1"
        data_type => "list"
        key => "tomcat8-logs"
     }
  
       redis {
          type => "systemlog"
          host => "192.168.10.44"
          port => "6379"
          db => "2"
          data_type => "list"
          key => "systemlog"
       } 
   
}
   
   
output {
   
    if [type] == "tomcat8-logs"{
        elasticsearch {
           hosts => ["192.168.10.44:9200"]
           index => "elk-node2-tomcat8-logs-%{+YYYY.MM.dd}"
        }
    }
  
    if [type] == "systemlog"{
        elasticsearch {
           hosts => ["192.168.10.44:9200"]
           index => "elk-node2-systemlog-%{+YYYY.MM.dd}"
        }
    }
}
  
  
[root@client-node01 opt]# /opt/logstash/bin/logstash -f /opt/redis-in.conf --configtest
Configuration OK
[root@client-node01 opt]# /opt/logstash/bin/logstash -f /opt/redis-input.conf --configtest
Configuration OK
[root@client-node01 opt]# /opt/logstash/bin/logstash -f /opt/file.conf --configtest
Configuration OK
  
啟動logstash
[root@client-node01 opt]# /opt/logstash/bin/logstash -f /opt/redis-in.conf &
[root@client-node01 opt]# /opt/logstash/bin/logstash -f /opt/redis-input.conf &
[root@client-node01 opt]# /opt/logstash/bin/logstash -f /opt/file.conf &
  
這時候,當/usr/local/tomcat8/logs/catalina.out和/var/log/messages文件里有新日志信息寫入時,就會觸發動作,
在redis里就能查看到相關信息,並查看寫入到es里。

=========================================================================================================
溫馨提示:
當客戶機的日志信息收集后,經過redis剛讀到es數據庫里后,如果沒有新數據寫入,則默認在es的訪問界面里是看不到
數據的,只有當日志文件里有新的日志寫入后才會觸發數據展示的動作,即es的訪問界面(http://192.168.10.44:9200/_plugin/head/)
里才能看到日志數據的展示效果。
==========================================================================================================
  
假設想上面兩個文件里寫入測試數據
[root@client-node01 opt]# echo "hellohellohellohello" >> /var/log/messages
[root@client-node01 opt]# echo "[hahahahahahhahahahahahahahahahahahah]" >> /usr/local/tomcat8/logs/catalina.out
  
到redis里發現有相關的key,很快就會讀到es里。可以配置到kibana里觀察。
  
可以先測試下日志信息是否寫到redis里?然后再測試下數據是否從redis讀到es里?一步步確定數據去向。
 
注意上面redis-in.conf文件中的下面設置,使用正則匹配,收集以哪些字符開頭的日志信息:
pattern => "^\["                    表示收集以"["開頭的日志信息
pattern => "^2018"                  表示收集以"2018"開頭的日志信息
pattern => "^[a-zA-Z0-9]"           表示收集以字母(大小寫)或數字開頭的日志信息
pattern => "^[a-zA-Z0-9]|[^ ]+"     表示收集以字母(大小寫)或數字或空格的日志信息


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM