【原】storm源碼之storm代碼結構【譯】


說明:本文翻譯自Storm在GitHub上的官方Wiki中提供的Storm代碼結構描述一節Structure of the codebase,希望對正在基於Storm進行源碼級學習和研究的朋友有所幫助。

Storm的源碼共分為三個不同的層次。

首先,Storm在設計之初就考慮到了兼容多語言開發。Nimbus是一個thrift服務,topologies被定義為Thrift結構體.Thrift的運用使得Storm可以被任意開發語言使用。

其次,Storm的所有接口都是Java語言來定義的。因此,盡管Storm中的很多功能實現都是Clojure代碼,但是使用這些功能都必須通過Java API。這意味着Storm的所有特性對於Java來講都是可用的。

第三,Storm的很大一部分實現都是Clojure代碼。從代碼行來看,差不多是一半Java代碼,一半Clojure代碼。但是由於Clojure在表達能力上更為見長,因此,實際上絕大多數邏輯的實現都是Clojure來做的。

接下來的小節里將會逐個詳細解釋這三個層次。

storm.thrift


要理解Storm的代碼結構,首先需要看的是storm.thrift文件。

Storm使用了從這里folk出來的Thrift版本來自動生成代碼。這個Thrift版本實際上是將所有的Java packages都重命名為"org.apache.thrift7"之后的Thrift 7。除此之外,它與Thrfit 7是完全一樣的。之所以單獨出這樣一個Thrift版本一是考慮到Thrift缺少向后兼容,而是為了避免包名沖突以滿足一些用戶在他們自己的topologies中用到其他版本的thrift。

一個topology中的任何一個spout或bolt都會被用戶指定一個唯一標識,稱為"component id"。當描述1個bolt接收其他哪些spout或bolt的輸出時需要用到這個"component id"。StormTopology結構中保存了1個map來保存"component id"到"component"的映射關系,這個映射關系包含所有的component類型(即所有的spout、bolt)。

Thrift對Spout或bolt的定義是相同的,因此我們只需要看一下bolt的thrift定義。它包含了1個"ComponentObject"結構和1個"ComponentCommon"結構。

"ComponentObject"即是bolt的實現實體。它可以是以下三個類型之一:

 

  1. 1個序列化的java對象(這個對象實現IBolt接口)
  2. 1個"ShellComponent"對象,意味着bolt是由其他語言實現的。如果以這種方式來定義1個bolt,Storm將會實例化1個ShellBolt對象來負責處理基於JVM的worker進程與非JVM的component(即該bolt)實現體之間的通訊。
  3. 1個"JavaObject"結構,這個結構告訴Storm實例化這個bolt所需要得classname和構造函數參數。這一點在你想用非JVM語言來定義topology時比較有用。這樣,在你使用非JVM語言來定義topology時就可以做到既使用基於JVM的spout或bolt,同時又不需要創建並序列化它們的Java對象。


"ComponentCommon"定義了這個component的其他所有屬性。包括:

 

  1. 這個component發射什么stream以及stream的元數據(是否是direct stream,stream中field的聲明)
  2. 這個component接收什么stream(被定義在1個component_id到stream_id的map里,在stream做分組時用到)
  3. 這個component的並行度
  4. 這個component的配置項configuration


注意,在spout的結構中同樣有"ComponentCommon"字段,因此,spout也是可以被聲明接收其他的stream輸入。然而,Storm Java API並沒有提供一種方式指定spout接收什么stream,同時如果你在這里指定1個spout的輸入聲明,在提交這個topology時將會出現報錯信息。之所以這樣設計,是因為spout的輸入聲明不是讓用戶自己來使用的,而是Storm內部使用的。Storm會在內部自動向topology添加stream和bolt來構造acking framework,其中的兩個stream就是從acker bolt發出給topology中的所有spout節點的。只要1個tuple樹被檢測到完成了或失敗了,acker就會通過這兩個stream分別發出"ack"或"fail"消息。將用戶提交的topology轉換成運行時的topology的代碼可參見這里

