1、ELK-ES簡介
對於日志來說,最常見的就是收集、存儲、查詢、展示。對應的有一個開源項目組合:ELKStack。其中包括logstash(日志收集)、elasticsearch(存儲+搜索)和kibana(展示)這三個項目。
安裝logstash:
# yum install -y java
導入GPG-KEY:
# rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
添加yum倉庫:
# vim /etc/yum.repos.d/logstash.repo
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
# yum install -y logstash
# systemctl start logstash
安裝ElasticSearch,使用6.x版本的安裝包有太多的坑,所以改用2.x版本
————————————————————————————————————————————————————————————————————這是個坑,放棄———————————————————————————————————————————————————————————————————————————————————
tar.gz安裝:官網上有現成的壓縮包,下載下來解壓就完成了。
# tar zxf elasticsearch-6.2.4.tar.gz
然后是配置。配置文件在/etc/elasticsearch/下:
# vim elasticsearch.yml
# ---------------------------------- Cluster -----------------------------------
#
cluster.name: myes //elasticsearch就是分布式的
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: linux-node01 //配置節點名稱
#
# ----------------------------------- Paths ------------------------------------
#
path.data: /data //配置存儲es數據的路徑,多個的話可以使用,分隔
#
path.logs: /var/log/elasticsearch/ //日志的路徑
#
# ----------------------------------- Memory -----------------------------------
#
bootstrap.memory_lock: true //保證內存不會放入交換分區
#
# ---------------------------------- Network -----------------------------------
#
network.host: 172.16.0.3 //本機IP地址
#
http.port: 9200 //默認端口,不需要修改
#
# --------------------------------- Discovery ----------------------------------
# 這里配置的是集群的各個節點互相發現的方式,有單播或者組播的方式。
然后創建/data目錄:
# mkdir /data
接下來啟動:
# ./bin/elasticsearch
失敗。。。然后查看日志:
# cat /home/es/elasticsearch-6.2.4/logs/myes.log
[2018-05-22T16:03:12,235][ERROR][o.e.b.Bootstrap ] Exception
java.lang.RuntimeException: can not run elasticsearch as root
查看網上解釋發現如果是以root權限來執行elasticsearch會有上面的報錯。解決方法,創建es用戶,然后修改所有相關文件的屬主和屬組。
# useradd es
# passwd es
# mv ~/elasticsearch-6.2.4 /home/es/
# chown -R es:es /home/es/elasticsearch-6.2.4
# chown -R es:es /data/
然后切換用戶啟動:
# su - es
$ ./elasticsearch-6.2.4/bin/elasticsearch
接下來又遇到報錯:
[2018-05-22T16:32:36,857][WARN ][o.e.b.JNANatives ] Unable to lock JVM Memory: error=12, reason=無法分配內存
[2018-05-22T16:32:36,874][WARN ][o.e.b.JNANatives ] This can result in part of the JVM being swapped out.
[2018-05-22T16:32:36,874][WARN ][o.e.b.JNANatives ] Increase RLIMIT_MEMLOCK, soft limit: 65536, hard limit: 65536
[2018-05-22T16:32:36,875][WARN ][o.e.b.JNANatives ] These can be adjusted by modifying /etc/security/limits.conf, for example:
# allow user 'elasticsearch' mlockall
elasticsearch soft memlock unlimited
elasticsearch hard memlock unlimited
...
[2018-05-22T16:33:28,310][INFO ][o.e.b.BootstrapChecks ] [linux-node1] bound or publishing to a non-loopback address, enforcing bootstrap checks
ERROR: [3] bootstrap checks failed
[1]: max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]
[2]: memory locking requested for elasticsearch process but memory is not locked
[3]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
在root用戶下修改系統文件:
# vim /etc/security/limits.conf
* soft nofile 65535
* hard nofile 131072
* soft memlock unlimited
* hard memlock unlimited
# vim /etc/sysctl.conf
vm.max_map_count=655360
安裝kibana和marvel:
由於elasticsearch使用的是6.2.4,這里kibana也應該使用6.2.4版本,同樣使用tar包:
$ wget https://artifacts.elastic.co/downloads/kibana/kibana-6.2.4-linux-x86_64.tar.gz
$ tar zxf kibana-6.2.4.linux-x86_64.tar.gz
$ cd /kibana-6.2.4-linux-x86_64/config/kibana.yml //修改IP地址
server.host: "172.16.0.3"
elasticsearch.url: "http://172.16.0.3:9200"
elasticsearch.username: "elastic"
elasticsearch.passsword: "123456” //這個要與后面使用set-password所設置的密碼保持一致
logging.dest: /var/log/kibana
# touch /var/log/kibana
# chown -R es:es /var/log/kibana
根據官網的描述,在5.0以后,Marvel插件歸入X-Pack,所以這里安裝X-Pack:
$ pwd
/home/es
$ ./elasticsearch-6.2.4/bin/elasticsearch-plugin install x-pack
接下來分別啟動elasticsearch和kibana
$ ./elasticsearch-6.2.4/bin/elasticsearch -d
$ ./kibana-6.2.4-linux-x86_64/bin/kibana
注意,kibana這里用tar包運行,沒有像elasticsearch那么方便直接-d就可以后台運行,所以需要如下的方式:
$ nohup /home/es/kibana-6.2.4-linux-x86_64/bin/kibana &
運行完之后可以看到9200正常監聽,然后可以用http://172.16.0.3:9200打開網頁,但是這個時候需要輸入密碼。原因在於安裝了X-pack。
$ ./elasticsearch-6.2.4/bin/x-pack/set-passwords ineractive
設置密碼。完成之后可以正常打開網頁。
安裝head插件(集群管理插件):
在6.2.3版本中無法直接通過plugin安裝head,可以通過git安裝:
$ yum install -y git bzip2 nodejs npm
$ git clone https://github.com/mobz/elasticsearch-head.git
$ cd elasticsearch-head/
$ npm install
$ vim elastisearch-6.2.4/config/elasticsearch.yml
末尾新增:
http.cors.enable: true
http.cors.allow-origin: "*"
$ vim elasticsearch-head/Gruntifile.js
在快要結尾的位置添加hostname:
options:{
hostname: '*',
port: 9100,
base: '.',
keepalive: true
}
$ vim elasticsearch-head/_site/app.js
將localhost改成本機IP:
this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://172.16.0.3:9300";
后台啟動:
$ cd elasticsearch-head/node_modules/grunt/bin/
$ nohup ./grunt server &
$ netstat -lntup
[root@3-linux-node01 elasticsearch-head]# netstat -lntup
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp6 0 0 :::9100 :::* LISTEN 21679/grunt
tcp6 0 0 172.16.0.3:9200 :::* LISTEN 3118/java
可以看到9100正在處於監聽狀態中,而且使用網頁打開http://172.16.0.3:9100可以看到正常顯示。
———————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————
rpm安裝:
在網站上找到2.4.6的rpm安裝包,然后下載下來上傳至虛擬機,之后安裝
# wget https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/rpm/elasticsearch/2.4.6/elasticsearch-2.4.6.rpm
# rpm --install elasticsearch-2.4.6
配置文件還是和上面一樣保持不變,但是/data的權限要變化
# chown -R elasticsearch:elasticsearch /data
# /usr/share/elasticsearch/bin/plugin install marvel-agent
# /usr/share/elasticsearch/bin/plugin install license
# /usr/share/elasticsearch/bin/plugin install mobz/elasticsearch-head
# /usr/share/elasticsearch/bin/plugin install lmenezes/elasticsearch-kopf
# /usr/share/elasticsearch/bin/plugin install hlstudio/bigdesk
# systemctl restart elasticsearch
啟動完成之后就可以打開網頁了:
http://172.16.0.3:9200/_plugin/head/
http://172.16.0.3:9200/_plugin/kopf/#!/cluster
但是日志中有以下錯誤,先等到6月23日再說吧:
# License will expire on [Saturday, June 23, 2018]. If you have a new license, please update it.
# Otherwise, please reach out to your support contact.
2、ELK-ES集群
啟動第二台服務器,地址172.16.0.4。配置文件的集群名稱必須一致,主機名則需要不同,IP地址需要修改。
這里不清楚是否是虛擬機故障,集群是靠組播協議發現,所以改成單播發現:
# vim /etc/elasticsearch/elasticsearch.yml
discovery.zen.ping.unicast.hosts: ["172.16.0.3", "172.16.0.4"]
# systemctl restart elasticsearch
之后可以看到兩台服務器組成了集群:


所有參與集群的主機都要配置,可以不加自己的地址,但是對方的地址一定是要加的。實心五角星代表着master節點。
安裝kibana,這里選擇4.6.5版本:
# rpm --install kibana-4.6.5-x86_64.rpm
# vim /opt/kibana/config/kibana.yml
修改配置文件中的url地址為本機地址:
elasticsearch.url: "http://172.16.0.3:9200"
# systemctl start kibana
安裝logstash,這里選擇5.5.1版本:
# rpm --install logstash-5.5.1.rpm
logstash的實現主要是依賴於插件,核心在於input和output
# /usr/share/logstash/bin/logstash -e 'input { stdin{} } output { stdout{} }'
然后遇到一系列問題:
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console
13:37:17.095 [main] INFO logstash.setting.writabledirectory - Creating directory {:setting=>"path.queue", :path=>"/usr/share/logstash/data/queue"}
13:37:17.265 [main] INFO logstash.setting.writabledirectory - Creating directory {:setting=>"path.dead_letter_queue", :path=>"/usr/share/logstash/data/dead_letter_queue"}
13:37:17.840 [LogStash::Runner] INFO logstash.agent - No persistent UUID file found. Generating new UUID {:uuid=>"26451586-698c-48cb-87ca-8a2a80500e9e", :path=>"/usr/share/logstash/data/uuid"}
13:37:22.527 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>250}
13:37:23.734 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started
The stdin plugin is now waiting for input:
13:37:26.194 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9600}
# ln -s /etc/logstash/ config
# /usr/share/logstash/bin/logstash -e 'input { stdin{} } output { stdout{} }'
解決了一部分:
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
The stdin plugin is now waiting for input:
3、ELK-Logstash實驗
接下來做實驗,從標准輸入讀取,然后輸出到es里面去。這里要是用插件elasticsearch,選擇版本為v7.3.2:
# /usr/share/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ['172.16.0.3:9200'] index => "logstash-%{+YYYY.MM.dd}" } }'
之后輸入的內容就可以在elasticsearch上看到了:


將上面的命令寫成一個腳本:
# pwd
/usr/share/logstash/config/conf.d
# vim demo.conf
input{
stdin{}
}
filter{
}
output{
stdout{
codec => rubydebug
}
elasticsearch {
hosts => ["172.16.0.3:9200"]
index => "logstash-%{+YYYY.MM.dd}"
}
}
之后可以指定啟動腳本啟動:
# /usr/share/logstash/bin/logstash -f /usr/share/logstash/config/conf.d/demo.conf
這樣就可以用啟動腳本達成需要的效果了。
啟動腳本的寫法很簡單,包括input、filter和output三個模塊,其中filter可以為空。其中的原理需要理解。首先,logstash的讀取是按"行"讀取,但是大多數情況下我們是希望按"事件"讀取,比如一條日志包含若干行,那么我們肯定是希望這一條日志的所有行被顯示在一條日志中,而不是有多少行就顯示多少條日志。其次,logstash在這個過程中相當於從input得到日志,經過codec編碼之后,再經過filter過濾日志,然后經過codec解碼之后再由output輸出日志。每個模塊都有自己的成對的{},數組則使用[],字符串則使用""。
discover_interval:logstash 每隔多久去檢查一次被監聽的 path 下是否有新文件。默認值是 15 秒。
exclude:不想被監聽的文件可以排除出去,這里跟 path 一樣支持 glob 展開。
sincedb_path:sincedb文件是用於存儲Logstash讀取文件的位置,每行表示一個文件,每行有兩個數字,第一個表示文件的inode,第二個表示文件讀取到的位置(byteoffset),默認為$HOME/.sincedb*(Windows 平台上在 C:\Windows\System32\config\systemprofile\.sincedb),文件名是日志文件路徑MD5加密后的結果。sincedb_path只能指定為具體的file文件,不能是path目錄。
sincedb_write_interval:logstash 每隔多久寫一次 sincedb 文件,默認是 15 秒。
stat_interval:logstash 每隔多久檢查一次被監聽文件狀態(是否有更新),默認是 1 秒。
start_position:logstash 從什么位置開始讀取文件數據,默認是結束位置,也就是說 logstash 進程會以類似 tail -F 的形式運行。如果你是要導入原有數據,把這個設定改成 "beginning",logstash 進程就從頭開始讀取,有點類似 cat,但是讀到最后一行不會終止,而是繼續變成 tail -F。
接下來再做一個實驗,收集系統日志:
# vim file.conf
input{
file{
path => ["/var/log/messages","/var/log/secure"]
type => "system-log"
start_position => "beginning"
}
}
filter{
}
output{
elasticsearch {
hosts => ["172.16.0.3:9200"]
index => "system-log-%{+YYYY.MM}"
}
}
# /usr/share/logstash/bin/logstash -f /usr/share/logstash/config/conf.d/file.conf


可以看到system-log的相關信息已經出來了。
4、ELK-Kibana簡單使用
kibana為ELK的一個模塊,為用戶提供可視化界面。4.6.5版本。
# wget https://download.elastic.co/kibana/kibana/kibana-4.6.5-x86_64.rpm
# rpm --install kibana-4.6.5-x86_64.rpm
# vim /opt/kibana/config/kibana.yml
elasticsearch.url: "http://172.16.0.3:9200"
kibana.index: ".kibana"
# systemctl enable kibana
# systemctl start kibana
# netstat -lntup
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:5601 0.0.0.0:* LISTEN 63193/node
打開網頁之后,默認會在setting頁面,kibana會引導設置第一個監控頁面。通過選擇時間戳或者輸入名稱可以對已經設置了的日志文件進行匹配然后直接讀取。之后就可以在discover頁面進行搜索了。需要注意的是,kibana不會自己去發現日志,所以必須通過setting進行手動添加才能讀出來。同時,也會給被讀取的文件記錄下一個叫.sincedb的隱藏文件,如果不想使用默認的$HOME/.sincedb,可以自己通過sincedb_path來定義路徑。


5、ELK-Logstash-Input-if判斷
設計收集elasticsearch的日志myes.log,先按照以前的老方法配置:
[root@3-linux-node01 ~]# vim /usr/share/logstash/config/conf.d/file.conf
input{
file{
path => ["/var/log/messages","/var/log/secure"]
type => "system-log"
start_position => "beginning"
}
file{
path => "/var/log/elasticsearch/myes.log"
type => "es-log"
start_position => "beginning"
}
}
filter{
}
output{
if [type] == "system-log" {
elasticsearch {
hosts => ["172.16.0.3:9200"]
index => "system-log-%{+YYYY.MM}"
}
}
if [type] == "es-log" {
elasticsearch {
hosts => ["172.16.0.3:9200"]
index => "es-log-%{+YYYY.MM}"
}
}
}
# /usr/share/logstash/bin/logstash -f /usr/share/logstash/config/conf.d/file.conf
總是會遇到報錯:
[2018-05-26T18:34:17,069][FATAL][logstash.runner ] Logstash could not be started because there is already another instance using the configured data directory. If you wish to run multiple instances, you must change the "path.data" setting.
這里是要運行多實例,修改啟動命令:
# /usr/share/logstash/bin/logstash -f /usr/share/logstash/config/conf.d/file.conf --path.data /data/
啟動之后可以看到已經開始出現日志,但是顯示出現了"行"與"事件"的情況:
實際的日志:
[2018-05-26 18:00:06,384][WARN ][transport.netty ] [linux-node01] exception caught on transport layer [[id: 0x9fb06663]], closing connection
java.net.NoRouteToHostException: 沒有到主機的路由
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
而收集到的日志(從下往上看):


實際上雖然日志沒有漏下,但是將一條日志采集成了多條日志,即logstash是按照"行"進行的采集。那么這個不是我所期望的,因此需要將其改成按"事件"進行采集。
6、ELK-Logstash-Codec-multiline
先插一個小知識點,搜索的語法:
接下來解決上面說的按"行"收集日志的問題,這里就需要用到multiline插件了。
filter {
multiline {
pattern => "pattern, a regexp" //可以支持正則表達式
negate => boolean
what => "previous" or "next"
}}
先來演示一下用法:
使用正則表達式匹配以"["開頭的行,兩個以"["開頭的行中間的內容,就會被認為是一個"事件",記錄並且輸出出來,換行符會以\n記錄下來。
# vim odec.conf
input{
stdin{
codec => multiline{
pattern => "^\["
negate => true
what => "previous"
}
}
}
filter{
}
output{
stdout{
codec => rubydebug
}
}
# /usr/share/logstash/bin/logstash -f codec.conf --path.data /data/
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
The stdin plugin is now waiting for input:
[hjashfasj
dsajcbajs
dashidhasn
[
{
"@timestamp" => 2018-05-26T18:02:46.631Z,
"@version" => "1",
"host" => "3-linux-node01",
"message" => "[hjashfasj\ndsajcbajs\ndashidhasn",
"tags" => [
[0] "multiline"
]
}
接下來修改正式的啟動配置文件:
# vim /usr/share/logstash/config/conf.d/file.conf
input{
file{
path => ["/var/log/messages","/var/log/secure"]
type => "system-log"
start_position => "beginning"
}
file{
path => "/var/log/elasticsearch/myes.log"
type => "es-log"
start_position => "beginning"
codec => multiline{ //加上這一段
pattern => "^\["
negate => true
what => "previous"
}
}
}
filter{
}
output{
if [type] == "system-log" {
elasticsearch {
hosts => ["172.16.0.3:9200"]
index => "system-log-%{+YYYY.MM}"
}
}
if [type] == "es-log" {
elasticsearch {
hosts => ["172.16.0.3:9200"]
index => "es-log-%{+YYYY.MM}"
}
}
}
之后啟動logstash,發現日志已經按"事件"來進行采集了:


接下來是sincedb的一些實驗:
file{
path => ["/var/log/messages","/var/log/secure"]
type => "system-log"
start_position => "beginning"
sincedb_path => "/data/.sincedb_mes_sec"
}
可以看到有文件顯示出來:
[root@3-linux-node01 data]# ls -a
. .. dead_letter_queue .lock myes plugins queue .sincedb_ela .sincedb_mes_sec uuid
[root@3-linux-node01 data]# cat .sincedb_ela
17180165 0 64768 47540
[root@3-linux-node01 data]# ll -i /var/log/elasticsearch/myes.log
17180165 -rw-r--r-- 1 elasticsearch elasticsearch 48029 5月 27 17:00 /var/log/elasticsearch/myes.log
7、ELK-Logstash-Codec-json
對於某些日志,如果顯示成一團會影響閱讀,這個時候就要是用到json插件了。比如Nginx。
方法1、nginx日志改成json輸出:
對nginx配置文件的日志格式以及讀取方式做如下修改:
log_format access_log_json '{"user_ip":"$http_x_real_ip","lan_ip":"$remote_addr","log_time":"$time_iso8601","user_req":"$request","http_code":"$status","body_bytes_sent":"$body_bytes_sent","req_time":"$request_time","user_ua":"$http_user_agent"}';
access_log /var/log/nginx/access_log_json.log access_log_json;
之后寫配置文件:
input{
file{
path => "/var/log/nginx/access_log_json.log"
codec => "json"
}
}
filter{
}
output{
elasticsearch{
hosts => ["172.16.0.3:9200"]
index => "nginx-access-log-%{+YYYY.MM.dd}"
}
stdout{
codec => rubydebug
}
}
加與不加json的區別在於:
{
"path" => "/var/log/nginx/access_log_json.log",
"@timestamp" => 2018-05-28T07:45:57.070Z,
"@version" => "1",
"host" => "4-linux-node02",
"message" => "{\"user_ip\":\"-\",\"lan_ip\":\"172.16.0.4\",\"log_time\":\"2018-05-28T15:45:56+08:00\",\"user_req\":\"GET / HTTP/1.0\",\"http_code\":\"200\",\"body_bytes_sent\":\"612\",\"req_time\":\"0.000\",\"user_ua\":\"ApacheBench/2.3\"}"
}
和
{
"user_ip" => "-",
"path" => "/var/log/nginx/access_log_json.log",
"@timestamp" => 2018-05-28T07:47:02.448Z,
"http_code" => "200",
"body_bytes_sent" => "612",
"lan_ip" => "172.16.0.4",
"user_req" => "GET / HTTP/1.0",
"@version" => "1",
"host" => "4-linux-node02",
"user_ua" => "ApacheBench/2.3",
"log_time" => "2018-05-28T15:46:44+08:00",
"req_time" => "0.000"
}
可以達成的效果:


可以在左側選擇需要查看的選項,如果不選擇默認則是全部顯示成一條,這就是json的應用。對於http請求以及響應,可能只是去搜索其中一個字段,比如http_code,這樣可以更加簡潔明了的顯示出來。
方法2、文件直接收取進redis,然后是用Python腳本讀取redis,寫成json后寫入ES。
十三、ELKStack(下)
1、ELK-kibana圖形化
kibana可以支持可視化,用不同的圖形和模塊來實現:


這里使用Markdown,metric和vertical bar還有搜索方案來做一個面板:


選擇nginx-access-log作為源,之后可以選擇不同的模塊進行監控:


之后保存:


之后在面板上添加:


達到最終的效果:


對之前的搜索同樣使用,顯示的模塊為搜索的結果。
2、ELK-LogStash實戰-input插件rsyslog
Rsyslog是一個input插件,使用514端口。遠端的機器會將日志信息發送至logstash監聽的514端口,logstash通過監聽514端口來獲取對應的日志信息,達到搜集日志的目的。
編輯啟動測試腳本:
# vim /etc/logstash/conf.d/syslog.conf
input{
syslog{
type => "system-syslog"
port => "514"
}
}
filter{
}
output{
stdout{
codec => rubydebug
}
}
# netstat -lntup
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp6 0 0 :::514 :::* LISTEN 1889/java
udp6 0 0 :::514 :::* 1889/java
修改被采集syslog的主機(172.16.0.3)的rsyslog配置文件:
# vim /etc/rsyslog.conf
# remote host is: name/ip:port, e.g. 192.168.0.1:514, port optional
*.* @@172.16.0.4:514 //改成遠端主機的IP地址
小知識點:
# The authpriv file has restricted access.
authpriv.* /var/log/secure
# Log all the mail messages in one place.
mail.* -/var/log/maillog
在路徑前面加一個-表示不立即生效。
# systemctl restart rsyslog
重啟之后就可以在遠端主機(172.16.0.4)上看到有日志更新出來了
接下來修改測試腳本為正式腳本:
# vim syslog.conf
input{
syslog{
type => "system-syslog"
port => "514"
}
}
filter{
}
output{
elasticsearch{
hosts => ["172.16.0.4:9200"]
index => "system-syslog-%{+YYYY.MM}"
}
}
跑起來之后雖然kibana的顯示無誤,但是始終在報錯如下:
[2018-05-28T21:33:18,154][INFO ][logstash.inputs.syslog ] Starting syslog udp listener {:address=>"0.0.0.0:514"}
[2018-05-28T21:33:18,156][WARN ][logstash.inputs.syslog ] syslog listener died {:protocol=>:udp, :address=>"0.0.0.0:514", :exception=>#<Errno::EADDRINUSE: Address already in use - bind - å°åå·²å¨ä½¿ç¨>, :backtrace=>["org/jruby/ext/socket/RubyUDPSocket.java:161:in `bind'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-3.2.1/lib/logstash/inputs/syslog.rb:141:in `udp_listener'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-3.2.1/lib/logstash/inputs/syslog.rb:122:in `server'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-3.2.1/lib/logstash/inputs/syslog.rb:102:in `run'"]}
[2018-05-28T21:33:18,165][INFO ][logstash.inputs.syslog ] Starting syslog tcp listener {:address=>"0.0.0.0:514"}
[2018-05-28T21:33:18,167][WARN ][logstash.inputs.syslog ] syslog listener died {:protocol=>:tcp, :address=>"0.0.0.0:514", :exception=>#<Errno::EADDRINUSE: Address already in use - bind - å°åå·²å¨ä½¿ç¨>, :backtrace=>["org/jruby/ext/socket/RubyTCPServer.java:118:in `initialize'", "org/jruby/RubyIO.java:871:in `new'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-3.2.1/lib/logstash/inputs/syslog.rb:159:in `tcp_listener'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-3.2.1/lib/logstash/inputs/syslog.rb:122:in `server'", "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-syslog-3.2.1/lib/logstash/inputs/syslog.rb:106:in `run'"]}
結合啟動日志發現tcp和udp都在監聽同一個端口,懷疑可能是由於這個原因引起:
[2018-05-28T21:43:39,451][INFO ][logstash.inputs.syslog ] Starting syslog udp listener {:address=>"0.0.0.0:514"}
[2018-05-28T21:43:39,555][INFO ][logstash.inputs.syslog ] Starting syslog tcp listener {:address=>"0.0.0.0:514"}
3、ELK-LogStash實戰-input插件tcp
TCP可以用來實現抓取tcp對應端口的日志文件。
# vim tcp.conf
input{
tcp{
type = "tcp"
port => "6666"
mode => "server"
}
}
filter{
}
output{
stdout{
codec => rubydebug
}
}
# /usr/share/logstash/bin/logstash -f tcp.conf --path.data /data
執行完成后可以看到java監聽的6666端口:
# netstat -lntup
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp6 0 0 :::6666 :::* LISTEN 5676/java
測試一下:
# echo "hehe" | nc 172.16.0.4 6666
收到消息:
{
"@timestamp" => 2018-05-30T10:08:17.088Z,
"port" => 40620,
"@version" => "1",
"host" => "172.16.0.3",
"message" => "hehe",
"type" => "tcp"
}
小技巧:
# nc 172.16.0.4 6666 < test.txt
# echo "hehe1" > /dev/tcp/172.16.0.4/6666
4、ELK-LogStash實戰-filter插件grok
對於Apache的日志,不能支持json插件,可以使用filter的grok插件完成。
寫配置文件:
# cat grok.conf
input{
stdin {}
}
filter{
grok{
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
output{
stdout {
codec => rubydebug
}
}
驗證效果:
# /usr/share/logstash/bin/logstash -f grok.conf --path.data /data
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
The stdin plugin is now waiting for input:
55.3.244.1 GET /index.html 15824 0.043
{
"duration" => "0.043",
"request" => "/index.html",
"@timestamp" => 2018-05-30T13:17:37.564Z,
"method" => "GET",
"bytes" => "15824",
"@version" => "1",
"host" => "3-linux-node01",
"client" => "55.3.244.1",
"message" => "55.3.244.1 GET /index.html 15824 0.043"
}
達到這樣的效果之后就可以正常的將結果輸出到es中去了。但是需要注意的是,grok很吃性能,如果不是很懂ruby的話,那么grok就不靈活。
5、ELK-LogStash實戰-采集Apache日志
一般情況下是不會用到grok去采集的,缺點上面說了。對於傳參數量巨大(甚至可能是攻擊)grok就不行了,有使用logstash將日志抓去redis,然后使用python腳本將日志過濾之后導入es。
但是學了可以實踐一下,grok會自帶一些參數可以直接調用,具體路徑如下:
# pwd
/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.1/patterns
# ls
aws exim httpd maven nagios ruby
bacula firewalls java mcollective postgresql squid
bind grok-patterns junos mcollective-patterns rails
bro haproxy linux-syslog mongodb redis
這里需要使用到grok-patterns文件,但是在當前版本中似乎沒有apache的日志格式,所以可以手動添加:
# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMONAPACHELOG %{IPORHOST:clientip} %{HTTPDUSER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}
書寫啟動文件:
# vim apache_log.conf
input{
file {
path => "/var/log/httpd/access_log"
start_position => "beginning"
}
}
filter{
grok{
match => { "message" => "%{COMMONAPACHELOG}" }
}
}
output{
stdout{
codec => rubydebug
}
}
可以看到訪問結果:
{
"request" => "/",
"auth" => "-",
"ident" => "-",
"verb" => "GET",
"message" => "172.16.0.1 - - [31/May/2018:15:55:14 +0800] \"GET / HTTP/1.1\" 200 13 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.170 Safari/537.36 OPR/53.0.2907.68\"",
"path" => "/var/log/httpd/access_log",
"@timestamp" => 2018-05-31T07:55:15.030Z,
"response" => "200",
"bytes" => "13",
"clientip" => "172.16.0.1",
"@version" => "1",
"host" => "4-linux-node02",
"httpversion" => "1.1",
"timestamp" => "31/May/2018:15:55:14 +0800"
}
之后就可以使用output將日志寫去es中
# vim apache_log.conf
input{
file {
path => "/var/log/httpd/access_log"
start_position => "beginning"
}
}
filter{
grok{
match => { "message" => "%{COMMONAPACHELOG}" }
}
}
output{
elasticsearch {
hosts => ["172.16.0.4:9200"]
index => "apache-%{+YYYY.MM.dd}"
}
}
看看kibana:


正常顯示
6、ELK-使用消息隊列擴展
線上環境出了使用插件完成日志收集之外,還可以使用消息隊列。logstash進行采集至redis,然后通過消息隊列處理之后再發給logstash。這里就學習一下怎么通過插件將日志送入redis。
配置文件:
# vim redis.conf
input{
stdin{
}
}
filter{}
output{
redis{
host => "172.16.0.4"
port => "6379"
db => "6"
data_type => "list"
key => "demo"
}
}
# vim /etc/redis.conf
...
bind 172.16.0.4
...
daemonize yes
...
# /usr/share/logstash/bin/logstash -f /usr/share/logstash/config/conf.d/redis.conf --path.data /data
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
The stdin plugin is now waiting for input:
disahdioashd //隨便輸入4行
dsaihfashfa
dshiacvash
dssjiadh
然后去172.16.0.4上面看redis:
172.16.0.4:6379> info
# Keyspace
db0:keys=1,expires=0,avg_ttl=0
db6:keys=1,expires=0,avg_ttl=0 //有db6產生
172.16.0.4:6379> select 6
OK
172.16.0.4:6379[6]> KEYS *
1) "demo"
172.16.0.4:6379[6]> LLEN demo
(integer) 4
172.16.0.4:6379[6]> LINDEX demo -1
"{\"@timestamp\":\"2018-06-01T03:26:17.080Z\",\"@version\":\"1\",\"host\":\"3-linux-node01\",\"message\":\"dssjiadh\"}"
可以看到demo里面的值和輸入的值相同。
然后寫實際應用的腳本:
# vim apache.conf
input{
file {
path => "/var/log/httpd/access_log"
start_position => "beginning"
}
}
filter{
}
output{
redis{
host => "172.16.0.4"
port => "6379"
db => "7"
data_type => "list"
key => "apache-accesslog"
}
}
接下來再從redis中讀出來,寫入es中去。結合前幾章的內容,apache的日志需要經過grok的處理才可以寫入es,所以需要些filter了。
# vim index.conf
input{
redis{
host => "172.16.0.4"
port => "6379"
db => "7"
data_type => "list"
key => "apache-accesslog"
}
}
filter{
grok{
match => { "message" => "%{COMMONAPACHELOG}" }
}
}
output{
elasticsearch {
hosts => ["172.16.0.3:9200"]
index => "apache-%{+YYYY.MM.dd}"
}
}
之后同時運行apache.conf和index.conf,就可以看到redis里面沒有了,而出現在es中了。如果redis的日志只增不減,就要注意一下了,表示es讀不過來或者根本沒有在讀了,如果時間變長空間占滿的話,可能會導致日志丟失。
細講:
對於一個搜索引擎來講,主要有兩個模塊,分別是索引的建立以及結果的展示。可以使用類似爬蟲程序去網上爬所有的網頁,並且返回每個網站的信息,以此來獲取全球范圍內的網站信息。之后通過獲取到的網站信息(原始數據)來創建提供搜索的內容(文檔)。之后分析所有的文檔來創建索引。這一塊就是索引的建立。之后搜索引擎提供用戶接口UI(搜索頁面)來讓用戶執行搜索操作,這一塊便是搜索結果的展示。如果說結果的展示使用的是Elasticsearch的話,索引的建立則是使用Lucene。
Lucene:
文檔:Document
包含一個或多個域的容器。文檔就是由field:value組成,一個filed:value組合被稱為一個域。
域:創建域的時候可以通過給域多個選項,控制lucene將文件添加進域索引之后可以對域進行什么樣的操作。包括索引選項、存儲選項、域向量使用選項。
索引選項通過倒排索引控制文本是否可被搜索。
Index.ANYLYZED:分析(切詞)並單獨作為索引項;
Index.Not-ANYLYZED:不分析(不切詞),把整個內容當做一個索引項;
Index.ANYLYZED_NORMS:類似於Index.ANYLYZED,但是不存儲token的Norms(加權基准)信息
Index.Not_ANYLYZED_NORMS:類似於Index.Not_ANYLYZED,但是不存儲token信息
Index.NO:不做索引
存儲選項用於確定是否需要存儲域的真實值
store.YES:存儲真實值
store.NO:不存儲真實值
域向量選項用於在搜索期間該文檔所有的唯一項都能完全從文檔中檢索時使用。
文檔和域的加權操作
搜索:
查詢Lucene索引時,它返回的是一個有序的scoreDOC對象:查詢時,Lucene會為每個文檔計算出分值並且排序。
API:
IndexSearcher:搜索索引入口。
Query及其子類:
QueryParser
TopDocs
Lucene的多元化查詢:
IndexSearcher中的search方法:
TermQuery:對索引中的特定項進行搜索,Term是索引中的最小索引片段,每個Term包含一個域名和一個文本值。
TermRangeQuery:在索引中的多個特定項中進行搜索,能搜索指定的多個域。
NumericRangeQuery:做數值范圍搜索。
PrefixQuery:用於搜索以指定字符串開頭的項。
BooleanQuery:實現組合查詢,組合邏輯為AND, OR, NOT
PhraseQuery:根據詞語的長度以及位置信息
WildcardQuery:通配符
FuzzyQuery:模糊查詢
Elasticsearch是一個基於Lucene實現的開源、分布式、Restful的全文本搜索引擎;此外,它還是一個分布式實時文檔存檔,其中每個文檔的每個field都是被索引的數據,且都可被搜索;也是一個帶實時分析功能的分布式搜索引擎,能擴展至數以百計的節點實時處理PB級的數據。
基本組件:
索引(index):文檔容器,具有類似屬性的文檔的合集。類似於表。必須使用小寫。
類型(type):索引是索引內部的邏輯分區,其意義完全取決於用戶需求。一個索引可以定義一個或者多個類型。一般來說,類型就是擁有相同的域的文檔的預定義。
文檔(document):文檔是Lucene索引和搜索的原子單位,它包含了一個或多個域,是域的容器,基於JSON格式表示。每個域的組成部分,是由一個名字,一個或多個值,擁有多個值的域,通常稱為多值域。
映射(mapping):原始內容存儲為文檔之前,需要事先進行分析,例如切詞、過濾掉某些詞等,映射用於定義此分析機制該如何實現。此外,ES還為映射提供了諸如將域中的內容排序等功能。
ES的集群組件:
Cluster:ES集群標識為集群名稱。一個節點只能屬於一個集群。
Node:運行了單個ES實例的主機即為節點。用於存儲數據、參與集群索引及搜索操作。節點標識為節點名。
Shard:將索引切割成為的物理存組件,但是每一個shard都是一個獨立且完整索引;創建索引時,ES默認將其分割為5個(或者自定義)shard。
shard有兩種類型:primary shard和replica shard。每個索引都會創建出5個主shard,每個主shard都有一個(或者自定義個)replica shard。Replica用於數據冗余以及查詢時的負載均衡。primary和replica shard的數量都可以自定義,不同點在於primary定義之后無法修改,replica定義之后可以修改。
index.number_of_shards和index.number_of_replicas在5.x版本里面不支持在yml文件中修改了,如果需要修改的話要使用下面的方法:
# curl -XPUT ip:9200/index_name -d '{
"settings":{
"index": {
"number_of_shards": "10",
"number_of_replicas": "1",
"max_result_window": 999999
}
}
}'
# curl -XPUT
http://ip:9200/myindex/_settings -d'{"index.number_of_replicas": 2}'
ES Cluster工作過程:
啟動時,通過組播(默認)或者單播方式在9300/tcp查找同意集群中的其他節點,並與之建立通信。會選擇出一個主節點負責管理整個集群狀態,以及在集群范圍內決定各shards分布方式,每個均可接收並響應用戶的各類請求。
集群狀態有:green、red和yellow。
JDK:
Oracle JDK
OpenJDK
ES默認端口:
參與集群事物:9300/tcp:transport.tcp.port
訪問以及接收請求:9200/tcp:http.port
Restful API:
1.檢查集群、節點、索引等健康與否,以及獲取其相應狀態;
2.管理集群、節點、索引及元數據;
3.執行CRUD操作;
4.執行高級操作,例如paging、filtering等
_cat API:
# curl -X GET '
http://172.16.0.3:9200/?preey'
# curl -X GET '
http://172.16.0.3:9200/_cat'
# curl -X GET '
http://172.16.0.3:9200/_cat/nodes'
# curl -X GET '
http://172.16.0.3:9200/_cat/nodes?v' //加?v看詳細信息
# curl -X GET '
http://172.16.0.3:9200/_cat/master'
# curl -X GET '
http://172.16.0.3:9200/_cat/master?v'
# curl -X GET '
http://172.16.0.3:9200/_cat/nodes?help' //加help之后可以看到具體的使用方法
_cluster API:
# curl -X GET '
http://172.16.0.3:9200/_cluster/health?pretty' //查看集群信息
# curl -X GET '
http://172.16.0.3:9200/_cluster/state/version?pretty'
# curl -X GET '
http://172.16.0.3:9200/_cluster/state/master_node?pretty'
# curl -X GET '
http://172.16.0.3:9200/_cluster/stats?pretty'
Plugins:
插件擴展ES的功能:
添加自定義的映射類型、自定義分析器、本地腳本、自定義發現方式
安裝:
直接將插件放置於plugins目錄下即可
使用plugin腳本
# /usr/share/elasticsearch/bin/plugin -h(install/remove/list)
# /usr/share/elasticsearch/bin/plugin list
Installed plugins in /usr/share/elasticsearch/plugins:
- marvel-agent
- license
- head
- kopf
- bigdesk
站點插件:
可以通過_plugin API直接訪問的,
http://host:9200/_plugin/plugin_name
CRUD操作相關的API:
CRUD主要用在文檔的增刪改查。
創建文檔:
# curl -XPUT '172.16.0.3:9200/students/class1/1?pretty' -d '
{
"first_name":"Jing",
"last_name":"Guo",
"gender":"Male",
"age":25,
"courses":"Xianglong Shiba Zhang"
}'
如果數據出現重復,PUT操作會直接覆蓋掉原有的數據,所以創建的時候要小心。
獲取文檔:
# curl -XGET '172.16.0.3:9200/students/class1/1?pretty'
{
"_index" : "students",
"_type" : "class1",
"_id" : "1",
"_version" : 3,
"found" : true,
"_source" : {
"first_name" : "Jing",
"last_name" : "Guo",
"gender" : "Male",
"age" : 25,
"courses" : "Xianglong Shiba Zhang"
}
}
更新文檔:
# curl -XPOST '172.16.0.3:9200/students/class1/2/_update?pretty' -d '
{
"doc":{ "age":22 }
}'
刪除文檔:
# curl -XDELETE '172.16.0.3:9200/students/class1/2'
刪除索引:
# curl -XGET '172.16.0.3:9200/_cat/indices?v' //查看索引
# curl -XDELETE '172.16.0.3:9200/students' //刪除索引
查詢數據:
Query API:
Query DSL(Domain Search Language):JSON based language for building complex queries.用戶實現諸多類型的查詢類型,比如,simple term query, phrase, range boolean, fuzzy等;
ES的查詢操作執行分為兩個階段:
分散階段:將查詢請求分散到各個節點上面去。
合並階段:將查詢結果匯總到主節點上面去。
查詢方式:
向ES發起查詢請求的方式有兩種:
1、通過Restful request API查詢,也成為query string;
2、通過發送REST request body進行;
# curl -XGET '172.16.0.3:9200/students/_search?pretty'
{
"took" : 144, //執行時間,單位ms
"timed_out" : false, //是否超時
"_shards" : {
"total" : 5, //有幾個分片
"successful" : 5, //涉及多少分片
"failed" : 0
},
"hits" : { //命中文檔
"total" : 1, //命中了幾個
"max_score" : 1.0,
"hits" : [ { //命中的具體內容,使用數組表示
"_index" : "students",
"_type" : "class1",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"first_name" : "Jing",
"last_name" : "Guo",
"gender" : "Male",
"age" : 25,
"courses" : "Xianglong Shiba Zhang"
}
} ]
}
}
# curl -XGET '172.16.0.3:9200/students/_search?pretty' -d '
> {
> "query":{ "match_all": {} }
> }'
多索引、多類型查詢:
/_search:所有索引
/INDEX_NAME/_search:單索引
/INDEX1,INDEX2/_search:多索引
/s*,t*/_search:通配符索引
/students/class1/_search:單類型搜索
/students/class1,class2/_search:多類型搜索
Mapping和Analysis:
ES:對每一個文檔,會取的其所有域的所有值,生成一個名為all的域。如果query_search未指定查詢的域,則在_all域上執行查詢操作。
Mapping:在各個特定域中的數據類型可能會不一致,mapping可以看到一個文檔中的數據類型是如何被定義的。
# curl '172.16.0.3:9200/students/_mapping/class1?pretty'
ES中搜索的數據廣義上可被理解為兩位:Type:exact(指明類型),full-text(全文搜索)
精確值:未經過加工的原始值,在搜索時進行精確匹配
full-text:用於引用文本中數據,判斷文檔在多大程度上匹配查詢請求,而非做精確匹配。即文檔與用戶請求查詢的相關度。為了完成full-text搜索,ES必須首先分析文本,並創建出倒排索引,倒排索引中的數據還需要"正規化"為標准格式。
分析需要由分析器進行:analyzer。由字符過濾器、分詞器、分詞過濾器組件構成。內置分析器有standard analyzer、simple analyzer、whitespace analyzer、language analyzer
Query DSL:
request body:
query dsl:執行full-text查詢時,基於相關度來評判其匹配結果。此方法執行結果復雜。
match_all Query:用於匹配所有文檔,沒有指定任何query
{"match_all":{}}
match Query:在幾乎任何域上執行full-text或者exact-value查詢。如果執行full-text查詢,首先對查詢語句做分析,如果執行exact-value查詢,將搜索精確值。
multi_match Query:用於在多個域上執行相同的查詢
{"multi_match":
"query":full-text search
"filed":{'filed1','filed2'}
}
bool query:基於boolean邏輯合並多個查詢語句;與bool filter不同的是查詢子句不是返回"yes"或"no",而是其計算出的匹配分值,因此boolean Query會為各子句合並其score
filter dsl:執行exact查詢時,基於其結果為"yes"或者"no"來評判。此方法速度快且結果緩存。
查詢語句結構:
{
QUERY_NAME:{
AGGUMENT: VALUE,
AGGUMENT: VALUE,...
}
}
{
QUERY_NAME: {
FILED_NAME: {
ARGUMENT: VALUE,...
}
}
}
term filter:精確匹配包含指定term的文檔
{"term": {"key":"value"}}
# curl -XGET '172.16.0.3:9200/students/_search' -d '
> {
> "query":{
> "term":{"name":"Guo"}
> }
> }'
關於這個實例遇到了一個小問題,當匹配"name":"Guo"的時候,無法匹配,但是"name":"guo"就可以匹配了。這里的原因在於分析器不同。雖然PUT進去的數據是Guo沒錯,但是默認分析器是analyzed,數據已經被處理(大寫被改成小寫),所以存儲的數據實際上是guo。對於string類型的filed index 默認值是: analyzed.如果我們想對進行精確查找, 那么我們需要將它設置為:not_analyzed。
terms filter:用於多值精確匹配
{"terms":{"key":["value1","value2"]}}
range filter:用於在指定的范圍內查找數值或時間,只能查數值或時間
{"range":"age"{"gte":15,"lte":25}}
gt,lt,gte,lte和shell腳本一樣
exists and missing filter:判斷值是否存在
{"exists":{"age":25}}
boolean filter:基於boolean的邏輯來合並多個filter子句。
must:內部子句條件必須同時匹配
must:{"term":{"age":25}"term":{"name":"Guo"}}
must_not:其所有子句必須不匹配
must_not:{"term":{"age":25}"term":{"name":"Guo"}}
should:至少有一個子句匹配
should:{"term":{"age":25}"term":{"name":"Guo"}}
合並filter和query:filter是過濾,query是查詢,常常會將filter用於query中進行過濾,而不會講query用於filter進行查詢。
查詢語句語法檢查:
GET /INDEX/_validate/query?explain&pretty
{
...
}
LOGSTASH:
logstash是個整合的框架,雖然自己也有索引構建功能,但是現在不用了。由於logstash占用的資源很多,因此大多數情況下會自己開發收集日志的程序,然后傳給kafka,kafka是一種分布式的消息隊列,可以替代logstash完成日志收集。
支持多種數據獲取機制,通過TCP/UDP協議、文件、syslog、windows eventlogs以及STDIN等;獲取到數據后還可以支持對數據進行過濾、修改等操作。使用JRuby語言,所以必須運行在JVM環境。為agent/server架構。
如果agent的數量過多,那么可以在agent與server中間搭建一個broker,可能會使用消息隊列(rabbitmq、activemq、qpid、zeromq、kafka、redis等),來接駁agent和server。關於幾種消息隊列的選擇,可以參考文章:
https://blog.csdn.net/vtopqx/article/details/76382934
配置框架:
input{...}
filter{...} //如果無需對數據進行額外處理,則可以省略filter
output{...}
四種類型的插件:
input,filter,codec,output
數據類型:
array: [item1, item2, ...]
boolean: true, false
bytes:
codec: 編碼器
hash: key => value
number:
password:
path: 文件系統路徑
string: 字符串
條件判斷:
==, !=, <, <=, >, >=, =~, !~, in, not in, and, or
():多個條件判斷
logstash的插件:
input插件:
file:從指定的文件中讀取事件流。使用FileWatch監聽文件的變化,然后將文件的變化保存在一個的.sincedb的隱藏文件中。有了.sincedb文件之后,如果讀取文件的過程中將logstash關了,再打開之后也不會重新去讀取文件的,節省時間與資源,也不會漏掉,還支持文件的滾動讀取。默認這個文件是存放在啟動logstash進程的用戶的家目錄里面的。在日志讀取的過程中,盡量不要指定文件是beginning(start_position => "beginning"),這樣的話不會從斷點開始讀取的。
udp:通過udp協議通過網絡連接來讀取message,其唯一必備參數為port,指定自己監聽的端口,用來接收其他主機發來的數據,host則用來指明自己監聽的地址。
collectd:性能監控程序;
安裝完成之后可以修改配置文件來使collectd收集特定日志,並且將收集到的日志文件發送給特定的端口(/etc/collectd.conf):
#LoadPlugin memcached
LoadPlugin memory
##LoadPlugin mic
LoadPlugin network
<Plugin network>
<Server "172.16.0.4" "25125">
</Server>
</Plugin>
接下來配置啟動腳本:
input {
udp{
port => 25125
codec => collectd {}
type => "collectd"
}
}
output {
stdout {
codec => rubydebug
}
}
然后可以看到kibana上有收集到collectd信息。
redis:從redis讀取數據,支持redis channel和lists兩種方式;
filter插件:主要用於將event執行output之前對其實現處理功能。
grok:用於分析並結構化文本數據;目前是logstash中將非格式化日志數據轉化為結構化的可查詢數據的不二之選。
預定義grok的模式:模式定義默認位置:/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.1/patterns/grok-patterns,可以使用patterns_dir => [ "XXX" ]來指定定義模式的文件的位置。
模式的語法格式:%{SYNTAX:SEMANTIC};其中SYNTAX是預定義模式名稱;SEMANTIC是匹配到的文本的自定義標識符。對於想要定義的模式,都是要已經定義了的,也是在這個文件里面,如果沒有定義的話,就需要自己用全大寫來自己進行定義。
先來看一個示例:
172.16.0.1 - - [13/Jun/2018:16:24:15 +0800] "GET /favicon.ico HTTP/1.1" 404 209 "
http://172.16.0.4/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.170 Safari/537.36 OPR/53.0.2907.68"
COMMONAPACHELOG %{IPORHOST:clientip} %{HTTPDUSER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
所對應的關系就有了:%{IPORHOST:clientip}|172.16.0.1;%{HTTPDUSER:ident}|-;%{USER:auth}|-;\[%{HTTPDATE:timestamp}\]|[13/Jun/2018:16:24:15 +0800];%{WORD:verb}|GET;%{NOTSPACE:request}|/favicon.ico;HTTP/%{NUMBER:httpversion}|HTTP/1.1;%{DATA:rawrequest}|無;%{NUMBER:response}|404; %{NUMBER:bytes}|209。?:這些的作用,參考另外一個筆記《知識文檔-正則表達式之 pattern+?、pattern*?、(?!pattern)、(?:pattern)》
下面來使用預定義模式寫一個格式:
1.1.1.1 GET /index.html 30 0.23
%{IP:clientip} %{WORD:method} %{WORD:verb} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
模式的定義位置並沒有硬性要求,可以寫在patterns文件中,也可以寫在配置文件中。寫在patterns文件中,取個名字,之后在配置文件中調用即可。比如下面這個:
grok {
patterns_dir => [ "/etc/logstash/network_device_log_patterns" ]
match => [
#IOSXE
"message", "%{SYSLOG5424PRI}(<%{NUMBER:seqnum1}>)?(%{NUMBER:seqnum2}:)? (\*|\.)?%{IOSXETIMESTAMP:log_date} (%{TZ:timezone})?: \%%{WORD:facility}-%{INT:severity_level}-%{NETWORKDEVICE_REASON:log_brief}: %{GREEDYDATA:message}"
]
overwrite => [ "message" ]
remove_field => [ "@version", "syslog5424_pri" ]
}
grok插件中的參數具體說明可以參照官網的介紹,
https://www.elastic.co/guide/en/logstash/5.4/plugins-filters-grok.html,了解其作用。
自定義grok的模式:grok的模式是基於正則表達式編寫,其元字符與其他 用到正則表達式的工具awk/sed/grep/pcre差別不大。也可以直接套用其他的pattern來實現自己的功能。
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
output插件:
stdout {}
elasticsearch {}
常用參數:action,hosts,index,cluster,port,protocol,workers(實現output的線程數)
redis {}
常用參數:host(在哪),port(哪個端口),timeout,workers(線程數量),db(放在哪),data_type,batch(一條RPUSH推送多個值)
消息隊列使用發布訂閱機制,里面有引入頻道的概念。對於將數據發布進消息隊列的服務器,我們稱之為消息的生產者producer,對於接收消息隊列推送的消息的服務器,我們稱之為消息的消費者customer,連接producer頻道和customer頻道,並且進行消息的分配的角色,我們稱之為exchanger,exchanger不是單獨的服務器,而是消息隊列內部的角色。customer可以訂閱自己感興趣的頻道,當producer將消息發布進消息隊列的各個頻道中,消息隊列再將各頻道的消息推送至訂閱該頻道的customer手中,這就是發布訂閱機制,也是消息隊列的最基本概念。再來看消息隊列的內部,customer和producer的頻道不同,一個producer可能會產生出多個頻道的消息,而頻道本身是不做分類的,那么在消息隊列內部就會有一個exchanger來進行消息的路由(分配)。exchanger將producer的隊列中的消息分配至不同的customer頻道中去,以便customer頻道將消息推送給customer。