This time kafka is used in place of redis; for the ELK cluster setup process see: https://www.cnblogs.com/dmjx/p/9120474.html
kafka terminology:
- 1. Topic: a stream of messages of a particular type. A message is a payload of bytes, and a topic is the category or feed name under which messages are published;
- 2. Producer: any object that can publish messages to a topic;
- 3. Broker: published messages are stored on a set of servers called brokers, which together form the kafka cluster;
- 4. Consumer: subscribes to one or more topics and pulls data from the brokers, thereby consuming the published messages;
System environment:
CentOS Linux release 7.3.1611 (Core)
Basic environment preparation:
Stop the firewall: systemctl stop firewalld
Set SELinux to permissive: setenforce 0 (to disable it permanently, set SELINUX=disabled in /etc/selinux/config)
JDK version: jdk_1.8
kafka version: kafka_2.12-2.1.0
This setup uses three nodes. kafka download: http://mirror.rise.ph/apache/kafka/2.1.0/kafka_2.12-2.1.0.tgz
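A minimal fetch-and-unpack sketch for each node, assuming /apps/product as the install root (the path used throughout this post); note that the kafka tarball already bundles zookeeper, which is why the zookeeper section below works out of the kafka directory:

cd /apps/product
wget http://mirror.rise.ph/apache/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar -zxvf kafka_2.12-2.1.0.tgz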
Environment setup
zookeeper configuration and deployment:
1. Edit the configuration file: zookeeper.properties
dataDir=/apps/product/zookeeper/data
clientPort=2181
maxClientCnxns=10
tickTime=2000
initLimit=10
syncLimit=5
server.1=10.20.88.199:2888:3888
server.2=10.20.88.200:2888:3888
server.3=10.20.88.201:2888:3888
2. Create the directory zookeeper needs
mkdir /apps/product/zookeeper/data
3. Create the myid file to identify each host
echo 1 > /apps/product/zookeeper/data/myid
That is all of the zookeeper configuration. Copy the configuration above to the other two nodes; just remember to change the myid file on each, as shown in the sketch below.
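A minimal sketch of the per-node myid values, assuming the IP-to-server.N mapping from zookeeper.properties above:

# on 10.20.88.199 (server.1)
echo 1 > /apps/product/zookeeper/data/myid
# on 10.20.88.200 (server.2)
echo 2 > /apps/product/zookeeper/data/myid
# on 10.20.88.201 (server.3)
echo 3 > /apps/product/zookeeper/data/myid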
4. Start zookeeper and check its status
./bin/zookeeper-server-start.sh config/zookeeper.properties >> /apps/product/kafka_2.12-2.1.0/logs/zookeeper.log &
netstat -nlpt | grep -E "2181|2888|3888"
tcp        0      0 10.20.88.200:3888      0.0.0.0:*      LISTEN      17711/java
tcp        0      0 0.0.0.0:2181           0.0.0.0:*      LISTEN      17711/java
tcp        0      0 10.100.67.62:2888      0.0.0.0:*      LISTEN      17711/java
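To confirm that the ensemble has elected a leader, each node can be queried with zookeeper's four-letter stat command; a quick sketch assuming nc is installed (the zookeeper 3.4 bundled with this kafka answers it by default):

echo stat | nc 10.20.88.199 2181 | grep Mode
# Mode: follower   (exactly one of the three nodes should report Mode: leader)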
kafka configuration and deployment:
1. Edit the configuration file: server.properties (the configuration used for this setup)
broker.id=1
port=9092
host.name=10.20.88.199
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/apps/product/kafka_2.12-2.1.0/kafka_data
num.partitions=3
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.cleanup.policy=delete
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.20.88.199:2181,10.20.88.200:2181,10.20.88.201:2181
zookeeper.connection.timeout.ms=6000
log.cleaner.backoff.ms=15000
group.initial.rebalance.delay.ms=0
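Only broker.id and host.name differ on the other two nodes; everything else is copied verbatim. A sketch, assuming the node mapping used above:

# on 10.20.88.200
broker.id=2
host.name=10.20.88.200
# on 10.20.88.201
broker.id=3
host.name=10.20.88.201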
2. Create the data directory kafka needs
mkdir /apps/product/kafka_2.12-2.1.0/kafka_data
3. Start kafka
./bin/kafka-server-start.sh config/server.properties >> /dev/null &
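To verify that all three brokers came up and registered, the broker ids recorded in zookeeper can be listed with the zookeeper-shell.sh script bundled with kafka (a quick check, assuming the cluster above):

./bin/zookeeper-shell.sh 10.20.88.199:2181 ls /brokers/ids
# the last line of output should be: [1, 2, 3]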
kafka operations:
1. Create a topic
./bin/kafka-topics.sh --create --zookeeper 10.20.88.200:2181 --replication-factor 3 --partitions 4 --topic test_wx
# --partitions 4          4 partitions
# --replication-factor 3  3 replicas: each partition is replicated to all three brokers
2. List the existing topics
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
3. Describe a topic
./bin/kafka-topics.sh --describe --zookeeper 10.20.88.199:2181 --topic test_wx
Topic:test_wx   PartitionCount:4        ReplicationFactor:3     Configs:
        Topic: test_wx  Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        Topic: test_wx  Partition: 1    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        Topic: test_wx  Partition: 2    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: test_wx  Partition: 3    Leader: 2       Replicas: 2,1,3 Isr: 2,1,3
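In this output, Leader is the broker id currently serving reads and writes for the partition, Replicas lists the brokers holding a copy of it, and Isr is the subset of replicas currently in sync with the leader.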
4. Produce/consume test
# produce
./bin/kafka-console-producer.sh --topic test_wx --broker-list 10.20.88.199:9092
# consume
./bin/kafka-console-consumer.sh --bootstrap-server 10.20.88.199:9092 --topic test_wx --from-beginning
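Run the two commands in separate terminals: every line typed into the producer should show up in the consumer, which confirms the full path through the cluster.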
5. Alter operations
# increase the partition count
./bin/kafka-topics.sh --zookeeper zk_host:port --alter --topic my_topic_name --partitions 10
# add a config override
./bin/kafka-topics.sh --zookeeper zk_host:port --alter --topic my_topic_name --config flush.messages=1
# remove a config override
./bin/kafka-topics.sh --zookeeper zk_host:port --alter --topic my_topic_name --delete-config flush.messages
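Note that the partition count can only be increased, never decreased; to shrink it the topic has to be deleted and recreated.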
6. Delete a topic
# Delete a topic. With delete.topic.enable unset the delete command only marks the topic for deletion, and it is actually removed only after kafka restarts. For immediate deletion, set delete.topic.enable=true in server.properties (since Kafka 1.0 this option defaults to true).
./bin/kafka-topics.sh --zookeeper zk_host:port --delete --topic my_topic_name
filebeat output-to-kafka configuration
filebeat:
  prospectors:
    - paths:
        - /var/log/named/namequery.log
output.kafka:
  hosts: ["10.20.88.199:9092","10.20.88.200:9092","10.20.88.201:9092"]
  topic: 'dns_ct'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
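Before restarting filebeat, the configuration and the connection to the kafka brokers can be checked; a sketch assuming filebeat 6.x (older 5.x builds use the -configtest flag instead):

filebeat test config -c /etc/filebeat/filebeat.yml
filebeat test output -c /etc/filebeat/filebeat.yml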
logstash reading data from kafka
input {
  kafka {
    bootstrap_servers => ["10.20.88.199:9092,10.20.88.200:9092,10.20.88.201:9092"]
    topics            => ["dns_ct"]
    codec             => "plain"
    auto_offset_reset => "latest"
    consumer_threads  => 1
    decorate_events   => true
    type              => "dns-ct"
  }
}
output {
  file {
    path => "/apps/logs/output-dns"
  }
  stdout {
    codec => rubydebug
  }
}
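To try the pipeline, the config can be syntax-checked and then run in the foreground; a sketch assuming it is saved as config/kafka.conf (a hypothetical file name):

./bin/logstash -f config/kafka.conf --config.test_and_exit
./bin/logstash -f config/kafka.conf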
