kafka作為elk緩存使用


ELK集群在大規模的日志收集中面臨着數據量大,收集不及時,或宕機的風險,可以選擇單節點的redis,但是相比redis,kafka集群高可用的特性,更優,下面來配置kafka集群配置elk作為緩存的方法。

kafka集群的安裝配置

一. 初始環境准備

1.服務器准備
主機 地址
db01 10.0.0.200
db02 10.0.0.201
db03 10.0.0.202
# cat /etc/redhat-release  #這里我使用的是centos7.6的系統
CentOS Linux release 7.6.1810 (Core) 
2. 下載安裝包

安裝kafka之前,需要每台服務器上配置好zookeeper

#(以下操作,三台服務器同時操作)
#創建目錄
mkdir /kafka 
cd /kafka

#下載kafka地址
http://kafka.apache.org/downloads

#下載kafka
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.1/kafka_2.11-2.3.1.tgz

#下載zookeeper地址
http://zookeeper.apache.org/releases.html

#下載zookeeper
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

#jdk准備好
jdk-8u151-linux-x64.tar.gz
3.配置java環境
#解壓tar包
tar xf jdk-8u151-linux-x64.tar.gz -C /opt

#創建軟鏈接
ln -s /opt/jdk1.8.0_151/ /opt/jdk

#配置環境變量
#在/etc/profile后三行添加如下:
# tail -3 /etc/profile 
export JAVA_HOME=/opt/jdk
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
export PATH=$PATH:$JAVA_HOME/bin

#驗證
source /etc/profile 

java -version
java version "1.8.0_151"

二. 安裝zookeeper服務

1.安裝kafka
#解壓zookeeper
tar xf zookeeper-3.4.14.tar.gz -C /opt/

#創建軟鏈接
ln -s /opt/zookeeper-3.4.14 /opt/zookeeper

#創建數據目錄
mkdir -p /data/zookeeper

#編輯配置文件
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg

vim /opt/zookeeper/conf/zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=10.0.0.200:2888:3888
server.2=10.0.0.201:2888:3888
server.3=10.0.0.202:2888:3888

#復制到另外兩台
scp /opt/zookeeper/conf/zoo.cfg 10.0.0.201:/opt/zookeeper/conf/
  
scp /opt/zookeeper/conf/zoo.cfg 10.0.0.202:/opt/zookeeper/conf/

#創建myid
echo 1 > /data/zookeeper/myid  #server1
echo 2 > /data/zookeeper/myid  #server2
echo 3 > /data/zookeeper/myid  #server3
3.啟動zookeeper
#三台都啟動zookeeper
/opt/zookeeper/bin/zkServer.sh start

#查看狀態
# /opt/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
# Mode: follower 從庫
# Mode: leader  主庫

#配置環境變量,更方便的使用命令
echo "export PATH=/opt/zookeeper/bin/:$PATH" >> /etc/profile
source /etc/profile

#測試
zkCli.sh -server 10.0.0.202:2181 #可以登錄任意一個節點

create /test dhc  #創建

get /test  #查看

set /test ctt #改變key的值

三.安裝kafka並測試

1.安裝kafka
#以下操作3台服務器都需要操作,以server1為事列

#解壓
tar xf kafka_2.11-2.3.1.tgz -C /opt/
cd /opt

#創建軟鏈接
ln -s kafka_2.11-2.3.1 kafka
cd kafka

#編輯配置文件

vim config/server.properties  #我只放修改的,其他的不動

broker.id=1   #對應zookeeper的id
listeners=PLAINTEXT://10.0.0.200:9092 #配置自己的ip地址
log.retention.hours=24   #改成24小時
zookeeper.connect=10.0.0.200:2181,10.0.0.201:2181,10.0.0.202:2181 #配置集群


#測試啟動
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties  #只是測試

#最后一行出現一下,證明啟動成功
INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
 
#ctrl +c掉,重新后台運行
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

#測試
ps all|grep kafka

#查看日志
tail -f /opt/kafka/logs/server.log

#在node1上創建test
# bin/kafka-topics.sh --create --bootstrap-server 10.0.0.200:9092 --replication-factor 1 --partitions 1 --topic test

#在node2上創建test1
# bin/kafka-topics.sh --create --bootstrap-server 10.0.0.201:9092 --replication-factor 1 --partitions 1 --topic test1

#在node3上查看創建了什么,如果可以看到證明成功
# bin/kafka-topics.sh --list --bootstrap-server 10.0.0.202:9092

#查看具體的topic里面有什么內容
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.200:9092 --topic kafka --from-beginning

#具體實驗步驟請看官網
http://kafka.apache.org/quickstart

四 安裝filebeat並配置

#安裝filebeat(這里就不詳細說明了,詳細請看上一篇博客)
rpm -ivh jdk-8u102-linux-x64.rpm 
rpm -ivh filebeat-7.5.0-x86_64.rpm

# grep -Ev '^$|#' /etc/filebeat/filebeat.yml  #修改后的配置文件
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
output.kafka:
  hosts: ["10.0.0.200:9092","10.0.0.201:9092","10.0.0.202:9092"]
  topic: kafka
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

#啟動filebeat
systemctl start filebeat

#驗證kafka里面是否有新增topic
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server 10.0.0.201:9092


#查看topic-kafka里面有沒有內容
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.200:9092 --topic kafka --from-beginning

五 配置logstash

#安裝logstash(具體的安裝步驟請看上一篇博客)
rpm -ivh logstash-7.5.0.rpm

#配置logstash
[root@db01 elk]# cat /etc/logstash/conf.d/kafka.conf 
input { 

kafka{
		bootstrap_servers => ["10.0.0.200:9092,10.0.0.201:9092,10.0.0.202:9092"]
		group_id => "test"
		auto_offset_reset => "earliest"
		consumer_threads => "5"
		decorate_events => "false"
		topics => ["kafka"]
		type => "bbs_log"
		codec => json
 
	}
}
output {
stdout {}   #只是用來測試輸出

}
到此,kafka集群配合filebeat+logstash 完成!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM