安裝部署
zeromq
簡單快速的傳輸層框架,安裝如下:
wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
tar zxf zeromq-2.1.7.tar.gz
cd zeromq-2.1.7
./configure
make
sudo make install
sudo ldconfig
jzmq
應該是zmq的java包吧,安裝步驟如下:
git clone git://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
make install
zookeeper
針對大型分布式系統提供配置維護、名字服務、分布式同步、組服務等,可以保證:
- 順序性:客戶端的更新請求都會被順序處理
- 原子性:更新操作要不成功,要不失敗
- 一致性:客戶端不論連接到那個服務端,展現給它的都是同一個視圖
- 可靠性:更新會被持久化
- 實時性:對於每個客戶端他的系統視圖都是最新的
在zookeeper中有幾種角色:
- Leader:發起投票和決議,更新系統狀態
- Follower:響應客戶端請求,參與投票
- Observer:不參與投票,只同步Leader狀態
- Client:發起請求
在啟動之前需要在conf下編寫zoo.cfg配置文件,里面的內容包括:
- tickTime:心跳間隔
- initLimit:Follower和Leader之間建立連接的最大心跳數
- syncLimit:Follower和Leader之間通信時限
- dataDir:數據目錄
- dataLogDir:日志目錄
- minSessionTimeout:最小會話時間(默認tickTime * 2)
- maxSessionTimeout:最大會話時間(默認tickTime * 20)
- maxClientCnxns:客戶端數量
- clientPort:監聽客戶端連接的端口
- server.N=YYYY:A:B:其中N為服務器編號,YYYY是服務器的IP地址,A是Leader和Follower通信端口,B為選舉端口
在單機的時候可以直接將zoo_sample.cfg修改為zoo.cfg,然后使用啟動服務即可(如果報錯沒有目錄,手動創建即可):
sudo ./zkServer.sh start
現在用netstat -na(或者是./zkCli.sh 127.0.0.1:2181)就能看到在監聽指定的端口,那么zookeeper現在起來了。
參考:
- http://blog.csdn.net/shenlan211314/article/details/6170717
- http://blog.csdn.net/hi_kevin/article/details/7089358
- 下載地址:http://apache.dataguru.cn/zookeeper/zookeeper-3.4.6
jstorm
該系統是阿里巴巴在對storm做了重寫和優化,在storm里面能運行的在jstorm里面也能運行,該系統擅長執行實時計算,而且基本上都在內存中搞定。進入正題,jstorm中有如下幾種角色:
- spout:源頭。
- bolt:處理器。
- topology:由處理器、源頭組成的拓撲網絡(每條邊就是一個訂閱關系)。
- tuple:數據。
- worker:執行進程。
- task:執行線程。
- nimbus:分發代碼、任務,監控集群運行狀態
- supervisor:監聽nimbus的指令,接收分發代碼和任務並執行
jstorm是用zookeeper來管理的,下面來看conf/storm.yaml中的常用配置:
- storm.zookeeper.servers:zookeeper集群地址。
- storm.zookeeper.root:zookeeper中storm的根目錄位置。
- storm.local.dir:用來存放配置文件、JAR等。
- storm.messaging.netty.transfer.async.batch:在使用Netty的時候,設置是否一個batch中會有多個消息。
- java.library.path:本地庫的加載地址,比如zeromq、jzmq等。
- supervisor.slots.ports:supervisor節點上的worker使用的端口號列表。
- supervisor.enable.cgroup:是否使用cgroups來做資源隔離。
- topology.buffer.size.limited:是否限制內存,如果不限制將使用LinkedBlockingDeque。
- topology.performance.metrics:是否開啟監控。
- topology.alimonitor.metrics.post:是否將監控數據發送給AliMonitor。
- topology.enable.classloader:默認禁用了用戶自定義的類加載器。
- worker.memory.size:worker的內存大小。
在把配置搞正確之后,就可以用bin中的腳本來啟動節點服務了:
sudo ./jstorm nimbus
sudo ./jstorm supervisor
參考:
- https://github.com/alibaba/jstorm/wiki/%E5%A6%82%E4%BD%95%E5%AE%89%E8%A3%85
- storm編程入門:http://ifeve.com/getting-started-with-storm-5/
jstorm的架構
結構和hadoop的很像,整體看來如下(Nimbus負責控制、提交任務,Supervisor負責執行任務):
為了做實時計算你需要建立topology,由計算節點組成的圖:
在JStorm上的topology的生命周期如下:
- 上傳代碼並做校驗(/nimbus/inbox);
- 建立本地目錄(/stormdist/topology-id/);
- 建立zookeeper上的心跳目錄;
- 計算topology的工作量(parallelism hint),分配task-id並寫入zookeeper;
- 把task分配給supervisor執行;
- 在supervisor中定時檢查是否有新的task,下載新代碼、刪除老代碼,剩下的工作交個小弟worker;
- 在worker中把task拿到,看里面有哪些spout/Bolt,然后計算需要給哪些task發消息並建立連接;
- 在nimbus將topology終止的時候會將zookeeper上的相關信息刪除;
在集群運行的時候要明白Worker、Executor、Task的概念,當然消息被傳遞的時候其實發起者、接收者都是Task,而真正執行的是Executor(可以理解為一個線程),由它來輪詢其中的Spout/Bolt:
在jstorm中通過ack機制來保證數據至少被處理一次,簡單來說下ack:
在消息發、收的過程中會形成一棵樹狀的結構,在一個消息收的時候發一個驗證消息,發的時候也發一個驗證消息,那么總體上每個消息出現兩次。那么ack機制就是將每個消息的隨機生成的ID進行異或,如果在某一時刻結果為0,那就說明處理成功。
如下圖所示:
需要補充一下:雖然ack算是隨機算法,但是出錯的概率極低,但是系統應該具備在出錯之后矯正的能力(甚至檢查是否出錯)。ack機制保證了消息會被處理,但是不能保證只處理一次&順序處理,在需要的情形就有了事務的概念:
碼代碼
基本用法
所謂普通模式是指不去使用JStorm為開發人員提供的高級抽象,用其提供的原生的接口進行開發,主要涉及到的接口有:
- ISpout:數據源頭接口,jstorm會不斷調用nextTuple方法來獲取數據並發射出去。
- open:在worker中初始化該ISpout時調用,一般用來設置一些屬性:比如從spring容器中獲取對應的Bean。
- close:和open相對應(在要關閉的時候調用)。
- activate:從非活動狀態變為活動狀態時調用。
- deactivate:和activate相對應(從活動狀態變為非活動狀態時調用)。
- nextTuple:JStorm希望在每次調用該方法的時候,它會通過collector.emit發射一個tuple。
- ack:jstorm發現msgId對應的tuple被成功地完整消費會調用該方法。
- fail:和ack相對應(jstorm發現某個tuple在某個環節失敗了)。
- IBolt:數據處理接口,jstorm將消息發給他並讓其處理,完成之后可能整個處理流程就結束了,也可能傳遞給下一個節點繼續執行。
- prepare:對應ISpout的open方法。
- cleanup:對應ISpout的close方法(吐槽一下,搞成一樣的名字會死啊...)。
- execute:處理jstorm發送過來的tuple。
- TopologyBuilder:每個jstorm運行的任務都是一個拓撲接口,而builder的作用就是根據配置文件構建這個拓撲結構,更直白就是構建一個網。
- setSpout:添加源頭節點並設置並行度。
- setBolt:添加處理節點並設置並行度。
因為還存在多種其他類型的拓撲結構,那么在builder這個環節當然不能亂傳,在基本用法要去實現IRichSpout、IRichBolt接口,他們並沒有新增任何的方法,僅僅是用來區分類型。既然是拓撲結構那么應該是一個比較復雜的網絡,其實這個是在builder中完成的,其中setSpout/setBolt返回的結果其實是InputDeclarer對象,在其中定義了N個流分組的策略:
public T fieldsGrouping(String componentId, String streamId, Fields fields); // 字段分組 public T globalGrouping(String componentId, String streamId); // 全局分組 public T shuffleGrouping(String componentId, String streamId); // 隨機分組 public T localOrShuffleGrouping(String componentId, String streamId); // 本地或隨機分組 public T noneGrouping(String componentId, String streamId); // 無分組 public T allGrouping(String componentId, String streamId); // 廣播分組 public T directGrouping(String componentId, String streamId); // 直接分組 // 自定義分組 public T customGrouping(String componentId, CustomStreamGrouping grouping); public T customGrouping(String componentId, String streamId, CustomStreamGrouping grouping); public T grouping(GlobalStreamId id, Grouping grouping);
通過這些接口,我們可以一邊增加處理節點、一邊指定其消費哪些消息。
批量用法
基本的用法是每次處理一個tuple,但是這種效率比較低,很多情況下是可以批量獲取消息然后一起處理,批量用法對這種方式提供了支持。打開代碼可以很明顯地發現jstorm和storm的有着不小的區別:
// storm 中的定義 public interface IBatchSpout extends Serializable { void open(Map conf, TopologyContext context); void emitBatch(long batchId, TridentCollector collector);// 批次發射tuple void ack(long batchId); // 成功處理批次 void close(); Map getComponentConfiguration(); Fields getOutputFields(); } // jstorm中的定義 public interface IBatchSpout extends IBasicBolt, ICommitter, Serializable { }
另外如果用批次的話就需要改用BatchTopologyBuilder來構建拓撲結構,在IBatchSpout中主要實現的接口如下:
- execute:雖然和IBolt中名字、參數一致,但是增加了一些默認邏輯
- 入參的input.getValue(0)表示批次(BatchId)。
- 發送消息時collector.emit(new Values(batchId, value)),發送的列表第一個字段表示批次(BatchId)。
- commit:批次成功時調用,常見的是修改offset。
- revert:批次失敗時調用,可以在這里根據offset取出批次數據進行重試。
Transactional Topology
事務拓撲並不是新的東西,只是在原始的ISpout、IBolt上做了一層封裝。在事務拓撲中以並行(processing)和順序(commiting)混合的方式來完成任務,使用Transactional Topology可以保證每個消息只會成功處理一次。不過需要注意的是,在Spout需要保證能夠根據BatchId進行多次重試,在這里有一個基本的例子,這里有一個不錯的講解。
Trident
這次一種更高級的抽象(甚至不需要知道底層是怎么map-reduce的),所面向的不再是spout和bolt,而是stream。主要涉及到下面幾種接口:
- 在本地完成的操作
- Function:自定義操作。
- Filters:自定義過濾。
- partitionAggregate:對同批次的數據進行local combiner操作。
- project:只保留stream中指定的field。
- stateQuery、partitionPersist:查詢和持久化。
- 決定Tuple如何分發到下一個處理環節
- shuffle:隨機。
- broadcast:廣播。
- partitionBy:以某一個特定的field進行hash,分到某一個分區,這樣該field位置相同的都會放到同一個分區。
- global:所有tuple發到指定的分區。
- batchGlobal:同一批的tuple被放到相同的分區(不同批次不同分區)。
- partition:用戶自定義的分區策略。
- 不同partition處理結果的匯聚操作
- aggregate:只針對同一批次的數據。
- persistentAggregate:針對所有批次進行匯聚,並將中間狀態持久化。
- 對stream中的tuple進行重新分組,后續的操作將會對每一個分組獨立進行(類似sql中的group by)
- groupBy
- 將多個Stream融合成一個
- merge:多個流進行簡單的合並。
- join:多個流按照某個KEY進行UNION操作(只能針對同一個批次的數據)。
在這里有一個jstorm中使用Trident的簡單例子。
DRPC
問題排查
模型設計
在很多的實際問題中,我們面對的模型都是大同小異,下面先來看問題是什么:
問題描述
1、在流式計算中經常需要對一批的數據進行匯總計算,如果用SQL來描述就是:
SELECT MIN(status) FROM my_table GROUP BY order_id
在用JStorm來實現這一條簡單的SQL時,面對的是一條一條的數據庫變化的消息(這里需要保證有序消費),其實相當於在一堆的消息上面做了一個嵌套的SQL查詢,用一張圖表示如下:
2、
----- updating -----