前言
Elastic Stack 提供 Beats 和 Logstash 套件來采集任何來源、任何格式的數據。其實Beats 和 Logstash的功能差不多,都能夠與 Elasticsearch 產生協同作用,而且
logstash比filebeat功能更強大一點,2個都使用是因為:Beats 是一個輕量級的采集器,支持從邊緣機器向 Logstash 和 Elasticsearch 發送數據。考慮到 Logstash 占用系
統資源較多,我們采用 Filebeat 來作為我們的日志采集器。並且這里采用kafka作為傳輸方式是為了避免堵塞和丟失,以實現日志的實時更新。
介紹
1.Filebeat:filebat
是一個用於轉發和集中日志數據的輕量級shipper
。作為代理安裝在服務器上,filebeat
監視指定的日志文件或位置,收集日志事件,並將它們轉發給ElasticSearch
或logstash
進行索引。
2.Logstash:Logstash 是開源的服務器端數據處理管道,能夠同時從多個來源采集數據,轉換數據,然后將數據發送到存儲庫。
3.ElasticSearch:Elasticsearch 是基於 JSON 的分布式搜索和分析引擎,專為實現水平擴展、高可靠性和管理便捷性而設計。
4.Kibana:Kibana 能夠以圖表的形式呈現數據,並且具有可擴展的用戶界面,供您全方位配置和管理 Elastic Stack。
Elastic Stack架構
軟件環境
Ubuntu 18.04.1
Elastic Stack官網:https://www.elastic.co/
kafka、Zookeeper安裝
kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。
ZooKeeper是一個分布式的,開放源碼的分布式應用程序協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要組件。它是一個為分布式應用
提供一致性服務的軟件,提供的功能包括:配置維護、域名服務、分布式同步、組服務等。
注:安裝kafka時會發現它需要安裝Zookeeper的。kafka的官方文檔有說明。zookeeper是為了解決分布式一致性問題的工具,kafka使用ZooKeeper用於管理、協調代理。當Kafka系統中新增了代理或某個代理失效時,Zookeeper服務將通知生產者和消費者。生產者與消費者據此開始與其他代理協調工作。
Zookeeper單機安裝配置:
Step 1:下載壓縮包並解壓
1 >wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz 2 >tar -zxvf zookeeper-3.4.13.tar.gz 3 >cd zookeeper-3.4.13
Step 2:修改配置文件
先復制模板配置文件,並重命名,然后里面存放數據的路徑dataDir可以自己定義
1 >cp -rf conf/zoo_sample.cfg conf/zoo.cfg 2 >vim zoo.cfg
Step 3:啟動服務
1 >./bin/zkServer.sh start 2 3 >./bin/zkServer.sh status
先啟動,系統會默認加載zoo.cfg配置,然后使用 zkServer.sh status 查看服務狀態
kafka安裝配置:
根據官網教程開始安裝(復制的官網教程):
Step 1: Download the code
1 > wget http://mirror.apache-kr.org/kafka/2.1.0/kafka_2.11-2.1.0.tgz 2 > tar -xzf kafka_2.11-2.1.0.tgz 3 > cd kafka_2.11-2.1.0/
Step 2: Start the server
Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance.(由於我自己安裝的Zookeeper,所以這一步可以省略)
1 > ./bin/zookeeper-server-start.sh config/zookeeper.properties 2 [2019-01-23 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) 3 ...
Now start the Kafka server:
首先可以先修改一下配置文件server.propertise
broker.id=0 #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣 port=9092 #當前kafka對外提供服務的端口默認是9092 num.network.threads=3 #這個是borker進行網絡處理的線程數 num.io.threads=8 #這個是borker進行I/O處理的線程數 log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,如果配置多個目錄,新創建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個 socket.send.buffer.bytes=102400 #發送緩沖區buffer大小,數據不是一下子就發送的,先回存儲到緩沖區了到達一定的大小后在發送,能提高性能 socket.receive.buffer.bytes=102400 #kafka接收緩沖區大小,當數據到達一定大小后在序列化到磁盤 socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小 num.partitions=1 #默認的分區數,一個topic默認1個分區數 log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天 message.max.byte=5242880 #消息保存的最大值5M default.replication.factor=2 #kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務 replica.fetch.max.bytes=5242880 #取消息的最大直接數 log.segment.bytes=1073741824 #這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除 log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能 zookeeper.connect=localhost:12181#設置zookeeper的連接端口
1 > ./bin/kafka-server-start.sh config/server.properties 2 [2019-01-23 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) 3 [2019-01-23 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) 4 ...
后台啟動的命令(藍色部分為控制台輸出地址):
>nohup bin/kafka-server-start.sh config/server.properties 1>/home/ec2-user/tools/kafka_2.11-2.1.0/logs/kafka.log 2>&1 &
到這里服務就啟動成功了。
查看java進程pid也可以發現kafka
Step 3: Create a topic
Let's create a topic named "test" with a single partition and only one replica:
1 > ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
if you want delete a topic:
1 >./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
We can now see that topic if we run the list topic command:
1 > ./bin/kafka-topics.sh --list --zookeeper localhost:2181 2 test
Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.
在上一次配置的日子目錄(server.properties里配置的log.dirs)上可以看到,已經存在了該主題的日志信息文件了,由於server.properties里面配置的num.partitions=1(默認的分區數,一個topic默認1個分區數)。所以只有一個文件夾
要查看topic詳細信息的話:
1 >bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Step 4: Star a producer(創建發布者)
1 > ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Step 5: Start a consumer(創建訂閱者)
1 > ./bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic test --from-beginning
創建好后,可以在發布者端發送消息進行測試,我們這里的發布者是filebeat,訂閱者是logstash,可以不用配置。
Filebeat安裝配置
beat是一個輕量級的數據傳輸者,它有好幾種分類,這里暫時只用到Filebeat。
Filebeat的處理流程基本分為3部分:Input、Filter、Output。
開始安裝
1.官網下載 :filebeat-6.5.4-linux-x86_64.tar.gz,復制到LINUX服務器,然后解壓( tar -xzvf filebeat-6.5.4-linux-x86_64.tar.gz)
2.進入filebeat-6.5.4-linux-x86_64文件夾,修改filebeat.yml,filebeat默認是將數據傳輸到elasticsearch,需要修改為kafka
這里放一個filebeat詳細配置參考:https://www.cnblogs.com/kuku0223/p/8316922.html
配置完成后啟動filebeat:
1 > sudo ./filebeat -e -c filebeat.yml
ELK(elasticsearch+logstash+kibana 6.5.4)配置
elasticsearch安裝配置
Elasticsearch 是一個分布式、RESTful 風格的搜索和數據分析引擎,能夠解決不斷涌現出的各種用例。
順帶提一下Elasticsearch的數據結構:FST(Finite StateTransducers),其特點高度重復利用數據前綴和后綴存儲數據,能很大層度上壓縮數據。
elasticsearch安裝配置:
從官網上下載軟件:https://www.elastic.co/cn/downloads/elasticsearch
解壓之后,修改config/elasticsearch.yml配置文件,配置一下ip和端口號:
1 skyworth@skyworth-ubuntu:~$ cd /home/tools/elk/elasticsearch-6.5.4/ 2 skyworth@skyworth-ubuntu:/home/tools/elk/elasticsearch-6.5.4$ vim config/elasticsearch.yml
然后啟動命令:
1 skyworth@skyworth-ubuntu:/home/tools/elk/elasticsearch-6.5.4$ ./bin/elasticsearch
后台運行命令:
>./bin/elasticsearch -d
然后瀏覽器打開端口(http://192.168.80.15:9200/),出現如下信息說明啟動成功
elasticsearch啟動常見錯誤參考:https://www.cnblogs.com/zhi-leaf/p/8484337.html
elastucsearch安全框架(Search Guard ,x-pack收費)
官方參考文檔:https://docs.search-guard.com/latest/demo-installer.html#install-search-guard-on-elasticsearch
配置參考:http://xiaoqiangge.com/aritcle/1536058241842.html
Install Search Guard on Elasticsearch:
>bin/elasticsearch-plugin install -b com.floragunn:search-guard-6:6.5.4-24.0
接着,配置ssl,由於ElasticSearch節點之間通訊默值非加密,造成數據不安全,Search Guard強制ElasticSearch節點之間通訊為加密方式。
可以在 https://search-guard.com/tls-certificate-generator 上申請密匙。
Email用來接收密鑰,
Organization Name可以隨便填寫,
Hostname填寫ElasticSearch集群中每個節點的node name,這是一一對應的,這里我只有一個節點,node name為node-1。
填寫好之后,郵箱會收到一個壓縮包,將這個壓縮包解壓到Elastucsearch的config目錄下,並在elasticsearch.yml追加配置,如下:
http.compression: true xpack.security.enabled: false searchguard.ssl.transport.pemcert_filepath: CN=node-1.crtfull.pem searchguard.ssl.transport.pemkey_filepath: CN=node-1.key.pem searchguard.ssl.transport.pemkey_password: 3816b97cbd40d1a97402 searchguard.ssl.transport.pemtrustedcas_filepath: chain-ca.pem searchguard.ssl.transport.enforce_hostname_verification: false searchguard.ssl.http.enabled: true searchguard.ssl.http.pemcert_filepath: CN=node-1.crtfull.pem searchguard.ssl.http.pemkey_filepath: CN=node-1.key.pem searchguard.ssl.http.pemkey_password: 3816b97cbd40d1a97402 searchguard.ssl.http.pemtrustedcas_filepath: chain-ca.pem searchguard.authcz.admin_dn: - CN=sgadmin searchguard.audit.type: internal_elasticsearch searchguard.enable_snapshot_restore_privilege: true searchguard.check_snapshot_restore_write_privileges: true searchguard.restapi.roles_enabled: ["sg_all_access"] cluster.routing.allocation.disk.threshold_enabled: false
注:有關配置屬性以及pemkey_password屬性可以在下載的密鑰包中README.txt文件中查看到,如下:
## Passwords ### Common passwords Root CA password: 9c5eeb48cee19d1ae6ce0315154f7271ff6f8b8c Truststore password: 1d6df567841c5e8a7663 Admin keystore and private key password: 83f5fdded6c38208a9ad Demouser keystore and private key password: e598a7e1684295923d36 ## Host/Node specific passwords Host: node-1 node-1 keystore and private key password: 3816b97cbd40d1a97402 node-1 keystore: node-certificates/CN=node-1-keystore.jks node-1 PEM certificate: node-certificates/CN=node-1.crtfull.pem node-1 PEM private key: node-certificates/CN=node-1.key.pem
這時還無法啟動ES,需要對節點密鑰文件進行授權,權限打小為0644(rw-r--r--),否則會出現對密鑰文件沒有讀取權限,如下:
>chmod 644 root-ca.pem chain-ca.pem node-certificates/CN\=node-1.key.pem node-certificates/CN\=node-1.crtfull.pem /home/ec2-user/tools/elasticsearch-6.5.4/config/
把節點授權的4個文件復制到ES的config文件夾下
cp root-ca.pem chain-ca.pem node-certificates/CN\=node-1.key.pem node-certificates/CN\=node-1.crtfull.pem /home/ec2-user/tools/elasticsearch-6.5.4/config/
接着對sgadmin客戶端密鑰進行授權:
>chmod 644 root-ca.pem chain-ca.pem client-certificates/CN\=sgadmin.key.pem client-certificates/CN\=sgadmin.crtfull.pem
然后把客戶端密匙的4個文件復制到插件的tools目錄下
cp root-ca.pem chain-ca.pem client-certificates/CN\=sgadmin.key.pem client-certificates/CN\=sgadmin.crtfull.pem /home/ec2-user/tools/elasticsearch-6.5.4/plugins/search-guard-6/tools
注:這里都沒有去使用root用戶權限,自己創建用戶
然后再啟動ES,分別檢查狀態和日志是否存在錯誤,如果沒有存在錯誤開始執行最后一步,初始化sgadmin配置,如下:
進入目錄 /home/ec2-user/tools/elasticsearch-6.5.4/plugins/search-guard-6/tools 對sgadmin.sh進行授權 chmod 744 sgadmin.sh 執行初始化語句 sh sgadmin.sh -cd ../sgconfig/ -icl -nhnv -cacert root-ca.pem -cert CN=sgadmin.crtfull.pem -key CN=sgadmin.key.pem -h localhost -keypass 83f5fdded6c38208a9ad
執行完之后會出現如下效果:
最后在瀏覽器書如https://admin:admin@localhost:9200可以查看結果:
kibana安裝配置
通過 Kibana,能夠對 Elasticsearch 中的數據進行可視化並在 Elastic Stack 進行操作.
kibana安裝配置:
首先下載軟件,然后解壓,修改配置文件config/kibana.yml,再啟動就行,與elasticsearch類似。
1 skyworth@skyworth-ubuntu:/home/tools/elk/kibana-6.5.4-linux-x86_64$ vim config/kibana.yml
1 skyworth@skyworth-ubuntu:/home/tools/elk/kibana-6.5.4-linux-x86_64$ ./bin/kibana
kibana后台啟動
>nohup bin/kibana 1>/home/ec2-user/tools/kibana-6.5.4-linux-x86_64/logs/kibana.log 2>&1 &
Install Search Guard on Kibana
這里只需要修改一下 kibana.yml,增加如下配置就好:
# Use HTTPS instead of HTTP elasticsearch.url: "https://localhost:9200" # Configure the Kibana internal server user elasticsearch.username: "kibanaserver" elasticsearch.password: "kibanaserver" # Disable SSL verification because we use self-signed demo certificates elasticsearch.ssl.verificationMode: none # Whitelist the Search Guard Multi Tenancy Header elasticsearch.requestHeadersWhitelist: [ "Authorization", "sgtenant" ]
logstash安裝配置
filebeat中message要么是一段字符串,要么在日志生成的時候拼接成json然后在filebeat中指定為json。但是大部分系統日志無法去修改日志格式,filebeat則無法通過正則去匹配出對應的field,這時需要結合logstash的grok來過濾。
Logstash 是開源的服務器端數據處理管道,能夠同時從多個來源采集數據,轉換數據,然后將數據發送到存儲庫中。
安裝過程依然是下載壓縮包,解壓后修改配置文件,再啟動。
進入config文件夾,我們可以修改原有的logstash-sample.conf,或者自己新建一個(這里選擇自己新建一個logstash-skyworth.conf)。
修改配置文件:
input{ kafka{ bootstrap_servers => ["192.168.80.15:9092"] client_id => "test" group_id => "test" auto_offset_reset => "latest" //從最新的偏移量開始消費 consumer_threads => 5 decorate_events => true //此屬性會將當前topic、offset、group、partition等信息也帶到message中 topics => ["test"] type => "skyworth" } } filter { if ([message]== "") { drop {} } } output { if [type] == "skyworth"{ elasticsearch { hosts => ["192.168.80.15:9200"] # ElasticSearch的地址加端口 index => "skyworth-log-%{+YYYYMMdd}" # ElasticSearch的保存文檔的index名稱, } } }
啟動命令:
1 >./bin/logstash -f config/logstash-skyworth.conf
注:elastic stack 安全框架X-Pack 安裝配置請參考:https://www.cnblogs.com/cjsblog/p/9501858.html
總結
業務流程偷了張圖如下:
注:有的人會在filebeat和kafka之間再加一層前置的logstash用於格式化日志,目前我也沒明白為什么要那樣做。