基於版本:CDH5.4.2
上述版本較老,但是目前生產上是使用這個版本,所以以此為例。
1. 概要
說明:
-
客戶端API發送的請求將會被RPCServer的Listener線程監聽到。
-
Listener線程將分配Reader給到此Channel用戶后續請求的相應。
-
Reader線程將請求包裝成CallRunner實例,並將通過RpcScheduler線程根據請求屬性分類dispatch到不同的Executor線程。
-
Executor線程將會保存這個CallRunner實例到隊列。
-
每一個Executor隊列都被綁定了指定個數的Handler線程進行消費,消費很簡單,即拿出隊列的CallRunner實例,執行器run()方法。
-
run()方法將會組裝response到Responder線程中。
-
Responder線程將會不斷地將不同Channel的結果返回到客戶端。
2. 代碼梳理
總體來說服務端RPC處理機制是一個生產者消費者模型。
2.1 組件初始化
-
RpcServer是在master或者regionserver啟動時候進行初始化的,關鍵代碼如下:
public HRegionServer(Configuration conf, CoordinatedStateManager csm)
throws IOException, InterruptedException {
this.fsOk = true;
this.conf = conf;
checkCodecs(this.conf);
.....
rpcServices.start();
.....
}
-
/** Starts the service. Must be called before any calls will be handled. */
@Override
public synchronized void start() {
if (started) return;
......
responder.start();
listener.start();
scheduler.start();
started = true;
}
2.2 客戶端API請求接收和包裝
Listener通過NIO機制進行端口監聽,客戶端API連接服務端指定端口將會被監聽。
-
Listener對於API請求的接收:
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
try {
......
// 當一個API請求過來時候將會打開一個Channel,Listener將會分配一個Reader注冊。
// reader實例個數有限,采取順序分配和復用,即一個reader可能為多個Channel服務。
Reader reader = getReader();
try {
reader.startAdd();
SelectionKey readKey = reader.registerChannel(channel);
// 同時也將保存這個Channel,用於后續的結果返回等
c = getConnection(channel, System.currentTimeMillis());
readKey.attach(c);
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections++;
......
}
}
上述中Reader個數是有限的並且可以順序復用的,個數可以通過如下參數進行設定,默認為10個。
this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
當生產能力不足時,可以考慮增加此配置值。
-
Reader讀取請求並包裝請求
當Reader實例被分配到一個Channel后,它將讀取此通道過來的請求,並包裝成CallRunner用於調度。
void doRead(SelectionKey key) throws InterruptedException {
......
try {
// 此時將調用connection的讀取和處理方法
count = c.readAndProcess();
......
}
}public int readAndProcess() throws IOException, InterruptedException {
......
// 通過connectionPreambleRead標記為判斷此鏈接是否為新連接,如果是新的那么需要讀取
// 頭部報文信息,用於判斷當前鏈接屬性,比如是當前采取的是哪種安全模式?
if (!connectionPreambleRead) {
count = readPreamble();
if (!connectionPreambleRead) {
return count;
}
......
count = channelRead(channel, data);
if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
// 實際處理請求,里面也會根據鏈接的頭報文讀取時候判斷出的兩種模式進行不同的處理。
process();
}
return count;
}private void process() throws IOException, InterruptedException {
......
if (useSasl) {
// Kerberos安全模式
saslReadAndProcess(data.array());
} else {
// AuthMethod.SIMPLE模式
processOneRpc(data.array());
}
.......
}如下以AuthMethod.SIMPLE模式為例進行分析:
private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
if (connectionHeaderRead) {
// 處理具體請求
processRequest(buf);
} else {
// 再次判斷鏈接Header是否讀取,未讀取則取出頭報文用以確定請求的服務和方法等。
processConnectionHeader(buf);
this.connectionHeaderRead = true;
if (!authorizeConnection()) {
throw new AccessDeniedException("Connection from " + this + " for service "
connectionHeader.getServiceName() + " is unauthorized for user: " + user);
}
}
}protected void processRequest(byte[] buf) throws IOException, InterruptedException {
long totalRequestSize = buf.length;
......
// 這里將會判斷RpcServer做接收到的請求是否超過了maxQueueSize,注意這個值為
// RpcServer級別的變量
if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
final Call callTooBig =
new Call(id, this.service, null, null, null, null, this,
responder, totalRequestSize, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
"Call queue is full on " + getListenerAddress() +
", is hbase.ipc.server.max.callqueue.size too small?");
responder.doRespond(callTooBig);
return;
}
......
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
totalRequestSize,
traceInfo);
// 此時請求段處理結束,將請求包裝成CallRunner后發送到不同的Executer的隊列中去。
scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));
}注意這個值為 RpcServer級別的變量,默認值為1G,超過此閾值將會出現Call queue is full錯誤。
callQueueSize的大小會在請求接收的時候增加,在請求處理結束(調用完畢CallRunner的run方法后)減去相應值。
this.maxQueueSize =this.conf.getInt("hbase.ipc.server.max.callqueue.size",DEFAULT_MAX_CALLQUEUE_SIZE);
2.3 請求轉發與調度
客戶端請求在經過接收和包裝為CallRunner后將會被具體的Scheduler進行dispatch,master和regionserver
調度器並不相同,這里以regionserver的調度器進行講解。具體為:SimpleRpcScheduler。
public RSRpcServices(HRegionServer rs) throws IOException {
......
RpcSchedulerFactory rpcSchedulerFactory;
try {
Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
SimpleRpcSchedulerFactory.class);
rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
-
請求轉發
前面已經提到請求包裝完CallRunner后由具體的RpcScheduler實現類的dispacth方法進行轉發。
具體代碼為:
-
執行器介紹-隊列初始化
在此調度器中共分為三個級別的調度執行器:
-
高優先請求級執行器
-
一般請求執行器
-
replication請求執行器
private final RpcExecutor callExecutor;
private final RpcExecutor priorityExecutor;
private final RpcExecutor replicationExecutor;
上述中callExecutor為最主要一般請求執行器,在當前版本中此執行器中可以將讀取和寫入初始化為不同比例的隊列,並將handler也分成不同比例進行隊列的綁定。即一個隊列上面只有被綁定的handler具體處理權限。默認的不划分讀寫分離的場景下就只有一個隊列,所有請求都進入其中,所有的handler也將去處理這個隊列。
具體我們以讀寫分離隊列為例進行代碼分析:
float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);
if (numCallQueues > 1 && callqReadShare > 0) {
// multiple read/write queues
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
// 實例化RW讀取執行器,構造參數中的為讀寫比例,其中讀取又分為一般讀取和scan讀取比例
// 后續將會調用重載的其他構造方法,最終將會計算出各個讀取隊列的個數和handler的比例數
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
BoundedPriorityBlockingQueue.class, callPriority);
} else { -
如下為最終調用的重載構造方法:
public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers,
int numWriteQueues, int numReadQueues, float scanShare,
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues));
int numScanQueues = Math.max(0, (int)Math.floor(numReadQueues * scanShare));
int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * scanShare));
if ((numReadQueues - numScanQueues) > 0) {
numReadQueues -= numScanQueues;
readHandlers -= scanHandlers;
} else {
numScanQueues = 0;
scanHandlers = 0;
}
// 確定各個主要隊列參數
this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
this.readHandlersCount = Math.max(readHandlers, numReadQueues);
this.scanHandlersCount = Math.max(scanHandlers, numScanQueues);
this.numWriteQueues = numWriteQueues;
this.numReadQueues = numReadQueues;
this.numScanQueues = numScanQueues;
this.writeBalancer = getBalancer(numWriteQueues);
this.readBalancer = getBalancer(numReadQueues);
this.scanBalancer = getBalancer(numScanQueues);
queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
" readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount +
((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues +
" scanHandlers=" + scanHandlersCount));
// 初始化隊列列表,注意queues為有序列表,如下隊列位置初始化后不會變動,在后續按照具體的請求
// 通過具體的getBalancer方法進行查找
for (int i = 0; i < numWriteQueues; ++i) {
queues.add((BlockingQueue<CallRunner>)
ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
}
for (int i = 0;