開發Kafka通用數據平台中間件
(含本次項目全部代碼及資源)
目錄:
一. Kafka概述
二. Kafka啟動命令
三.我們為什么使用Kafka
四. Kafka數據平台中間件設計及代碼解析
五.未來Kafka開發任務
一. Kafka概述
Kafka是Linkedin於2010年12月份創建的開源消息系統,它主要用於處理活躍的流式數據。活躍的流式數據在web網站應用中非常常見,這些活動數據包括頁面訪問量(Page View)、被查看內容方面的信息以及搜索情況等內容。 這些數據通常以日志的形式記錄下來,然后每隔一段時間進行一次統計分析。
傳統的日志分析系統是一種離線處理日志信息的方式,但若要進行實時處理,通常會有較大延遲。而現有的消息隊列系統能夠很好的處理實時或者近似實時的應用,但未處理的數據通常不會寫到磁盤上,這對於Hadoop之類,間隔時間較長的離線應用而言,在數據安全上會出現問題。Kafka正是為了解決以上問題而設計的,它能夠很好地進行離線和在線應用。
1.1 Kfka部署結構:
(圖1)
1.2 Kafka關鍵字:
•Broker : Kafka消息服務器,消息中心。一個Broker可以容納多個Topic。
•Producer :消息生產者,就是向Kafka broker發消息的客戶端。
•Consumer :消息消費者,向Kafka broker取消息的客戶端。
•Zookeeper :管理Producer,Broker,Consumer的動態加入與離開。
•Topic :可以為各種消息划分為多個不同的主題,Topic就是主題名稱。Producer可以針對某個主題進行生產,Consumer可以針對某個主題進行訂閱。
•Consumer Group: Kafka采用廣播的方式進行消息分發,而Consumer集群在消費某Topic時, Zookeeper會為該集群建立Offset消費偏移量,最新Consumer加入並消費該主題時,可以從最新的Offset點開始消費。
•Partition:Kafka采用對數據文件切片(Partition)的方式可以將一個Topic可以分布存儲到多個Broker上,一個Topic可以分為多個Partition。在多個Consumer並發訪問一個partition會有同步鎖控制。
(圖2)
1.3 消息收發流程:
•啟動Zookeeper及Broker.
•Producer連接Broker后,將消息發布到Broker中指定Topic上(可以指定Patition)。
•Broker集群接收到Producer發過來的消息后,將其持久化到硬盤,並將消息該保留指定時長(可配置),而不關注消息是否被消費。
•Consumer連接到Broker后,啟動消息泵對Broker進行偵聽,當有消息到來時,會觸發消息泵循環獲取消息,獲取消息后Zookeeper將記錄該Consumer的消息Offset。
1.4 Kafka特性:
•高吞吐量
•負載均衡:通過zookeeper對Producer,Broker,Consumer的動態加入與離開進行管理。
•拉取系統:由於kafka broker會持久化數據,broker沒有內存壓力,因此,consumer非常適合采取pull的方式消費數據
•動態擴展:當需要增加broker結點時,新增的broker會向zookeeper注冊,而producer及consumer會通過zookeeper感知這些變化,並及時作出調整。
•消息刪除策略:數據文件將會根據broker中的配置要求,保留一定的時間之后刪除。kafka通過這種簡單的手段,來釋放磁盤空間。
二. Kafka啟動命令:
啟動Zookeeper服務:
zookeeper-server-start.bat ../../config/zookeeper.properties
啟動Broker服務:
kafka-server-start.bat ../../config/server.properties
通過Zookeeper的協調在Broker中創建一個Topic(主題)
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
查詢當前Broker中某個指定主題的配置信息
kafka-run-class.bat kafka.admin.TopicCommand --describe --zookeeper localhost:2181 --topic testTopic
啟動一個數據生產者Producer
kafka-console-producer.bat --broker-list localhost:9092 --topic testTopic
啟動一個數據消費者Consumer
kafka-console-consumer.bat --zookeeper localhost:2181 --topic testTopic --from-beginning
Zookeeper配置文件,zookeeper.properties配置片段
Broker配置文件,server.properties配置片段
關於kafka收發消息相關的配置項
1.在Broker Server中屬性(這些屬性需要在Server啟動時加載):
//每次Broker Server能夠接收的最大包大小,該參數要與consumer的fetch.message.max.bytes屬性進行匹配使用
* message.max.bytes 1000000(默認)
//Broker Server中針對Producer發送方的數據緩沖區。Broker Server會利用該緩沖區循環接收來至Producer的數據 包,緩沖區過小會導致對該數據包的分段數量增加,但不會影響數據包尺寸限制問題。
socket.send.buffer.bytes 100 * 1024(默認)
//Broker Server中針對Consumer接收方的數據緩沖區。意思同上。
socket.receive.buffer.bytes 100 * 1024(默認)
//Broker Server中針對每次請求最大的緩沖區尺寸,包括Prodcuer和Consumer雙方。該值必須大於 message.max.bytes屬性
* socket.request.max.bytes 100 * 1024 * 1024(默認)
2.在Consumer中的屬性(這些屬性需要在程序中配置Consumer時設置)
//Consumer用於接收來自Broker的數據緩沖區,意思同socket.send.buffer.bytes。
socket.receive.buffer.bytes 64 * 1024(默認)
//Consumer用於每次接收消息包的最大尺寸,該屬性需要與Broker中的message.max.bytes屬性配對使用
* fetch.message.max.bytes 1024 * 1024(默認)
3.在Producer中的屬性(這些屬性需要在程序中配置Consumer時設置)
//Producer用於發送數據緩沖區,意思同socket.send.buffer.bytes。
send.buffer.bytes 100 * 1024(默認)
三. 我們為什么使用Kafka
當前項目中,我們更希望從企業獲得盡可能多的有價值數據。最直接獲取大數據的方式是采用寫應用直連目標企業數據庫來獲得數據。但這種方式在實際應用中,會由於企業擔心開放本地數據庫而導致的安全隱患很難實施。另外,這種方式會與企業本地數據庫結構耦合度過高,會出現多家企業多個應用的情況,缺少統一的數據交互平台,導致后期維護困難。
3.1 Kafka在當前項目中問題:
當前案例,我們想把某企業的本地數據實時同步到數據中心中,之后對這些數據進行二次分析處理。我們的目標是建立統一的數據同步平台,便於在日后的多企業多系統中能有統一的實施標准,所以選用了Kafka消息系統作為支撐。
Producer(數據發送方)以獨立線程方式常駐某企業內部應用中,依靠一定的時間周期,從本地數據庫獲得數據並推送至Broker中。而Consumer(快銷組數據接收方)也是獨立與WEB框架常駐內存,獲得數據消息后保存至數據中心中。
但目前Kafka在實施中面臨以下問題:
1.Producer/Consumer均獨立於Web框架,Producer依靠消息片輪詢檢索/發送最新數據,執行效率低。
2.Producer會直接針對某企業內部數據庫表結構操作,導致代碼與企業業務耦合度過高,而無法平滑移植到其他企業系統中。
3.由於Producer/Consumer是獨立於Web框架的,在外圍負責數據的采集及推送,與Web項目主程序無切合度。
4.目前針對Kafka的數據傳輸異常處理比較簡陋,當Broker或 Zookeeper等出現異常時,有可能會導致數據安全性問題。
3.2實現目標:
針對以上問題,我們要實現如下目標:
1.把Producer/Consumer的數據推送/獲取的過程封裝成Class或者Jar包的形式,供Java Web框架調用,從而形成與企業內部Web應用或計算中心數據分析Web應用融合一體。
2.數據的推送/獲取只針對Java Object對象,不要針對數據庫表結構,不能與企業特有數據耦合度過高,形成通用的數據接口。Producer需要對Object進行序列化,Consumer需要對序列化后的二進制信息進行反序列化重建Object返回給調用者。
3.消息的推送/獲取的整個生命周期中,要把重要事件通知給外部調用者,比如:Broker,Zookeeper是否有異常,數據推送/獲取是否成功,如果失敗需要保留失敗記錄便於進行后期數據恢復等。(需要在中間件中建立回調機制通知調用者)
4.可對多企業多應用進行平滑移植,移植過程中盡可能保持整體Kafka數據平台結構的零修改。
四. Kafka數據平台中間件設計
4.1解決方案:
基於以上待完成目標,我們有了以下解決方案。
3.2 實現要點:
KfkProducer(數據生產者)
•KfkProducer對象需要在Web框架中的Application_OnStart()中啟動,常駐進程,只與Broker連接一次,數據發送過程不能與Broker建立連接。(實踐中發現Kafka的 Broker如果有異常,重啟Broker后Producer不用再次連接即可發送)
•Web框架可以隨時調用推送接口將對象(Object)推送至Broker.
•Object序列化后形成二進制信息,並且要保證在Consumer所處框架中能順利還原.
•可發送多種對象(Object,File ,Byte[]等),簡化外圍框架針對待發送數據所做操作,簡化調用接口。
•數據發送使用Kafka中最新的異步式數據發送API,不能由於發送時間過長或Broker異常等問題阻塞調用者。
•需要對整個發送生命期進行跟蹤反饋異常信息,若發送失敗,需要將待發送數據使用回調機制通知到框架調用者。
•詳細測試Broker,或Zookeeper產生異常時,Producer可能會出現的情況。
•在針對多企業多應用中,可依靠Topic進行區分數據主題,這樣可實現多應用部署時框架零修改問題。
KfkConsumer(數據消費者)
•KfkConsumer需要在計算中心內部Web框架中的Application_OnStart()中啟動,常駐進程,只與Broker連接一次,並啟動消息泵等待消息到來。(實踐中發現Kafka的 Broker如果有異常,重啟Broker后Consumer不用再次連接即可正常獲取消息)
•需要定義回調接口,該回調接口由外圍框架程序注冊處理程序,當數據消息到來時,Consumer需要把數據發送至該接口,之后由調用者處理。
•調用者需要注冊所接受的對象類型,因為Broker中同一Topic下會有各種數據對象(UserInfo,CompanyInfo,ProductInfo...)存在,所以必須提供接收對象的注冊接口,以方便調用者有針對性的獲取。
•數據到來時,要針對發送方序列化的二進制信息進行反序列化操作,並能准確還原成原始對象。
•需要對整個接收生命期進行跟蹤反饋異常信息,若消息泵停止或異常,需要通知到框架調用者。
實現以上要點后,需要將KfkProducer及KfkConsumer對象打包成Jar包的形式,更靈活的部署到企業本地Web框架及計算中心內部Web框架中。
3.3 代碼實現及分析:
3.3.1 KfkProducer 對象:數據生產者對象,封裝了關於數據發送的相關功能。
接口函數/子對象 |
說明 |
KfkProducer () |
構造函數中需要調用者提供Broker集群的Ip,Port等信息。 Kafka支持Broker集群列表。(127.0.0.1:9092,127.0.0.1:9093)
|
Connect() |
該函數需要完成對Broker集群的連接。
|
Send() |
該函數入口為Object對象,需要對該對象進行Serialize操作,根據待發送數據構造KfkMsg對象,並取得由KfkMsg序列化后的Byte[]數組,之后調用Kafka的異步發送方式及掛接回調處理函數。
要實現多個Send()接口,需要提供對Object,File ,Byte[]等多種數據類型的支持,方便調用者操作。
|
Close() |
該函數完成對Broker連接進行關閉。
|
SendCallback發送回調對象 onCompletion()發送回調接口 |
在kafka異步發送函數send()中注冊,在收到Broker返回的發送是否成功信息后,會觸發該函數,並調用ProducerEvent對象的onSendMsg()函數,向調用者發送成功與否結果。
成功則返回調用者RecordMetadata信息(BrokerServer中的數據offset,Partition位置ID,Topic主題)
失敗者返回調用者原始數據信息,便於日后恢復。
|
ProducerEvent接口對象 onSendMsg() |
為調用者提供的回調接口,調用者在注冊后,即可重寫onSendMsg()函數,以便接到通知后,處理當前事件(發送數據成功與否)狀態。
|
3.3.2 KfkConsumer對象:數據消費者對象,封裝了關於數據接收的相關功能。
接口函數/子對象 |
說明 |
KfkConsumer() |
構造函數中需要調用者提供Zookeeper集群的Ip,Port等信息。(即將推出的Kafka0.9.X版本將支持直連Broker集群的機制)
該對象繼承至Thread對象,為線程對象。 |
connect() |
配置Zookeeper連接相關屬性,並連接Zookeeper服務器。
|
run() |
線程主函數,該函數將啟動Kafka消息泵等待Broker的消息到來。
消息到來后,將調用KfkMsg對象對二進制序列化信息進行還原對象操作(KfkMsg將對序列化數據進行反序列化操作,並重新還原原始對象操作)。
對象還原后,將調用調用者注冊的回調接口,將對象傳出。
|
close() |
關閉Consumer與Broker,Zookeeper的Socket連接。
|
ConsumerEvent接收回調對象 onRecvMsg()接收回調函數 |
為調用者提供的回調接口,調用者在注冊后,即可重寫onRecvMsg()函數,以便接到通知后,收取對象或處理當前事件。
|
3.3.3 KfkMsg對象:數據消息對象,封裝了數據對象的序列化/反序列化操作,構造多種類型的發送對象,封裝發送協議等操作。
接口函數/子對象 |
說明 |
MsgBase對象 |
消息包基類,可以在Consumer接到數據消息后,形成多種對象的反序列化多態性。
|
MsgObject對象 serializeMsg()序列化函數 deserializeMsg()反序列化函數 |
針對Object數據的序列化和反序列化操作,及消息體封裝,通訊協議構造等操作。
|
MsgByteArr對象 serializeMsg()序列化函數 deserializeMsg()反序列化函數 |
針對Byte[]數據的序列化和反序列化操作,及消息體封裝,通訊協議構造等操作。
|
MsgFile對象 serializeMsg()序列化函數 deserializeMsg()反序列化函數 |
針對二進制文件的序列化和反序列化操作,及消息體封裝,通訊協議構造等操作。
|
getMsgType()函數
|
負責對Consumer接收的序列化信息進行首次協議解析,判斷對象類型(Object,File,byte[])之后構造對應的MsgXXX對象,以便使調用者進行反序列化多態功能。
|
3.3.4 SerializeUtils對象:序列化操作工具類,完成在Jar包內部對外部對象的序列化/反序列化基礎從操作。
接口函數/子對象 |
說明 |
deserialize()函數 |
將序列化后的二進制數組byte[]還原成原始Object.
由於如果使用默認的ObjectInputStream對象進行反序列化操作,在Jar內將無法找到外部調用者定義的對象名,也即無法反序列化成功,報無法找到外部對象的異常。
所以必須重寫resolveClass()函數,加載當前線程范圍內的Class上下文。
|
Serialize()函數 |
將Object序列化成二進制數組,byte[]。
|
3.3.5 調用者Web框架部署:
KfkProducer部署:
部署要點 |
說明 |
1.注冊發送消息回調函數 |
在WEB框架中的Application_OnStart()事件中向Jar注冊發送消息回調函數。並重寫onSendMsg()回調接口,用於接受發送成功/失敗消息,發送失敗后,可以在Web框架中針對返回的原始數據信息做備份/恢復處理。
|
2.建立與Broker之間的連接 |
在WEB框架中的 Application_OnStart()事件中調用KfkProducer connect()函數,連接遠程Broker。
|
3.將KfkProducer傳入框架 |
經過前兩步操作后,我們已經順利建立KfkProducer對象,現在我們需要把該對象傳入Web框架中后續頁面處理類中,以方便調用其send()函數進行數據發送。
在Play中我們使用了cache對象機制,可以在Play Web App全生命期內獲得KfkProducer對象實例。
|
4.關閉與Broker之間的連接 |
在WEB框架中的Application_OnStop()事件中調用KfkProducer的close()函數,關閉遠程Broker連接。
|
KfkConsumer部署:
部署要點 |
說明 |
1.注冊發送消息回調函數 |
在WEB框架中的Application_OnStart()事件中向Jar注冊消息接收回調函數。並重寫onRecvMsg()回調接口,用於接受來自Broker的數據信息。
在onRecvMsg()函數中,還需針對傳入的Object對象進行instanceof比對操作,區分特定對象。
|
2.注冊需要接收的Object類型 |
向Jar包中注冊需要接收的對象類型,比如本應用需要接收(UserInfo,CompanyInfo,ProdcutInfo等對象)。 注冊后,來自Broker的廣播消息將被Jar包過濾,只返回調用者所需的對象數據。
|
3.建立與Zookeeper(Broker)之間的連接 |
在WEB框架中的 Application_OnStart()事件中調用KfkConsumer connect()函數,連接遠程Zookeeper/Broker。
|
4.啟動消息泵線程 |
經過前兩步操作后,我們已經順利建立與Zookeeper/Broker建立連接。
我們需要啟動消息泵來收聽消息的到來,這里需要調用KfkConsumer對象的start()函數啟動消息泵線程常駐內存。
|
4.關閉與Zookeeper之間的連接 |
在WEB框架中的Application_OnStop()事件中調用KfkConsumer的close()函數,關閉遠程Zookeeper/Broker連接。
|
五. 未來Kafka中間件
目前該中間件只完成了初級階段功能,很多功能都不完善不深入,隨着應用業務的拓展及Kafka未來版本功能支持,。以Kafka消息中間件為中心的大數據處理平台還有很多任務去實現。
一般在互聯網中所流動的數據由以下幾種類型:
•需要實時響應的交易數據,用戶提交一個表單,輸入一段內容,這種數據最后是存放在關系數據庫(Oracle, MySQL)中的,有些需要事務支持。
•活動流數據,准實時的,例如頁面訪問量、用戶行為、搜索情況等。我們可以針對這些數據廣播、排序、個性化推薦、運營監控等。這種數據一般是前端服務器先寫文件,然后通過批量的方式把文件倒到Hadoop(離線數據分析平台)這種大數據分析器里面,進行慢慢的分析。
•各個層面程序產生的日志,例如http的日志、tomcat的日志、其他各種程序產生的日志。這種數據一個是用來監控報警,還有就是用來做分析。
謝謝觀賞!
注:基於全球開源共享理念,本人會分享更多原創及譯文,讓更多的IT人從中受益,與大家一起進步!
基因Cloud 原創,轉發請注明出處
1738387@qq.com (工作繁忙,有事發郵件,QQ不加,非要事勿擾,多謝!)
2015 / 06 / 14