Single-node Kafka deployment for the filebeat + ELK stack
Preparation:
Kafka download page: http://kafka.apache.org/downloads.html
Download kafka_2.12-2.10.0.0.tgz from there (Kafka and ZooKeeper both come from the same package).
1. Install and configure the JDK (download the JDK and set up the environment variables)
JAVA_HOME=/opt/jdk1.8.0_131
CLASSPATH=.:$JAVA_HOME/lib/tools.jar
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME CLASSPATH PATH
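These variables are typically appended to /etc/profile (or the user's ~/.bash_profile) and then reloaded; a minimal sketch, assuming the JDK was unpacked to /opt/jdk1.8.0_131 as above:
# Append the JDK variables to /etc/profile and reload them in the current shell
$ cat >> /etc/profile <<'EOF'
JAVA_HOME=/opt/jdk1.8.0_131
CLASSPATH=.:$JAVA_HOME/lib/tools.jar
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME CLASSPATH PATH
EOF
$ source /etc/profile
Then verify the installation: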
$ java -version
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) Server VM (build 25.131-b11, mixed mode)
Alternatively, point Kafka at the JDK directly in bin/kafka-run-class.sh:
vi bin/kafka-run-class.sh
JAVA_HOME=/opt/jdk1.8.0_131
2. Install Kafka
1) Install glibc
# yum -y install glibc.i686
2) Extract kafka_2.12-2.10.0.0.tgz
Configure ZooKeeper first:
$cd kafka_2.12-2.10.0.0
$vi config/zookeeper.properties
dataDir=/data/soft/kafka/data
dataLogDir=/data/soft/kafka/log
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10
After configuring, start ZooKeeper in the foreground:
$bin/zookeeper-server-start.sh config/zookeeper.properties
If it starts without errors, run it in the background instead:
$nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
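Optionally confirm that ZooKeeper is healthy before moving on (assuming nc is installed; srvr is one of ZooKeeper's standard four-letter-word commands):
$ echo srvr | nc localhost 2181      # prints ZooKeeper server stats if the service is up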
Then configure Kafka:
$ vi config/server.properties
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://server20.srv:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/log/kafka
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
Start Kafka:
$ bin/kafka-server-start.sh config/server.properties
If it starts without errors, run it in the background instead:
$nohup bin/kafka-server-start.sh config/server.properties &
Check the result: by default, ports 2181 (ZooKeeper) and 9092 (Kafka) should be listening.
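A quick way to verify both ports, assuming ss is available (netstat -lntp works the same way); the zookeeper-shell check should list the broker.id configured above:
$ ss -lntp | egrep ':2181|:9092'                          # both ports should be in LISTEN state
$ bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids   # should show the registered broker id, e.g. [0]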
3) Test Kafka:
(1) Create a topic
$bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
(2) List the topics that have been created
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
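To see the partition and replica layout of the topic, kafka-topics.sh also supports --describe:
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test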
(3) Produce a test message (simulates a client sending messages)
$bin/kafka-console-producer.sh --broker-list 192.168.53.20:9092 --topic test
> ..hello world..    # type a message and press Enter
>
(4) Consume the test message (simulates a client receiving messages)
$bin/kafka-console-consumer.sh --bootstrap-server 192.168.53.20:9092 --topic test --from-beginning
..hello world..    # if the message comes through, the Kafka deployment is working
(5) Delete the topic
$bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
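If the topic is only marked for deletion instead of being removed, the broker may need delete.topic.enable set in config/server.properties (it defaults to true on recent Kafka versions):
delete.topic.enable=true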
Completing the steps above means the single-node Kafka installation succeeded.
3. Configure Filebeat
Add the following to filebeat.yml and comment out the original Logstash output:
#------------------- Kafka output ---------------------
output.kafka:
  hosts: ["server20.srv:9092"]
  topic: 'kafka_logstash'
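The Kafka output needs a matching inputs section telling Filebeat which logs to ship; a minimal sketch, where the path /var/log/nginx/*.log is only an assumed example to adjust to the logs actually being collected:
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/*.log    # assumed example path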
4. Configure Logstash
Add the following to logstash.conf and comment out the original input { beats ... } block:
input {
  kafka {
    codec => "json"
    bootstrap_servers => "server20.srv:9092"
    topics => ["kafka_logstash"]
    group_id => "kafka-consumer-group"
    decorate_events => true
    auto_offset_reset => "latest"
  }
}
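The input above only reads from Kafka; Logstash still needs an output block pointing at Elasticsearch. A minimal sketch, assuming Elasticsearch runs locally on port 9200 and using an illustrative index name (both are assumptions, not from the original setup):
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "filebeat-kafka-%{+YYYY.MM.dd}"
  }
}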
On the Logstash server, make sure the Kafka host name resolves:
$ cat /etc/hosts
122.9.10.106 server20.srv 8bet-kafka
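With the hosts entry in place, connectivity to the broker can be checked from the Logstash server (assuming nc is installed; telnet works as well):
$ nc -vz server20.srv 9092    # should report the connection succeeded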
5. Kafka configuration files for reference
$ cat config/server.properties | egrep -v '^$|#'
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://server20.srv:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/log/kafka
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
$cat config/zookeeper.properties | egrep -v '^$|#'
dataDir=/data/soft/kafka/data
dataLogDir=/data/soft/kafka/zookeeper_log
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10
$cat config/producer.properties | egrep -v '^$|#'
bootstrap.servers=localhost:9092
compression.type=none
$cat config/consumer.properties | egrep -v '^$|#'
bootstrap.servers=localhost:9092
group.id=kafka-consumer-group
6. After everything is configured, test message consumption end to end; if messages are received, the setup is working.
$bin/kafka-console-consumer.sh --bootstrap-server server20.srv:9092 --topic kafka_logstash --from-beginning
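For a full pipeline check, append a line to one of the log files Filebeat is watching and confirm it shows up in the consumer above (and eventually in Elasticsearch/Kibana). A sketch, assuming the example log path from the Filebeat section:
$ echo "kafka pipeline test $(date)" >> /var/log/nginx/access.log    # assumed example path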