在大數據時代,數據規模變得越來越大。由於數據的增長速度和非結構化的特性,常用的軟硬件工具已無法在用戶可容忍的時間內對數據進行采集、管理 和處理。本文主要介紹如何在阿里雲上使用Kafka和Storm搭建大規模消息分發和實時數據流處理系統,以及這個過程中主要遭遇的一些挑戰。實踐主要立 足建立一套汽車狀態實時監控系統,可以在阿里雲上立即進行部署。
一、實時大數據處理利器——Storm和Kafka
大數據時代,隨着可獲取數據的渠道增多,比如常見的
電子商務、網絡、
傳感器的數據流、太空數據等,數據規模也變得越來越大;同時,不同的渠道往往產生更多的數據類型,這些衍生的數據增長非常之快,規模非常之大。大數據時代各個機構可謂是坐擁金山,然而目前大數據技術的應用卻仍然存在眾多挑戰,主要出現在數據收集、
存儲、處理和可視化幾個過程。
1. 數據收集
Gartner的Merv Adrian對大數據有這樣一個定義:“大數據讓常用硬件軟件工具無法在用戶可容忍時間內對數據進行采集、管理和處理。”[1]麥肯錫全球研究院在2011年5月也有這樣一個概念:“大數據是指超出典型數據庫軟件工具采集、
存儲、管理和分析能力的數據集。”[2]從上面的定義可以看出,大數據最大的挑戰在於如何在有限時間內對數據進行處理和分析,並得到有用信息。
2. 數據處理
大數據處理中最著名的工具是Hadoop,不過它並不是一套實時系統。為了解決這個問題,計算機工程師們又開發了Storm和Kafka。 Apache Storm是一套開源的分布式實時計算系統。最早由Nathan Marz[3]開發,在被Twitter收購后開源,並在2014年9月起成為Apache頂級開源項目。Storm被廣泛用於各種商業網站,包括 Twitter、Yelp、Groupon、百度、淘寶等。Storm的使用場景非常廣泛,例如實時分析、在線機器學習、連續計算、分部署RPC、ET| 等。Storm有着非常快的處理速度,單節點可以達到百萬個元組每秒,此外它還具有高擴展、容錯、保證數據處理等特性。圖1是Storm的一個簡單的架 構。
圖1 Storm架構
Apache Kafka也是一個開源的系統,旨在提供一個統一的,高吞吐、低延遲的分布式消息處理平台來對實時數據進行處理。它最早由LinkedIn開發,開源於 2011年並被貢獻給了Apache。Kafka區別於傳統RabbitMQ、Apache ActiveMQ等消息系統的地方主要在於:分布式系統特性,易於擴展;為發布和訂閱提供高吞吐量;支持多訂閱,可以自動平衡消費者;可以將消息持久化到 磁盤,可以用於批量消費,例如ETL等。
圖2 Kafka架構
二、在阿里雲上部署Storm和Kafka
我們需要設計一個實時車輛監控系統,這個系統要將汽車駕駛過程中實時的位置,速度,轉速,油耗以及轉速發送到系統中,從而可以實時計算出車流量和污染 物排放量。該系統的目標是要能同事支持10萬輛車同時發送消息,在最高峰能滿足100萬輛車。為了實現如此規模的消息分發和吞吐,我們基於Kafka和 Storm來設計實現。同時為了滿足高擴展性,我們將Storm和Kafka分別部署到不同的
服務器上,如果需要更多的計算能力,可以隨時通過創建新的
服務器的方式來完成。此外為了滿足高可用性,每台相同功能的服務器也需要至少部署2台,這樣一旦一台服務器出現問題,另外一台服務器也可以持續提供服務。
在實體服務器上部署Storm和Kafka等系統涉及到大量服務器集群和軟件的安裝部署,這個過程需要花費大量時間,而雲計算則很好的彌補了這一點——提供各種虛擬服務器和鏡像功能,加快基礎設施和軟件的部署過程。
圖3 車聯網監控系統架構
我們需要2台服務器來構建Kafka代理服務器,在Storm中還需要2台服務器來運行Spout和2個Bolt,另外在Redis層則需要2台服務器來部署緩存,再加上2台服務器作為Web服務器。服務器架構圖如圖4所示。
圖4 車聯網監控系統架構
在部署車聯網監控系統之前,我們首先需要在每台服務器上部署相應的軟件,包括Git、Libzmq、Java、G++等,用於代碼編譯和相關軟件安裝。可以使用SSH連接到相應的機器。用戶名密碼則會由阿里雲以郵件或者短消息的方式提供。
在車聯網實時監控系統中,我們需要部署4種不同類型的服務器,分別是網站前台服務器、Kafka服務器、Storm服務器和緩存服務器,以滿足上面提 到的高擴展性的要求。在每一種類型的服務器部署完成之后,都可以通過阿里雲鏡像的功能,創建一個能隨時使用的鏡像,這樣在擴展服務器的時候就不需要重新安 裝軟件,直接通過鏡像創建服務器就可以了。
以下命令需要在所有服務器上運行以安裝相應的軟件:
以下命令安裝在緩存服務器和Kafka服務器上:
另外,我們還需要在Storm的服務器安裝maven和lein用於代碼編譯:
在Kafka服務器上安裝Kafka:
對於Storm和Kafka的安裝,到這一步已基本完成,接下去需要分別創建鏡像。創建鏡像的方法是先創建阿里雲快照,然后通過將快照轉換為鏡像的方式完成。具體步驟如下:
在阿里雲的管理界面選擇雲服務器,隨后選擇該服務器的磁盤列表,點擊創建快照。
輸入快照名稱並確認。
阿里雲會自動為雲服務器的系統盤創建快照,當創建完成以后,會出現“創建自定義鏡像”按鈕。
點擊“創建自定義鏡像”的按鈕,阿里雲就會將這個快照轉換為鏡像,可以在阿里雲ECS管理界面的自定義鏡像欄中看到。
接下來,我們通過鏡像可以直接創建相同配置的ECS服務器。
圖5 從自定義鏡像中創建雲服務器
當然,在自動擴展實現上,雲服務並不需要用戶去手動執行,這里我們使用阿里雲的ECS REST API自動通過鏡像創建機器。可以參考以下Python代碼,自動創建阿里雲ECS虛擬機:
三、基於Storm和Kafka的車輛信息實時監控系統打造
接下來做的就是將車輛信息實時監控系統部署到系統中。這個系統演示了如何編寫一個Storm的Topology,從Kafka消息系統中將信息讀取出 來。我們使用Kafka的客戶端模擬從世界各地發送車輛實時信息給Kafka集群,然后Storm Topology會把這些消息通過Bolts將坐標轉換為Json對象,並且使用GeoJSON在Bing Map上顯示車輛的實時位置、溫度、轉速以及速度等等信息。Topology還會將信息寫到Redis緩存中,然后Node.js通過socket.io 讀取Redis中的信息,並且使用d3js顯示在頁面上。
首先,我們需要編寫Kafka 生產者的部分代碼,主要是模擬讀取汽車的實時數據並向Kafka集群進行發送,我們實現了一個KafkaCarDataProducer類,通過配置 ProducerConfig來創建一個Producer對象來發送數據。它可以用來連接到Zookeeper,或者直接是Kafka 代理。例如:kafkaclient.cloudapp.net:2181或者0:kafkaclient.cloudapp.net:9092。代碼中 我們根據不同的連接字符串設置不同配置。偽代碼如下:
然后就可以直接通過下面代碼來發送消息:
接下來我們需要編寫3個Storm類,首先是創建Storm的Topology,這個類叫KafkaCarTopology,我們創建了一個叫car 的topic,然后定義本機一個hosts和Zookeeper hosts,最后創建一個Spout,叫做KafkaSpout,然后添加ParseCarDataBolt連接到KafkaSout,再創建一個 RedisCarBolt,用於將結果寫入Redis緩存。最后根據參數創建3個Worker,提交Storm Topology。
在這個拓撲結構中,我們有2個Bolt用於數據的處理,第一個叫ParserCarDataBolt,這個Bolt主要將Kafka傳出的消息轉換為 Json格式,它繼承BaseBasicBolt,在execute函數中通過collector提交數據,同時重載了 declareOutputFields函數,通知下一個Bolt的數據格式。代碼如下:
數據會被寫入RedisCarBolt,再寫入到Redis緩存中。它繼承自BaseRichBolt,需要重載prepare和excute方法來處理消息元組。此外還需要重載prepare和cleanup函數,幾個關鍵的函數如下:
最后我們還需要編寫一些Node.js的代碼,保證在頁面上通過socket.io進行通訊,實時將最終數據從Redis里面讀取出來,並在BingMap上顯示。
到此為止,一個簡單的車輛信息實時監控系統就實現了,我們通過bash腳本進行編譯,並安裝到相應的服務器上,比如下列代碼需要被安裝在Storm的服務器上:
有一點需要注意的是,由於在編譯過程中需要自動下載Storm庫,在阿里雲的國內機房的虛擬機很有可能需要設置代理進行。設置代理的方法也很簡單,通過對lein命令增加以下參數就可以了:http_proxy=http://URL:PORT
接着我們在網頁上訪問http://webhostname或者運行node.js的服務器,就會看到下面的網頁,同時發現網頁將同步刷新汽車的實時位置、速度、轉速等。
圖6 車聯網監控系統演示頁面
四、對車聯網監控系統的性能測試
接下來我們對這個系統進行了一個簡單的吞吐量測試。我們只有1個Topic,使用5個partition、3個worker、1個Spout和2個 Bolt,在一台2核2GB的ECS上運行。我們使用了另外4台客戶端,每個客戶端有4核8G內存,分別啟動40個線程不斷向這個系統實時發送汽車信息, 模擬160台汽車發送的情況,其消息發送數量和CPU占用率情況如圖7所示。
圖7 車聯網監控系統性能分析
從圖7中可以看出,平均每輛汽車客戶端會模擬每秒給系統發送了1000條消息,總的吞吐量達到16萬條左右,此時平均的CPU占用率大約在30%左 右。如果系統是完全線性的,在系統CPU占用率達到90%的情況下,大約能處理48萬條消息。不過實際情況中,在阿里雲ECS上,卻發現CPU達到50% 以后,就不再上升,而客戶端發送消息的延時也逐步增加。
經過分析以后發現,由於ECS的磁盤性能無法和物理機的SSD磁盤相比,所以在Kafka消息大量寫入磁盤的過程中,吞吐量下降,磁盤讀寫負擔變得非 常大。這時我們增加了Kafka的Broker和Storm的Spout的數量,將消息分布式地分發到多台ECS上,從而實現了消息吞吐量的線性增加。
在這個系統中,我們不推薦使用大核和大內存的機器,而推薦使用多台2核2GB的服務器分布式地處理消息。這也是雲計算處理大數據的原則所在,使用橫向擴展而不用縱向擴展。
五、結論
至此我們介紹了利用Storm和Kafka實現大數據的實時處理,並且介紹了如何在雲上通過鏡像快速地創建這套系統。此外,我們還介紹了如何對 Storm、Kafka、Redis以及Node.js開發出一個實時的車輛信息監控系統。這個系統能夠實現高性能、大吞吐量和高並發。當然,隨着大數據 的快速發展,我們相信還會有越來越多好的工具和產品出現在市場上,到那時我們從大數據中獲取有效的信息將會變得更加容易和便捷。有了雲計算的幫助,開發的 周期也會變得越來越短。
責任編輯:
熊東旭
原文:http://articles.e-works.net.cn/infrastructure/Article120178_1.htm