Storm+kafka的HelloWorld初體驗


從16年4月5號開始學習kafka,后來由於項目需要又涉及到了storm。

經過幾天的掃盲,到今天16年4月13日,磕磕碰碰的總算是寫了一個kafka+storm的HelloWorld的例子。

為了達到前人栽樹后人乘涼的知識共享的目的,我嘗試着梳理一下過程。

 

====實例需求

由kafka消息隊列源源不斷生產數據,然后由storm進行實時消費。

大家可以設想這些數據源是不同商品的用戶行為操作行為,我們是不是就可以實時觀測到用戶關注商品的熱點呢?

 

====環境准備

(1)Linux:

 

公司暫時沒有多余的Linux主機,所以我只能在自己的電腦上建立的3台Linux虛擬機。

虛擬機的建立方法我做了一個小白級別的手冊,按照這個手冊就可以建立起虛擬機了。

 

百度雲連接地址:http://pan.baidu.com/s/1hr3lVqG

 

(2)JDK:

 

我這里使用的是:jdk-7u80-linux-x64.tar.gz。

在官方網站上下載,然后配置環境變量即可。

 

 

(3)zookeeper集群:

搭建方法省略。可以參照我的博客:http://www.cnblogs.com/quchunhui/p/5356511.html

 

(4)kafka:

搭建方法省略。可以參照我的博客:http://www.cnblogs.com/quchunhui/p/5356511.html

 

 (5)storm:

我這里使用的版本是相對穩定的:apache-storm-0.9.5.tar.gz

搭建方法省略,可以參照我的博客:http://www.cnblogs.com/quchunhui/p/5370191.html

 

(6)Maven:

開發環境的構建使用Maven。我這里使用的版本是:apache-maven-3.3.3.zip

Maven的入門可以參考我的博客:http://www.cnblogs.com/quchunhui/p/5359293.html

 

補充一下環境變量配置之后的圖,以供小白參考。

 

====程序執行方式

(1)kafka:

需要手動編寫kafka的生產者程序,然后通過eclipse等工具在Windows端啟動,以達到生產消息的目的。

 

(2)storm:

可以進行兩種方式的啟動。一種是通過eclipse等工具在Windows端啟動(俗稱本地模式)

另一種是將storm的消費者程序打成jar包發布到Linux環境上,通過Linux啟動程序進行消費(俗稱集群模式)。

 

====Storm框架前期理解

從某位大神的QQ群組里下載了一篇關於storm的基本框架以及安裝的文章

我這里共享到了我的百度雲盤上了,請大家在開始編程之前一定要看看。非常值得一看。

百度雲地址:https://pan.baidu.com/s/14q7HBAYtvHKtaTA4_dm62g

 

那么后面我們就可以開始編寫我們的程序了。首先需要編寫的是kafka的生產者程序。

 

====kafka程序相關:

我已經寫好的代碼共享到了Github上了:https://github.com/quchunhui/kafkaSample/

這里只對目錄結構以及重要部分進行說明:

(1)src/main路徑結構如下:

+---common

| Constants.java                   //這里統一定義了所有的常量,修改配置的時候只修改這里就可以。
|
+---consumer
| +---group
| | GroupConsumer.java        //kafka消費者程序。消費模型:分組消費
| |
| \---partition
| PartitionConsumer.java       //kafka消費者程序。消費模型:分區消費
|
+---producer
| +---async
| | AsyncProduce.java           //kafka生產者程序。生產模型:異步生產(本次實例相關)
| |
| +---partiton
| | SimplePartitioner.java       //message的序列化類
| |
| \---sync
| SyncProduce.java              //kafka生產者程序。生產模型:同步生產
|
\---utilities
CommonUtil.java                 //共通方法類。

 

(2)實例所用的代碼:

本次實例中,僅僅使用了kafka進行消息的生產,同事考慮到異步生產性能更高一些,

本次實例中使用了異步生產的代碼,就是上面紅色字標記的java程序(AsyncProduce.java)。

代碼本身比較簡單,其中下面紅色框的部分為【異步】的配置項,需要注意。

各個配置項的說明請參考我的另一篇博客:http://www.cnblogs.com/quchunhui/p/5357040.html

 

 

====Storm程序相關:

(1)拓撲設計

【消息源(RandomSentenceSpout)】

接入到從上面的kafka消息隊列中,將kafka作為storm的消息源。

【數據標准化(WordNormalizerBolt)】

然后使用一個Bolt進行歸一化(語句切分),句子切分成單詞發射出去。(代碼更新中。。。)

【詞頻統計(WordCountBolt)】

使用一個Bolt接受訂閱切分的單詞Tuple,進行單詞統計,並且選擇使用按字段分組的策略,詞頻實時排序,把TopN實時發射出去。(代碼更新中。。。)

【工具類(PrintBolt)】

最后使用一哥Bolt將結果打印到Log中。(代碼更新中。。。)

 

====實例代碼

我自己進行驗證用的代碼已經上傳到Github上了,可以直接下載下來使用。

這里只對代碼的目錄結構以及需要格外關注的點進行一些補充。

Git地址:https://github.com/quchunhui/storm-kafka-plus-qch

 

(1)目錄結構

src\main\java\com\dscn\helloworld


| WordCountTopology.java         // Topology代碼,程序入口,使用eclipse是需要執行該程序。
|
+---bolt
| PrintBolt.java                          // 上面講到的工具類(PrintBolt)類
| WordCountBolt.java                // 上面講到的詞頻統計(WordCountBolt)類
| WordNormalizerBolt.java          // 上面講到的數據標准化(WordNormalizerBolt)類
|
\---spout
RandomSentenceSpout.java       // 未使用

 

(2)重要代碼說明

由於源代碼已經共享給大家了,Storm的接口的用法在下面的篇幅中單獨羅列了一下,我這里不進行過多的闡述。

在這里只將我碰到過的問題羅列出來、以問題&解決方法的形式分享。 

 

【問題1】

storm是如何實現與kafka的對接的

【回答】

Spout作為storm的消息源的接入點,在這里可以同構設置Storm官方提供【storm.kafka.SpoutConfig】類來指定消息源。

 

----------------

//配置zookeeper集群地址,畢竟storm是需要集群支持的。

BrokerHosts brokerHosts = new ZkHosts("192.168.93.128:2181,192.168.93.129:2181,192.168.93.130:2181");

//配置Kafka訂閱的Topic,以及zookeeper中數據節點目錄和名字

SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "qchlocaltest", "", "topo");

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//如果該Topology因故障停止處理,下次正常運行時是否從Spout對應數據源Kafka中的該訂閱Topic的起始位置開始讀取
spoutConfig.forceFromStart = true;

//zookeeper集群host

spoutConfig.zkServers = Arrays.asList(new String[] {"192.168.93.128", "192.168.93.129", "192.168.93.130"});

//zookeeper集群port

spoutConfig.zkPort = 2181;

----------------

 

【問題2】

我嘗試着自己重新寫代碼配置開發環境(不是直接使用Github上的代碼),

編譯時可以正常通過的,但是本地模式通過eclipse啟動Topology的時候,出現了log4j和slf4j的沖突問題。

【解決方法】

問題原因是由於log4j和slf4j之間的重復調用,導致死循環而致使內存溢出。

解決辦法就是log4j和slf4j保留一個, 普遍上都是保留slf4j的。

需要在Maven的pom.xml上將log4j的相關依賴移除。

移除方法:

 

可以通過【mvn dependency:tree】命令來查看修改之后的依賴關系。

如果發現需要移除的包的時候,使用Maven的【exclusion】標簽來移除依賴關系。

填寫exclusion標簽的時候,下圖中紅色的部分是groupId,藍色的部分是artifactId。

 

 

【問題3】

使用mvn install命令將程序打jar包上傳到Linux的storm目錄下,然后使用命令

[storm jar test-0.1-jar-with-dependencies.jar com.dscn.helloworld.WordCountTopology 192.168.93.128]

啟動Topology的時候,出現了下面的提示錯誤。

【解決方法】

是Maven的pom.xml的配置出現了問題。詳細請參考博客http://blog.csdn.net/luyee2010/article/details/18455237

修改方法就是強storm的scope修改為provided。如下圖所示:

 

 

【問題4】

將代碼放到實際的集群運行環境(kafka+storm+hbase)中,發現storm接受不到消息。

【原因】

一直以來都是使用kafka的異步生產來生產消息,以為都正常的生產消息了。由於異步生產的時候,並沒有消息確認機制,

所以不能確保消息是否正確的進入到了消息隊列之中,改用同步生產的代碼嘗試了一下,果然發生了一下的錯誤。

【解決辦法】

通過網上搜索[kafka Failed to send messages]關鍵字,發現有可能是需要設置advertised.host.name這個屬性。

抱着嘗試一下的心態試了一下,果然好使了。至於這個屬性的真正意義還有待探索。(TODO)

 

【問題5】

代碼在本地的時候好好的,通過storm jar命令發布到集群環境的時候,發生了Jar包沖突的問題。

【解決方法】

本來是認為自己的Maven環境的依賴有問題,也通過mvn dependency:tree查看了依賴關系,毫無問題。根本就誒有log4j-over-slf4j.jar這個包。

頭疼了很久,通過QQ群咨詢了一些朋友,他們建議我確認集群環境中storm/lib下是否存在log4j-over-slf4j.jar,如果存在就把它刪掉。

嘗試了一下之后,果然好使了。原來是我的程序的jar包和集群環境中會有沖突。詳細請參考我的另一篇博客:

http://www.cnblogs.com/quchunhui/p/5404168.html

 

====Storm接口詳解:

【IComponent接口】

Spout和Bolt都是其Component。所以,Storm定義了一個名叫IComponent的總接口。

 

IComponent的繼承關系如下圖所示:

綠色部分是我們最常用、比較簡單的部分。紅色部分是與事務相關。

BaseComponent 是Storm提供的“偷懶”的類。為什么這么說呢,它及其子類,都或多或少實現了其接口定義的部分方法。

這樣我們在用的時候,可以直接繼承該類,而不是自己每次都寫所有的方法。

但值得一提的是,BaseXXX這種定義的類,它所實現的方法,都是空的,直接返回null。

 

【Spout】

類圖如下圖所示:

 

接口如下圖所示:

 

各個接口說明:

①、open方法:

是初始化動作。允許你在該spout初始化時做一些動作,傳入了上下文,方便取上下文的一些數據。 

②、close方法

在該spout關閉前執行,但是並不能得到保證其一定被執行。

spout是作為task運行在worker內,在cluster模式下,supervisor會直接kill -9 woker的進程,這樣它就無法執行了。

而在本地模式下,只要不是kill -9, 如果是發送停止命令,是可以保證close的執行的。 

③、activatedeactivate方法 

一個spout可以被暫時激活和關閉,這兩個方法分別在對應的時刻被調用。 

④、nextTuple方法:

負責消息的接入,執行數據發射。是Spout中的最重要方法。

⑤、ack(Object)方法:

傳入的Object其實是一個id,唯一表示一個tuple。該方法是這個id所對應的tuple被成功處理后執行。 

⑥、fail(Object)方法:

同ack,只不過是tuple處理失敗時執行。 

 

我們的RandomSpout由於繼承了BaseRichSpout,

所以不用實現close、activate、deactivate、ack、fail和getComponentConfiguration方法,只關心最基本核心的部分。 

 

結論:

通常情況下(Shell和事務型的除外),實現一個Spout,可以直接實現接口IRichSpout,如果不想寫多余的代碼,可以直接繼承BaseRichSpout。 

 

【Bolt】

類圖如下圖所示:

 

這里可以看到一個奇怪的問題: 為什么IBasicBolt並沒有繼承IBolt? 我們帶着問題往下看。 

IBolt定義了三個方法: 

①、prepare方法:

IBolt繼承了java.io.Serializable,我們在nimbus上提交了topology以后,創建出來的bolt會序列化后發送到具體執行的worker上去。

worker在執行該Bolt時,會先調用prepare方法傳入當前執行的上下文。

②、execute方法:

接受一個tuple進行處理,並用prepare方法傳入的OutputCollector的ack方法(表示成功)或fail(表示失敗)來反饋處理結果。

③、cleanup方法:

同ISpout的close方法,在關閉前調用。同樣不保證其一定執行。

 

紅色部分(execute方法)是Bolt實現時一定要注意的地方。

而Storm提供了IBasicBolt接口,其目的就是實現該接口的Bolt不用在代碼中提供反饋結果了,Storm內部會自動反饋成功。

如果你確實要反饋失敗,可以拋出FailedException。

 
我們來再寫一個Bolt繼承BaseRichBolt替代ExclaimBasicBolt。代碼如下:
修改topology
運行下,結果一致。
 
結論:
通常情況下,實現一個Bolt,可以實現IRichBolt接口或繼承BaseRichBolt,
如果不想自己處理結果反饋,可以實現IBasicBolt接口或繼承BaseBasicBolt,它實際上相當於自動做掉了prepare方法和collector.emit.ack(inputTuple);

 

====推薦博客:

【整合實戰類】:

http://shiyanjun.cn/archives/934.html

http://www.tuicool.com/articles/NzyqAn

http://itindex.net/detail/51477-storm-筆記-kafka

【問題解決類】:

http://www.aboutyun.com/thread-12590-1-1.html

【Storm調優類】:

http://blog.csdn.net/derekjiang/article/details/9040243

http://www.51studyit.com/html/notes/20140329/45.html

 

--END--


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM