在開源搜索引擎Iveely的0.8.0中,我們有提到Iveely Computing實時計算平台,因為Iveely搜索引擎也是基於這個平台做的開發,因此,我們可以利用這個平台,輕松構建分布式實時應用程序。在開始構建程序之前,請按照這里部署Iveely Computing,確定部署無誤之后,我們可以從下面代碼開始學習。
不管是hadoop還是storm,都會在入門的時候,有一個WordCount示例,Iveely Computing也不例外。
首先,WordCount代碼如下:
/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package iveely.computing.example; import com.iveely.computing.api.FieldsDeclarer; import com.iveely.computing.api.IInput; import com.iveely.computing.api.IOutput; import com.iveely.computing.api.StreamChannel; import com.iveely.computing.api.TopologyBuilder; import com.iveely.computing.api.TopologySubmitter; import com.iveely.computing.api.Tuple; import java.util.HashMap; import java.util.Random; import java.util.TreeMap; /** * * @author liufanping@iveely.com */ public class WordCount { public static class WordInput extends IInput { /** * Output data to collector. */ private StreamChannel _channel; /** * Count of emitted. */ private int emitCount = 0; @Override public void start(HashMap<String, Object> conf, StreamChannel channel) { _channel = channel; } @Override public void declareOutputFields(FieldsDeclarer declarer) { declarer.declare(new String[]{"word", "defaultCount"}, new Integer[]{0}); } @Override public void nextTuple() { final String[] words = new String[]{"iveely", "mike", "jackson", "golda", "bertels", "blue", "china", "pan", "qq", "baidu", "ABC", "desk", "pen", "music", "play", "mouse", "mac", "windows", "microsoft", "c++", "java"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; final int count = 1; System.out.println(getName() + ":" + word); _channel.emit(word, count); } @Override public void end(HashMap<String, Object> conf) { } @Override public void toOutput() { _channel.addOutputTo(new WordOutputA()); _channel.addOutputTo(new WordOutputB()); } } public static class WordOutputA extends IOutput { /** * Output data to collector. */ private StreamChannel _channel; @Override public void start(HashMap<String, Object> conf, StreamChannel channel) { _channel = channel; } @Override public void declareOutputFields(FieldsDeclarer declarer) { declarer.declare(new String[]{"word", "totalCount"}, null); } @Override public void execute(Tuple tuple) { String word = (String) tuple.get(0).toString(); Integer defaultCount = Integer.parseInt(tuple.get(1).toString()); _channel.emit(word, defaultCount); } @Override public void end(HashMap<String, Object> conf) { // Output map to data base or others. } @Override public void toOutput() { } } public static class WordOutputB extends IOutput { private TreeMap<String, Integer> map; /** * Output data to collector. */ private StreamChannel _channel; @Override public void start(HashMap<String, Object> conf, StreamChannel channel) { map = new TreeMap<>(); _channel = channel; } @Override public void declareOutputFields(FieldsDeclarer declarer) { declarer.declare(new String[]{"word", "totalCount"}, null); } @Override public void execute(Tuple tuple) { String word = (String) tuple.get(0).toString(); System.out.println(this.getName() + ":" + word); Integer defaultCount = Integer.parseInt(tuple.get(1).toString()); if (map.containsKey(word)) { int currentCount = map.get(word); map.put(word, defaultCount + currentCount); } else { map.put(word, defaultCount); } } @Override public void end(HashMap<String, Object> conf) { // Output map to data base or others. } @Override public void toOutput() { } } public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder("WordCount"); builder.setInput(new WordInput(), 1); builder.setSlave(2); builder.isLocalMode = false; builder.setOutput(new WordOutputB(), 2); builder.setOutput(new WordOutputA(), 2); TopologySubmitter.submit(builder, args); } }
其次,代碼解析。
代碼中,包含了四個部分:
類WordInput:數據輸入流,繼承IInput,用於數據的產生。
類WordOutputA:數據輸出流,繼承IOutput , 用於數據的輸出。
類WordOutputB:同上。
main函數:執行的入口。
第三,IInput。
public void start(HashMap<String, Object> conf, StreamChannel channel)
類似於初始化函數,在開始產生數據之前,做的一些初始化,conf是一些配置文件,在main函數中的ToplogyBuilder中通過put方法設置進去,屬於用戶自定義數據。StreamChannel是流數據發送接口。在調用NextTuple的時候,會用到。本函數僅會調用一次。
public void declareOutputFields(FieldsDeclarer declarer)
用於聲明本次產生數據的格式,declarer.declare(new String[]{"word", "defaultCount"}, new Integer[]{0, 1}); 表示輸出數據有兩個字段,第一個是word本身,第二個是默認的數量,后面new Integer[]{0}非常重要,是數據分發的分組,0表示,按照“word”本身進行分組。這樣不同的word就會分發到不同的處理節點,本函數僅會調用一次。
public void nextTuple()
是數據的真正產生源,此方法會不斷被調用,直到產生數據完成,產生完成是通過_channel.emitEnd();方法來表示完成,是需要手動調用,否則程序將會一直無休止運行下去,產生數據之后,一定要記得將數據發送出去:_channel.emit(word, count);。
public void end(HashMap<String, Object> conf)
當在nextTuple中調用_channel.emitEnd();之后,會調用此方法,此方法類似於程序推出前的清理工作,此方法僅調用一次。
public void toOutput()
此方法表示聲明數據輸出到的下一步流程。例如_channel.addOutputTo(new WordOutputA());表示輸出的數據將會被WordOutoutA繼續處理。當然可以addOutputTo到更多的IOutput。
第四,IOutput。
在IOutput中,大部分均和IInput中類似,不同的在於IInput中有一個nextTuple,而在IOutput中,是 public void execute(Tuple tuple),此方法的調用方式為接收到數據之后才會觸發。IOutput依然是可以輸出到多個IOutput中去。
第五,main函數。
main函數是程序的執行入口,對於我們的程序依然是,main函數中,包含兩種模式,在這兩種模式下,代碼略有不同。
調試模式
用於本地調試,不需要部署Iveely Computing,可斷點,跟調試一個普通程序一樣,此刻main函數應該是這樣。
public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder("WordCount"); builder.setInput(new WordInput(), 1); builder.setSlave(2); builder.isLocalMode = true; builder.setOutput(new WordOutputB(), 2); builder.setOutput(new WordOutputA(), 2); TopologySubmitter.submit(builder, args); }
在builder參數中的isLocalMode設置為true即可。
部署模式
用於將程序提交到Iveely Computing,在調試模式下,確定程序無誤之后,提交給Iveely Computing運行。
public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder("WordCount"); builder.setInput(new WordInput(), 1); builder.setSlave(2); builder.isLocalMode = false; builder.setOutput(new WordOutputB(), 2); builder.setOutput(new WordOutputA(), 2); TopologySubmitter.submit(builder, args); }
其余代碼並無太大區別。其中setSlave表示期望給多個少機器節點運行,此處是兩個,當不能滿足這么多節點的時候,會根據當前最多的節點給予分配。 builder.setInput(new WordInput(), 1); 中的第二個參數表示給予1個線程讀取數據。
編譯文件到jar,參考這里的提交應用程序到Iveely Computing,並查看運行情況。
總結:上述是利用WordCount示例,大致對Iveely Computing的API做了一個介紹,如果有任何疑問,請郵件我:liufanping@iveely.com。
