Filebeat + Logstash + ES + Kafka Data Collection


Initially, Flume was chosen to ship data to Kafka. Observation showed that each Flume agent consumed significant system resources (at least 50% of a CPU core). Meanwhile, another service, the LogServer, was already under heavy load and especially short on CPU. To collect and analyze logs in real time, a lighter-weight collection framework with a smaller resource footprint was needed.

Download Filebeat, Logstash, and ES

wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.8.1-linux-x86_64.tar.gz
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.8.1.tar.gz
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.8.1-linux-x86_64.tar.gz

Extract Filebeat and edit its configuration file

Start commands (foreground, or in the background with nohup):

./filebeat -e -c filebeat.yml

nohup ./filebeat -e -c filebeat.yml >> filebeat.log 2>&1 &

filebeat.yml configuration

Inputs (log type):

filebeat.inputs:
  - type: log
    # Change to true to enable this input configuration
    enabled: true
    # Paths that should be crawled and fetched
    paths:
      - /home/lw/test/filebeat/*.log
    fields:
      log_topic: lw_filebeat_t_2

kafka output:

output.kafka:
  enabled: true
  # broker IPs and ports of the Kafka cluster
  hosts: ["xxx.xxx.xxx.xxx:9092", "xxx.xxx.xxx.xxx:9092", "xxx.xxx.xxx.xxx:9092"]

  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip   # or none
  max_message_bytes: 1000000

  version: 0.9.0.1
  codec.format:
    string: '%{[host.name]}-%{[message]}'

 

  • hosts is the broker list of the Kafka cluster;

  • topic: '%{[fields.log_topic]}': specifies which topic in the Kafka cluster to write to. It references the custom field we defined under fields in the filebeat.yml input above, so data collected from different paths can be routed to different topics. The one limitation is that everything goes to a single Kafka cluster, because this version of Filebeat does not allow more than one output to be configured at a time.

  • codec.format: specifies the format of the messages written to the Kafka cluster; here the current machine's hostname is prepended to every line read from the log files (see the sketch after this list).
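For illustration, a line User login ok collected on host web01 would arrive in Kafka as web01-User login ok. To exercise the per-topic routing, a second input with its own log_topic value can be added; a minimal sketch (the nginx path and topic name below are made-up examples, not part of the original setup):

filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /home/lw/test/filebeat/*.log
    fields:
      log_topic: lw_filebeat_t_2
  # hypothetical second input feeding a separate topic
  - type: log
    enabled: true
    paths:
      - /home/lw/test/nginx/*.log     # assumed path, for illustration
    fields:
      log_topic: lw_filebeat_nginx    # assumed topic name

Because topic: '%{[fields.log_topic]}' is resolved per event, each input's lines land in the topic named by its own log_topic field.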

Create the corresponding topic on Kafka

# list topics
bin/kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
# create a topic
bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 3 --topic test_log_r3p3

Other Kafka commands

# produce data
bin/kafka-console-producer.sh --broker-list ip:9094 --topic test_log_r3p3
# consume data
bin/kafka-console-consumer.sh --bootstrap-server ip:6667 --topic test_log_r3p3 --from-beginning
# describe a topic
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test_log_r3p3
# delete a topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test_log_r3p3
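To confirm the Filebeat-to-Kafka leg end to end, consume the topic configured in filebeat.yml (the broker address is a placeholder, as above); every message should carry the hostname- prefix added by codec.format:

bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic lw_filebeat_t_2 --from-beginning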

Extract Logstash

It can be started directly from the command line:

bin/logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'   # reads console input and prints the events back to the console

Or load a configuration file and start in the background:

nohup ./bin/logstash -f config/fbet_es.conf --config.reload.automatic >> /opt/server/logstash/logs/logstash_es.log 2>&1 &
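Before backgrounding it, the pipeline file can be syntax-checked first; --config.test_and_exit parses the configuration and exits without starting the pipeline:

bin/logstash -f config/fbet_es.conf --config.test_and_exit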

Modify the configuration file as needed:

Detailed Kafka input plugin configuration: https://blog.csdn.net/weixin_34405354/article/details/88730394

Kafka to ES, with filtering along the way as needed

input {
  kafka {
    ## topics named app-log-<service name>
    topics_pattern => "app-log-.*"
    #topics => ["test_weblog","..."]
    bootstrap_servers => "ip:9092"
    codec => json
    consumer_threads => 1   ## only one partition, so one consumer thread
    #auto_offset_reset => "latest"
    group_id => "app-log-group"
  }
  kafka {
    ## topics named error-log-<service name>
    topics_pattern => "error-log-.*"
    bootstrap_servers => "ip:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    #auto_offset_reset => "latest"   # or "earliest"
    group_id => "error-log-group"
  }
}

filter {
  ## timezone conversion
  ruby {
    code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
  }
  if "app-log" in [fields][logtopic] {
    grok {
      ## pattern
      match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    }
  }
  if "error-log" in [fields][logtopic] {
    grok {
      ## pattern
      match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    }
  }
}

## elasticsearch:
output {
  if "app-log" in [fields][logtopic] {
    ## es output plugin
    elasticsearch {
      # ES service address
      hosts => ["ip:9200"]
      # username and password
      user => "elastic"
      password => "123456"
      ## index name; a segment starting with + is treated as a date format, e.g.
      ## javalog-app-service-2019.01.23
      index => "app-log-%{[fields][logbiz]}-%{index_time}"
      # whether to sniff the cluster nodes: usually true; see http://ip:9200/_nodes/http?pretty
      # sniffing load-balances log delivery across the ES cluster
      sniffing => true
      # Logstash ships a default mapping template; overwrite it
      template_overwrite => true
    }
  }
  if "error-log" in [fields][logtopic] {
    elasticsearch {
      hosts => ["ip:9200"]
      user => "elastic"
      password => "123456"
      index => "error-log-%{[fields][logbiz]}-%{index_time}"   #"test_log-%{+YYYY.MM.dd}"
      sniffing => true
      template_overwrite => true
    }
  }
}
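Once the pipeline is running, a quick check that documents are flowing is to list the matching indices (a sketch; substitute the real ES address and credentials from the output section):

curl -u elastic:123456 'http://ip:9200/_cat/indices/app-log-*,error-log-*?v'

Daily indices following the app-log-<service>-<date> pattern should appear and grow.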

 

Kafka to HDFS:

The webhdfs output plugin needs to be installed first

# install the plugin
./bin/logstash-plugin install logstash-output-webhdfs

# Method 2
## download the plugin package
https://github.com/logstash-plugins/logstash-output-webhdfs

## in the Gemfile under the Logstash directory, change
gem "logstash-output-webhdfs"
## to
gem "logstash-output-webhdfs", :path => "/root/logstash-output-webhdfs/"

## then run the install command again; when "Installation successful" appears, the plugin is ready

Configuration:

#===========kafka-hdfs==================

input {
    kafka {
        bootstrap_servers => "ip:9092, ip2:9092, ip3:9092"
        topics => ["rsyslog_nginx"]
        codec => "json"
    }
}

filter {
    date {
        match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"]
        target => "time_local"
    }

    ruby {
        code => "event.set('index.date', event.get('time_local').time.localtime.strftime('%Y%m%d'))"
    }

    ruby {
        code => "event.set('index.hour', event.get('time_local').time.localtime.strftime('%H'))"
    }
}

output {
    webhdfs {
        workers => 2
        host => "master01"
        port => 50070
        user => "hdfs"
        path => "/logs/nginx/%{index.date}/%{index.hour}.log" #"/test_logstash/logstash-%{+YYYY}-%{+MM}-%{+dd}/test.log" 
        codec => "json"
    }
    stdout { codec => rubydebug }
}
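After the pipeline has been running for a while, the output files can be inspected on HDFS (a sketch; the date and hour path segments below are placeholders following the path option above):

hdfs dfs -ls /logs/nginx/
hdfs dfs -tail /logs/nginx/20200801/12.log   # example date/hour, substitute real values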

ES cluster installation

ES 7.x requires JDK 1.8 or later

Extract and edit the configuration file

vim ./config/elasticsearch.yml

Master node

# full configuration details are covered in another post
cluster.name: elasticsearch
# ------------------------------------ Node ------------------------------------

node.name: es-master
node.master: true
node.data: true
# ----------------------------------- Paths ------------------------------------

path.data: /opt/server/elasticsearch-7.8.1/data
path.logs: /opt/server/elasticsearch-7.8.1/logs
# ---------------------------------- Network -----------------------------------

network.host: 0.0.0.0

http.cors.enabled: true
http.cors.allow-origin: "*"
# --------------------------------- Discovery ----------------------------------

cluster.initial_master_nodes: ["es-master"]
discovery.seed_hosts: ["ip1:9300","ip2:9300", "ip3:9300"]

# index auto-creation rules for ES
action.auto_create_index: .monitoring-kibana*,.monitoring-data*,.watches,.kibana,.watcher-history*,.monitoring-es*,.security,.triggered_watches,logstash-*

 

Data node 1

# ---------------------------------- Cluster -----------------------------------

cluster.name: elasticsearch
#
# ------------------------------------ Node ------------------------------------

node.name: es-node01
node.master: false
node.data: true
# ----------------------------------- Paths ------------------------------------

path.data: /opt/server/elasticsearch-7.8.1/data
path.logs: /opt/server/elasticsearch-7.8.1/logs

# ---------------------------------- Network -----------------------------------
network.host: 0.0.0.0

http.cors.enabled: true
http.cors.allow-origin: "*"

# --------------------------------- Discovery ----------------------------------

cluster.initial_master_nodes: ["es-master"]
discovery.seed_hosts: ["ip1:9300","ip2:9300", "ip3:9300"]
# index auto-creation rules for ES
action.auto_create_index: .monitoring-kibana*,.monitoring-data*,.watches,.kibana,.watcher-history*,.monitoring-es*,.security,.triggered_watches,logstash-*


For data node 2, only node.name: needs to be changed.

Modify system limits as the root user

vim /etc/security/limits.conf

* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096

soft nofile: maximum number of open file descriptors (soft limit)

hard nofile: maximum number of open file descriptors (hard limit)

soft nproc: maximum number of processes available to a single user (soft limit)

hard nproc: maximum number of processes available to a single user (hard limit)
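The new limits apply from the next login session and can be checked per shell with ulimit (-n reports nofile, -u reports nproc; -S and -H select the soft or hard value):

ulimit -Sn   # soft nofile
ulimit -Hn   # hard nofile
ulimit -Su   # soft nproc
ulimit -Hu   # hard nproc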

Edit sysctl.conf

sysctl -w vm.max_map_count=655360

vim /etc/sysctl.conf

vm.max_map_count=655360
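To apply the /etc/sysctl.conf change immediately without a reboot:

sysctl -p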

Verify

sysctl -a|grep vm.max_map_count

Start

ES refuses to run as root; add a non-root user, give it ownership of the install directory, then switch to that user before starting.
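A minimal sketch of the user setup (the user name es is an assumption; the install path matches the one used in the config above):

# as root: create the user and hand over the install directory
useradd es
chown -R es:es /opt/server/elasticsearch-7.8.1
su - es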

./elasticsearch

Background start: ./elasticsearch -d

Visit http://ip:9200/_cat/nodes?v to see the information for all three nodes, or query it from the shell with curl 172.31.5.6:9200/_cat/nodes?v.
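Cluster health can be checked as well; status should be green once all three nodes have joined:

curl http://ip:9200/_cluster/health?pretty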

ElasticHD

Download:

wget https://github.com/360EntSecGroup-Skylar/ElasticHD/releases/download/1.4/elasticHD_linux_amd64.zip

Unzip the package and set executable permissions

unzip elasticHD_linux_amd64.zip
chmod 777 ElasticHD

Start in the background

# change 127.0.0.1 to the ES cluster IP
nohup ./ElasticHD -p 127.0.0.1:9800 > /opt/servers/elasticsearch/elasticHD.log 2>&1 &

View the web UI

http://ip:9800/

Kibana

wget https://artifacts.elastic.co/downloads/kibana/kibana-7.8.1-linux-x86_64.tar.gz
nohup ./bin/kibana >> ./logs/kibana.log & 

Filebeat knowledge link: https://www.yuque.com/stonejin/cbt4vt/blqise

Kibana tutorial: https://www.cnblogs.com/chenqionghe/p/12503181.html?utm_source=tuicool&utm_medium=referral

ELK reference: https://www.kancloud.cn/noahs/linux/1397312

Logstash plugin usage: https://blog.csdn.net/chenleiking/article/details/73563930

ES basics: https://www.pianshen.com/article/739610986/

ES usage: http://blog.cheyo.net/136.html

