6.8版本elk日志管理系統部署elasticsearch + kibana + logstash + filebeat + kafka_2.12 + x-pack破解 + 各組件間的ssl認證


6.8版本elk日志管理系統部署elasticsearch + kibana + logstash  + filebeat + kafka_2.12 + x-pack破解 + 各組件間的ssl認證

 

目錄:

1.組件介紹
2.架構圖
3.軟件下載地址
4.安裝elasticsearch
5.安裝kibana
6.啟用安全認證
7.安裝kafka
8.安裝logstash
9.安裝filebeat
10.破解x-pack
11.啟動各組件間的ssl認證

 

 

1.組件介紹

Filebeat是一個日志文件托運工具,在你的服務器上安裝客戶端后,filebeat會監控日志目錄或者指定的日志文件,追蹤讀取這些文件(追蹤文件的變化,不停的讀)
Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據
Logstash是一根具備實時數據傳輸能力的管道,負責將數據信息從管道的輸入端傳輸到管道的輸出端;與此同時這根管道還可以讓你根據自己的需求在中間加上濾網,Logstash提供里很多功能強大的濾網以滿足你的各種應用場景
ElasticSearch它提供了一個分布式多用戶能力的全文搜索引擎,基於RESTful web接口
Kibana是ElasticSearch的用戶界面

在實際應用場景下,為了滿足大數據實時檢索的場景,利用Filebeat去監控日志文件,將Kafka作為Filebeat的輸出端,Kafka實時接收到Filebeat后以Logstash作為輸出端輸出,到Logstash的數據也許還不是我們想要的格式化或者特定業務的數據,這時可以通過Logstash的一些過了插件對數據進行過濾最后達到想要的數據格式以ElasticSearch作為輸出端輸出,數據到ElasticSearch就可以進行豐富的分布式檢索了

2.架構圖

3.軟件下載地址:https://www.elastic.co/cn/products/     【建議裝在/data/elk/目錄下,因為使用長時間后會產生很大的數據,需要較大的磁盤支撐】

  jdk安裝:https://www.cnblogs.com/chenjw-note/p/10838160.html

  注意:所有配置文件中配置項后面的注釋內容都需要去掉

4.安裝elasticsearch:

  解壓

  修改配置文件:vim /usr/local/elasticsearch/config/elasticsearch.yml

cluster.name: MyElk
node.name: node-1
path.data: /usr/local/elasticsearch/data
path.logs: /usr/local/elasticsearch/logs
network.host: 本機地址
http.port: 9200
#因為Centos6不支持SecComp,而ES6默認bootstrap.system_call_filter為true進行檢測,所以導致檢測失敗,失敗后直接導致ES不能啟動,需要禁用一下兩行,centos7則不用
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
#xpack.security.enabled: true #啟用安全認證  這兩個參數在后面6.啟用安全認證再啟用
#xpack.security.transport.ssl.enabled: true

  修改系統參數:

    sysctl -p

net.ipv4.ip_forward = 0
net.ipv4.conf.default.rp_filter = 1
net.ipv4.conf.default.accept_source_route = 0
kernel.sysrq = 0
kernel.core_uses_pid = 1
net.ipv4.tcp_syncookies = 1
kernel.msgmnb = 65536
kernel.msgmax = 65536
kernel.shmmax = 68719476736
kernel.shmall = 4294967296
kernel.core_pattern = /data/corefile/core.%p.%e
vm.max_map_count = 655360
vm.swappiness = 0

    vim /etc/security/limits.conf

elastic soft memlock unlimited
elastic hard memlock unlimited
* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096

  創建普通用戶:useradd elastic   passwd elastic

  賦予權限:chown -R elastic:elastic /usr/local/elasticsearch

  后台啟動elasticsearch:su - elastic && cd /usr/local/elasticsearch/bin && ./elasticsearch -d

5.安裝kibana:

  解壓

  修改配置文件:egrep -Ev '#|^$' kibana.yml 

server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://es_ip:9200"]

  漢化kibana:https://github.com/anbai-inc/Kibana_Hanization

    下載漢化包:https://github.com/anbai-inc/Kibana_Hanization/archive/master.zip

    拷貝此項目中的translations文件夾到kibana目錄下的src/legacy/core_plugins/kibana/目錄下

    修改kibana配置文件kibana.yml中的配置項: egrep -Ev '#|^$' kibana.yml 

server.port: 5601
server.host: "0.0.0.0" elasticsearch.hosts: ["http://es_ip:9200"]
i18n.locale: "zh-CN"

     7.x版本官方自帶漢化資源文件(位於kibana目錄下的node_modules/x-pack/plugins/translations/translations/目錄

    直接修改kibana配置文件kibana.yml中的配置項:i18n.locale: "zh-CN" 即可

  啟動kibana:cd /usr/local/kibana/bin && ./kibana

6.啟用安全認證:

  瀏覽器訪問kibana:http://IP地址:5601,點擊【管理】,點擊【許可管理】,啟用【試用30白金版】,啟用后再次刷新不能正常進入kibana

  修改elasticsearch配置文件:vim /usr/local/elasticsearch/config/elasticsearch.yml,添加

xpack.security.enabled: true #啟用安全認證
xpack.security.transport.ssl.enabled: true

  重啟elasticsearch

  初始化elasticsearch各組件密碼:cd /usr/local/elasticsearch/bin && ./elasticsearch-setup-passwords interactive   輸入你的密碼

  注意:配置elasticsearch ssl認證集群可參考這篇文章:https://www.cnblogs.com/chenjw-note/articles/10901632.html

  修改kibana配置文件:egrep -Ev '#|^$' /usr/local/kibana/config/kibana.yml

server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://es_ip:9200"]
elasticsearch.username: "elastic"
elasticsearch.password: "你的密碼"
i18n.locale: "zh-CN"

  重啟kibana:cd /usr/local/kibana/bin && ./kibana

  訪問登陸頁面,超級管理員是:elastic 你的密碼

  完成安全認證

 7.安裝kafka:

  下載kafka:http://kafka.apache.org/downloads

  解壓

  修改配置文件:

  egrep -Ev '#|^$' /usr/local/kafka/config/zookeeper.properties 

dataDir=/usr/local/kafka/data
dataLogDir=/usr/local/kafka/logs
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10

  egrep -Ev '#|^$' /usr/local/kafka/config/server.properties

broker.id=0
listeners=PLAINTEXT://:9092
host.name=本機ip地址  #最好用域名
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181  #最好用域名地址,多個地址則用逗號隔開
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

  啟動kafka:cd /usr/local/kafka/bin && sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties && sh kafka-server-start.sh -daemon ../config/server.properties

8.安裝logstash:

  解壓

  編寫規則文件:vim /usr/local/logstash/bin/logstash_template.conf

#輸入為kafka,topics是test,查詢時間是1秒  【舊】
input {
    kafka {
        enable_auto_commit => true
        auto_commit_interval_ms => "1000"
        codec => "json"
        bootstrap_servers => "kafka_地址:9092"
        topics => ["test"]
     
auto_offset_reset => "latest"
     group_id => "logstash-g1"  #組建集群要用相同group_id
   }
}

  #使用filter 過濾出來我們想要的日志(未啟用)
  filter {
      #需要在filebeat配置文件中定義自己的tags
      if "tags_name" in [tags] {
          grok {
          #自定義日志格式,括號里邊包含所有一個key和value,例子:(?<key>value)
              match => { "message" => "(?<date>\d{4}/\d{2}/\d{2}\s(?<datetime>%{TIME}))\s-\s(?<status>\w{2})\s-\s(?<respond_time>\d+)\.\d+\w{2}\s-\s%{IP:client}:(?<client-port>\d+)\[\d+\]->%{IP:server}:(?<server-port>\d+).*:(?<database
s><\w+>):(?<SQL>.*)"}
              #過濾完成之后把之前的message 移出掉,節約磁盤空間
              remove_field => ["message"]
          }
      }
  }

#stdout{ codec=>rubydebug}用來調試使用,運行時會輸出到屏幕,調試完成注釋掉
#在filebeat里面我們定義了fields的logtype,在這里可以用來區別不同的日志
#index為ES的數據庫index名

  output {
      #stdout{ codec=>rubydebug}
      elasticsearch {
          hosts => ["es_ip:9200"]
          user => "elastic"
          password => "xxxxx"
          index => "%{[fields][project]}-%{[fields][logtype]}-%{+YYYY.MM.dd}"
      }

      file {
          path => "/usr/local/logstash/logs/%{[fields][project]}-%{[fields][logtype]}-%{+YYYY.MM.dd}.log"  #輸出到日志文件
      }

  }

input {    【新】
  kafka {
    codec => "json"
    topics => ["k8s1","k8s2","k8s3","k8s4"]
    auto_offset_reset => "latest"
    group_id => "logstash-g1"  #組建集群需要使用相同group_id
    bootstrap_servers => "kafka1地址:9092,kafka2地址:9092"
  }
}
filter {
  grok{
     match => {
        "source" => "/data/%{USERNAME:server}/"        #獲取server的值
     }
  }

  #日志分類1
  if [fields][type] == "logtype" and [fields][project]=="docker" {
    grok {
       #日志格式:程序名 時間 日志等級 日志信息
       match => { "message" => "\[%{USER:name}\]\[%{TIME:time}\]\[%{WORD:level}\] web session ip: %{IP:clientip}" }        #clientip:獲取請求客戶端地址
       match => { "message" => "\[%{USER:name}\]\[%{TIME:time}\]\[%{WORD:level}\] %{GREEDYDATA:data}" }
       remove_field => [ "message", "offset", "beat", "@version", "input_type" ]
    }
  }

  #日志分類2
  if [fields][type] == "boamp" and [fields][project]=="k8s" {
    grok {
       #日志格式:HTTP GET /super_cmdb/login/ 200 [0.16, 172.17.1.1:39298]
       match => { "message" => "%{WORD:request} %{WORD:method} %{URIPATHPARAM:request_url} %{NUMBER:status} \[%{NUMBER:http_response_time}, %{IP:clientip}:%{NUMBER:port}\]" }
       #remove_field => [ "message" ]
    }
  }

  geoip {            #解析請求客戶端地址詳細信息放到 "geoip":{}下
    source => "clientip"
  }
}
#stdout{ codec=>rubydebug}用來調試使用,運行時會輸出到屏幕,調試完成注釋掉
#在filebeat里面我們定義了fields的logtype,在這里可以用來區別不同的日志
#index為ES的數據庫index名
output {
#stdout{ codec=>rubydebug}
if "_geoip_lookup_failure" in [tags] { #判斷geoip解析客戶端ip是否成功,tags不用定義,如果有問題,程序自己寫入的 elasticsearch { hosts => ["es_ip:9200","es_ip:9200"] index => "%{[fields][project]}-%{[fields][type]}-%{+YYYY.MM.dd}" user => "elastic" password => "密碼" template => "/data/logstash/config/template_base.json" template_name => "template_base" template_overwrite => true } } else { elasticsearch { hosts => ["es_ip:9200","es_ip:9200"] index => "logstash-%{[fields][project]}-%{[fields][type]}-%{+YYYY.MM.dd}" user => "elastic" password => "密碼" template => "/data/logstash/config/template_base.json" template_name => "template_base" template_overwrite => true } } file { #將日志輸出到文件 path => "/data/logstash/logs/%{[fields][project]}/%{[fields][project]}-%{[fields][type]}-%{+YYYY.MM.dd}.log" } }

  模板文件:vim /data/logstash/config/template_base.json 【可不使用,取消對應配置即可】

  分片數一般以(節點數*1.5或3倍)來計算,分片副本一般有1個就夠了,一個分片一般可以處理30G數據,也可以根據數據大小進行分片數量調整

{
   "template" : "*",
   "order" : 0,
    "settings" : {
      "index" : {
        "number_of_shards" : "3",
        "number_of_replicas" : "1"
      }
    }
}

  啟動logstash:cd /usr/local/logstash/bin/ && ./logstash -f logstash_template.conf

9.安裝filebeat:

  解壓

  修改配置文件:

egrep -Ev '#|^$' /usr/local/filebeat/filebeat.yml
filebeat.prospectors:
- enabled: true
  paths:
    - /data/dddjs_*_s*a/server/build_result/app/log/*
  fields:
    project: dddjs
    type: game
    host_ip: 10.21.210.170

output.kafka:
  enabled: true
  hosts: ["kafka.com:9092"]
  topic: '%{[fields][project]}'
  #username: "bbh"
  #password: "xxxxx"
  #ssl.certificate_authorities: ["/etc/filebeat/ssl/ca.crt"]
  #ssl.certificate: "/etc/filebeat/ssl/client.crt"
  #ssl.key: "/etc/filebeat/ssl/client.key"

  啟動filebeat:cd /usr/local/logstash/bin/ && ./filebeat -e -f filebeat.yml

  【注意:若啟動后無法連接kafka,請修改/etc/hosts,綁定kafka主機名和ip】

10.破解x-pack

  elk系統很多功能需要x-pack的支持,但官方要收費。

  elasticsearch 啟動前,記得添加這兩個參數vim /usr/local/elasticsearch/config/elasticsearch.yml,導入license時需要開啟安全認證

xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true

  編譯java文件方法參考此鏈接:https://www.cnblogs.com/bigben0123/p/10305204.html

  修改jar包:此處6.7.2版本已修改好,點擊鏈接下載即可:

    版本6.7.2:https://files.cnblogs.com/files/chenjw-note/x-pack-core-6.7.2.zip

    版本6.8.0:https://files.cnblogs.com/files/chenjw-note/x-pack-core-6.8.0.zip

  替換jar包:【注意:當elasticsearch是集群時,需要將集群個節點先替換修改后的jar包,才可以成功導入license,在主節點導一次即可,會同步到其他節點】

cp -r /usr/local/elasticsearch/modules/x-pack-core/x-pack-core-6.7.2.jar /usr/local/elasticsearch/modules/x-pack-core/x-pack-core-6.7.2.jar.bak 
cp x-pack-core/x-pack-core-6.7.2.jar /usr/local/elasticsearch/modules/x-pack-core/x-pack-core-6.7.2.jar /usr/local/elasticsearch/modules/x-pack-core/

  重啟elasticsearch

  申請license:https://license.elastic.co/registration

  修改license.json,修改 type,expiry_date_in_millis,max_nodes 即可

    "license":{
        "uid":"719615b9-5188-4db4-a0f9-29863d669b4c",
        "type":"platinum",  #白金會員
        "issue_date_in_millis":1558310400000,
        "expiry_date_in_millis":252457920099,  #到期時間戳,單位為毫秒ms
        "max_nodes":1000,
        "issued_to":"xxxxxxxxxx",
        "issuer":"Web Form",
        "signature":"AAAAAxxxxxxxxxxxxxxxxxxxxx",
        "start_date_in_millis":1558310400000
    }
}

  導入license:

curl -XPUT -u elastic 'http://es_ip:9200/_xpack/license' -H "Content-Type: application/json" -d @license.json
輸入密碼
輸出:{"acknowledged":true,"license_status":"valid"} 則成功

  驗證許可時間:

curl -XGET -u elastic es_ip:9200/_license

  

11.啟動各組件間的ssl認證

   (1) 生成密鑰證書

    cat /usr/local/kafka/ssl/fd.ext

subjectAltName = DNS:*.kafka.xxx.com, DNS:server.kafka.xxx.com  #ssl證書對應匹配的域名格式,通過添加DNS來增加多個域名匹配

    cat /usr/local/kafka/ssl/ssl.sh

#!/bin/bash
BASE_DIR=.  # 保存路徑
DAYS_VALID=7200  # 證書有效期
PASSWORD=xxxxx  # 證書密碼
NAME=server.kafka.xxx.com # 域名
DEPT=yunwei  # 部門名
COMPANY=xxx  # 公司名
CITY=gz  # 城市
PROVINCE=gd  # 省份
COUNTRY=CN  # 國家

CERT_DIR="$BASE_DIR/ca"
SERVER_DIR="$BASE_DIR/secrets"
CLIENT_DIR="$BASE_DIR/client"
CA_CERT_NAME="$CLIENT_DIR/ca.crt"
CA_KEY_NAME="$CERT_DIR/ca.key"
PWD_NAME="$SERVER_DIR/password"
SERVER_KEYSTORE="$SERVER_DIR/server.keystore.jks"
SERVER_TRUSTSTORE="$SERVER_DIR/server.truststore.jks"
SERVER_CSR="$CERT_DIR/server.csr"
SERVER_CERT="$CERT_DIR/server.crt"
CLIENT_KEY="$CLIENT_DIR/client.key"
CLIENT_CSR="$CERT_DIR/client.csr"
CLIENT_CERT="$CLIENT_DIR/client.crt"
SUBJ="/C=$COUNTRY/ST=$PROVINCE/L=$CITY/O=$COMPANY/OU=$DEPT/CN=$NAME/CN=$NAME"
DNAME="CN=$NAME, OU=$DEPT, O=$COMPANY, L=$CITY, ST=$PROVINCE, C=$COUNTRY"

mkdir -p $CERT_DIR
mkdir -p $SERVER_DIR
mkdir -p $CLIENT_DIR
rm $CERT_DIR/*
rm $SERVER_DIR/*
rm $CLIENT_DIR/*

echo "1. Generate CA certificate and key..."
openssl req -new -x509 -keyout $CA_KEY_NAME -out $CA_CERT_NAME -days $DAYS_VALID \
    -passin pass:"$PASSWORD" -passout pass:"$PASSWORD" -subj "$SUBJ"

echo ""
echo "2. Generate server key store..."
keytool -genkey -keyalg RSA -keystore $SERVER_KEYSTORE -alias $NAME \
    -keysize 2048 -validity $DAYS_VALID -storepass $PASSWORD -keypass $PASSWORD \
    -dname "$DNAME" -ext SAN=DNS:$NAME

echo ""
echo "3. Export server certificate signing request..."
keytool -certreq -keystore $SERVER_KEYSTORE -alias $NAME \
    -file $SERVER_CSR  -storepass $PASSWORD -keypass $PASSWORD -noprompt

echo ""
echo "4. Sign server certificate by CA..."
openssl x509 -req -CAcreateserial -CA $CA_CERT_NAME -CAkey $CA_KEY_NAME \
    -in $SERVER_CSR -out $SERVER_CERT -days $DAYS_VALID -passin pass:$PASSWORD -extfile fd.ext

echo ""
echo "5. Import CA to server key store..."
keytool -import -keystore $SERVER_KEYSTORE -alias CARoot -file $CA_CERT_NAME \
    -storepass $PASSWORD -keypass $PASSWORD -noprompt

echo ""
echo "6. Import server certificate to server key store..."
keytool -import -keystore $SERVER_KEYSTORE -alias $NAME -file $SERVER_CERT \
    -storepass $PASSWORD -keypass $PASSWORD -noprompt

echo ""
echo "7. Import CA to server trust store..."
keytool -import -keystore $SERVER_TRUSTSTORE -alias CARoot -file $CA_CERT_NAME \
    -storepass $PASSWORD -keypass $PASSWORD -noprompt

echo ""
echo "8. Generate client key and certificate request..."
openssl req -nodes -new -keyout $CLIENT_KEY -out $CLIENT_CSR -days $DAYS_VALID \
    -subj "$SUBJ"

echo ""
echo "9. Sign client certificate by CA..."
openssl x509 -req -CAcreateserial -CA $CA_CERT_NAME -CAkey $CA_KEY_NAME \
    -in $CLIENT_CSR -out $CLIENT_CERT -days $DAYS_VALID -passin pass:$PASSWORD

echo ""
echo "10. Generate password file..."
echo "$PASSWORD" > $PWD_NAME
rm .srl

echo ""
echo "####### Done. #######"
echo "Following files were generated"
echo "Server password file: $PWD_NAME"
echo "Server java keystore: $SERVER_KEYSTORE"
echo "Server java truststore: $SERVER_TRUSTSTORE"
echo "Signed Client cert: $CLIENT_CERT"
echo "Client RSA private key: $CLIENT_KEY"
echo "Client PEM truststore: $CA_CERT_NAME"

    執行:cd /usr/local/kafka/ssl/ && ./ssl.sh

  (2) 把ssl證書拷貝到各組件的機器上

  (3) 修改各組件配置文件

    vim /usr/local/kafka/config/server.properties

host.name=server.kafka.xxx.com
listeners=PLAINTEXT://:9091,SSL://:9092
ssl.keystore.location=/usr/local/kafka/ssl/secrets/server.keystore.jks
ssl.keystore.password=xxxx
ssl.key.password=xxxx
ssl.truststore.location=/usr/local/kafka/ssl/secrets/server.truststore.jks
ssl.truststore.password=xxxx

    重啟kafka

    vim /usr/local/logstash/bin/logstash_template.conf

        security_protocol => "SSL"
        ssl_keystore_password => "xxxx"
        ssl_keystore_location => "/usr/local/kafka/ssl/secrets/server.keystore.jks"
        ssl_keystore_password => "xxxx"
        ssl_truststore_password => "xxxx"
        ssl_truststore_location => "/usr/local/kafka/ssl/secrets/server.truststore.jks"

    重啟logstash

    vim /usr/local/filebeat/filebeat.yml

#輸出到kafka
output.kafka:
   enable: true
   hosts: ["server.kafka.xxx.com:9092"]
   topic: 'test'
   ssl.certificate_authorities: ["/usr/local/filebeat/ssl/ca.crt"]
   ssl.certificate: "/usr/local/filebeat/ssl/client.crt"
   ssl.key: "/usr/local/filebeat/ssl/client.key"

    重啟filebeat

  (4) 注意:記得進行域名解析或者綁定主機hosts

 

至此,elk平台詳解已完成 ,真是一個漫長的過程,在此總結,希望給正在研究苦惱的朋友有所幫助,同時也給自己留下筆記,方便日后查閱。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM