Apache Kafka是分布式發布-訂閱消息系統。它最初由LinkedIn公司開發,之后成為Apache項目的一部分。Kafka是一種快速、可擴展的、設計內在就是分布式的,分區的和可復制的提交日志服務。官網下載地址:http://kafka.apache.org/downloads
安裝
cd /usr/local/src
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz tar -zxvf kafka_2.13-2.6.0.tgz mv kafka_2.13-2.6.0 /usr/local/kafka cd /usr/local/kafka
啟動
Kafka使用ZooKeeper,所以您需要先啟動一個ZooKeeper服務器。如果沒有安裝。您可以使用隨Kafka一起打包的便捷腳本來獲取一個快速但是比較粗糙的單節點ZooKeeper實例。
1、啟動ZooKeeper
查看ZooKeeper配置
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # the directory where the snapshot is stored. dataDir=/tmp/zookeeper # the port at which the clients will connect clientPort=2181 ########## zookeeper的默認端口 2181,在后面會用到 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0 # Disable the adminserver by default to avoid port conflicts. # Set the port to something non-conflicting if choosing to enable this admin.enableServer=false # admin.serverPort=8080
啟動命令:
bin/zookeeper-server-start.sh config/zookeeper.properties
2、Kafka基本配置
Kafka在config目錄下提供了一個基本的配置文件。為了保證可以遠程訪問Kafka,我們需要修改兩處配置。
打開config/server.properties文件,在很靠前的位置有listeners和 advertised.listeners兩處配置的注釋,去掉這兩個注釋,並且根據當前服務器的IP修改如下:
vim config/server.properties
listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://127.0.0.1:9092 #你需要修改為外網或局域網可以訪問到的服務器IP
3、啟動Kafka
bin/kafka-server-start.sh config/server.properties
或者使用后台啟動
bin/kafka-server-start.sh -daemon config/server.properties
這里碰到幾個問題,我的雲服務器只有1G內存:
java內存不足的問題
vim jvm.options #修改為 -Xms512m -Xmx512m
kafaka啟動時日式內存不足
vim bin/kafka-server-start.sh #修改為 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m" fi
3、創建 Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看 topic 列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
查看描述 topics 信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
返回信息說明:
第一行給出了所有分區的摘要,每個附加行給出了關於一個分區的信息。 由於我們只有一個分區,所以只有一行。
“Leader”: 是負責給定分區的所有讀取和寫入的節點。 每個節點將成為分區隨機選擇部分的領導者。
“Replicas”: 是復制此分區日志的節點列表,無論它們是否是領導者,或者即使他們當前處於活動狀態。
“Isr”: 是一組“同步”副本。這是復制品列表的子集,當前活着並被引導到領導者。
4、啟動生產者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
5、啟動消費者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
--from-beginning 表示從起始位讀取
測試
在生產者窗口輸入,可以在消費者端查看輸入的內容
生產端:
消費端:
特別提示:
一定要先啟動ZooKeeper 再啟動Kafka 順序不可以改變。
先關閉kafka ,再關閉zookeeper。