一、簡介
Kafka 是一個實現了分布式的、具有分區、以及復制的日志的一個服務。它通過一套獨特的設計提供了消息系統中間件的功能。它是一種發布訂閱功能的消息系統。
1、名詞介紹
Message
消息,就是要發送的內容,一般包裝成一個消息對象。
Topic
通俗來講的話,就是放置“消息”的地方,也就是說消息投遞的一個容器。假如把消息看作是信封的話,那么 Topic 就是一個郵箱
Partition && Log
Partition 分區,可以理解為一個邏輯上的分區,像是我們電腦的磁盤 C:, D:, E: 盤一樣,
Kafka 為每個分區維護着一份日志Log文件。
Producers(生產者)
和其他消息隊列一樣,生產者通常都是消息的產生方。
在 Kafka 中它決定消息發送到指定Topic的哪個分區上。
Consumers(消費者)
消費者就是消息的使用者,在消費者端也有幾個名詞需要區分一下。
一般消息隊列有兩種模式的消費方式,分別是 隊列模式 和 訂閱模式。
隊列模式:一對一,就是一個消息只能被一個消費者消費,不能重復消費。一般情況隊列支持存在多個消費者,但是對於一個消息,只會有一個消費者可以消費它。
訂閱模式:一對多,一個消息可能被多次消費,消息生產者將消息發布到Topic中,只要是訂閱改Topic的消費者都可以消費。
二、安裝zookeeper
1、簡介
Kafka使用zookeeper作為其分布式協調框架,很好的將消息生產、消息存儲、消息消費的過程結合在一起。同時借助zookeeper,kafka能夠生產者、消費者和broker在內的所以組件在無狀態的情況下,建立起生產者和消費者的訂閱關系,並實現生產者與消費者的負載均衡。
2、下載zookeeper
可以到zookeeper官網下載
http://zookeeper.apache.org/releases.html
3、配置zookeeper
(1)下載解壓完成后,來到conf文件夾下,有一個 zoo_sample.cfg 官方默認的配置文件。復制一份,重命名為 zoo.cfg
(2)配置,打開zoo.cfg 修改配置信息
#存儲內存中數據庫快照的位置,如果不設置參數,更新事務日志將被存儲到默認位置。 dataDir=../zkData #日志文件的位置 dataLogDir=../zkLog #監聽端口 clientPort=2181
(3)集群配置
server.1=127.0.0.1:12888:1388 server.2=127.0.0.1:12889:1389 server.3=127.0.0.1:12887:1387
格式: server.A = B:C:D
A:是一個數字,表示第幾號服務器
B:服務器IP地址
C:是一個端口號,用來集群成員的信息交換,表示這個服務器與集群中的leader服務器交換信息的端口
D:是在leader掛掉時專門用來進行選舉leader所用的端口
完整的配置文件如下
復制兩份zookeeper解壓好配置后的文件夾,命名為
clientPort=2181
clientPort=2182
clientPort=2183
創建ServerID
在配置的dataDir目錄下面新建一個 myid 文件,文件內容就是對應的id號,
比如:
zookeeper-3.4.6程序 myid 文件的內容 為 1
zookeeper-3.4.6-2程序 myid 文件的內容 為 2
zookeeper-3.4.6-3程序 myid 文件的內容 為 3
我這邊配置的目錄是
啟動zookeeper
在對應的bin目錄下啟動
zkServer.cmd
三、安裝kafka
(1)下載
去官網 http://kafka.apache.org/下載即可 這邊下載的是
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.2/kafka_2.9.2-0.8.2.2.tgz
這個版本
(2)配置
解壓后到config文件夾下 打開server.properties配置文件進行配置
(3)配置內容
修改或新增以下配置信息
#唯一標識 broker.id=0 #監聽端口 port=9092 host.name=127.0.0.1 #消息最大大小 message.max.bytes=50485760 #配置副本數量 default.replication.factor=2 #獲取的最大大小 replica.fetch.max.bytes=50485760 #隊列中消息持久化存放的位置,可以多個目錄,用逗號分開 log.dirs=/tmp/kafka-logs #默認的分區數 num.partitions=2 #對應着剛剛配置的zookeeper的三個ip與端口地址 zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
(4)集群配置
復制兩份解壓后的文件,命名如下
修改部分配置信息
在對應的server.properties中修改
#唯一標識
broker.id=0
broker.id=1
broker.id=2
#監聽端口
port=9092
port=9093
port=9094
啟動對應的kafka
進入到bin/windows目錄下 啟動kafka並指定配置文件
kafka-server-start.bat ../../config/server.properties
啟動過程中如果遇到Kafka中錯誤:
Unrecognized VM option ‘UseCompressedOops’ Error: Clould not create the Java Vritual Machine. Error: A fatal exception has occurres . Program will exit.
解決方案:
找到bin/windows/kafka-run-class.bat 文件,
找到112行左右
IF ["%KAFKA_JVM_PERFORMANCE_OPTS%"] EQU [""] ( set KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true )
刪除掉 -XX:+UseCompressedOops 即可
測試集群
(1)創建一個 topic
kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
(2)查看是否創建成功
kafka-topics.bat --list --zookeeper localhost:2181
(3)發送消息
kafka-console-producer.bat --broker-list localhost:9092 --topic test
This is a message
(4)接收消息
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning
不同客戶端能接收到消息,說明配置成功
公眾號
歡迎關注我的公眾號“碼上開發”,每天分享最新技術資訊。關注獲取最新資源