Java接口


Storm的接口定義都是Java接口。主要的接口如下:

  1. IRichBolt
  2. IRichSpout
  3. TopologyBuilder


這樣定義這些接口的主要意圖在於:

  1. 以Java語言來定義接口
  2. 基於此接口,可以做到在不同的場合,提供出各自最適合的默認實現基類


這一策略的實際運用可以參考BaseRichSpout

Spout和bolt就是按照以上接口描述的方式被序列化到topology的Thrift定義結構中。

值得一提的一個細節是,IBolt、ISpout與IRichBolt、IRichSpout這兩對接口是有區別的。它們主要區別是在"Rich"版本里增加了"declareOutputFields"方法。這樣設計的原因是所有的輸出stream的輸出field聲明都必須是在Thrift結構里的(這樣就可以做到使用任何編程語言來聲明了),但是用戶又希望能夠在自己的class中來聲明stream輸出field信息。為解決這個問題,"TopologyBuilder"在構造Thrift結構時就是通過調用"declareOutputFields"方法來得到輸出field的聲明,然后將其轉換納入Thrift結構。這個轉換操作可以從"TopologyBuilder"代碼中的這一段里看到。

接口實現


通過將Storm所有的接口都由Java語言來定義確保了Storm的所有功能對於Java來講都是可使用的。同時,Java接口的使用也使得Java用戶在使用Storm時體驗更好。

應該說,Storm主要是由Clojure語言實現的。盡管從代碼行數上看一半是Java一半是Clojure,但其實里面絕大多數的邏輯實現都是Clojure。有兩個值得一提的例外就是DRPC支持事務的topology,它們二者都純Java實現的。這樣做的主要目的是來展示如何基於Storm,實現Storm之上更高層次的抽象。DRPC和支持事務的topology的實現分別位於backtype.storm.coordinationbacktype.storm.transactional包里。

這里總結了一份主要的Java包和Clojure命名空間的內容列表:

Java包

 

  • backtype.storm.coordination:實現了DRPC和事務性topology里用到的基於Storm的批處理功能。這個包里最重要得類是CoordinatedBolt
  • backtype.storm.drpc:DRPC的更高層次抽象的具體實現
  • backtype.storm.generated:自動生成的Thrift代碼(利用這里folk出來的Thrift版本生成的,主要是把org.apache.thrift包重命名成org.apache.thrift7來避免與其他Thrift版本的沖突)
  • backtype.storm.grouping:包含了用戶實現自定義stream分組類時需要用到的接口
  • backtype.storm.hooks:定義了處理storm各種事件的鈎子接口,例如當task發射tuple時、當tuple被ack時。關於鈎子的手冊詳見這里
  • backtype.storm.serialization:storm序列化/反序列化tuple的實現。在Kryo之上構建。
  • backtype.storm.spout:spout及相關接口的定義(例如"SpoutOutputCollector")。也包括了"ShellSpout"來實現非JVM語言定義spout的協議。
  • backtype.storm.task:bolt及相關接口的定義(例如"OutputCollector")。也包括了"ShellBolt"來實現非JVM語言定義bolt的協議。最后,"TopologyContext"也是在這里定義的,用來在運行時供spout和bolt使用以獲取topology的執行信息。
  • backtype.storm.testing:包括了storm單元測試中用到的各種測試bolt及工具。
  • backtype.storm.topology:在Thrift結構之上的Java層,用以提供一個純Java API來使用Storm(用戶不需要了解Thrift的細節)。"TopologyBuilder"及不同spout和bolt的基類們也在這里定義。稍高一層次的接口"IBasicBolt"也在這里定義,它會使得創建某些特定類型的bolt會更加簡潔。
  • backtype.storm.transactional:包括了事務性topology的實現。
  • backtype.storm.tuple:包括Storm中tuple數據模型的實現。
  • backtype.storm.utils:包含了Storm源碼中用到的數據結構及各種工具類。


Clojure 命名空間

 

  • backtype.storm.bootstrap:包括了1個有用的宏來引入源碼中用到的所有類及命名空間。
  • backtype.storm.clojure:包括了利用Clojure為Storm定義的特定領域語言(DSL)。
  • backtype.storm.cluster:Storm守護進程中用到的Zookeeper邏輯都封裝在這個文件中。這部分代碼提供了API來將整個集群的運行狀態映射到Zookeeper的"文件系統"上(例如哪里運行着怎樣的task,每個task運行的是哪個spout/bolt)。
  • backtype.storm.command.*:這些命名空間包括了各種"storm xxx"開頭的客戶端命令行的命令實現。這些實現都很簡短。
  • backtype.storm.config:Clojure中config的讀取/解析實現。同時也包括了工具函數來告訴nimbus、supervisor等守護進程在各種情況下應該使用哪些本地目錄。例如:"master-inbox"函數會返回本地目錄告訴Nimbus應該將上傳給它的jar包保存到哪里。
  • backtype.storm.daemon.acker:"acker" bolt的實現。這是Storm確保數據被完全處理的關鍵組成部分。
  • backtype.storm.daemon.common:Storm守護進程用到的公共函數,例如根據topology的名字獲取其id,將1個用戶定義的topology映射到真正運行的topology(真正運行的topology是在用戶定義的topology基礎上添加了ack stream及acker bolt,參見system-topology!函數),同時包括了各種心跳及Storm中其他數據結構的定義。
  • backtype.storm.daemon.drpc:包括了DRPC服務器的實現,用來與DRPC topology一起使用。
  • backtype.storm.daemon.nimbus:包括了Nimbus的實現。
  • backtype.storm.daemon.supervisor:包括了Supervisor的實現。
  • backtype.storm.daemon.task:包括了spout或bolt的task實例實現。包括了處理消息路由、序列化、為UI提供的統計集合及spout、bolt執行動作的實現。
  • backtype.storm.daemon.worker:包括了worker進程(1個worker包含很多的task)的實現。包括了消息傳輸和task啟動的實現。
  • backtype.storm.event:包括了1個簡單的異步函數的執行器。Nimbus和Supervisor很多場合都用到了異步函數執行器來避免資源競爭。
  • backtype.storm.log:定義了用來輸出log信息給log4j的函數。
  • backtype.storm.messaging.*:定義了1個高一層次的接口來實現點對點的消息通訊。工作在本地模式時Storm會使用內存中的Java隊列來模擬消息傳遞。工作在集群模式時,消息傳遞使用的是ZeroMQ。通用的接口在protocol.clj中定義。
  • backtype.storm.stats:實現了向Zookeeper中寫入UI使用的統計信息時如何進行匯總。實現了不同粒度的聚合。
  • backtype.storm.testing:包括了測試Storm topology的工具。包括時間仿真,運行一組固定數量的tuple然后獲得輸出快照的"complete-topology","tracker topology"可以在集群"空閑"時做更細粒度的控制操作,以及其他工具。
  • backtype.storm.thrift:包括了自動生成的Thrift API的Clojure封裝以使得使用Thrift結構更加便利。
  • backtype.storm.timer:實現了1個后台定時器來延遲執行函數或者定時輪詢執行。Storm不能使用Java里的Timer類,因為為了單測Nimbus和Supervisor,必須要與時間仿真集成起來使用。
  • backtype.storm.ui.*:Storm UI的實現。完全獨立於其他的代碼,通過Nimbus的Thrift API來獲取需要的數據。
  • backtype.storm.util:包括了Storm代碼中用到的通用工具函數。
  • backtype.storm.zookeeper:包括了Clojure對Zookeeper API的封裝,同時也提供了一些高一層次的操作例如:"mkdirs"、"delete-recursive"


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM