Kafka簡介
在當前的大數據時代,第一個挑戰是海量數據的收集,另一個就是這些數據的分析。數據分析的類型通常有用戶行為數據、應用性能跟蹤數據、活動數據日志、事件消息等。消息發布機制用於連接各種應用並在它們之間路由消息,例如通過message broker。Kafka是快速地將海量信息實時路由到消費者的解決方案,實現信息的生產者和消費者的無縫集成。它不會阻塞信息的生產者,同時信息生產者不會知道信息消費者。
Apache Kafka是個開源的分布式消息發布訂閱系統,具有以下特征:
- 消息持久化(persistent messaging):Kafka提供時間復雜度為O(1)的消息持久化能力,即使是TB級別以上的數據也能保證常數時間復雜度的訪問性能。在Kafka中,消息被持久化在磁盤中,同時也在集群之間復制,防止數據丟失
- 高吞吐率(high throughput):即使在普通的機器上也能提供每秒上百MB讀寫的能力。
- 分布式(distributed):Kafka支持Kafka服務器間的消息分區,以及基於消費者機器集群的分布式消費,維護每個分區內的順序。Kafka集群可以在不停機的情況下彈性地增減結點。
- 實時(real time):生產者線程產生的消息應該立即被消費者線程可見,這是基於事件系統的關鍵特性,例如Complex Event Processing (CEP) system。
- 支持在線水平擴展(scale out)
Kafka提供了實時發布訂閱的解決方案,克服了實時數據消費和比實時數據更大數量級的數據量增長的問題。Kafka也支持Hadoop系統中的並行數據加載。下圖展示了一種典型的使用Kafka消息系統的大數據聚合分析的場景。
在消息生產端,有不同類型的生產者,例如:
- 生成應用日志的前端web應用
- 生成web分析日志的代理
- 生成轉換日志的適配器
- 生成調用跟蹤日志的服務
在消息消費端,有不同類型的消費者,例如:
- 離線消費者(offline consumer):消費消息,將它們存儲到Hadoop或傳統數據倉庫用於離線分析
- 接近實時的消費者(near real-time consumer):消費消息,將它們存儲到NoSQL數據庫中用於接近實時的分析
- 實時消費者(real-time consumer):例如Spark或Storm,在內存數據庫中過濾信息並觸發相關事件
使用Kafka的場景
各種形式的web活動產生大量數據,例出用戶活動事件如登錄、訪問頁面、點擊鏈接,社交網絡活動如喜歡、分享、評論,還有系統運作日志等等。由於這些數據的高吞吐量(每秒百萬級的消息),通常由日志記錄系統和日志聚合解決方案來處理。這樣的傳統方案對提供日志數據給Hadoop這樣的離線分析系統是可行的,但對於需要實時處理的系統就夠嗆了。
互聯網應用的發展趨勢表明,這樣的活動數據已經成為生產數據的一部分,用於實時分析,包括:
- 基於相關性的搜索
- 基於受歡迎度、同現、或這情感分析的推薦
- 向大眾提供廣告
- 從垃圾郵件或抓取未經授權數據分析互聯網應用程序安全性
- 設備傳感器發送高溫報警
- 反常用戶行為或應用被侵入
Kafka的目標是統一離線和在線處理,通過提供Hadoop系統中並行加載機制和使用集群將實時消費分區的能力。Kafka可以跟Scribe或Flume相比較,因為它可以用於處理活動流數據,但是從架構的角度,它更接近於傳統的消息系統,如ActiveMQ或RabitMQ。
使用Kafka的案例
Kafka通常被用於:
- 日志聚合(log aggregation):從服務器收集日志文件,將它們放到文件服務器或HDFS中處理。使用Kafka可以將日志數據或事件數據抽象為信息流,從而避免了對文件格式等的依賴。同時提供了低延遲處理能力,並支持多數據源和分布式處理。
- 流處理(stream processing):Kafka可用於對數據進行多階段處理的場景,例如,一個主題的原始數據被消費,經過增強或轉換處理形成新的主題供后續的消費者使用。這樣的處理過程稱為流處理。
- 提交日志(commit logs):Kafka可用來處理大規模分布式系統的外部提交日志。在Kafka集群之間復制日志可以幫助故障節點恢復其狀態。
- 點擊流跟蹤(click stream tracking):另一個常用Kafka的場景是,捕捉用戶點擊流數據,如頁面視圖、搜索等這樣的real-time publish-subscribe feed。這些數據被發布到中心主題,由於數據量巨大,每個活動類型一個主題。這些主題可以被很多類型的消費者訂閱,如實時處理和監控應用。
- 消息處理(messaging):message broker用於將數據處理和數據提供者解耦。Kafka可以作為message broker,它具有更好的吞吐量、內建分區、復制和容錯能力。
一些公司將Kafka用於各自的場景:
- LinkedIn使用Kafka處理活動流數據和運營指標數據。這些數據支撐着除Hadoop離線分析系統外的多個系統,如LinkedIn news feed和LinkedIn Today。
- DataSift使用Kafka收集監控事件和實時跟蹤用戶消費數據。
- Twitter、1號店將Kafka作為Storm的一部分。
- Foursquare使用Kafka支撐在線和離線消息處理,將監控和基於Hadoop的離線的生產系統相集成。
- Square將Kafka當作總線使用,在各種各樣的數據中心之間傳遞系統事件。
Kafka的一些高級特性
- 為分區提供了復制因子。保證了所有提交的信息不會丟失,即使某個borker失效時分區中還有未消費的數據,至少有一個副本是可用的。默認生產者發送信息請求會阻塞直到信息提交到所有活躍的副本中,也可以通過配置指定生產者將信息提交到某個broker。
- 消費者采用長輪詢模式(long-pulling model),並且會阻塞直到有可用的提交信息,避免了頻繁輪詢。
安裝Kafka
Kafka是用Scala語言實現的,並用Gradle構建二進制包。使用Kafka之前安裝JDK 1.7或更高版本。並配置環境變量JAVA_HOME。
-
下載Kaka:
wget http://mirrors.ustc.edu.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
-
解壓縮:
tar xzf kafka_2.10-0.9.0.0.tgz cd kafka_2.10-0.9.0.0
其中:
/bin 啟動和停止命令等 /config 配置文件 /libs 類庫
-
啟動和停止
啟動Zookeeper server:bin/zookeeper-server-start.sh config/zookeeper.properties &
啟動Kafka server:
bin/kafka-server-start.sh config/server.properties &
停止Kafka server:
bin/kafka-server-stop.sh
停止Zookeeper server:
bin/zookeeper-server-stop.sh
-
單機連通性測試:
運行producer:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
運行consumer:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
在producer端輸入字符串並回車,consumer端顯示則表示成功。
參考資料