Flume's Avro Sink and Avro Source, Part 1: Avro Source


Question: What RPC service does Avro Source provide, and how does it provide it?

Question 1.1: How does the Flume Avro Source start a Netty server to provide its RPC service?

The avro-rpc-quickstart project on GitHub shows that you can start a NettyServer serving a specific RPC protocol as follows. Does the Flume Avro Source provide its RPC service the same way?

  server = new NettyServer(new SpecificResponder(Mail.class, new MailImpl()), new InetSocketAddress(65111));

 

 The code in AvroSource that creates the NettyServer is:

  

    Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);

    NioServerSocketChannelFactory socketChannelFactory = initSocketChannelFactory();

    ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory();

    server = new NettyServer(responder, new InetSocketAddress(bindAddress, port),
          socketChannelFactory, pipelineFactory, null);

  So AvroSource also builds its server directly from the NettyServer class that Avro provides. It uses a different constructor, however, one that also takes a ChannelFactory and a ChannelPipelineFactory.

   So what kind of ChannelFactory does AvroSource use?

  initSocketChannelFactory() is implemented as follows:

  private NioServerSocketChannelFactory initSocketChannelFactory() {
    NioServerSocketChannelFactory socketChannelFactory;
    if (maxThreads <= 0) {
      socketChannelFactory = new NioServerSocketChannelFactory(
          Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
    } else {
      socketChannelFactory = new NioServerSocketChannelFactory(
          Executors.newCachedThreadPool(),
          Executors.newFixedThreadPool(maxThreads));
    }
    return socketChannelFactory;
  }

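  The custom ChannelFactory exists to apply AvroSource's "threads" parameter. That parameter caps the number of worker threads, and therefore the number of threads that can handle RPC requests at the same time. NioServerSocketChannelFactory takes two executors: the first supplies the boss threads and the second supplies the worker threads. Only the worker pool is bounded, and only when threads is greater than 0. Otherwise both pools are unbounded cached thread pools.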
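The branch above comes down to choosing a worker executor from a single integer. The JDK-only sketch below isolates that choice. The class and method names (WorkerPoolChoice, workerExecutor, maxPoolSize) are hypothetical. Reading the pool size back assumes the JDK factory methods return a ThreadPoolExecutor. That holds for OpenJDK, but it is not part of the Executors contract.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper isolating the worker-pool choice made in initSocketChannelFactory().
public class WorkerPoolChoice {

    // maxThreads <= 0 means "no cap": an unbounded cached pool.
    // Otherwise a fixed pool limits the worker threads to maxThreads.
    static ExecutorService workerExecutor(int maxThreads) {
        if (maxThreads <= 0) {
            return Executors.newCachedThreadPool();
        }
        return Executors.newFixedThreadPool(maxThreads);
    }

    // Assumption: the executor is a ThreadPoolExecutor (true for OpenJDK's Executors).
    static int maxPoolSize(ExecutorService executor) {
        return ((ThreadPoolExecutor) executor).getMaximumPoolSize();
    }

    public static void main(String[] args) {
        ExecutorService unbounded = workerExecutor(0);
        ExecutorService bounded = workerExecutor(4);
        System.out.println("threads=0 -> max " + maxPoolSize(unbounded));
        System.out.println("threads=4 -> max " + maxPoolSize(bounded));
        unbounded.shutdown();
        bounded.shutdown();
    }
}
```

Neither pool starts a thread until work arrives, so an unbounded cached pool is cheap when idle. Under heavy load, however, it can grow without limit, which is what the threads parameter guards against.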

  See the Javadoc of NioServerSocketChannelFactory:

  

A ServerSocketChannelFactory which creates a server-side NIO-based ServerSocketChannel. It utilizes the non-blocking I/O mode which was introduced with NIO to serve many number of concurrent connections efficiently.

How threads work

There are two types of threads in a NioServerSocketChannelFactory; one is boss thread and the other is worker thread.

Boss threads

Each bound ServerSocketChannel has its own boss thread. For example, if you opened two server ports such as 80 and 443, you will have two boss threads. A boss thread accepts incoming connections until the port is unbound. Once a connection is accepted successfully, the boss thread passes the accepted Channel to one of the worker threads that the NioServerSocketChannelFactory manages.

Worker threads

One NioServerSocketChannelFactory can have one or more worker threads. A worker thread performs non-blocking read and write for one or more Channels in a non-blocking mode.
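The boss/worker split described in the Javadoc can be pictured with a small model. This is a sketch under stated assumptions. No real sockets are involved, and the class and method names are hypothetical. The round-robin hand-off is also an assumption: to my knowledge Netty 3 assigns accepted channels to workers in a similar rotating way, but the Javadoc quoted above does not say so.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, hypothetical model of the boss/worker split: no real I/O.
// "Channels" are just names, and each worker records which channels it owns.
public class BossWorkerModel {

    static final class Worker {
        final int id;
        final List<String> channels = new ArrayList<>();
        Worker(int id) { this.id = id; }
    }

    private final Worker[] workers;
    private final AtomicInteger next = new AtomicInteger();

    BossWorkerModel(int workerCount) {
        workers = new Worker[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = new Worker(i);
        }
    }

    // What the boss does after accepting a connection: pick the next
    // worker in round-robin order (an assumption) and hand the channel over.
    Worker accept(String channel) {
        Worker w = workers[Math.floorMod(next.getAndIncrement(), workers.length)];
        w.channels.add(channel);
        return w;
    }

    Worker worker(int i) { return workers[i]; }

    public static void main(String[] args) {
        BossWorkerModel server = new BossWorkerModel(2);
        for (int c = 1; c <= 5; c++) {
            System.out.println("conn-" + c + " -> worker " + server.accept("conn-" + c).id);
        }
    }
}
```

A single worker serves many channels, which is why a small fixed worker pool can still handle many concurrent connections.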

  What is a ChannelPipelineFactory for, and why does AvroSource also need a custom one?

  The Javadoc of ChannelPipeline says:

  A list of ChannelHandlers which handles or intercepts ChannelEvents of a Channel. ChannelPipeline implements an advanced form of the Intercepting Filter pattern to give a user full control over how an event is handled and how the ChannelHandlers in the pipeline interact with each other.
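The Intercepting Filter idea in this Javadoc can be modeled in a few lines. The sketch is a toy, not Netty's API. Handler, Pipeline, and the String "event" are hypothetical stand-ins, and only one direction is modeled, whereas Netty distinguishes upstream and downstream events. Each handler may transform an event or swallow it, and an empty pipeline passes events through unchanged.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a handler pipeline (Intercepting Filter); not Netty types.
public class PipelineModel {

    interface Handler {
        // Returns the (possibly transformed) event, or null to stop propagation.
        String handle(String event);
    }

    static final class Pipeline {
        private final List<Handler> handlers = new ArrayList<>();

        Pipeline addLast(Handler h) {
            handlers.add(h);
            return this;
        }

        // Passes the event through the handlers in order; any handler may
        // transform it or intercept it by returning null.
        String fire(String event) {
            String current = event;
            for (Handler h : handlers) {
                current = h.handle(current);
                if (current == null) {
                    return null;
                }
            }
            return current;
        }
    }

    public static void main(String[] args) {
        Pipeline empty = new Pipeline();
        Pipeline p = new Pipeline()
            .addLast(e -> e.isEmpty() ? null : e) // intercept: drop empty events
            .addLast(String::toUpperCase);        // transform the rest
        System.out.println(empty.fire("hello")); // hello (unchanged)
        System.out.println(p.fire("hello"));     // HELLO
        System.out.println(p.fire(""));          // null (intercepted)
    }
}
```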

 

  So a pipeline is essentially a more advanced way to chain interceptors. Let's look at which ChannelPipelineFactory AvroSource uses.

  

  private ChannelPipelineFactory initChannelPipelineFactory() {
    ChannelPipelineFactory pipelineFactory;
    boolean enableCompression = compressionType.equalsIgnoreCase("deflate");
    if (enableCompression || enableSsl) {
      pipelineFactory = new SSLCompressionChannelPipelineFactory(
          enableCompression, enableSsl, keystore,
          keystorePassword, keystoreType);
    } else {
      pipelineFactory = new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() throws Exception {
          return Channels.pipeline();
        }
      };
    }
    return pipelineFactory;
  }

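  If compression is enabled (compressionType is "deflate", case-insensitive) or SSL is enabled, AvroSource uses SSLCompressionChannelPipelineFactory, a private static nested class of AvroSource. Otherwise it returns a new pipeline from Channels.pipeline(). That pipeline is empty, so it contributes no handlers of its own: no compression and no SSL. As far as I can tell from Avro's source, NettyServer then appends its own frame decoder/encoder and RPC handler to whatever pipeline the factory returns, so the server still works with an empty pipeline.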
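To make the "deflate" option concrete, here is what deflate compression does at the byte level. The sketch uses the JDK's java.util.zip Deflater and Inflater, not the Netty handlers that SSLCompressionChannelPipelineFactory presumably installs. The class name and the compression level 6 are arbitrary choices for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Deflate round trip with java.util.zip (zlib format); illustrative only.
public class DeflateRoundTrip {

    static byte[] deflate(byte[] input) {
        Deflater deflater = new Deflater(6); // level 6: an arbitrary choice here
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] inflate(byte[] input) {
        Inflater inflater = new Inflater();
        inflater.setInput(input);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buf);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalArgumentException("truncated or unsupported deflate data");
                }
                out.write(buf, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("corrupt deflate data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] body = "event body ".repeat(50).getBytes(StandardCharsets.UTF_8);
        byte[] compressed = deflate(body);
        System.out.println(body.length + " bytes -> " + compressed.length + " bytes");
        System.out.println("round trip ok: " + Arrays.equals(body, inflate(compressed)));
    }
}
```

Because the receiver must inflate exactly what the sender deflated, the compression setting on the Avro Sink has to match the one on the Avro Source it talks to.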

  

Question 1.2: The server is now running, but what RPC service does it actually provide?

  The key is this line:

  

Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);

  According to the Avro API, the two arguments of SpecificResponder are the protocol and an implementation of that protocol. So AvroSource itself should implement AvroSourceProtocol, and indeed its declaration is:

  

public class AvroSource extends AbstractSource implements EventDrivenSource, Configurable, AvroSourceProtocol
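Passing `this` as the implementation means incoming calls are dispatched to AvroSource's own methods. The reflection-based sketch below shows that idea in miniature. MiniResponder, EchoProtocol, and EchoSource are hypothetical, and the sketch skips Avro's schema handling and serialization entirely. It only looks up a protocol method by message name and invokes it on the implementation object.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical miniature of a "specific responder": dispatch a call by
// message name to an object that implements the protocol interface.
public class MiniResponder {

    private final Class<?> protocol;
    private final Object impl;

    MiniResponder(Class<?> protocol, Object impl) {
        if (!protocol.isInstance(impl)) {
            throw new IllegalArgumentException("impl does not implement " + protocol.getName());
        }
        this.protocol = protocol;
        this.impl = impl;
    }

    // Finds the protocol method with this name and arity, then invokes it on impl.
    Object respond(String messageName, Object... args) {
        for (Method m : protocol.getMethods()) {
            if (m.getName().equals(messageName) && m.getParameterCount() == args.length) {
                try {
                    return m.invoke(impl, args);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new IllegalStateException("call to " + messageName + " failed", e);
                }
            }
        }
        throw new IllegalArgumentException("unknown message: " + messageName);
    }

    // Hypothetical one-method protocol standing in for AvroSourceProtocol.
    interface EchoProtocol {
        String append(String event);
    }

    // The "source" implements the protocol itself, as AvroSource does.
    static final class EchoSource implements EchoProtocol {
        @Override
        public String append(String event) {
            return "OK:" + event;
        }
    }

    public static void main(String[] args) {
        MiniResponder responder = new MiniResponder(EchoProtocol.class, new EchoSource());
        System.out.println(responder.respond("append", "hello")); // OK:hello
    }
}
```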

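  So how is AvroSourceProtocol defined? It is defined in flume.avdl, in the src/main/avro directory of the flume-ng-sdk module. An .avdl file defines a protocol in Avro IDL. The file sits in that directory because src/main/avro is where avro-maven-plugin looks for schema and IDL files by convention.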

  The .avdl file reads:

  

@namespace("org.apache.flume.source.avro")

protocol AvroSourceProtocol {

enum Status {
  OK, FAILED, UNKNOWN
}

record AvroFlumeEvent {
  map<string> headers;
  bytes body;
}

Status append( AvroFlumeEvent event );

Status appendBatch( array<AvroFlumeEvent> events );

}

  

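  It defines an enum that serves as the return type of append and appendBatch. The enum reports how the Source handled the incoming data, with three values: OK, FAILED, and UNKNOWN.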

  It defines a record type, AvroFlumeEvent, that matches Flume's definition of an Event. The headers are a set of key-value pairs (a map), and the body is a byte array.

  It defines two methods: append, which sends a single AvroFlumeEvent, and appendBatch, which sends a batch of them.

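  From this .avdl, Avro generates three Java files: the enum Status, the class AvroFlumeEvent, and the interface AvroSourceProtocol. AvroSource implements the AvroSourceProtocol interface and thereby exposes two remote methods, append and appendBatch.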
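As a rough picture of what those three generated types look like from the Java side, here is a hand-written approximation. It is not the generated code. Real Avro output also carries the schema, builders, and a SpecificRecordBase superclass, and depending on the Avro version the interface methods may declare checked exceptions. All of that is omitted here, and AcceptAll is a hypothetical implementation added only to exercise the types.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified approximation of the types generated from flume.avdl.
public class GeneratedShapes {

    enum Status { OK, FAILED, UNKNOWN }

    // record AvroFlumeEvent { map<string> headers; bytes body; }
    static final class AvroFlumeEvent {
        private final Map<CharSequence, CharSequence> headers;
        private final ByteBuffer body;

        AvroFlumeEvent(Map<CharSequence, CharSequence> headers, ByteBuffer body) {
            this.headers = headers;
            this.body = body;
        }

        Map<CharSequence, CharSequence> getHeaders() { return headers; }
        ByteBuffer getBody() { return body; }
    }

    // Avro arrays map to java.util.List.
    interface AvroSourceProtocol {
        Status append(AvroFlumeEvent event);
        Status appendBatch(List<AvroFlumeEvent> events);
    }

    // Hypothetical implementation: accepts every event that has a body.
    static final class AcceptAll implements AvroSourceProtocol {
        @Override
        public Status append(AvroFlumeEvent event) {
            return event.getBody() == null ? Status.FAILED : Status.OK;
        }

        @Override
        public Status appendBatch(List<AvroFlumeEvent> events) {
            for (AvroFlumeEvent e : events) {
                if (append(e) != Status.OK) {
                    return Status.FAILED;
                }
            }
            return Status.OK;
        }
    }

    public static void main(String[] args) {
        Map<CharSequence, CharSequence> headers = new HashMap<>();
        headers.put("host", "web-01");
        AvroFlumeEvent ev = new AvroFlumeEvent(headers,
            ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
        AvroSourceProtocol source = new AcceptAll();
        System.out.println(source.append(ev));                   // OK
        System.out.println(source.appendBatch(List.of(ev, ev))); // OK
    }
}
```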

  append is implemented as follows:

  

  @Override
  public Status append(AvroFlumeEvent avroEvent) {
    logger.debug("Avro source {}: Received avro event: {}", getName(),
        avroEvent);
    sourceCounter.incrementAppendReceivedCount();
    sourceCounter.incrementEventReceivedCount();

    Event event = EventBuilder.withBody(avroEvent.getBody().array(),
        toStringMap(avroEvent.getHeaders()));

    try {
      getChannelProcessor().processEvent(event);
    } catch (ChannelException ex) {
      logger.warn("Avro source " + getName() + ": Unable to process event. " +
          "Exception follows.", ex);
      return Status.FAILED;
    }

    sourceCounter.incrementAppendAcceptedCount();
    sourceCounter.incrementEventAcceptedCount();

    return Status.OK;
  }

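  This method builds a Flume Event from the received AvroFlumeEvent. The conversion only adapts mismatched types. avroEvent.getBody() returns a ByteBuffer, which becomes a byte[] via array(). avroEvent.getHeaders() returns a Map<CharSequence, CharSequence>, which toStringMap() turns into the Map<String, String> that EventBuilder.withBody() expects.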

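Once the Event is built, append passes it to this Source's ChannelProcessor. If the ChannelProcessor throws a ChannelException, append logs a warning and returns Status.FAILED. Otherwise it increments the accepted counters and returns Status.OK.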
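The two type conversions can be tried in isolation with plain JDK types. EventConversion and remainingBytes are hypothetical names, and toStringMap here is my own version, not Flume's code. One detail is worth knowing: ByteBuffer.array() returns the buffer's entire backing array. That equals the body only when the buffer wraps exactly the body bytes. The sketch copies just the readable range to show the difference. Avro typically delivers header strings as its own CharSequence type (org.apache.avro.util.Utf8), which is why toString() is needed. The sketch uses StringBuilder as a stand-in.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helpers mirroring the conversions done in append().
public class EventConversion {

    // CharSequence keys/values (e.g. Avro's Utf8) -> plain Strings for Flume's Event.
    static Map<String, String> toStringMap(Map<CharSequence, CharSequence> in) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<CharSequence, CharSequence> e : in.entrySet()) {
            out.put(e.getKey().toString(), e.getValue().toString());
        }
        return out;
    }

    // Copies only the readable bytes [position, limit) of the buffer.
    // duplicate() keeps the caller's buffer position untouched.
    static byte[] remainingBytes(ByteBuffer buf) {
        byte[] bytes = new byte[buf.remaining()];
        buf.duplicate().get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        Map<CharSequence, CharSequence> headers = new HashMap<>();
        headers.put(new StringBuilder("host"), new StringBuilder("web-01"));
        System.out.println(toStringMap(headers)); // {host=web-01}

        // A buffer that views only part of its backing array.
        ByteBuffer view = ByteBuffer.wrap("xxhelloxx".getBytes(StandardCharsets.UTF_8), 2, 5);
        System.out.println(new String(view.array(), StandardCharsets.UTF_8));         // xxhelloxx
        System.out.println(new String(remainingBytes(view), StandardCharsets.UTF_8)); // hello
    }
}
```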

  appendBatch is implemented much like append.
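Since the code for appendBatch is not shown here, the following is only a model of what "much like append" plausibly means. It assumes that appendBatch first converts every AvroFlumeEvent the same way append does. It then hands the whole list to the ChannelProcessor in one call (ChannelProcessor does have a processEventBatch(List<Event>) method) and returns FAILED if that call throws. All types below are hypothetical stand-ins, not Flume's classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of appendBatch(); none of these types are Flume's own.
public class AppendBatchModel {

    enum Status { OK, FAILED }

    // Stand-in for AvroFlumeEvent (what arrives over RPC).
    static final class InEvent {
        final Map<CharSequence, CharSequence> headers;
        final byte[] body;
        InEvent(Map<CharSequence, CharSequence> headers, byte[] body) {
            this.headers = headers;
            this.body = body;
        }
    }

    // Stand-in for Flume's Event (what goes to the channel).
    static final class Event {
        final Map<String, String> headers;
        final byte[] body;
        Event(Map<String, String> headers, byte[] body) {
            this.headers = headers;
            this.body = body;
        }
    }

    // Stand-in for the batch entry point of the Source's ChannelProcessor.
    interface BatchProcessor {
        void processEventBatch(List<Event> batch);
    }

    static Map<String, String> toStringMap(Map<CharSequence, CharSequence> in) {
        Map<String, String> out = new HashMap<>();
        in.forEach((k, v) -> out.put(k.toString(), v.toString()));
        return out;
    }

    // Convert every event first, then hand the whole list over in one call,
    // so the batch is accepted or rejected as a unit.
    static Status appendBatch(List<InEvent> events, BatchProcessor processor) {
        List<Event> batch = new ArrayList<>(events.size());
        for (InEvent e : events) {
            batch.add(new Event(toStringMap(e.headers), e.body));
        }
        try {
            processor.processEventBatch(batch);
        } catch (RuntimeException ex) {
            return Status.FAILED;
        }
        return Status.OK;
    }

    public static void main(String[] args) {
        List<InEvent> events = List.of(
            new InEvent(Map.of("n", "1"), "a".getBytes(StandardCharsets.UTF_8)),
            new InEvent(Map.of("n", "2"), "b".getBytes(StandardCharsets.UTF_8)));
        List<Event> received = new ArrayList<>();
        System.out.println(appendBatch(events, received::addAll)); // OK
        System.out.println(received.size());                        // 2
        System.out.println(appendBatch(events, batch -> {
            throw new IllegalStateException("channel full");
        }));                                                        // FAILED
    }
}
```

Handing over the list in one call, rather than processing events one at a time, means a failure reports FAILED for the whole batch. The sender can then retry the batch as a unit.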

 

 

