Storm Installation and Experiments


Continuing from the previous post, Kafka installation and experiments:

http://www.cnblogs.com/charlesblc/p/6046023.html

And the post before that, Flume installation and experiments:

http://www.cnblogs.com/charlesblc/p/6046023.html

 

Storm installation can follow this writeup:

http://shiyanjun.cn/archives/934.html

 

There are 1.x releases and 0.x releases; to play it safe, I downloaded 0.10 in the end.

http://apache.fayea.com/storm/apache-storm-0.10.2/apache-storm-0.10.2.tar.gz

Copy it to /home/work/data/installed and extract it.
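For example (assuming the tarball sits in the current directory):

$ tar -xzf apache-storm-0.10.2.tar.gz -C /home/work/data/installed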

JAVA_HOME can be set in conf/storm_env.ini, but since the Java path is already on PATH, it can be left unset for now.

Configure conf/storm.yaml (host 05 as the master, host 06 as the slave; both machines get the same configuration):

storm.zookeeper.servers:
     - "slave1.Hadoop"
#     - "server2"
#
# nimbus.host: "nimbus"
nimbus.host: "master.Hadoop"

Running the start command directly fails:

$ bin/storm nimbus
Need python version > 2.6

I added jumbo's bin directory to .bashrc as well, and the shebang at the top of bin/storm.py also needs changing:

#!python

Still no good. The PYTHON variable inside the storm launcher script needs changing too, because it pointed at the wrong interpreter:

PYTHON=python

Host 05 works now; host 06 apparently doesn't have Python installed.
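A quick way to check what each host has:

$ python -V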

$ nohup bin/storm nimbus &

$ nohup bin/storm ui &

Then the UI is visible at http://[host05]:8080/index.html (I haven't yet worked out how to change the port).

Adding ui.port=8489 to storm.yaml throws an error (most likely because storm.yaml is YAML, which expects colon syntax: ui.port: 8489).

So 8080 it is for now.

The UI looks something like this:

 

Next, a Java program is needed to process the logs. It was written with reference to:

http://blog.csdn.net/ymh198816/article/details/51998085

 

[The code follows]

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.myapp.logparser</groupId>
    <artifactId>LogParser</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.10.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.10.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>


</project>

SkuBean

package com.myapp.logparser;

/**
 * Created by baidu on 16/11/10.
 */
public class SkuBean {
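    // Placeholder: SKU-level fields (name, code, quantity, price) are never parsed in this experiment.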
}

OrderBean

package com.myapp.logparser;

import java.util.ArrayList;
import java.util.Date;

/**
 * Created by baidu on 16/11/10.
 */

public class OrderBean {
    Date createTime = null;
    String number = "";
    String paymentNumber = "";
    Date paymentDate = null;
    String merchantName = "";
    ArrayList<SkuBean> skuGroup = null;
    float totalPrice = 0;
    float discount = 0;
    float paymentPrice = 0;

    public Date getCreateTime() {
        return createTime;
    }
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }
    public String getNumber() {
        return number;
    }
    public void setNumber(String number) {
        this.number = number;
    }
    public String getPaymentNumber() {
        return paymentNumber;
    }
    public void setPaymentNumber(String paymentNumber) {
        this.paymentNumber = paymentNumber;
    }
    public Date getPaymentDate() {
        return paymentDate;
    }
    public void setPaymentDate(Date paymentDate) {
        this.paymentDate = paymentDate;
    }
    public String getMerchantName() {
        return merchantName;
    }
    public void setMerchantName(String merchantName) {
        this.merchantName = merchantName;
    }
    public ArrayList<SkuBean> getSkuGroup() {
        return skuGroup;
    }
    public void setSkuGroup(ArrayList<SkuBean> skuGroup) {
        this.skuGroup = skuGroup;
    }
    public float getTotalPrice() {
        return totalPrice;
    }
    public void setTotalPrice(float totalPrice) {
        this.totalPrice = totalPrice;
    }
    public float getDiscount() {
        return discount;
    }
    public void setDiscount(float discount) {
        this.discount = discount;
    }
    public float getPaymentPrice() {
        return paymentPrice;
    }
    public void setPaymentPrice(float paymentPrice) {
        this.paymentPrice = paymentPrice;
    }


}

LogInfoHandler

package com.myapp.logparser;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Created by baidu on 16/11/10.
 */
public class LogInfoHandler {
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public OrderBean getOrderBean(String orderInfo) {
        OrderBean orderBean = new OrderBean();

        Pattern pattern = Pattern.compile("orderNumber:.+");
        Matcher matcher = pattern.matcher(orderInfo);
        if (matcher.find()) {
            String orderInfoStr = matcher.group(0);
            String[] orderInfoGroup = orderInfoStr.trim().split("\\|");

            String orderNum = (orderInfoGroup[0].split(":"))[1].trim();
            orderBean.setNumber(orderNum);

            // Split on the first ':' only -- the timestamp itself contains colons,
            // so a plain split(":") would truncate it and make parse() throw.
            String orderCreateTime = orderInfoGroup[1].split(":", 2)[1].trim();
            try {
                orderBean.setCreateTime(simpleDateFormat.parse(orderCreateTime));
            } catch (ParseException e) {
                e.printStackTrace();
            }

            String merchantName = orderInfoGroup[4].split(":")[1].trim();
            orderBean.setMerchantName(merchantName);

            String orderPriceInfo = orderInfoGroup[6].split("totalPrice:")[1].trim().split(" ")[0];
            orderBean.setTotalPrice(Float.parseFloat(orderPriceInfo));

        }
        return orderBean;
    }
}
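For reference, a minimal driver showing what getOrderBean extracts from one generated log line (illustrative only; LogInfoHandlerDemo is not part of the original project, and the sample line mimics the LogGenerator output shown later):

package com.myapp.logparser;

// Illustrative sanity check for LogInfoHandler.getOrderBean.
public class LogInfoHandlerDemo {
    public static void main(String[] args) {
        String line = "[INFO][main][2016-11-10 13:04:52][com.comany.log.generator.LogGenerator] - "
                + "orderNumber: 852921478754292872 | orderDate: 2016-11-10 13:04:52 | "
                + "paymentNumber: Paypal-98410280 | paymentDate: 2016-11-10 13:04:52 | "
                + "merchantName: Apple | sku: [ skuName: Phone skuNum: 2 skuCode: fad8a2ugxv "
                + "skuPrice: 299.0 totalSkuPrice: 598.0; ] | "
                + "price: [ totalPrice: 4396.0 discount: 20.0 paymentPrice: 4376.0 ]";

        OrderBean bean = new LogInfoHandler().getOrderBean(line);
        System.out.println(bean.getNumber());        // 852921478754292872
        System.out.println(bean.getMerchantName());  // Apple
        System.out.println(bean.getTotalPrice());    // 4396.0
    }
}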

SalesBolt

package com.myapp.logparser;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.Map;

/**
 * Created by baidu on 16/11/10.
 */
public class SalesBolt extends BaseRichBolt{
    OutputCollector collector;
    LogInfoHandler logInfoHandler;
    JedisPool jedisPool;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.logInfoHandler = new LogInfoHandler();
        this.jedisPool = new JedisPool(new JedisPoolConfig(), "10.117.host", 8379, 60000, "[password]");
    }

    public void execute(Tuple tuple) {
        String orderInfo = tuple.getString(0);
        OrderBean orderBean = logInfoHandler.getOrderBean(orderInfo);

        Jedis jedis = jedisPool.getResource();
        jedis.zincrby("orderAnalysis:topSales", orderBean.getTotalPrice(), orderBean.getMerchantName());
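        // Note: the tuple is never acked here and the pooled connection is never
        // returned -- both problems surface later in this post.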

    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

LogParser

package com.myapp.logparser;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.*;

import java.util.UUID;

/**
 * Created by baidu on 16/11/10.
 */
public class LogParser {
    private static String topicName = "test1";
    // The Storm UI shows zookeeper.root as "/storm"
    private static String zkRoot = "/storm";

    public static void main(String[] args) {
        BrokerHosts hosts = new ZkHosts("slave1.Hadoop:2181");

        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, UUID.randomUUID().toString());
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", kafkaSpout);
        builder.setBolt("SalesBolt", new SalesBolt(), 2).shuffleGrouping("kafkaSpout");

        Config config = new Config();
        config.setDebug(true);

        if (args != null && args.length > 0) {
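            // A topology-name argument means a real cluster submission; with no
            // argument, execution falls through to the LocalCluster branch below
            // (a distinction that matters later in this post).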
            config.setNumWorkers(1);
            try {
                StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        }
        else {
            config.setMaxSpoutPending(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("logparser", config, builder.createTopology());
        }

    }
}

 

Once the code is written: File -> Project Structure -> Artifacts -> + -> Jar -> From modules with dependencies

After picking the main class, select "copy to output directory...", then change the trailing java in the proposed path to resources.

Then Build Artifacts.

 

Copy the LogParser_jar directory to host 05 and place it in a newly created directory:

/home/work/data/installed/apache-storm-0.10.2/topologies

 

Then, from the Storm home directory, run:

bin/storm jar topologies/LogParser_jar/LogParser.jar  com.myapp.logparser.LogParser

It fails:
Caused by: java.lang.RuntimeException: java.io.IOException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar. [jar:file:/home/work/data/installed/apache-storm-0.10.2/lib/storm-core-0.10.2.jar!/defaults.yaml, jar:file:/home/work/data/installed/apache-storm-0.10.2/topologies/LogParser_jar/storm-core-0.10.2.jar!/defaults.yaml]

So those Storm jars inside the artifact directory should be deleted.
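An alternative to deleting jars by hand (standard Maven practice, though not what was done here) is to give storm-core provided scope so the build never bundles it in the first place:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.10.2</version>
    <!-- provided: the cluster supplies storm-core at runtime, so it is
         compiled against but not packaged into the topology jar -->
    <scope>provided</scope>
</dependency>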

After deleting storm-core.jar and starting again, it apparently succeeded with no errors. Then generate logs with "java -jar LogGenerator.jar"; after a while (anywhere from a few seconds to tens of seconds), the Storm output shows activity:

575550 [Thread-18-kafkaSpout] INFO  b.s.d.executor - Processing received message FOR  TUPLE: source: __system:-1, stream: __tick, id: {}, [30]
605550 [Thread-18-kafkaSpout] INFO  b.s.d.executor - Processing received message FOR  TUPLE: source: __system:-1, stream: __tick, id: {}, [30]
605551 [Thread-18-kafkaSpout] INFO  b.s.d.executor - SPOUT Failing -8193749749191662772: {:stream "default", :values ["[INFO][main][2016-11-10 13:04:52][com.comany.log.generator.LogGenerator] - orderNumber: 852921478754292872 | orderDate: 2016-11-10 13:04:52 | paymentNumber: Paypal-98410280 | paymentDate: 2016-11-10 13:04:52 | merchantName: ???? | sku: [ skuName: ????? skuNum: 2 skuCode: fad8a2ugxv skuPrice: 299.0 totalSkuPrice: 598.0; skuName: ????? skuNum: 1 skuCode: dg1hcn5x99 skuPrice: 2000.0 totalSkuPrice: 2000.0; skuName: ??????? skuNum: 2 skuCode: 3kknxo8cgi skuPrice: 899.0 totalSkuPrice: 1798.0; ] | price: [ totalPrice: 4396.0 discount: 20.0 paymentPrice: 4376.0 ]"]} REASON: TIMEOUT MSG-ID: storm.kafka.PartitionManager$KafkaMessageId@1d483e3b

Then connect to the Redis slave on host 02 (or the master on host 04), and records can be seen arriving:

> zrange orderAnalysis:topSales 0 -1 withscores
 1) "\xe6\x9a\xb4\xe9\x9b\xaa\xe5\x85\xac\xe5\x8f\xb8"
 2) "3295"
 3) "\xe5\x93\x88\xe6\xaf\x92\xe5\xa6\x87"
 4) "4296"
 5) "Apple"
 6) "4399"
 7) "\xe8\xb7\x91\xe7\x94\xb7"
 8) "4693"
 9) "Oracle"
10) "6097"
11) "BMW"
12) "7486"
13) "\xe5\xae\x88\xe6\x9c\x9b\xe5\x85\x88\xe5\xb3\xb0"
14) "7693"
15) "CSDN"
16) "9191"
17) "\xe4\xbc\x98\xe8\xa1\xa3\xe5\xba\x93"
18) "15389"
19) "\xe5\x92\x95\xe5\x99\x9c\xe5\xa4\xa7\xe5\xa4\xa7"
20) "18386"

But for some reason, when new logs arrive these numbers stop refreshing.

Change the code to count occurrences instead. First the old zset has to be removed:

zrem key member deletes only one member at a time;

zremrangebyrank orderAnalysis:topSales 0 -1 removes everything at once (a plain DEL orderAnalysis:topSales would also drop the whole key).

Then this spot in the code was changed:

jedis.zincrby("orderAnalysis:topSales", 1, orderBean.getMerchantName());
logger.info("zincrby orderAnalysis:topSales 1 " + orderBean.getMerchantName());

No longer tracking the sales amount, only the count. Logging was also added.
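The logger declaration itself isn't shown in the post. Judging by the c.m.l.LogParser prefix in the output below, it is presumably an SLF4J logger along these lines (an assumption; the original declaration wasn't included):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Assumed field inside SalesBolt; getLogger(LogParser.class) is inferred from
// the "c.m.l.LogParser" prefix visible in the worker logs.
private static final Logger logger = LoggerFactory.getLogger(LogParser.class);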

Rebuild the artifact and upload it. Only LogParser.jar needs re-uploading; the other jars are unchanged.

Then run the command again:

bin/storm jar topologies/LogParser_jar/LogParser.jar  com.myapp.logparser.LogParser

The earlier parse errors are gone, and the Redis operations can be seen:

65623 [Thread-10-SalesBolt] INFO  c.m.l.LogParser - zincrby orderAnalysis:topSales 1 ????

The Redis result looks like this:

> zrange orderAnalysis:topSales 0 -1 withscores
 1) "Apple"
 2) "1"
 3) "BMW"
 4) "1"
 5) "\xe5\x93\x88\xe6\xaf\x92\xe5\xa6\x87"
 6) "1"
 7) "\xe6\x9a\xb4\xe9\x9b\xaa\xe5\x85\xac\xe5\x8f\xb8"
 8) "1"
 9) "\xe5\x92\x95\xe5\x99\x9c\xe5\xa4\xa7\xe5\xa4\xa7"
10) "2"

Restart it in the background:

$nohup bin/storm jar topologies/LogParser_jar/LogParser.jar  com.myapp.logparser.LogParser & 

$ tail -f nohup.out | grep zincrby
7641 [Thread-10-SalesBolt] INFO  c.m.l.LogParser - zincrby orderAnalysis:topSales 1 Apple
7641 [Thread-14-SalesBolt] INFO  c.m.l.LogParser - zincrby orderAnalysis:topSales 1 ????
7642 [Thread-14-SalesBolt] INFO  c.m.l.LogParser - zincrby orderAnalysis:topSales 1 ???

Use egrep to follow both kinds of log line:

tail -f nohup.out | egrep 'orderNumber|zincrby'

Generate logs again:

$ java -jar LogGenerator.jar

A few seconds later:

$ tail -f nohup.out | egrep 'orderNumber|zincrby'

365652 [Thread-18-kafkaSpout] INFO  b.s.d.executor - SPOUT Failing -2362934606036969397: {:stream "default", :values ["[INFO][main][2016-11-10 13:04:33][com.comany.log.generator.LogGenerator] - orderNumber: 078361478754273911 | orderDate: 2016-11-10 13:04:33 | paymentNumber: Paypal-60291370 | paymentDate: 2016-11-10 13:04:33 | merchantName: Benz | sku: [ skuName: ????? skuNum: 1 skuCode: v3ep48x772 skuPrice: 899.0 totalSkuPrice: 899.0; skuName: ???? skuNum: 1 skuCode: znugr14dlk skuPrice: 299.0 totalSkuPrice: 299.0; skuName: ????? skuNum: 1 skuCode: uzt1xub809 skuPrice: 2000.0 totalSkuPrice: 2000.0; ] | price: [ totalPrice: 3198.0 discount: 50.0 paymentPrice: 3148.0 ]"]} REASON: TIMEOUT MSG-ID: storm.kafka.PartitionManager$KafkaMessageId@1998028f

365653 [Thread-18-kafkaSpout] INFO  b.s.d.task - Emitting: kafkaSpout default [[INFO][main][2016-11-10 13:04:33][com.comany.log.generator.LogGenerator] - orderNumber: 040381478754273912 | orderDate: 2016-11-10 13:04:33 | paymentNumber: Paypal-74845178 | paymentDate: 2016-11-10 13:04:33 | merchantName: ?? | sku: [ skuName: ???? skuNum: 3 skuCode: c0o41xcdzq skuPrice: 699.0 totalSkuPrice: 2097.0; skuName: ????? skuNum: 2 skuCode: apuu8oj59j skuPrice: 899.0 totalSkuPrice: 1798.0; skuName: ???? skuNum: 2 skuCode: rydrrdxuuo skuPrice: 399.0 totalSkuPrice: 798.0; ] | price: [ totalPrice: 4693.0 discount: 50.0 paymentPrice: 4643.0 ]]

365653 [Thread-18-kafkaSpout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: kafkaSpout:4, stream: default, id: {-7961243545913627632=165572358204464334}, [[INFO][main][2016-11-10 13:04:33][com.comany.log.generator.LogGenerator] - orderNumber: 040381478754273912 | orderDate: 2016-11-10 13:04:33 | paymentNumber: Paypal-74845178 | paymentDate: 2016-11-10 13:04:33 | merchantName: ?? | sku: [ skuName: ???? skuNum: 3 skuCode: c0o41xcdzq skuPrice: 699.0 totalSkuPrice: 2097.0; skuName: ????? skuNum: 2 skuCode: apuu8oj59j skuPrice: 899.0 totalSkuPrice: 1798.0; skuName: ???? skuNum: 2 skuCode: rydrrdxuuo skuPrice: 399.0 totalSkuPrice: 798.0; ] | price: [ totalPrice: 4693.0 discount: 50.0 paymentPrice: 4643.0 ]]

365654 [Thread-18-kafkaSpout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: kafkaSpout:4, stream: default, id: {2338210682144559994=-169238346679811883}, [[INFO][main][2016-11-10 13:04:33][com.comany.log.generator.LogGenerator] - orderNumber: 298261478754273912 | orderDate: 2016-11-10 13:04:33 | paymentNumber: Paypal-01751994 | paymentDate: 2016-11-10 13:04:33 | merchantName: ???? | sku: [ skuName: ????? skuNum: 2 skuCode: wzz1in2mrr skuPrice: 399.0 totalSkuPrice: 798.0; skuName: ????? skuNum: 2 skuCode: up2xye2faj skuPrice: 299.0 totalSkuPrice: 598.0; skuName: ???? skuNum: 2 skuCode: dsaay7ilko skuPrice: 2000.0 totalSkuPrice: 4000.0; ] | price: [ totalPrice: 5396.0 discount: 50.0 paymentPrice: 5346.0 ]]

There are all sorts of log lines; I seem to see Failing and TIMEOUT, but couldn't make sense of them yet.

Cleared everything and ran it again; the Redis values didn't increase at all.

Looks like the supervisor on host 06 should be started too:

nohup ./bin/storm supervisor & 

Running: /home/work/.jumbo/opt/sun-java8/bin/java -server -Ddaemon.name=supervisor -Dstorm.options= -Dstorm.home=/home/work/data/installed/apache-storm-0.10.2 -Dstorm.log.dir=/home/work/data/installed/apache-storm-0.10.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/work/data/installed/apache-storm-0.10.2/lib/asm-4.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/hadoop-auth-2.4.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/minlog-1.2.jar:/home/work/data/installed/apache-storm-0.10.2/lib/reflectasm-1.07-shaded.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-over-slf4j-1.6.6.jar:/home/work/data/installed/apache-storm-0.10.2/lib/storm-core-0.10.2.jar:/home/work/data/installed/apache-storm-0.10.2/lib/disruptor-2.10.4.jar:/home/work/data/installed/apache-storm-0.10.2/lib/kryo-2.21.jar:/home/work/data/installed/apache-storm-0.10.2/lib/servlet-api-2.5.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-core-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/lib/slf4j-api-1.7.7.jar:/home/work/data/installed/apache-storm-0.10.2/lib/clojure-1.6.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-api-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlog4j.configurationFile=/home/work/data/installed/apache-storm-0.10.2/log4j2/cluster.xml backtype.storm.daemon.supervisor

Only after this does the Storm UI show the supervisor's information.

 

Meanwhile, I discovered my submissions had been wrong all along: one argument was missing, so the topology had always been running in a LocalCluster... (see this article: link)

It has to be submitted like this instead:

$ bin/storm jar topologies/LogParser_jar/LogParser.jar  com.myapp.logparser.LogParser logparser

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/work/data/installed/apache-storm-0.10.2/topologies/LogParser_jar/log4j-slf4j-impl-2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Running: /home/work/.jumbo/opt/sun-java8/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/home/work/data/installed/apache-storm-0.10.2 -Dstorm.log.dir=/home/work/data/installed/apache-storm-0.10.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/work/data/installed/apache-storm-0.10.2/lib/servlet-api-2.5.jar:/home/work/data/installed/apache-storm-0.10.2/lib/asm-4.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/storm-core-0.10.2.jar:/home/work/data/installed/apache-storm-0.10.2/lib/kryo-2.21.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-api-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/lib/disruptor-2.10.4.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/lib/slf4j-api-1.7.7.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-over-slf4j-1.6.6.jar:/home/work/data/installed/apache-storm-0.10.2/lib/hadoop-auth-2.4.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/reflectasm-1.07-shaded.jar:/home/work/data/installed/apache-storm-0.10.2/lib/clojure-1.6.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-core-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/lib/minlog-1.2.jar:topologies/LogParser_jar/LogParser.jar:/home/work/data/installed/apache-storm-0.10.2/conf:/home/work/data/installed/apache-storm-0.10.2/bin -Dstorm.jar=topologies/LogParser_jar/LogParser.jar com.myapp.logparser.LogParser logparser
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/work/data/installed/apache-storm-0.10.2/topologies/LogParser_jar/log4j-slf4j-impl-2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
585  [main] INFO  b.s.u.Utils - Using defaults.yaml from resources
647  [main] INFO  b.s.u.Utils - Using storm.yaml from resources
679  [main] INFO  b.s.u.Utils - Using defaults.yaml from resources
686  [main] INFO  b.s.u.Utils - Using storm.yaml from resources
687  [main] INFO  b.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -7390434021614848370:-4867446214048727896
689  [main] INFO  b.s.s.a.AuthUtils - Got AutoCreds []
698  [main] INFO  b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
714  [main] INFO  b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
722  [main] INFO  b.s.StormSubmitter - Uploading topology jar topologies/LogParser_jar/LogParser.jar to assigned location: /home/work/data/installed/apache-storm-0.10.2/storm-local/nimbus/inbox/stormjar-6ce814c2-f0f9-41f3-8e2e-710eebb33718.jar
Start uploading file 'topologies/LogParser_jar/LogParser.jar' to '/home/work/data/installed/apache-storm-0.10.2/storm-local/nimbus/inbox/stormjar-6ce814c2-f0f9-41f3-8e2e-710eebb33718.jar' (6947 bytes)
[==================================================] 6947 / 6947
File 'topologies/LogParser_jar/LogParser.jar' uploaded to '/home/work/data/installed/apache-storm-0.10.2/storm-local/nimbus/inbox/stormjar-6ce814c2-f0f9-41f3-8e2e-710eebb33718.jar' (6947 bytes)
743  [main] INFO  b.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/work/data/installed/apache-storm-0.10.2/storm-local/nimbus/inbox/stormjar-6ce814c2-f0f9-41f3-8e2e-710eebb33718.jar
743  [main] INFO  b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
743  [main] INFO  b.s.StormSubmitter - Submitting topology logparser in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-7390434021614848370:-4867446214048727896","topology.workers":1,"topology.debug":true}
871  [main] INFO  b.s.StormSubmitter - Finished submitting topology: logparser

Now the monitoring data shows up in the Storm UI:

 

Then check Redis again:

Nothing there...

It still feels like the data isn't coming through. Time to check the Kafka UI monitoring.

Still nothing conclusive, but I noticed the executors all run on the slave. Checking the logs on the slave turned up class-loading errors. The topology pulls in the Jedis pool classes and friends; maybe the jar is incomplete.

A web search suggests everything should be packed into a single jar. Tried it: built a fat jar of 40-odd MB, uploaded it, and ran the command again:

bin/storm jar topologies/LogParser.jar  com.myapp.logparser.LogParser logparser

At first storm-core wasn't excluded and it conflicted again. Go back to the artifact settings and remove storm-core.

 

Re-uploaded, and it worked. On the supervisor machine the processing logs are visible:

cd /home/work/data/installed/apache-storm-0.10.2/logs/ ; tail -f logparser-2-1478780903-worker-6700.log*

 

Redis finally has some output too:

zrange orderAnalysis:topSales 0 -1 withscores
 1) "Apple"
 2) "1"
 3) "Oracle"
 4) "1"
 5) "\xe5\x93\x88\xe6\xaf\x92\xe5\xa6\x87"
 6) "1"
 7) "\xe6\x9a\xb4\xe9\x9b\xaa\xe5\x85\xac\xe5\x8f\xb8"
 8) "1"
 9) "\xe8\xb7\x91\xe7\x94\xb7"
10) "1"
11) "BMW"
12) "2"
13) "CSDN"
14) "2"
15) "\xe4\xbc\x98\xe8\xa1\xa3\xe5\xba\x93"
16) "2"
17) "\xe5\xae\x88\xe6\x9c\x9b\xe5\x85\x88\xe5\xb3\xb0"
18) "2"
19) "\xe5\x92\x95\xe5\x99\x9c\xe5\xa4\xa7\xe5\xa4\xa7"
20) "3"

 

Speechless. Restarted everything.

Still no good.

 

Deleted the old topics. Of course, delete.topic.enable must first be set to true in the broker's config (server.properties), and the broker restarted for it to take effect.

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test1

 

It still only adds a few entries before timing out again, damn it...

 

Increased the worker count in the code, removed the debug flag, and redeployed.

 

Still no good, and now it doesn't even seem to respond.

Leaving it at that for now.

 

At first I assumed the 30-second tuple timeout was too short, so I raised it to 10 minutes; that didn't help either. It seemed the bolt really was too slow:

//config.setMessageTimeoutSecs(600);

[Finally tracked down the cause]

The JedisPool connections were never being released.

The fix: every connection taken from the JedisPool must be returned. (With Jedis, calling close() on a pooled connection returns it to the pool rather than closing the socket.)

In SalesBolt:

Jedis jedis = null;
try {
    jedis = jedisPool.getResource();
    jedis.zincrby("orderAnalysis:topSales", 1, orderBean.getMerchantName());
    logger.info("zincrby orderAnalysis:topSales 1 " + orderBean.getMerchantName());
}
finally {
    // Return the connection to the pool even if zincrby throws.
    if (jedis != null) {
        jedis.close();
    }
}

 

Now it works end to end. But for some reason the Redis counts keep climbing even when no new logs are being generated.

> zrange orderAnalysis:topSales 0 -1 withscores
 1) "Sumsam"
 2) "66"
 3) "Maserati"
 4) "88"
 5) "Nissan"
 6) "88"
 7) "Oracle"
 8) "88"
 9) "\xe5\xbf\xab\xe4\xb9\x90\xe5\xae\x9d\xe8\xb4\x9d"
10) "110"
11) "\xe5\xa4\xa9\xe7\x8c\xab"
12) "115"
13) "\xe8\xb7\xaf\xe6\x98\x93\xe6\x96\xaf\xe5\xa8\x81\xe7\x99\xbb"
14) "119"
15) "Java"
16) "132"
17) "\xe4\xbc\x98\xe8\xa1\xa3\xe5\xba\x93"
18) "141"
19) "Apple"
20) "154"
21) "BMW"
22) "154"
23) "Storm"
24) "164"
25) "\xe8\xb7\x91\xe7\x94\xb7"
26) "167"
27) "\xe5\x92\x95\xe5\x99\x9c\xe5\xa4\xa7\xe5\xa4\xa7"
28) "172"
29) "\xe5\xae\x88\xe6\x9c\x9b\xe5\x85\x88\xe5\xb3\xb0"
30) "181"
31) "CSDN"
32) "185"
33) "\xe6\x9a\xb4\xe9\x9b\xaa\xe5\x85\xac\xe5\x8f\xb8"
34) "185"
35) "Benz"
36) "224"
37) "\xe6\xb7\x98\xe5\xae\x9d"
38) "224"
39) "\xe5\x93\x88\xe6\xaf\x92\xe5\xa6\x87"
40) "246"

Decided to try wiping Kafka clean.

Deleted the old topics again (as before, this requires delete.topic.enable=true in the broker config):

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test1

Redis was indeed emptied.

Then run the whole pipeline again:

$ java -jar LogGenerator.jar 

A moment later, results appear in Redis:
> zrange orderAnalysis:topSales 0 -1 withscores
 1) "CSDN"
 2) "2"
 3) "Maserati"
 4) "2"
 5) "\xe5\x92\x95\xe5\x99\x9c\xe5\xa4\xa7\xe5\xa4\xa7"
 6) "2"
 7) "\xe6\xb7\x98\xe5\xae\x9d"
 8) "2"
 9) "\xe8\xb7\x91\xe7\x94\xb7"
10) "2"
11) "Benz"
12) "4"
13) "\xe4\xbc\x98\xe8\xa1\xa3\xe5\xba\x93"
14) "4"
15) "\xe8\xb7\xaf\xe6\x98\x93\xe6\x96\xaf\xe5\xa8\x81\xe7\x99\xbb"
16) "4"

Everything appears one extra time, so messages really were being processed repeatedly.

Some searching suggests tuples need to be acked explicitly. BaseRichBolt does not auto-ack: an un-acked tuple hits topology.message.timeout.secs (30 s by default), the spout marks it failed, and KafkaSpout replays it, which explains both the SPOUT Failing ... REASON: TIMEOUT lines and the duplicated counts. This code was added (a catch branch calling collector.fail(tuple) would make genuine failures replay promptly, but that wasn't added here):

Jedis jedis = null;
try {
    jedis = jedisPool.getResource();
    jedis.zincrby("orderAnalysis:topSales", 1, orderBean.getMerchantName());
    logger.info("zincrby orderAnalysis:topSales 1 " + orderBean.getMerchantName());
    // Explicitly ack so the spout does not time the tuple out and replay it.
    collector.ack(tuple);
}
finally {
    if (jedis != null) {
        jedis.close();
    }
}

Separately, on the Chinese-character problem: the strings coming out of the tuple are already mojibake, so this was added:

logger.info("get log: " + orderInfo);
try {
    orderInfo = new String(orderInfo.getBytes(), "UTF-8");
    logger.info("get new log: " + orderInfo);
} catch (UnsupportedEncodingException e) {
    //e.printStackTrace();
    logger.warn(e.toString());
}
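A note on why this can't work (my analysis, not from the original post): new String(orderInfo.getBytes(), "UTF-8") encodes the string with the platform default charset and then decodes those bytes as UTF-8, which is a no-op when the default is already UTF-8 and makes things worse when it isn't; it cannot repair text that was decoded with the wrong charset upstream. The charset has to be pinned where raw bytes first become a String, roughly:

import java.nio.charset.StandardCharsets;

public class Utf8Decode {
    // Illustrative: decode raw message bytes explicitly as UTF-8 instead of
    // relying on the platform default. In this topology the string decoding
    // happens inside the spout's Scheme, so any corruption likely occurred
    // even earlier (e.g. in the generator or Flume).
    static String decode(byte[] rawBytes) {
        return new String(rawBytes, StandardCharsets.UTF_8);
    }
}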

Chinese still doesn't seem to work, but the duplicate-processing problem is solved.

 

The Chinese-encoding issue is split out into its own post later:

http://www.cnblogs.com/charlesblc/p/6055441.html

 

