JSTORM使用筆記


安裝部署

zeromq

簡單快速的傳輸層框架,安裝如下:

wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
tar zxf zeromq-2.1.7.tar.gz
cd zeromq-2.1.7
./configure
make
sudo make install
sudo ldconfig 

jzmq

應該是zmq的java包吧,安裝步驟如下:

git clone git://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
make install

zookeeper

針對大型分布式系統提供配置維護、名字服務、分布式同步、組服務等,可以保證:

  1. 順序性:客戶端的更新請求都會被順序處理
  2. 原子性:更新操作要不成功,要不失敗
  3. 一致性:客戶端不論連接到那個服務端,展現給它的都是同一個視圖
  4. 可靠性:更新會被持久化
  5. 實時性:對於每個客戶端他的系統視圖都是最新的

在zookeeper中有幾種角色:

  1. Leader:發起投票和決議,更新系統狀態
  2. Follower:響應客戶端請求,參與投票
  3. Observer:不參與投票,只同步Leader狀態
  4. Client:發起請求

在啟動之前需要在conf下編寫zoo.cfg配置文件,里面的內容包括:

  1. tickTime:心跳間隔
  2. initLimit:Follower和Leader之間建立連接的最大心跳數
  3. syncLimit:Follower和Leader之間通信時限
  4. dataDir:數據目錄
  5. dataLogDir:日志目錄
  6. minSessionTimeout:最小會話時間(默認tickTime * 2)
  7. maxSessionTimeout:最大會話時間(默認tickTime * 20)
  8. maxClientCnxns:客戶端數量
  9. clientPort:監聽客戶端連接的端口
  10. server.N=YYYY:A:B:其中N為服務器編號,YYYY是服務器的IP地址,A是Leader和Follower通信端口,B為選舉端口

在單機的時候可以直接將zoo_sample.cfg修改為zoo.cfg,然后使用啟動服務即可(如果報錯沒有目錄,手動創建即可):

sudo ./zkServer.sh start

現在用netstat -na(或者是./zkCli.sh 127.0.0.1:2181)就能看到在監聽指定的端口,那么zookeeper現在起來了。 

參考:

  1. http://blog.csdn.net/shenlan211314/article/details/6170717
  2. http://blog.csdn.net/hi_kevin/article/details/7089358
  3. 下載地址:http://apache.dataguru.cn/zookeeper/zookeeper-3.4.6

jstorm

該系統是阿里巴巴在對storm做了重寫和優化,在storm里面能運行的在jstorm里面也能運行,該系統擅長執行實時計算,而且基本上都在內存中搞定。進入正題,jstorm中有如下幾種角色:

  1. spout:源頭。
  2. bolt:處理器。
  3. topology:由處理器、源頭組成的拓撲網絡(每條邊就是一個訂閱關系)。
  4. tuple:數據。
  5. worker:執行進程。
  6. task:執行線程。
  7. nimbus:分發代碼、任務,監控集群運行狀態
  8. supervisor:監聽nimbus的指令,接收分發代碼和任務並執行

jstorm是用zookeeper來管理的,下面來看conf/storm.yaml中的常用配置:

  1. storm.zookeeper.servers:zookeeper集群地址。
  2. storm.zookeeper.root:zookeeper中storm的根目錄位置。
  3. storm.local.dir:用來存放配置文件、JAR等。
  4. storm.messaging.netty.transfer.async.batch:在使用Netty的時候,設置是否一個batch中會有多個消息。
  5. java.library.path:本地庫的加載地址,比如zeromq、jzmq等。
  6. supervisor.slots.ports:supervisor節點上的worker使用的端口號列表。
  7. supervisor.enable.cgroup:是否使用cgroups來做資源隔離。
  8. topology.buffer.size.limited:是否限制內存,如果不限制將使用LinkedBlockingDeque。
  9. topology.performance.metrics:是否開啟監控。
  10. topology.alimonitor.metrics.post:是否將監控數據發送給AliMonitor。
  11. topology.enable.classloader:默認禁用了用戶自定義的類加載器。
  12. worker.memory.size:worker的內存大小。

在把配置搞正確之后,就可以用bin中的腳本來啟動節點服務了:

sudo ./jstorm nimbus
sudo ./jstorm supervisor

參考:

  1. https://github.com/alibaba/jstorm/wiki/%E5%A6%82%E4%BD%95%E5%AE%89%E8%A3%85
  2. storm編程入門:http://ifeve.com/getting-started-with-storm-5/

jstorm的架構

結構和hadoop的很像,整體看來如下(Nimbus負責控制、提交任務,Supervisor負責執行任務):

為了做實時計算你需要建立topology,由計算節點組成的圖:

在JStorm上的topology的生命周期如下:

  1. 上傳代碼並做校驗(/nimbus/inbox);
  2. 建立本地目錄(/stormdist/topology-id/);
  3. 建立zookeeper上的心跳目錄;
  4. 計算topology的工作量(parallelism hint),分配task-id並寫入zookeeper;
  5. 把task分配給supervisor執行;
  6. 在supervisor中定時檢查是否有新的task,下載新代碼、刪除老代碼,剩下的工作交個小弟worker;
  7. 在worker中把task拿到,看里面有哪些spout/Bolt,然后計算需要給哪些task發消息並建立連接;
  8. 在nimbus將topology終止的時候會將zookeeper上的相關信息刪除;

在集群運行的時候要明白WorkerExecutorTask的概念,當然消息被傳遞的時候其實發起者、接收者都是Task,而真正執行的是Executor(可以理解為一個線程),由它來輪詢其中的Spout/Bolt:

在jstorm中通過ack機制來保證數據至少被處理一次,簡單來說下ack:

在消息發、收的過程中會形成一棵樹狀的結構,在一個消息收的時候發一個驗證消息,發的時候也發一個驗證消息,那么總體上每個消息出現兩次。那么ack機制就是將每個消息的隨機生成的ID進行異或,如果在某一時刻結果為0,那就說明處理成功。

如下圖所示: 

需要補充一下:雖然ack算是隨機算法,但是出錯的概率極低,但是系統應該具備在出錯之后矯正的能力(甚至檢查是否出錯)。ack機制保證了消息會被處理,但是不能保證只處理一次&順序處理,在需要的情形就有了事務的概念:

碼代碼

基本用法

所謂普通模式是指不去使用JStorm為開發人員提供的高級抽象,用其提供的原生的接口進行開發,主要涉及到的接口有:

  1. ISpout:數據源頭接口,jstorm會不斷調用nextTuple方法來獲取數據並發射出去。
    1. open:在worker中初始化該ISpout時調用,一般用來設置一些屬性:比如從spring容器中獲取對應的Bean。
    2. close:和open相對應(在要關閉的時候調用)。
    3. activate:從非活動狀態變為活動狀態時調用。
    4. deactivate:和activate相對應(從活動狀態變為非活動狀態時調用)。
    5. nextTuple:JStorm希望在每次調用該方法的時候,它會通過collector.emit發射一個tuple。
    6. ack:jstorm發現msgId對應的tuple被成功地完整消費會調用該方法。
    7. fail:和ack相對應(jstorm發現某個tuple在某個環節失敗了)。
  2. IBolt:數據處理接口,jstorm將消息發給他並讓其處理,完成之后可能整個處理流程就結束了,也可能傳遞給下一個節點繼續執行。
    1. prepare:對應ISpout的open方法。
    2. cleanup:對應ISpout的close方法(吐槽一下,搞成一樣的名字會死啊...)。
    3. execute:處理jstorm發送過來的tuple。
  3. TopologyBuilder:每個jstorm運行的任務都是一個拓撲接口,而builder的作用就是根據配置文件構建這個拓撲結構,更直白就是構建一個網。
    1. setSpout:添加源頭節點並設置並行度。
    2. setBolt:添加處理節點並設置並行度。

因為還存在多種其他類型的拓撲結構,那么在builder這個環節當然不能亂傳,在基本用法要去實現IRichSpoutIRichBolt接口,他們並沒有新增任何的方法,僅僅是用來區分類型。既然是拓撲結構那么應該是一個比較復雜的網絡,其實這個是在builder中完成的,其中setSpout/setBolt返回的結果其實是InputDeclarer對象,在其中定義了N個流分組的策略:

public T fieldsGrouping(String componentId, String streamId, Fields fields); // 字段分組
public T globalGrouping(String componentId, String streamId); // 全局分組
public T shuffleGrouping(String componentId, String streamId); // 隨機分組
public T localOrShuffleGrouping(String componentId, String streamId); // 本地或隨機分組
public T noneGrouping(String componentId, String streamId); // 無分組
public T allGrouping(String componentId, String streamId); // 廣播分組
public T directGrouping(String componentId, String streamId); // 直接分組
// 自定義分組  
public T customGrouping(String componentId, CustomStreamGrouping grouping);  
public T customGrouping(String componentId, String streamId, CustomStreamGrouping grouping);  
public T grouping(GlobalStreamId id, Grouping grouping);  

通過這些接口,我們可以一邊增加處理節點、一邊指定其消費哪些消息。

批量用法

基本的用法是每次處理一個tuple,但是這種效率比較低,很多情況下是可以批量獲取消息然后一起處理,批量用法對這種方式提供了支持。打開代碼可以很明顯地發現jstorm和storm的有着不小的區別:

// storm 中的定義
public interface IBatchSpout extends Serializable {
	void open(Map conf, TopologyContext context);
	void emitBatch(long batchId, TridentCollector collector);// 批次發射tuple
	void ack(long batchId); // 成功處理批次
	void close();
	Map getComponentConfiguration();
	Fields getOutputFields();
}

// jstorm中的定義
public interface IBatchSpout extends IBasicBolt, ICommitter, Serializable {
}

另外如果用批次的話就需要改用BatchTopologyBuilder來構建拓撲結構,在IBatchSpout中主要實現的接口如下:

  1. execute:雖然和IBolt中名字、參數一致,但是增加了一些默認邏輯
    1. 入參的input.getValue(0)表示批次(BatchId)。
    2. 發送消息時collector.emit(new Values(batchId, value)),發送的列表第一個字段表示批次(BatchId)。
  2. commit:批次成功時調用,常見的是修改offset。
  3. revert:批次失敗時調用,可以在這里根據offset取出批次數據進行重試。

Transactional Topology

事務拓撲並不是新的東西,只是在原始的ISpout、IBolt上做了一層封裝。在事務拓撲中以並行(processing)和順序(commiting)混合的方式來完成任務,使用Transactional Topology可以保證每個消息只會成功處理一次。不過需要注意的是,在Spout需要保證能夠根據BatchId進行多次重試,在這里有一個基本的例子,這里有一個不錯的講解。

Trident

這次一種更高級的抽象(甚至不需要知道底層是怎么map-reduce的),所面向的不再是spout和bolt,而是stream。主要涉及到下面幾種接口:

  1. 在本地完成的操作
    1. Function:自定義操作。
    2. Filters:自定義過濾。
    3. partitionAggregate:對同批次的數據進行local combiner操作。
    4. project:只保留stream中指定的field。
    5. stateQuery、partitionPersist:查詢和持久化。
  2. 決定Tuple如何分發到下一個處理環節
    1. shuffle:隨機。
    2. broadcast:廣播。
    3. partitionBy:以某一個特定的field進行hash,分到某一個分區,這樣該field位置相同的都會放到同一個分區。
    4. global:所有tuple發到指定的分區。
    5. batchGlobal:同一批的tuple被放到相同的分區(不同批次不同分區)。
    6. partition:用戶自定義的分區策略。
  3. 不同partition處理結果的匯聚操作
    1. aggregate:只針對同一批次的數據。
    2. persistentAggregate:針對所有批次進行匯聚,並將中間狀態持久化。
  4. 對stream中的tuple進行重新分組,后續的操作將會對每一個分組獨立進行(類似sql中的group by)
    1. groupBy
  5. 將多個Stream融合成一個
    1. merge:多個流進行簡單的合並。
    2. join:多個流按照某個KEY進行UNION操作(只能針對同一個批次的數據)。

這里有一個jstorm中使用Trident的簡單例子。

DRPC

 

 

 

 

 

 

問題排查

 

 

 

模型設計

在很多的實際問題中,我們面對的模型都是大同小異,下面先來看問題是什么:

問題描述

1、在流式計算中經常需要對一批的數據進行匯總計算,如果用SQL來描述就是:

SELECT MIN(status) FROM my_table GROUP BY order_id

在用JStorm來實現這一條簡單的SQL時,面對的是一條一條的數據庫變化的消息(這里需要保證有序消費),其實相當於在一堆的消息上面做了一個嵌套的SQL查詢,用一張圖表示如下:

2、

 

  

 

 

 

 

----- updating -----


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM