一、Kafka 簡介
1.基本概念
Kafka 是一個分布式的基於發布/訂閱消息系統,主要應用於大數據實時處理領域,其官網是:http://kafka.apache.org/。Kafka 是一個分布式、支持分區的(Partition)、多副本的(Replica),基於 ZooKeeper 協調的發布/訂閱消息系統。
Kafka 有以下三個基本概念:
- Kafka 作為一個集群運行在一個或多個服務器上;
- Kafka 集群存儲的消息是以 Topic 為類別記錄的;
- 每個消息是由一個 Key,一個 Value 和時間戳構成。
2.基本架構
Kafka 的基本架構圖如下:
- Producer:生產者,就是向 Broker 發消息的客戶端;
- Consumer:消費者,就是從 Broker 取消息的客戶端;
- Consumer Group:消費者組,由多個消費者組成。組內每個消費者負責消費不同分區的數據,一個分區的數據只能由一個組內的消費者進行消費,組內消費者之間互不影響;
- Broker:一個 Kafka 服務器就是一個 Broker,一個集群由多個 Broker 組成;
- Topic:主題,可以理解為隊列,生成者和消費者都是用的同一個隊列;
- Partition:分區,為實現擴展性,一個大的 Topic 可以分散到多個 Broker 上,一個 Topic 可以分為多個 Partition;
- Replica:副本,保證集群中某個節點發生故障時,該節點上的數據不丟失。
二、Ubuntu 下安裝 Kafka
1.安裝 Java
更新軟件包
sudo apt-get update
安裝 openjdk-8-jdk
sudo apt-get install openjdk-8-jdk
查看 Java 版本,檢查是否安裝成功
2.安裝 ZooKeeper
1)安裝
下載 ZooKeeper:http://mirrors.hust.edu.cn/apache/zookeeper/。
下載好之后解壓(注意:3.5.5之后的版本應該下載文件名中帶“bin”的壓縮包),再執行如下命令:
sudo mv apache-zookeeper-3.5.8-bin /usr/local/zookeeper
cd /usr/local/zookeeper
cp conf/zoo_sample.cfg conf/zoo.cfg
其中有一些配置參數:
- tickTime:Zookeeper 使用的基本時間單元,默認值2000;
- initLimit:Zookeeper 中連接同步的最大時間,默認值為10;
- syncLimit:Zookeeper 中進行心跳檢測的最大時間,默認值為5;
- dataDir:數據庫更新事物保存的目錄;
- clientPort:Zookeeper 服務監聽的端口,默認值為2181。
2)配置
修改 /etc/profile 文件,增加如下內容:
export ZOOKEEPER_HOME=/usr/local/zookeeper/
export PATH=$PATH:$ZOOKEEPER_HOME/bin
更新環境變量
source /etc/profile
3)測試
首先進入 bin 目錄,開啟服務:
再啟動 CLI 連接服務:
3.安裝 Kafka
1)安裝
下載 Kafka:http://kafka.apache.org/downloads。
下載好之后解壓,再執行如下命令:
sudo mv kafka_2.13-2.5.0/ /usr/local/kafka
cd /usr/local/kafka
2)測試
由於前面已經啟動了 Zookeeper 服務,所以這里只需要執行如下命令來開啟 Kafka 服務:
bin/kafka-server-start.sh config/server.properties
通過輸出信息可以看到 Kafka 服務已經成功開啟了,截圖如下:
但這樣開啟之后是阻塞的了,我們可以在中間加一個“-daemon”即開一個守護進程來運行,則命令如下:
bin/kafka-server-start.sh -daemon config/server.properties
創建一個主題,用一個分區和一個副本創建一個名為“mytopic”的主題:
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic mytopic
這樣就已經創建成功了,然后可以使用如下命令查看主題:
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
Kafka 有一個命令行服務端,它將從文件或標准輸入中獲取輸入,並將其作為消息發送到 Kafka 集群。默認情況下,每行將作為單獨的消息發送:
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic mytopic
同樣的,Kafka 還有一個命令行客戶端,可以從 Kafka 集群中獲取消息:
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mytopic --from-beginning
三、kafka-python 使用
1.安裝 kafka-python
pip3 install kafka-python
2.創建 Consumer
Consumer 消費者負責從 Kafka 中獲取消息進行處理,需要實例化 KafkaConsumer 這個類。
1 from kafka import KafkaConsumer 2 3 4 consumer = KafkaConsumer("test", bootstrap_servers=["localhost:9092"]) 5 for msg in consumer: 6 print(msg)
3.創建 Producer
Producer 生產者負責向 Kafka 生產和發送消息,需要實例化 KafkaProducer 這個類。
1 from kafka import KafkaProducer 2 3 4 producer = KafkaProducer(bootstrap_servers="localhost:9092") 5 for i in range(10): 6 producer.send("test", "Hello {}".format(i).encode("utf-8")) 7 producer.close()
4.運行測試
先運行消費者程序,再運行生產者程序,消費者一直在監聽,等到生產者發送消息,消費者就把消息取出,運行結果如下:
可以看到其中每個消息都包含了主題、分區、消息內容、時間戳等信息。