jstorm 是阿里巴巴開源的基於storm采用Java重寫的一套分布式實時流計算框架,使用簡單,特點如下:
- 開發非常迅速: 接口簡單,容易上手,只要遵守Topology,Spout, Bolt的編程規范即可開發出一個擴展性極好的應用,底層rpc,worker之間冗余,數據分流之類的動作完全不用考慮。
- 擴展性極好:當一級處理單元速度,直接配置一下並發數,即可線性擴展性能
- 健壯:當worker失效或機器出現故障時, 自動分配新的worker替換失效worker
- 數據准確性: 可以采用Acker機制,保證數據不丟失。 如果對精度有更多一步要求,采用事務機制,保證數據准確。
為什么要選擇jstorm,而不采用twitter的storm呢?jstorm對比storm有如下優點:
- Nimbus 實現HA
- 徹底解決Storm雪崩問題:底層RPC采用netty + disruptor保證發送速度和接受速度是匹配的
- 新增supervisor、Supervisor shutdown時、提交新任務,worker數不夠時,均不自動觸發任務rebalance
- 新topology不影響現有任務,新任務無需去搶占老任務的cpu,memory,disk和net
- 減少對ZK的訪問量:去掉大量無用的watch;task的心跳時間延長一倍;Task心跳檢測無需全ZK掃描
- Worker 內部全流水線模式:Spout nextTuple和ack/fail運行在不同線程
- 性能:采用ZeroMq, 比storm快30%;采用netty時, 和storm快10%,並且穩定非常多
總之,Jstorm 比Storm 更穩定,功能更強大,更快。而且Storm上跑的程序可以一行代碼不變運行在Jstorm上,零成本,推薦所有使用storm的兄弟們搭建個jstorm集群緩過來。
jstorm 集群的搭建過程,可以參考另一篇文章:分布式實時日志系統(一)環境搭建之 Jstorm 集群搭建過程/Jstorm集群一鍵安裝部署
jstorm 開發實例
上面也說過了,jstorm使用起來很簡單,遵循Topology,Spout, Bolt的編程規范就可以,在下面的例子中將一步步完成這些。例子也很簡單,在spout中不斷產生自增的int數組,bolt接受到數值后打印出日志,並插入到hbase中。(如果沒有hbase環境的,這一步可以繼續注釋掉,不用打開,只看到跑到日志打印的地方就好了)
spout 的開發只需要繼承BaseRichSpout
,實現繼承的方法即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
|
public class TestSpout extends BaseRichSpout { private static final Logger LOGGER = LoggerFactory.getLogger(TestSpout.class); static AtomicInteger sAtomicInteger = new AtomicInteger(0); static AtomicInteger pendNum = new AtomicInteger(0); private int sqnum; SpoutOutputCollector collector;
@Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { sqnum = sAtomicInteger.incrementAndGet(); this.collector = collector; }
@Override public void nextTuple() { while (true) { int a = pendNum.incrementAndGet(); LOGGER.info(String.format("spount %d,pendNum %d", sqnum, a)); this.collector.emit(new Values("xxxxx:"+a));
try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } }
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("log"));
}
/** * 啟用 ack 機制,詳情參考:https://github.com/alibaba/jstorm/wiki/Ack-%E6%9C%BA%E5%88%B6 * @param msgId */ @Override public void ack(Object msgId) { super.ack(msgId); }
/** * 消息處理失敗后需要自己處理 * @param msgId */ @Override public void fail(Object msgId) { super.fail(msgId); LOGGER.info("ack fail,msgId"+msgId); }
}
|
bolt 同理,繼承 BaseRichBolt
實現其相應的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
public class TestBolt extends BaseRichBolt {
private static final Logger LOGGER = CustomerLoggerFactory.LOGGER(TestBolt.class); OutputCollector collector;
@Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; }
@Override public void execute(Tuple input) { String xx = input.getString(0); LOGGER.info(String.format("receive from spout ,num is : %d", xx));
|
topology 的開發同理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
public class TestTopology implements ILogTopology { @Override public void start(Properties properties) throws AlreadyAliveException, InvalidTopologyException, InterruptedException, IOException {
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("testspout", new TestSpout(), 1); builder.setBolt("testbolt", new TestBolt(), 2).shuffleGrouping("testspout");
Config conf = ConfigUtils.getStormConfig(properties); conf.setNumAckers(1);
StormSubmitter.submitTopology("testtopology", conf, builder.createTopology()); System.out.println("storm cluster will start"); }
}
|
經過上面的三個步驟,一個最簡單的jstorm應用就開發完成了,接下來通過編譯、打包完后,生成jar文件 jstorm-hbase-demo-0.1.jar
,將此jar文件在jstorm集群的nimbus機器上提交即可:jstorm jar jstorm-hbase-demo-0.1.jar com.xirong.demo.BootStrap config.properties
。
demo運行效果
從jstorm集群的監控圖賞可以看到,對應topology的運行情況:

bolt 的執行效率,及ack數量,占用機器內存等:

源碼已經上傳到github上面,喜歡研究的同學,可以fork后自己修改練習。地址為:https://github.com/xirong/jstorm-hbase-demo
源碼中使用到的Phoenix組件,hbase上層的中間件,使得開發人員可以使用sql的方式來對hbase進行相應的操作,感興趣的可以閱讀:使用Phoenix通過sql語句更新操作hbase數據 ,此文中介紹了如何安裝及使用。
另外想對hbase的有所了解的可以查看:列式存儲hbase系統架構學習
原文http://www.ixirong.com/2015/07/18/develop-the-first-jstorm-demo/