ELK+kafka構建日志收集系統
背景:
最近線上上了ELK,但是只用了一台Redis在中間作為消息隊列,以減輕前端es集群的壓力,Redis的集群解決方案暫時沒有接觸過,並且Redis作為消息隊列並不是它的強項;所以最近將Redis換成了專業的消息信息發布訂閱系統Kafka, Kafka的更多介紹大家可以看這里: 傳送門 ,關於ELK的知識網上有很多的哦, 此篇博客主要是總結一下目前線上這個平台的實施步驟,ELK是怎么跟Kafka結合起來的。好吧,動手!
ELK架構拓撲:
然而我這里的整個日志收集平台就是這樣的拓撲:

1,使用一台Nginx代理訪問kibana的請求;
2,兩台es組成es集群,並且在兩台es上面都安裝kibana;( 以下對elasticsearch簡稱es )
3,中間三台服務器就是我的kafka(zookeeper)集群啦; 上面寫的 消費者/生產者 這是kafka(zookeeper)中的概念;
4,最后面的就是一大堆的生產服務器啦,上面使用的是logstash,當然除了logstash也可以使用其他的工具來收集你的應用程序的日志,例如:Flume,Scribe,Rsyslog,Scripts……
角色:
軟件選用:
elasticsearch - 1.7.3.tar.gz #這里需要說明一下,前幾天使用了最新的elasticsearch2.0,java-1.8.0報錯,目前未找到原因,故這里使用1.7.3版本 Logstash - 2.0.0.tar.gz kibana - 4.1.2 - linux - x64 . tar . gz 以上軟件都可以從官網下載 : https : //www.elastic.co/downloads java - 1.8.0 , nginx 采用 yum 安裝 |
部署步驟:
1.ES集群安裝配置;
2.Logstash客戶端配置(直接寫入數據到ES集群,寫入系統messages日志);
3.Kafka(zookeeper)集群配置;(Logstash寫入數據到Kafka消息系統);
4.Kibana部署;
5.Nginx負載均衡Kibana請求;
6.案例:nginx日志收集以及MySQL慢日志收集;
7.Kibana報表基本使用;
ES集群安裝配置;
es1.example.com:
1.安裝java-1.8.0以及依賴包
yum install - y epel - release yum install - y java - 1.8.0 git wget lrzsz |
2.獲取es軟件包
wget https : //download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.3.tar.gz tar - xf elasticsearch - 1.7.3.tar.gz - C / usr / local ln - sv / usr / local / elasticsearch - 1.7.3 / usr / local / elasticsearch |
3.修改配置文件
[ root @ es1 ~ ] # vim /usr/local/elasticsearch/config/elasticsearch.yml 32 cluster . name : es - cluster #組播的名稱地址 40 node . name : "es-node1 " #節點名稱,不能和其他節點重復 47 node . master : true #節點能否被選舉為master 51 node . data : true #節點是否存儲數據 107 index . number_of_shards : 5 #索引分片的個數 111 index . number_of_replicas : 1 #分片的副本個數 145 path . conf : / usr / local / elasticsearch / config / #配置文件的路徑 149 path . data : / data / es / data #數據目錄路徑 159 path . work : / data / es / worker #工作目錄路徑 163 path . logs : / usr / local / elasticsearch / logs / #日志文件路徑 167 path . plugins : / data / es / plugins #插件路徑 184 bootstrap . mlockall : true #內存不向swap交換 232 http . enabled : true #啟用http |
4.創建相關目錄
mkdir / data / es / { data , worker , plugins } - p |
5.獲取es服務管理腳本
[ root @ es1 ~ ] # git clone https://github.com/elastic/elasticsearch-servicewrapper.git [ root @ es1 ~ ] # mv elasticsearch-servicewrapper/service /usr/local/elasticsearch/bin/ [ root @ es1 ~ ] # /usr/local/elasticsearch/bin/service/elasticsearch install Detected RHEL or Fedora : Installing the Elasticsearch daemon . . [ root @ es1 ~ ] # #這時就會在/etc/init.d/目錄下安裝上es的管理腳本啦 #修改其配置: [ root @ es1 ~ ] # set . default . ES_HOME = / usr / local / elasticsearch #安裝路徑 set . default . ES_HEAP_SIZE = 1024 #jvm內存大小,根據實際環境調整即可 |
6.啟動es ,並檢查其服務是否正常
[ root @ es1 ~ ] # netstat -nlpt | grep -E "9200|"9300 tcp 0 0 0.0.0.0 : 9200 0.0.0.0 : * LISTEN 1684/ java tcp 0 0 0.0.0.0 : 9300 0.0.0.0 : * LISTEN 1684/ java |
訪問http://192.168.2.18:9200/ 如果出現以下提示信息說明安裝配置完成啦,
7.es1節點好啦,我們直接把目錄復制到es2
[ root @ es1 local ] # scp -r elasticsearch-1.7.3 192.168.12.19:/usr/local/ [ root @ es2 local ] # ln -sv elasticsearch-1.7.3 elasticsearch [ root @ es2 local ] # elasticsearch/bin/service/elasticsearch install #es2只需要修改node.name即可,其他都與es1相同配置 |
8.安裝es的管理插件
es官方提供一個用於管理es的插件,可清晰直觀看到es集群的狀態,以及對集群的操作管理,安裝方法如下:
[ root @ es1 local ] # /usr/local/elasticsearch/bin/plugin -i mobz/elasticsearch-head |
安裝好之后,訪問方式為: http://192.168.2.18:9200/_plugin/head,由於集群中現在暫時沒有數據,所以顯示為空,
此時,es集群的部署完成。
Logstash客戶端安裝配置;
在webserve1上面安裝Logstassh
1.downloads 軟件包 ,這里注意,Logstash是需要依賴java環境的,所以這里還是需要yum install -y java-1.8.0.
[ root @ webserver1 ~ ] # wget https://download.elastic.co/logstash/logstash/logstash-2.0.0.tar.gz [ root @ webserver1 ~ ] # tar -xf logstash-2.0.0.tar.gz -C /usr/local [ root @ webserver1 ~ ] # cd /usr/local/ [ root @ webserver1 local ] # ln -sv logstash-2.0.0 logstash [ root @ webserver1 local ] # mkdir logs etc |
2.提供logstash管理腳本,其中里面的配置路徑可根據實際情況修改
#!/bin/bash #chkconfig: 2345 55 24 #description: logstash service manager #auto: Maoqiu Guo FILE = '/usr/local/logstash/etc/*.conf' #logstash配置文件 LOGBIN = '/usr/local/logstash/bin/logstash agent --verbose --config' #指定logstash配置文件的命令 LOCK = '/usr/local/logstash/locks' #用鎖文件配合服務啟動與關閉 LOGLOG = '--log /usr/local/logstash/logs/stdou.log' #日志 START ( ) { if [ - f $ LOCK ] ; then echo - e "Logstash is already \033[32mrunning\033[0m, do nothing." else echo - e "Start logstash service.\033[32mdone\033[m" nohup $ { LOGBIN } $ { FILE } $ { LOGLOG } & touch $ LOCK fi } STOP ( ) { if [ ! - f $ LOCK ] ; then echo - e "Logstash is already stop, do nothing." else echo - e "Stop logstash serivce \033[32mdone\033[m" rm - rf $ LOCK ps - ef | grep logstash | grep - v "grep" | awk '{print $2}' | xargs kill - s 9 > / dev / null fi } STATUS ( ) { ps aux | grep logstash | grep - v "grep" > / dev / null if [ - f $ LOCK ] && [ $ ? - eq 0 ] ; then echo - e "Logstash is: \033[32mrunning\033[0m..." else echo - e "Logstash is: \033[31mstopped\033[0m..." fi } TEST ( ) { $ { LOGBIN } $ { FILE } -- configtest } case "$1" in start ) START ; ; stop ) STOP ; ; status ) STATUS ; ; restart ) STOP sleep 2 START ; ; test ) TEST ; ; * ) echo "Usage: /etc/init.d/logstash (test|start|stop|status|restart)" ; ; esac |
3.Logstash 向es集群寫數據
(1)編寫一個logstash配置文件
[ root @ webserver1 etc ] # cat logstash.conf input { #數據的輸入從標准輸入 stdin { } } output { #數據的輸出我們指向了es集群 elasticsearch { hosts = > [ "192.168.2.18:9200" , "192.168.2.19:9200" ] # es 主機的 ip 及端口 } } [ root @ webserver1 etc ] # |
(2)檢查配置文件是否有語法錯
[ root @ webserver1 etc ] # /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose Configuration OK [ root @ webserver1 etc ] # |
(3)既然配置ok我們手動啟動它,然后寫點東西看能否寫到es
ok.上圖已經看到logstash已經可以正常的工作啦.
4.下面演示一下如何收集系統日志
將之前的配置文件修改如下所示內容,然后啟動logstash服務就可以在web頁面中看到messages的日志寫入es,並且創建了一條索引
[ root @ webserver1 etc ] # cat logstash.conf input { #這里的輸入使用的文件,即日志文件messsages file { path = > "/var/log/messages" #這是日志文件的絕對路徑 start_position = > "beginning" #這個表示從 messages 的第一行讀取,即文件開始處 } } output { #輸出到 es elasticsearch { hosts = > [ "192.168.2.18:9200" , "192.168.2.19:9200" ] index = > "system-messages-%{+YYYY-MM}" #這里將按照這個索引格式來創建索引 } } [ root @ webserver1 etc ] # |
啟動logstash后,我們來看head這個插件的web頁面
ok,系統日志我們已經成功的收集,並且已經寫入到es集群中,那上面的演示是logstash直接將日志寫入到es集群中的,這種場合我覺得如果量不是很大的話直接像上面已將將輸出output定義到es集群即可,如果量大的話需要加上消息隊列來緩解es集群的壓力。前面已經提到了我這邊之前使用的是單台redis作為消息隊列,但是redis不能作為list類型的集群,也就是redis單點的問題沒法解決,所以這里我選用了kafka ;下面就在三台server上面安裝kafka集群
Kafka集群安裝配置;
在搭建kafka集群時,需要提前安裝zookeeper集群,當然kafka已經自帶zookeeper程序只需要解壓並且安裝配置就行了
kafka1上面的配置:
1.獲取軟件包.官網: http://kafka.apache.org
[ root @ kafka1 ~ ] # wget http://mirror.rise.ph/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz [ root @ kafka1 ~ ] # tar -xf kafka_2.11-0.8.2.1.tgz -C /usr/local/ [ root @ kafka1 ~ ] # cd /usr/local/ [ root @ kafka1 local ] # ln -sv kafka_2.11-0.8.2.1 kafka |
2.配置zookeeper集群,修改配置文件
[ root @ kafka1 ~ ] # vim /usr/local/kafka/config/zookeeper.propertie dataDir = / data / zookeeper clienrtPort = 2181 tickTime = 2000 initLimit = 20 syncLimit = 10 server . 2 = 192.168.2.22 : 2888 : 3888 server . 3 = 192.168.2.23 : 2888 : 3888 server . 4 = 192.168.2.24 : 2888 : 3888 #說明: tickTime : 這個時間是作為 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。 2888 端口:表示的是這個服務器與集群中的 Leader 服務器交換信息的端口; 3888 端口:表示的是萬一集群中的 Leader 服務器掛了,需要一個端口來重新進行選舉,選出一個新的 Leader ,而這個端口就是用來執行選舉時服務器相互通信的端口。 |
3.創建zookeeper所需要的目錄
[ root @ kafka1 ~ ] # mkdir /data/zookeeper |
4.在/data/zookeeper目錄下創建myid文件,里面的內容為數字,用於標識主機,如果這個文件沒有的話,zookeeper是沒法啟動的哦
[ root @ kafka1 ~ ] # echo 2 > /data/zookeeper/myid |
以上就是zookeeper集群的配置,下面等我配置好kafka之后直接復制到其他兩個節點即可
5.kafka配置
[ root @ kafka1 ~ ] # vim /usr/local/kafka/config/server.properties broker . id = 2 # 唯一,填數字,本文中分別為 2 / 3 / 4 prot = 9092 # 這個 broker 監聽的端口 host . name = 192.168.2.22 # 唯一,填服務器 IP log . dir = / data / kafka - logs # 該目錄可以不用提前創建,在啟動時自己會創建 zookeeper . connect = 192.168.2.22 : 2181 , 192.168.2.23 : 2181 , 192.168.2.24 :2181 #這個就是 zookeeper 的 ip 及端口 num . partitions = 16 # 需要配置較大 分片影響讀寫速度 log . dirs = / data / kafka - logs # 數據目錄也要單獨配置磁盤較大的地方 log . retention . hours = 168 # 時間按需求保留過期時間 避免磁盤滿 |
6.將kafka(zookeeper)的程序目錄全部拷貝至其他兩個節點
[ root @ kafka1 ~ ] # scp -r /usr/local/kafka 192.168.2.23:/usr/local/ [ root @ kafka1 ~ ] # scp -r /usr/local/kafka 192.168.2.24:/usr/local/ |
7.修改兩個借點的配置,注意這里除了以下兩點不同外,都是相同的配置
( 1 ) zookeeper 的配置 mkdir / data / zookeeper echo "x" > / data / zookeeper / myid ( 2 ) kafka 的配置 broker . id = 2 host . name = 192.168.2.22 |
8.修改完畢配置之后我們就可以啟動了,這里先要啟動zookeeper集群,才能啟動kafka
我們按照順序來,kafka1 –> kafka2 –>kafka3
[ root @ kafka1 ~ ] # /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & #zookeeper啟動命令 [ root @ kafka1 ~ ] # /usr/local/kafka/bin/zookeeper-server-stop.sh #zookeeper停止的命令 |
注意,如果zookeeper有問題 nohup的日志文件會非常大,把磁盤占滿,這個zookeeper服務可以通過自己些服務腳本來管理服務的啟動與關閉。
后面兩台執行相同操作,在啟動過程當中會出現以下報錯信息
[ 2015 - 11 - 13 19 : 18 : 04 , 225 ] WARN Cannot open channel to 3 at election address / 192.168.2.23 : 3888 ( org . apache . zookeeper . server . quorum . QuorumCnxManager ) java . net . ConnectException : Connection refused at java . net . PlainSocketImpl . socketConnect ( Native Method ) at java . net . AbstractPlainSocketImpl . doConnect ( AbstractPlainSocketImpl . java : 350 ) at java . net . AbstractPlainSocketImpl . connectToAddress ( AbstractPlainSocketImpl . java : 206 ) at java . net . AbstractPlainSocketImpl . connect ( AbstractPlainSocketImpl . java :188 ) at java . net . SocksSocketImpl . connect ( SocksSocketImpl . java : 392 ) at java . net . Socket . connect ( Socket . java : 589 ) at org . apache . zookeeper . server . quorum . QuorumCnxManager . connectOne( QuorumCnxManager . java : 368 ) at org . apache . zookeeper . server . quorum . QuorumCnxManager . connectAll (QuorumCnxManager . java : 402 ) at org . apache . zookeeper . server . quorum . FastLeaderElection . lookForLeader( FastLeaderElection . java : 840 ) at org . apache . zookeeper . server . quorum . QuorumPeer . run ( QuorumPeer . java : 762 ) [ 2015 - 11 - 13 19 : 18 : 04 , 232 ] WARN Cannot open channel to 4 at election address / 192.168.2.24 : 3888 ( org . apache . zookeeper . server . quorum . QuorumCnxManager ) java . net . ConnectException : Connection refused at java . net . PlainSocketImpl . socketConnect ( Native Method ) at java . net . AbstractPlainSocketImpl . doConnect ( AbstractPlainSocketImpl . java : 350 ) at java . net . AbstractPlainSocketImpl . connectToAddress ( AbstractPlainSocketImpl . java : 206 ) at java . net . AbstractPlainSocketImpl . connect ( AbstractPlainSocketImpl . java :188 ) at java . net . SocksSocketImpl . connect ( SocksSocketImpl . java : 392 ) at java . net . Socket . connect ( Socket . java : 589 ) at org . apache . zookeeper . server . quorum . QuorumCnxManager . connectOne( QuorumCnxManager . java : 368 ) at org . apache . zookeeper . server . quorum . QuorumCnxManager . connectAll (QuorumCnxManager . java : 402 ) at org . apache . zookeeper . server . quorum . FastLeaderElection . lookForLeader( FastLeaderElection . java : 840 ) at org . apache . zookeeper . server . quorum . QuorumPeer . run ( QuorumPeer . java : 762 ) [ 2015 - 11 - 13 19 : 18 : 04 , 233 ] INFO Notification time out : 6400 ( org . apache. zookeeper . server . quorum . FastLeaderElection ) |
由於zookeeper集群在啟動的時候,每個結點都試圖去連接集群中的其它結點,先啟動的肯定連不上后面還沒啟動的,所以上面日志前面部分的異常是可以忽略的。通過后面部分可以看到,集群在選出一個Leader后,最后穩定了。
其他節點也可能會出現類似的情況,屬於正常。
9.zookeeper服務檢查
[ root @ kafka1 ~ ] # netstat -nlpt | grep -E "2181|2888|3888" tcp 0 0 192.168.2.24 : 3888 0.0.0.0 : * LISTEN 1959 / java tcp 0 0 0.0.0.0 : 2181 0.0.0.0 : * LISTEN 1959/ java [ root @ kafka2 ~ ] # netstat -nlpt | grep -E "2181|2888|3888" tcp 0 0 192.168.2.23 : 3888 0.0.0.0 : * LISTEN 1723 / java tcp 0 0 0.0.0.0 : 2181 0.0.0.0 : * LISTEN 1723/ java [ root @ kafka3 ~ ] # netstat -nlpt | grep -E "2181|2888|3888" tcp 0 0 192.168.2.24 : 3888 0.0.0.0 : * LISTEN 950 / java tcp 0 0 0.0.0.0 : 2181 0.0.0.0 : * LISTEN 950 /java tcp 0 0 192.168.2.24 : 2888 0.0.0.0 : * LISTEN 950 / java #可以看出,如果哪台是Leader,那么它就擁有2888這個端口 |
ok. 這時候zookeeper集群已經啟動起來了,下面啟動kafka,也是依次按照順序啟動
[ root @ kafka1 ~ ] # nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & #kafka啟動的命令 [ root @ kafka1 ~ ] # /usr/local/kafka/bin/kafka-server-stop.sh #kafka停止的命令 |
注意,跟zookeeper服務一樣,如果kafka有問題 nohup的日志文件會非常大,把磁盤占滿,這個kafka服務同樣可以通過自己些服務腳本來管理服務的啟動與關閉。
此時三台上面的zookeeper及kafka都已經啟動完畢,來檢測以下吧
(1)建立一個主題
[ root @ kafka1 ~ ] # /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic summer #注意:factor大小不能超過broker數 |
(2)查看有哪些主題已經創建
[ root @ kafka1 ~ ] # /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181 #列出集群中所有的topic summer #已經創建成功 |
(3)查看summer這個主題的詳情
[ root @ kafka1 ~ ] # /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic summer Topic : summer PartitionCount : 1 ReplicationFactor : 3 Configs : Topic : summer Partition : 0 Leader : 2 Replicas : 2 , 4 , 3 Isr : 2 , 4 , 3 #主題名稱:summer #Partition:只有一個,從0開始 #leader :id為2的broker #Replicas 副本存在於broker id為2,3,4的上面 #Isr:活躍狀態的broker |
(4)發送消息,這里使用的是生產者角色
[ root @ kafka1 ~ ] # /bin/bash /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.22:9092 --topic summer This is a messages welcome to kafka |
(5)接收消息,這里使用的是消費者角色
[ root @ kafka2 ~ ] # /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.2.24:2181 --topic summer --from-beginning This is a messages welcome to kafka |
如果能夠像上面一樣能夠接收到生產者發過來的消息,那說明基於kafka的zookeeper集群就成功啦。
10,下面我們將webserver1上面的logstash的輸出改到kafka上面,將數據寫入到kafka中
(1)修改webserver1上面的logstash配置,如下所示:各個參數可以到 官網 查詢.
root @ webserver1 etc ] # cat logstash.conf input { #這里的輸入還是定義的是從日志文件輸入 file { type = > "system-message" path = > "/var/log/messages" start_position = > "beginning" } } output { #stdout { codec => rubydebug } #這是標准輸出到終端,可以用於調試看有沒有輸出,注意輸出的方向可以有多個 kafka { #輸出到kafka bootstrap_servers = > "192.168.2.22:9092,192.168.2.23:9092,192.168.2.24:9092" #他們就是生產者 topic_id = > "system-messages" #這個將作為主題的名稱,將會自動創建 compression_type = > "snappy" #壓縮類型 } } [ root @ webserver1 etc ] # |
(2)配置檢測
[ root @ webserver1 etc ] # /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose Configuration OK [ root @ webserver1 etc ] # |
(2)啟動Logstash,這里我直接在命令行執行即可
[ root @ webserver1 etc ] # /usr/local/logstash/bin/logstash -f logstash.conf |
(3)驗證數據是否寫入到kafka,這里我們檢查是否生成了一個叫system-messages的主題
[ root @ kafka1 ~ ] # /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181 summer system - messages #可以看到這個主題已經生成了 #再看看這個主題的詳情: [ root @ kafka1 ~ ] # /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic system-messages Topic : system - messages PartitionCount : 16 ReplicationFactor : 1 Configs : Topic : system - messages Partition : 0 Leader : 2 Replicas : 2 Isr : 2 Topic : system - messages Partition : 1 Leader : 3 Replicas : 3 Isr : 3 Topic : system - messages Partition : 2 Leader : 4 Replicas : 4 Isr : 4 Topic : system - messages Partition : 3 Leader : 2 Replicas : 2 Isr : 2 Topic : system - messages Partition : 4 Leader : 3 Replicas : 3 Isr : 3 Topic : system - messages Partition : 5 Leader : 4 Replicas : 4 Isr : 4 Topic : system - messages Partition : 6 Leader : 2 Replicas : 2 Isr : 2 Topic : system - messages Partition : 7 Leader : 3 Replicas : 3 Isr : 3 Topic : system - messages Partition : 8 Leader : 4 Replicas : 4 Isr : 4 Topic : system - messages Partition : 9 Leader : 2 Replicas : 2 Isr : 2 Topic : system - messages Partition : 10 Leader : 3 Replicas : 3 Isr : 3 Topic : system - messages Partition : 11 Leader : 4 Replicas : 4 Isr : 4 Topic : system - messages Partition : 12 Leader : 2 Replicas : 2 Isr : 2 Topic : system - messages Partition : 13 Leader : 3 Replicas : 3 Isr : 3 Topic : system - messages Partition : 14 Leader : 4 Replicas : 4 Isr : 4 Topic : system - messages Partition : 15 Leader : 2 Replicas : 2 Isr : 2 [ root @ kafka1 ~ ] # |
可以看出,這個主題生成了16個分區,每個分區都有對應自己的Leader,但是我想要有10個分區,3個副本如何辦?還是跟我們上面一樣命令行來創建主題就行,當然對於logstash輸出的我們也可以提前先定義主題,然后啟動logstash 直接往定義好的主題寫數據就行啦,命令如下:
[ root @ kafka1 ~ ] # /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.22:2181 --replication-factor 3 --partitions 10 --topic TOPIC_NAME |
好了,我們將logstash收集到的數據寫入到了kafka中了,在實驗過程中我使用while腳本測試了如果不斷的往kafka寫數據的同時停掉兩個節點,數據寫入沒有任何問題。
那如何將數據從kafka中讀取然后給我們的es集群呢?那下面我們在kafka集群上安裝Logstash,安裝步驟不再贅述;三台上面的logstash 的配置如下,作用是將kafka集群的數據讀取然后轉交給es集群,這里為了測試我讓他新建一個索引文件,注意這里的輸入日志還是messages,主題名稱還是“system-messages”
[ root @ kafka1 etc ] # more logstash.conf input { kafka { zk_connect = > "192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181" #消費者們 topic_id = > "system-messages" codec = > plain reset_beginning = > false consumer_threads = > 5 decorate_events = > true } } output { elasticsearch { hosts = > [ "192.168.2.18:9200" , "192.168.2.19:9200" ] index = > "test-system-messages-%{+YYYY-MM}" #為了區分之前實驗,我這里新生成的所以名字為“test-system-messages-%{+YYYY-MM}” } } |
在三台kafka上面啟動Logstash,注意我這里是在命令行啟動的;
[ root @ kafka1 etc ] # pwd / usr / local / logstash / etc [ root @ kafka1 etc ] # /usr/local/logstash/bin/logstash -f logstash.conf [ root @ kafka2 etc ] # pwd / usr / local / logstash / etc [ root @ kafka2 etc ] # /usr/local/logstash/bin/logstash -f logstash.conf [ root @ kafka3 etc ] # pwd / usr / local / logstash / etc [ root @ kafka3 etc ] # /usr/local/logstash/bin/logstash -f logstash.conf |
在webserver1上寫入測試內容,即webserver1上面利用message這個文件來測試,我先將其清空,然后啟動
[ root @ webserver1 etc ] # >/var/log/messages [ root @ webserver1 etc ] # echo "我將通過kafka集群達到es集群哦^0^" >> /var/log/messages #啟動logstash,讓其讀取messages中的內容 |
下圖為我在客戶端寫入到kafka集群的同時也將其輸入到終端,這里寫入了三條內容
而下面三張圖側可以看出,三台Logstash 很平均的從kafka集群當中讀取出來了日志內容
再來看看我們的es管理界面
ok ,看到了吧,
流程差不多就是下面 醬紫咯
由於篇幅較長,我將
4.Kibana部署;
5.Nginx負載均衡Kibana請求;
6.案例:nginx日志收集以及MySQL慢日志收集;
7.Kibana報表基本使用;
Kibana的部署;
Kibana的作用,想必大家都知道了就是一個展示工具,報表內容非常的豐富;
下面我們在兩台es上面搭建兩套kibana
1.獲取kibana軟件包
1
2
|
[root@es1 ~]# wget https://download.elastic.co/kibana/kibana/kibana-4.1.2-linux-x64.tar.gz
[root@es1 ~]# tar -xf kibana-4.2.0-linux-x64.tar.gz -C /usr/local/
|
2.修改配置文件
1
2
3
4
5
6
7
8
9
|
[root@es1 ~]# cd /usr/local/
[root@es1 local]# ln -sv kibana-4.1.2-linux-x64 kibana
`kibana' -> `kibana-4.2.0-linux-x64'
[root@es1 local]# cd kibana
[root@es1 kibana]# vim config/kibana.yml
server.port: 5601 #默認端口可以修改的
server.host: "0.0.0.0" #kibana監聽的ip
elasticsearch.url: "http://localhost:9200" #由於es在本地主機上面,所以這個選項打開注釋即可
|
3.提供kibana服務管理腳本,我這里寫了個相對簡單的腳本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
[root@es1 config]# cat /etc/init.d/kibana
#!/bin/bash
#chkconfig: 2345 55 24
#description: kibana service manager
KIBBIN='/usr/local/kibana/bin/kibana'
LOCK='/usr/local/kibana/locks'
START() {
if [ -f $LOCK ];then
echo -e "kibana is already \033[32mrunning\033[0m, do nothing."
else
echo -e "Start kibana service.\033[32mdone\033[m"
cd /usr/local/kibana/bin
nohup ./kibana & >/dev/null
touch $LOCK
fi
}
STOP() {
if [ ! -f $LOCK ];then
echo -e "kibana is already stop, do nothing."
else
echo -e "Stop kibana serivce \033[32mdone\033[m"
rm -rf $LOCK
ps -ef | grep kibana | grep -v "grep" | awk '{print $2}' | xargs kill -s 9 >/dev/null
fi
}
STATUS() {
Port=$(netstat -tunl | grep ":5602")
if [ "$Port" != "" ] && [ -f $LOCK ];then
echo -e "kibana is: \033[32mrunning\033[0m..."
else
echo -e "kibana is: \033[31mstopped\033[0m..."
fi
}
case "$1" in
start)
START
;;
stop)
STOP
;;
status)
STATUS
;;
restart)
STOP
sleep 2
START
;;
*)
echo "Usage: /etc/init.d/kibana (|start|stop|status|restart)"
;;
esac
|
4.啟動kibana服務
1
2
3
4
|
[root@es1 config]# chkconfig --add kibana
[root@es1 config]# service kibana start
Start kibana service.done
[root@es1 config]#
|
5.服務檢查
1
2
3
|
[root@es1 config]# ss -tunl | grep "5601"
tcp LISTEN 0 511 *:5601 *:*
[root@es1 config]#
|
ok,此時我直接訪問es1這台主機的5601端口
ok,能成功的訪問5601端口,那我把es1這台的配置放到es2上面去然后啟動,效果跟訪問es1一樣
Nginx負載均衡kibana的請求
1.在nginx-proxy上面yum安裝nginx
1
|
yum install -y nignx
|
2.編寫配置文件es.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
[root@saltstack-node1 conf.d]# pwd
/etc/nginx/conf.d
[root@saltstack-node1 conf.d]# cat es.conf
upstream es {
server 192.168.2.18:5601 max_fails=3 fail_timeout=30s;
server 192.168.2.19:5601 max_fails=3 fail_timeout=30s;
}
server {
listen 80;
server_name localhost;
location / {
proxy_pass http://es/;
index index.html index.htm;
#auth
auth_basic "ELK Private";
auth_basic_user_file /etc/nginx/.htpasswd;
}
}
|
3.創建認證
1
2
3
4
5
6
7
8
|
[root@saltstack-node1 conf.d]# htpasswd -cm /etc/nginx/.htpasswd elk
New password:
Re-type new password:
Adding password for user elk-user
[root@saltstack-node1 conf.d]# /etc/init.d/nginx restart
Stopping nginx: [ OK ]
Starting nginx: [ OK ]
[root@saltstack-node1 conf.d]#
|
4.直接輸入認證用戶及密碼就可訪問啦http://192.168.2.21/
Nginx及MySQL慢日志收集
首先我們在webserver1上面都分別安裝了nginx 及mysql.
1.為了方便nginx日志的統計搜索,這里設置nginx訪問日志格式為json
(1)修改nginx主配置文件
說明:如果想實現日志的報表展示,最好將業務日志直接以json格式輸出,這樣可以極大減輕cpu負載,也省得運維需要寫負載的filter過濾正則。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
[root@webserver1 nginx]# vim nginx.conf
log_format json '{"@timestamp":"$time_iso8601",'
'"@version":"1",'
'"client":"$remote_addr",'
'"url":"$uri",'
'"status":"$status",'
'"domain":"$host",'
'"host":"$server_addr",'
'"size":$body_bytes_sent,'
'"responsetime":$request_time,'
'"referer": "$http_referer",'
'"ua": "$http_user_agent"'
'}';
access_log /var/log/access_json.log json;
|
(2)收集nginx日志和MySQL日志到消息隊列中;這個文件我們是定義在客戶端,即生產服務器上面的Logstash文件哦.
注意:這里剛搭建完畢,沒有什么數據,為了展示效果,我這里導入了線上的nginx和MySQL慢日志
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
input {
file { #從nginx日志讀入
type => "nginx-access"
path => "/var/log/nginx/access.log"
start_position => "beginning"
codec => "json" #這里指定 codec格式為json
}
file { #從MySQL慢日志讀入
type => "slow-mysql"
path => "/var/log/mysql/slow-mysql.log"
start_position => "beginning"
codec => multiline { #這里用到了logstash的插件功能,將本來屬於一行的多行日志條目整合在一起,讓他屬於一條
pattern => "^# User@Host" #用到了正則去匹配
negate => true
what => "previous"
}
}
}
output {
# stdout { codec=> rubydebug }
if [type] == "nginx-access" { #通過判斷input中定義的type,來讓它在kafka集群中生成的主題名稱
kafka { #輸出到kafka集群
bootstrap_servers => "192.168.2.22:9092,192.168.2.23:9092,192.168.2.24:9092" #生產者們
topic_id => "nginx-access" #主題名稱
compression_type => "snappy" #壓縮類型
}
}
if [type] == "slow-mysql" {
kafka {
bootstrap_servers => "192.168.2.22:9092,192.168.2.23:9092,192.168.2.24:9092"
topic_id => "slow-mysql"
compression_type => "snappy"
}
}
}
|
(3)Logstash 從kafka集群中讀取日志存儲到es中,這里的定義logstash文件是在三台kafka服務器上面的哦,並且要保持一致,你可以在一台上面修改測試好之后,拷貝至另外兩台即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
input {
kafka {
zk_connect => "192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181"
type => "nginx-access"
topic_id => "nginx-access"
codec => plain
reset_beginning => false
consumer_threads => 5
decorate_events => true
}
kafka {
zk_connect => "192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181"
type => "slow-mysql"
topic_id => "slow-mysql"
codec => plain
reset_beginning => false
consumer_threads => 5
decorate_events => true
}
}
output {
# stdout { codec=> rubydebug }
if [type] == "nginx-access" {
elasticsearch {
hosts => ["192.168.2.18:9200","192.168.2.19:9200"]
index => "nginx-access-%{+YYYY-MM}"
}
}
if [type] == "slow-mysql" {
elasticsearch {
hosts => ["192.168.2.18:9200","192.168.2.19:9200"]
index => "slow-mysql-%{+YYYY-MM}"
}
}
}
|
通過上圖可以看到,nginx日志以及MySQL慢日志已經成功抵達es集群
然后我們在kibana上面創建索引就可以啦
此時就可以看到索引啦
(5)創建MySQL慢日志索引
MySQL的索引也出來啦
kibana報表功能非常的強大,也就是可視化;可以制作出下面不同類型的圖形
下面就是我簡單的一些圖形展示
由於篇幅問題,可以看官方介紹。
參考:
https://github.com/liquanzhou/ops_doc/tree/master/Service/kafka
http://www.lujinhong.com/kafka%E9%9B%86%E7%BE%A4%E6%93%8D%E4%BD%9C%E6%8C%87%E5%8D%97.html
http://www.it165.net/admin/html/201405/3192.html
http://blog.csdn.net/lizhitao/article/details/39499283
轉自: http://blog.sctux.com/?p=451