一、 環境准備:
3台Linux服務器,系統為CentOS 7.5
角色划分:
3台機器全部安裝jdk1.8,全部安裝elasticsearch (后續都簡稱為es集群)
主節點上需要安裝kibana與logstash
ELK版本信息為7.1.1,你可以從官網下,也可以直接從下面地址下載:
https://artifacts.elastic.co/downloads/logstash/logstash-7.1.1.tar.gz https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.1.1-linux-x86_64.tar.gz https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.1.1-linux-x86_64.tar.gz https://artifacts.elastic.co/downloads/kibana/kibana-7.1.1-linux-x86_64.tar.gz
關閉防火牆(如果因為其他原因不能關閉防火牆,也請不要禁止80端口):systemctl stop firewalld.service
禁止防火牆自動啟動:systemctl disable firewalld.service
關閉selinux : setenforce 0
禁止selinux啟動:vim /etc/selinux/config
SELINUX=disabled
打開文件vim /etc/security/limits.conf,添加下面四行內容:
* soft nofile 65536 * hard nofile 131072 * soft nproc 2048 * hard nproc 4096
打開文件vim /etc/sysctl.conf,添加下面一行內容:
vm.max_map_count=655360
加載sysctl配置,執行命令:sysctl -p
各節點配置hosts解析,內容如下:
10.0.0.11 node01 10.0.0.12 node02 10.0.0.13 node03
二、 安裝java環境
安裝包版本:jdk-8u25-linux-x64.tar.gz
tar xf jdk-8u25-linux-x64.tar.gz -C /opt/ cd /opt/ ln -s jdk1.8.0_211 jdk
添加環境變量
sed -i.ori '$a export JAVA_HOME=/opt/jdk\nexport PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH\nexport CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$JAVA_HOME/lib/tools.jar' /etc/profile
加載環境變量
source /etc/profile
查看版本
java -version
三、 安裝Elasticsearch(簡稱ES)集群
tar xf elasticsearch-7.1.1-linux-x86_64.tar.gz -C /usr/local/src/ ln -s /usr/local/src/elasticsearch-7.1.1 /usr/local/elasticsearch
ElasticSerach要求以非root身份啟動
在每個節點創建用戶及用戶組
groupadd elasticsearch useradd elasticsearch -g elasticsearch
在每個節點上創建數據data和logs目錄:
mkdir -p /data/elasticsearch/{data,logs} chown -R elasticsearch. /data/elasticsearch/ chown -R elasticsearch. /usr/local/elasticsearch/
修改elasticsearch.yml配置文件
vim /usr/local/elasticsearch/config/elasticsearch.yml
path.data: /data/elasticsearch/data #數據 path.logs: /data/elasticsearch/logs #日志 cluster.name: elasticsearch # 集群中的名稱 cluster.initial_master_nodes: ["node01","node02","node03"] #主節點 node.name: node01 # 該節點名稱,與前面配置hosts保持一致 node.master: true # 意思是該節點是否可選舉為主節點 node.data: true # 表示這不是數據節點 network.host: 0.0.0.0 # 監聽全部ip,在實際環境中應為一個安全的ip http.port: 9200 # es服務的端口號 http.cors.enabled: true http.cors.allow-origin: "*" discovery.zen.ping.unicast.hosts: ["node01", "node02", "node03"] # 配置自動發現 discovery.zen.minimum_master_nodes: 2 #防止集群“腦裂”,需要配置集群最少主節點數目,通常為 (主節點數目/2) + 1
elasticsearch是非常消耗cpu,在實際生產中,需要將初始申請的JVM內存調高,默認是1G
vim /usr/local/elasticsearch/config/jvm.options
#修改這兩行 -Xms8g #設置最小堆的值為8g -Xmx8g #設置組大堆的值為8g
在配置好相應的節點后,首先啟動主節點,然后在啟動相應節點
su - elasticsearch cd /usr/local/elasticsearch/bin/ ./elasticsearch -d
稍等會兒,通過查看段口,看到9200與9300端口號,表示服務已經啟動。
ss -lntup
查看集群的健康信息:
curl 'localhost:9200/_cluster/health?pretty'
查看集群的詳細信息:
curl 'localhost:9200/_cluster/state?pretty'
查詢索引列表:
curl -XGET http://localhost:9200/_cat/indices?v
創建索引:
curl -XPUT http://localhost:9200/customer?pretty
查詢索引:
curl -XGET http://localhost:9200/customer/external/1?pretty
刪除索引:
curl -XDELETE http://localhost:9200/customer?pretty
刪除指定索引:
curl -XDELETE node01:9200/system-log-2019.06
刪除多個索引:
curl -XDELETE node01:9200/system-log-2019.0606,system-log-2019.0607
刪除所有索引:
curl -XDELETE node01:9200/_all
在刪除數據時,通常不建議使用通配符,誤刪后果會很嚴重,所有的index都可能被刪除,為了安全起見需要禁止通配符,可以在elasticsearch.yml配置文件中設置禁用_all和*通配符
action.destructive_requires_name: true
安裝集群head插件,可以參照ELK7.1.1之插件安裝,方便查看與管理集群。
四、安裝logstash
在主節點上進行部署
tar xf logstash-7.1.1.tar.gz -C /usr/local/src/ ln -s /usr/local/src/logstash-7.1.1 /usr/local/logstash mkdir -p /data/logstash/{logs,data}
修改logstash配置
vim /usr/local/logstash/config/logstash.yml
#pipeline 線程數(CPU內核數或幾倍cpu內核數)
pipeline.workers: 15
# 實際output 時的線程數
pipeline.output.workers: 15
#每次發送的事件數
pipeline.batch.size: 5000
#批處理的最大等待值
pipeline.batch.delay: 10
http.host: "node01"
path.data: /data/logstash/data
path.logs: /data/logstash/logs
xpack.monitoring.enabled: true #kibana監控插件中啟動監控logstash
xpack.monitoring.elasticsearch.hosts: ["node01:9200","node02:9200","node03:9200"]
添加logstash收集日志配置,詳細配置可以參照logstash配置文件詳解
vim /usr/local/logstash/logstash.conf
input { beats { port => "5044" } } filter { grok { match => ["message", "\[?%{TIMESTAMP_ISO8601:timestamp}\]? \[?%{LOGLEVEL:level}\]? \[?%{JAVACLASS:class}\]? - %{JAVALOGMESSAGE:logmessage}"] remove_field => ["input"] remove_field => ["message"] } } #多輸出分片,數據量大,不能只弄一個分片(默認情況下一個分片索引速率在4000/s) output{ if "sunline_error" in [tags] { elasticsearch { hosts => ["node01:9200","node02:9200","node03:9200"] index => "sunline_error-%{+YYYY.MM.dd}" } } if "sunline_info" in [tags] { elasticsearch { hosts => ["node01:9200","node02:9200","node03:9200"] index => "sunline_info-%{+YYYY.MM.dd}" } } }
grok大約有120種模式,你可以在這找到https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
啟動服務
cd /usr/local/logstash nohup bin/logstash -f logstash.conf -r &
命令說明:
-f 指定配置文件
-r 重載
--path.settings 用於指定logstash的配置文件所在的目錄
-t 指定檢測完之后就退出
logstash 高級用法pipelines
詳細見:https://www.cnblogs.com/wzxmt/p/13083281.html
五、安裝kibana
tar xf kibana-7.1.1-linux-x86_64.tar.gz -C /usr/local/src/ ln -s /usr/local/src/kibana-7.1.1-linux-x86_64 /usr/local/kibana mkdir -p /data/kibana/logs/
修改kibana配置
vim /usr/local/kibana/config/kibana.yml
server.port: 5601 # 配置kibana的端口 server.host: 127.0.0.1 # 配置監聽ip(設置本地ip使用nginx認證登錄) elasticsearch.hosts: ["http://node01:9200","http://node02:9200","http://node03:9200"] # 配置es服務器的ip logging.dest: /data/kibana/logs/kibana.log # 配置kibana的日志文件路徑,默認messages
i18n.locale: "zh-CN" #配置中文語言
啟動服務
cd /usr/local/kibana nohup bin/kibana &
六、安裝filebeat
tar xf filebeat-7.1.1-linux-x86_64.tar.gz -C /usr/local/src/ ln -s /usr/local/src/filebeat-7.1.1-linux-x86_64 /usr/local/filebeat
修改配置,詳細配置可參照filebeat配置詳解
vim /usr/local/filebeat/filebeat.yml
output.logstash: hosts: ["10.0.0.11:5044"] filebeat.config.modules: path: ${path.config}/modules.d/*.yml reload.enabled: false setup.template.settings: index.number_of_shards: 3 filebeat.inputs: - type: log tags: ["sunline_error"] enabled: true paths: - /home/sunline/adviser/logs/*.log* encoding: gbk exclude_lines: ['^INFO']
exclude_files: ['\.gz$'] include_lines: ['ERR','WARN','ERROR','Error'] multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}' multiline.negate: true multiline.match: after
close_inactive: 1h - type: log tags: ["sunline_info"] enabled: true paths: - /home/sunline/adviser/logs/** #表示logs目錄下的所有文件(最高八級子目錄) encoding: gbk #字符集 exclude_lines: ['ERR','WARN','ERROR','Error'] include_lines: ['^INFO'] #排除某行 exclude_files: ['\.gz$'] #排除某個文件 multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}' #將以*開頭為起始位置 multiline.negate: true multiline.match: after
close_inactive: 1h #表示1小時未導入,超時退出
tail_files: true #只會導入寫入的數據(第一次導入,最好用這個參數,防止導入不必要的數據)
#要添加多個路徑,只需要填加:
- type: log enabled: true paths: - /var/log/nginx/*.log
啟動filebeat
cd /usr/local/filebeat/ nohup ./filebeat -e -c filebeat.yml &
通過tail -f nohup.out 可以看到我們的服務已經啟動!
七、 kibana平台增加安全認證
kibana是nodejs開發的,本身並沒有任何安全限制,直接瀏覽url就能訪問,如果公網環境非常不安全,可以通過nginx請求轉發增加認證,方法如下:
yum install -y nginx httpd-tools
在kibana所在的服務器上安裝nginx服務,利用nginx的轉發指令實現。
安裝好nginx后,進入nginx配置頁面,修改如下:
vim /etc/nginx/conf.d/kiban_auth.conf
server { listen 80; listen 443 ssl; server_name www.wzxmt.com wzxmt.com; ssl_certificate /etc/nginx/ssl/wzxmt.crt; ssl_certificate_key /etc/nginx/ssl/wzxmt.key; ssl_session_timeout 5m; return 301 https://www.wzxmt.com$request_uri; access_log /var/log/nginx/kiban_auth.log; error_log /var/log/nginx/kiban_auth_error.log; location / { root /usr/share/nginx/html; index index.html index.htm; proxy_set_header Host $host; proxy_pass http://127.0.0.1:5601; auth_basic "kibana login auth"; auth_basic_user_file /etc/nginx/Kbn_htpasswd; } }
生成密碼文件:
htpasswd -bc /etc/nginx/Kbn_htpasswd wzxmt wzxmt chmod 400 /etc/nginx/Kbn_htpasswd chown -R nginx. /etc/nginx/Kbn_htpasswd
啟動nginx
nginx
訪問我們的web網站www.wzxmt.com,這時候訪問是需要驗證登錄,這樣大大加強了,kibana的安全性。
這樣,我們的elk集群已經搭建成功!接下來需要在客戶端搭建filbeat進行收集日志。
八、使用redis來緩解耦合
Redis 作為一個緩存,能夠幫助我們在主節點上屏蔽掉多個從節點之間不同日志文件的差異,負責管理日志端(從節點)的人可以專注於向 Redis 里生產數據,而負責數據分析聚合端的人則可以專注於從 Redis 內消費數據。
下載Redis
https://redis.io 官網下載
wget http://download.redis.io/releases/redis-5.0.5.tar.gz
安裝redis
tar xf redis-5.0.5.tar.gz -C /usr/local/src/ cd /usr/local/src/redis-5.0.5 make mkdir /usr/local/redis/{data,logs} -p cd src && make install PREFIX=/usr/local/redis
修改環境變量
echo 'export PATH=/usr/local/redis/bin:$PATH' >>/etc/profile source /etc/profile
修改redis配置
vim /usr/local/redis/redis.conf
port 6379 daemonize yes bind node01 127.0.0.1 pidfile /usr/local/redis/redis.pid logfile /usr/local/redis/logs/redis.log dbfilename dump.rdb save 900 1 save 300 10 save 60 10000 dir /usr/local/redis/data
啟動redis
redis-server /usr/local/redis/redis.conf
將filebeat收集到的日志輸出到redis中,只需要配置filebeat.yml
vim /usr/local/filebeat/filebeat.yml
output.redis: hosts: ["10.0.0.11:6379"] key: "sunline_log" data_type: "list" db: 1 timeout: 5 filebeat.config.modules: path: ${path.config}/modules.d/*.yml reload.enabled: false setup.template.settings: index.number_of_shards: 3 filebeat.inputs: - type: log tags: ["sunline_error"] enabled: true paths: - /home/sunline/adviser/logs/*.log* encoding: gbk exclude_lines: ['^INFO'] include_lines: ['ERR','WARN','ERROR','Error'] multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}' multiline.negate: true multiline.match: after - type: log tags: ["sunline_info"] enabled: true paths: - /home/sunline/adviser/logs/*.log* encoding: gbk exclude_lines: ['ERR','WARN','ERROR','Error' include_lines: ['^INFO'] exclude_file: ['err.log'] multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}' multiline.negate: true multiline.match: after
啟動filebeat
cd /usr/local/filebeat/ nohup ./filebeat -e -c filebeat.yml &
將redis輸入到logstash
vim /usr/local/logstash/logstash.conf
input { redis { host => "node01" port => "6379" db => "1" data_type => "list" key => "sunline_log" } } filter { grok { match => ["message", "%{TIMESTAMP_ISO8601:timestamp}"] #2019-06-10 16:00:00,063 remove_field => ["input"] } } output{ if "sunline_error" in [tags] { elasticsearch { hosts => ["node01:9200","node02:9200","node03:9200"] index => "sunline_error-%{+YYYY.MM.dd}" } } if "sunline_info" in [tags] { elasticsearch { hosts => ["node01:9200","node02:9200","node03:9200"] index => "sunline_info-%{+YYYY.MM.dd}" } } }
啟動logstash
cd /usr/local/logstash nohup bin/logstash -f logstash.conf -r &
通過kibana平台,我們可以看到收集的數據。
九、使用kafka來緩解耦合
當我們的數據量很大時,redis與mq都撐不住時,這時候可以考慮kafka來解決!Kafka是由Apache軟件基金會開發的一個開源流處理平台,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。 這些數據通常是由於吞吐量的要求而通過處理日志和日志聚合來解決。 對於像Hadoop一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。Kafka是一種高吞吐量的分布式發布訂閱消息系統:
(1)通過O的磁盤數據結構提供消息的持久化,夠保持長時間的穩定性能。
(2)高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒數百萬的消息。
(3)支持通過Kafka服務器和消費機集群來分區消息。
(4)支持Hadoop並行數據加載。
Kafka集群持久地保留所有已發布的記錄 - 無論它們是否已被消耗 - 使用可配置的保留期。而redis只要被消耗了,數據也就消失了,這一點上,要比redis要安全很多!我們都知道Redis是以key的hash方式來分散對列存儲數據的,且Redis作為集群使用時,對應的應用對應一個Redis,在某種程度上會造成數據的傾斜性,從而導致數據的丟失。而從之前我們部署Kafka集群來看,kafka的一個topic(主題),可以有多個partition(副本),而且是均勻的分布在Kafka集群上,這就不會出現redis那樣的數據傾斜性。Kafka同時也具備Redis的冗余機制,像Redis集群如果有一台機器宕掉是很有可能造成數據丟失,而Kafka因為是均勻的分布在集群主機上,即使宕掉一台機器,是不會影響使用。同時Kafka作為一個訂閱消息系統,還具備每秒百萬級別的高吞吐量,持久性的、分布式的特點等。
我們可以使用kafka單點,也可以使用集群,kafka依賴zookeeper集群,這里按照我前面寫的博客部署zookeeper3.5.5集群部署即可!然后部署kafka集群,按照我的上篇博客Kafka 集群部署即可;
在搭建好kafka集群后,需要手動創建創建topic,因為默認導入的創建topic只有一個副本,一個分區,我們使用集群,需要設置多個副本與分片
kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic test_info
接下來是要將我們的日志收集到kafka中,客戶端我們選擇filebeat,具體安裝我們前面已經講過,直接進行配置:
vim /usr/local/filebeat/filebeat-kafka.yml
output.kafka: enabled: true hosts: ["node01:9092", "node02:9092", "node03:9092"]
group_id: "logstash-group" topic: 'test_info'
#默認為100000,消息長度過長,會忽略消息
max_message_bytes: 10000000 filebeat.config.modules: path: ${path.config}/modules.d/*.yml reload.enabled: false setup.template.settings: index.number_of_shards: 3 filebeat.inputs: - type: log tags: ["ela_error"] enabled: true paths: - /data/elasticsearch/logs/elasticsearch.log encoding: gbk exclude_lines: ['^INFO'] include_lines: ['ERR','WARN'] multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}' multiline.negate: true multiline.match: after - type: log tags: ["ela_info"] enabled: true paths: - /data/elasticsearch/logs/elasticsearch.log exclude_lines: ['ERR','WARN'] include_lines: ['INFO'] multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}' multiline.negate: true multiline.match: after
可以看到我們的數據導入成功
[root@node01 _site]# kafka-topics.sh --list --zookeeper node01:2181 test_info
接下來是把kafka數據導入到es集群,通過logstash過濾將數據導入es集群
vim /usr/local/logstash/logstash_kafak.conf
input { kafka { bootstrap_servers=> "node01:9092,node02:9092,node03:9092" topics =>"test_info" #配置的消費者的group名稱,因為同一個組內的消費消息不會重復 group_id => "logstash-group" codec => "json" } } filter { grok { #解析message的timestamp字段 match => ["message", "%{TIMESTAMP_ISO8601:timestamp}"] #2019-06-10 16:00:00,063 remove_field => ["input"] } } output{ if "ela_error" in [tags] { elasticsearch { hosts => ["node01:9200","node02:9200","node03:9200"] index => "ela_error-%{+YYYY.MM.dd}" } } if "ela_info" in [tags] { elasticsearch { hosts => ["node01:9200","node02:9200","node03:9200"] index => "ela_info-%{+YYYY.MM.dd}" } } }
當我們從kafka讀取消息的時候,消息體是通過message字段來進行傳遞的,所以message是一個字符串,但是我們的es索引模板可能會非常復雜,所以我們需要對其進行json解析后,再交給es。否則es收到的之后一個message字段。
可以看到已經采集到數據。
隨着分片的逐漸增多,數據增大,查詢效率就會降低,如果不刪除數據,將會導致ES存儲的數據越來越大,甚至無法寫入新的數據,這時需要我們清楚不必要的歷史數據。可以在head插件上刪除,只不過這種方式比較麻煩,這時候我們可以通過
curl -XDELETE http://localhost:9200/customer?pretty
這條命令來實現腳本化來定時刪除數據,腳本如下:
vim es-index-clear.sh
#全部刪除:
#!/bin/bash date_index='30 day ago' nodes=`ip a s eth0|awk -F "[ /]+" 'NR==3{print $3}'` del_index() { date1=$1 date2=`date -d "${date_index}" +"%Y.%m.%d"` t1=`echo ${date1}|sed 's/\.//g'` t2=`echo ${date2}|sed 's/\.//g'` if [ $t1 -le $t2 ]; then echo "indexs時間${t1}早於${t2},進行索引刪除!" curl -XDELETE http://${nodes}:9200/*$1 fi } curl -XGET http://${nodes}:9200/_cat/indices|sed -rn "s#(.*)([0-9]{4}(\.([0-9]){2}){2})(.*)#\2#gp"|uniq|sort -nr|while read LINE do del_index $LINE done
#排除某分片:
#!/bin/bash #適用於分片格式:test-2019.05.28 date_index='0 day ago' nodes=`ip a s eth0|awk -F "[ /]+" 'NR==3{print $3}'` exclude_index=test # a|b|c排除多個分片格式 del_index() { date1=$1 date2=`date -d "${date_index}" +"%Y.%m.%d"` t1=`echo ${date1}|sed 's/\.//g'` t2=`echo ${date2}|sed 's/\.//g'` if [ $t1 -le $t2 ]; then #獲取分片名稱 curl -XGET http://${nodes}:9200/_cat/indices|grep "\-2"|grep "$1"|grep -v "${exclude_index}"|awk '{print $3}'|sed -r "s/(.*)(-20.*)/\1/g"|while read line do echo "${line}時間${t1}早於${t2},進行索引刪除!" #刪除分片 curl -XDELETE http://${nodes}:9200/${line}-$1 &>/dev/null done fi } #獲取分片時間 curl -XGET http://${nodes}:9200/_cat/indices|sed -rn "s#(.*)([0-9]{4}(\.([0-9]){2}){2})(.*)#\2#gp"|uniq|sort -nr|while read LINE do del_index $LINE 2>/dev/null done
腳本添加執行權限
chmod +x /usr/local/elasticsearch/scripts/es-index-clear.sh
crontab -e添加定時任務:
0 1 1 * * /usr/local/elasticsearch/scripts/es-index-clear.sh
表示在每個月1號的凌晨一點進行清除索引
最后再附上es啟動腳本:
#!/bin/bash ################################################# # File Name: es_start.sh # Author: lxw # Mail: 1451343603@qq.com # Function: # Created Time: Thu Oct 24 10:12:24 CST 2019 ################################################# es_work=/usr/local/elasticsearch es_pid=`ps -aux|grep 'elasticsearch/lib'|grep -v 'grep'|awk '{print $2}'` es(){ RETVAL=$? if [ $RETVAL -eq 0 ];then echo "Elasticsearch $1 successed!" else echo "Elasticsearch $1 failsed!" exit fi } start_m(){ su - elasticsearch &>/dev/null<<-EOF cd $es_work bin/elasticsearch -d &>/dev/null EOF } es_start(){ if [ ! -z $es_pid ];then echo "Elasticsearch is running!" else start_m es start fi } es_stop(){ if [ ! -z $es_pid ];then kill $es_pid es stop else echo "Elasticsearch is stoped!" fi } es_restart(){ if [ ! -z $es_pid ];then kill $es_pid es stop sleep 2 start_m es start else echo "Elasticsearch is stoped!" sleep 2 start_m es start fi } case $1 in start) es_start ;; stop) es_stop ;; restart) es_restart ;; *) echo $"USAGE: $0 {start|stop|restart}" esac exit $?