ELK之使用kafka作為消息隊列收集日志


  參考:https://www.cnblogs.com/fengjian2016/p/5841556.html

       https://www.cnblogs.com/hei12138/p/7805475.html

                 https://blog.csdn.net/lhmood/article/details/79099615

      https://www.cnblogs.com/Orgliny/p/5730381.html

  

  ELK可以使用redis作為消息隊列,但redis作為消息隊列不是強項而且redis集群不如專業的消息發布系統kafka,以下使用ELK結合kafka作為消息隊列進行配置

  本次配置拓撲圖

 

  主機角色

 主機名  IP  系統  角色
 prd-elk-logstash-01 172.16.90.20  CentOS7.5  logstash日志傳輸
 prd-elk-kafka-01 172.16.90.21  CentOS7.5  日志消息隊列kafka01
 prd-elk-kafka-02 172.16.90.22  CentOS7.5  日志消息隊列kafka02
 prd-elk-kafka-03 172.16.90.23  CentOS7.5  日志消息隊列kafka03
prd-elk-logstash-02 172.16.90.24  CentOS7.5 logstash日志過濾,存儲

  軟件選用

JKD 1.8.0_171
elasticsearch 6.5.4
logstash 6.5.4
kibana 6.5.4
kafka 2.11-2.1.1

  172.16.90.20安裝logstash和filebeat

rpm -ivh logstash-6.5.4.rpm 
rpm -ivh filebeat-6.5.4-x86_64.rpm 

  修改filebeat配置文件收集系統messages日志 

/etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages    #需收集日志路徑
  tags: ["messages"]
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3
setup.kibana:
output.logstash:
  hosts: ["172.16.90.20:5044"] #輸出至logstash
processors:
  - add_host_metadata:
  - add_cloud_metadata:
  - drop_fields:
      fields: ["beat", "input", "source", "offset", "prospector","host"] #刪除無用的參數

  修改logstash先標准輸出至屏幕

/etc/logstash/conf.d/filebeat-logstash.conf
input{
    beats {
        port => 5044
    }
}

output{
    if "messages" in [tags]{
        stdout{
            codec => rubydebug
        }
    }
}
                        

  運行logstash查看輸出

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/filebeat-logstash.conf 

   輸出如下

  172.16.90.24安裝logstash,elasticsearch,kibana安裝配置過程不詳敘,通過172.16.90.20收集的日志信息可以直接傳遞至172.16.90.24的logstash進行過濾及es存儲,規模不大可以使用這種方式,下面安裝配置kafka集群

  

  Kafka集群安裝配置

  在搭建kafka集群之前需要提前安裝zookeeper集群,kafka壓縮包只帶zookeeper程序,只需要解壓配置即可使用

  獲取軟件包,官方網站 http://kafka.apache.org/

  PS:不要下載src包,運行會報錯

  解壓拷貝至指定目錄

tar -xf kafka_2.11-2.1.1.tgz
cp -a kafka_2.11-2.1.1 /usr/local/
cd /usr/local/
ln -s kafka_2.11-2.1.1/ kafka

  修改配置文件zookeeper配置文件

 /usr/local/kafka/config/zookeeper.properties

#數據路徑
dataDir=/data/zookeeper 
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
#tickTime : 這個時間是作為 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。
tickTime=2000
initLimit=20
syncLimit=10
#2888 端口:表示的是這個服務器與集群中的 Leader 服務器交換信息的端口;
#3888 端口:表示的是萬一集群中的 Leader 服務器掛了,需要一個端口來重新進行選舉,選出一個新的 Leader ,而這個端口就是用來執行選舉時服務器相互通信的端口
server.1=172.16.90.21:2888:3888
server.2=172.16.90.22:2888:3888
server.3=172.16.90.23:2888:3888

  創建數據目錄並創建myid文件,文件為數字,用於標識唯一主機,必須有這個文件否則zookeeper無法啟動

mkdir /data/zookeeper -p
echo 1 >/data/zookeeper/myid

  修改kafka配置/usr/local/kafka/config/server.properties

#唯一數字分別為1,2,2
broker.id=1
#這個broker監聽的端口
prot=9092
#唯一填服務器IP
host.name=172.16.90.21
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
#kafka日志路徑,不需要提前創建,啟動kafka時創建
log.dirs=/data/kafka-logs
#分片數,需要配置較大,分片影響讀寫速度
num.partitions=16
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#zookpeer集群
zookeeper.connect=192.168.1.11:2181,192.168.1.12:2181,192.168.1.13:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

  把配置拷貝至其他kafka主機,zookeeper.properties配置一樣 ,server.properties配置一下兩處不一樣,myid也不一樣

broker.id=2
host.name=172.16.90.22

broker.id=3
host.name=172.16.90.23

  配置完三台主機,啟動zookeeper啟動順序為服務器1 2 3

 /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties 

  啟動過程中提示拒絕連接不用理會,由於zookeeper集群在啟動的時候,每個結點都試圖去連接集群中的其它結點,先啟動的肯定連不上后面還沒啟動的,所以上面日志前面部分的異常是可以忽略的。通過后面部分可以看到,集群在選出一個Leader后,最后穩定了。其他節點也可能會出現類似的情況,屬於正常

  檢測是否啟動

