filebeat 直接到logstash, 由於logstash的設計問題, 可能會出現阻塞問題, 因為中間使用消息隊列分開
可以使用redis, 或者kafka, 這兒使用的是kafka
1, 安裝
kafka的安裝, 解壓可用, 但需要zookeeper, 內置了一個zookeeper, 直接使用即可
1), 啟動內置zookeeper
./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
2), 修改kafka的配置文件
vim ./conf/server.properties
############################# Server Basics ############################# broker.id=0 delete.topic.enable=true ############################# Socket Server Settings ############################# listeners=PLAINTEXT://0.0.0.0:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 ############################# Log Basics ############################# log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 ############################# Log Flush Policy ############################# log.flush.interval.messages=10000 log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000
3), 啟動kafkaserver
/bin/kafka-server-start.sh ./config/server.properties &
4),修改filebeat文件, 最終形態
cat ./elk/filebeat-5.5.2-linux-x86_64/filebeat.yml | grep -v '#' | grep -v '^$'
filebeat.prospectors: - input_type: log paths: - /var/log/nginx/*.log encoding: utf-8 document_type: my-nginx-log scan_frequency: 5s harvester_buffer_size: 16384 max_bytes: 10485760 tail_files: true output.kafka: enabled: true hosts: ["www.wenbronk.com:9092"] topic: elk-%{[type]} worker: 2 max_retries: 3 bulk_max_size: 2048 timeout: 30s broker_timeout: 10s channel_buffer_size: 256 keep_alive: 60 compression: gzip max_message_bytes: 1000000 required_acks: 0 client_id: beats
5), 重新啟動filebeat
./filebeat -c ./filebeat.yml &
6), 修改 logstash的input
input { kafka { #codec => "json" topics_pattern => "elk-.*" bootstrap_servers => "127.0.0.1:9092" auto_offset_reset => "latest" group_id => "logstash-g1" } } output { elasticsearch { #Logstash輸出到elasticsearch; hosts => ["localhost:9200"] #elasticsearch為本地; index => "logstash-nginx-%{+YYYY.MM.dd}" #創建索引; document_type => "nginx" #文檔類型; workers => 1 #進程數量; user => elastic #elasticsearch的用戶; password => changeme #elasticsearch的密碼; flush_size => 20000 idle_flush_time => 10 } }
7), 重啟logstash
8 ), 頁面訪問 nginx, 可以查看消息隊列中的消息
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic elk-log -m-beginning