最近在研究jstorm,看了很多資料,所以也想分享出來一些。
安裝部署
zeromq
簡單快速的傳輸層框架,安裝如下:
wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
tar zxf zeromq-2.1.7.tar.gz
cd zeromq-2.1.7
./configure
make
sudo make install
sudo ldconfig
jzmq
應該是zmq的java包吧,安裝步驟如下:
git clone git://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
make install
zookeeper
針對大型分布式系統提供配置維護、名字服務、分布式同步、組服務等,可以保證:
- 順序性:客戶端的更新請求都會被順序處理
- 原子性:更新操作要不成功,要不失敗
- 一致性:客戶端不論連接到那個服務端,展現給它的都是同一個視圖
- 可靠性:更新會被持久化
- 實時性:對於每個客戶端他的系統視圖都是最新的
在zookeeper中有幾種角色:
- Leader:發起投票和決議,更新系統狀態
- Follower:響應客戶端請求,參與投票
- Observer:不參與投票,只同步Leader狀態
- Client:發起請求
在啟動之前需要在conf下編寫zoo.cfg配置文件,里面的內容包括:
- tickTime:心跳間隔
- initLimit:Follower和Leader之間建立連接的最大心跳數
- syncLimit:Follower和Leader之間通信時限
- dataDir:數據目錄
- dataLogDir:日志目錄
- minSessionTimeout:最小會話時間(默認tickTime * 2)
- maxSessionTimeout:最大會話時間(默認tickTime * 20)
- maxClientCnxns:客戶端數量
- clientPort:監聽客戶端連接的端口
- server.N=YYYY:A:B:其中N為服務器編號,YYYY是服務器的IP地址,A是Leader和Follower通信端口,B為選舉端口
在單機的時候可以直接將zoo_sample.cfg修改為zoo.cfg,然后使用啟動服務即可(如果報錯沒有目錄,手動創建即可):
sudo ./zkServer.sh start
現在用netstat -na(或者是./zkCli.sh 127.0.0.1:2181)就能看到在監聽指定的端口,那么zookeeper現在起來了。
參考:
- http://blog.csdn.net/shenlan211314/article/details/6170717
- http://blog.csdn.net/hi_kevin/article/details/7089358
- 下載地址:http://apache.dataguru.cn/zookeeper/zookeeper-3.4.6
jstorm
該系統是阿里巴巴在對storm做了重寫和優化,在storm里面能運行的在jstorm里面也能運行,該系統擅長執行實時計算,而且基本上都在內存中搞定。進入正題,jstorm中有如下幾種角色:
- spout:源頭。
- bolt:處理器。
- topology:由處理器、源頭組成的拓撲網絡(每條邊就是一個訂閱關系)。
- tuple:數據。
- worker:執行進程。
- task:執行線程。
- nimbus:分發代碼、任務,監控集群運行狀態
- supervisor:監聽nimbus的指令,接收分發代碼和任務並執行
jstorm是用zookeeper來管理的,下面來看conf/storm.yaml中的常用配置:
- storm.zookeeper.servers:zookeeper集群地址。
- storm.zookeeper.root:zookeeper中storm的根目錄位置。
- storm.local.dir:用來存放配置文件、JAR等。
- storm.messaging.netty.transfer.async.batch:在使用Netty的時候,設置是否一個batch中會有多個消息。
- java.library.path:本地庫的加載地址,比如zeromq、jzmq等。
- supervisor.slots.ports:supervisor節點上的worker使用的端口號列表。
- supervisor.enable.cgroup:是否使用cgroups來做資源隔離。
- topology.buffer.size.limited:是否限制內存,如果不限制將使用LinkedBlockingDeque。
- topology.performance.metrics:是否開啟監控。
- topology.alimonitor.metrics.post:是否將監控數據發送給AliMonitor。
- topology.enable.classloader:默認禁用了用戶自定義的類加載器。
- worker.memory.size:worker的內存大小。
在把配置搞正確之后,就可以用bin中的腳本來啟動節點服務了:
sudo ./jstorm nimbus
sudo ./jstorm supervisor
參考:
- https://github.com/alibaba/jstorm/wiki/%E5%A6%82%E4%BD%95%E5%AE%89%E8%A3%85
- storm編程入門:http://ifeve.com/getting-started-with-storm-5/
jstorm的架構
結構和hadoop的很像,整體看來如下(Nimbus負責控制、提交任務,Supervisor負責執行任務):
為了做實時計算你需要建立topology,由計算節點組成的圖:
在JStorm上的topology的生命周期如下:
- 上傳代碼並做校驗(/nimbus/inbox);
- 建立本地目錄(/stormdist/topology-id/);
- 建立zookeeper上的心跳目錄;
- 計算topology的工作量(parallelism hint),分配task-id並寫入zookeeper;
- 把task分配給supervisor執行;
- 在supervisor中定時檢查是否有新的task,下載新代碼、刪除老代碼,剩下的工作交個小弟worker;
- 在worker中把task拿到,看里面有哪些spout/Bolt,然后計算需要給哪些task發消息並建立連接;
- 在nimbus將topology終止的時候會將zookeeper上的相關信息刪除;
在集群運行的時候要明白Worker、Executor、Task的概念,當然消息被傳遞的時候其實發起者、接收者都是Task,而真正執行的是Executor(可以理解為一個線程),由它來輪詢其中的Spout/Bolt:
在jstorm中通過ack機制來保證數據至少被處理一次,簡單來說下ack:
在消息發、收的過程中會形成一棵樹狀的結構,在一個消息收的時候發一個驗證消息,發的時候也發一個驗證消息,那么總體上每個消息出現兩次。那么ack機制就是將每個消息的隨機生成的ID進行異或,如果在某一時刻結果為0,那就說明處理成功。
如下圖所示:
需要補充一下:雖然ack算是隨機算法,但是出錯的概率極低,但是系統應該具備在出錯之后矯正的能力(甚至檢查是否出錯)。ack機制保證了消息會被處理,但是不能保證只處理一次&順序處理,在需要的情形就有了事務的概念:
碼代碼
基本用法
所謂普通模式是指不去使用JStorm為開發人員提供的高級抽象,用其提供的原生的接口進行開發,主要涉及到的接口有:
- ISpout:數據源頭接口,jstorm會不斷調用nextTuple方法來獲取數據並發射出去。
- open:在worker中初始化該ISpout時調用,一般用來設置一些屬性:比如從spring容器中獲取對應的Bean。
- close:和open相對應(在要關閉的時候調用)。
- activate:從非活動狀態變為活動狀態時調用。
- deactivate:和activate相對應(從活動狀態變為非活動狀態時調用)。
- nextTuple:JStorm希望在每次調用該方法的時候,它會通過collector.emit發射一個tuple。
- ack:jstorm發現msgId對應的tuple被成功地完整消費會調用該方法。
- fail:和ack相對應(jstorm發現某個tuple在某個環節失敗了)。
- IBolt:數據處理接口,jstorm將消息發給他並讓其處理,完成之后可能整個處理流程就結束了,也可能傳遞給下一個節點繼續執行。
- prepare:對應ISpout的open方法。
- cleanup:對應ISpout的close方法(吐槽一下,搞成一樣的名字會死啊...)。
- execute:處理jstorm發送過來的tuple。
- TopologyBuilder:每個jstorm運行的任務都是一個拓撲接口,而builder的作用就是根據配置文件構建這個拓撲結構,更直白就是構建一個網。
- setSpout:添加源頭節點並設置並行度。
- setBolt:添加處理節點並設置並行度。
因為還存在多種其他類型的拓撲結構,那么在builder這個環節當然不能亂傳,在基本用法要去實現IRichSpout、IRichBolt接口,他們並沒有新增任何的方法,僅僅是用來區分類型。既然是拓撲結構那么應該是一個比較復雜的網絡,其實這個是在builder中完成的,其中setSpout/setBolt返回的結果其實是InputDeclarer對象,在其中定義了N個流分組的策略:
1
2
3
4
5
6
7
8
9
10
11
|
public
T fieldsGrouping(String componentId, String streamId, Fields fields);
// 按字段分組,具有同樣字段值的Tuple會被分到相同Bolt里的Task,不同字段值則會被分配到不同Task
public
T globalGrouping(String componentId, String streamId);
// 全局分組,Tuple被分配到Bolt中ID值最低的的一個Task。
public
T shuffleGrouping(String componentId, String streamId);
// 隨機分組,隨機派發Stream里面的Tuple,保證每個Bolt接收到的Tuple數目大致相同,通過輪詢隨機的方式使得下游Bolt之間接收到的Tuple數目差值不超過1。
public
T localOrShuffleGrouping(String componentId, String streamId);
本worker優先,如果本worker內有目標component的task,則隨機從本worker內部的目標component的task中進行選擇,否則就和普通的shuffleGrouping一樣
public
T noneGrouping(String componentId, String streamId);
隨機發送tuple到目標component上,但無法保證平均
public
T allGrouping(String componentId, String streamId);
// 廣播分組,每一個Tuple,所有的Bolt都會收到。
public
T directGrouping(String componentId, String streamId);
// 直接分組,Tuple需要指定由Bolt的哪個Task接收。 只有被聲明為Direct Stream的消息流可以聲明這種分組方法。
// 自定義分組
public
T customGrouping(String componentId, CustomStreamGrouping grouping);
public
T customGrouping(String componentId, String streamId, CustomStreamGrouping grouping);
public
T grouping(GlobalStreamId id, Grouping grouping);
|
通過這些接口,我們可以一邊增加處理節點、一邊指定其消費哪些消息。
批量用法
基本的用法是每次處理一個tuple,但是這種效率比較低,很多情況下是可以批量獲取消息然后一起處理,批量用法對這種方式提供了支持。打開代碼可以很明顯地發現jstorm和storm的有着不小的區別:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// storm 中的定義
public
interface
IBatchSpout
extends
Serializable {
void
open(Map conf, TopologyContext context);
void
emitBatch(
long
batchId, TridentCollector collector);
// 批次發射tuple
void
ack(
long
batchId);
// 成功處理批次
void
close();
Map getComponentConfiguration();
Fields getOutputFields();
}
// jstorm中的定義
public
interface
IBatchSpout
extends
IBasicBolt, ICommitter, Serializable {
}
|
另外如果用批次的話就需要改用BatchTopologyBuilder來構建拓撲結構,在IBatchSpout中主要實現的接口如下:
- execute:雖然和IBolt中名字、參數一致,但是增加了一些默認邏輯
- 入參的input.getValue(0)表示批次(BatchId)。
- 發送消息時collector.emit(new Values(batchId, value)),發送的列表第一個字段表示批次(BatchId)。
- commit:批次成功時調用,常見的是修改offset。
- revert:批次失敗時調用,可以在這里根據offset取出批次數據進行重試。
Transactional Topology
事務拓撲並不是新的東西,只是在原始的ISpout、IBolt上做了一層封裝。在事務拓撲中以並行(processing)和順序(commiting)混合的方式來完成任務,使用Transactional Topology可以保證每個消息只會成功處理一次。不過需要注意的是,在Spout需要保證能夠根據BatchId進行多次重試,在這里有一個基本的例子,這里有一個不錯的講解。
Trident
這次一種更高級的抽象(甚至不需要知道底層是怎么map-reduce的),所面向的不再是spout和bolt,而是stream。主要涉及到下面幾種接口:
- 在本地完成的操作
- Function:自定義操作。
- Filters:自定義過濾。
- partitionAggregate:對同批次的數據進行local combiner操作。
- project:只保留stream中指定的field。
- stateQuery、partitionPersist:查詢和持久化。
- 決定Tuple如何分發到下一個處理環節
- shuffle:隨機。
- broadcast:廣播。
- partitionBy:以某一個特定的field進行hash,分到某一個分區,這樣該field位置相同的都會放到同一個分區。
- global:所有tuple發到指定的分區。
- batchGlobal:同一批的tuple被放到相同的分區(不同批次不同分區)。
- partition:用戶自定義的分區策略。
- 不同partition處理結果的匯聚操作
- aggregate:只針對同一批次的數據。
- persistentAggregate:針對所有批次進行匯聚,並將中間狀態持久化。
- 對stream中的tuple進行重新分組,后續的操作將會對每一個分組獨立進行(類似sql中的group by)
- groupBy
- 將多個Stream融合成一個
- merge:多個流進行簡單的合並。
- join:多個流按照某個KEY進行UNION操作(只能針對同一個批次的數據)。