netstat -ntalp|grep -E "2181|2888|3888"

  啟動kafka 啟動順序也是1 2 3

nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

  啟動成功會啟動9092端口

  PS:如果系統有問題會導致日志不停輸出

  三台主機啟動完畢,下面來檢測一下

  在kafka01創建一個主題,主題名為sumer

/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 172.16.90.21:2181 --replication-factor 3 --partitions 1 --topic summer

  查看summer主題詳情

 /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 172.16.90.21:2181 --topic summer

  

Topic:summer	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: summer	Partition: 0	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1

#主題名稱:summer
#Partition:只有一個從0開始
#leader:id為2的broker
#Replicas: 副本存在id為 2 3 1的上面
#Isr:活躍狀態的broker

  刪除主題

/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 172.16.90.21:2181  --topic summer

  使用kafka01發送消息,這里是生產者角色

 /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.16.90.21:9092 --topic summer

  出現命令行需要手動輸入消息

  使用kafka02接收消息,這里是消費者角色

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.16.90.22:9092 --topic summer --from-beginning

  在kafka01輸入消息然后會在kafka02接收到該消息

  下面將logstash-01的輸出從標准輸出改到kafka中,修改配置文件

/etc/logstash/conf.d/filebeat-logstash.conf

input{
    beats {
        port => 5044
    }
}

output{
    if "messages" in [tags]{
        kafka{
            #輸出至kafka
            bootstrap_servers => "172.16.90.21:9092,172.16.90.22:9092,172.16.90.23:9092"
            #主題名稱,將會自動創建
            topic_id => "system-messages"
            #壓縮類型
            compression_type => "snappy"
        }
        stdout{
            codec => rubydebug
        }
    }
}

  啟動logstash輸出測試

  kafka01查看是否生成這個主題

/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 172.16.90.21:2181 --topic system-messages

  可以看出,這個主題生成了16個分區,每個分區都有對應自己的Leader,但是我想要有10個分區,3個副本如何辦?還是跟我們上面一樣命令行來創建主題就行,當然對於logstash輸出的我們也可以提前先定義主題,然后啟動logstash 直接往定義好的主題寫數據就行啦,命令如下

/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 172.16.90.21:2181 --replication-factor 3 --partitions 10 --topic TOPIC_NAME

  經過以上步驟已經通過完成了 filebeat-logstash01-kafka過程,下面配置logstash02接收消費kafka消息

  logstash02需要安裝logstash,elasticsearch,kibana

rpm -ivh elasticsearch-6.5.4.rpm 
rpm -ivh kibana-6.5.4-x86_64.rpm 
rpm -ivh logstash-6.5.4.rpm

  修改logstash配置先進行標准輸出測試 

input{
    kafka {
    #kafka輸入
    bootstrap_servers => "172.16.90.21:9092,172.16.90.22:9092,172.16.90.23:9092"
    codec => plain
    #主題
    topics => ["system-messages"]
    consumer_threads => 5  
    decorate_events => false 
    auto_offset_reset => "earliest"
    #定義type方便后面選擇過濾及輸出
    type => "system-messages"
  }
}

output{
    if "system-messages" in [type] {
       # elasticsearch{
       # hosts => ["172.16.90.24:9200"]
       # index => "messages-%{+YYYY.MM}"
       # }
	stdout{
	    codec => rubydebug
	}
    }
}

  對比logstash01和logstash02輸出可以發現經過kafka的輸出message會有所不同

  原始message

  經過kafka輸出message如下

  logstash-kafka 插件輸入和輸出默認 codec 為 json 格式。在輸入和輸出的時候注意下編碼格式。消息傳遞過程中 logstash 默認會為消息編碼內加入相應的時間戳和 hostname 等信息。如果不想要以上信息(一般做消息轉發的情況下),可以修改logstash01配置增加配置 codec => plain{ format => "%{message}"}

input{
    beats {
 	port => 5044
    }
}

output{
    if "messages" in [tags]{
        kafka{
	    #輸出至kafka
	    bootstrap_servers => "172.16.90.21:9092,172.16.90.22:9092,172.16.90.23:9092"
	    #主題名稱,將會自動創建
	    topic_id => "system-messages"
	    #壓縮類型
	    compression_type => "snappy"
	    #定義消息通過kafaka傳遞后格式不變
            codec => plain{ format => "%{message}"}
        }
	stdout{
	    codec => rubydebug
	}
    }
}

  PS:使用配置codec => plain{ format => "%{message}"} 后傳遞給kafka的消息僅僅剩下源messages信息,可以在kafka查看

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.16.90.21:9092 --topic nginx-prod-log --from-beginning

  源信息為

  kafka消息隊列輸出

  后端過濾的logstash輸出

 

  啟動logstahs再次對比前后輸出

  原message

  經過kafka后的message

   停止任意兩個kafka只有有一台kafka主機在運行就能保障日志正常輸出

  把標准輸出刪除寫入到elasticsearch即可

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM