ELK 6.8 log management system deployment: elasticsearch + kibana + logstash + filebeat + kafka_2.12 + x-pack crack + SSL authentication between components
Table of contents:
1. Component overview  2. Architecture diagram  3. Software download links  4. Install elasticsearch  5. Install kibana  6. Enable security authentication  7. Install kafka  8. Install logstash  9. Install filebeat  10. Crack x-pack  11. Enable SSL authentication between components
1. Component overview
Filebeat is a log file shipper. After the client is installed on your server, Filebeat monitors log directories or specified log files and tails them continuously (it tracks file changes and keeps reading).
Kafka is a high-throughput distributed publish/subscribe messaging system that can handle all the action stream data of a consumer-scale website.
Logstash is a pipeline with real-time data transport capability, responsible for moving data from the pipeline's input to its output; along the way you can insert filters as needed, and Logstash provides many powerful filter plugins to cover a wide range of use cases.
Elasticsearch provides a distributed, multi-tenant full-text search engine with a RESTful web interface.
Kibana is the user interface for Elasticsearch.
In a real-world scenario, to support real-time search over big data, Filebeat monitors the log files and ships them to Kafka as its output. Kafka receives the data in real time and hands it on to Logstash. The data arriving at Logstash may not yet be in the format or business shape we want, so Logstash filter plugins can be used to transform it into the desired format before it is sent to Elasticsearch as the final output. Once the data is in Elasticsearch, it is available for rich distributed search.
2. Architecture diagram
3. Software download links: https://www.elastic.co/cn/products/ [It is recommended to install under /data/elk/, because a lot of data accumulates over time and a large disk is needed]
JDK installation: https://www.cnblogs.com/chenjw-note/p/10838160.html
Note: the comments after configuration items in all configuration files must be removed.
4. Install elasticsearch:
Unpack the archive
Edit the configuration file: vim /usr/local/elasticsearch/config/elasticsearch.yml
cluster.name: MyElk
node.name: node-1
path.data: /usr/local/elasticsearch/data
path.logs: /usr/local/elasticsearch/logs
network.host: local IP address
http.port: 9200
# CentOS 6 does not support SecComp, and ES 6 sets bootstrap.system_call_filter to true by default, so the check fails and elasticsearch refuses to start; disable it with the following two lines (not needed on CentOS 7)
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
#xpack.security.enabled: true               # enable security authentication; leave these two settings commented out for now, they are turned on later in section 6
#xpack.security.transport.ssl.enabled: true
Adjust system parameters:
Edit /etc/sysctl.conf with the following values and apply them with sysctl -p:
net.ipv4.ip_forward = 0
net.ipv4.conf.default.rp_filter = 1
net.ipv4.conf.default.accept_source_route = 0
kernel.sysrq = 0
kernel.core_uses_pid = 1
net.ipv4.tcp_syncookies = 1
kernel.msgmnb = 65536
kernel.msgmax = 65536
kernel.shmmax = 68719476736
kernel.shmall = 4294967296
kernel.core_pattern = /data/corefile/core.%p.%e
vm.max_map_count = 655360
vm.swappiness = 0
vim /etc/security/limits.conf
elastic soft memlock unlimited
elastic hard memlock unlimited
* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096
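These kernel and ulimit changes only take effect for new sessions; a quick sanity check against the values above could be:
sysctl vm.max_map_count      # should print vm.max_map_count = 655360
ulimit -n                    # run in a fresh login shell; should print 65536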
Create a non-root user: useradd elastic && passwd elastic
Grant ownership: chown -R elastic:elastic /usr/local/elasticsearch
Start elasticsearch in the background as the elastic user: su - elastic, then cd /usr/local/elasticsearch/bin && ./elasticsearch -d
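To confirm the node is up, a quick check against the address configured above (es_ip stands for the value of network.host):
curl http://es_ip:9200                             # returns a JSON banner with cluster name and version
curl http://es_ip:9200/_cluster/health?pretty      # status should be green or yellow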
5. Install kibana:
Unpack the archive
Edit the configuration file (effective settings shown via egrep -Ev '#|^$' kibana.yml):
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://es_ip:9200"]
Chinese localization of kibana: https://github.com/anbai-inc/Kibana_Hanization
Download the localization package: https://github.com/anbai-inc/Kibana_Hanization/archive/master.zip
Copy the translations folder from that project into the src/legacy/core_plugins/kibana/ directory under the kibana installation.
Edit the settings in kibana.yml (egrep -Ev '#|^$' kibana.yml):
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://es_ip:9200"]
i18n.locale: "zh-CN"
In 7.x the official release ships its own Chinese translation resources (under the node_modules/x-pack/plugins/translations/translations/ directory in the kibana installation); simply setting i18n.locale: "zh-CN" in kibana.yml is enough.
Start kibana: cd /usr/local/kibana/bin && ./kibana
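To confirm kibana is serving, its status API can be queried (available in 6.x):
curl http://localhost:5601/api/status        # returns JSON including the overall state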
6. Enable security authentication:
Open kibana in a browser at http://IP:5601, click [Management], then [License Management], and start the [30-day Platinum trial]. After it is enabled, refreshing the page will no longer let you into kibana normally.
Edit the elasticsearch configuration: vim /usr/local/elasticsearch/config/elasticsearch.yml and add:
xpack.security.enabled: true                 # enable security authentication
xpack.security.transport.ssl.enabled: true
Restart elasticsearch
Initialize passwords for the built-in elasticsearch users: cd /usr/local/elasticsearch/bin && ./elasticsearch-setup-passwords interactive, then enter your passwords when prompted.
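Once the passwords are set, anonymous requests are rejected; a quick authenticated check (curl prompts for the elastic password):
curl -u elastic 'http://es_ip:9200/_cluster/health?pretty'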
Note: for configuring an elasticsearch cluster with SSL authentication, see: https://www.cnblogs.com/chenjw-note/articles/10901632.html
Edit the kibana configuration (egrep -Ev '#|^$' /usr/local/kibana/config/kibana.yml):
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://es_ip:9200"]
elasticsearch.username: "elastic"
elasticsearch.password: "your password"
i18n.locale: "zh-CN"
Restart kibana: cd /usr/local/kibana/bin && ./kibana
Open the login page; the superuser account is elastic with the password you set.
Security authentication is now complete.
7. Install kafka:
Download kafka: http://kafka.apache.org/downloads
Unpack the archive
Edit the configuration files:
egrep -Ev '#|^$' /usr/local/kafka/config/zookeeper.properties
dataDir=/usr/local/kafka/data
dataLogDir=/usr/local/kafka/logs
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10
egrep -Ev '#|^$' /usr/local/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://:9092
host.name=local IP address                   # a domain name is preferred
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181             # a domain name is preferred; separate multiple addresses with commas
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
Start kafka: cd /usr/local/kafka/bin && sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties && sh kafka-server-start.sh -daemon ../config/server.properties
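To confirm the broker works end to end, a quick sketch using the console tools that ship with kafka 2.x (the topic name test is only an example):
cd /usr/local/kafka/bin
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
sh kafka-console-producer.sh --broker-list localhost:9092 --topic test                           # type a few lines, then Ctrl-C
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning     # should print the lines back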
8. Install logstash:
Unpack the archive
Write the pipeline file: vim /usr/local/logstash/bin/logstash_template.conf
# [old version] input is kafka, topic is test, poll interval is 1 second
input {
  kafka {
    enable_auto_commit => true
    auto_commit_interval_ms => "1000"
    codec => "json"
    bootstrap_servers => "kafka_address:9092"
    topics => ["test"]
    auto_offset_reset => "latest"
    group_id => "logstash-g1"        # nodes that form a cluster must use the same group_id
  }
}
# use filter to extract the log lines we want (not enabled)
filter {
  # define your own tags in the filebeat configuration
  if "tags_name" in [tags] {
    grok {
      # custom log format; each parenthesis captures one key and value, e.g. (?<key>value)
      match => { "message" => "(?<date>\d{4}/\d{2}/\d{2}\s(?<datetime>%{TIME}))\s-\s(?<status>\w{2})\s-\s(?<respond_time>\d+)\.\d+\w{2}\s-\s%{IP:client}:(?<client-port>\d+)\[\d+\]->%{IP:server}:(?<server-port>\d+).*:(?<databases><\w+>):(?<SQL>.*)" }
      # after filtering, remove the original message field to save disk space
      remove_field => ["message"]
    }
  }
}
# stdout{ codec=>rubydebug } is for debugging; it prints to the screen while running, comment it out when done
# logtype was defined under fields in filebeat and can be used here to tell different logs apart
# index is the Elasticsearch index name
output {
  #stdout{ codec=>rubydebug }
  elasticsearch {
    hosts => ["es_ip:9200"]
    user => "elastic"
    password => "xxxxx"
    index => "%{[fields][project]}-%{[fields][logtype]}-%{+YYYY.MM.dd}"
  }
  file {
    path => "/usr/local/logstash/logs/%{[fields][project]}-%{[fields][logtype]}-%{+YYYY.MM.dd}.log"   # also write to a log file
  }
}
# [new version]
input {
  kafka {
    codec => "json"
    topics => ["k8s1","k8s2","k8s3","k8s4"]
    auto_offset_reset => "latest"
    group_id => "logstash-g1"        # nodes that form a cluster must use the same group_id
    bootstrap_servers => "kafka1_address:9092,kafka2_address:9092"
  }
}
filter {
  grok {
    match => { "source" => "/data/%{USERNAME:server}/" }     # extract the value of server from the path
  }
  # log category 1
  if [fields][type] == "logtype" and [fields][project] == "docker" {
    grok {
      # log format: program name, time, log level, log message
      match => { "message" => "\[%{USER:name}\]\[%{TIME:time}\]\[%{WORD:level}\] web session ip: %{IP:clientip}" }   # clientip: the requesting client address
      match => { "message" => "\[%{USER:name}\]\[%{TIME:time}\]\[%{WORD:level}\] %{GREEDYDATA:data}" }
      remove_field => [ "message", "offset", "beat", "@version", "input_type" ]
    }
  }
  # log category 2
  if [fields][type] == "boamp" and [fields][project] == "k8s" {
    grok {
      # log format: HTTP GET /super_cmdb/login/ 200 [0.16, 172.17.1.1:39298]
      match => { "message" => "%{WORD:request} %{WORD:method} %{URIPATHPARAM:request_url} %{NUMBER:status} \[%{NUMBER:http_response_time}, %{IP:clientip}:%{NUMBER:port}\]" }
      #remove_field => [ "message" ]
    }
  }
  geoip {
    # resolve details about the client address and put them under "geoip":{}
    source => "clientip"
  }
}
# stdout{ codec=>rubydebug } is for debugging; it prints to the screen while running, comment it out when done
# logtype was defined under fields in filebeat and can be used here to tell different logs apart
# index is the Elasticsearch index name
output {
  #stdout{ codec=>rubydebug }
  # check whether geoip managed to resolve the client ip; the tag does not need to be defined, logstash writes it itself when the lookup fails
  if "_geoip_lookup_failure" in [tags] {
    elasticsearch {
      hosts => ["es_ip:9200","es_ip:9200"]
      index => "%{[fields][project]}-%{[fields][type]}-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "password"
      template => "/data/logstash/config/template_base.json"
      template_name => "template_base"
      template_overwrite => true
    }
  } else {
    elasticsearch {
      hosts => ["es_ip:9200","es_ip:9200"]
      index => "logstash-%{[fields][project]}-%{[fields][type]}-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "password"
      template => "/data/logstash/config/template_base.json"
      template_name => "template_base"
      template_overwrite => true
    }
  }
  file {
    # also write the logs to files
    path => "/data/logstash/logs/%{[fields][project]}/%{[fields][project]}-%{[fields][type]}-%{+YYYY.MM.dd}.log"
  }
}
Template file: vim /data/logstash/config/template_base.json [optional; if you do not use it, remove the corresponding template settings above]
The number of shards is usually calculated as (number of nodes * 1.5 or * 3); one replica per shard is generally enough; a single shard can typically handle around 30 GB of data, and the shard count can be adjusted according to data volume.
{ "template" : "*", "order" : 0, "settings" : { "index" : { "number_of_shards" : "3", "number_of_replicas" : "1" } } }
Start logstash: cd /usr/local/logstash/bin/ && ./logstash -f logstash_template.conf
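If the pipeline fails to load, the syntax can be checked first with logstash's standard --config.test_and_exit flag:
./logstash -f logstash_template.conf --config.test_and_exit      # prints Configuration OK when the file parses cleanly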
9. Install filebeat:
Unpack the archive
Edit the configuration file:
egrep -Ev '#|^$' /usr/local/filebeat/filebeat.yml
filebeat.prospectors:
- enabled: true
  paths:
    - /data/dddjs_*_s*a/server/build_result/app/log/*
  fields:
    project: dddjs
    type: game
    host_ip: 10.21.210.170
output.kafka:
  enabled: true
  hosts: ["kafka.com:9092"]
  topic: '%{[fields][project]}'
  #username: "bbh"
  #password: "xxxxx"
  #ssl.certificate_authorities: ["/etc/filebeat/ssl/ca.crt"]
  #ssl.certificate: "/etc/filebeat/ssl/client.crt"
  #ssl.key: "/etc/filebeat/ssl/client.key"
Start filebeat: cd /usr/local/filebeat/ && ./filebeat -e -c filebeat.yml
[Note: if filebeat cannot connect to kafka after starting, edit /etc/hosts and map the kafka hostname to its IP]
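Filebeat also ships self-test subcommands that help debug this kind of problem (both exist in 6.x):
cd /usr/local/filebeat
./filebeat test config -c filebeat.yml       # validates filebeat.yml
./filebeat test output -c filebeat.yml       # checks connectivity to the configured kafka output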
10. Crack x-pack
Many ELK features require x-pack, but the official version is paid.
Before starting elasticsearch, remember to add these two settings via vim /usr/local/elasticsearch/config/elasticsearch.yml; security must be enabled when importing the license:
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
For how to compile the java files, see: https://www.cnblogs.com/bigben0123/p/10305204.html
Patch the jar: the 6.7.2 version has already been patched here, just download it from the links below:
Version 6.7.2: https://files.cnblogs.com/files/chenjw-note/x-pack-core-6.7.2.zip
Version 6.8.0: https://files.cnblogs.com/files/chenjw-note/x-pack-core-6.8.0.zip
Replace the jar: [Note: when elasticsearch runs as a cluster, the patched jar must first be deployed on every node before the license can be imported successfully; import it once on the master node and it will be synced to the other nodes]
cp -r /usr/local/elasticsearch/modules/x-pack-core/x-pack-core-6.7.2.jar /usr/local/elasticsearch/modules/x-pack-core/x-pack-core-6.7.2.jar.bak
cp x-pack-core/x-pack-core-6.7.2.jar /usr/local/elasticsearch/modules/x-pack-core/
Restart elasticsearch
Apply for a license: https://license.elastic.co/registration
Edit license.json; it is enough to change type, expiry_date_in_millis and max_nodes:
"license":{ "uid":"719615b9-5188-4db4-a0f9-29863d669b4c", "type":"platinum", #白金會員 "issue_date_in_millis":1558310400000, "expiry_date_in_millis":252457920099, #到期時間戳,單位為毫秒ms "max_nodes":1000, "issued_to":"xxxxxxxxxx", "issuer":"Web Form", "signature":"AAAAAxxxxxxxxxxxxxxxxxxxxx", "start_date_in_millis":1558310400000 } }
Import the license:
curl -XPUT -u elastic 'http://es_ip:9200/_xpack/license' -H "Content-Type: application/json" -d @license.json — enter the password; if the output is {"acknowledged":true,"license_status":"valid"}, the import succeeded.
Verify the license expiry:
curl -XGET -u elastic es_ip:9200/_license
11. Enable SSL authentication between components
(1) Generate keys and certificates
cat /usr/local/kafka/ssl/fd.ext
subjectAltName = DNS:*.kafka.xxx.com, DNS:server.kafka.xxx.com    # domain patterns the SSL certificate matches; add more DNS entries to cover additional domains
cat /usr/local/kafka/ssl/ssl.sh
#!/bin/bash
BASE_DIR=.                      # output path
DAYS_VALID=7200                 # certificate validity in days
PASSWORD=xxxxx                  # certificate password
NAME=server.kafka.xxx.com       # domain name
DEPT=yunwei                     # department
COMPANY=xxx                     # company
CITY=gz                         # city
PROVINCE=gd                     # province
COUNTRY=CN                      # country
CERT_DIR="$BASE_DIR/ca"
SERVER_DIR="$BASE_DIR/secrets"
CLIENT_DIR="$BASE_DIR/client"
CA_CERT_NAME="$CLIENT_DIR/ca.crt"
CA_KEY_NAME="$CERT_DIR/ca.key"
PWD_NAME="$SERVER_DIR/password"
SERVER_KEYSTORE="$SERVER_DIR/server.keystore.jks"
SERVER_TRUSTSTORE="$SERVER_DIR/server.truststore.jks"
SERVER_CSR="$CERT_DIR/server.csr"
SERVER_CERT="$CERT_DIR/server.crt"
CLIENT_KEY="$CLIENT_DIR/client.key"
CLIENT_CSR="$CERT_DIR/client.csr"
CLIENT_CERT="$CLIENT_DIR/client.crt"
SUBJ="/C=$COUNTRY/ST=$PROVINCE/L=$CITY/O=$COMPANY/OU=$DEPT/CN=$NAME/CN=$NAME"
DNAME="CN=$NAME, OU=$DEPT, O=$COMPANY, L=$CITY, ST=$PROVINCE, C=$COUNTRY"
mkdir -p $CERT_DIR
mkdir -p $SERVER_DIR
mkdir -p $CLIENT_DIR
rm $CERT_DIR/*
rm $SERVER_DIR/*
rm $CLIENT_DIR/*
echo "1. Generate CA certificate and key..."
openssl req -new -x509 -keyout $CA_KEY_NAME -out $CA_CERT_NAME -days $DAYS_VALID \
  -passin pass:"$PASSWORD" -passout pass:"$PASSWORD" -subj "$SUBJ"
echo ""
echo "2. Generate server key store..."
keytool -genkey -keyalg RSA -keystore $SERVER_KEYSTORE -alias $NAME \
  -keysize 2048 -validity $DAYS_VALID -storepass $PASSWORD -keypass $PASSWORD \
  -dname "$DNAME" -ext SAN=DNS:$NAME
echo ""
echo "3. Export server certificate signing request..."
keytool -certreq -keystore $SERVER_KEYSTORE -alias $NAME \
  -file $SERVER_CSR -storepass $PASSWORD -keypass $PASSWORD -noprompt
echo ""
echo "4. Sign server certificate by CA..."
openssl x509 -req -CAcreateserial -CA $CA_CERT_NAME -CAkey $CA_KEY_NAME \
  -in $SERVER_CSR -out $SERVER_CERT -days $DAYS_VALID -passin pass:$PASSWORD -extfile fd.ext
echo ""
echo "5. Import CA to server key store..."
keytool -import -keystore $SERVER_KEYSTORE -alias CARoot -file $CA_CERT_NAME \
  -storepass $PASSWORD -keypass $PASSWORD -noprompt
echo ""
echo "6. Import server certificate to server key store..."
keytool -import -keystore $SERVER_KEYSTORE -alias $NAME -file $SERVER_CERT \
  -storepass $PASSWORD -keypass $PASSWORD -noprompt
echo ""
echo "7. Import CA to server trust store..."
keytool -import -keystore $SERVER_TRUSTSTORE -alias CARoot -file $CA_CERT_NAME \
  -storepass $PASSWORD -keypass $PASSWORD -noprompt
echo ""
echo "8. Generate client key and certificate request..."
openssl req -nodes -new -keyout $CLIENT_KEY -out $CLIENT_CSR -days $DAYS_VALID \
  -subj "$SUBJ"
echo ""
echo "9. Sign client certificate by CA..."
openssl x509 -req -CAcreateserial -CA $CA_CERT_NAME -CAkey $CA_KEY_NAME \
  -in $CLIENT_CSR -out $CLIENT_CERT -days $DAYS_VALID -passin pass:$PASSWORD
echo ""
echo "10. Generate password file..."
echo "$PASSWORD" > $PWD_NAME
rm .srl
echo ""
echo "####### Done. #######"
echo "Following files were generated"
echo "Server password file: $PWD_NAME"
echo "Server java keystore: $SERVER_KEYSTORE"
echo "Server java truststore: $SERVER_TRUSTSTORE"
echo "Signed Client cert: $CLIENT_CERT"
echo "Client RSA private key: $CLIENT_KEY"
echo "Client PEM truststore: $CA_CERT_NAME"
Run it: cd /usr/local/kafka/ssl/ && ./ssl.sh
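To confirm the generated keystore contains both the CA and the signed server certificate, a quick check (keytool prompts for the password set in the script):
keytool -list -v -keystore /usr/local/kafka/ssl/secrets/server.keystore.jks | grep -E 'Alias|Owner|Issuer'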
(2) Copy the SSL certificates to each component's machine
(3) Edit each component's configuration
vim /usr/local/kafka/config/server.properties
host.name=server.kafka.xxx.com
listeners=PLAINTEXT://:9091,SSL://:9092
ssl.keystore.location=/usr/local/kafka/ssl/secrets/server.keystore.jks
ssl.keystore.password=xxxx
ssl.key.password=xxxx
ssl.truststore.location=/usr/local/kafka/ssl/secrets/server.truststore.jks
ssl.truststore.password=xxxx
Restart kafka
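A quick TLS handshake test against the new SSL listener (hostname and CA path follow the earlier steps):
openssl s_client -connect server.kafka.xxx.com:9092 -CAfile /usr/local/kafka/ssl/client/ca.crt </dev/null    # should end with Verify return code: 0 (ok)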
vim /usr/local/logstash/bin/logstash_template.conf
# add these inside the kafka { } input block:
security_protocol => "SSL"
ssl_keystore_location => "/usr/local/kafka/ssl/secrets/server.keystore.jks"
ssl_keystore_password => "xxxx"
ssl_truststore_location => "/usr/local/kafka/ssl/secrets/server.truststore.jks"
ssl_truststore_password => "xxxx"
Restart logstash
vim /usr/local/filebeat/filebeat.yml
# output to kafka
output.kafka:
  enabled: true
  hosts: ["server.kafka.xxx.com:9092"]
  topic: 'test'
  ssl.certificate_authorities: ["/usr/local/filebeat/ssl/ca.crt"]
  ssl.certificate: "/usr/local/filebeat/ssl/client.crt"
  ssl.key: "/usr/local/filebeat/ssl/client.key"
Restart filebeat
(4) Note: remember to set up DNS resolution or bind the hostnames in /etc/hosts
That completes this walkthrough of the ELK platform. It was a long process; I am summarizing it here in the hope that it helps others who are struggling with the same setup, and also as a note for my own future reference.