Spout呢,是Topology中數據流的源頭,也是Storm針對數據源的編程單元。一般數據的來源,是通過外部數據源來讀取數據項(Tuple),並讀取的數據項傳輸至作業的其他組件。編程人員一般可通過OutputFieldsDeclarer類的declareStream()方法來聲明多個流,指定數據將要發送的流,然后使用SpoutOutputCollector的emit方法將數據發送。
這里整理了下ISpout和IComponent接口。
ISpout聲明了Spout的核心方法,用於向Topology供給數據項。對於每一個發出的數據項,Storm通過Spout,可以追蹤它經歷處理過程的有向無環圖(竟然也是DAG)。
void open (java,util.Map conf,TopologyContext context,SpoutOutputCollector collector) 用於實例化Spout的一個運行時任務,被急群眾的某一進程調用 (conf對象維護Storm中針對該Spout的配置信息,context是一個上下文對象,可用於獲取該組件運行時任務的信息,collector用於從該Spout發送數據項) void close() 用於停止一個Spout void activate() 在Spout從非激活狀態轉換為激活狀態時被調用 void deactivate() 在Spout的非激活狀態被調用
void ack(java.lang.Object msgId)
Storm用於確認該Spout發送的這個數據項已經被完整處理 void fail(java.lang.Object msgId) Storm用於確認該Spout發送的這個數據項已經失敗 void nextTuple() 當這個方法被調用時,Storm要求Spout發送一個數據項至output collector
(nextTuple是Spout向Topology中發送一個數據項,是Spout需要實現的最重要的方法。在可靠的Spout的一個任務中,nextTuple()、ack()、fail()三個方法的調用在一個單獨線程中循環。當不存在數據項需要發送時,nextTuple()將會休眠一小段間隔,確保不會浪費過多的CPU資源)
IComponent接口,聲明了Topology組件的通用方法。使用JAVA語言的Spout和Bolt都必須實現這個接口。
void declareOutputFields(OutputFieldsDeclarer declarer) 聲明指定輸出流的數據項結構。(這里指定了輸出流的數據項結構(schema)。參數declarer被用來聲明輸出流(stream)的id,域。 java.util.Map getComponentConfiguration() 獲取組件的配置信息
以Storm官網的WordCount來說明就是:
public class WordCount extends BaseRichSpout { public static Logger log = logger.getLogger(backtype/storm/testing/WordCount); boolean_isDistributed; SpoutOutputCollector_collector; public WordCount(){ this(true); } public WordCount(boolean isDistributed){ _isDistributed = isDistributed; } public void open(Map conf,TopologyContext context,SpoutOutputCollector collector){ _collector = collector; } public void close(){ } public void nextTuple(){ Utils.sleep(100L); String words[] = { "nathan","mike","jackson","golda","bertels" }; public void ack(Object obj){ } public void fail(Object obj){ } public void declareOutputFields(OutputFieldsDeclarer declarer){ declarer.declarer(new Fields(new String[] { "word" })); } public Map getComponentConfiguration(){ if(!_isDistributed_) { Map ret = new HashMap(); ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM,Integer.valueOf(1)); }else{ return null; } } }
1、類中有對WordCount的兩個重載的構造函數,其中_isDistributed指明了Spout的並行度,若_isDistributed=false,則意味着這個Spout運行時僅有一份任務實例。
2、open()函數的實現,將傳入的collector賦值給局部變量,使之后通過該局部變量來操作數據項的發送。
3、declareOutputFields()函數,生命了輸出流的數據項結構。
4、nextTuple函數,讓一只執行的線程休眠100毫秒,再繼續執行下述函數體,通過線程的休眠,控制nextTuple()產生數據項的周期為0.1秒。並且在維護字符串數組中,隨機挑選一個字符串,作為"word"的域,交給變量collector作為一個Tuple發送。 (ack的作用是確認數據項是否被完整處理,這里沒做處理)
5、getComponentConfiguration()函數則返回組建的配置信息(這個實例中只有在_isDistributed=false時,才返回包含該配置項的Map數據結構。
6、其他重載函數都為空實現。
那么在Topology實現類的main函數使其作為一個spout:
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentenceGenSpout",new WordCount());