How ELK works:
Nodes in the same cluster discover each other via multicast, and the responses from the nodes are aggregated to form a cluster. The master node reads the state of every node and performs data recovery when necessary; it monitors each node's state, decides where every shard is placed, and detects failed nodes through ping requests.
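A sketch of the related discovery settings in elasticsearch.yml (note: in Elasticsearch 2.x multicast discovery was moved out into a separate plugin, so a unicast host list is the common setup; the node addresses below are only illustrative):
discovery.zen.ping.unicast.hosts: ["10.26.44.41", "10.26.44.42", "10.26.44.43"]   # seed nodes used for discovery
discovery.zen.minimum_master_nodes: 2                                             # quorum of master-eligible nodes, to avoid split brain
discovery.zen.fd.ping_timeout: 30s                                                # fault-detection ping timeout for slow networks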
ELK architecture:
Elasticsearch: stores and indexes the logs.
Logstash: a tool that collects, processes, and forwards events or log data.
Kibana: a web UI for searching and visualizing the logs.
Advantages of ELK:
a. Flexible processing: Elasticsearch does real-time full-text indexing.
b. Simple configuration, easy to get started.
c. Efficient retrieval: although every query is computed in real time, the design is good enough that searches over a full day of data usually return within seconds.
d. Linear cluster scaling: both Elasticsearch and Logstash clusters can be scaled out linearly.
e. Polished front end: in Kibana, searches, aggregations, and attractive dashboards are only a few mouse clicks away.
0. Preparation before installation:
Elasticsearch and Logstash need a Java environment; install JDK 1.7 or later.
a. Download the JDK rpm package
b. Install it
c. java -version : check the installed JDK
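A minimal sketch of steps a-c, assuming the downloaded package is named jdk-8u144-linux-x64.rpm (an illustrative file name; use whatever JDK version you actually download):
rpm -ivh jdk-8u144-linux-x64.rpm   # step b: install the JDK from the rpm
java -version                      # step c: should print the installed JDK version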
Elasticsearch:
Concepts:
1. Index: data lives in one or more indices. An index can be thought of as a database, and the basic unit stored inside it is the document. Elasticsearch splits each index into shards for horizontal scaling, and each shard can have replicas; multiple shards make reads faster, and when a primary shard fails, a replica is automatically promoted to primary (giving both horizontal scaling and redundancy).
2. Document type: as in Redis, each key has a type.
3. Node: one Elasticsearch instance is one node.
4. Cluster: a group of nodes forms a cluster. As with ZooKeeper, a master node is elected, but clients do not need to care which node is the master; they can connect to any node, and data is synchronized automatically.
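A minimal sketch of these concepts via the REST API (the index name "myindex", type "mytype", and the document itself are made up for illustration):
curl -XPUT 'http://localhost:9200/myindex/mytype/1' -d '{"user":"tom","msg":"hello"}'   # creates index "myindex" on the fly and stores document 1 of type "mytype"
curl -XGET 'http://localhost:9200/myindex/mytype/1?pretty'                              # fetches the document back as JSON
curl -XGET 'http://localhost:9200/_cat/shards/myindex?v'                                # shows how the index is split into primary and replica shards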
Installing Elasticsearch
a.wget https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/rpm/elasticsearch/2.3.5/elasticsearch-2.3.5.rpm
b.rpm -ivh elasticsearch-2.3.5.rpm
c.mkdir /opt/develop/elasticsearch/data -p
mkdir /opt/develop/elasticsearch/log -p
d. # vi /usr/share/elasticsearch/config/elasticsearch.yml
cluster.name: my-application            # cluster name; nodes with the same cluster name join the same cluster
node.name: node-1                       # name of this node; in a cluster every node should have a different name
path.data: /opt/develop/elasticsearch/data
path.logs: /opt/develop/elasticsearch/log
network.host: xxx.xxx.xx.xx
http.port: 9200                         # port clients connect to
node.max_local_storage_nodes: 1
e. Elasticsearch must be started as a non-root user
groupadd ela
useradd ela -g ela -p xxx
su - ela
Run bin/elasticsearch under the installation directory to start the service (make sure the ela user can write to the data and log directories).
f. curl -XGET http://localhost:9200/   # view the Elasticsearch installation info; a response means the startup succeeded
g. chkconfig --add elasticsearch
Elasticsearch cluster:
1. HTTP-based RESTful API, returning query results as JSON:
$curl -XGET http://10.26.44.42:9200/_count?pretty -d '
{
"query":{
"match_all":{}
}
}
'
{
"count" : 308590265,
"_shards" : {
"total" : 4180,
"successful" : 4180,
"failed" : 0
}
}
Installing Logstash
a.wget https://download.elastic.co/logstash/logstash/packages/centos/logstash-2.3.4-1.noarch.rpm
b. Install it: rpm -ivh logstash-2.3.4-1.noarch.rpm
c. Start the service
d. Test: cd /opt/logstash/bin
./logstash -e 'input { stdin {} } output { stdout {} }'
e. Use the rubydebug codec for more detailed output:
./logstash -e 'input { stdin {}} output { stdout{codec => rubydebug}}'
Settings: Default pipeline workers: 8
Pipeline main started
asd
{
"message" => "asd",
"@version" => "1",
"@timestamp" => "2017-02-13T08:39:56.079Z",
"host" => "ali-hk-ops-elk1"
}
f. Send the Logstash output to Elasticsearch:
./logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["ali-hk-ops-elk1:9200"] } }'
g. Configuration file format:
input {
  file {
    path => "/var/log/messages"
    type => "syslog"
  }
  file {
    path => "/var/log/apache/access.log"
    type => "apache"
  }
}
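To turn this into a complete pipeline, an output section is also needed; a minimal sketch (assuming Elasticsearch runs locally on port 9200) routes each file by its type field:
output {
  if [type] == "syslog" {
    elasticsearch { hosts => ["localhost:9200"] index => "syslog-%{+YYYY.MM.dd}" }
  }
  if [type] == "apache" {
    elasticsearch { hosts => ["localhost:9200"] index => "apache-%{+YYYY.MM.dd}" }
  }
}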
Logstash input usage (file plugin):
1. path: by default the file input does not recurse into directories (files inside subdirectories are not read directly), but glob patterns such as /var/log/**/*.log can be used to match them
2. exclude ----> excludes files, e.g.
exclude => "*.gz"
3. sincedb_path: records the current read position; by default it is a hidden file
4. sincedb_write_interval: how often the sincedb file is written, 15 seconds by default
5. start_position: where to start reading the file, end by default; it can be changed to beginning
6. stat_interval: how often to check the file for updates
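A minimal file input sketch combining the options above (paths and interval values are illustrative):
input {
  file {
    path => ["/var/log/*.log", "/var/log/nginx/**/*.log"]   # glob patterns, including recursive **
    exclude => "*.gz"                                       # skip rotated, compressed files
    start_position => "beginning"                           # read existing content on the first run
    sincedb_path => "/var/lib/logstash/sincedb-demo"        # where the read position is remembered
    sincedb_write_interval => 15
    stat_interval => 1
    type => "demo"
  }
}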
Logstash output usage and plugins:
1. Output can go to files, Redis, and so on
2. gzip: whether to compress, false by default; compression is applied incrementally as data streams in
3. message_format: the format of the message
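A minimal sketch of a file output with gzip enabled (the path is illustrative; message_format is left out because it has been deprecated in later Logstash releases):
output {
  file {
    path => "/var/log/logstash/output-%{+YYYY.MM.dd}.log.gz"   # one file per day
    gzip => true                                               # compress the stream incrementally
  }
}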
Logstash --> file --> elasticsearch:
Write the output to a file with Logstash first, then ship that file into Elasticsearch; a sketch of such a two-stage pipeline is shown below, followed by an init script for running Logstash as a service.
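A minimal two-stage sketch (file paths and the Elasticsearch address are illustrative):
# stage 1: the shipper instance writes events to a local buffer file
output {
  file { path => "/data/logstash/buffer-%{+YYYY.MM.dd}.log" }
}
# stage 2: the indexer instance tails that file and forwards it to Elasticsearch
input {
  file { path => "/data/logstash/buffer-*.log" start_position => "beginning" type => "buffered" }
}
output {
  elasticsearch { hosts => ["localhost:9200"] index => "logstash-%{+YYYY.MM.dd}" }
}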
1. Startup script:
vim /etc/init.d/logstash
#!/bin/sh
# Init script for logstash
# Maintained by Elasticsearch
# Generated by pleaserun.
# Implemented based on LSB Core 3.1:
#   * Sections: 20.2, 20.3
#
### BEGIN INIT INFO
# Provides:          logstash
# Required-Start:    $remote_fs $syslog
# Required-Stop:     $remote_fs $syslog
# Default-Start:     2 3 4 5
# Default-Stop:      0 1 6
# Short-Description:
# Description:       Starts Logstash as a daemon.
### END INIT INFO
PATH=/sbin:/usr/sbin:/bin:/usr/bin
export PATH
if [ "$(id -u)" -ne 0 ]; then
echo "You need root privileges to run this script"
exit 1
fi
name=logstash
pidfile="/var/run/$name.pid"
LS_USER=logstash
LS_GROUP=logstash
LS_HOME=/var/lib/logstash
LS_HEAP_SIZE="4g"
LS_LOG_DIR=/var/log/logstash
LS_LOG_FILE="${LS_LOG_DIR}/$name.log"
LS_CONF_DIR=/etc/logstash/conf.d
LS_OPEN_FILES=16384
LS_NICE=19
KILL_ON_STOP_TIMEOUT=${KILL_ON_STOP_TIMEOUT-0} #default value is zero to this variable but could be updated by user request
LS_OPTS=""
[ -r /etc/default/$name ] && . /etc/default/$name
[ -r /etc/sysconfig/$name ] && . /etc/sysconfig/$name
program=/opt/logstash/bin/logstash
args="agent -f ${LS_CONF_DIR} -l ${LS_LOG_FILE} ${LS_OPTS}"
quiet() {
"$@" > /dev/null 2>&1
return $?
}
start() {
LS_JAVA_OPTS="${LS_JAVA_OPTS} -Djava.io.tmpdir=${LS_HOME}"
HOME=${LS_HOME}
export PATH HOME LS_HEAP_SIZE LS_JAVA_OPTS LS_USE_GC_LOGGING LS_GC_LOG_FILE
# chown doesn't grab the supplemental groups when setting the user:group - so we have to do it for it.
# Boy, I hope we're root here.
SGROUPS=$(id -Gn "$LS_USER" | tr " " "," | sed 's/,$//'; echo '')
if [ ! -z $SGROUPS ]
then
EXTRA_GROUPS="--groups $SGROUPS"
fi
# set ulimit as (root, presumably) first, before we drop privileges
ulimit -n ${LS_OPEN_FILES}
# Run the program!
nice -n ${LS_NICE} chroot --userspec $LS_USER:$LS_GROUP $EXTRA_GROUPS / sh -c "
cd $LS_HOME
ulimit -n ${LS_OPEN_FILES}
exec \"$program\" $args
" > "${LS_LOG_DIR}/$name.stdout" 2> "${LS_LOG_DIR}/$name.err" &
# Generate the pidfile from here. If we instead made the forked process
# generate it there will be a race condition between the pidfile writing
# and a process possibly asking for status.
echo $! > $pidfile
echo "$name started."
return 0
}
stop() {
# Try a few times to kill TERM the program
if status ; then
pid=$(cat "$pidfile")
echo "Killing $name (pid $pid) with SIGTERM"
kill -TERM $pid
# Wait for it to exit.
for i in 1 2 3 4 5 6 7 8 9 ; do
echo "Waiting $name (pid $pid) to die..."
status || break
sleep 1
done
if status ; then
if [ $KILL_ON_STOP_TIMEOUT -eq 1 ] ; then
echo "Timeout reached. Killing $name (pid $pid) with SIGKILL. This may result in data loss."
kill -KILL $pid
echo "$name killed with SIGKILL."
else
echo "$name stop failed; still running."
return 1 # stop timed out and not forced
fi
else
echo "$name stopped."
fi
fi
}
status() {
if [ -f "$pidfile" ] ; then
pid=$(cat "$pidfile")
if kill -0 $pid > /dev/null 2> /dev/null ; then
# process by this pid is running.
# It may not be our pid, but that's what you get with just pidfiles.
# TODO(sissel): Check if this process seems to be the same as the one we
# expect. It'd be nice to use flock here, but flock uses fork, not exec,
# so it makes it quite awkward to use in this case.
return 0
else
return 2 # program is dead but pid file exists
fi
else
return 3 # program is not running
fi
}
reload() {
if status ; then
kill -HUP $(cat "$pidfile")
fi
}
force_stop() {
if status ; then
stop
status && kill -KILL $(cat "$pidfile")
fi
}
configtest() {
# Check if a config file exists
if [ ! "$(ls -A ${LS_CONF_DIR}/* 2> /dev/null)" ]; then
echo "There aren't any configuration files in ${LS_CONF_DIR}"
return 1
fi
HOME=${LS_HOME}
export PATH HOME
test_args="--configtest -f ${LS_CONF_DIR} ${LS_OPTS}"
$program ${test_args}
[ $? -eq 0 ] && return 0
# Program not configured
return 6
}
case "$1" in
start)
status
code=$?
if [ $code -eq 0 ]; then
echo "$name is already running"
else
start
code=$?
fi
exit $code
;;
stop) stop ;;
force-stop) force_stop ;;
status)
status
code=$?
if [ $code -eq 0 ] ; then
echo "$name is running"
else
echo "$name is not running"
fi
exit $code
;;
reload) reload ;;
restart)
quiet configtest
RET=$?
if [ ${RET} -ne 0 ]; then
echo "Configuration error. Not restarting. Re-run with configtest parameter for details"
exit ${RET}
fi
stop && start
;;
configtest)
configtest
exit $?
;;
*)
echo "Usage: $SCRIPTNAME {start|stop|force-stop|status|reload|restart|configtest}" >&2
exit 3
;;
esac
exit $?
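With the script saved, a minimal sketch of registering and using it as a service (assuming the paths used above):
chmod +x /etc/init.d/logstash
chkconfig --add logstash     # register with the init system
service logstash configtest  # verify the files under /etc/logstash/conf.d
service logstash start
service logstash status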
Types of logs to analyze:
1. System logs: everything under /var/log; look up what each file contains
2. Use Elasticsearch to analyze a specific access record
3. Error logs: collect them and feed them back to the developers
4. System runtime logs
5. Other types of logs
Splitting logs into fields:
1. grok filter: based on regular expressions; fairly complex, and CPU-hungry when the data volume is large
2. JSON: simple and easy to use
3. Configure nginx to write its access log in JSON format (see the sketch below)
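A minimal sketch of an nginx log_format that emits JSON, placed in the http block of nginx.conf (field names are illustrative; add or remove variables as needed):
log_format access_json '{"@timestamp":"$time_iso8601",'
    '"host":"$server_addr",'
    '"clientip":"$remote_addr",'
    '"request":"$request",'
    '"status":"$status",'
    '"size":$body_bytes_sent,'
    '"responsetime":$request_time,'
    '"referer":"$http_referer",'
    '"agent":"$http_user_agent"}';
access_log /var/log/nginx/access.log access_json;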
Installing Kibana
a.wget https://download.elastic.co/kibana/kibana/kibana-4.5.4-1.x86_64.rpm
b. Install it: rpm -ivh kibana-4.5.4-1.x86_64.rpm
c.vi /opt/kibana/config/kibana.yml
server.port: 5601
server.host: '0.0.0.0'
elasticsearch.url: 'http://xxx.xxx.xx.xx:9200'
d. service kibana start
e. chkconfig --add kibana
f. Open the web UI: http://localhost:5601
Common modules:
1. System log collection ---> syslog: configure syslog output to be written into Elasticsearch; listen on port 514, and the host is the IP address of the server whose logs are being collected (see the syslog sketch after this list)
2. Access logs: convert the nginx log to JSON format
3. Error logs: use the multiline codec plugin:
input {
  stdin {
    codec => multiline {
      pattern => "^\s"
      negate => false
      what => "previous"
    }
  }
}
pattern: a regular expression that lines are matched against.
negate: defaults to false; when set to true, lines that do NOT match the pattern are the ones handled according to what.
what: previous or next; whether matched lines are merged into the previous line or the following line.
4. Runtime logs: codec => json; if the log is not JSON, use grok to parse it
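A minimal sketch of the syslog collection mentioned in item 1 (Logstash listens on port 514, which requires root privileges, and forwards to a local Elasticsearch; the address and index name are illustrative):
input {
  syslog {
    port => 514
    type => "syslog"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "syslog-%{+YYYY.MM.dd}"
  }
}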
Displaying per-IP visit counts on a map:
1. Download Filebeat into the user's home directory on the Elasticsearch server:
2. Load the index template:
$curl -XPUT 'http://10.26.44.42:9200/_template/filebeat?pretty' -d@/etc/filebeat/filebeat.template.json
$curl -XPUT 'http://10.26.44.42:9200/_template/filebeat?pretty' -d@/etc/filebeat/filebeat.template-es2x.json
$curl -XPUT 'http://10.26.44.42:9200/_template/filebeat?pretty' -d@/root/filebeat.template.json
3. Download the GeoIP database file:
$cd /opt/logstash
$curl -O "http://geolite.maxmind.com/download/geoip/database/GeoLiteCity.dat.gz"
$gunzip GeoLiteCity.dat.gz
4. Configure Logstash to use GeoIP:
input {
redis {
data_type => "list"
key => "mobile-tomcat-access-log"
host => "192.168.0.251"
port => "6379"
db => "0"
codec => "json"
}
}
# the input section reads, from Redis, the access logs already parsed and pushed by the client-side Logstash
filter {
if [type] == "mobile-tomcat" {
geoip {
source => "client"   # "client" is the field name under which the client-side Logstash stored the public IP; it must match exactly, since the IP address is looked up by this name
target => "geoip"
database => "/etc/logstash/GeoLiteCity.dat"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
mutate {
convert => [ "[geoip][coordinates]", "float"]
}
}
}
output {
if [type] == "mobile-tomcat" {
elasticsearch {
hosts => ["192.168.0.251"]
manage_template => true
index => "logstash-mobile-tomcat-access-log-%{+YYYY.MM.dd}"   # the index name must start with "logstash-", otherwise the tile map will report errors such as the geo_point type not being found
flush_size => 2000
idle_flush_time => 10
}
}
}
5. Add the new index in the Kibana UI:
Visualize ----> Tile map ----> From a new search ----> Select an index pattern ----> choose the index created above ----> Geo coordinates
References:
1.https://www.elastic.co/guide/index.html
2.http://www.ttlsa.com/elk/howto-install-elasticsearch-logstash-and-kibana-elk-stack/
3.https://www.elastic.co/guide/en/logstash/current/plugins-inputs-log4j.html
4.http://blog.chinaunix.net/xmlrpc.php?r=blog/article&uid=21142030&id=5671032
5.http://517sou.net/archives/centos下使用elk套件搭建日志分析和監控平台/
Problems encountered:
1. After restarting Elasticsearch, the error "Elasticsearch is still initializing the kibana index." is reported.
Fix: curl -XDELETE http://localhost:9200/.kibana
--- The method above loses all Kibana configuration: index patterns, visualizations, and dashboards. If you only want to deal with the index itself, use the following instead:
curl -s http://localhost:9200/.kibana/_recovery?pretty
curl -XPUT 'localhost:9200/.kibana/_settings' -d '
{
"index" : {
"number_of_replicas" : 0
}
}'
If the error persists after the change, restart Kibana.
Ha! I had forgotten to restart Elasticsearch, which caused the index to disappear from the page and no data to show.
Adding an index template:
$curl -XPUT 'http://10.26.44.42:9200/_template/filebeat?pretty' -d@/root/filebeat.template.json
Template file:
vim /root/filebeat.template.json
{
"mappings": {
"_default_": {
"_all": {
"enabled": true,
"norms": {
"enabled": false
}
},
"dynamic_templates": [
{
"template1": {
"mapping": {
"doc_values": true,
"ignore_above": 1024,
"index": "not_analyzed",
"type": "{dynamic_type}"
},
"match": "*"
}
}
],
"properties": {
"geoip": {
"properties" : {
"location": {
"type": "geo_point"
},
"ip": { "type": "ip" },
"coordinates": { "type": "geo_point" }
}},
"@timestamp": {
"type": "date"
},
"message": {
"type": "string",
"index": "analyzed"
},
"offset": {
"type": "long",
"doc_values": "true"
}
}
}
},
"settings": {
"index.refresh_interval": "5s"
},
"template": "filebeat-*"
}
Check the cluster status:
$ curl -XGET 'http://10.26.44.42:9200/_cluster/health?pretty=true'
{
"cluster_name" : "elks",
"status" : "red",
"timed_out" : false,
"number_of_nodes" : 3,
"number_of_data_nodes" : 3,
"active_primary_shards" : 5269,
"active_shards" : 6812,
"relocating_shards" : 0,
"initializing_shards" : 6,
"unassigned_shards" : 4151,
"delayed_unassigned_shards" : 0,
"number_of_pending_tasks" : 5136,
"number_of_in_flight_fetch" : 0,
"task_max_waiting_in_queue_millis" : 4711822,
"active_shards_percent_as_number" : 62.10228826693409
}
List the unassigned shards:
$curl -s 'http://10.26.44.42:9200/_cat/shards' | grep UNASSIGNED | awk '{print $1}' | sort | uniq
Known issue with this ELK cluster: indices had previously been deleted on a single node.
After deleting the indices with unassigned shards and restarting Elasticsearch, the service status returned to normal.
Clear the unassigned shards (remove the node-left reallocation delay):
curl -XPUT 'localhost:9200/_all/_settings?pretty' -H 'Content-Type: application/json' -d'
{
"settings": {
"index.unassigned.node_left.delayed_timeout": "0"
}
}
'
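A minimal sketch of removing the indices whose shards remain unassigned, reusing the _cat/shards command above (destructive: this assumes losing those indices is acceptable, as was the case here; the address is the same Elasticsearch node used above):
for idx in $(curl -s 'http://10.26.44.42:9200/_cat/shards' | grep UNASSIGNED | awk '{print $1}' | sort | uniq); do
  curl -XDELETE "http://10.26.44.42:9200/${idx}"   # delete each index that still has unassigned shards
done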