KafKa的基本認識,寫的很好的一篇博客:https://www.cnblogs.com/sujing/p/10960832.html
問題:
1、kafka是什么?
Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據,具有高性能、持久化、多副本備份、橫向擴展能力。
2、kafka的工作原理[去耦合]
Kafka采用的是訂閱-發布的模式,消費者主動的去kafka集群拉取消息,與producer相同的是,消費者在拉取消息的時候也是找leader去拉取。
3、kafka存在的意義:去耦合、異步、中間件的消息系統。
- kafka節點之間如何復制備份的?
- kafka消息是否會丟失?為什么?[ACK機制]
- kafka最合理的配置是什么?
- kafka的leader選舉機制是什么?
- kafka對硬件的配置有什么要求?
- kafka的消息保證有幾種方式?
- kafka為什么會丟消息?
1 Kafka簡介
Kafka:是一個高吞吐量、分布式的發布-訂閱消息系統。kafka是一款開源的、輕量級的、分布式、可分區和具有復制備份[Replicated]的、基於Zookeeper協調管理的分布式流平台的功能強大的消息系統。
1.1 Kafka特性
- 能夠允許發布和訂閱流數據
- 存儲流數據時提供相應的容錯機制
- 當流數據到達時能夠被及時處理[近乎實時性的消息處理能力,可以高效地存儲消息和查詢消息]
1.2 Kafka消息系統最基本的體系結構[kafka的工作模式]
1.3 Kafka生態系統
Kafka Core API:
Kafka Extended API:
1.4 Kafka基本概念[核心概念]
- Topic& 分區 &Log:Topic 是用於存儲消息的邏輯概念,可以看作一個消息集合。每個 Topic 可以有多個生產者向其中推送(push)消息,也可以有任意多個消費者消費其中的消息。每個分區[對應一個磁盤的文件夾]由一系列有序、不可變的消息組成,是一個有序隊列。Log 由多個 Segment 組成,每個 Segment 對應一個日志文件和索引文件。每個分區在物理上對應為一個文件夾,分區的命名規則為主題名稱后接“—”連接符,之 后再接分區編號,分區編號從 0 開始,編號最大值為分區的總數減 1。日志段:一個日志又被划分為多個日志段(LogSegment)[邏輯概念],日志段是 Kafka 日志對象分片的最小單位。一個日志段對應磁盤上一個具體日志文件和 兩個索引文件。日志文件是以“.log”為文件名后綴的數據文件,用於保存消息實際數據。兩個索引文件分別以“.index”和“.timeindex”作為文件名后綴,分別表示消息偏移量索引文件和消息時間戳索引文件。
Topic與Partition之間的關系:
- 消息:Kafka通信的基本單位,消息由一串字節構成,其中主要由 key 和 value 構成,key 和 value 也都是 byte 數組。key的主要作用是根據一定的策略,將此消息路由到指定的分區中,這樣就可以保證包含同一 key 的消息全部寫入同一分區中,key 可以是 null。消息的真正有效負載是 value 部分的數據。
- 副本:每個 Partition 可以有多個副本,每個副本中包含的消息是一樣的。每個分區至少有一個副本,當分區中只有一個副本時,就只有 Leader 副本,沒有 Follower 副本。所有的讀寫請求都由選舉出 的 Leader 副本處理,其他都作為 Follower 副本,Follower 副本僅僅是從 Leader 副本處把數據拉取到本地之后,同步更新到自己的 Log 中。Kafka 提供兩種刪除老數據的策略, 一是基於消息已存儲的時間長度,二是基於分區的大小。
- 偏移量:任何發布到分區的消息會被直接追加到日志文件(分區目錄下以“.log”為文件名后綴的 數據文件)的尾部,而每條消息在日志文件中的位置都會對應一個按序遞增的偏移量。
- 代理:每一個Kafka實例稱為代理(Broker),也稱為kafka服務器。Kafka集群一般包含一台或多台服務器,可以在一台服務器上配置一個或多個代理。
代理和主題之間的關系:
- 生產者:將消息發給代理,也就是向Kafka代理發送消息的客戶端。例如:生產者將數據發送到主題。[key-value的形式寫入]
- 消費者和消費組:消費者(Comsumer)以拉取 (pull)方式拉取數據,它是消費的客戶端。在 Kafka 中每一 個消費者都屬於一個特定消費組(ConsumerGroup),我們可以為每個消費者指定一個消費組, 以 groupId 代表消費組名稱,通過 group.id 配置設置。
Consumer group 和topic的聯動方式:比如這里的topic1有3個分區,Consumer Group 0中有2個Consumer,Consumer 1 拉取分區0和分區1的數據,Consumer 2拉取分區2的數據。
- ISR:Kafka 在 ZooKeeper 中動態維護了一個 ISR(In-sync Replica),即保存同步的副本列表,該列表中保存的是與 Leader 副本保持消息同步的所有副本對應的代理節點id。ISR(In-Sync Replica)集合表示的是目前“可用”(alive)且消息量與 Leader 相差不多的副本集合,這是整個副本集合的一 個子集。
- ZooKeeper[選舉算法]:Kafka 利用 ZooKeeper 保存相應元數據信息,Kafka元數據信息包括如代理節點信息、Kafka 集群信息、舊版消費者信息及其消費偏移量信息、主題信息、分區狀態信息、分區副本分配方 案信息、動態配置信息等。
1.5 Kafka集群架構
根據業務邏輯產生消息,在根據路由規則將消息發送到指定分區的Leader副本所在的Broker上。
1.6 Kafka設計概述
- 動機:統一、實時處理大規模數據的平台。[類似數據庫日志系統]
- (1)具有高吞吐量來支持諸如實時的日志集這樣的大規模事件流。
- (2)能夠很好地處理大量積壓的數據,以便能夠周期性地加載離線數據進行處理。
- (3)能夠低延遲地處理傳統消息應用場景。
- (4)能夠支持分區、分布式,實時地處理消息,同時具有容錯保障機制。
- 特性:消息持久化、高吞吐量、擴展性、多客戶端支持、Kafka Streams、安全機制、數據備份、輕量級、消息壓縮。
- (1)消息持久化:Kafka 高度依賴於文件系統來存儲和緩存消息。
- (2)高吞吐量:Kafka 將數據寫到磁盤,充分利用磁盤的順序讀寫。 同時,Kafka 在數據寫入及數據同步采用了零拷貝(zero-copy)技術,完全在內核中操作,從而避免了內核緩沖區與用戶緩沖區之間數據的拷貝,操作效率極高。Kafka 還支持數據壓縮及批量發送,同時 Kafka 將每個主題划分為多個分區。
- (3)擴展性:集群能夠自動感知,重新進行負責均衡及數據復制。
- 應用場景:
- (1)消息系統。[在應用系統中可以將kafka作為傳統的消息中間件,實現消息隊列和消息的發布/訂閱]
- (2)應用監控。
- (3)網站用戶行為追蹤。[用作日志收集中心,多個系統產生的日志統一收集到Kafka中,然后由數據分析平台進行統一處理]
- (4)流處理。
- (5)持久性日志。[Kafka 可以為外部系統提供一種持久性日志的分布式系統。日志可以在多個節點間進行備份,Kafka為故障節點數據恢復提供了一種重新同步的機制。]
- (6)kafka用作系統中的數據總線,將其接入多個子系統中,子系統會將產生的數據發送到kafka中保存,之后流轉到目的系統中。
2 配置Kafka集群及開發環境
- 在linux中安裝並配置docker並啟動kafka開發環境
- 在kafka環境中對topic進行create、delete、list、descr
- 在kafka環境命令行中producer向topic中推送數據
- 在kafka環境命令行中consumer向topic中拉取數據
2.1 在linux中安裝並配置docker並啟動kafka開發環境
- 1、ubuntu docker安裝[使用官方安裝腳本自動安裝]:https://www.runoob.com/docker/ubuntu-docker-install.html
- 2、如何使用docker在本地建立一個kafka的開發環境:在github官網(https://github.com)中搜索lensesio/fast-data-dev
- 3、在瀏覽器中打開網址:http://localhost:3030/,看到如下界面,表示kafka開發環境啟動成功。
#安裝curl,目的是使用官網安裝docker[linux安裝時提示沒有該命令,按需安裝] apt install curl #在linux[查看版本使用命令:uname -r]下安裝docker curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun #啟動kafka開發環境 docker run --rm -it \ -p 2181:2181 -p 3030:3030 -p 8081:8081 \ -p 8082:8082 -p 8083:8083 -p 9092:9092 \ -e ADV_HOST=127.0.0.1 \ lensesio/fast-data-dev
前提是docker已安裝成功:
2.2 在kafka環境中對topic進行create、delete、list、descr
#登錄到kafka,進行kafca操作 docker run --rm -it --net=host lensesio/fast-data-dev bash #topic對應的參數名稱顯示 kafka-topics #創建first-topic kafka-topics --zookeeper 127.0.0.1:2181 --create --topic first_topic --partitions 3 --replication-factor 1 kafka-topics --zookeeper 127.0.0.1:2181 --create --topic second_topic --partitions 3 --replication-factor 1 #列表顯示對應的topic kafka-topics --zookeeper 127.0.0.1:2181 --list #刪除topic kafka-topics --zookeeper 127.0.0.1:2181 --delete --topic first_topic #查看存在的topic kafka-topics --zookeeper 127.0.0.1:2181 --describe --topic second_topic
2.3 在kafka環境命令行中producer向topic中推送數據
#登錄到kafka,進行kafca操作 docker run --rm -it --net=host lensesio/fast-data-dev bash #用console producer向topic中推送數據 #producer對應的參數名稱顯示 kafka-console-producer #推送數據hi、hello、today、nice、kafka
[未指定key,value隨機指派給分區] kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic >hi >hello [指定key,value指派給對應的分區] kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic --property "parse.key=true" \
--property "key.separator=:"
2.4 在kafka環境命令行中consumer向topic中拉取數據
#登錄到kafka,進行kafca操作 docker run --rm -it --net=host lensesio/fast-data-dev bash #用console consumer向topic中拉取數據,下列命令可以列出所有需要使用的參數 kafka-console-consumer #創建consumer,從當前位置開始讀取 kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic #創建consumer,從起始位置開始讀取,my-group-1中的一員 kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic \
--consumer-property group.id=my-group-1 --from-beginning #創建consumer,從起始位置開始讀取 kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning