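Ah, AvroSink is a lot more complicated :(
OK, first let me pin down the main questions:
- Why does AvroSink need so much code? Is it all necessary? What logic does it actually have to implement?
Look, this is how avro-rpc-quickstart builds a client and then makes an RPC: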
NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(65111));
// client code - attach to the server and send a message
Mail proxy = (Mail) SpecificRequestor.getClient(Mail.class, client);
proxy.send(message);
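So why isn't AvroSink this simple? Does it start multiple threads and issue RPCs concurrently? Does it use a connection pool? Did it implement one of its own?
AvroSink extends AbstractRpcSink. AbstractRpcSink implements the process method of the Sink interface by handing the events to the RpcClient object it holds, which does the actual work, i.e. client.appendBatch(batch);. AvroSink in turn implements the abstract method "protected abstract RpcClient initializeRpcClient(Properties props)" declared in AbstractRpcSink to supply a usable RpcClient. Its implementation is: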
protected RpcClient initializeRpcClient(Properties props) {
  logger.info("Attempting to create Avro Rpc client.");
  return RpcClientFactory.getInstance(props);
}
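To make the division of labor concrete, here is a minimal sketch of the same template-method arrangement. Every type in it (SketchRpcClient, SketchAbstractRpcSink, SketchAvroSink) is a hypothetical stand-in I made up for illustration, events are plain strings, and nothing here is Flume's real code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical stand-in for Flume's RpcClient; events are just strings here.
interface SketchRpcClient {
    void appendBatch(List<String> batch);
}

// Hypothetical stand-in for AbstractRpcSink: it owns the client and the
// delivery logic, but leaves client creation to the subclass.
abstract class SketchAbstractRpcSink {
    private SketchRpcClient client;

    protected abstract SketchRpcClient initializeRpcClient(Properties props);

    void start(Properties props) {
        client = initializeRpcClient(props);
    }

    void process(List<String> batch) {
        client.appendBatch(batch);
    }
}

// Hypothetical stand-in for AvroSink: its only job is to supply a client.
class SketchAvroSink extends SketchAbstractRpcSink {
    final List<String> delivered = new ArrayList<>();

    @Override
    protected SketchRpcClient initializeRpcClient(Properties props) {
        // The real AvroSink returns RpcClientFactory.getInstance(props) here;
        // this stand-in client simply records the events locally.
        return delivered::addAll;
    }
}
```

Seen this way, the sink subclass itself is tiny; the complexity I was worried about lives in the client it returns.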
When the "client.type" property is empty, the getInstance method of RpcClientFactory returns the default RpcClient, which is NettyAvroRpcClient.
NettyAvroRpcClient
Its "private void connect(long timeout, TimeUnit tu) throws FlumeException" method initializes the proxy needed to make RPCs, which is the avroClient field of this class.
transceiver = new NettyTransceiver(this.address, socketChannelFactory, tu.toMillis(timeout));
avroClient = SpecificRequestor.getClient(AvroSourceProtocol.Callback.class, transceiver);
avroClient is a proxy for AvroSourceProtocol.Callback.class. This nested interface declares methods similar to those of AvroSourceProtocol, except that each one takes an extra parameter used for the callback.
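The callback parameter matters later, because the object passed in can be both a callback and a future. Here is a minimal sketch of that idea. SketchCallback only mirrors the shape of org.apache.avro.ipc.Callback (handleResult / handleError), and SketchCallFuture and CallbackStyleDemo are hypothetical names of mine built on CompletableFuture; this is not Avro's own implementation:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in mirroring the shape of org.apache.avro.ipc.Callback.
interface SketchCallback<T> {
    void handleResult(T result);
    void handleError(Throwable error);
}

// A callback that is also a future: the RPC layer completes it,
// and the caller waits on it.
class SketchCallFuture<T> extends CompletableFuture<T> implements SketchCallback<T> {
    @Override
    public void handleResult(T result) {
        complete(result);
    }

    @Override
    public void handleError(Throwable error) {
        completeExceptionally(error);
    }
}

class CallbackStyleDemo {
    // Hypothetical asynchronous "append": returns at once and reports the
    // status later, from another thread, through the callback.
    static void append(String event, SketchCallback<String> callback) {
        new Thread(() -> callback.handleResult("OK")).start();
    }

    // The caller passes the future in as the callback and then waits on it.
    static String appendAndWait(String event, long timeoutMs) {
        SketchCallFuture<String> callFuture = new SketchCallFuture<>();
        append(event, callFuture);
        try {
            return callFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new RuntimeException("append did not complete", e);
        }
    }
}
```

With this shape, the call itself is supposed to return right away and all the waiting happens in get.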
@org.apache.avro.specific.AvroGenerated
public interface AvroSourceProtocol {
public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"AvroSourceProtocol\",\"namespace\":\"org.apache.flume.source.avro\",\"doc\":\"* Licensed to the Apache Software Foundation (ASF) under one\\n * or more contributor license agreements. See the NOTICE file\\n * distributed with this work for additional information\\n * regarding copyright ownership. The ASF licenses this file\\n * to you under the Apache License, Version 2.0 (the\\n * \\\"License\\\"); you may not use this file except in compliance\\n * with the License. You may obtain a copy of the License at\\n *\\n * http://www.apache.org/licenses/LICENSE-2.0\\n *\\n * Unless required by applicable law or agreed to in writing,\\n * software distributed under the License is distributed on an\\n * \\\"AS IS\\\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\\n * KIND, either express or implied. See the License for the\\n * specific language governing permissions and limitations\\n * under the License.\",\"types\":[{\"type\":\"enum\",\"name\":\"Status\",\"symbols\":[\"OK\",\"FAILED\",\"UNKNOWN\"]},{\"type\":\"record\",\"name\":\"AvroFlumeEvent\",\"fields\":[{\"name\":\"headers\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"body\",\"type\":\"bytes\"}]}],\"messages\":{\"append\":{\"request\":[{\"name\":\"event\",\"type\":\"AvroFlumeEvent\"}],\"response\":\"Status\"},\"appendBatch\":{\"request\":[{\"name\":\"events\",\"type\":{\"type\":\"array\",\"items\":\"AvroFlumeEvent\"}}],\"response\":\"Status\"}}}");
org.apache.flume.source.avro.Status append(org.apache.flume.source.avro.AvroFlumeEvent event) throws org.apache.avro.AvroRemoteException;
org.apache.flume.source.avro.Status appendBatch(java.util.List<org.apache.flume.source.avro.AvroFlumeEvent> events) throws org.apache.avro.AvroRemoteException;
@SuppressWarnings("all")
public interface Callback extends AvroSourceProtocol {
public static final org.apache.avro.Protocol PROTOCOL = org.apache.flume.source.avro.AvroSourceProtocol.PROTOCOL;
void append(org.apache.flume.source.avro.AvroFlumeEvent event, org.apache.avro.ipc.Callback<org.apache.flume.source.avro.Status> callback) throws java.io.IOException;
void appendBatch(java.util.List<org.apache.flume.source.avro.AvroFlumeEvent> events, org.apache.avro.ipc.Callback<org.apache.flume.source.avro.Status> callback) throws java.io.IOException;
}
}
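Next, let's see how NettyAvroRpcClient implements the append and appendBatch methods of the RpcClient interface.
The append(Event event) method it overrides hands the event to its own append(Event event, long timeout, TimeUnit tu).
That append method first converts the Event object into an AvroFlumeEvent object suitable for RPC. It then submits the RPC itself to a thread pool.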
try {
// due to AVRO-1122, avroClient.append() may block
handshake = callTimeoutPool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
avroClient.append(avroEvent, callFuture);
return null;
}
});
} catch (RejectedExecutionException ex) {
throw new EventDeliveryException(this + ": Executor error", ex);
}
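avroClient.append takes a callFuture parameter. A future represents the result of an asynchronous operation, so the expected behavior is that append returns immediately and another thread picks up the result through the callFuture object. In practice, though, Avro RPC performs a handshake before the first call to check that both sides hold compatible schemas, and this handshake blocks the RPC method on the client side, that is, it blocks the client's append call. If the code simply called callFuture.get(timeout) right after submitting the task, that one timeout could end up covering both the handshake and the time the server takes to actually execute append. So AvroSink makes the two durations separately configurable: the user can set how long the handshake may take and how long to wait for the server to process a request. The handshake only happens the first time the client and server communicate, so later append RPC calls from the client return immediately and no longer wait for a handshake.
Let's look at AvroSink's configuration options.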
Property Name | Default | Description |
connect-timeout | 20000 | Amount of time (ms) to allow for the first (handshake) request. |
request-timeout | 20000 | Amount of time (ms) to allow for requests after the first. |
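As a small illustration of how these two options and their 20000 ms default could be read, here is a sketch using only java.util.Properties. The property names and the default come from the table above; the class TimeoutConfigSketch and its parsing logic are my own assumptions, not how Flume actually parses its configuration:

```java
import java.util.Properties;

// Hypothetical helper: reads a timeout in milliseconds, falling back to the
// documented default of 20000 when the property is missing or blank.
class TimeoutConfigSketch {
    static final long DEFAULT_TIMEOUT_MS = 20000L;

    static long timeoutMs(Properties props, String key) {
        String raw = props.getProperty(key);
        if (raw == null || raw.trim().isEmpty()) {
            return DEFAULT_TIMEOUT_MS;
        }
        return Long.parseLong(raw.trim());
    }
}
```

For example, with only connect-timeout set to 5000, timeoutMs(props, "connect-timeout") gives 5000 and timeoutMs(props, "request-timeout") falls back to 20000.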
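connect-timeout is enforced by the code below. It waits for the Callable to finish and cancels it if the wait times out. With the exception handling stripped out, the code looks like this: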
try {
  handshake.get(connectTimeout, TimeUnit.MILLISECONDS);
} finally {
  if (!handshake.isDone()) {
    handshake.cancel(true);
  }
}
waitForStatusOK(callFuture, timeout, tu);
And waitForStatusOK looks like this:
try {
  Status status = callFuture.get(timeout, tu);
  if (status != Status.OK) {
    throw new EventDeliveryException(this + ": Avro RPC call returned " + "Status: " + status);
  }
} // catch blocks omitted
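In other words, the append method waits according to the timeout parameters set in the Flume configuration file. The thread that calls append still blocks until the event has been processed.
Because the handshake blocks the RPC call and takes an unpredictable amount of time, the code has no choice but to use a thread pool, callTimeoutPool, to run the append RPC inside its own Callable and watch its progress through a Future. If append waits too long, the task is cancelled through that future. A lot of care went into this...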
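To convince myself that the two-stage wait behaves as described, here is a self-contained sketch of the pattern. It is a simulation under my own assumptions: the "handshake" is a sleep inside the submitted Callable, the "server reply" is a scheduled task that completes a CompletableFuture, and the class name, method signature and returned strings are all hypothetical rather than taken from NettyAvroRpcClient:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TwoStageTimeoutSketch {
    // Daemon threads so the simulation never keeps the JVM alive.
    private static final ThreadFactory DAEMON = r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    };
    private static final ExecutorService callTimeoutPool =
            Executors.newCachedThreadPool(DAEMON);
    private static final ScheduledExecutorService fakeServer =
            Executors.newSingleThreadScheduledExecutor(DAEMON);

    // handshakeMs / serverMs simulate how long each phase really takes;
    // connectTimeoutMs / requestTimeoutMs are the two configurable limits.
    static String append(long handshakeMs, long serverMs,
                         long connectTimeoutMs, long requestTimeoutMs) {
        CompletableFuture<String> callFuture = new CompletableFuture<>();

        // Stage 0: run the possibly blocking call on the pool, not on the caller.
        Future<Void> handshake = callTimeoutPool.submit((Callable<Void>) () -> {
            Thread.sleep(handshakeMs); // simulated blocking handshake
            // Simulated server reply, delivered later through the future.
            fakeServer.schedule(() -> { callFuture.complete("OK"); },
                    serverMs, TimeUnit.MILLISECONDS);
            return null;
        });

        // Stage 1: bound the time the call itself may block.
        try {
            handshake.get(connectTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "HANDSHAKE_TIMEOUT";
        } catch (InterruptedException | ExecutionException e) {
            return "HANDSHAKE_FAILED";
        } finally {
            if (!handshake.isDone()) {
                handshake.cancel(true); // interrupts the sleeping task
            }
        }

        // Stage 2: bound the time spent waiting for the server's status.
        try {
            return callFuture.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "REQUEST_TIMEOUT";
        } catch (InterruptedException | ExecutionException e) {
            return "REQUEST_FAILED";
        }
    }
}
```

A quick call such as append(50, 50, 2000, 2000) goes through both stages, a slow handshake such as append(2000, 0, 100, 2000) is cut off in stage 1, and a slow server such as append(0, 2000, 2000, 100) is cut off in stage 2. The caller stays blocked for the whole time in every case, which matches the observation above that the pool exists only to make the blocking call cancellable.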