Apache Storm


Author: jiangzz  Phone: 15652034180  WeChat: jiangzz_wx  WeChat Official Account: jiangzz_wy

Background

Stream computing: analyzing large volumes of continuously moving data in real time as it flows, capturing potentially useful information and sending the results on to the next computation node.
Mainstream stream-computing frameworks: Kafka Streaming, Apache Storm, Spark Streaming, Flink DataStream, and so on.

  • Kafka Streaming: a stream-computing toolkit shipped as a library jar built on Kafka Streams; it is simple and easy to integrate into applications.
  • Apache Storm: a stream-processing framework that processes streaming data and manages processing state.
  • Spark Streaming: micro-batch in-memory computation based on Spark's discretized RDDs; because the computation is fast it is generally regarded as near-real-time processing.
  • Flink DataStream: "Stateful Computations over Data Streams". Flink's core is a streaming dataflow engine that provides data distribution, communication and fault tolerance for distributed computation over data streams. As a third-generation stream-processing framework it borrows the strengths of both Storm and Spark Streaming.

What is Apache Storm?

Apache Storm provides a stream-computation abstraction called a Topology, which is the counterpart of Hadoop's MapReduce. Unlike a MapReduce job, which eventually terminates, a topology keeps running until the user stops it with the storm kill command. Storm offers a highly reliable, scalable and fault-tolerant stream-computing service that guarantees reliable processing of every tuple (at-least-once or exactly-once semantics), and it integrates easily with existing services such as HDFS, Kafka, HBase, Redis, Memcached and YARN. A single Storm stage can process on the order of one million tuples per second.

Storm Architecture

nimbus: the master node of the computation; it distributes code, assigns tasks, and detects failures of the Supervisors.

supervisor: accepts task assignments from Nimbus and starts Worker processes to execute the computation.

zookeeper: coordinates Nimbus and the Supervisors. The state of the nimbus and supervisor processes is stored in ZooKeeper, which makes Nimbus and the Supervisors themselves stateless, allows tasks to be recovered quickly, and gives the stream computation remarkable stability.

Worker: a Java process started by a Supervisor for one specific topology. A Worker runs its share of the computation through executors (threads), and each executor runs one or more tasks. Once a topology is killed, all of its worker processes exit.

Storm Environment

Building Storm

Storm has a bug, STORM-3046, when it is integrated with the Kafka message queue. To make later use easier, download the storm-1.2.3 source package and build it with Maven.
Compile Storm

[root@CentOS ~]# tar -zxf apache-maven-3.3.9-bin.tar.gz -C /usr/
[root@CentOS ~]# vi .bashrc

if [ -f /etc/bashrc ]; then
        . /etc/bashrc
fi
M2_HOME=/usr/apache-maven-3.3.9
STORM_HOME=/usr/apache-storm-1.2.3
JAVA_HOME=/usr/java/latest
CLASSPATH=.
PATH=$PATH:$JAVA_HOME/bin:$STORM_HOME/bin:$M2_HOME/bin
export JAVA_HOME
export CLASSPATH
export PATH
export STORM_HOME
export M2_HOME
[root@CentOS ~]# source .bashrc

[root@CentOS ~]# tar -zxf storm-1.2.3.tar.gz
[root@CentOS ~]# cd storm-1.2.3
[root@CentOS storm-1.2.3]# mvn clean package install -DskipTests=true
[root@CentOS storm-1.2.3]# cd storm-dist/binary
[root@CentOS binary]# mvn package -Dgpg.skip=true

When the commands finish, an apache-storm-1.2.3.tar.gz file is produced under storm-1.2.3/storm-dist/binary/target. This is the Storm distribution archive.

Maven Dependency Issues

Inside the storm-1.2.3 directory, add the following two snippets to its pom.xml:

Deployment configuration

<distributionManagement>
    <repository>
        <id>nexus</id>
        <name>admin</name>
        <url>http://localhost:8081/nexus/content/repositories/releases/</url>
    </repository>
</distributionManagement>

Source plugin

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-source-plugin</artifactId>
    <version>3.0.1</version>
    <configuration>
        <attach>true</attach>
    </configuration>
    <executions>
        <execution>
            <phase>compile</phase>
            <goals>
                <goal>jar</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Then run the following command and wait patiently; all of this version's dependency jars and source jars will be deployed to the private repository.

[root@CentOS storm-1.2.3]# mvn deploy -DskipTests=true


Storm Installation

  • Install JDK 1.8 or later and configure JAVA_HOME
[root@CentOS ~]# rpm -ivh jdk-8u171-linux-x64.rpm
Preparing...                ########################################### [100%]
   1:jdk1.8                 ########################################### [100%]
Unpacking JAR files...
        tools.jar...
        plugin.jar...
        javaws.jar...
        deploy.jar...
        rt.jar...
        jsse.jar...
        charsets.jar...
        localedata.jar...
[root@CentOS ~]# vi .bashrc        
JAVA_HOME=/usr/java/latest
CLASSPATH=.
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME
export CLASSPATH
export PATH
[root@CentOS ~]# source .bashrc
  • Configure the hostname-to-IP mapping
[root@CentOS ~]# vi /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.38.129 CentOS
  • Disable the firewall
[root@CentOS ~]# service iptables stop
iptables: Setting chains to policy ACCEPT: filter          [  OK  ]
iptables: Flushing firewall rules:                         [  OK  ]
iptables: Unloading modules:                               [  OK  ]
[root@CentOS ~]# chkconfig iptables off
  • Install ZooKeeper
[root@CentOS ~]# tar -zxf zookeeper-3.4.6.tar.gz -C /usr/
[root@CentOS ~]# mkdir zkdata
[root@CentOS ~]# cp /usr/zookeeper-3.4.6/conf/zoo_sample.cfg /usr/zookeeper-3.4.6/conf/zoo.cfg
[root@CentOS ~]# vi /usr/zookeeper-3.4.6/conf/zoo.cfg
tickTime=2000
dataDir=/root/zkdata
clientPort=2181
[root@CentOS ~]# /usr/zookeeper-3.4.6/bin/zkServer.sh start zoo.cfg
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@CentOS ~]# /usr/zookeeper-3.4.6/bin/zkServer.sh status zoo.cfg
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: standalone
  • Install and configure Storm
[root@CentOS ~]# tar -zxf apache-storm-1.2.3.tar.gz -C /usr/
[root@CentOS ~]# vi .bashrc 
STORM_HOME=/usr/apache-storm-1.2.3
JAVA_HOME=/usr/java/latest
CLASSPATH=.
PATH=$PATH:$JAVA_HOME/bin:$STORM_HOME/bin
export JAVA_HOME
export CLASSPATH
export PATH
export STORM_HOME

[root@CentOS ~]# source .bashrc 
[root@CentOS ~]# vi /usr/apache-storm-1.2.3/conf/storm.yaml 
 storm.zookeeper.servers:
     - "CentOS"
 storm.local.dir: "/usr/apache-storm-1.2.3/storm-stage"
 nimbus.seeds: ["CentOS"]
 supervisor.slots.ports:
     - 6700
     - 6701
     - 6702
     - 6703
  • Start Storm
[root@CentOS ~]# nohup storm nimbus >/dev/null 2>&1 &
[root@CentOS ~]# nohup storm supervisor >/dev/null 2>&1 &
[root@CentOS ~]# nohup storm ui >/dev/null 2>&1 & 

The storm ui process serves Storm's web status page; through it you can manage topologies and inspect settings such as parallelism.
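The UI also exposes a REST API, which is handy for scripted checks. A quick sketch that queries the cluster summary (assuming the default UI port 8080 and the /api/v1/cluster/summary endpoint):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class StormUiCheck {
    public static void main(String[] args) throws Exception {
        // ask the Storm UI REST API for a cluster summary
        URL url = new URL("http://CentOS:8080/api/v1/cluster/summary");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);   // JSON with nimbus uptime, supervisor and slot counts, ...
            }
        }
    }
}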

  • Verify that the daemons started
[root@CentOS ~]# jps
1682 nimbus
1636 QuorumPeerMain
2072 Jps
1721 Supervisor
1772 core

Visit: http://CentOS:8080


Topology

Topologies: contain the logic of a real-time computation. A complete topology is a computation graph (DAG) formed by connecting Spouts and Bolts; the components are wired together through stream groupings (for example shuffle grouping).

Streams: a stream is an unbounded sequence of Tuples. Every tuple has a schema that describes the names of its fields.

Streams <==> List<Tuple> + Schema(field1, field2, ...)
// a tuple is an array of values plus a schema that names each element
// conceptual illustration (not real Storm API); assume fields ("name","age","sex")
Tuple t = new Tuple(new Object[]{"zhangsan", 18, true});
String name = t.getStringByField("name");
Boolean sex = t.getBooleanByField("sex");

Spouts: produce Tuples and are the source of streams. A spout typically reads data from an external system, wraps it into tuples, and emits them into the topology. Implementations extend IRichSpout / BaseRichSpout.

Bolts: every tuple in a topology is processed by bolts. Bolts are used for filtering, aggregation, functions, joins, writing data to databases, and so on.

IRichBolt | BaseRichBolt, IBasicBolt | BaseBasicBolt, IStatefulBolt | BaseStatefulBolt

Reference: http://storm.apache.org/releases/1.2.2/Concepts.html

Getting Started Example

public class TopologyBuilderTests {
    public static void main(String[] args) throws Exception {
        //1. create a TopologyBuilder to assemble the topology
        TopologyBuilder builder=new TopologyBuilder();
        Config config = new Config();
        config.setDebug(false);
        config.setNumAckers(0);
        config.setNumWorkers(2);

        //2. configure the spout, which continuously emits lines of text into the stream
        builder.setSpout("WordSpout",new WordSpout());

        //3. configure a bolt that processes upstream tuples and splits each line into words
        builder.setBolt("LineSplitBolt",new LineSplitBolt(),2)
                .setNumTasks(4)
                .shuffleGrouping("WordSpout");

        //4. send the words downstream, partitioned by the word field, to count occurrences of each word
        builder.setBolt("WordCountBolt",new WordCountBolt(),4)
                .fieldsGrouping("LineSplitBolt",new Fields("word"));

        //5. receive (word, count) tuples partitioned by word and print the results to the console
        builder.setBolt("WordPrintBolt",new WordPrintBolt(),3)
                .fieldsGrouping("WordCountBolt", new Fields("word"));

        //6. submit the assembled topology
        StormSubmitter.submitTopology("Hello World",config,builder.createTopology());
    }
}

WordSpout

public class WordSpout extends BaseRichSpout {
    private  SpoutOutputCollector collector;
    private String[] lines={"this is a test demo",
            "Strom is easy to learn"};
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector=collector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        String line = lines[new Random().nextInt(lines.length)];
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

LineSplitBolt

public class LineSplitBolt extends BaseRichBolt {
    private OutputCollector collector;
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector=collector;
    }

    @Override
    public void execute(Tuple input) {
        String[] words = input.getStringByField("line").split("\\W+");
        for (String word : words) {
            collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

WordCountBolt

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private Map<String,Integer> wordMap;
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector=collector;
        wordMap=new HashMap<String,Integer>();
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        int count=0;
        if(!wordMap.containsKey(word)){
            count=1;
        }else{
            count=wordMap.get(word)+1;
        }
        wordMap.put(word,count);
        collector.emit(new Values(word,count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","count"));
    }
}

WordPrintBolt

public class WordPrintBolt extends BaseRichBolt {
  @Override
  public void prepare(Map stormConf,TopologyContext context, OutputCollector collector) { }

  @Override
  public void execute(Tuple input) {
    System.out.println(input);
  }
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}
  • Submitting the topology

Remote submission

[root@CentOS ~]# storm jar xxx.jar <fully-qualified main class name>

Local (simulated) submission

//local test environment
LocalCluster cluster=new LocalCluster();
cluster.submitTopology("Hello World",config,topology);

Simply replace StormSubmitter with LocalCluster.

  • List running topologies
[root@CentOS ~]# storm list
Topology_name        Status     Num_tasks  Num_workers  Uptime_secs
-------------------------------------------------------------------
Hello World          ACTIVE     12         2            58
  • Kill a topology
[root@CentOS ~]# storm  kill 'Hello World'

Storm Task Parallelism

Looking at the code of the getting-started example, once the Storm cluster receives the topology above it breaks the work into four stages (WordSpout, LineSplitBolt, WordCountBolt, WordPrintBolt) and derives the corresponding execution graph.

The Storm UI shows a total of 10 executor threads running 12 tasks.
When a topology is submitted, Storm first starts Worker processes according to config.setNumWorkers; the workers divide the topology's total parallelism among themselves, and each unit of parallelism corresponds to one executor thread. In the example above two Worker processes are started, and together they run the 10 executor threads. LineSplitBolt has a parallelism of 2 but 4 tasks, so Storm starts two executor threads that run four LineSplitBolt instances, which split the lines emitted by the spout into words.
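As a sanity check, the executor and task counts shown in the UI can be derived directly from the builder calls above (a rough breakdown, assuming Storm's default of one task per executor unless setNumTasks is called):

// WordSpout:      default parallelism 1          -> 1 executor,  1 task
// LineSplitBolt:  parallelism 2, setNumTasks(4)  -> 2 executors, 4 tasks
// WordCountBolt:  parallelism 4                  -> 4 executors, 4 tasks
// WordPrintBolt:  parallelism 3                  -> 3 executors, 3 tasks
// Total: 1+2+4+3 = 10 executors and 1+4+4+3 = 12 tasks,
// scheduled onto the 2 workers requested by config.setNumWorkers(2)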


The execution flow of the getting-started example can now be summarized as WordSpout -> LineSplitBolt -> WordCountBolt -> WordPrintBolt, with tuples routed between stages by the configured groupings.

storm rebalance: adjusts the number of workers and per-component executors of a running topology without resubmitting it.

[root@CentOS ~]# storm rebalance 'Hello World' -w 5 -n 4


[root@CentOS ~]# storm rebalance 'Hello World' -w 5 -n 2 -e LineSplitBolt=4

Reference: http://storm.apache.org/releases/1.2.2/Understanding-the-parallelism-of-a-Storm-topology.html

Message Reliability Guarantees

Storm uses a dedicated bolt called __acker to monitor whether the tuple tree rooted at each spout tuple is fully consumed. If processing fails or times out, the __acker causes the spout's fail method to be called so that the spout can re-emit the tuple. By default the number of __acker executors equals the number of workers; the acker mechanism can be disabled entirely with config.setNumAckers(0).

Using the Acker Mechanism

//the Spout must provide a msgId when emitting a tuple
collector.emit(new Values(line),i);
//every downstream Bolt must anchor the incoming tuple and, once processing succeeds, call ack
try {
    //anchor to the current tuple
    collector.emit(tuple,new Values(word,count));
    //acknowledge to the upstream component
    collector.ack(tuple);
} catch (Exception e) {
    //report failure
    collector.fail(tuple);
}

The spout's ack method is called by the __acker bolt only after the whole tuple tree has been completely processed. The spout's fail method is called in two situations:

① any downstream bolt calls collector.fail(tuple);

② the __acker detects that processing timed out (30 seconds by default).

Once Storm's acker mechanism is enabled, every processed tuple must be acked or failed. Storm uses memory to track each tuple, so if tuples are never acked or failed, the __acker bolt will eventually run out of memory.
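If the default 30-second timeout is too short for your processing latency, raise it on the topology configuration. A short sketch using the standard Config setters (the values are illustrative):

Config config = new Config();
// allow up to 60 seconds for a tuple tree to be fully acked before it is failed and replayed
config.setMessageTimeoutSecs(60);
// one acker executor per worker is the default; setting this to 0 disables tuple tracking entirely
config.setNumAckers(2);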

WordSpout with acking enabled

public class WordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private String[] lines={"this is a test demo",
                            "Strom is easy to learn"};
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector=collector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        int msgId = new Random().nextInt(lines.length);
        String line = lines[msgId];
        //providing a msgId enables the acker mechanism for this tuple
        collector.emit(new Values(line),msgId);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void ack(Object msgId) {
        //called back by the system when the tuple tree is fully processed
    }

    @Override
    public void fail(Object msgId) {
        //called back when the tuple fails or times out
    }
}

LineSplitBolt

public class LineSplitBolt extends BaseRichBolt {
    private OutputCollector collector;
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector=collector;
    }

    @Override
    public void execute(Tuple input) {
        String[] words = input.getStringByField("line").split("\\W+");
        try {
            for (String word : words) {
                //anchor the emitted tuple to the input tuple
                collector.emit(input,new Values(word));
            }
            //acknowledge the input tuple
            collector.ack(input);
        } catch (Exception e) {
            //report failure
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

Once reliable processing is enabled, every bolt must follow this anchor/ack/fail pattern. To simplify development, Storm provides IBasicBolt / BaseBasicBolt, which anchors and acks automatically; the code above can be rewritten as:

public class LineSplitBolt extends BaseBasicBolt {
    private OutputCollector collector;
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String[] words = input.getStringByField("line").split("\\W+");
        for (String word : words) {
            collector.emit(new Values(word));
        }
    }
}

How do I make my application work correctly given that tuples can be replayed?

As with all software design, the answer is "it depends" on the application. If you truly need exactly-once semantics, use the Trident API. In some cases, such as large-scale analytics, dropping some data is acceptable, so fault tolerance can be disabled by setting the number of acker bolts to 0 via Config.TOPOLOGY_ACKERS. In other cases you want to make sure everything is processed at least once; this is especially useful when all operations are idempotent or when duplicates can be removed later.

How do I remove Storm's reliability overhead to improve performance?

  • Do not provide a msgId when the spout emits tuples.
  • Set config.setNumAckers(0) in the topology configuration.
  • Do not anchor tuples in the bolts; have the first bolt ack immediately (see the sketch below).
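A minimal sketch of these three steps, reusing the spout and bolt from the earlier example:

Config config = new Config();
config.setNumAckers(0);            // no acker executors: tuple trees are never tracked

// spout side: emit without a msgId, so the tuple is fire-and-forget
collector.emit(new Values(line));

// bolt side: emit without anchoring to the input tuple
collector.emit(new Values(word));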

How do I prevent too many pending spout tuples from exhausting the topology's memory?

config.setMaxSpoutPending(100);

This limits the number of unprocessed (neither acked nor failed) tuples a single spout task may have in flight, preventing the pending queue from growing too large. It only takes effect for reliable (anchored) topologies.

How the reliability algorithm works
The acker keeps a 64-bit "ack val" for each spout tuple and XORs every tuple id in the tree into it once when the tuple is anchored (emitted) and once when it is acked. When the tree has been fully processed, every id has been XORed exactly twice, so the accumulated value returns to zero:

T1 ^ T2 ^ ... ^ Tn ^ T1 ^ T2 ^ ... ^ Tn = 0

http://storm.apache.org/releases/1.2.2/Guaranteeing-message-processing.html
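A tiny self-contained sketch of the idea (illustrative only, not Storm's actual acker code): XOR-ing every edge id into an accumulator once when the edge is emitted and once when it is acked drives the accumulator back to zero exactly when the whole tree has been acknowledged.

import java.util.Random;

public class AckValDemo {
    public static void main(String[] args) {
        Random random = new Random();
        long ackVal = 0L;
        long[] edgeIds = new long[5];
        // "emit": XOR each randomly generated edge id into the ack value
        for (int i = 0; i < edgeIds.length; i++) {
            edgeIds[i] = random.nextLong();
            ackVal ^= edgeIds[i];
        }
        // "ack": XOR the same ids a second time, in any order
        for (long id : edgeIds) {
            ackVal ^= id;
        }
        System.out.println(ackVal == 0);   // prints true once every edge has been emitted and acked
    }
}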

Storm Fault Tolerance (Overview)

Both Nimbus and the Supervisors are stateless; even if a daemon or machine goes down, the tasks can be recovered from ZooKeeper and the local disk. If a Worker dies, its Supervisor restarts the Worker process; if a Supervisor dies, Nimbus reassigns its tasks to other Supervisors.

Reference: http://storm.apache.org/releases/1.2.2/Daemon-Fault-Tolerance.html

Storm State Management

Overview

Storm provides a mechanism that lets a bolt store and query its own processing state. Storm currently ships a default in-memory implementation, as well as implementations backed by Redis, Memcached and HBase. To manage state in a bolt, implement IStatefulBolt or extend BaseStatefulBolt.

public class WordCountBolt extends BaseStatefulBolt<KeyValueState> {

    private  KeyValueState<String,Integer> state;
    private  OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext tc, OutputCollector collector) {
        this.collector=collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        //look up the current count for the word, defaulting to 0 if absent
        Integer count = state.get(word, 0);
        state.put(word,count+1);
        //anchor to the input tuple
        collector.emit(tuple,new Values(word,count+1));
        //ack the input tuple
        collector.ack(tuple);
    }

    @Override
    public void initState(KeyValueState keyValueState) {
        this.state=keyValueState;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","count"));
    }
}

Storm's default implementation is InMemoryKeyValueState, which cannot persist the bolt's state when the JVM exits, so in practice Redis or HBase is normally used as the backing store.

RedisState

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-redis</artifactId>
    <version>1.2.2</version>
</dependency>
config.put(Config.TOPOLOGY_STATE_PROVIDER,"org.apache.storm.redis.state.RedisKeyValueStateProvider");

Map<String,Object> stateConfig=new HashMap<String,Object>();
Map<String,Object> redisConfig=new HashMap<String,Object>();
redisConfig.put("host","CentOS");
redisConfig.put("port",6379);
stateConfig.put("jedisPoolConfig",redisConfig);
ObjectMapper objectMapper=new ObjectMapper();
config.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG,objectMapper.writeValueAsString(stateConfig));

Storing results in Redis with RedisStoreBolt

JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
    .setHost("CentOS").setPort(6379).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);

RedisStoreMapper

public class WordCountStoreMapper implements RedisStoreMapper {
    private RedisDataTypeDescription description;
    public WordCountStoreMapper() {
        description = new RedisDataTypeDescription(
            RedisDataTypeDescription.RedisDataType.SORTED_SET, "zwordscount");
    }
    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }
    @Override
    public String getKeyFromTuple(ITuple iTuple) {
        return iTuple.getStringByField("word");
    }
    @Override
    public String getValueFromTuple(ITuple iTuple) {
        return iTuple.getIntegerByField("count")+"";
    }
}

HBaseState

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hbase</artifactId>
    <version>1.2.2</version>
</dependency>
     config.put(Config.TOPOLOGY_STATE_PROVIDER,"org.apache.storm.hbase.state.HBaseKeyValueStateProvider");

Map<String,Object> hbaseConfig=new HashMap<String,Object>();
hbaseConfig.put("hbase.zookeeper.quorum", "CentOS");

config.put("hbase.conf", hbaseConfig);

ObjectMapper objectMapper=new ObjectMapper();
Map<String,Object> stateConfig=new HashMap<String,Object>();
stateConfig.put("hbaseConfigKey","hbase.conf");
stateConfig.put("tableName","baizhi:state");
stateConfig.put("columnFamily","cf1");
config.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG,objectMapper.writeValueAsString(stateConfig));

Storing results in HBase with HBaseBolt

SimpleHBaseMapper mapper = new SimpleHBaseMapper()
    .withRowKeyField("word")
    .withColumnFields(new Fields("word"))
    .withCounterFields(new Fields("count"))
    .withColumnFamily("cf1");

HBaseBolt hbaseBolt = new HBaseBolt("baizhi:t_word_count", mapper)
    .withConfigKey("hbase.conf");

Map<String,Object> hbaseConfig=new HashMap<String,Object>();
        hbaseConfig.put("hbase.zookeeper.quorum", "CentOS");

config.put("hbase.conf", hbaseConfig);

How Storm State Management Works

Reference: http://storm.apache.org/releases/1.2.2/State-checkpointing.html

Distributed RPC

Storm's DRPC brings true parallelism to request/response computations: a Storm topology receives the caller's arguments as input, computes the result, and finally returns it to the caller as tuples.
Setting up the DRPC Server

vi /usr/apache-storm-1.2.3/conf/storm.yaml

 storm.zookeeper.servers:
     - "CentOS"
 storm.local.dir: "/usr/apache-storm-1.2.3/storm-stage"
 nimbus.seeds: ["CentOS"]
 supervisor.slots.ports:
     - 6700
     - 6701
     - 6702
     - 6703
 drpc.servers:
     - "CentOS"
 storm.thrift.transport: "org.apache.storm.security.auth.plain.PlainSaslTransportPlugin"

Start the DRPC Server

[root@CentOS ~]# nohup storm drpc  >/dev/null 2>&1 &
[root@CentOS ~]# nohup storm nimbus >/dev/null 2>&1 &
[root@CentOS ~]# nohup storm supervisor >/dev/null 2>&1 &
[root@CentOS ~]# nohup storm ui >/dev/null 2>&1 &

Configure Maven Dependencies

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-redis</artifactId>
    <version>${storm.version}</version>
</dependency>
public class WordCountRedisLookupMapper implements RedisLookupMapper {
    /**
     *
     * @param input the incoming tuple
     * @param value the value looked up from Redis
     * @return the result wrapped as a list of Values
     */
    @Override
    public List<Values> toTuple(ITuple input, Object value) {
        Object id = input.getValue(0);
        List<Values> values = Lists.newArrayList();
        if(value == null){
          value = 0;
        }
        values.add(new Values(id, value));

        return values;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "num"));
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return  new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, "wordcount");
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
         return tuple.getString(1);
    }
    @Override
    public String getValueFromTuple(ITuple tuple) {
        return null;
    }
}
---
public class TopologyDRPCStreeamTest {
    public static void main(String[] args) throws Exception {
        //1. create a LinearDRPCTopologyBuilder and register the DRPC function "count"
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("count");
        Config conf = new Config();
        conf.setDebug(false);
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost("192.168.38.129").setPort(6379).build();
        RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
        RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
        builder.addBolt(lookupBolt,4);
        StormSubmitter.submitTopology("drpc-demo", conf, builder.createRemoteTopology());
    }
}
---
public class TestDRPCTests {
    public static void main(String[] args) throws TException {
        Config conf = new Config();
        conf.put("storm.thrift.transport", "org.apache.storm.security.auth.plain.PlainSaslTransportPlugin");
        conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 3);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 20);
        DRPCClient client = new DRPCClient(conf, "192.168.38.129", 3772);

        String result = client.execute("count", "hello");
        System.out.println(result);
    }
}

Storm Integration with Kafka and Redis

Maven Dependencies

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-redis</artifactId>
    <version>${storm.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>${storm.version}</version>
</dependency>

Connecting to Kafka

TopologyBuilder tp=new TopologyBuilder();
Config conf = new Config();


KafkaSpoutConfig.Builder<String,String> builder=
    new KafkaSpoutConfig.Builder<String,String>("CentOS:9092,CentOS:9093,CentOS:9094", 
    "topic01");

// skip tuples whose value is null
builder.setEmitNullTuples(false);
// configure the key/value deserializers
builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// set the consumer group
builder.setProp(ConsumerConfig.GROUP_ID_CONFIG,"consumer_id_01");
// offset strategy: resume from the last uncommitted offset
builder.setFirstPollOffsetStrategy(
    KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST);

// enable tuple acking; an offset is committed only after its tuple has been acked
builder.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE);
// limit the number of uncommitted offsets per partition; once a partition exceeds this value
// the spout stops polling it, which provides backpressure
builder.setMaxUncommittedOffsets(2);

// register the KafkaSpout
tp.setSpout("KafkaSpout",new KafkaSpout<>(builder.build()));

LineSplitBolt

public class LineSplitBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String[] tokens = input.getStringByField("value").split("\\W+");
        for (String token : tokens) {
            collector.emit(new Values(token));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
tp.setBolt("LineSplitBolt",new LineSplitBolt()).shuffleGrouping("KafkaSpout");

Note that this bolt must extend BaseBasicBolt; otherwise the tuples are never acked, the Kafka offsets are never committed, and Storm will re-read the Kafka records and compute the data more than once.

WordCountBolt

public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String,Integer>> {
    private KeyValueState<String,Integer> state;
    private OutputCollector collector;
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector=collector;
    }

    @Override
    public void execute(Tuple input) {

        String word = input.getStringByField("word");
        Integer count = state.get(word, 0)+1;
        state.put(word,count);
        collector.emit(input,new Values(word,count));
        collector.ack(input);
    }

    @Override
    public void initState(KeyValueState<String, Integer> state) {
        this.state=state;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","count"));
    }
}

Config conf = new Config();
conf.put(Config.TOPOLOGY_STATE_PROVIDER,
         "org.apache.storm.redis.state.RedisKeyValueStateProvider");
String jedisConfigJson ="{\"jedisPoolConfig\":{\"port\":6379,\"host\":\"CentOS\"}}";
conf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG,jedisConfigJson);

RedisStoreBolt

public class WordCountRedisStoreMapper implements RedisStoreMapper {
    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH,
                                            "wordcount");
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        System.out.println("tuple:\t"+tuple);
        return tuple.getIntegerByField("count").toString();
    }
}
JedisPoolConfig jedisPoolConfig=new JedisPoolConfig.Builder()
    .setHost("CentOS")
    .setPort(6379)
    .setTimeout(5000)
    .build();

tp.setBolt("RedisStoreBolt",
           new RedisStoreBolt(jedisPoolConfig, new WordCountRedisStoreMapper()))
           .fieldsGrouping("WordCountBolt",new Fields("word"));

Complete Code

TopologyBuilder tp=new TopologyBuilder();

//configure the Redis-backed state provider
Map<String,Object> stateConfig=new HashMap<String,Object>();
Map<String,Object> redisConfig=new HashMap<String,Object>();
redisConfig.put("host","CentOS");
redisConfig.put("port",6379);
stateConfig.put("jedisPoolConfig",redisConfig);
ObjectMapper objectMapper=new ObjectMapper();
Config conf = new Config();
conf.put(Config.TOPOLOGY_STATE_PROVIDER,"org.apache.storm.redis.state.RedisKeyValueStateProvider");
String jedisConfigJson =objectMapper.writeValueAsString(stateConfig);
conf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG,jedisConfigJson);


//Kafka connection settings
KafkaSpoutConfig.Builder<String,String> builder=new KafkaSpoutConfig.Builder<String,String>(
    "CentOS:9092,CentOS:9093,CentOS:9094", "topic01");
//key/value deserializers
builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//consumer group
builder.setProp(ConsumerConfig.GROUP_ID_CONFIG,"consumer_id_01");
//offset strategy: resume from the last uncommitted offset
builder.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST);
//enable tuple acking; an offset is committed only after its tuple has been acked
builder.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE);
//limit uncommitted offsets per partition; once exceeded the spout stops polling that partition (backpressure)
builder.setMaxUncommittedOffsets(2);
builder.setEmitNullTuples(false);
tp.setSpout("KafkaSpout",new KafkaSpout<>(builder.build()));

tp.setBolt("LineSplitBolt",new LineSplitBolt())
    .shuffleGrouping("KafkaSpout");

tp.setBolt("WordCountBolt",new WordCountBolt())
    .fieldsGrouping("LineSplitBolt",new Fields("word"));
//Redis store bolt connection settings
JedisPoolConfig jedisPoolConfig=new JedisPoolConfig.Builder()
    .setHost("CentOS")
    .setPort(6379)
    .setTimeout(5000)
    .build();

tp.setBolt("RedisStoreBolt",new RedisStoreBolt(jedisPoolConfig,new WordCountRedisStoreMapper()))
    .fieldsGrouping("WordCountBolt",new Fields("word"));
//submit the topology
StormSubmitter.submitTopology("kafkademo",conf,tp.createTopology());

[root@CentOS ~]# storm jar storm-1.0-SNAPSHOT.jar com.baizhi.demo09.KafkaSpoutTopology --artifacts 'org.apache.storm:storm-redis:1.2.2,org.apache.storm:storm-kafka-client:1.2.2'

Storm Window Functions

Storm has native support for processing the group of tuples that falls within a window. Windows are specified with two parameters:

  • Window length - the length or duration of the window
  • Sliding interval - the interval at which the window slides

Sliding Window

Tuples are grouped into windows, and the window slides forward by the sliding interval, producing a new window each time. For example, the following is a time-based sliding window with a window length of 10 seconds and a sliding interval of 5 seconds. As the illustration shows, sliding windows overlap, so a single tuple may belong to one or more windows.

........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
-5      0       5            10          15   -> time
|<------- w1 -->|
        |<---------- w2 ----->|
                |<-------------- w3 ---->|

Tumbling Window

Tuples are grouped into windows whose slide length is exactly the window length. The biggest difference from a sliding window is that tumbling windows do not overlap: each tuple belongs to exactly one window.

| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15    -> time
   w1         w2            w3
TopologyBuilder tp=new TopologyBuilder();

tp.setSpout("ClickSpout",new ClickSpout(),1);

tp.setBolt("ClickMapBolt",new ClickMapBolt(),4)
   .shuffleGrouping("ClickSpout");

ClickWindowCountBolt wb = new ClickWindowCountBolt();
wb.withWindow(BaseWindowedBolt.Duration.seconds(2),
             BaseWindowedBolt.Duration.seconds(1));
tp.setBolt("ClickWindowCountBolt",wb,3)
   .fieldsGrouping("ClickMapBolt",new Fields("click"));

tp.setBolt("PrintClickBolt",new PrintClickBolt())
   .fieldsGrouping("ClickWindowCountBolt",new Fields("click"));

LocalCluster lc=new LocalCluster();
lc.submitTopology("window",new Config(),tp.createTopology());
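In addition to the duration-based windows used above, BaseWindowedBolt also accepts tuple-count based windows; a brief sketch (the counts are illustrative):

// sliding window over the last 100 tuples, evaluated every 20 tuples
wb.withWindow(BaseWindowedBolt.Count.of(100), BaseWindowedBolt.Count.of(20));
// tumbling window of 50 tuples
wb.withTumblingWindow(BaseWindowedBolt.Count.of(50));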

Tuple Timestamps and Out-of-Order Tuples

By default, the timestamp tracked for a window is the time at which the bolt processes the tuple, and window computations are based on this processing timestamp. Storm can also track windows based on timestamps generated at the source.

  • Specify a tuple field that holds the timestamp; its value must be of type long, and an exception is thrown if the field is missing
public BaseWindowedBolt withTimestampField(String fieldName)
  • Provide a TimestampExtractor implementation that extracts the timestamp from the tuple
public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor)

The second approach is recommended because it is more flexible. For example:

public class EventTimeTimestampExtractor implements TimestampExtractor {
    @Override
    public long extractTimestamp(Tuple tuple) {
        SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        Long start =tuple.getLongByField("timestamp");
        String word = tuple.getStringByField("word");
        System.out.println("收到:"+word+"\t"+sdf.format(start));
        return tuple.getLongByField("timestamp");
    }
}

Once a timestamp is supplied, the system computes windows based on that timestamp, and you can specify the maximum amount of out-of-orderness (lag) to tolerate:

public BaseWindowedBolt withLag(Duration duration)

Tuples that arrive later than the maximum lag are silently ignored by default. Alternatively, you can route them to a dedicated stream and handle late tuples with your own logic:

public BaseWindowedBolt withLateTupleStream(String streamId)

For example:

SlidingWindowBolt slidingWindowBolt = new SlidingWindowBolt();
slidingWindowBolt.withTumblingWindow(Duration.seconds(5))
    .withTimestampExtractor(new EventTimeTimestampExtractor())
    .withLag(Duration.seconds(2))
    .withLateTupleStream("late");

tp.setBolt("SlidingWindowBolt", slidingWindowBolt).fieldsGrouping("ExtractorBolt",new Fields("word"));
tp.setBolt("late",new LateBolt()).shuffleGrouping("SlidingWindowBolt","late");

Watermarks

To process tuples that carry a timestamp field, Storm internally computes watermarks based on the timestamps of incoming tuples. The watermark is the maximum of the latest tuple timestamps across all input streams, minus the lag (the official documentation's wording here is slightly misleading). Watermarks are emitted periodically (every second by default) and, when tuple-based timestamps are in use, they serve as the clock ticks for window evaluation. The watermark interval can be changed with public BaseWindowedBolt withWatermarkInterval(Duration interval).
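A small worked example of the watermark arithmetic, assuming a lag of 2 seconds and the event timestamps from the sample input below:

// latest tuple timestamp seen so far:       155479506000 (event e05)
// watermark = latest timestamp - lag (2s):  155479506000 - 2000 = 155479504000
// => every window whose end time is <= 155479504000 is evaluated and emitted;
//    tuples with timestamps older than the watermark that arrive afterwards go to the late stream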

e01,155479500000
e02,155479501000
e03,155479504000
e04,155479505000
e05,155479506000                          
Config conf = new Config();
conf.setNumWorkers(1);
conf.setMessageTimeoutSecs(60);

TopologyBuilder tp = new TopologyBuilder();
//Kafka connection settings
KafkaSpoutConfig.Builder<String,String> builder=new KafkaSpoutConfig.Builder<String,String>(
    "CentOS:9092,CentOS:9093,CentOS:9094", "topic01");
//key/value deserializers
builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//consumer group
builder.setProp(ConsumerConfig.GROUP_ID_CONFIG,"consumer_id_"+ UUID.randomUUID());
//offset strategy: resume from the last uncommitted offset
builder.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST);
//enable tuple acking; an offset is committed only after its tuple has been acked
builder.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE);
//limit uncommitted offsets per partition; once exceeded the spout stops polling that partition (backpressure)
builder.setMaxUncommittedOffsets(100);
builder.setEmitNullTuples(false);

tp.setSpout("KafkaSpout",new KafkaSpout<String,String>(builder.build()));

tp.setBolt("ExtractorBolt",new ExtractorBolt()).shuffleGrouping("KafkaSpout");

SlidingWindowBolt slidingWindowBolt = new SlidingWindowBolt();
slidingWindowBolt.withTumblingWindow(Duration.seconds(5))
    .withTimestampExtractor(new EventTimeTimestampExtractor())
    .withLag(Duration.seconds(2))
    .withWatermarkInterval(Duration.seconds(1))
    .withLateTupleStream("late");

tp.setBolt("SlidingWindowBolt", slidingWindowBolt).fieldsGrouping("ExtractorBolt",new Fields("word"));

tp.setBolt("late",new LateBolt()).shuffleGrouping("SlidingWindowBolt","late");
new LocalCluster().submitTopology("word-count",conf,tp.createTopology());

Note: ① the message timeout must be much larger than window length + sliding interval, otherwise tuples will not be fully processed in time and Storm will resend them. ② all other bolts should extend BaseBasicBolt so that you do not have to handle tuple acking yourself. ③ every tuple handled inside a BaseWindowedBolt must be acked; otherwise it will likewise time out and be resent.

WindowJoin

Storm natively supports joining multiple data streams into one with the help of JoinBolt. JoinBolt is a windowed bolt: it waits for the configured window duration so that the tuples of the streams being joined can be matched against each other.

public static KafkaSpout<String,String> buiderKafkaSpout(String topic,String group){
    KafkaSpoutConfig.Builder<String,String> builder=
        new KafkaSpoutConfig.Builder<String,String>(
        "CentOS:9092,CentOS:9093,CentOS:9094", topic);
    //key/value deserializers
    builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    //consumer group
    builder.setProp(ConsumerConfig.GROUP_ID_CONFIG,"consumer_id_"+ group);
    //offset strategy: resume from the last uncommitted offset
    builder.setFirstPollOffsetStrategy(
        KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST);
    //enable tuple acking; an offset is committed only after its tuple has been acked
    builder.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE);
    //limit uncommitted offsets per partition; once exceeded the spout stops polling that partition (backpressure)
    builder.setMaxUncommittedOffsets(100);
    builder.setEmitNullTuples(false);
    return new KafkaSpout<String,String>(builder.build());
}
TopologyBuilder tp=new TopologyBuilder();
Config conf = new Config();
conf.setNumWorkers(1);
conf.setMessageTimeoutSecs(120);

tp.setSpout("spout01",buiderKafkaSpout("topic01","g2"));
tp.setSpout("spout02",buiderKafkaSpout("topic02","g2"));

tp.setBolt("UserBolt",new UserBolt()).shuffleGrouping("spout01");
tp.setBolt("OrderBolt",new OrderBolt()).shuffleGrouping("spout02");

JoinBolt joinBolt = new JoinBolt(JoinBolt.Selector.STREAM,"user","uid")
    .leftJoin("order", "uid", "user")
    .select("uid,name,item,price")
    .withTumblingWindow(BaseWindowedBolt.Duration.seconds(60));

tp.setBolt("joinBolt",joinBolt)
    .fieldsGrouping("UserBolt","user",new Fields("uid"))
    .fieldsGrouping("OrderBolt","order",new Fields("uid"));

tp.setBolt("PrintJoinBolt",new PrintJoinBolt()).fieldsGrouping("joinBolt",new Fields("uid"));

new LocalCluster().submitTopology("join",conf,tp.createTopology());

Trident Tutorial

Trident is a high-level API (abstraction) on top of Storm for real-time stream processing. It lets you seamlessly mix high-throughput stateful stream processing with low-latency distributed queries. Trident provides join, aggregation, grouping, function and filter operators and elegantly translates them into a topology, so users do not need to care about the translation process. Trident guarantees consistent processing of the data flowing through the topology with exactly-once semantics. Trident's core data model is the "stream", and all Trident operations process tuples in batches (List[Tuple]), for example:

[Tuple1, Tuple2, Tuple3, ..., TupleN]
         |
[Tuple1, Tuple2, Tuple3], [Tuple4, Tuple5, Tuple6] ...

A stream is partitioned across the nodes of the cluster, and the operations applied to the stream run in parallel across its partitions.

Trident and Kafka Integration

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>${storm.version}</version>
</dependency>
TridentTopology topology = new TridentTopology();
Config conf = new Config();
KafkaTridentSpoutOpaque<String, String> kafkaTridentSpoutOpaque = buildKafkaTridentSpoutOpaque(conf,"topic01");
topology.newStream("kafka",kafkaTridentSpoutOpaque)
    .peek(new Consumer() {
        @Override
        public void accept(TridentTuple input) {
            System.out.println(input);
        }
    });
StormSubmitter.submitTopology("demo01",conf,topology.build());
public static KafkaTridentSpoutOpaque<String,String> buildKafkaTridentSpoutOpaque(Config conf,String topic){
    //enable transactional state; otherwise offsets are not committed (STORM-2675, does not work before 1.2.0)
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT,2181);
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList(new String[]{"CentOS"}));

    //note: versions up to 1.2.2 are affected by bug STORM-3064, which is fixed in 1.2.3 and 2.0.0
    String servers="CentOS:9092,CentOS:9093,CentOS:9094";
    KafkaSpoutConfig.Builder<String,String> builder=
        new KafkaSpoutConfig.Builder<String,String>(servers,topic);

    builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,  StringDeserializer.class.getName());
    builder.setProp(ConsumerConfig.GROUP_ID_CONFIG,"consumer_id_00");
    builder.setRecordTranslator(new DefaultRecordTranslator<String,String>());

    //on the consumer's first poll start from latest; otherwise resume from the last uncommitted offset
    builder.setFirstPollOffsetStrategy(
        KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST);
    //offsets are committed manually, only when the transaction commits
    builder.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE);
    //once a partition accumulates 2 uncommitted records, the spout stops polling it
    builder.setMaxUncommittedOffsets(2);

    return new KafkaTridentSpoutOpaque<String,String>(builder.build());
}

[root@CentOS ~]# storm jar storm-lowlevel-1.0-SNAPSHOT.jar com.jiangzz.demo10.TridentTopologyTests --artifacts 'org.apache.storm:storm-kafka-client:1.2.3'

Alternatively, add the following plugin to the project; it packages all of the project's dependency jars into the jar that is submitted.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Trident API

each

each passes the tuples of the stream one at a time to a function; these functions come in two kinds, BaseFunction and BaseFilter.

BaseFunction: emits additional values that are appended as new fields to the tuple.

TridentTopology topology = new TridentTopology();
Config conf = new Config();
KafkaTridentSpoutOpaque<String, String> kafkaTridentSpoutOpaque = buildKafkaTridentSpoutOpaque(conf,"topic01");
topology.newStream("kafka",kafkaTridentSpoutOpaque)

    .each(new Fields("value"), new BaseFunction() {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String[] token = tuple.getStringByField("value").split("\\W+");
            collector.emit(new Values(token[2]));
        }
    },new Fields("sex"))
    .peek(new Consumer() {
        @Override
        public void accept(TridentTuple input) {
            System.out.println(input.getFields()+"\t"+input);
        }
    });

new LocalCluster().submitTopology("demo01",conf,topology.build());
Input:
 1 zhangsan true 12
 2 lisi false 25
Output:
 1 zhangsan true 12, true
 2 lisi false 25, false

BaseFilter: filters the stream, removing tuples that do not satisfy the condition.

TridentTopology topology = new TridentTopology();
Config conf = new Config();
KafkaTridentSpoutOpaque<String, String> kafkaTridentSpoutOpaque = buildKafkaTridentSpoutOpaque(conf,"topic01");
topology.newStream("kafka",kafkaTridentSpoutOpaque)

    .each(new Fields("value"), new BaseFilter() {
        @Override
        public boolean isKeep(TridentTuple tuple) {
            return tuple.getStringByField("value").contains("error");
        }
    })
    .peek(new Consumer() {
        @Override
        public void accept(TridentTuple input) {
            System.out.println(input.getFields()+"\t"+input);
        }
    });

new LocalCluster().submitTopology("demo01",conf,topology.build());
>error hello world
>this is an error messgae
>this is not message
>hello error tests

Map

Converts each upstream tuple into a new tuple. If no output Fields are provided, the number of values in the tuple cannot be changed.

TridentTopology topology = new TridentTopology();
Config conf = new Config();
KafkaTridentSpoutOpaque<String, String> kafkaTridentSpoutOpaque = buildKafkaTridentSpoutOpaque(conf,"topic01");
topology.newStream("kafka",kafkaTridentSpoutOpaque)

    .map(new MapFunction() {
        @Override
        public Values execute(TridentTuple input) {
            String value = input.getStringByField("value");
            return new Values(value);
        }
    },new Fields("value"))
    .peek(new Consumer() {
        @Override
        public void accept(TridentTuple input) {
            System.out.println(input.getFields()+"\t"+input);
        }
    });

new LocalCluster().submitTopology("demo01",conf,topology.build());

flatMap

Converts a single tuple into multiple tuples.

TridentTopology topology = new TridentTopology();
Config conf = new Config();
KafkaTridentSpoutOpaque<String, String> kafkaTridentSpoutOpaque = buildKafkaTridentSpoutOpaque(conf,"topic01");
topology.newStream("kafka",kafkaTridentSpoutOpaque)
    .flatMap(new FlatMapFunction() {
        @Override
        public Iterable<Values> execute(TridentTuple input) {
            String[] tokens = input.getStringByField("value").split(" ");
            String[] hobbies = tokens[1].split(",");
            String user=tokens[0];
            List<Values> vs=new ArrayList<Values>();
            for (String hobby : hobbies) {
                vs.add(new Values(user,hobby));
            }
            return vs;
        }
    },new Fields("user","hobby"))
    .peek(new Consumer() {
        @Override
        public void accept(TridentTuple input) {
            System.out.println(input.getFields()+"\t"+input);
        }
    });

new LocalCluster().submitTopology("demo01",conf,topology.build());

project

Keeps only the specified fields.

TridentTopology topology = new TridentTopology();
Config conf = new Config();
KafkaTridentSpoutOpaque<String, String> kafkaTridentSpoutOpaque = buildKafkaTridentSpoutOpaque(conf,"topic01");
topology.newStream("kafka",kafkaTridentSpoutOpaque)
    .project(new Fields("partition","offset","value"))
    .peek(new Consumer() {
        @Override
        public void accept(TridentTuple input) {
            System.out.println(input.getFields()+"\t"+input);
        }
    });

new LocalCluster().submitTopology("demo01",conf,topology.build());

partition & partitionAggregate

TridentTopology topology = new TridentTopology();
Config conf = new Config();
topology.newStream("kafka",buildKafkaTridentSpoutOpaque(conf,"topic01"))
   .parallelismHint(3)
   .flatMap(new FlatMapFunction() {
       @Override
       public Iterable<Values> execute(TridentTuple input) {
           String value = input.getStringByField("value");
           String[] words = value.split(" ");
           List<Values> vs=new ArrayList<Values>();
           for (String word : words) {
               vs.add(new Values(word,1));
           }
           return vs;
       }
   },new Fields("word","count"))
   .partition(new PartialKeyGrouping(new Fields("word")))
   .parallelismHint(4)
   .partitionAggregate(new Fields("word","count"),new WordCountAggregator(),
                       new Fields("word","count"))
   .each(new Fields("word", "count"), new BaseFilter() {
       @Override
       public boolean isKeep(TridentTuple tuple) {
           return true;
       }
   })
   .peek(new Consumer() {
       @Override
       public void accept(TridentTuple input) {
           System.out.println(input);
       }
   });

new LocalCluster().submitTopology("demo01",conf,topology.build());

The aggregation logic in WordCountAggregator:

public static class WordCountAggregator extends BaseAggregator<Map<String,Integer>>{

    @Override
    public Map<String, Integer> init(Object batchId, TridentCollector collector) {
        return new HashMap<String,Integer>();
    }

    @Override
    public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) {
        System.out.println("aggregate:"+tuple+"\t"+this+"\t"+val);

        String word = tuple.getStringByField("word");
        Integer count=tuple.getIntegerByField("count");
        if(val.containsKey(word)){
            count= val.get(word)+count;
        }
        val.put(word,count);
    }

    @Override
    public void complete(Map<String, Integer> val, TridentCollector collector) {
        for (Map.Entry<String, Integer> entry : val.entrySet()) {
            collector.emit(new Values(entry.getKey(),entry.getValue()));
        }
        val.clear();
    }
}

Trident State

Trident manages state in a fault-tolerant way so that state updates are idempotent in the face of retries and failures. This allows you to reason about a Trident topology as if every message were processed exactly once. Different levels of fault tolerance are possible when performing state updates.

Redis

persistentAggregate

TridentTopology topology = new TridentTopology();
Config conf = new Config();

KafkaTridentSpoutOpaque<String,String> kafkaTridentSpoutOpaque=
    KafkaSpoutUitls.buildKafkaSpoutOpaque(conf,"topic01");

JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
    .setHost("CentOS")
    .setPort(6379)
    .build();


Options<OpaqueValue> options=new Options<OpaqueValue>();
options.dataTypeDescription=new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH,"mapstate");
options.serializer=new JSONOpaqueSerializer();


topology.newStream("kakfaspout",kafkaTridentSpoutOpaque)
    .map((t)-> new Values(t.getStringByField("value")),new Fields("line"))
    .flatMap(line->{
        String[] words = line.getStringByField("line").split(" ");
        List<Values> values=new ArrayList<Values>();
        for (String word : words) {
            values.add(new Values(word));
        }
        return values;
    },new Fields("word"))
    .parallelismHint(3)
    .groupBy(new Fields("word"))
    .persistentAggregate(RedisMapState.opaque(poolConfig,options),new Fields("word"),new WordCountReducerAggregator(),null);

new LocalCluster().submitTopology("TestTridentDemo",conf,topology.build());
public static class WordCountReducerAggregator implements ReducerAggregator<Long>{

    @Override
    public Long init() {
        return 0L;
    }

    @Override
    public Long reduce(Long curr, TridentTuple tuple) {
        return curr+1;
    }
}

Note that the aggregator's generic type here must be Long rather than Integer; otherwise the topology fails when it is restarted. This is currently a bug in Storm.

newStaticState

RedisRetriveStateFactory

public class RedisRetriveStateFactory implements StateFactory {
    @Override
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
        return new RedisMapState();
    }
}

RedisMapState

public class RedisMapState implements State  {
    private Jedis jedis=new Jedis("CentOS",6379);
    private JSONOpaqueSerializer jsonOpaqueSerializer=new JSONOpaqueSerializer();

    @Override
    public void beginCommit(Long txid) {

    }
    @Override
    public void commit(Long txid) {

    }
    public List<Long> batchRetive(List<TridentTuple> args) {

        String[] keys=new String[args.size()];
        for(int i=0;i<args.size();i++){
            keys[i]=args.get(i).getStringByField("word");
        }
        List<Long> longs=new ArrayList<Long>(args.size());
        for (String key : keys) {
            String v=jedis.hget("mapstate",key);
            if(v!=null){
                OpaqueValue opaqueValue = jsonOpaqueSerializer.deserialize(v.getBytes());
                long l = Long.parseLong(opaqueValue.getCurr().toString());
                longs.add(l);
            }else{
                longs.add(0L);
            }
        }
        return longs;
    }
}

RedisMapStateQueryFunction

public class RedisMapStateQueryFunction implements QueryFunction<RedisMapState,Long> {
    @Override
    public List<Long> batchRetrieve(RedisMapState state, List<TridentTuple> args) {
        return state.batchRetive(args);
    }

    @Override
    public void execute(TridentTuple tuple, Long result, TridentCollector collector) {
        collector.emit(new Values(result));
    }
    @Override
    public void prepare(Map conf, TridentOperationContext context) {

    }
    @Override
    public void cleanup() {

    }
}

TridentTopologyTests

TridentTopology topology = new TridentTopology();
Config conf = new Config();
KafkaTridentSpoutOpaque<String, String> kafkaTridentSpoutOpaque = KafkaSpoutUitls.buildKafkaSpoutOpaque(conf,"topic01");

//custom State implementation
StateFactory stateFactory = new RedisRetriveStateFactory();
TridentState state = topology.newStaticState(stateFactory);

topology.newStream("kafka",kafkaTridentSpoutOpaque)
    .flatMap(new FlatMapFunction() {
        @Override
        public Iterable<Values> execute(TridentTuple input) {
            String[] split = input.getStringByField("value").split(" ");
            List<Values> values=new ArrayList<Values>();
            for (String s : split) {
                values.add(new Values(s));
            }
            return values;
        }
    },new Fields("word"))
    .stateQuery(state, new Fields("word"),new RedisMapStateQueryFunction(),new Fields("count")).peek(new Consumer() {
    @Override
    public void accept(TridentTuple input) {
        System.out.println(input);
    }
});

new LocalCluster().submitTopology("demo01",conf,topology.build());

Window

TridentTopology topology = new TridentTopology();
Config conf = new Config();
KafkaTridentSpoutOpaque<String, String> kafkaTridentSpoutOpaque = buildKafkaTridentSpoutOpaque(conf,"topic01");
WindowsStoreFactory wsf=new InMemoryWindowsStoreFactory();
topology.newStream("kafka",kafkaTridentSpoutOpaque)
    .project(new Fields("value"))
    .flatMap(new FlatMapFunction() {
        @Override
        public Iterable<Values> execute(TridentTuple input) {
            String[] values = input.getStringByField("value").split("\\W+");
            List<Values> vs=new ArrayList<Values>();
            for (String value : values) {
                vs.add(new Values(value,1));
            }
            return vs;
        }
    },new Fields("word","count"))
    .tumblingWindow(
        BaseWindowedBolt.Duration.seconds(10),
        wsf,
        new Fields("word","count"),
        new WordCountAggregator(),
        new Fields("word","count")
    )
    .peek(new Consumer() {
        @Override
        public void accept(TridentTuple input) {
            System.out.println(input.getFields()+"\t"+input);
        }
    });

new LocalCluster().submitTopology("demo01",conf,topology.build());

WordCountAggregator

public static class WordCountAggregator extends BaseAggregator<TridentToplogyDemo.CountState> {
    @Override
    public CountState init(Object batchId, TridentCollector tridentCollector) {

        return new CountState();
    }
    @Override
    public void aggregate(CountState state, TridentTuple tridentTuple, TridentCollector tridentCollector) {
        boolean exits = state.count.containsKey(tridentTuple.getStringByField("word"));
        int count=0;
        if(exits){
            count = state.count.get(tridentTuple.getStringByField("word"));
            count=count+1;
        }else{
            count=1;
        }
        state.count.put(tridentTuple.getStringByField("word"),count);
    }

    @Override
    public void complete(CountState state, TridentCollector tridentCollector) {
        Set<Map.Entry<String, Integer>> entries = state.count.entrySet();
        for (Map.Entry<String, Integer> entry : entries) {
            tridentCollector.emit(new Values(entry.getKey(),entry.getValue()));
        }
    }
}

Storm Cluster Setup


  • 時鍾同步
  • 安裝JDK
  • 安裝zk集群(正常啟動)
  • 配置主機名和IP映射關系
  • 安裝配置Storm
[root@CentOSX ~]# tar -zxf apache-storm-1.2.2.tar.gz -C /usr/

[root@CentOSX ~]# vi .bashrc 
STORM_HOME=/usr/apache-storm-1.2.2
HBASE_MANAGES_ZK=false
HBASE_HOME=/usr/hbase-1.2.4
HADOOP_HOME=/usr/hadoop-2.6.0
HADOOP_CLASSPATH=/root/mysql-connector-java-5.1.44.jar
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HBASE_HOME/bin:$STORM_HOME/bin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
export HADOOP_HOME
export HADOOP_CLASSPATH
export HBASE_HOME
export HBASE_MANAGES_ZK
export STORM_HOME
[root@CentOSX ~]# source .bashrc 
[root@CentOSX ~]# vi /usr/apache-storm-1.2.2/conf/storm.yaml 
 storm.zookeeper.servers:
     - "CentOSA"
     - "CentOSB"
     - "CentOSC"
 storm.local.dir: "/usr/apache-storm-1.2.2/storm-stage"
 nimbus.seeds: ["CentOSA","CentOSB","CentOSC"]
 supervisor.slots.ports:
     - 6700
     - 6701
     - 6702
     - 6703
 drpc.servers:
     - "CentOSA"
     - "CentOSB"
     - "CentOSC"
 storm.thrift.transport: "org.apache.storm.security.auth.plain.PlainSaslTransportPlugin"
     
  • Start the Storm cluster
[root@CentOSA ~]# nohup storm ui >/dev/null 2>&1 &
[root@CentOSX ~]# nohup storm nimbus >/dev/null 2>&1 &
[root@CentOSX ~]# nohup storm supervisor >/dev/null 2>&1 &
[root@CentOSX ~]# nohup storm drpc >/dev/null 2>&1 & 
