Storm常見模式——分布式RPC


本文翻譯自:https://github.com/nathanmarz/storm/wiki/Distributed-RPC,作為學習Storm DRPC的資料,轉載必須以超鏈接形式標明文章原始出處及本文翻譯鏈接

分布式RPC(distributed RPC,DRPC)用於對Storm上大量的函數調用進行並行計算過程。對於每一次函數調用,Storm集群上運行的拓撲接收調用函數的參數信息作為輸入流,並將計算結果作為輸出流發射出去。

DRPC本身算不上Storm的特性,它是通過Storm的基本元素:streams,spouts,bolts,topologies而衍生的一個模式。DRPC可以單獨作為一個獨立於Storm的庫發布,但由於其重要性還是和Storm捆綁在了一起。

總體概述

DRPC通過DRPC Server來實現,DRPC Server的整體工作過程如下:

  1. 接收到一個RPC調用請求;
  2. 發送請求到Storm上的拓撲;
  3. 從Storm上接收計算結果;
  4. 將計算結果返回給客戶端。

以上過程,在client客戶端看來,一個DRPC調用看起來和一般的RPC調用沒什么區別。下面代碼是client通過DRPC調用“reach”函數,參數為“http://twitter.com”:

DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach", "http://twitter.com");

DRPC內部工作流程如下:

  1. Client向DRPC Server發送被調用執行的DRPC函數名稱及參數。
  2. Storm上的topology通過DRPCSpout實現這一函數,從DPRC Server接收到函數調用流;
  3. DRPC Server會為每次函數調用生成唯一的id;
  4. Storm上運行的topology開始計算結果,最后通過一個ReturnResults的Bolt連接到DRPC Server,發送指定id的計算結果;
  5. DRPC Server通過使用之前為每個函數調用生成的id,將結果關聯到對應的發起調用的client,將計算結果返回給client。

LinearDRPCTopologyBuilder

Storm提供了一個topology builder——LinearDRPCTopologyBuilder,它可以自動完成幾乎所有的DRPC步驟。包括:

  1. 構建spout
  2. DRPC Server返回結果;
  3. Bolt提供函數用於對tuples進行聚集。

下面是一個簡單的例子,這個DRPC拓撲只是簡單的在輸入參數后追加!后返回:

public static class ExclaimBolt extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String input = tuple.getString(1);
        collector.emit(new Values(tuple.getValue(0), input + "!"));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "result"));
    }
}

public static void main(String[] args) throws Exception {
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3);
    // ...
}

由上述例子可見,我們只需很少的工作即可完成拓撲。當創建LinearDRPCTopologyBuilder的時候,需要指定拓撲中DRPC函數的名稱exclamation。一個DRPC Server可以協調多個函數,每個函數有不同的函數名稱。拓撲中的第一個bolt的輸入是個字段:第一個是請求的id號;第二個是請求的參數。

LinearDRPCTopologyBuilder同時需要最后一個bolt發射一個包含兩個字段的輸出流:第一個字段是請求id;第二個字段是計算結果。因此,所有的中間tuples必須包含請求id作為第一個字段。

例子中,ExclaimBolt在輸入tuple的第二個字段后面追加“!”LinearDRPCTopologyBuilder負責處理其余的協調工作:與DRPC Server建立連接,發送結果給DRPC Server

本地模式DRPC

DRPC可以以本地模式運行,下面的代碼是如何在本地模式運行上面的例子:

LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();

cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));

System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));

cluster.shutdown();
drpc.shutdown();

首先創建一個LocalDRPC對象,該對象在本地模擬一個DRPC Server,正如LocalCluster在本地模擬一個Storm集群一樣。然后創建一個LocalCluster對象在本地模式下運行拓撲。LinearDRPCTopologyBuilder含有單獨的方法用於創建本地拓撲和遠程拓撲。

本地模式下,LocalDRPC並不綁定任何端口,因此Storm的拓撲需要了解要通訊的對象——這就是為什么createLocalTopology方法需要以LocalDRPC對象作為輸入。

加載完拓撲之后,通過對LocalDRPC調用execute方法,就可以執行DRPC函數調用了。

遠程模式DRPC

在實際的Storm集群上運行DRPC也一樣很簡單。只需完成以下步驟:

  1. 啟動DRPC Server(s);
  2. 配置DRPC Server(s)地址;
  3. 向Storm集群提交DRPC拓撲。

首先,通過storm腳本啟動DRPC Server:

bin/storm drpc

然后,在Storm集群中配置DRPC Server地址,這就是DRPCSpout讀取函數調用請求的地方。這一步的配置可以通過storm.yaml文件或者拓撲的配置來完成。通過storm.yaml文件的配置方式如下:

drpc.servers:
  - "drpc1.foo.com"
  - "drpc2.foo.com"

最后,通過StormSubmitter啟動DRPC拓撲。為了以遠程模式運行上面的例子,代碼如下:

StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());

createRemoteTopology被用於為Storm集群創建合適的拓撲。

一個復雜的例子

上面的exclamation只是一個簡單的DRPC例子。下面通過一個復雜的例子介紹如何在Storm集群內進行DRPC——計算Twitter上每個URL的到達度(reach),也就是每個URL暴露給的不同人的個數。

為了完成這一計算,需要完成以下步驟:

  1. 獲取所有點選了(tweet)該URL的人;
  2. 獲取步驟1中所有人的關注者(followers,粉絲);
  3. 對所有關注者followers進行去重;
  4. 對步驟3中的關注者人數進行求和。

一個簡單的URL到達度計算可能涉及成千上萬次數據庫調用以及數以百萬的followers記錄,計算量非常大。有了Storm,將很容易實現這一計算過程。單機上可能需要運行幾分鍾才能完成,在Storm集群上,即使是最難計算的URL也只需要幾秒鍾。

這個例子的代碼在storm-starter:點擊這里。這里是如何創建拓撲的代碼:

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
builder.addBolt(new GetTweeters(), 3);
builder.addBolt(new GetFollowers(), 12)
        .shuffleGrouping();
builder.addBolt(new PartialUniquer(), 6)
        .fieldsGrouping(new Fields("id", "follower"));
builder.addBolt(new CountAggregator(), 2)
        .fieldsGrouping(new Fields("id"));
拓撲的執行分為以下四步:
  1. GetTweeters:獲取所有tweet了指定URL的用戶列表,這個Bolt將輸入流[id, url]轉換成輸出流[id, tweeter],每個url元組被映射為多個tweeter元組。
  2. GetFollowers:獲取步驟1中所有用戶列表的followers,這個Bolt將輸入流[id, twetter]轉換成輸出流[id, follower],當某個人同時是多個人的關注者follower,而且這些人都tweet了指定的URL,那么將產生重復的follower元組。
  3. PartialUniquer:將所有followers按照follower id分組,使得同一個follower在同一個task中被處理。這個Bolt接收follower並進行去重計數。
  4. CountAggregator:從各個PartialUniquer中接收各部分的計數結果,累加后完成到達度計算。

下面是PartialUniquer這個Bolt的代碼實現:

public class PartialUniquer extends BaseBatchBolt {
    BatchOutputCollector _collector;
    Object _id;
    Set<String> _followers = new HashSet<String>();
    
    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
        _collector = collector;
        _id = id;
    }

    @Override
    public void execute(Tuple tuple) {
        _followers.add(tuple.getString(1));
    }
    
    @Override
    public void finishBatch() {
        _collector.emit(new Values(_id, _followers.size()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "partial-count"));
    }
}

PartialUniquer通過繼承BaseBatchBolt實現了IBatchBolt接口,batch bolt提供了API用於將一批tuples作為整體來處理。每個請求id會創建一個新的batch bolt實例,同時Storm負責這些實例的清理工作。

PartialUniquer接收到一個follower元組時執行execute方法,將follower添加到請求id對應的HashSet集合中。

Batch bolt同時提供了finishBatch方法用於當這個task已經處理完所有的元組時調用。PartialUniquer發射一個包含當前task所處理的follower ids子集去重后個數的元組。

在內部實現上,CoordinatedBolt用於檢測指定的bolt是否已經收到指定請求id的所有tuples元組。CoordinatedBolt使用direct streams管理實現這一協作過程。

拓撲的其他部分易於理解。到達度的每一步的計算過程都是並行進行的,通過DRPC實現也是非常容易的。

Non-linear DRPC拓撲

LinearDRPCTopologyBuilder只能處理線性的”DRPC拓撲——正如到達度這樣可以通過一系列步驟序列來完成的計算。不難想象,DRPC調用中包含有更復雜的帶有分支和合並Bolt的拓撲。目前,必須自己直接使用CoordinatedBolt來完成這種非線性拓撲的計算。

LinearDRPCTopologyBuilder工作過程

  • DRPCSpout發射[args, return-info],其中return-info包含DRPC Server的主機和端口號,以及DRPC Server為該次請求生成的唯一id號;
  • 構造一個Storm拓撲包含以下部分:
    • DRPCSpout
    • PrepareRequest(生成一個請求id,為return info創建一個流,為args創建一個流)
    • CoordinatedBolt wrappers以及direct groupings
    • JoinResult(將結果與return info拼接起來)
    • ReturnResult(連接到DRPC Server,返回結果)
  • LinearDRPCTopologyBuilder是建立在Storm基本元素之上的高層抽象。

高級進階

  • KeyedFairBolt用於組織同一時刻多請求的處理過程;
  • 如何直接使用CoordinatedBolt

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM