Kafka簡明教程


概述

Kafka 是一個分布式消息隊列(MQ, Message queue)中間件,支持點對點(Quene)、發布訂閱(Topic)模式。Kafka 的定位主要在日志等方面,單擊吞吐量特別大, 因為Kafka 設計的初衷就是處理日志的,可以看做是一個日志(消息)系統一個重要組件,針對性很強。

使用場景:

  • 網站活動跟蹤:根據不同的業務數據類型,將消息發布到不同的 Topic。
  • 日志聚合:可以將多台主機或應用的日志數據抽象成一個個日志或事件的消息流,異步發送到 Kafka 集群。
  • 流計算處理:構建應用系統和分析系統的橋梁,並將它們之間的關聯解耦。
  • 數據中轉樞紐:利用 Kafka 作為數據中轉樞紐,同份數據可以被導入到不同專用系統中。

官網:http://kafka.apache.org/
中文站:http://kafka.apachecn.org/

名稱: Kafka
所屬社區/公司:Apache
開發語言: Java
協議: 自行設計的協議,仿AMQP
事務:不支持
集群:支持,依賴ZooKeeper

快速入門

官方的 quickstart 已經非常詳細了,按照文檔可以一步一步的達到入門的效果。地址:http://kafka.apache.org/quickstart

這里我記錄一下簡單的步驟,僅作為測試使用,真實環境請參考官方文檔部署:
1、下載解壓:

$ cd /opt
$ wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
$ tar -xzf kafka_2.12-2.2.0.tgz
$ cd kafka_2.12-2.2.0

Kafka 依賴 ZooKeeper 。安裝包里已經包含了 ZooKeeper。

2、啟動 ZooKeeper

$ bin/zookeeper-server-start.sh config/zookeeper.properties

# 限於篇幅,省略大部分輸出
...
[2019-05-11 13:15:44,643] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

如果需要后台運行,請在命令后面追加&

3、啟動 Kafka Server端

$ bin/kafka-server-start.sh config/server.properties

# 限於篇幅,省略大部分輸出
...
[2019-05-11 13:18:34,578] INFO Kafka version: 2.2.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-05-11 13:18:34,578] INFO Kafka commitId: 05fcfde8f69b0349 (org.apache.kafka.common.utils.AppInfoParser)
[2019-05-11 13:18:34,579] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

如果需要后台運行,請在命令后面追加&

4、創建主題(Topic)
創建一個名為 test 的主題,包含1個分區(partition),1個副本(replication-factor):

$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

創建完畢后可以查看該主題:

$  bin/kafka-topics.sh --list --bootstrap-server localhost:9092

test

也可以在配置里設置為在發布不存在的主題時自動創建主題,而不是手動創建主題。這個后面再說明。

5、發布消息
我們新啟動一個命令行窗口充當生產者,向 Kafka 里發送消息,指定主題為 test

$ cd /opt/kafka_2.12-2.2.0/
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

>

然后命令行等待我們輸入消息。我們輸入 hello回車:

>hello
>

消息就發出去了。接下來我們啟動消費者。

6、消費消息

我們新啟動一個命令行窗口充當消費者來消費消息,指定主題為 test

$ cd /opt/kafka_2.12-2.2.0/
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

hello

就消費了1條消息。我們可以在生產者命令行窗口繼續發生消息,消費者端可以實時消費。

好了,基本的安裝測試就到這。關於設置kakfa集群請參考:http://kafka.apache.org/quickstart#quickstart_multibroker

如何在項目里使用

上一節僅演示了在命令行里使用,可以方便調試。對於在項目里使用,需要借助 SDK。這個頁面收錄了所有的客戶端:https://cwiki.apache.org/confluence/display/KAFKA/Clients

PHP

常用的SDK:

這里以 kafka-php 為例。

kafka-php 使用純粹的PHP 編寫的 kafka 客戶端,目前支持 0.8.x 以上版本的 Kafka。最新的kafka-php 版本是 v0.2.8 (截止到2019-05-11),詳見:https://github.com/weiboad/kafka-php/releaseskafka-phpv0.2.xv0.1.x 不兼容,如果使用原有的 v0.1.x 的可以參照文檔 Kafka PHP v0.1.x Document, 不過建議切換到 v0.2.x 上。

kafka-php (v0.2.8) 環境要求:

  • PHP 版本大於 5.5
  • Kafka Server 版本大於 0.8.0
  • 消費模塊 Kafka Server 版本需要大於 0.9.0

1、發送消息,同步方式:

require '../vendor/autoload.php';
date_default_timezone_set('PRC');

// use Monolog\Logger;
// //use Monolog\Handler\StdoutHandler;
// Create the logger
// $logger = new Logger('my_logger');
// //Now add some handlers
// $logger->pushHandler(new StdoutHandler());

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9192,127.0.0.1:9193');
$config->setBrokerVersion('0.10.2.1');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$config->setTimeout(2000);

$producer = new \Kafka\Producer();
// $producer->setLogger($logger);

for($i = 0; $i < 100; $i++) {
    $result = $producer->send(array(
        array(
            'topic' => 'test1',
            'value' => 'test1....message.',
            'key' => '',
        ),
    ));
    var_dump($result);
}

說明:

  1. 設置 logger 不是必選的。但是如果需要調試,建議加上。如果沒有安裝Monolog,也可以自己定一個 logger ,只要實現了 psr/log規范即可。
  2. MetadataBrokerList支持集群配置。使用英文逗號隔開即可。
  3. BrokerVersion版本需與安裝的 kafka 版本一致。

2、消費消息

消費消息一般需要寫腳本常駐運行。可以借助 Supervisor 工具。

require '../vendor/autoload.php';
date_default_timezone_set('PRC');

// use Monolog\Logger;
// use Monolog\Handler\StdoutHandler;
// // Create the logger
// $logger = new Logger('my_logger');
// // Now add some handlers
// $logger->pushHandler(new StdoutHandler());

$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setGroupId('test'); //消費者組
$config->setBrokerVersion('0.10.2.1');
$config->setTopics(['test']); //主題
//$config->setOffsetReset('earliest');
$consumer = new \Kafka\Consumer();

// $consumer->setLogger($logger);

$consumer->start(function($topic, $part, $message) {
	var_dump($message);
});

注意:

  1. 消費者組可以有多個,互相之間不影響。每個消費者組都可以消費到完整的一份消息。
  2. setOffsetReset的值有:earliest(從最早的開始消費)、latest(從最新的開始消費)。

Golang

Python

發送消息示例:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
for _ in range(100):
    producer.send('test', b'some_message_bytes')

Kakfa 原理


(上圖為Kakfa架構圖)

一個典型的消息隊列 Kafka 集群包含:

  • Producer:通過 push 模式向消息隊列 Kafka Broker 發送消息,可以是網站的頁面訪問、服務器日志等,也可以是 CPU 和內存相關的系統資源信息;
  • Kafka Broker:消息隊列 Kafka 的服務器,用於存儲消息;支持水平擴展,一般 Broker 節點數量越多,集群吞吐率越高;
  • Consumer Group:通過 pull 模式從消息隊列 Kafka Broker 訂閱並消費消息;
  • Zookeeper:管理集群的配置、選舉 leader,以及在 Consumer Group 發生變化時進行負載均衡。

幾個重要概念

  • Broker:消息隊列 Kafka 集群包含一個或多個消息處理服務器,該服務器被稱為 Broker。
  • Topic:主題。每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。
  • Partition:分區。一個Topic中的消息數據按照多個分區組織,分區是kafka消息隊列組織的最小單位,一個分區可以看作是一個FIFO( First Input First Output的縮寫,先入先出隊列)的隊列。
  • Producer: 消息發布者,也稱為消息生產者,負責生產並發送消息到 Kafka Broker。
  • Consumer: 消息訂閱者,也稱為消息消費者,負責向 Kafka Broker 讀取消息並進行消費。
  • Consumer Group:消費者組。這類 Consumer 通常接收並消費同一類消息,且消費邏輯一致。Consumer Group 和 Topic 的關系是 N:N,同一個 Consumer Group 可以訂閱多個 Topic,同一個 Topic 也可以被多個 Consumer Group 訂閱。
  • Replication:副本。為了保證分布式可靠性,kafka0.8開始對每個分區的數據進行備份(不同的Broker上),防止其中一個Broker宕機造成分區上的數據不可用。

分區、組、消費者的關系

消息隊列 Kafka 采用 Pub/Sub(發布/訂閱)模型,其中:

  • Consumer Group 和 Topic 的關系是 N:N。 同一個 Consumer Group 可以訂閱多個 Topic,同一個 Topic 也可以同時被多個 Consumer Group 訂閱。
  • 同一 Topic 的一條消息只能被同一個 Consumer Group 內的任意一個 Consumer 消費,但多個 Consumer Group 可同時消費這一消息。

說明:
1、同一個分區(partition)內的消息只能被同一個組中的一個消費者(consumer)消費,當消費者數量多於分區的數量時,多余的消費者空閑。
2、啟動多個組,則會使同一個消息被消費多次。

詳細請看:https://www.jianshu.com/p/6233d5341dfe

組成結構

生產者消費者關系:

對於每一個topic, Kafka集群都會維持一個分區日志,如下所示:

kafka分區是提高kafka性能的關鍵所在,當發現集群性能不高時,常用手段就是增加Topic的分區,分區里面的消息是按照從新到老的順序進行組織,消費者從隊列頭訂閱消息,生產者從隊列尾添加消息。

負載均衡

Kafka 負載消費的內部原理是,把訂閱的 Topic 的分區,平均分配給各個消費實例。因此,消費實例的個數不要大於分區的數量,否則會有實例分配不到任何分區而處於空跑狀態。這個負載均衡發生的時間,除了第一次啟動上線之外,后續消費實例發生重啟、增加、減少等變更時,都會觸發一次負載均衡。

配置

Kafka支持的配置非常多,這里僅僅列出來部分關於 broker 的配置。broker 配置文件是config/server.properties

每個kafka broker中配置文件默認必須配置的屬性如下:

broker.id=0  
port=9092
num.network.threads=2  
num.io.threads=8  
socket.send.buffer.bytes=1048576  
socket.receive.buffer.bytes=1048576  
socket.request.max.bytes=104857600  
log.dirs=/tmp/kafka-logs  
num.partitions=1
log.retention.hours=168  
log.segment.bytes=536870912  
log.retention.check.interval.ms=60000  
log.cleaner.enable=false  
zookeeper.connect=localhost:2181  
zookeeper.connection.timeout.ms=1000000

配置說明:

參數 默認值 描述
broker.id -1 用於服務的broker id。如果沒設置,將生成一個唯一broker id。為了避免ZooKeeper生成的id和用戶配置的broker id相沖突,生成的id將在reserved.broker.max.id的值基礎上加1。
port 9092 broker server服務端口。僅在未設置listeners時使用。
host.name broker的主機地址,若是設置了,那么會綁定到這個地址上,若是沒有,會綁定到所有的接口上,並將其中之一發送到ZK。僅在未設置listeners時使用。
log.dirs /tmp/kafka-logs kafka數據的存放地址,多個地址的話用逗號分割,多個目錄分布在不同磁盤上可以提高讀寫性能 /data/kafka-logs-1,/data/kafka-logs-2
message.max.bytes 1000012 表示消息體的最大大小,單位是字節
num.network.threads 3 broker處理消息的最大線程數,一般情況下數量為cpu核數
num.io.threads 8 處理IO的線程數
log.flush.interval.messages Long.MaxValue 在數據被寫入到硬盤和消費者可用前最大累積的消息的數量
log.flush.interval.ms Long.MaxValue 在數據被寫入到硬盤前的最大時間
log.flush.scheduler.interval.ms Long.MaxValue 檢查數據是否要寫入到硬盤的時間間隔。
log.retention.hours 168 控制一個log保留多長個小時
log.retention.bytes -1 控制log文件最大尺寸
log.cleaner.enable false 是否log cleaning
log.cleanup.policy delete delete還是compat.
log.segment.bytes 1073741824 單一的log segment文件大小
log.roll.hours 168 開始一個新的log文件片段的最大時間
background.threads 10 后台線程序
num.partitions 1 默認分區數
socket.send.buffer.bytes 102400 socket SO_SNDBUFF參數
socket.receive.buffer.bytes 102400 socket SO_RCVBUFF參數
zookeeper.connect 指定zookeeper連接字符串, 格式如hostname:port/chroot。chroot是一個namespace
zookeeper.connection.timeout.ms 6000 指定客戶端連接zookeeper的最大超時時間
zookeeper.session.timeout.ms 6000 連接zk的session超時時間
zookeeper.sync.time.ms 2000 zk follower落后於zk leader的最長時間
auto.create.topics.enable true 是否允許在服務器上自動創建topic

更多配置查看官方文檔:http://kafka.apache.org/documentation.html#configuration

常用命令

  • 啟動zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties &
  • 關閉zookeeper
$ bin/zookeeper-server-stop.sh
  • 啟動kafka
$ bin/kafka-server-start.sh config/server.properties &
  • 關閉kafka
$ bin/kafka-server-stop.sh
  • 創建topic
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
  • 查看所有topic
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  • 查看某個topic具體信息
$ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
  • 刪除topic (可直接刪除的前提:delete.topic.enable=true)
$ bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test
  • 生產消息
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  • 消費消息
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Kafka監控工具

https://github.com/Morningstar/kafka-offset-monitor

消息隊列比較

消息隊列主要是解決了應用解耦、異常處理、流量削鋒等問題。常見的消息隊列還有:ActiveMQRabbitMQRocketMQZeroMQMetaMQ 等等。當然,我們也可以使用Redis作為簡單的消息隊列使用。

消息隊列對比參考:

(圖片來源於互聯網)

參考

1、Apache Kafka
http://kafka.apache.org/documentation/
2、消息隊列Kafka、RocketMQ、RabbitMQ的優劣勢比較 - 知乎
https://zhuanlan.zhihu.com/p/60288391
3、weiboad/kafka-php: kafka php client
https://github.com/weiboad/kafka-php
4、kafka中partition和消費者對應關系 - 簡書
https://www.jianshu.com/p/6233d5341dfe
5、kafka常用的命令 - 隨筆 - SegmentFault 思否
https://segmentfault.com/a/1190000010040990
6、消息中間件部署及比較:rabbitMQ、activeMQ、zeroMQ、rocketMQ、Kafka、redis - 掘金
https://juejin.im/post/5b32044ef265da59654c3027
7、面試官問分布式技術面試題,一臉懵逼怎么辦?_ITPUB博客
http://blog.itpub.net/69917606/viewspace-2642545/
8、產品架構_產品簡介_消息隊列 Kafka-阿里雲
https://help.aliyun.com/document_detail/68152.html?spm=a2c4g.11186623.6.543.3ba272e4cAMqaH


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM