jstorm開發指南-寫個簡單的jstorm應用


jstorm開發指南-寫個簡單的jstorm應用

jstorm 是阿里巴巴開源的基於storm采用Java重寫的一套分布式實時流計算框架,使用簡單,特點如下:

  • 開發非常迅速: 接口簡單,容易上手,只要遵守Topology,Spout, Bolt的編程規范即可開發出一個擴展性極好的應用,底層rpc,worker之間冗余,數據分流之類的動作完全不用考慮。
  • 擴展性極好:當一級處理單元速度,直接配置一下並發數,即可線性擴展性能
  • 健壯:當worker失效或機器出現故障時, 自動分配新的worker替換失效worker
  • 數據准確性: 可以采用Acker機制,保證數據不丟失。 如果對精度有更多一步要求,采用事務機制,保證數據准確。

為什么要選擇jstorm,而不采用twitter的storm呢?jstorm對比storm有如下優點:

  • Nimbus 實現HA
  • 徹底解決Storm雪崩問題:底層RPC采用netty + disruptor保證發送速度和接受速度是匹配的
  • 新增supervisor、Supervisor shutdown時、提交新任務,worker數不夠時,均不自動觸發任務rebalance
  • 新topology不影響現有任務,新任務無需去搶占老任務的cpu,memory,disk和net
  • 減少對ZK的訪問量:去掉大量無用的watch;task的心跳時間延長一倍;Task心跳檢測無需全ZK掃描
  • Worker 內部全流水線模式:Spout nextTuple和ack/fail運行在不同線程
  • 性能:采用ZeroMq, 比storm快30%;采用netty時, 和storm快10%,並且穩定非常多

總之,Jstorm 比Storm 更穩定,功能更強大,更快。而且Storm上跑的程序可以一行代碼不變運行在Jstorm上,零成本,推薦所有使用storm的兄弟們搭建個jstorm集群緩過來。

jstorm 集群的搭建過程,可以參考另一篇文章:分布式實時日志系統(一)環境搭建之 Jstorm 集群搭建過程/Jstorm集群一鍵安裝部署

jstorm 開發實例

上面也說過了,jstorm使用起來很簡單,遵循Topology,Spout, Bolt的編程規范就可以,在下面的例子中將一步步完成這些。例子也很簡單,在spout中不斷產生自增的int數組,bolt接受到數值后打印出日志,並插入到hbase中。(如果沒有hbase環境的,這一步可以繼續注釋掉,不用打開,只看到跑到日志打印的地方就好了)

spout 的開發只需要繼承BaseRichSpout,實現繼承的方法即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class TestSpout extends BaseRichSpout {
private static final Logger LOGGER = LoggerFactory.getLogger(TestSpout.class);
static AtomicInteger sAtomicInteger = new AtomicInteger(0);
static AtomicInteger pendNum = new AtomicInteger(0);
private int sqnum;
SpoutOutputCollector collector;

@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
sqnum = sAtomicInteger.incrementAndGet();
this.collector = collector;
}

@Override
public void nextTuple() {
while (true) {
int a = pendNum.incrementAndGet();
LOGGER.info(String.format("spount %d,pendNum %d", sqnum, a));
this.collector.emit(new Values("xxxxx:"+a));

try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("log"));

}

/**
* 啟用 ack 機制,詳情參考:https://github.com/alibaba/jstorm/wiki/Ack-%E6%9C%BA%E5%88%B6
* @param msgId
*/
@Override
public void ack(Object msgId) {
super.ack(msgId);
}

/**
* 消息處理失敗后需要自己處理
* @param msgId
*/
@Override
public void fail(Object msgId) {
super.fail(msgId);
LOGGER.info("ack fail,msgId"+msgId);
}

}

bolt 同理,繼承 BaseRichBolt 實現其相應的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class TestBolt extends BaseRichBolt {

private static final Logger LOGGER = CustomerLoggerFactory.LOGGER(TestBolt.class);
OutputCollector collector;

@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple input) {
String xx = input.getString(0);
LOGGER.info(String.format("receive from spout ,num is : %d", xx));

// 發送ack信息告知spout 完成處理的消息 ,如果下面的hbase的注釋代碼打開了,則必須等到插入hbase完畢后才能發送ack信息,這段代碼需要刪除
this.collector.ack(input);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}
}

topology 的開發同理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class TestTopology implements ILogTopology {
@Override
public void start(Properties properties) throws AlreadyAliveException, InvalidTopologyException, InterruptedException, IOException {

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("testspout", new TestSpout(), 1);
builder.setBolt("testbolt", new TestBolt(), 2).shuffleGrouping("testspout");

Config conf = ConfigUtils.getStormConfig(properties);
conf.setNumAckers(1);

StormSubmitter.submitTopology("testtopology", conf, builder.createTopology());
System.out.println("storm cluster will start");
}

}

經過上面的三個步驟,一個最簡單的jstorm應用就開發完成了,接下來通過編譯、打包完后,生成jar文件 jstorm-hbase-demo-0.1.jar ,將此jar文件在jstorm集群的nimbus機器上提交即可:jstorm jar jstorm-hbase-demo-0.1.jar com.xirong.demo.BootStrap config.properties

demo運行效果

從jstorm集群的監控圖賞可以看到,對應topology的運行情況:

http://static.ixirong.com/pic/jstormdemo/jstorm-topology.png

bolt 的執行效率,及ack數量,占用機器內存等:

http://static.ixirong.com/pic/jstormdemo/jstorm-testbolt.png

源碼已經上傳到github上面,喜歡研究的同學,可以fork后自己修改練習。地址為:https://github.com/xirong/jstorm-hbase-demo
源碼中使用到的Phoenix組件,hbase上層的中間件,使得開發人員可以使用sql的方式來對hbase進行相應的操作,感興趣的可以閱讀:使用Phoenix通過sql語句更新操作hbase數據 ,此文中介紹了如何安裝及使用。
另外想對hbase的有所了解的可以查看:列式存儲hbase系統架構學習

 

原文http://www.ixirong.com/2015/07/18/develop-the-first-jstorm-demo/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM