Kafka


Kafka 入門

 

什么是 Kafka

kafka最初是 LinkedIn 的一個內部基礎設施系統。最初開發的起因是,LinkedIn雖然有了數據庫和其他系統可以用來存儲數據,但是缺乏一個可以幫助處理持續數據流的組件。所以在設計理念上,開發者不想只是開發一個能夠存儲數據的系統,如關系數據庫、Nosql 數據庫、搜索引擎等等,更希望把數據看成一個持續變化和不斷增長的流,並基於這樣的想法構建出一個數據系統,一個數據架構。

Kafka外在表現很像消息系統,允許發布和訂閱消息流,但是它和傳統的消息系統有很大的差異,

首先,Kafka 是個現代分布式系統,以集群的方式運行,可以自由伸縮。

其次,Kafka 可以按照要求存儲數據,保存多久都可以,

第三,流式處理將數據處理的層次提示到了新高度,消息系統只會傳遞數據,Kafka 的流式處理能力可以讓我們用很少的代碼就能動態地處理派生流和數據集。所以 Kafka 不僅僅是個消息中間件。

Kafka不僅僅是一個消息中間件,同時它是一個流平台,這個平台上可以發布和訂閱數據流(Kafka 的流,有一個單獨的包 Stream 的處理),並把他們保存起來,進行處理,這個是 Kafka作者的設計理念。

大數據領域,Kafka 還可以看成實時版的 Hadoop,但是還是有些區別,Hadoop 可以存儲和定期處理大量的數據文件,往往以 TB 計數,而 Kafka可以存儲和持續處理大型的數據流。Hadoop 主要用在數據分析上,而 Kafka因為低延遲,更適合於核心的業務應用上。所以國內的大公司一般會結合使用,比如京東在實時數據計算架構中就使用了到了 Kafka,具體見《張開濤-海量數據下的應用系統架構實踐》

常見的大數據處理框架:storm、spark、Flink、(Blink 阿里)

Kafka名字的由來:卡夫卡與法國作家馬塞爾·普魯斯特,愛爾蘭作家詹姆斯·喬伊斯並稱為西方現代主義文學的先驅和大師。《變形記》是卡夫卡的短篇代表作,是卡夫卡的藝術成就中的一座高峰,被認為是 20 世紀最偉大的小說作品之一(達到管理層的高度應該多看下人文相關的書籍,增長管理知識和人格魅力)。

 

本文以 kafka_2.11-2.3.0 版本為主,其余版本不予考慮,並且 Kafka  scala 語言寫的,小眾語言,沒有必要研究其源碼,投入和產出比低,除非你的技術級別非常高或者需要去開發單獨的消息中間件。

 

Kafka 中的基本概念

消息和批次

  消息,Kafka 里的數據單元,也就是我們一般消息中間件里的消息的概念(可以比作數據庫中一條記錄)。消息由字節數組組成。消息還可以包含鍵(可選元數據,也是字節數組),主要用於對消息選取分區。

作為一個高效的消息系統,為了提高效率,消息可以被分批寫入 Kafka。批次就是一組消息,這些消息屬於同一個主題和分區。如果只傳遞單個消息,會導致大量的網絡開銷,把消息分成批次傳輸可以減少這開銷。但是,這個需要權衡(時間延遲和吞吐量之間),批次里包含的消息越多,單位時間內處理的消息就越多,單個消息的傳輸時間就越長(吞吐量高延時也高)。如果進行壓縮,可以提升數據的傳輸和存儲能力,但需要更多的計算處理。

  對於 Kafka來說,消息是晦澀難懂的字節數組,一般我們使用序列化和反序列化技術,格式常用的有 JSON  XML,還有 Avro(Hadoop 開發的一款序列化框架),具體怎么使用依據自身的業務來定。

主題和分區

  Kafka里的消息用主題進行分類(主題好比數據庫中的表),主題下有可以被分為若干個分區(分表技術)。分區本質上是個提交日志文件,有新消息,這個消息就會以追加的方式寫入分區(寫文件的形式),然后用先入先出的順序讀取。

  但是因為主題會有多個分區,所以在整個主題的范圍內,是無法保證消息的順序的,單個分區則可以保證。

  Kafka通過分區來實現數據冗余和伸縮性,因為分區可以分布在不同的服務器上,那就是說一個主題可以跨越多個服務器(這是 Kafka 高性能的一個原因,多台服務器的磁盤讀寫性能比單台更高)。

  前面我們說 Kafka 可以看成一個流平台,很多時候,我們會把一個主題的數據看成一個流,不管有多少個分區。

 

 

生產者和消費者、偏移量、消費者群組

  就是一般消息中間件里生產者和消費者的概念。一些其他的高級客戶端 API,像數據管道 API 和流式處理的 Kafka Stream,都是使用了最基本的生產者和消費者作為內部組件,然后提供了高級功能。

  生產者默認情況下把消息均衡分布到主題的所有分區上,如果需要指定分區,則需要使用消息里的消息鍵和分區器。

  消費者訂閱主題,一個或者多個,並且按照消息的生成順序讀取。消費者通過檢查所謂的偏移量來區分消息是否讀取過。偏移量是一種元數據,一個不斷遞增的整數值,創建消息的時候,Kafka 會把他加入消息。在一個主題中一個分區里,每個消息的偏移量是唯一的。每個分區最后讀取的消息偏移量會保存到 Zookeeper 或者 Kafka 上,這樣分區的消費者關閉或者重啟,讀取狀態都不會丟失。

  多個消費者可以構成一個消費者群組。怎么構成?共同讀取一個主題的消費者們,就形成了一個群組。群組可以保證每個分區只被一個消費者使用。

  消費者和分區之間的這種映射關系叫做消費者對分區的所有權關系,很明顯,一個分區只有一個消費者,而一個消費者可以有多個分區。

  (吃飯的故事:一桌一個分區,多桌多個分區,生產者不斷生產消息(消費),消費者就是買單的人,消費者群組就是一群買單的人),一個分區只能被消費者群組中的一個消費者消費(不能重復消費),如果有一個消費者掛掉了<James 跑路了>,另外的消費者接上)

 

 

Broker 和集群

  一個獨立的 Kafka 服務器叫 Broker。broker 的主要工作是,接收生產者的消息,設置偏移量,提交消息到磁盤保存;為消費者提供服務,響應請求,返回消息。在合適的硬件上,單個 broker 可以處理上千個分區和每秒百萬級的消息量。(要達到這個目的需要做操作系統調優和 JVM 調優)

  多個 broker 可以組成一個集群。每個集群中 broker 會選舉出一個集群控制器。控制器會進行管理,包括將分區分配給 broker 和監控 broker。

  集群里,一個分區從屬於一個 broker,這個 broker 被稱為首領。但是分區可以被分配給多個 broker,這個時候會發生分區復制。

  集群中 Kafka 內部一般使用管道技術進行高效的復制。

分區復制帶來的好處是,提供了消息冗余。一旦首領 broker 失效,其他 broker 可以接管領導權。當然相關的消費者和生產者都要重新連接到新的首領上。

 

保留消息

  在一定期限內保留消息是 Kafka 的一個重要特性,Kafka broker 默認的保留策略是:要么保留一段時間(天),要么保留一定大小(比如 1  G)。

到了限制,舊消息過期並刪除。但是每個主題可以根據業務需求配置自己的保留策略(開發時要注意,Kafka 不像 Mysql 之類的永久存儲)。

 

 

為什么選擇 Kafka

 

優點

  多生產者和多消費者

 

  基於磁盤的數據存儲,換句話說,Kafka 的數據天生就是持久化的。

  高伸縮性,Kafka 一開始就被設計成一個具有靈活伸縮性的系統,對在線集群的伸縮絲毫不影響整體系統的可用性。

  高性能,結合橫向擴展生產者、消費者和 broker,Kafka 可以輕松處理巨大的信息流(LinkedIn 公司每天處理萬億級數據),同時保證亞秒級的消息延遲。

 

常見場景

 

 

活動跟蹤

  跟蹤網站用戶和前端應用發生的交互,比如頁面訪問次數和點擊,將這些信息作為消息發布到一個或者多個主題上,這樣就可以根據這些數據為機器學習提供數據,更新搜素結果等等(頭條、淘寶等總會推送你感興趣的內容,其實在數據分析之前就已經做了活動跟蹤)。

 

傳遞消息

  標准消息中間件的功能

 

收集指標和日志

  收集應用程序和系統的度量監控指標,或者收集應用日志信息,通過 Kafka路由到專門的日志搜索系統,比如 ES。(國內用得較多)

 

提交日志

  收集其他系統的變動日志,比如數據庫。可以把數據庫的更新發布到 Kafka上,應用通過監控事件流來接收數據庫的實時更新,或者通過事件流將數據庫的更新復制到遠程系統。

  還可以當其他系統發生了崩潰,通過重放日志來恢復系統的狀態。(異地災備)

 

流處理

  操作實時數據流,進行統計、轉換、復雜計算等等。隨着大數據技術的不斷發展和成熟,無論是傳統企業還是互聯網公司都已經不再滿足於離線批處理,實時流處理的需求和重要性日益增長

  近年來業界一直在探索實時流計算引擎和 API,比如這幾年火爆的 Spark Streaming、Kafka Streaming、Beam  Flink,其中阿里雙 11 會場展示的實時銷售金額,就用的是流計算,是基於 Flink,然后阿里在其上定制化的 Blink。

 

Kafka 的安裝、管理和配置

安裝

預備環境

  Kafka是 Java 生態圈下的一員,用 Scala 編寫,運行在 Java 虛擬機上,所以安裝運行和普通的 Java 程序並沒有什么區別。

  安裝 Kafka官方說法,Java 環境推薦 Java8。

  Kafka需要 Zookeeper 保存集群的元數據信息和消費者信息。Kafka一般會自帶 Zookeeper,但是從穩定性考慮,應該使用單獨的 Zookeeper,而且構建Zookeeper 集群。

 

 

下載和安裝 Kafka

 

  在 http://kafka.apache.org/downloads 上尋找合適的版本下載,這里選用的是 kafka_2.11-2.3.0,下載完成后解壓到本地目錄。

 

 

 運行

 

  啟動 Zookeeper

 

  進入 Kafka目錄下的 bin\windows

 

  執行 kafka-server-start.bat ../../config/server.properties,出現以下畫面表示成功

 

  Linux下與此類似,進入 bin 后,執行對應的 sh 文件即可

 

 

 

基本的操作和管理

##列出所有主題

kafka-topics.bat --zookeeper localhost:2181 --list

##列出所有主題的詳細信息

kafka-topics.bat --zookeeper localhost:2181 --describe

##創建主題 主題名 my-topic1 副本,8 分區

kafka-topics.bat --zookeeper localhost:2181 --create --topic my-topic --replication-factor 1 --partitions 8

##增加分區,注意:分區無法被刪除

kafka-topics.bat --zookeeper localhost:2181 --alter --topic my-topic --partitions 16

##刪除主題

kafka-topics.bat --zookeeper localhost:2181 --delete --topic my-topic

##創建生產者(控制台)

kafka-console-producer.bat --broker-list localhost:9092 --topic my-topic

##創建消費者(控制台)

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-topic --from-beginning

##列出消費者群組(僅 Linux

kafka-topics.sh --new-consumer --bootstrap-server localhost:9092 --list

##列出消費者群組詳細信息(僅 Linux

kafka-topics.sh --new-consumer --bootstrap-server localhost:9092 --describe --group 群組名

 

 

Broker 配置

  配置文件放在 Kafka目錄下的 config 目錄中,主要是 server.properties 文件

常規配置

broker.id

  在單機時無需修改,但在集群下部署時往往需要修改。它是個每一個 broker 在集群中的唯一表示,要求是正數。當該服務器的 IP 地址發生改變時,broker.id 沒有變化,則不會影響 consumers 的消息情況

listeners

  監聽列表(以逗號分隔 不同的協議(如 plaintext,trace,ssl、不同的 IP 和端口)),hostname 如果設置為 0.0.0.0 則綁定所有的網卡地址;如果 hostname 為空則綁定默認的網卡。

如果沒有配置則默認為 java.net.InetAddress.getCanonicalHostName()。

如:PLAINTEXT://myhost:9092,TRACE://:9091  PLAINTEXT://0.0.0.0:9092,

 

zookeeper.connect

  zookeeper 集群的地址,可以是多個,多個之間用逗號分割。(一組 hostname:port/path 列表,hostname  zk 的機器名或 IP、port  zk 的端口、/path是可選 zk 的路徑,如果不指定,默認使用根路徑)

 

log.dirs

  Kafka把所有的消息都保存在磁盤上,存放這些數據的目錄通過 log.dirs 指定。可以使用多路徑,使用逗號分隔。如果是多路徑,Kafka會根據“最少使用”原則,把同一個分區的日志片段保存到同一路徑下。會往擁有最少數據分區的路徑新增分區。

 

num.recovery.threads.per.data.dir

  每數據目錄用於日志恢復啟動和關閉時的線程數量。因為這些線程只是服務器啟動(正常啟動和崩潰后重啟)和關閉時會用到。所以完全可以設置大量的線程來達到並行操作的目的。注意,這個參數指的是每個日志目錄的線程數,比如本參數設置為 8,而 log.dirs 設置為了三個路徑,則總共會啟動24 個線程。

 

auto.create.topics.enable

  是否允許自動創建主題。如果設為 true,那么 produce(生產者往主題寫消息),consume(消費者從主題讀消息)或者 fetch metadata(任意客戶端向主題發送元數據請求時)一個不存在的主題時,就會自動創建。缺省為 true。

 

 

主題配置

  新建主題的默認參數

num.partitions

  每個新建主題的分區個數(分區個數只能增加,不能減少 )。這個參數一般要評估,比如,每秒鍾要寫入和讀取 1000M 數據,如果現在每個消費者每秒鍾可以處理 50MB 的數據,那么需要 20 個分區,這樣就可以讓 20 個消費者同時讀取這些分區,從而達到設計目標。(一般經驗,把分區大小限制在25G 之內比較理想)

 

 

log.retention.hours

 

  日志保存時間,默認為 7 天(168 小時)。超過這個時間會清理數據。bytes  minutes 無論哪個先達到都會觸發。與此類似還有 log.retention.minutes和log.retention.ms,都設置的話,優先使用具有最小值的那個。(提示:時間保留數據是通過檢查磁盤上日志片段文件的最后修改時間來實現的。也就是最后修改時間是指日志片段的關閉時間,也就是文件里最后一個消息的時間戳)

 

 

 

log.retention.bytes

 

  topic 每個分區的最大文件大小,一個 topic 的大小限制 = 分區數*log.retention.bytes。-1 沒有大小限制。log.retention.bytes  log.retention.minutes任意一個達到要求,都會執行刪除。(注意如果是 log.retention.bytes 先達到了,則是刪除多出來的部分數據),一般不推薦使用最大文件刪除策略,而是推薦使用文件過期刪除策略。

 

 

 

log.segment.bytes

 

  分區的日志存放在某個目錄下諸多文件中,這些文件將分區的日志切分成一段一段的,我們稱為日志片段。這個屬性就是每個文件的最大尺寸;當尺寸達到這個數值時,就會關閉當前文件,並創建新文件。被關閉的文件就開始等待過期。默認為 1G。

 

如果一個主題每天只接受 100MB 的消息,那么根據默認設置,需要 10 天才能填滿一個文件。而且因為日志片段在關閉之前,消息是不會過期的,所以如果 log.retention.hours 保持默認值的話,那么這個日志片段需要 17 天才過期。因為關閉日志片段需要 10 天,等待過期又需要 7 天。

 

 

 

 

log.segment.ms

  作用和 log.segment.bytes 類似,只不過判斷依據是時間。同樣的,兩個參數,以先到的為准。這個參數默認是不開啟的。

 

message.max.bytes

  表示一個服務器能夠接收處理的消息的最大字節數,注意這個值 producer  consumer 必須設置一致,且不要大於 fetch.message.max.bytes 屬性的值(消費者能讀取的最大消息,這個值應該大於或等於 message.max.bytes)。該值默認是 1000000 字節,大概 900KB~1MB。如果啟動壓縮,判斷壓縮后的值。

這個值的大小對性能影響很大,值越大,網絡和 IO 的時間越長,還會增加磁盤寫入的大小。

Kafka 設計的初衷是迅速處理短小的消息,一般 10K 大小的消息吞吐性能最好(LinkedIn  kafka性能測試)

 

硬件配置對 Kafka 性能的影響

  為 Kafka 選擇合適的硬件更像是一門藝術,就跟它的名字一樣,我們分別從磁盤、內存、網絡和 CPU 上來分析,確定了這些關注點,就可以在預算范圍之內選擇最優的硬件配置。

 

磁盤吞吐量/磁盤容量

  磁盤吞吐量(IOPS 每秒的讀寫次數)會影響生產者的性能。因為生產者的消息必須被提交到服務器保存,大多數的客戶端都會一直等待,直到至少有一個服務器確認消息已經成功提交為止。也就是說,磁盤寫入速度越快,生成消息的延遲就越低。(SSD固態貴單個速度快,HDD 機械偏移可以多買幾個,設置多個目錄加快速度,具體情況具體分析)

  磁盤容量的大小,則主要看需要保存的消息數量。如果每天收到 1TB 的數據,並保留 7 天,那么磁盤就需要 7TB 的數據。

 

內存

  Kafka本身並不需要太大內存,內存則主要是影響消費者性能。在大多數業務情況下,消費者消費的數據一般會從內存(頁面緩存,從系統內存中分)

中獲取,這比在磁盤上讀取肯定要快的多。一般來說運行 Kafka  JVM 不需要太多的內存,剩余的系統內存可以作為頁面緩存,或者用來緩存正在使用的日志片段,所以我們一般 Kafka不會同其他的重要應用系統部署在一台服務器上,因為他們需要共享頁面緩存,這個會降低 Kafka 消費者的性能。

 

 

網絡

  網絡吞吐量決定了 Kafka能夠處理的最大數據流量。它和磁盤是制約 Kafka 拓展規模的主要因素。對於生產者、消費者寫入數據和讀取數據都要瓜分網絡流量。同時做集群復制也非常消耗網絡。

 

CPU

  Kafka對 cpu的要求不高,主要是用在對消息解壓和壓縮上。所以 cpu 的性能不是在使用 Kafka的首要考慮因素。

 

 總結

  我們要為 Kafka選擇合適的硬件時,優先考慮存儲,包括存儲的大小,然后考慮生產者的性能(也就是磁盤的吞吐量),選好存儲以后,再來選擇CPU 和內存就容易得多。網絡的選擇要根據業務上的情況來定,也是非常重要的一環。

 

 

Kafka 的集群

 

 

為何需要 Kafka 集群

  本地開發,一台 Kafka足夠使用。在實際生產中,集群可以跨服務器進行負載均衡,再則可以使用復制功能來避免單獨故障造成的數據丟失。同時集群可以提供高可用性。

 

如何估算 Kafka 集群中 Broker 的數量

  要估量以下幾個因素:

    需要多少磁盤空間保留數據,和每個 broker 上有多少空間可以用。比如,如果一個集群有 10TB 的數據需要保留,而每個 broker 可以存儲 2TB,那么至少需要 5  broker。如果啟用了數據復制,則還需要一倍的空間,那么這個集群需要 10  broker。

    集群處理請求的能力。如果因為磁盤吞吐量和內存不足造成性能問題,可以通過擴展 broker 來解決。

 

Broker 如何加入 Kafka 集群

  非常簡單,只需要兩個參數。第一,配置 zookeeper.connect,第二,為新增的 broker 設置一個集群內的唯一性 id。

  Kafka中的集群是可以動態擴容的。

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM