linux單機部署kafka(filebeat+elk組合)


 

filebeat+elk組合之kafka單機部署

 

准備:

kafka下載鏈接地址:http://kafka.apache.org/downloads.html

在這里下載kafka_2.12-2.10.0.0.tgz(kafka和zookeeper都用同一個包里的)。

 

一、安裝和配置jdk(下載jdk,配置環境即可)

JAVA_HOME=/opt/jdk1.8.0_131

CLASSPATH=.:$JAVA_HOME/lib.tools.jar

PATH=$JAVA_HOME/bin:$PATH

export JAVA_HOME CLASSPATH PATH

 

$ java -version

java version "1.8.0_131"

Java(TM) SE Runtime Environment (build 1.8.0_131-b11)

Java HotSpot(TM) Server VM (build 25.131-b11, mixed mode)

或者在bin/kafka-run-class.sh指定kafka jdk 環境變量

vi bin/kafka-run-class.sh

JAVA_HOME=/opt/jdk1.8.0_131

二、安裝Kafka

1安裝glibc

# yum -y install glibc.i686

2、解壓kafka_2.12-2.10.0.0.tgz

先配置zookeeper

$cd  kafka_2.12-2.10.0.0

$vi config/zookeeper.properties

dataDir=/data/soft/kafka/data

dataLogDir=/data/soft/kafka/log

clientPort=2181

maxClientCnxns=100

tickTime=2000

initLimit=10

 

配置后直接啟動zookeeper:

$bin/zookeeper-server-start.sh config/zookeeper.properties

 

如果沒有報錯,可以轉后台啟動:

$nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

再配置kafka

$ vi config/server.properties

broker.id=0

listeners=PLAINTEXT://0.0.0.0:9092

advertised.listeners=PLAINTEXT://server20.srv:9092

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/data/log/kafka

num.partitions=2

num.recovery.threads.per.data.dir=1

log.retention.check.interval.ms=300000

zookeeper.connect=localhost:2181

zookeeper.connection.timeout.ms=6000

 

啟動kafka:

$ bin/kafka-server-start.sh config/server.properties

 

如果沒有報錯,可以轉后台啟動:

$nohup bin/kafka-server-start.sh config/server.properties &

檢查啟動情況:默認開啟的端口為2181(zookeeper)和9202(kafka)。

3、測試kafka

(1)、創建topic

$bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

 

(2)、查看創建的topic

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

test

 

(3)、生產消息測試(模擬客戶端發送消息)

$bin/kafka-console-producer.sh --broker-list 192.168.53.20:9092 --topic test

> ..hello world..          #輸入內容回車

 

(4)、消費消息測試(模擬客戶端接收信息)

$bin/kafka-console-consumer.sh --bootstrap-server 192.168.53.20:9202 --topic test --from-beginning

 

..hello world..                   #如果能正常接收到信息說明kafka部署正常

 

(5)、刪除topic

$bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

 

完成以上代表kafka單包機安裝成功。

三、配置filebeat

filebeat.yml文件添加配置信息,注釋掉原來的logstash output。

#------------------- Kafka output ---------------------

output.kafka:

  hosts: ["server20.srv:9092"]

  topic: 'kafka_logstash'

 

四、配置logstash

logstash.conf文件添加配置信息,注釋掉原來input{beats...}。

  input {

    kafka {

      codec => "json"

      bootstrap_servers => "server20.srv:9092"

      topics => ["kafka_logstash"]

      group_id => "kafka-consumer-group"

      decorate_events => true

      auto_offset_reset => "latest"

  }

在logstash服務器上配置好kafka訪問地址:

$ cat /etc/hosts

122.9.10.106    server20.srv    8bet-kafka

五、kafka相關配置文件參考

$ cat config/server.properties | egrep -v '^$|#'

broker.id=0

listeners=PLAINTEXT://0.0.0.0:9092

advertised.listeners=PLAINTEXT://server20.srv:9092

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/data/log/kafka

num.partitions=2

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=localhost:2181

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0

 

$cat config/zookeeper.properties | egrep -v '^$|#'

dataDir=/data/soft/kafka/data

dataLogDir=/data/soft/kafka/zookeeper_log

clientPort=2181

maxClientCnxns=100

tickTime=2000

initLimit=10

$cat config/producer.properties | egrep -v '^$|#'

bootstrap.servers=localhost:9092

compression.type=none

$cat config/consumer.properties | egrep -v '^$|#'

bootstrap.servers=localhost:9092

group.id=kafka-consumer-group

六、配置完后測試消費消息連通,如果接受正常,則成功

$bin/kafka-console-consumer.sh --bootstrap-server server20.srv:9202 --topic test --from-beginning

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM