ELK 安裝與配置


ELK日志分析之安裝

1.介紹:

  • NRT 
    elasticsearch是一個近似實時的搜索平台,從索引文檔到可搜索有些延遲,通常為1秒。
  • 集群 
    集群就是一個或多個節點存儲數據,其中一個節點為主節點,這個主節點是可以通過選舉產生的,並提供跨節點的聯合索引和搜索的功能。集群有一個唯一性標示的名字,默認是elasticsearch,集群名字很重要,每個節點是基於集群名字加入到其集群中的。因此,確保在不同環境中使用不同的集群名字。一個集群可以只有一個節點。強烈建議在配置elasticsearch時,配置成集群模式。
  • 節點 
    節點就是一台單一的服務器,是集群的一部分,存儲數據並參與集群的索引和搜索功能。像集群一樣,節點也是通過名字來標識,默認是在節點啟動時隨機分配的字符名。當然啦,你可以自己定義。該名字也蠻重要的,在集群中用於識別服務器對應的節點。 

         節點可以通過指定集群名字來加入到集群中。默認情況下,每個節點被設置成加入到elasticsearch集群。如果啟動了多個節點,假設能自動發現對方,他們將會自動組建一個名為                             elasticsearch的集群。

  • 索引 
    索引是有幾分相似屬性的一系列文檔的集合。如nginx日志索引、syslog索引等等。索引是由名字標識,名字必須全部小寫。這個名字用來進行索引、搜索、更新和刪除文檔的操作。 
    索引相對於關系型數據庫的庫。
  • 類型 
    在一個索引中,可以定義一個或多個類型。類型是一個邏輯類別還是分區完全取決於你。通常情況下,一個類型被定於成具有一組共同字段的文檔。如ttlsa運維生成時間所有的數據存入在一個單一的名為logstash-ttlsa的索引中,同時,定義了用戶數據類型,帖子數據類型和評論類型。 
    類型相對於關系型數據庫的表。
  • 文檔 
    文檔是信息的基本單元,可以被索引的。文檔是以JSON格式表現的。 
    在類型中,可以根據需求存儲多個文檔。 
    雖然一個文檔在物理上位於一個索引,實際上一個文檔必須在一個索引內被索引和分配一個類型。 
    文檔相對於關系型數據庫的列。
  • 分片和副本 
    在實際情況下,索引存儲的數據可能超過單個節點的硬件限制。如一個十億文檔需1TB空間可能不適合存儲在單個節點的磁盤上,或者從單個節點搜索請求太慢了。為了解決這個問題,elasticsearch提供將索引分成多個分片的功能。當在創建索引時,可以定義想要分片的數量。每一個分片就是一個全功能的獨立的索引,可以位於集群中任何節點上。 
    分片的兩個最主要原因: 
    a、水平分割擴展,增大存儲量 
    b、分布式並行跨分片操作,提高性能和吞吐量 
    分布式分片的機制和搜索請求的文檔如何匯總完全是有elasticsearch控制的,這些對用戶而言是透明的。 
    網絡問題等等其它問題可以在任何時候不期而至,為了健壯性,強烈建議要有一個故障切換機制,無論何種故障以防止分片或者節點不可用。 
    為此,elasticsearch讓我們將索引分片復制一份或多份,稱之為分片副本或副本。 
    副本也有兩個最主要原因: 
    高可用性,以應對分片或者節點故障。出於這個原因,分片副本要在不同的節點上。 
    提供性能,增大吞吐量,搜索可以並行在所有副本上執行。 
    總之,每一個索引可以被分成多個分片。索引也可以有0個或多個副本。復制后,每個索引都有主分片(母分片)和復制分片(復制於母分片)。分片和副本數量可以在每個索引被創建時定義。索引創建后,可以在任何時候動態的更改副本數量,但是,不能改變分片數。 
    默認情況下,elasticsearch為每個索引分片5個主分片和1個副本,這就意味着集群至少需要2個節點。索引將會有5個主分片和5個副本(1個完整副本),每個索引總共有10個分片。 
    每個elasticsearch分片是一個Lucene索引。一個單個Lucene索引有最大的文檔數LUCENE-5843, 文檔數限制為2147483519(MAX_VALUE – 128)。 可通過_cat/shards來監控分片大小。
  •  索引和類型的解釋:

 

  • ELK的含義:

E: elasticsearch

  ElasticSearch是一個基於Lucene的搜索服務器。它提供了一個分布式多用戶能力的全文搜索引擎,基於RESTful web接口。

  也就是將logstach收集上來的日志儲存,建立索引(便於查找),搜索(提供web展示)

l:logstash

  收集日志

  數據源:各種log,文本,session,silk,snmp

k:kibana

  數據展示,web頁面,可視化

  可以完成批量分析

  數據集之間關聯

  產生圖表

  報警 (python / R 語言 )

  ES python api的文檔

  大量的查詢或者過濾選項可以使用json 語法:
  任何周期都能查詢

 

  •  ELK 關系:

LEK : logstatsh 收集日志,存到elasticserach (存儲,產生索引,搜索) 到kibana展現(view)
 
 

2.安裝

1、下載tar包直接解壓(靈活)

2、配置yum源直接安裝(方便)

服務器部署:

logstatsh : 部署在想收集日志的服務器上。

elasticsearch:主要是用於數據收集,索引,搜索提供展示,隨意安裝在那台服務器上都可以,重要的是es支持分布式,而且再大規模的日志分析中必須做分布式集群。這樣可以跨節點索引和搜索。提高吞吐量與計算能力。

kibana:數據展示,部署在任意服務器上。

 

這里我們做實驗使用的是兩台服務器

node1.wawa.com : 192.168.31.179
node2.wawa.com : 192.168.31.205

a、准備環境:

  配置hosts兩台服務器網絡通暢  

  node1 安裝es,node2安裝es 做成集群,后期可能還會用到redis,redis提供的功能相當於kafka,收集logstatsh發來的數據,es從redis中提取數據。

  node1 安裝kibana 做數據展示

  node2 安裝logstatsh 做數據收集

  創建  elasticsearch 用戶

b、安裝:

  由於es logstatsh kibana基於java 開發,所以安裝jdk ,jdk版本不要過低,否則會提醒升級jdk。

安裝elasticsearch(node1,node2全都安裝es)

下載並安裝GPG key

2.x

[root@linux-node1 ~]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

 

5.1

 

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

 

  

 

 

添加yum倉庫 

[root@linux-node2 ~]# vim /etc/yum.repos.d/elasticsearch.repo
[elasticsearch-2.x]
name=Elasticsearch repository for 2.x packages
baseurl=http://packages.elastic.co/elasticsearch/2.x/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1

 

 

[elasticsearch-5.x]
name=Elasticsearch repository for 5.x packages
baseurl=https://artifacts.elastic.co/packages/5.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

 

 

es需要jdk8,但是由於服務器有的業務需要1.7,所以可以讓兩個共存  

 

 

 

安裝elasticsearch

[root@hadoop-node2 ~]# yum install -y elasticsearch

 

問題:

阿里服務器下載和用yum安裝由於鏈接是https的問題報錯

增加yum 源報錯

[root@szdz-SLAVE svr]# yum repolist
Loaded plugins: security
https://artifacts.elastic.co/packages/5.x/yum/repodata/repomd.xml: [Errno 14] PYCURL ERROR 51 - "SSL: certificate subject name 'server.co.com' does not match target host name 'artifacts.elastic.co'"
報錯內容

 

下載GPG key

[root@szdz-SLAVE src]# rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
curl: (6) Couldn't resolve host 'artifacts.elastic.co'
error: https://artifacts.elastic.co/GPG-KEY-elasticsearch: import read failed(2).
[root@szdz-SLAVE src]# curl  https://artifa
View Code

 

 

  • 下載tar包安裝,更簡單,解壓即可運行,只不過沒有yum安裝提供的啟動腳本

安裝kibana(這里使用的tar包安裝,es、log tar包方法一樣)

[root@linux-node2 ~]#cd /usr/local/src
[root@linux-node2 ~]#wget https://download.elastic.co/kibana/kibana/kibana-4.3.1-linux-x64.tar.gz
tar zxf kibana-4.3.1-linux-x64.tar.gz
[root@linux-node1 src]# mv kibana-4.3.1-linux-x64 /usr/local/
[root@linux-node2 src]# ln -s /usr/local/kibana-4.3.1-linux-x64/ /usr/local/kibana

 

安裝logstatsh (node2安裝)

下載並安裝GPG key

[root@linux-node2 ~]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch 

添加yum倉庫  

[root@linux-node2 ~]# vim /etc/yum.repos.d/logstash.repo
[logstash-2.1]
name=Logstash repository for 2.1.x packages
baseurl=http://packages.elastic.co/logstash/2.1/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1 

安裝logstash  

[root@linux-node2 ~]# yum install -y logstash

  

c、配置管理elasticsearch

[root@linux-node1 src]# grep -n '^[a-Z]' /etc/elasticsearch/elasticsearch.yml 
17:cluster.name: chuck-cluster  判別節點是否是統一集群,多台統一集群的es名稱要一致
23:node.name: linux-node1 節點的hostname
33:path.data: /data/es-data 數據存放路徑
37:path.logs: /var/log/elasticsearch/ 日志路徑
43:bootstrap.memory_lock: true 鎖住內存,使內存不會再swap中使用
54:network.host: 0.0.0.0  允許訪問的ip
58:http.port: 9200  端口
[root@linux-node1 ~]# mkdir -p /data/es-data
[root@linux-node1 src]# chown  elasticsearch.elasticsearch /data/es-data/

  

d、啟動 elasticsearch

[root@node2 ~]# /etc/init.d/elasticsearch status
elasticsearch (pid  23485) 正在運行...
You have new mail in /var/spool/mail/root
[root@node2 ~]# ps aux| grep elasticsearch
505      23485  2.1 53.1 2561964 264616 ?      Sl   17:09   6:07 /usr/bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError -XX:+DisableExplicitGC -Dfile.encoding=UTF-8 -Djna.nosys=true -Des.path.home=/usr/share/elasticsearch -cp /usr/share/elasticsearch/lib/elasticsearch-2.4.2.jar:/usr/share/elasticsearch/lib/* org.elasticsearch.bootstrap.Elasticsearch start -p /var/run/elasticsearch/elasticsearch.pid -d -Des.default.path.home=/usr/share/elasticsearch -Des.default.path.logs=/var/log/elasticsearch -Des.default.path.data=/var/lib/elasticsearch -Des.default.path.conf=/etc/elasticsearch
root     26425  0.0  0.1 103260   844 pts/0    S+   21:57   0:00 grep elasticsearch
[root@node2 ~]# ss -tunlp | grep elasticsearch
[root@node2 ~]# ss -tunlp | grep 23485
tcp    LISTEN     0      50                    :::9200                 :::*      users:(("java",23485,132))
tcp    LISTEN     0      50                    :::9300                 :::*      users:(("java",23485,89))

 

啟動問題:

[root@szdz-SLAVE svr]# /etc/init.d/elasticsearch start
正在啟動 elasticsearch:Exception in thread "main" BindTransportException[Failed to bind to [9300-9400]]; nested: ChannelException[Failed to bind to: /221.223.97.142:9400]; nested: BindException[無法指定被請求的地址];
Likely root cause: java.net.BindException: 無法指定被請求的地址
	at sun.nio.ch.Net.bind0(Native Method)
	at sun.nio.ch.Net.bind(Net.java:344)
	at sun.nio.ch.Net.bind(Net.java:336)
	at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:199)
	at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
	at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
	at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:391)
	at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:315)
	at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
	at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
	at java.lang.Thread.run(Thread.java:722)
Refer to the log for complete error details.

network.host: 要填寫本機的ip地址,最好是內網。

 

e、測試

交互方式:

交互的兩種方法

  • Java API : 
    node client 
    Transport client
  • RESTful API 
    Javascript 
    .NET 
    php 
    Perl 
    Python 
    Ruby
  •  ElasticSearch是一個基於Lucene的搜索服務器。它提供了一個分布式多用戶能力的全文搜索引擎,基於RESTful web接口。

 1、我們使用RESTful web接口

[root@linux-node1 src]# curl -i -XGET 'http://192.168.56.11:9200/_count?pretty' -d '{
"query" {   #查詢
     "match_all": {}   #所有信息
}
}'
####################
HTTP/1.1 200 OK Content-Type: application/json; charset=UTF-8 Content-Length: 95 { "count" : 0, 索引0個 "_shards" : { 分區0個 "total" : 0, "successful" : 0, 成功0個 "failed" : 0 失敗0個 } }

  

2、使用es 強大的插件  : head插件顯示索引和分片情況

 

f、安裝插件

[root@linux-node1 src]# /usr/share/elasticsearch/bin/plugin install mobz/elasticsearch-head
[root@linux-node1 src]# /usr/share/elasticsearch/bin/plugin list 可以查看當前已經安裝的插件

 

 

訪問剛剛安裝的head插件

http://192.168.31.179:9200/_plugin/head/

  

 

添加數據測試

 

 

 

  

 

增加:

命令行插入數據與查詢數據(RESTful接口處理的JSON請求)

curl -XPOST http://127.0.0.1:9330/logstash-2017.01.09/testlog -d '{"date":"123456","user":"chenlin7","mesg":"first mesasge"}'

 

返回值

{"_index":"logstash-2017.01.09","_type":"testlog","_id":"AVmBUmd9WXPobNRX0V5f","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"created":true}

可以看到,在數據寫入的時候,會返回該數據的 。這就是后續用來獲取數據 的關鍵: 

 

獲取數據

curl -XGET http://127.0.0.1:9330/logstash-2017.01.09/testlog/AVmBUmd9WXPobNRX0V5f

返回值:

{"_index":"logstash-2017.01.09","_type":"testlog","_id":"AVmBUmd9WXPobNRX0V5f","_version":1,"found":true,"_source":{"date":"123456","user":"chenlin7","mesg":"first mesasge"}}

這個 里的內容,正是之前寫入的數據。 

如果覺得這個返回看起來有點太過麻煩,可以使用_source 直接指定要獲取內容

curl -XGET http://127.0.0.1:9330/logstash-2017.01.09/testlog/AVmBUmd9WXPobNRX0V5f/_source

返回值

{"date":"123456","user":"chenlin7","mesg":"first mesasge"}

 

也可以直接指定字段:

curl -XGET http://115.29.229.72:9330/logstash-2017.01.09/testlog/AVmBUmd9WXPobNRX0V5f\?fields\=user,mesg

 

返回值

{"_index":"logstash-2017.01.09","_type":"testlog","_id":"AVmBUmd9WXPobNRX0V5f","_version":1,"found":true,"fields":{"user":["chenlin7"],"mesg":["first mesasge"]}}%

 

刪除

刪除指定的單條數據

curl -XDELETE http://115.29.229.72:9330/logstash-2017.01.09/testlog/AVmB7OKdWXPobNRX0V5m

 

 

刪除整個索引(嘗試刪除某一個類型應該是不支持)

curl -XDELETE http://115.29.229.72:9330/logstash-2017.01.09  or   curl -XDELETE http://115.29.229.72:9330/logstash-2017.01.* (支持通配符)

  

 

更新

更新有兩種方法,意識全量提交,指明_id才發一次請求

# curl -XPOST http://127.0.0.1:9200/logstash-2015.06.21/testlog/
AU4ew3h2nBE6n0qcyVJK -d '{
    "date" : "1434966686000",
    "user" : "chenlin7",
    "mesg" " "first message into Elasticsearch but version 2"
}'

 

 

另一個是局部更新使用/_update接口

指定doc 添加或修改字段

curl -XPOST 'http://127.0.0.1:9330/logstash-2017.01.09/testlog/AVmB92lCWXPobNRX0V5v/_update' -d '{"doc":{"age":"18"}}'

 

指定script(文檔中操作是這樣。沒有試過)

# curl -XPOST 'http://127.0.0.1:9200/logstash-2015.06.21/testlog
/AU4ew3h2nBE6n0qcyVJK/_update' -d '{
    "script" : "ctx._source.user = \"someone\""
}'

 

搜索請求

全文搜索:ES的搜索請求,有簡易語法和完整語法兩種

簡易語法作為以后在kibana上最常用的方式。

curl -XGET http://115.29.229.72:9330/logstash-2017.01.09/testlog1/_search\?q\=first

這樣就獲取到了logstash-2017.01.09索引中的testlog1類型中first關鍵字的所有數據

{"took":4,"timed_out":false,"_shards":{
    
    "total":5,"successful":5,"failed":0
  
    },

  "hits":{
    
    "total":1,"max_score":0.30685282,
  
    "hits":[{
    
      "_index":"logstash-2017.01.09",
  
      "_type":"testlog1","_id":"AVmB90IfWXPobNRX0V5u",

      "_score":0.30685282,"_source":{

          "date":"123456",

          "user":"chenlin7",

          "mesg":"first mesasge"}
        }]
    }
}

 

還可以使用

curl -XGET http://115.29.229.72:9330/logstash-2017.01.09/testlog/_search\?q\=user:"chenlin7"

 

或者知道某個字段一定在那個key中:例子中就是 first一定是在mesg中

 curl -XGET http://115.29.229.72:9330/logstash-2017.01.09/testlog/_search\?q\=mesg:first

  

  

 

node2安裝好以后配置集群模式

[root@node1 src]# scp /etc/elasticsearch/elasticsearch.yml 192.168.56.12:/etc/elasticsearch/elasticsearch.yml
[root@node2 elasticsearch]# sed -i '23s#node.name: linux-node1#node.name: linux-node2#g' elasticsearch.yml 
[root@node2 elasticsearch]# mkdir -p /data/es-data
[root@node2 elasticsearch]# chown elasticsearch.elasticsearch /data/es-data/

 

node1與node2中都配置上(單播模式,聽說還有組播默認,可以嘗試一下)

[root@linux-node1 ~]# grep -n "^discovery" /etc/elasticsearch/elasticsearch.yml 
79:discovery.zen.ping.unicast.hosts: ["linux-node1", "linux-node2"]
[root@linux-node1 ~]# systemctl restart elasticsearch.service

  在瀏覽器中查看分片信息,一個索引默認被分成了5個分片,每份數據被分成了五個分片(可以調節分片數量),下圖中外圍帶綠色框的為主分片,不帶框的為副本分片,主分片丟失,副本分片會復制一份成為主分片,起到了高可用的作用,主副分片也可以使用負載均衡加快查詢速度,但是如果主副本分片都丟失,則索引就是徹底丟失。 

 

安裝使用kopf插件,監控elasticsearch(elasticsearch服務器都安裝)

[root@linux-node1 bin]# /usr/share/elasticsearch/bin/plugin install lmenezes/elasticsearch-kopf

重啟es服務,訪問,沒有意外你就能看到這個界面

 

還有什么別的用暫時還不知道

 

安裝logstatsh

下載並安裝GPG key

 

[root@linux-node2 ~]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

 

添加yum倉庫

[root@linux-node2 ~]# vim /etc/yum.repos.d/logstash.repo
[logstash-2.1]
name=Logstash repository for 2.1.x packages
baseurl=http://packages.elastic.co/logstash/2.1/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1

安裝logstash

[root@linux-node2 ~]# yum install -y logstash
  • 也可以下載logstash的tar包解壓即可使用

安裝后就可以測試了

logstatsh有兩種啟動方式,一種用就是測試啟動,一種就是正式啟動

logstash工作方式:logstatsh的功能是收集日志文件,並將收集的日志文件發送給es服務器。然后es服務器產生索引,提供搜索,並且再交給web展示

但是日志類型和索引名稱都是在logstatsh中定義的

 

a、首先我們熟悉logstatsh的格式是以jason為格式,其中定義輸入輸出

 

‘input { stdin{} } output { stdout{} }’

input :輸入,output :輸出

input可以是命令行手動輸入,也可以是指定一個文件,或者一個服務,  

output是輸出位置。可以是屏幕打印,也可以指定es服務器

  • 我們先做一個最基礎的命令行輸入,和屏幕輸出
[root@node2 bin]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { stdout{} }'   
#stdin 指定輸入為stdin標准輸入 output:指定stdout標准輸出

Settings: Default filter workers: 1 Logstash startup completed
chuck --> 命令行輸入 2016-01-14T06:01:07.184Z node2 chuck ==>屏幕輸出
www.chuck-blog.com --> 命令行輸入 2016-01-14T06:01:18.581Z node2 www.chuck-blog.com ==>屏幕輸出
  • 使用rubudebug顯示詳細輸出,codec為一種編解碼器
[root@node2 bin]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { stdout{ codec => rubydebug} }'  #codec 指定輸出的解碼器,不知道還有沒有別的解碼器
Settings: Default filter workers: 1
Logstash startup completed
chuck   ---> 屏幕輸入
{
       "message" => "chuck",
      "@version" => "1",
    "@timestamp" => "2016-01-14T06:07:50.117Z",
          "host" => "node2"
}   --->rubydebug格式輸出

上述每一條輸出的內容稱為一個事件,多個相同的輸出的內容合並到一起稱為一個事件(舉例:日志中連續相同的日志輸出稱為一個事件)!  

 

** Logstash 會給時間添加一些額外信息,最重要的就是@timestamp,用來標記時間的發生時間。因為這個字段涉及到Logs他說的內部流傳,所以必須是一個joda對象,如果你嘗試自己給一個字符串

字段命名為@timestamp,Logstash會直接報錯。所以,青絲用filter/data插件來管理這個特殊字段

此外大多數時候,還可以見到另外幾個。

1、host標記時間發生在哪里

2、type標記時間的唯一類型

3、tags標記時間的某方面屬性。這是一個數組,一個時間可以有多個標簽。

 

Logstash 格式及支持的數據類型:
Logstash 格式被命名為區段(section)
section的格式是:
input{
  stdin{
  }
  syslog{
  }
}

數據類型
  • bool

    debug => true

  • string

    host => "hostname"

  • number

    ip => 127.0.0.1

  • array

    match => ["datetime","Unix"]

  • hash

    options => {

      key1 = > "value1",

      key2 => "value2"

    }

** 如果版本低於1.2.0 hash的寫法和array是一樣的

 

 

 

字段:

 

 

 

 

 

  • 使用logstash將信息寫入到elasticsearch
[root@linux-node2 bin]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.1.105:9200"] } }'
#這里定義的output 就是指定es服務器的地址以及端口,也可以直接寫hostname
Settings: Default filter workers: 1
Logstash startup completed
maliang
chuck
www.google.com
www.baidu.com

也可以本地輸出,和遠程發送同時進行

[root@-node2 bin]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.1.105:9200"] } stdout{ codec => rubydebug } }' 
Settings: Default filter workers: 1
Logstash startup completed
www.google.com
{
       "message" => "www.google.com",
      "@version" => "1",
    "@timestamp" => "2016-01-14T06:27:49.014Z",
          "host" => "node2"
}
www.elastic.com  
{
       "message" => "www.elastic.com",
      "@version" => "1",
    "@timestamp" => "2016-01-14T06:27:58.058Z",
          "host" => "node2"
}

  

 

  • 使用logstatsh讀取一個配置文件,把寫好的規則放在文件中
[root@node2 ~]# cat test.conf 
input { stdin { } }
output {
  elasticsearch { hosts => ["192.168.31.105:9200"] }   #發送
  stdout { codec => rubydebug }               #並且顯示
}

[root@linux-node1 ~]# /opt/logstash/bin/logstash -f test.conf
Settings: Default filter workers: 1 Logstash startup completed 123
{ "message" => "123", "@version" => "1", "@timestamp" => "2016-01-14T06:51:13.411Z", "host" => "lnode1

如果你是yum安裝,就可以把這個位置文件放在 /etc/logstash/conf.d/ 下面 直接啟動logstatsh 就直接發送給es服務器了

 

b、學習編寫conf格式

  • 輸入插件配置,此處以file為例,可以設置多個
input {
  file {
    path => "/var/log/messages"
    type => "syslog"  #類型
  }
  file {
    path => "/var/log/apache/access.log"
    type => "apache"  #類型
  }
}  
  • 介紹幾種收集文件的方式,可以使用數組方式或者用*匹配,也可以寫多個path
path => ["/var/log/messages","/var/log/*.log"]
path => ["/data/mysql/mysql.log"]
  • 設置boolean值
ssl_enable => true
  • 文件大小單位
  my_bytes => "1113"   # 1113 bytes
  my_bytes => "10MiB"  # 10485760 bytes
  my_bytes => "100kib" # 102400 bytes
  my_bytes => "180 mb" # 180000000 bytes
  • jason收集 
codec => “json”
  • hash收集 
match => {
  "field1" => "value1"
  "field2" => "value2"
  ...
}
  • 端口
port =>  33
  • 密碼
my_password =>  "password"

 

c、學習編寫input的file插件

  

sincedb_path:記錄logstash讀取位置的路徑 
start_postion :包括beginning和end,指定收集的位置,默認是end,從尾部開始 
add_field 加一個域 
discover_internal 發現間隔,每隔多久收集一次,默認15秒

 

d、學習編寫output的file插件

 

e、通過input和output插件編寫conf文件

[root@node3 ~]# cat /etc/logstash/conf.d/syslog.conf
input {
    file {
        path => "/var/log/my_syslog"   #日志地址
        type => "syslog"         #自定義類型
	start_position => "beginning" #從頭開始讀取日志 
    }
}
output {
    elasticsearch {            #輸出推送給es服務器  
        hosts => ["node2.gitlab.com"]      #es服務器地址
        index => "system-%{+YYYY.MM.dd}"  #自定義索引  
    }
}
  • 我們不是配置了兩台es嗎,怎么就發給一個呢?是因為es服務器本身支持集群分片,當數據到達es服務器的時候,es服務器自己會將日志信息分散到所有其他的服務器上。
  • 然后我們就能夠在頁面上看到了

  兩台服務器,然后每台服務分成了5份,在瀏覽器中查看分片信息,一個索引默認被分成了5個分片,每份數據被分成了五個分片(可以調節分片數量),下圖中外圍帶綠色框的為主分片,不帶框的為副本分片,主分片丟失,副本分片會復制一份成為主分片,起到了高可用的作用,主副分片也可以使用負載均衡加快查詢速度,但是如果主副本分片都丟失,則索引就是徹底丟失。 

 

 f、使用type來匹配類型

input {
    file {
        path => "/var/log/my_syslog"
        type => "syslog"
        start_position => "beginning"
    }

    file {
        path => "/var/log/messages"
        type => "system"
        start_position => "beginning"
    }

    file {
        path => "/opt/ela/logs/my-application.log"
        type => "elk-log"
        start_position => "beginning"
    }
}


output {
    if [type] == "system" {
        elasticsearch {
            hosts => ["node2.gitlab.com"]
            index => "system-%{+YYYY.MM.dd}"
        }
    }

    if [type] == "elk-log" {
        elasticsearch {
            hosts => ["node2.gitlab.com"]
            index => "elklog-%{+YYYY.MM.dd}"
        }
}

    if [type] == "syslog" {
        elasticsearch {
            hosts => ["node2.gitlab.com"]
            index => "system-%{+YYYY.MM.dd}"
        }
    }
}

**start_position 僅在文件未被監控過的時候起作用,如果sincedb文件中已經有監控文件的inode記錄了,那么Logstash依然會從記錄過的pos開始讀取。所以重復測試的時候每次需要刪除sincedb文件。不過有一個巧妙的方法

就是把sincedb文件的位置定義在/dev/null中,這樣每次重啟自動從開頭讀取

 

 

g、把多行整個報錯收集到一個事件中

以at.org開頭的內容都屬於同一個事件,但是顯示在不同行,這樣的日志格式看起來很不方便,所以需要把他們合並到一個事件中

引入codec的multiline插件

官方文檔提供

input {
  stdin {
    codec => multiline {
 `     pattern => "pattern, a regexp"
      negate => "true" or "false"
      what => "previous" or "next"`
    }
  }
}

regrxp:使用正則,什么情況下把多行合並起來 
negate:正向匹配和反向匹配 
what:合並到當前行還是下一行 
在標准輸入和標准輸出中測試以證明多行收集到一個日志成功

[root@linux-node1 ~]# cat muliline.conf 
input {
    stdin {
        codec => multiline {
            pattern => "^\["
            negate => true
            what => "previous"
        }
    }
}
output {
    stdout {
        codec => "rubydebug"
    }
}
[root@linux-node1 ~]# /opt/logstash/bin/logstash -f muliline.conf 
Settings: Default filter workers: 1
Logstash startup completed
[1 
[2
{
    "@timestamp" => "2016-01-15T06:46:10.712Z",
       "message" => "[1",
      "@version" => "1",
          "host" => "linux-node1"
}
chuck
chuck-blog.com
123456
[3
{
    "@timestamp" => "2016-01-15T06:46:16.306Z",
       "message" => "[2\nchuck\nchuck-bloh\nchuck-blog.com\n123456",
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ],
          "host" => "linux-node1"

繼續將上述實驗結果放到all.conf的es-error索引中

[root@linux-node1 ~]# cat all.conf 
input {
    file {
        path => "/var/log/messages"
        type => "system"
        start_position => "beginning"
    }
    file {
        path => "/var/log/elasticsearch/chuck-clueser.log"
        type => "es-error"
        start_position => "beginning"
        codec => multiline {
            pattern => "^\["
            negate => true
            what => "previous"
        }
    }
}
output {
    if [type] == "system" {
        elasticsearch {
            hosts => ["192.168.56.11:9200"]
            index => "system-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "es-error" {
        elasticsearch {
            hosts => ["192.168.56.11:9200"]
            index => "es-error-%{+YYYY.MM.dd}"
        }
    }
}

 

Logstash 使用一個名叫FileWatch 的Ruby Gem 庫來監聽文件變化。這個庫支持glob展開文件路徑,並且會記錄一個叫.sincedb的數據庫文件來跟蹤被堅挺的日志文件的當前讀取位置。

sincedb文件中記錄了每個被堅挺的文件的 inode,major number, minor number 和 pos

  

h. 使用log4j插件收集tomcat日志

首先在tomcat的log4j配置文件中進行修改,讓日志輸出到一個地方,然后使用Logstash去這個地方收集

這個地方就是一個ip+port

一般tomat中log4j的配置有兩種形式,一種是log4j.properties 另一種是log4j.xml 文件位置:

第一種:

webapps/ROOT/WEB-INF/classes/log4j.properties
log4j.rootLogger=INFO,stdout,logstash  #然后把添加好的日志對象,放在最終的rootLogger中,讓其既輸出到catalina.out 中也輸出在對應的ip+port中

#,joyreader,appExceptionCollector

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d[%p]%C{1}.%M(%L)|%m%n

log4j.logger.joyreader=INFO,joyreader
log4j.appender.joyreader=org.apache.log4j.DailyRollingFileAppender
log4j.appender.joyreader.DatePattern='.'yyyyMMddHH
log4j.appender.joyreader.layout=org.apache.log4j.PatternLayout
log4j.appender.joyreader.layout.ConversionPattern=%d[%p]%C{1}.%M(%L)|%m%n
log4j.appender.joyreader.file=${log.root}joyreader-api.txt

log4j.logger.appExceptionCollector=INFO,appExceptionCollector
log4j.appender.appExceptionCollector=org.apache.log4j.RollingFileAppender
log4j.appender.appExceptionCollector.DatePattern='.'yyyyMMddHH
log4j.appender.appExceptionCollector.layout=org.apache.log4j.PatternLayout
log4j.appender.appExceptionCollector.layout.ConversionPattern=%d[%p]%C{1}.%M(%L)|%m%n
log4j.appender.appExceptionCollector.file=${log.root}appExceptions.txt

##########下面就是自己添加的內容#############

log4j.appender.logstash=org.apache.log4j.net.SocketAppender
log4j.appender.logstash.Port=4990
log4j.appender.logstash.RemoteHost=127.0.0.1
log4j.appender.logstash.ReconnectionDelay=60000
log4j.appender.logstash.LocationInfo=true
log4j.properties

 

input {
    log4j {
        type => "testapi3"      #日志類型
        host => "127.0.0.1"    #接受的地址
        port => 4990              #接受的端口
        }
}

output {
    stdout{
        codec => rubydebug
    }
}    

 

其他參數

add_field :添加一個字段到時間中

類型 hash

默認為空 {}

 

codec:輸入時的字符編碼

默認為"plain"

 

 data_timeout  : 超時時間

默認值為5

讀超時秒。如果一個特定的TCP連接空閑時間超過這個超時周期,就認為這個任務死了,並不在監聽。如果你不想超時,用-1。

 

host: 監聽地址

默認: 0.0.0.0

如實是服務器的話,就監聽這個。如果是客戶端則連接這個地址

 

mode:設置是服務器還是客戶端(server|client)

默認 server

模式切換:服務器監聽客戶端的連接,客戶端發送到服務器

 

tags

類型:array

沒有設置默認值

添加任意數量的任意標簽的事件。這可以幫助處理。

處理結果樣式圖:

 

 

 

 

kibana的配置

[root@node2 logs]# grep '^[a-Z]' /opt/svr/kibana/config/kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.url: "http://localhost:9200"
kibana.index: ".kibana"

啟動

[root@node2 kibana]# nohup ./bin/kibana  &
[1] 6722

 

[root@node2 kibana]# ss -tunlp | grep 5601
tcp    LISTEN     0      128                    *:5601                  *:*      users:(("node",6722,11))

 

在kibana中添加一個elklog索引

點擊create 創建

kibana通過elklog的索引去es服務器上搜索有關日志

 

 

 

點擊discover即可查看到圖形界面

 

 

 

  

 

 

  

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM