Following up on the earlier post on installing and experimenting with Kafka:
http://www.cnblogs.com/charlesblc/p/6046023.html
And the one before that on installing and experimenting with Flume:
http://www.cnblogs.com/charlesblc/p/6046023.html
Storm installation can follow this guide:
http://shiyanjun.cn/archives/934.html
There are 1.0+ releases and 0.x releases; to play it safe, I went with 0.10:
http://apache.fayea.com/storm/apache-storm-0.10.2/apache-storm-0.10.2.tar.gz
Copy it to /home/work/data/installed and untar it.
conf/storm_env.ini can set JAVA_HOME, but since PATH already includes the Java binaries, it can stay unset for now.
Then configure conf/storm.yaml (05 as master, 06 as slave; both machines get the same config):
storm.zookeeper.servers:
    - "slave1.Hadoop"
#    - "server2"
#
# nimbus.host: "nimbus"
nimbus.host: "master.Hadoop"
Starting it straight away fails:
$ bin/storm nimbus
Need python version > 2.6
I added jumbo's bin directory to the .bashrc as well, and the shebang at the top of bin/storm.py also needs changing:
#!python
Still no good. The PYTHON variable inside the bin/storm shell script has to change too, because it points at the wrong interpreter:
PYTHON=python
Machine 05 is fine now; machine 06 apparently has no Python installed.
$ nohup bin/storm nimbus &
$ nohup bin/storm ui &
Then http://[host05]:8080/index.html shows the UI (I haven't yet worked out how to change the port).
Adding ui.port=8489 to storm.yaml threw an error.
So 8080 it is, for now.
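In hindsight the `=` may have been the problem: storm.yaml is YAML, so the documented port override takes colon syntax (untested here):

ui.port: 8489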
The UI looks something like this (screenshot omitted):
Next, a Java program is needed to process the logs. It's based on this post:
http://blog.csdn.net/ymh198816/article/details/51998085
[The code follows.]
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.myapp.logparser</groupId>
    <artifactId>LogParser</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.10.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.10.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
            <!-- excluded to avoid clashing with the ZooKeeper/logging jars Storm already ships -->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</project>
SkuBean
package com.myapp.logparser;

/**
 * Created by baidu on 16/11/10.
 */
public class SkuBean {
    // Intentionally empty for now: the parser never extracts SKU details.
}
OrderBean
package com.myapp.logparser;

import java.util.ArrayList;
import java.util.Date;

/**
 * Created by baidu on 16/11/10.
 */
public class OrderBean {
    Date createTime = null;
    String number = "";
    String paymentNumber = "";
    Date paymentDate = null;
    String merchantName = "";
    ArrayList<SkuBean> skuGroup = null;
    float totalPrice = 0;
    float discount = 0;
    float paymentPrice = 0;

    public Date getCreateTime() { return createTime; }
    public void setCreateTime(Date createTime) { this.createTime = createTime; }

    public String getNumber() { return number; }
    public void setNumber(String number) { this.number = number; }

    public String getPaymentNumber() { return paymentNumber; }
    public void setPaymentNumber(String paymentNumber) { this.paymentNumber = paymentNumber; }

    public Date getPaymentDate() { return paymentDate; }
    public void setPaymentDate(Date paymentDate) { this.paymentDate = paymentDate; }

    public String getMerchantName() { return merchantName; }
    public void setMerchantName(String merchantName) { this.merchantName = merchantName; }

    public ArrayList<SkuBean> getSkuGroup() { return skuGroup; }
    public void setSkuGroup(ArrayList<SkuBean> skuGroup) { this.skuGroup = skuGroup; }

    public float getTotalPrice() { return totalPrice; }
    public void setTotalPrice(float totalPrice) { this.totalPrice = totalPrice; }

    public float getDiscount() { return discount; }
    public void setDiscount(float discount) { this.discount = discount; }

    public float getPaymentPrice() { return paymentPrice; }
    public void setPaymentPrice(float paymentPrice) { this.paymentPrice = paymentPrice; }
}
LogInfoHandler
package com.myapp.logparser;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Created by baidu on 16/11/10.
 */
public class LogInfoHandler {
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public OrderBean getOrderBean(String orderInfo) {
        OrderBean orderBean = new OrderBean();
        Pattern pattern = Pattern.compile("orderNumber:.+");
        Matcher matcher = pattern.matcher(orderInfo);
        if (matcher.find()) {
            String orderInfoStr = matcher.group(0);
            String[] orderInfoGroup = orderInfoStr.trim().split("\\|");

            String orderNum = (orderInfoGroup[0].split(":"))[1].trim();
            orderBean.setNumber(orderNum);

            String orderCreateTime = orderInfoGroup[1].split(":")[1].trim();
            try {
                orderBean.setCreateTime(simpleDateFormat.parse(orderCreateTime));
            } catch (ParseException e) {
                e.printStackTrace();
            }

            String merchantName = orderInfoGroup[4].split(":")[1].trim();
            orderBean.setMerchantName(merchantName);

            String orderPriceInfo = orderInfoGroup[6].split("totalPrice:")[1].trim().split(" ")[0];
            orderBean.setTotalPrice(Float.parseFloat(orderPriceInfo));
        }
        return orderBean;
    }
}
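Not part of the original post: a small hypothetical driver to sanity-check the regex splitting, using a line in the generator's format (as seen in the logs further down):

package com.myapp.logparser;

public class LogInfoHandlerDemo {
    public static void main(String[] args) {
        String sample = "[INFO][main][2016-11-10 13:04:33][com.comany.log.generator.LogGenerator]"
                + " - orderNumber: 078361478754273911 | orderDate: 2016-11-10 13:04:33"
                + " | paymentNumber: Paypal-60291370 | paymentDate: 2016-11-10 13:04:33"
                + " | merchantName: Benz | sku: [ skuName: demo skuNum: 1 skuCode: v3ep48x772"
                + " skuPrice: 899.0 totalSkuPrice: 899.0; ]"
                + " | price: [ totalPrice: 899.0 discount: 50.0 paymentPrice: 849.0 ]";
        OrderBean bean = new LogInfoHandler().getOrderBean(sample);
        // Prints: 078361478754273911 Benz 899.0
        System.out.println(bean.getNumber() + " " + bean.getMerchantName() + " " + bean.getTotalPrice());
        // Note: createTime stays null and a ParseException stack trace is printed,
        // because split(":") also splits on the colons inside "13:04:33".
    }
}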
SalesBolt
package com.myapp.logparser;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.Map;

/**
 * Created by baidu on 16/11/10.
 */
public class SalesBolt extends BaseRichBolt {
    OutputCollector collector;
    LogInfoHandler logInfoHandler;
    JedisPool jedisPool;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.logInfoHandler = new LogInfoHandler();
        this.jedisPool = new JedisPool(new JedisPoolConfig(), "10.117.host", 8379, 60000, "[password]");
    }

    public void execute(Tuple tuple) {
        String orderInfo = tuple.getString(0);
        OrderBean orderBean = logInfoHandler.getOrderBean(orderInfo);
        // NOTE: the connection borrowed here is never returned to the pool;
        // this bug surfaces later in the post.
        Jedis jedis = jedisPool.getResource();
        jedis.zincrby("orderAnalysis:topSales", orderBean.getTotalPrice(), orderBean.getMerchantName());
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
LogParser
package com.myapp.logparser;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.*;

import java.util.UUID;

/**
 * Created by baidu on 16/11/10.
 */
public class LogParser {
    private static String topicName = "test1";
    // Visible in the Storm UI as zookeeper.root "/storm"
    private static String zkRoot = "/storm";

    public static void main(String[] args) {
        BrokerHosts hosts = new ZkHosts("slave1.Hadoop:2181");
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, UUID.randomUUID().toString());
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", kafkaSpout);
        builder.setBolt("SalesBolt", new SalesBolt(), 2).shuffleGrouping("kafkaSpout");

        Config config = new Config();
        config.setDebug(true);

        if (args != null && args.length > 0) {
            // With a topology name argument, submit to the real cluster...
            config.setNumWorkers(1);
            try {
                StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            // ...otherwise fall back to LocalCluster mode (this bites me later).
            config.setMaxSpoutPending(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("logparser", config, builder.createTopology());
        }
    }
}
Once the code is written: File -> Project Structure -> Artifacts -> + -> Jar -> From modules with dependencies.
After choosing the main class, select "copy to output directory...", then change the trailing java in the path to resources.
Then Build Artifacts.
Copy the LogParser_jar directory to machine 05, into a new directory:
/home/work/data/installed/apache-storm-0.10.2/topologies
Then, from the Storm home directory, run:
bin/storm jar topologies/LogParser_jar/LogParser.jar com.myapp.logparser.LogParser

It fails with:

Caused by: java.lang.RuntimeException: java.io.IOException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar. [jar:file:/home/work/data/installed/apache-storm-0.10.2/lib/storm-core-0.10.2.jar!/defaults.yaml, jar:file:/home/work/data/installed/apache-storm-0.10.2/topologies/LogParser_jar/storm-core-0.10.2.jar!/defaults.yaml]

So the fix should be to remove those Storm jars from inside the artifact directory.
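If the jar were built by Maven, the orthodox fix is to mark storm-core as provided so packaging tools skip it (a sketch against the pom above); with an IDE-built artifact, the equivalent is removing the jar from the artifact layout by hand, which is what I do next:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.10.2</version>
    <!-- provided: the Storm cluster supplies this jar at runtime, so don't bundle it -->
    <scope>provided</scope>
</dependency>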
After deleting storm-core-0.10.2.jar and restarting, it seemed to work with no errors. Then generate some logs with "java -jar LogGenerator.jar"; after a while (anywhere from a few seconds to tens of seconds), the Storm output shows:
575550 [Thread-18-kafkaSpout] INFO b.s.d.executor - Processing received message FOR TUPLE: source: __system:-1, stream: __tick, id: {}, [30]
605550 [Thread-18-kafkaSpout] INFO b.s.d.executor - Processing received message FOR TUPLE: source: __system:-1, stream: __tick, id: {}, [30]
605551 [Thread-18-kafkaSpout] INFO b.s.d.executor - SPOUT Failing -8193749749191662772: {:stream "default", :values ["[INFO][main][2016-11-10 13:04:52][com.comany.log.generator.LogGenerator] - orderNumber: 852921478754292872 | orderDate: 2016-11-10 13:04:52 | paymentNumber: Paypal-98410280 | paymentDate: 2016-11-10 13:04:52 | merchantName: ???? | sku: [ skuName: ????? skuNum: 2 skuCode: fad8a2ugxv skuPrice: 299.0 totalSkuPrice: 598.0; skuName: ????? skuNum: 1 skuCode: dg1hcn5x99 skuPrice: 2000.0 totalSkuPrice: 2000.0; skuName: ??????? skuNum: 2 skuCode: 3kknxo8cgi skuPrice: 899.0 totalSkuPrice: 1798.0; ] | price: [ totalPrice: 4396.0 discount: 20.0 paymentPrice: 4376.0 ]"]} REASON: TIMEOUT MSG-ID: storm.kafka.PartitionManager$KafkaMessageId@1d483e3b
Then connect to the Redis slave on 02 or the master on 04, and records are coming in (the \x... members are the UTF-8 bytes of Chinese merchant names; e.g. \xe4\xbc\x98\xe8\xa1\xa3\xe5\xba\x93 decodes to 优衣库):
> zrange orderAnalysis:topSales 0 -1 withscores
1) "\xe6\x9a\xb4\xe9\x9b\xaa\xe5\x85\xac\xe5\x8f\xb8"
2) "3295"
3) "\xe5\x93\x88\xe6\xaf\x92\xe5\xa6\x87"
4) "4296"
5) "Apple"
6) "4399"
7) "\xe8\xb7\x91\xe7\x94\xb7"
8) "4693"
9) "Oracle"
10) "6097"
11) "BMW"
12) "7486"
13) "\xe5\xae\x88\xe6\x9c\x9b\xe5\x85\x88\xe5\xb3\xb0"
14) "7693"
15) "CSDN"
16) "9191"
17) "\xe4\xbc\x98\xe8\xa1\xa3\xe5\xba\x93"
18) "15389"
19) "\xe5\x92\x95\xe5\x99\x9c\xe5\xa4\xa7\xe5\xa4\xa7"
20) "18386"
But for some reason, once new logs arrive, these numbers stop updating.
Change the code to count orders instead. First the old zset has to be removed:
zrem key member only deletes one member at a time;
use zremrangebyrank orderAnalysis:topSales 0 -1 to delete everything in one go (a plain DEL orderAnalysis:topSales would also work, since a zset ceases to exist once its last member is removed).
Then this part of the code changed:

jedis.zincrby("orderAnalysis:topSales", 1, orderBean.getMerchantName());
logger.info("zincrby orderAnalysis:topSales 1 " + orderBean.getMerchantName());

It no longer looks at the sales amount, only the order count, and it now logs each increment.
Rebuild the artifact and upload it. Only LogParser.jar needs re-uploading; the other jars are unchanged.
Then rerun:
bin/storm jar topologies/LogParser_jar/LogParser.jar com.myapp.logparser.LogParser
No more of the earlier parse errors. And the Redis operations show up in the log:
65623 [Thread-10-SalesBolt] INFO c.m.l.LogParser - zincrby orderAnalysis:topSales 1 ????
And Redis shows something like this:
> zrange orderAnalysis:topSales 0 -1 withscores
1) "Apple"
2) "1"
3) "BMW"
4) "1"
5) "\xe5\x93\x88\xe6\xaf\x92\xe5\xa6\x87"
6) "1"
7) "\xe6\x9a\xb4\xe9\x9b\xaa\xe5\x85\xac\xe5\x8f\xb8"
8) "1"
9) "\xe5\x92\x95\xe5\x99\x9c\xe5\xa4\xa7\xe5\xa4\xa7"
10) "2"
Restart it in the background:
$ nohup bin/storm jar topologies/LogParser_jar/LogParser.jar com.myapp.logparser.LogParser &
$ tail -f nohup.out | grep zincrby
7641 [Thread-10-SalesBolt] INFO c.m.l.LogParser - zincrby orderAnalysis:topSales 1 Apple
7641 [Thread-14-SalesBolt] INFO c.m.l.LogParser - zincrby orderAnalysis:topSales 1 ????
7642 [Thread-14-SalesBolt] INFO c.m.l.LogParser - zincrby orderAnalysis:topSales 1 ???
Use egrep to follow both kinds of log lines:
tail -f nohup.out | egrep 'orderNumber|zincrby'
Generate some more logs:
$ java -jar LogGenerator.jar

A few seconds later:

$ tail -f nohup.out | egrep 'orderNumber|zincrby'
365652 [Thread-18-kafkaSpout] INFO b.s.d.executor - SPOUT Failing -2362934606036969397: {:stream "default", :values ["[INFO][main][2016-11-10 13:04:33][com.comany.log.generator.LogGenerator] - orderNumber: 078361478754273911 | orderDate: 2016-11-10 13:04:33 | paymentNumber: Paypal-60291370 | paymentDate: 2016-11-10 13:04:33 | merchantName: Benz | sku: [ skuName: ????? skuNum: 1 skuCode: v3ep48x772 skuPrice: 899.0 totalSkuPrice: 899.0; skuName: ???? skuNum: 1 skuCode: znugr14dlk skuPrice: 299.0 totalSkuPrice: 299.0; skuName: ????? skuNum: 1 skuCode: uzt1xub809 skuPrice: 2000.0 totalSkuPrice: 2000.0; ] | price: [ totalPrice: 3198.0 discount: 50.0 paymentPrice: 3148.0 ]"]} REASON: TIMEOUT MSG-ID: storm.kafka.PartitionManager$KafkaMessageId@1998028f
365653 [Thread-18-kafkaSpout] INFO b.s.d.task - Emitting: kafkaSpout default [[INFO][main][2016-11-10 13:04:33][com.comany.log.generator.LogGenerator] - orderNumber: 040381478754273912 | orderDate: 2016-11-10 13:04:33 | paymentNumber: Paypal-74845178 | paymentDate: 2016-11-10 13:04:33 | merchantName: ?? | sku: [ skuName: ???? skuNum: 3 skuCode: c0o41xcdzq skuPrice: 699.0 totalSkuPrice: 2097.0; skuName: ????? skuNum: 2 skuCode: apuu8oj59j skuPrice: 899.0 totalSkuPrice: 1798.0; skuName: ???? skuNum: 2 skuCode: rydrrdxuuo skuPrice: 399.0 totalSkuPrice: 798.0; ] | price: [ totalPrice: 4693.0 discount: 50.0 paymentPrice: 4643.0 ]]
365653 [Thread-18-kafkaSpout] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: kafkaSpout:4, stream: default, id: {-7961243545913627632=165572358204464334}, [[INFO][main][2016-11-10 13:04:33][com.comany.log.generator.LogGenerator] - orderNumber: 040381478754273912 | orderDate: 2016-11-10 13:04:33 | paymentNumber: Paypal-74845178 | paymentDate: 2016-11-10 13:04:33 | merchantName: ?? | sku: [ skuName: ???? skuNum: 3 skuCode: c0o41xcdzq skuPrice: 699.0 totalSkuPrice: 2097.0; skuName: ????? skuNum: 2 skuCode: apuu8oj59j skuPrice: 899.0 totalSkuPrice: 1798.0; skuName: ???? skuNum: 2 skuCode: rydrrdxuuo skuPrice: 399.0 totalSkuPrice: 798.0; ] | price: [ totalPrice: 4693.0 discount: 50.0 paymentPrice: 4643.0 ]]
365654 [Thread-18-kafkaSpout] INFO b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: kafkaSpout:4, stream: default, id: {2338210682144559994=-169238346679811883}, [[INFO][main][2016-11-10 13:04:33][com.comany.log.generator.LogGenerator] - orderNumber: 298261478754273912 | orderDate: 2016-11-10 13:04:33 | paymentNumber: Paypal-01751994 | paymentDate: 2016-11-10 13:04:33 | merchantName: ???? | sku: [ skuName: ????? skuNum: 2 skuCode: wzz1in2mrr skuPrice: 399.0 totalSkuPrice: 798.0; skuName: ????? skuNum: 2 skuCode: up2xye2faj skuPrice: 299.0 totalSkuPrice: 598.0; skuName: ???? skuNum: 2 skuCode: dsaay7ilko skuPrice: 2000.0 totalSkuPrice: 4000.0; ] | price: [ totalPrice: 5396.0 discount: 50.0 paymentPrice: 5346.0 ]]

All sorts of log lines, including Failing and REASON: TIMEOUT, but they don't explain much yet...
Cleared everything and ran it all again; the Redis values still don't increase.
Looks like the supervisor on machine 06 needs starting too...
nohup ./bin/storm supervisor &
Running: /home/work/.jumbo/opt/sun-java8/bin/java -server -Ddaemon.name=supervisor -Dstorm.options= -Dstorm.home=/home/work/data/installed/apache-storm-0.10.2 -Dstorm.log.dir=/home/work/data/installed/apache-storm-0.10.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/work/data/installed/apache-storm-0.10.2/lib/asm-4.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/hadoop-auth-2.4.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/minlog-1.2.jar:/home/work/data/installed/apache-storm-0.10.2/lib/reflectasm-1.07-shaded.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-over-slf4j-1.6.6.jar:/home/work/data/installed/apache-storm-0.10.2/lib/storm-core-0.10.2.jar:/home/work/data/installed/apache-storm-0.10.2/lib/disruptor-2.10.4.jar:/home/work/data/installed/apache-storm-0.10.2/lib/kryo-2.21.jar:/home/work/data/installed/apache-storm-0.10.2/lib/servlet-api-2.5.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-core-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/lib/slf4j-api-1.7.7.jar:/home/work/data/installed/apache-storm-0.10.2/lib/clojure-1.6.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-api-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlog4j.configurationFile=/home/work/data/installed/apache-storm-0.10.2/log4j2/cluster.xml backtype.storm.daemon.supervisor
Only now does the Storm UI show the supervisor's information.
Meanwhile, I realised my submissions had been wrong all along: missing one argument, the topology had always been running in LocalCluster mode... (see this article: link)
It has to be submitted like this:
$ bin/storm jar topologies/LogParser_jar/LogParser.jar com.myapp.logparser.LogParser logparser
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/work/data/installed/apache-storm-0.10.2/topologies/LogParser_jar/log4j-slf4j-impl-2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Running: /home/work/.jumbo/opt/sun-java8/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/home/work/data/installed/apache-storm-0.10.2 -Dstorm.log.dir=/home/work/data/installed/apache-storm-0.10.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/work/data/installed/apache-storm-0.10.2/lib/servlet-api-2.5.jar:/home/work/data/installed/apache-storm-0.10.2/lib/asm-4.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/storm-core-0.10.2.jar:/home/work/data/installed/apache-storm-0.10.2/lib/kryo-2.21.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-api-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/lib/disruptor-2.10.4.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/lib/slf4j-api-1.7.7.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-over-slf4j-1.6.6.jar:/home/work/data/installed/apache-storm-0.10.2/lib/hadoop-auth-2.4.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/reflectasm-1.07-shaded.jar:/home/work/data/installed/apache-storm-0.10.2/lib/clojure-1.6.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-core-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/lib/minlog-1.2.jar:topologies/LogParser_jar/LogParser.jar:/home/work/data/installed/apache-storm-0.10.2/conf:/home/work/data/installed/apache-storm-0.10.2/bin -Dstorm.jar=topologies/LogParser_jar/LogParser.jar com.myapp.logparser.LogParser logparser
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/work/data/installed/apache-storm-0.10.2/topologies/LogParser_jar/log4j-slf4j-impl-2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
585 [main] INFO b.s.u.Utils - Using defaults.yaml from resources
647 [main] INFO b.s.u.Utils - Using storm.yaml from resources
679 [main] INFO b.s.u.Utils - Using defaults.yaml from resources
686 [main] INFO b.s.u.Utils - Using storm.yaml from resources
687 [main] INFO b.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -7390434021614848370:-4867446214048727896
689 [main] INFO b.s.s.a.AuthUtils - Got AutoCreds []
698 [main] INFO b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
714 [main] INFO b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
722 [main] INFO b.s.StormSubmitter - Uploading topology jar topologies/LogParser_jar/LogParser.jar to assigned location: /home/work/data/installed/apache-storm-0.10.2/storm-local/nimbus/inbox/stormjar-6ce814c2-f0f9-41f3-8e2e-710eebb33718.jar
Start uploading file 'topologies/LogParser_jar/LogParser.jar' to '/home/work/data/installed/apache-storm-0.10.2/storm-local/nimbus/inbox/stormjar-6ce814c2-f0f9-41f3-8e2e-710eebb33718.jar' (6947 bytes)
[==================================================] 6947 / 6947
File 'topologies/LogParser_jar/LogParser.jar' uploaded to '/home/work/data/installed/apache-storm-0.10.2/storm-local/nimbus/inbox/stormjar-6ce814c2-f0f9-41f3-8e2e-710eebb33718.jar' (6947 bytes)
743 [main] INFO b.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/work/data/installed/apache-storm-0.10.2/storm-local/nimbus/inbox/stormjar-6ce814c2-f0f9-41f3-8e2e-710eebb33718.jar
743 [main] INFO b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
743 [main] INFO b.s.StormSubmitter - Submitting topology logparser in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-7390434021614848370:-4867446214048727896","topology.workers":1,"topology.debug":true}
871 [main] INFO b.s.StormSubmitter - Finished submitting topology: logparser
Now the monitoring shows up in the Storm UI.
Then check the Redis contents again:
Nothing there...
Still feels like the data isn't flowing. Let's look at the Kafka monitoring UI.
That didn't reveal much either, but the executors all sit on the slave, so I checked the logs on the slave and found class-loading errors. That reminded me of the Jedis pool and related packages; maybe the jar is missing its dependencies.
A quick search suggests everything should be packed into a single fat jar. Tried it: built a 40-odd MB jar, uploaded it, and reran:
bin/storm jar topologies/LogParser.jar com.myapp.logparser.LogParser logparser
At first I hadn't removed storm-core, which conflicts again; you have to go back to the artifact settings and take storm-core out.
Re-uploaded, and this time it worked. On the supervisor machine, the worker logs show tuples being processed:
cd /home/work/data/installed/apache-storm-0.10.2/logs/ ; tail -f logparser-2-1478780903-worker-6700.log*
And Redis finally has some output:
zrange orderAnalysis:topSales 0 -1 withscores
1) "Apple"
2) "1"
3) "Oracle"
4) "1"
5) "\xe5\x93\x88\xe6\xaf\x92\xe5\xa6\x87"
6) "1"
7) "\xe6\x9a\xb4\xe9\x9b\xaa\xe5\x85\xac\xe5\x8f\xb8"
8) "1"
9) "\xe8\xb7\x91\xe7\x94\xb7"
10) "1"
11) "BMW"
12) "2"
13) "CSDN"
14) "2"
15) "\xe4\xbc\x98\xe8\xa1\xa3\xe5\xba\x93"
16) "2"
17) "\xe5\xae\x88\xe6\x9c\x9b\xe5\x85\x88\xe5\xb3\xb0"
18) "2"
19) "\xe5\x92\x95\xe5\x99\x9c\xe5\xa4\xa7\xe5\xa4\xa7"
20) "3"
Speechless. Restarted everything.
Still no good.
Deleted all the existing topics. Of course, that first requires setting delete.topic.enable to true in the broker config.
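That is, in each Kafka broker's config/server.properties (then restart the broker):

delete.topic.enable=true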
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test1
Still only a handful of increments get through, and then it damn well times out again...
Increased the worker count in the code, dropped the debug flag, and redeployed.
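Roughly this change in LogParser (the exact worker count wasn't recorded; 2 is a guess):

config.setNumWorkers(2);    // was 1
// config.setDebug(true);   // debug logging removed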
Still no good, and now it seems to have stopped responding altogether.
Leaving it there for the moment.
At first I suspected the 30-second timeout was too short, but raising it to 10 minutes didn't help either. So the Bolt really did look too slow.
//config.setMessageTimeoutSecs(600);
[Finally tracked down the root cause]
Connections borrowed from the JedisPool were never released. The default JedisPoolConfig allows 8 connections and blocks indefinitely once they are exhausted, so after eight tuples getResource() hangs forever, the bolt stalls, and every pending tuple times out.
The fix: release each connection back to the JedisPool.
In SalesBolt:

try {
    jedis = jedisPool.getResource();
    jedis.zincrby("orderAnalysis:topSales", 1, orderBean.getMerchantName());
    logger.info("zincrby orderAnalysis:topSales 1 " + orderBean.getMerchantName());
} finally {
    if (jedis != null) {
        jedis.close();
    }
}
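Jedis 2.9.0 implements Closeable, and close() returns a pooled connection to its pool, so the same fix can be written with try-with-resources; a sketch:

public void execute(Tuple tuple) {
    String orderInfo = tuple.getString(0);
    OrderBean orderBean = logInfoHandler.getOrderBean(orderInfo);
    // try-with-resources: the borrowed connection goes back to the pool
    // automatically, even if zincrby throws
    try (Jedis jedis = jedisPool.getResource()) {
        jedis.zincrby("orderAnalysis:topSales", 1, orderBean.getMerchantName());
        logger.info("zincrby orderAnalysis:topSales 1 " + orderBean.getMerchantName());
    }
}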
Now it runs end to end. But for some reason the Redis counters keep climbing even when no new logs are being generated.
> zrange orderAnalysis:topSales 0 -1 withscores
 1) "Sumsam"
 2) "66"
 3) "Maserati"
 4) "88"
 5) "Nissan"
 6) "88"
 7) "Oracle"
 8) "88"
 9) "\xe5\xbf\xab\xe4\xb9\x90\xe5\xae\x9d\xe8\xb4\x9d"
10) "110"
11) "\xe5\xa4\xa9\xe7\x8c\xab"
12) "115"
13) "\xe8\xb7\xaf\xe6\x98\x93\xe6\x96\xaf\xe5\xa8\x81\xe7\x99\xbb"
14) "119"
15) "Java"
16) "132"
17) "\xe4\xbc\x98\xe8\xa1\xa3\xe5\xba\x93"
18) "141"
19) "Apple"
20) "154"
21) "BMW"
22) "154"
23) "Storm"
24) "164"
25) "\xe8\xb7\x91\xe7\x94\xb7"
26) "167"
27) "\xe5\x92\x95\xe5\x99\x9c\xe5\xa4\xa7\xe5\xa4\xa7"
28) "172"
29) "\xe5\xae\x88\xe6\x9c\x9b\xe5\x85\x88\xe5\xb3\xb0"
30) "181"
31) "CSDN"
32) "185"
33) "\xe6\x9a\xb4\xe9\x9b\xaa\xe5\x85\xac\xe5\x8f\xb8"
34) "185"
35) "Benz"
36) "224"
37) "\xe6\xb7\x98\xe5\xae\x9d"
38) "224"
39) "\xe5\x93\x88\xe6\xaf\x92\xe5\xa6\x87"
40) "246"
Decided to try clearing Kafka.
Deleted all the old topics again (as before, this needs delete.topic.enable=true in the broker config):
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test1
Redis was indeed cleared out.
Then ran the whole thing again:
$ java -jar LogGenerator.jar

A moment later, the results land in Redis:

> zrange orderAnalysis:topSales 0 -1 withscores
 1) "CSDN"
 2) "2"
 3) "Maserati"
 4) "2"
 5) "\xe5\x92\x95\xe5\x99\x9c\xe5\xa4\xa7\xe5\xa4\xa7"
 6) "2"
 7) "\xe6\xb7\x98\xe5\xae\x9d"
 8) "2"
 9) "\xe8\xb7\x91\xe7\x94\xb7"
10) "2"
11) "Benz"
12) "4"
13) "\xe4\xbc\x98\xe8\xa1\xa3\xe5\xba\x93"
14) "4"
15) "\xe8\xb7\xaf\xe6\x98\x93\xe6\x96\xaf\xe5\xa8\x81\xe7\x99\xbb"
16) "4"

Every count is one round too high, so the stream really is being processed twice.
A bit of searching suggests the tuple needs an explicit ack (BaseRichBolt does not ack for you), so I added this:
try {
    jedis = jedisPool.getResource();
    jedis.zincrby("orderAnalysis:topSales", 1, orderBean.getMerchantName());
    logger.info("zincrby orderAnalysis:topSales 1 " + orderBean.getMerchantName());
    collector.ack(tuple);
} finally {
    if (jedis != null) {
        jedis.close();
    }
}
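For what it's worth, Storm's BaseBasicBolt variant handles acking automatically. A sketch of what SalesBolt would look like on that base class (SalesBasicBolt is a hypothetical name):

package com.myapp.logparser;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

// Sketch only: BaseBasicBolt acks each tuple automatically when execute()
// returns normally, and fails it if execute() throws, so the manual
// collector.ack(tuple) above becomes unnecessary.
public class SalesBasicBolt extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // ... same LogInfoHandler + Jedis logic as in SalesBolt ...
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}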
As for the Chinese characters: the strings coming out of the tuple are already garbled, so I added this:
logger.info("get log: " + orderInfo);
try {
    orderInfo = new String(orderInfo.getBytes(), "UTF-8");
    logger.info("get new log: " + orderInfo);
} catch (UnsupportedEncodingException e) {
    //e.printStackTrace();
    logger.warn(e.toString());
}
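This round trip can't actually help: getBytes() with no argument encodes with the platform default charset, so the line above is a no-op when the default is already UTF-8 and lossy when it isn't. As far as I can tell, storm-kafka's StringScheme already decodes payloads as UTF-8, so the corruption most likely happens before Kafka. A hypothetical sketch of the safe pattern, pinning the charset explicitly at both ends:

import java.nio.charset.StandardCharsets;

public class CharsetRoundTrip {
    public static void main(String[] args) {
        String logLine = "merchantName: 优衣库";
        // Producer side: encode with an explicit charset, never the platform default
        byte[] payload = logLine.getBytes(StandardCharsets.UTF_8);
        // Consumer side: decode with the same explicit charset
        String decoded = new String(payload, StandardCharsets.UTF_8);
        System.out.println(decoded); // prints the Chinese intact
    }
}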
The Chinese still doesn't come through, but the duplicate-processing problem is fixed.
The Chinese-encoding issue gets its own follow-up post:
http://www.cnblogs.com/charlesblc/p/6055441.html