filebeat+ELK配置及常用操作


背景介紹

最近工作涉及幾台新服務器的日志需要接入ELK系統,配置思路如下:

使用Filebeat收集本地日志數據,Filebeat監視日志目錄或特定的日志文件,再發送到消息隊列到kafka,然后logstash去獲取消費,利用filter功能過濾分析,最終存儲到elasticsearch中。

filebeat和flume都具有日志收集功能,不過filebeat更輕量,使用go語言編寫占用資源更少,可以有很高的並發,帶有內部模塊(auditd,Apache,Nginx,System和MySQL),可通過一個指定命令來簡化通用日志格式的收集,解析和可視化;flume使用java開發,需要安裝java環境,相對會比較重。

當然兩者也存在區別:Filebeat收集數據的速度大於寫入速度的時候可能出現數據丟失的現象,而flume會在收集數據和寫入數據之間做出調整,保證能在兩者之間提供一種平穩的數據狀態。可以實時的將分析數據並將數據保存在數據庫或者其他系統中,不會出現數據丟失的現象。

以下僅記錄配置過程及常見的幾種排錯命令,安裝篇會獨立一篇做詳細介紹。

配置信息

filebeat配置

我是直接yum install filebeat一鍵安裝的,這里不做具體講解官網有詳細介紹:

https://www.elastic.co/guide/en/beats/filebeat/current/index.html

安裝完成后我們以配置采集/var/log/messages為例,配置如下

# egrep -v '#|^$' /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
output.kafka:
  hosts: ["10.114.102.30:9092", "10.114.102.31:9092", "10.114.102.32:9092", "10.114.102.33:9092", "10.114.102.34:9092"]
  topic: T621_messages
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

有幾個參數需要注意的:

paths表示需要提取的日志的路徑,將日志輸出到kafka中,創建topic

  • required_acks

0:這意味着生產者producer不等待來自broker同步完成的確認繼續發送下一條(批)消息。此選項提供最低的延遲但最弱的耐久性保證(當服務器發生故障時某些數據會丟失,如leader已死,但producer並不知情,發出去的信息broker就收不到)。

1:這意味着producer在leader已成功收到的數據並得到確認后發送下一條message。此選項提供了更好的耐久性為客戶等待服務器確認請求成功(被寫入死亡leader但尚未復制將失去了唯一的消息)。

-1:這意味着producer在follower副本確認接收到數據后才算一次發送完成。 此選項提供最好的耐久性,我們保證沒有信息將丟失,只要至少一個同步副本保持存活。 三種機制,性能依次遞減 (producer吞吐量降低),數據健壯性則依次遞增。

  • json.keys_under_root: true
  • json.add_error_key: true
  • json.message_key: log

這三行是識別json格式日志的配置,若日志格式不為json格式,需要注釋掉,否則收集到的日志為filebeat的錯誤日志。

kafka配置

kafka原來已經安裝並配置好了,這里不再說明具體安裝過程,后續會出一篇ELK完整搭建過程。

這里不做重點講解,可直接查官網:https://kafka.apache.org/documentation/#quickstart

因為有5台配合zookeeper做了集群,選其中一台配置如下:

# egrep -v '#|^$' /home/kafka/kafka/config/server.properties
broker.id=1		#按順序寫,不要亂
listeners=PLAINTEXT://0.0.0.0:9092		 #自己的ip
advertised.listeners=PLAINTEXT://10.114.102.30:9092
num.network.threads=24
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=8
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.114.102.30:2181,10.114.102.31:2181,10.114.102.32:2181,10.114.102.33:2181,10.114.102.34:2181
zookeeper.connection.timeout.ms=6000
auto.create.topics.enable=false
group.initial.rebalance.delay.ms=0

注意:每台服務器除broker.id需要修改之外,其他屬性保持一致。

logstash配置

logstash安裝也是直接參考官網就可以了

https://www.elastic.co/guide/en/logstash/7.x/index.html

不過有個地方要注意,kafka和logstash的版本兼容問題,以下是kafka使用的版本:

find /home/kafka/kafka/libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'   
kafka_2.11-1.1.0.jar

通過查找rpm包可以看到logstash用的是7.8.0

/home/cxhchusr/logstash-7.8.0.rpm

conf.d目錄下配置消費messages的文件如下

# egrep -v '#|^$' /etc/logstash/conf.d/T621_messages.conf 
input {
  kafka {
    bootstrap_servers => "10.114.102.30:9092,10.114.102.31:9092,10.114.102.32:9092,10.114.102.33:9092,10.114.102.34:9092"
    client_id => "T621_messages"
    group_id => "T621_messages"
    auto_offset_reset => "latest"
    consumer_threads => 10
    decorate_events => true
    topics => ["T621_messages"]
    decorate_events => true
    type => syslog
    }
}
filter{
  grok {
    match => { "message" => "%{SYSLOGLINE}" }
  }
  date {
    match => [ "logdate", "YYYY-MM-dd HH:mm:ss.SSS" ]
    target => "@timestamp"
    timezone =>"+00:00"
  }
  mutate{
    remove_field => "logdate"
  }
}
output {
  elasticsearch {
    hosts => ["10.114.102.30:9200", "10.114.102.31:9200", "10.114.102.32:9200", "10.114.102.33:9200", "10.114.102.34:9200"]
    index => "t621_messages-%{+YYYY.MM.dd}"
    user => caixun
    password => "******()90"
    }
}

注意:es索引需要全部為小寫。

最后啟動即可,並加入開機自啟動/etc/rc.local

nohup /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/ > /dev/null 2>&1 &

kibana創建索引

logstash配置完成后即可在kibana創建索引

image-20210818150900238

創建完成效果如下,表示接入成功:

image-20210818150651938

常用排查命令

配置過程中除了在kafka上創建topic,還需查詢topic是否創建成功、消費情況、以及消息處理情況。

以及es是否正常入庫並創建了索引。下面列出幾個ELK運維常用命令。

kafka常用運維指令

  • 查詢當前topic列表

/home/kafka/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

  • 創建topic

/home/kafka/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --partitions 10 --replication-factor 1 --topic T621_messages

  • topic描述(某topic詳細信息)

/home/kafka/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic T621_messages

  • topic消費情況(測試消息是否正常生產)

發送:/home/kafka/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic T621_messages

接收:/home/kafka/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic T621_messages --from-beginning

  • 查看topic堆積情況

/home/kafka/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list ##查看組列表

/home/kafka/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group T621_messages ##偏移量

/data/kafka/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic T621_messages --from-beginning

ES常用運維指令

  • 查看ES集群健康情況

curl -u caixxx:"CAIxxx()90" '10.114.102.30:9200/_cluster/health'

  • 查看索引存儲情況

curl -u caixxx:"CAIxxx()90" '10.114.102.30:9200/_cat/indices?v'

  • 查看幫助命令
# curl -u caixun:"CAIXUN()90" '10.114.102.30:9200/_cat'
=^.^=
/_cat/allocation
/_cat/shards
/_cat/shards/{index}
/_cat/master
/_cat/nodes
/_cat/tasks
/_cat/indices
/_cat/indices/{index}
/_cat/segments
/_cat/segments/{index}
/_cat/count
/_cat/count/{index}
/_cat/recovery
/_cat/recovery/{index}
/_cat/health
/_cat/pending_tasks
/_cat/aliases
/_cat/aliases/{alias}
/_cat/thread_pool
/_cat/thread_pool/{thread_pools}
/_cat/plugins
/_cat/fielddata
/_cat/fielddata/{fields}
/_cat/nodeattrs
/_cat/repositories
/_cat/snapshots/{repository}
/_cat/templates
/_cat/transforms
/_cat/transforms/{transform_id}

過濾索引查看消息是否成功存儲在es,有的話代表配置成功。

---- 鋼鐵 648403020@qq.com 2021.08.20

參考鳴謝

官方kafka:http://kafka.apache.org/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM