從16年4月5號開始學習kafka,后來由於項目需要又涉及到了storm。
經過幾天的掃盲,到今天16年4月13日,磕磕碰碰的總算是寫了一個kafka+storm的HelloWorld的例子。
為了達到前人栽樹后人乘涼的知識共享的目的,我嘗試着梳理一下過程。
====實例需求
由kafka消息隊列源源不斷生產數據,然后由storm進行實時消費。
大家可以設想這些數據源是不同商品的用戶行為操作行為,我們是不是就可以實時觀測到用戶關注商品的熱點呢?
====環境准備
(1)Linux:
公司暫時沒有多余的Linux主機,所以我只能在自己的電腦上建立的3台Linux虛擬機。
虛擬機的建立方法我做了一個小白級別的手冊,按照這個手冊就可以建立起虛擬機了。
百度雲連接地址:http://pan.baidu.com/s/1hr3lVqG
(2)JDK:
我這里使用的是:jdk-7u80-linux-x64.tar.gz。
在官方網站上下載,然后配置環境變量即可。
(3)zookeeper集群:
搭建方法省略。可以參照我的博客:http://www.cnblogs.com/quchunhui/p/5356511.html
(4)kafka:
搭建方法省略。可以參照我的博客:http://www.cnblogs.com/quchunhui/p/5356511.html
(5)storm:
我這里使用的版本是相對穩定的:apache-storm-0.9.5.tar.gz
搭建方法省略,可以參照我的博客:http://www.cnblogs.com/quchunhui/p/5370191.html
(6)Maven:
開發環境的構建使用Maven。我這里使用的版本是:apache-maven-3.3.3.zip
Maven的入門可以參考我的博客:http://www.cnblogs.com/quchunhui/p/5359293.html
補充一下環境變量配置之后的圖,以供小白參考。
====程序執行方式
(1)kafka:
需要手動編寫kafka的生產者程序,然后通過eclipse等工具在Windows端啟動,以達到生產消息的目的。
(2)storm:
可以進行兩種方式的啟動。一種是通過eclipse等工具在Windows端啟動(俗稱本地模式)
另一種是將storm的消費者程序打成jar包發布到Linux環境上,通過Linux啟動程序進行消費(俗稱集群模式)。
====Storm框架前期理解
從某位大神的QQ群組里下載了一篇關於storm的基本框架以及安裝的文章
我這里共享到了我的百度雲盤上了,請大家在開始編程之前一定要看看。非常值得一看。
百度雲地址:https://pan.baidu.com/s/14q7HBAYtvHKtaTA4_dm62g
那么后面我們就可以開始編寫我們的程序了。首先需要編寫的是kafka的生產者程序。
====kafka程序相關:
我已經寫好的代碼共享到了Github上了:https://github.com/quchunhui/kafkaSample/
這里只對目錄結構以及重要部分進行說明:
(1)src/main路徑結構如下:
+---common
| Constants.java //這里統一定義了所有的常量,修改配置的時候只修改這里就可以。
|
+---consumer
| +---group
| | GroupConsumer.java //kafka消費者程序。消費模型:分組消費
| |
| \---partition
| PartitionConsumer.java //kafka消費者程序。消費模型:分區消費
|
+---producer
| +---async
| | AsyncProduce.java //kafka生產者程序。生產模型:異步生產(本次實例相關)
| |
| +---partiton
| | SimplePartitioner.java //message的序列化類
| |
| \---sync
| SyncProduce.java //kafka生產者程序。生產模型:同步生產
|
\---utilities
CommonUtil.java //共通方法類。
(2)實例所用的代碼:
本次實例中,僅僅使用了kafka進行消息的生產,同事考慮到異步生產性能更高一些,
本次實例中使用了異步生產的代碼,就是上面紅色字標記的java程序(AsyncProduce.java)。
代碼本身比較簡單,其中下面紅色框的部分為【異步】的配置項,需要注意。
各個配置項的說明請參考我的另一篇博客:http://www.cnblogs.com/quchunhui/p/5357040.html
====Storm程序相關:
(1)拓撲設計
【消息源(RandomSentenceSpout)】
接入到從上面的kafka消息隊列中,將kafka作為storm的消息源。
【數據標准化(WordNormalizerBolt)】
然后使用一個Bolt進行歸一化(語句切分),句子切分成單詞發射出去。(代碼更新中。。。)
【詞頻統計(WordCountBolt)】
使用一個Bolt接受訂閱切分的單詞Tuple,進行單詞統計,並且選擇使用按字段分組的策略,詞頻實時排序,把TopN實時發射出去。(代碼更新中。。。)
【工具類(PrintBolt)】
最后使用一哥Bolt將結果打印到Log中。(代碼更新中。。。)
====實例代碼
我自己進行驗證用的代碼已經上傳到Github上了,可以直接下載下來使用。
這里只對代碼的目錄結構以及需要格外關注的點進行一些補充。
Git地址:https://github.com/quchunhui/storm-kafka-plus-qch
(1)目錄結構
src\main\java\com\dscn\helloworld
|
| WordCountTopology.java // Topology代碼,程序入口,使用eclipse是需要執行該程序。
|
+---bolt
| PrintBolt.java // 上面講到的工具類(PrintBolt)類
| WordCountBolt.java // 上面講到的詞頻統計(WordCountBolt)類
| WordNormalizerBolt.java // 上面講到的數據標准化(WordNormalizerBolt)類
|
\---spout
RandomSentenceSpout.java // 未使用
(2)重要代碼說明
由於源代碼已經共享給大家了,Storm的接口的用法在下面的篇幅中單獨羅列了一下,我這里不進行過多的闡述。
在這里只將我碰到過的問題羅列出來、以問題&解決方法的形式分享。
【問題1】
storm是如何實現與kafka的對接的
【回答】
Spout作為storm的消息源的接入點,在這里可以同構設置Storm官方提供【storm.kafka.SpoutConfig】類來指定消息源。
----------------
//配置zookeeper集群地址,畢竟storm是需要集群支持的。
BrokerHosts brokerHosts = new ZkHosts("192.168.93.128:2181,192.168.93.129:2181,192.168.93.130:2181");
//配置Kafka訂閱的Topic,以及zookeeper中數據節點目錄和名字
SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "qchlocaltest", "", "topo");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//如果該Topology因故障停止處理,下次正常運行時是否從Spout對應數據源Kafka中的該訂閱Topic的起始位置開始讀取
spoutConfig.forceFromStart = true;
//zookeeper集群host
spoutConfig.zkServers = Arrays.asList(new String[] {"192.168.93.128", "192.168.93.129", "192.168.93.130"});
//zookeeper集群port
spoutConfig.zkPort = 2181;
----------------
【問題2】
我嘗試着自己重新寫代碼配置開發環境(不是直接使用Github上的代碼),
編譯時可以正常通過的,但是本地模式通過eclipse啟動Topology的時候,出現了log4j和slf4j的沖突問題。
【解決方法】
問題原因是由於log4j和slf4j之間的重復調用,導致死循環而致使內存溢出。
解決辦法就是log4j和slf4j保留一個, 普遍上都是保留slf4j的。
需要在Maven的pom.xml上將log4j的相關依賴移除。
移除方法:
可以通過【mvn dependency:tree】命令來查看修改之后的依賴關系。
如果發現需要移除的包的時候,使用Maven的【exclusion】標簽來移除依賴關系。
填寫exclusion標簽的時候,下圖中紅色的部分是groupId,藍色的部分是artifactId。
【問題3】
使用mvn install命令將程序打jar包上傳到Linux的storm目錄下,然后使用命令
[storm jar test-0.1-jar-with-dependencies.jar com.dscn.helloworld.WordCountTopology 192.168.93.128]
啟動Topology的時候,出現了下面的提示錯誤。
【解決方法】
是Maven的pom.xml的配置出現了問題。詳細請參考博客:http://blog.csdn.net/luyee2010/article/details/18455237
修改方法就是強storm的scope修改為provided。如下圖所示:
【問題4】
將代碼放到實際的集群運行環境(kafka+storm+hbase)中,發現storm接受不到消息。
【原因】
一直以來都是使用kafka的異步生產來生產消息,以為都正常的生產消息了。由於異步生產的時候,並沒有消息確認機制,
所以不能確保消息是否正確的進入到了消息隊列之中,改用同步生產的代碼嘗試了一下,果然發生了一下的錯誤。
【解決辦法】
通過網上搜索[kafka Failed to send messages]關鍵字,發現有可能是需要設置advertised.host.name這個屬性。
抱着嘗試一下的心態試了一下,果然好使了。至於這個屬性的真正意義還有待探索。(TODO)
【問題5】
代碼在本地的時候好好的,通過storm jar命令發布到集群環境的時候,發生了Jar包沖突的問題。
【解決方法】
本來是認為自己的Maven環境的依賴有問題,也通過mvn dependency:tree查看了依賴關系,毫無問題。根本就誒有log4j-over-slf4j.jar這個包。
頭疼了很久,通過QQ群咨詢了一些朋友,他們建議我確認集群環境中storm/lib下是否存在log4j-over-slf4j.jar,如果存在就把它刪掉。
嘗試了一下之后,果然好使了。原來是我的程序的jar包和集群環境中會有沖突。詳細請參考我的另一篇博客:
http://www.cnblogs.com/quchunhui/p/5404168.html
====Storm接口詳解:
【IComponent接口】
Spout和Bolt都是其Component。所以,Storm定義了一個名叫IComponent的總接口。
IComponent的繼承關系如下圖所示:
綠色部分是我們最常用、比較簡單的部分。紅色部分是與事務相關。
BaseComponent 是Storm提供的“偷懶”的類。為什么這么說呢,它及其子類,都或多或少實現了其接口定義的部分方法。
這樣我們在用的時候,可以直接繼承該類,而不是自己每次都寫所有的方法。
但值得一提的是,BaseXXX這種定義的類,它所實現的方法,都是空的,直接返回null。
【Spout】
類圖如下圖所示:
接口如下圖所示:
各個接口說明:
①、open方法:
是初始化動作。允許你在該spout初始化時做一些動作,傳入了上下文,方便取上下文的一些數據。
②、close方法
在該spout關閉前執行,但是並不能得到保證其一定被執行。
spout是作為task運行在worker內,在cluster模式下,supervisor會直接kill -9 woker的進程,這樣它就無法執行了。
而在本地模式下,只要不是kill -9, 如果是發送停止命令,是可以保證close的執行的。
③、activate和deactivate方法 :
一個spout可以被暫時激活和關閉,這兩個方法分別在對應的時刻被調用。
④、nextTuple方法:
負責消息的接入,執行數據發射。是Spout中的最重要方法。
⑤、ack(Object)方法:
傳入的Object其實是一個id,唯一表示一個tuple。該方法是這個id所對應的tuple被成功處理后執行。
⑥、fail(Object)方法:
同ack,只不過是tuple處理失敗時執行。
我們的RandomSpout由於繼承了BaseRichSpout,
所以不用實現close、activate、deactivate、ack、fail和getComponentConfiguration方法,只關心最基本核心的部分。
結論:
通常情況下(Shell和事務型的除外),實現一個Spout,可以直接實現接口IRichSpout,如果不想寫多余的代碼,可以直接繼承BaseRichSpout。
【Bolt】
類圖如下圖所示:
這里可以看到一個奇怪的問題: 為什么IBasicBolt並沒有繼承IBolt? 我們帶着問題往下看。
IBolt定義了三個方法:
①、prepare方法:
IBolt繼承了java.io.Serializable,我們在nimbus上提交了topology以后,創建出來的bolt會序列化后發送到具體執行的worker上去。
worker在執行該Bolt時,會先調用prepare方法傳入當前執行的上下文。
②、execute方法:
接受一個tuple進行處理,並用prepare方法傳入的OutputCollector的ack方法(表示成功)或fail(表示失敗)來反饋處理結果。
③、cleanup方法:
同ISpout的close方法,在關閉前調用。同樣不保證其一定執行。
紅色部分(execute方法)是Bolt實現時一定要注意的地方。
而Storm提供了IBasicBolt接口,其目的就是實現該接口的Bolt不用在代碼中提供反饋結果了,Storm內部會自動反饋成功。
如果你確實要反饋失敗,可以拋出FailedException。


====推薦博客:
【整合實戰類】:
http://shiyanjun.cn/archives/934.html
http://www.tuicool.com/articles/NzyqAn
http://itindex.net/detail/51477-storm-筆記-kafka
【問題解決類】:
http://www.aboutyun.com/thread-12590-1-1.html
【Storm調優類】:
http://blog.csdn.net/derekjiang/article/details/9040243
http://www.51studyit.com/html/notes/20140329/45.html
--END--