ELK Part 2: Advanced Usage of Elasticsearch and Logstash


Part 1: Documentation

Official documentation for the 1.x and 2.x versions:

https://www.elastic.co/guide/en/elasticsearch/guide/index.html

Hardware and basic configuration requirements:

1. Memory: the official recommendation is 64G, but 32G or even 16G also works.

2. CPU: the more cores, the better.

3. Disk: the faster the better; NAS network storage is not recommended, and the official docs suggest RAID 0.

4. Network: gigabit or faster.

5. JVM: use a reasonably recent version, either OpenJDK or Oracle's JDK.

6. File descriptors: the maximum number of open files must be raised.

7. Cluster name: must be identical across the cluster, while each node name within the cluster must be unique.

8. Multicast and unicast discovery settings.

9. JVM heap: do not exceed 32G; the swap partition can be disabled.

10. Memory locking (mlockall) settings; a sketch of these settings follows this list.
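A minimal sketch of the file-descriptor, cluster/node naming, heap, and memory-locking settings listed above; the user name, cluster name, and values are illustrative assumptions, adjust them for your environment:

# /etc/security/limits.conf -- raise the maximum open files (assuming elasticsearch runs as user "elasticsearch")
elasticsearch soft nofile 65536
elasticsearch hard nofile 65536

# elasticsearch.yml -- same cluster.name on every node, unique node.name per node
cluster.name: elk-cluster
node.name: node6
bootstrap.mlockall: true              # lock the JVM heap in memory so it is never swapped out

# JVM heap: no more than 32G, for example on a 64G machine
# export ES_HEAP_SIZE=31g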

 

Changing cluster settings dynamically:

PUT /_cluster/settings
{
    "persistent" : {
        "discovery.zen.minimum_master_nodes" : 2 
    },
    "transient" : {
        "indices.store.throttle.max_bytes_per_sec" : "50mb" 
    }
}
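For reference, the same request can be sent with curl (assuming a node is reachable at 192.168.10.206:9200, the address used elsewhere in this article):

curl -XPUT 'http://192.168.10.206:9200/_cluster/settings' -d '
{
    "persistent" : {
        "discovery.zen.minimum_master_nodes" : 2
    },
    "transient" : {
        "indices.store.throttle.max_bytes_per_sec" : "50mb"
    }
}'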

Rolling upgrade or maintenance:

1. Avoid writing new data if possible, so that no new indices are created.

2. Disable automatic shard allocation:

PUT /_cluster/settings
{
    "transient" : {
        "cluster.routing.allocation.enable" : "none"
    }
}

3. Shut down the current node:

After the node stops, the cluster elects another node as master if necessary; the stopped node's shards would normally be reassigned to the remaining nodes and the data synchronized to them, which is why allocation was disabled in step 2.

POST /_cluster/nodes/_local/_shutdown

4. Perform the upgrade or maintenance work.

5. Restart the node; it rejoins the cluster automatically.

6. Re-enable shard allocation on the cluster (see the sketch after this list).

Note: shard synchronization takes a while; wait until the cluster status turns green, i.e. the cluster is fully available again.

Shard rebalancing may take some time. Wait until the cluster has returned to status green before continuing.

7. Repeat steps 2-6 for the other nodes in the cluster.
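Re-enabling shard allocation in step 6 is the mirror image of the call that disabled it:

PUT /_cluster/settings
{
    "transient" : {
        "cluster.routing.allocation.enable" : "all"
    }
}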

 

Data backup ---> snapshot backups via the REST API (curl):

1. Create the backup directory: the nodes must share a directory; the shared filesystem has to be accessible from every node, and it must be mounted at the same path on every node.

Supported shared storage types:

Shared filesystem, such as a NAS
Amazon S3
HDFS (Hadoop Distributed File System)
Azure Cloud

Register the snapshot repository pointing at the shared, mounted directory:

PUT _snapshot/my_backup 
{
    "type": "fs", 
    "settings": {
        "location": "/mount/backups/my_backup" 
    }
}

2. Adjust the repository's throttle settings (a POST to an existing repository updates its settings; taking an actual snapshot is sketched below):

POST _snapshot/my_backup/ 
{
    "type": "fs",
    "settings": {
        "location": "/mount/backups/my_backup",
        "max_snapshot_bytes_per_sec" : "50mb", 
        "max_restore_bytes_per_sec" : "50mb"
    }
}
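The POST above only updates the repository's settings; actually taking a named snapshot, checking it, and restoring it later looks roughly like this (the snapshot name snapshot_1 is an illustrative assumption):

# take a snapshot of all open indices; wait_for_completion blocks until it finishes
PUT _snapshot/my_backup/snapshot_1?wait_for_completion=true

# inspect the snapshot
GET _snapshot/my_backup/snapshot_1

# restore it later
POST _snapshot/my_backup/snapshot_1/_restore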

 

How the ELK (Elasticsearch) cluster works:

Nodes in the same cluster discover each other via multicast (or unicast), and the responses are combined to form the cluster. The master node reads the state of every node and drives data recovery when necessary; it monitors node status, decides where each shard is placed, and detects failed nodes through ping requests.
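A sketch of the related discovery settings in elasticsearch.yml; multicast is the 1.x default, and the unicast host list below is an assumption based on the addresses used in this article:

# elasticsearch.yml
discovery.zen.ping.multicast.enabled: false                            # switch from multicast to unicast discovery
discovery.zen.ping.unicast.hosts: ["192.168.10.205", "192.168.10.206"]
discovery.zen.minimum_master_nodes: 2                                  # (master-eligible nodes / 2) + 1, to avoid split brain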

 

Part 3: Installing Logstash:

Official download page:

https://www.elastic.co/downloads/logstash

1. Install:

tar xvf logstash-1.5.3.tar.gz

mv logstash-1.5.3 /usr/local/logstash

2. Test:

[root@node6 ~]# /usr/local/logstash/bin/logstash -e 'input { stdin{} }  output { stdout{} }'
test    
Logstash startup completed
2016-04-09T18:14:47.891Z node6.a.com test

3. Use the rubydebug codec for more detailed output:

The host must be able to resolve its own hostname; this can be added to the hosts file.
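For example, an /etc/hosts entry like the following (the IP is an assumption; use the host's actual address):

192.168.10.206   node6.a.com   node6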

[root@node6 ~]# /usr/local/logstash/bin/logstash -e 'input { stdin{} }  output { stdout{codec => rubydebug}}'
asd
Logstash startup completed
{
       "message" => "asd",
      "@version" => "1",
    "@timestamp" => "2016-04-09T18:13:51.250Z",
          "host" => "node6.a.com"
}

4. Send the output to Elasticsearch through logstash:

Start it:

[root@node6 ~]# /usr/local/logstash/bin/logstash -e 'input { stdin{} }  output {  elasticsearch { host => "192.168.10.206" protocol => "http"}  }'
'[DEPRECATED] use `require 'concurrent'` instead of `require 'concurrent_ruby'`
Logstash startup completed

5. Output directly to the screen with logstash:

# The configuration file is as follows:
input { stdin{ } } output { stdout { codec => rubydebug } }

Test standard output:

[root@elk-server2 conf.d]# /opt/logstash/bin/logstash  -f /etc/logstash/conf.d/03.conf 

       "message" => "{\"@timestamp\":\"2016-05-14T11:24:45+08:00\",\"host\":\"192.168.0.22\",\"clientip\":\"36.104.21.88\",\"size\":650,\"responsetime\":0.000,\"upstreamtime\":\"-\",\"upstreamhost\":\"-\",\"http_host\":\"webapi.weather.com.cn\",\"url\":\"/data/\",\"domain\":\"webapi.weather.com.cn\",\"xff\":\"-\",\"referer\":\"-\",\"status\":\"200\"}",
      "@version" => "1",
    "@timestamp" => "2016-05-14T03:25:04.068Z",
          "host" => "elk-server2"

6. Type a "hello word!" line to test:

[root@node6 ~]# /usr/local/logstash/bin/logstash -e 'input { stdin{} }  output {  elasticsearch { host => "192.168.10.206" protocol => "http"}  }'
'[DEPRECATED] use `require 'concurrent'` instead of `require 'concurrent_ruby'`
Logstash startup completed
hello word!

7. Check the status in the head cluster-management plugin:

8. Run a basic query:
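A basic query can also be run directly against Elasticsearch with curl instead of the head UI; a minimal sketch, assuming the default logstash-* index naming and the Elasticsearch address used above:

# full-text search for "hello" across the logstash indices
curl 'http://192.168.10.206:9200/logstash-*/_search?q=message:hello&pretty'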

9. Configuration file format:

input {
  file {
    path => "/var/log/messages"
    type => "syslog"
  }

  file {
    path => "/var/log/apache/access.log"
    type => "apache"
  }
}

10. Array type ---> multiple files:

  path => [ "/var/log/messages", "/var/log/*.log" ] # use * to match multiple files
  path => "/data/mysql/mysql.log"

11. Boolean type:

 ssl_enable => true

12. Bytes:

  my_bytes => "1113"   # 1113 bytes
  my_bytes => "10MiB"  # 10485760 bytes
  my_bytes => "100kib" # 102400 bytes
  my_bytes => "180 mb" # 180000000 bytes

13. codec:

 codec => "json"

14. Hash:

match => {
  "field1" => "value1"
  "field2" => "value2"
  ...
}

15. Number:

port => 33

16. Password:

 my_password => "password"

17. Path:

 my_path => "/tmp/logstash"

18. String:

 name => "Hello world"

 

Part 4: Logstash input syntax:

1. The file input does not recurse into directories by default, i.e. files inside subdirectories are not read directly, but a pattern such as */* can be used to match them.

2. exclude ---> exclude files:

exclude => "*.gz"

3. sincedb_path: records the current read position; by default it is a hidden file.

4. sincedb_write_interval: how often the sincedb file is written; the default is 15 seconds.

5. start_position: where to start reading the file; the default is end, which can be changed to beginning.

6. stat_interval: how often to check the file for updates. A sketch combining these options follows this list.
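A sketch of a file input combining the options above; the paths and values are illustrative:

input {
  file {
    path => [ "/var/log/messages", "/var/log/*.log" ]
    exclude => "*.gz"                          # skip compressed files
    start_position => "beginning"              # read existing content instead of only new lines
    sincedb_path => "/var/lib/logstash/sincedb-syslog"
    sincedb_write_interval => 15               # seconds between sincedb writes
    stat_interval => 1                         # seconds between checks for file changes
    type => "syslog"
  }
}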

 

Part 5: Logstash output usage and plugins:

1. Output can be sent to a file, Redis, and other destinations.

2. gzip: whether to compress; defaults to false. Compression is applied incrementally as the data streams in.

3. message_format: the format of the message. A sketch of a file output using these options follows.
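A sketch of a file output using these options; the path and format string are illustrative:

output {
  file {
    path => "/tmp/%{type}-%{+YYYY.MM.dd}.log.gz"
    gzip => true                               # compress incrementally as events are written
    message_format => "%{host} %{message}"     # write selected fields instead of the full JSON event
  }
}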

 

Part 6: logstash --> file --> elasticsearch:

Output to a file through logstash, then feed that into elasticsearch:

1. Init script:

vim /etc/init.d/logstash 

[root@node6 tmp]# cat /etc/init.d/logstash 
#!/bin/sh
# Init script for logstash
# Maintained by Elasticsearch
# Generated by pleaserun.
# Implemented based on LSB Core 3.1:
#   * Sections: 20.2, 20.3
#
### BEGIN INIT INFO
# Provides:          logstash
# Required-Start:    $remote_fs $syslog
# Required-Stop:     $remote_fs $syslog
# Default-Start:     2 3 4 5
# Default-Stop:      0 1 6
# Short-Description:
# Description:        Starts Logstash as a daemon.
### END INIT INFO

PATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/logstash/bin/
export PATH

if [ `id -u` -ne 0 ]; then
   echo "You need root privileges to run this script"
   exit 1
fi

name=logstash
pidfile="/var/run/$name.pid"

export JAVA_HOME=/opt/jdk1.8.0_45
export JRE_HOME=/opt/jdk1.8.0_45/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$PATH


LS_USER=root
LS_GROUP=root
LS_HOME=/usr/local/logstash
LS_HEAP_SIZE="512m"
LS_LOG_DIR=/usr/local/logstash
LS_LOG_FILE="${LS_LOG_DIR}/$name.log"
LS_CONF_DIR=/etc/logstash.conf
LS_OPEN_FILES=16384
LS_NICE=19
LS_OPTS=""


[ -r /etc/default/$name ] && . /etc/default/$name
[ -r /etc/sysconfig/$name ] && . /etc/sysconfig/$name

program=/usr/local/logstash/bin/logstash
args="agent -f ${LS_CONF_DIR} -l ${LS_LOG_FILE} ${LS_OPTS}"

quiet() {
  "$@" > /dev/null 2>&1
  return $?
}

start() {

  LS_JAVA_OPTS="${LS_JAVA_OPTS} -Djava.io.tmpdir=${LS_HOME}"
  HOME=${LS_HOME}
  export PATH HOME LS_HEAP_SIZE LS_JAVA_OPTS LS_USE_GC_LOGGING

  # chown doesn't grab the supplemental groups when setting the user:group - so we have to do it for it.
  # Boy, I hope we're root here.
  SGROUPS=$(id -Gn "$LS_USER" | tr " " "," | sed 's/,$//'; echo '')

  if [ ! -z $SGROUPS ]
  then
    EXTRA_GROUPS="--groups $SGROUPS"
  fi

  # set ulimit as (root, presumably) first, before we drop privileges
  ulimit -n ${LS_OPEN_FILES}

  # Run the program!
  nice -n ${LS_NICE} chroot --userspec $LS_USER:$LS_GROUP $EXTRA_GROUPS / sh -c "
    cd $LS_HOME
    ulimit -n ${LS_OPEN_FILES}
    exec \"$program\" $args
  " > "${LS_LOG_DIR}/$name.stdout" 2> "${LS_LOG_DIR}/$name.err" &

  # Generate the pidfile from here. If we instead made the forked process
  # generate it there will be a race condition between the pidfile writing
  # and a process possibly asking for status.
  echo $! > $pidfile

  echo "$name started."
  return 0
}

stop() {
  # Try a few times to kill TERM the program
  if status ; then
    pid=`cat "$pidfile"`
    echo "Killing $name (pid $pid) with SIGTERM"
    kill -TERM $pid
    # Wait for it to exit.
    for i in 1 2 3 4 5 ; do
      echo "Waiting $name (pid $pid) to die..."
      status || break
      sleep 1
    done
    if status ; then
      if [ "$KILL_ON_STOP_TIMEOUT" -eq 1 ] ; then
        echo "Timeout reached. Killing $name (pid $pid) with SIGKILL. This may result in data loss."
        kill -KILL $pid
        echo "$name killed with SIGKILL."
      else
        echo "$name stop failed; still running."
      fi
    else
      echo "$name stopped."
    fi
  fi
}

status() {
  if [ -f "$pidfile" ] ; then
    pid=`cat "$pidfile"`
    if kill -0 $pid > /dev/null 2> /dev/null ; then
      # process by this pid is running.
      # It may not be our pid, but that's what you get with just pidfiles.
      # TODO(sissel): Check if this process seems to be the same as the one we
      # expect. It'd be nice to use flock here, but flock uses fork, not exec,
      # so it makes it quite awkward to use in this case.
      return 0
    else
      return 2 # program is dead but pid file exists
    fi
  else
    return 3 # program is not running
  fi
}

force_stop() {
  if status ; then
    stop
    status && kill -KILL `cat "$pidfile"`
  fi
}

configtest() {
  # Check if a config file exists
  if [ ! "$(ls -A ${LS_CONF_DIR}/* 2> /dev/null)" ]; then
    echo "There aren't any configuration files in ${LS_CONF_DIR}"
    return 1
  fi

  HOME=${LS_HOME}
  export PATH HOME JAVA_OPTS LS_HEAP_SIZE LS_JAVA_OPTS LS_USE_GC_LOGGING

  test_args="-f ${LS_CONF_DIR} --configtest ${LS_OPTS}"
  $program ${test_args}
  [ $? -eq 0 ] && return 0
  # Program not configured
  return 6
}

case "$1" in
  start)
    status
    code=$?
    if [ $code -eq 0 ]; then
      echo "$name is already running"
    else
      start
      code=$?
    fi
    exit $code
    ;;
  stop) stop ;;
  force-stop) force_stop ;;
  status)
    status
    code=$?
    if [ $code -eq 0 ] ; then
      echo "$name is running"
    else
      echo "$name is not running"
    fi
    exit $code
    ;;
  restart)
    quiet configtest
    RET=$?
    if [ ${RET} -ne 0 ]; then
      echo "Configuration error. Not restarting. Re-run with configtest parameter for details"
      exit ${RET}
    fi
    stop && start
    ;;
  configtest)
    configtest
    exit $?
    ;;
  *)
    echo "Usage: $SCRIPTNAME {start|stop|force-stop|status|restart|configtest}" >&2
    exit 3
  ;;
esac

exit $?

2. Enable the service at boot:

chmod  a+x /etc/init.d/logstash
chkconfig  --add logstash

3. Edit the configuration file:

[root@node6 tmp]# vim /etc/logstash.conf 

input {
        file {
                path =>  "/var/log/messages"
        }
}


output {
        file {
                path => "/tmp/log-%{+YYYY-MM-dd}messages.gz"
                gzip => true
        }
}

4. Test:

[root@node6 tmp]# echo "Apr 12 14:03:53 HTC-Server2 snmpd[1573]: Connection from UDP: [60.195.252.107]:31001->[192.168.0.116]" >>  /var/log/messages

Result:

[root@node6 tmp]# ls  /tmp/
hsperfdata_root  log-2016-04-09.messages.gz

5. Also send the output directly to elasticsearch:

 [root@node6 ~]# vim /etc/logstash.conf

input {
        file {
                path =>  "/var/log/messages"
        }
}


output {
        file {
                path => "/tmp/log-%{+YYYY-MM-dd}.messages.gz"
                gzip => true
        }

        elasticsearch {
                host => ["192.168.10.206"]
                protocol => "http"
                index => "system-message-%{+YYYY.MM.dd}"
        }
}

6. Check the result in the head cluster-management UI:

 

Part 7: Sending logstash output to redis:

1. Basic options (a sketch configuration follows this list):

db: which Redis database to use; the default is 0. Other databases can be used, but be aware of how database selection interacts with Redis master/slave replication.
host: address of the redis server.
key: name of the key.
password: password for connecting to the redis server, if any.
port: redis port, default 6379.
data_type: data type; string and list are supported, and list is used here.
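A sketch of the corresponding redis output block, using the key name and server address that appear later in this article:

output {
    redis {
        data_type => "list"                # push each event onto a list
        key => "system-message-jack"       # list key name
        host => "192.168.10.205"           # redis server address
        port => "6379"
        db => "0"
        # password => "yourpassword"       # only needed if redis requires authentication
    }
}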

2. Restart logstash.

3. Write data into the file that logstash is watching:

[root@node6 yum.repos.d]# echo "Apr 12 14:03:53 HTC-Server2 snmpd[1573]: Connection from UDP: [60.195.252.107]:31001->[192.168.0.116]" >>  /var/log/messages
[root@node6 yum.repos.d]# echo "Apr 12 14:03:53 HTC-Server2 snmpd[1573]: Connection from UDP: [60.195.252.107]:31001->[192.168.0.116]" >>  /var/log/messages
[root@node6 yum.repos.d]# echo "Apr 12 14:03:53 HTC-Server2 snmpd[1573]: Connection from UDP: [60.195.252.107]:31001->[192.168.0.116]" >>  /var/log/messages
[root@node6 yum.repos.d]# echo "Apr 12 14:03:53 HTC-Server2 snmpd[1573]: Connection from UDP: [60.195.252.107]:31001->[192.168.0.116]" >>  /var/log/messages
[root@node6 yum.repos.d]# echo "Apr 12 14:03:53 HTC-Server2 snmpd[1573]: Connection from UDP: [60.195.252.107]:31001->[192.168.0.116]" >>  /var/log/messages
[root@node6 yum.repos.d]# echo "Apr 12 14:03:53 HTC-Server2 snmpd[1573]: Connection from UDP: [60.195.252.107]:31001->[192.168.0.116]" >>  /var/log/messages

4. Check the result in redis:

[root@node5 ~]# redis-cli 
127.0.0.1:6379> KEYS *
1) "system-message-jack" #已經生成數據
127.0.0.1:6379> LLEN  system-message-jack #查看key的長度
(integer) 681
127.0.0.1:6379> LINDEX system-message-jack -1  #查看最后一行數據
"{\"message\":\"Apr 12 14:03:53 HTC-Server2 snmpd[1573]: Connection from UDP: [60.195.252.107]:31001->[192.168.0.116]\",\"@version\":\"1\",\"@timestamp\":\"2016-04-12T08:37:51.025Z\",\"host\":\"node6.a.com\",\"path\":\"/var/log/messages\"}"

5. Install logstash on a second machine, following the same steps as before.

6. Configuration file for the second logstash instance:

input {   # read the data from redis
    redis {
        data_type => "list"
        key => "system-message-jack"
        host => "192.168.10.205"
        port => "6379"
        db => "0"
    }
}


output {  # write the data read from redis into elasticsearch
    elasticsearch {
        host => ["192.168.10.206"]
        protocol => "http"
        index => "redis-message-%{+YYYY.MM.dd}"
    }
}

7. Write data into the messages file; it is picked up and pushed into redis, and the data in redis is then forwarded to elasticsearch:

[root@node6 yum.repos.d]# echo "Apr 12 14:03:53 HTC-Server2 snmpd[1573]: Connection from UDP: [60.195.252.107]:31001->[192.168.0.116]" >>  /var/log/messages
[root@node6 yum.repos.d]# echo "Apr 12 14:03:53 HTC-Server2 snmpd[1573]: Connection from UDP: [60.195.252.107]:31001->[192.168.0.116]" >>  /var/log/messages
[root@node6 yum.repos.d]# echo "Apr 12 14:03:53 HTC-Server2 snmpd[1573]: Connection from UDP: [60.195.252.107]:31001->[192.168.0.116]" >>  /var/log/messages
[root@node6 yum.repos.d]# echo "Apr 12 14:03:53 HTC-Server2 snmpd[1573]: Connection from UDP: [60.195.252.107]:31001->[192.168.0.116]" >>  /var/log/messages

8. Check in the head cluster-management UI:

9. Check the indices:
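The indices can also be listed from the command line instead of the head UI (assuming the Elasticsearch node at 192.168.10.206):

curl 'http://192.168.10.206:9200/_cat/indices/redis-message-*?v'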

Part 8: Types of logs to analyze:

1. System logs: everything under /var/log; look up what each file contains.
2. Specific access records analyzed through ELK.
3. Error logs, collected and fed back to the developers.
4. System runtime logs.
5. Other log types.

Part 9: Splitting logs into fields:

1. The grok filter: based on regular expressions; relatively complex, and CPU-intensive when data volumes are large.

2. JSON: simple and easy to use.

3. Configure nginx to log in JSON format:

Install nginx, either compiled from source or via yum (omitted here).

4. Logging section of the nginx configuration:

http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;

   # log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
   #                   '$status $body_bytes_sent "$http_referer" '
   #                   '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  /var/log/nginx/access.log  main;
    
    log_format logstash_json '{"@timestamp":"$time_iso8601",' # define a log format named logstash_json
        '"host":"$server_addr",'
        '"clientip":"$remote_addr",'
        '"size":$body_bytes_sent,'
        '"responsetime":$request_time,'
        '"upstreamtime":"$upstream_response_time",'
        '"upstreamhost":"$upstream_addr",'
        '"http_host":"$host",'
        '"url":"$uri",'
        '"domain":"$host",'
        '"xff":"$http_x_forwarded_for",'
        '"referer":"$http_referer",'
        '"agent":"$http_user_agent",'
        '"status":"$status"}';


    sendfile        on; 

server block configuration:
#
server {
    listen       9009;
    server_name  localhost;

    #charset koi8-r;
  
    access_log  /var/log/nginx/json.access.log  logstash_json;  # log file path, using the logstash_json format defined above
    # Load configuration files for the default server block.
    include /etc/nginx/default.d/*.conf;

    location / { 
        root   /usr/share/nginx/html;
        index  index.html index.htm;
    }   

    error_page  404              /404.html;
    location = /404.html {
        root   /usr/share/nginx/html;
    }   

    # redirect server error pages to the static page /50x.html
    #
    error_page   500 502 503 504  /50x.html;
    location = /50x.html {
        root   /usr/share/nginx/html;
    }  

5. Use ab to hit nginx and generate test log entries:

[root@node5 nginx]# ab -n1000 -c10 http://192.168.10.205:9009/  # 1000 requests in total with a concurrency of 10, i.e. 100 batches

6. Check that the log file has content:

[root@node5 nginx]# tail /var/log/nginx/json.access.log 
{"@timestamp":"2016-04-12T18:21:31+08:00","host":"192.168.10.205","clientip":"192.168.10.205","size":3698,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.10.205","url":"/index.html","domain":"192.168.10.205","xff":"-","referer":"-","agent":"ApacheBench/2.3","status":"200"}
{"@timestamp":"2016-04-12T18:21:31+08:00","host":"192.168.10.205","clientip":"192.168.10.205","size":3698,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.10.205","url":"/index.html","domain":"192.168.10.205","xff":"-","referer":"-","agent":"ApacheBench/2.3","status":"200"}
{"@timestamp":"2016-04-12T18:21:31+08:00","host":"192.168.10.205","clientip":"192.168.10.205","size":3698,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.10.205","url":"/index.html","domain":"192.168.10.205","xff":"-","referer":"-","agent":"ApacheBench/2.3","status":"200"}
{"@timestamp":"2016-04-12T18:21:31+08:00","host":"192.168.10.205","clientip":"192.168.10.205","size":3698,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.10.205","url":"/index.html","domain":"192.168.10.205","xff":"-","referer":"-","agent":"ApacheBench/2.3","status":"200"}
{"@timestamp":"2016-04-12T18:21:31+08:00","host":"192.168.10.205","clientip":"192.168.10.205","size":3698,"responsetime":0.001,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.10.205","url":"/index.html","domain":"192.168.10.205","xff":"-","referer":"-","agent":"ApacheBench/2.3","status":"200"}
{"@timestamp":"2016-04-12T18:21:31+08:00","host":"192.168.10.205","clientip":"192.168.10.205","size":3698,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.10.205","url":"/index.html","domain":"192.168.10.205","xff":"-","referer":"-","agent":"ApacheBench/2.3","status":"200"}
{"@timestamp":"2016-04-12T18:21:31+08:00","host":"192.168.10.205","clientip":"192.168.10.205","size":3698,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.10.205","url":"/index.html","domain":"192.168.10.205","xff":"-","referer":"-","agent":"ApacheBench/2.3","status":"200"}
{"@timestamp":"2016-04-12T18:21:31+08:00","host":"192.168.10.205","clientip":"192.168.10.205","size":3698,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.10.205","url":"/index.html","domain":"192.168.10.205","xff":"-","referer":"-","agent":"ApacheBench/2.3","status":"200"}
{"@timestamp":"2016-04-12T18:21:31+08:00","host":"192.168.10.205","clientip":"192.168.10.205","size":3698,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.10.205","url":"/index.html","domain":"192.168.10.205","xff":"-","referer":"-","agent":"ApacheBench/2.3","status":"200"}
{"@timestamp":"2016-04-12T18:21:31+08:00","host":"192.168.10.205","clientip":"192.168.10.205","size":3698,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.10.205","url":"/index.html","domain":"192.168.10.205","xff":"-","referer":"-","agent":"ApacheBench/2.3","status":"200"}

7. Configure logstash to collect nginx's JSON log and write it to redis:

[root@node5 logstash]# cat /etc/logstash.conf 
input {
#    redis {
#        data_type => "list"
#        key => "system-message-jack"
#        host => "192.168.10.205"
#        port => "6379"
#        db => "0"
#    }

    file {
        path => "/var/log/nginx/json.access.log" #讀取指定的json格式的日志
        codec => "json" #指定json格式
    }

}


output {
#    elasticsearch {
#        host => ["192.168.10.206"]
#        protocol => "http"
#        index => "redis-message-%{+YYYY.MM.dd}"
#    }
    redis {
        data_type => "list"
        key => "nginx-json-log" #nginx的json格式日志的key名稱
        host => "192.168.10.205" #寫入到redis服務器
        port => "6379"
        db => "1" #使用redis的數據庫1
    }
}

8. Restart the logstash service and hit the web address with ab again to generate new log entries in redis:

[root@node5 nginx]# ab -n1000 -c10 http://192.168.10.205:9009/

9. Check in redis whether the key now holds the log entries:

[root@node5 nginx]# redis-cli 
127.0.0.1:6379> SELECT 1
OK
127.0.0.1:6379[1]> KEYS *
1) "nginx-json-log"  #已有日志
127.0.0.1:6379[1]> LLEN nginx-json-log  #日志長度
(integer) 1000

10. Configure logstash to read the redis log and output it to elasticsearch:

[root@node5 nginx]# grep "#" -v  /etc/logstash.conf 
input {
    redis {
        data_type => "list"
        key => "nginx-json-log"
        host => "192.168.10.205"
        port => "6379"
        db => "1"
    }
}

output {
    elasticsearch {
        host => ["192.168.10.206"]
        protocol => "http"
        index => "nginx-json-log-%{+YYYY.MM.dd}"
    }
}

11. Restart logstash, run ab against the web server again, and then check in the elasticsearch head UI whether the nginx-json-log index defined above appears:
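The same check can be done from the command line; a sketch assuming the Elasticsearch address used throughout:

# confirm the daily nginx-json-log index exists and look at one sample document
curl 'http://192.168.10.206:9200/_cat/indices/nginx-json-log-*?v'
curl 'http://192.168.10.206:9200/nginx-json-log-*/_search?size=1&pretty'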

 

 

