如何快速寫一個分布式實時應用程序


    在開源搜索引擎Iveely的0.8.0中,我們有提到Iveely Computing實時計算平台,因為Iveely搜索引擎也是基於這個平台做的開發,因此,我們可以利用這個平台,輕松構建分布式實時應用程序。在開始構建程序之前,請按照這里部署Iveely Computing,確定部署無誤之后,我們可以從下面代碼開始學習。

    不管是hadoop還是storm,都會在入門的時候,有一個WordCount示例,Iveely Computing也不例外。

    首先,WordCount代碼如下

/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package iveely.computing.example;

import com.iveely.computing.api.FieldsDeclarer;
import com.iveely.computing.api.IInput;
import com.iveely.computing.api.IOutput;
import com.iveely.computing.api.StreamChannel;
import com.iveely.computing.api.TopologyBuilder;
import com.iveely.computing.api.TopologySubmitter;
import com.iveely.computing.api.Tuple;
import java.util.HashMap;
import java.util.Random;
import java.util.TreeMap;

/**
 *
 * @author liufanping@iveely.com
 */
public class WordCount {

    public static class WordInput extends IInput {

        /**
         * Output data to collector.
         */
        private StreamChannel _channel;

        /**
         * Count of emitted.
         */
        private int emitCount = 0;

        @Override
        public void start(HashMap<String, Object> conf, StreamChannel channel) {
            _channel = channel;
        }

        @Override
        public void declareOutputFields(FieldsDeclarer declarer) {
            declarer.declare(new String[]{"word", "defaultCount"}, new Integer[]{0});
        }

        @Override
        public void nextTuple() {
            final String[] words = new String[]{"iveely", "mike", "jackson", "golda", "bertels", "blue", "china", "pan", "qq", "baidu", "ABC", "desk", "pen", "music", "play", "mouse", "mac", "windows", "microsoft", "c++", "java"};
            final Random rand = new Random();
            final String word = words[rand.nextInt(words.length)];
            final int count = 1;
            System.out.println(getName() + ":" + word);
            _channel.emit(word, count);
        }

        @Override
        public void end(HashMap<String, Object> conf) {

        }

        @Override
        public void toOutput() {
            _channel.addOutputTo(new WordOutputA());
            _channel.addOutputTo(new WordOutputB());
        }
    }

    public static class WordOutputA extends IOutput {

        /**
         * Output data to collector.
         */
        private StreamChannel _channel;

        @Override
        public void start(HashMap<String, Object> conf, StreamChannel channel) {
            _channel = channel;
        }

        @Override
        public void declareOutputFields(FieldsDeclarer declarer) {
            declarer.declare(new String[]{"word", "totalCount"}, null);
        }

        @Override
        public void execute(Tuple tuple) {
            String word = (String) tuple.get(0).toString();
            Integer defaultCount = Integer.parseInt(tuple.get(1).toString());
            _channel.emit(word, defaultCount);
        }

        @Override
        public void end(HashMap<String, Object> conf) {
            // Output map to data base or others.
        }

        @Override
        public void toOutput() {

        }
    }

    public static class WordOutputB extends IOutput {

        private TreeMap<String, Integer> map;

        /**
         * Output data to collector.
         */
        private StreamChannel _channel;

        @Override
        public void start(HashMap<String, Object> conf, StreamChannel channel) {
            map = new TreeMap<>();
            _channel = channel;
        }

        @Override
        public void declareOutputFields(FieldsDeclarer declarer) {
            declarer.declare(new String[]{"word", "totalCount"}, null);
        }

        @Override
        public void execute(Tuple tuple) {
            String word = (String) tuple.get(0).toString();
            System.out.println(this.getName() + ":" + word);
            Integer defaultCount = Integer.parseInt(tuple.get(1).toString());
            if (map.containsKey(word)) {
                int currentCount = map.get(word);
                map.put(word, defaultCount + currentCount);
            } else {
                map.put(word, defaultCount);
            }
        }

        @Override
        public void end(HashMap<String, Object> conf) {
            // Output map to data base or others.
        }

        @Override
        public void toOutput() {

        }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder("WordCount");
        builder.setInput(new WordInput(), 1);
        builder.setSlave(2);
        builder.isLocalMode = false;
        builder.setOutput(new WordOutputB(), 2);
        builder.setOutput(new WordOutputA(), 2);
        TopologySubmitter.submit(builder, args);
    }
}

     其次,代碼解析

     代碼中,包含了四個部分:

     類WordInput:數據輸入流,繼承IInput,用於數據的產生。

     類WordOutputA:數據輸出流,繼承IOutput , 用於數據的輸出。

     類WordOutputB:同上。

     main函數:執行的入口。

     第三,IInput

     public void start(HashMap<String, Object> conf, StreamChannel channel)

     類似於初始化函數,在開始產生數據之前,做的一些初始化,conf是一些配置文件,在main函數中的ToplogyBuilder中通過put方法設置進去,屬於用戶自定義數據。StreamChannel是流數據發送接口。在調用NextTuple的時候,會用到。本函數僅會調用一次。

     public void declareOutputFields(FieldsDeclarer declarer)

     用於聲明本次產生數據的格式,declarer.declare(new String[]{"word", "defaultCount"}, new Integer[]{0, 1}); 表示輸出數據有兩個字段,第一個是word本身,第二個是默認的數量,后面new Integer[]{0}非常重要,是數據分發的分組,0表示,按照“word”本身進行分組。這樣不同的word就會分發到不同的處理節點,本函數僅會調用一次。

     public void nextTuple()

     是數據的真正產生源,此方法會不斷被調用,直到產生數據完成,產生完成是通過_channel.emitEnd();方法來表示完成,是需要手動調用,否則程序將會一直無休止運行下去,產生數據之后,一定要記得將數據發送出去:_channel.emit(word, count);。

     public void end(HashMap<String, Object> conf)

     當在nextTuple中調用_channel.emitEnd();之后,會調用此方法,此方法類似於程序推出前的清理工作,此方法僅調用一次。

     public void toOutput()

     此方法表示聲明數據輸出到的下一步流程。例如_channel.addOutputTo(new WordOutputA());表示輸出的數據將會被WordOutoutA繼續處理。當然可以addOutputTo到更多的IOutput。

     第四,IOutput

     在IOutput中,大部分均和IInput中類似,不同的在於IInput中有一個nextTuple,而在IOutput中,是 public void execute(Tuple tuple),此方法的調用方式為接收到數據之后才會觸發。IOutput依然是可以輸出到多個IOutput中去。

     第五,main函數

     main函數是程序的執行入口,對於我們的程序依然是,main函數中,包含兩種模式,在這兩種模式下,代碼略有不同。

     調試模式

     用於本地調試,不需要部署Iveely Computing,可斷點,跟調試一個普通程序一樣,此刻main函數應該是這樣。

public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder("WordCount");
        builder.setInput(new WordInput(), 1);
        builder.setSlave(2);
        builder.isLocalMode = true;
        builder.setOutput(new WordOutputB(), 2);
        builder.setOutput(new WordOutputA(), 2);
        TopologySubmitter.submit(builder, args);
    }

     在builder參數中的isLocalMode設置為true即可。

     部署模式

     用於將程序提交到Iveely Computing,在調試模式下,確定程序無誤之后,提交給Iveely Computing運行。

 public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder("WordCount");
        builder.setInput(new WordInput(), 1);
        builder.setSlave(2);
        builder.isLocalMode = false;
        builder.setOutput(new WordOutputB(), 2);
        builder.setOutput(new WordOutputA(), 2);
        TopologySubmitter.submit(builder, args);
    }

      其余代碼並無太大區別。其中setSlave表示期望給多個少機器節點運行,此處是兩個,當不能滿足這么多節點的時候,會根據當前最多的節點給予分配。 builder.setInput(new WordInput(), 1); 中的第二個參數表示給予1個線程讀取數據。

      編譯文件到jar,參考這里的提交應用程序到Iveely Computing,並查看運行情況。

      總結:上述是利用WordCount示例,大致對Iveely Computing的API做了一個介紹,如果有任何疑問,請郵件我:liufanping@iveely.com。

     

      背景:開源搜索引擎Iveely 0.8.0發布,終見天日


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM