限於篇幅關系,在觀察源碼的時候,只列舉了部分源代碼
TServer類層次體系
TSimpleServer/TThreadPoolServer是阻塞服務模型
TNonblockingServer/THsHaServer/TThreadedSelectotServer是非阻塞服務模型(NIO)
1 TServer抽象類的定義
內部靜態類Args的定義, 用於TServer類用於串聯軟件棧(傳輸層, 協議層, 處理層)
public abstract class TServer {
public static class Args extends AbstractServerArgs<Args> {
public Args(TServerTransport transport) {
super(transport);
}
}
public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {
public AbstractServerArgs(TServerTransport transport);
public T processorFactory(TProcessorFactory factory);
public T processor(TProcessor processor);
public T transportFactory(TTransportFactory factory);
public T protocolFactory(TProtocolFactory factory);
}
}
TServer類定義的抽象類
public abstract class TServer {
public abstract void serve();
public void stop();
public boolean isServing();
public void setServerEventHandler(TServerEventHandler eventHandler);
}
評注:
抽象函數serve由具體的TServer實例來實現, 而並非所有的服務都需要優雅的退出, 因此stop沒有被定義為抽象
2 TSimpleServer
TSimpleServer的工作模式采用最簡單的阻塞IO,實現方法簡潔明了,便於理解,但是一次只能接收和處理一個socket連接,效率比較低,主要用於演示Thrift的工作過程,在實際開發過程中很少用到它。
工作方式如圖:
抽象的代碼可簡單描述如下:
// *) server socket進行監聽
serverSocket.listen();
while ( isServing() ) {
// *) 接受socket鏈接
client = serverSocket.accept();
// *) 封裝處理器
processor = factory.getProcess(client);
while ( true ) {
// *) 阻塞處理rpc的輸入/輸出
if ( !processor.process(input, output) ) {
break;
}
}
}
3 ThreadPoolServer
TThreadPoolServer模式采用阻塞socket方式工作,,主線程負責阻塞式監聽“監聽socket”中是否有新socket到來,業務處理交由一個線程池來處
工作模式圖:
ThreadPoolServer解決了TSimple不支持並發和多連接的問題, 引入了線程池. 實現的模型是One Thread Per Connection
線程池代碼片段:
private static ExecutorService createDefaultExecutorService(Args args) {
SynchronousQueue<Runnable> executorQueue =
new SynchronousQueue<Runnable>();
return new ThreadPoolExecutor(args.minWorkerThreads,
args.maxWorkerThreads,
args.stopTimeoutVal,
TimeUnit.SECONDS,
executorQueue);
}
評注:
采用同步隊列(SynchronousQueue), 線程池采用能線程數可伸縮的模式.
主線程循環簡單描述代碼:
setServing(true);
while (!stopped_) {
try {
TTransport client = serverTransport_.accept();
WorkerProcess wp = new WorkerProcess(client);
executorService_.execute(wp);
} catch (TTransportException ttx) {
}
}
TThreadPoolServer模式優點:
線程池模式中,拆分了監聽線程(accept)和處理客戶端連接的工作線程(worker),數據讀取和業務處理都交由線程池完成,主線程只負責監聽新連接,因此在並發量較大時新連接也能夠被及時接受。線程池模式比較適合服務器端能預知最多有多少個客戶端並發的情況,這時每個請求都能被業務線程池及時處理,性能也非常高。
TThreadPoolServer模式缺點:
線程池模式的處理能力受限於線程池的工作能力,當並發請求數大於線程池中的線程數時,新請求也只能排隊等待
4 TNonblockingServer
TNonblockingServer該模式也是單線程工作,但是采用NIO的模式, 借助Channel/Selector機制, 采用IO事件模型來處理.
所有的socket都被注冊到selector中,在一個線程中通過seletor循環監控所有的socket,每次selector結束時,處理所有的處於就緒狀態的socket,對於有數據到來的socket進行數據讀取操作,對於有數據發送的socket則進行數據發送,對於監聽socket則產生一個新業務socket並將其注冊到selector中。
工作原理圖:
nio部分關鍵代碼如下:
private void select() {
try {
// wait for io events.
selector.select();
// process the io events we received
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// skip if not valid
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
// if the key is marked Accept, then it has to be the server
// transport.
if (key.isAcceptable()) {
handleAccept();
} else if (key.isReadable()) {
// deal with reads
handleRead(key);
} else if (key.isWritable()) {
// deal with writes
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
TNonblockingServer模式優點:
相比於TSimpleServer效率提升主要體現在IO多路復用上,TNonblockingServer采用非阻塞IO,對accept/read/write等IO事件進行監控和處理,同時監控多個socket的狀態變化;
TNonblockingServer模式缺點:
TNonblockingServer模式在業務處理上還是采用單線程順序來完成,在業務處理比較復雜、耗時的時候,例如某些接口函數需要讀取數據庫執行時間較長,會導致整個服務被阻塞住,此時該模式效率也不高,因為多個調用請求任務依然是順序一個接一個執行
5 THsHaServer
鑒於TNonblockingServer的缺點, THsHaServer繼承TNonblockingServer,引入了線程池去處理, 其模型把讀寫任務放到線程池去處理.THsHaServer是Half-sync/Half-async的處理模式, Half-aysnc是在處理IO事件上(accept/read/write io), Half-sync用於handler對rpc的同步處理上.
工作模式圖:
/**
* Helper to create an invoker pool
*/
protected static ExecutorService createInvokerPool(Args options) {
int minWorkerThreads = options.minWorkerThreads;
int maxWorkerThreads = options.maxWorkerThreads;
int stopTimeoutVal = options.stopTimeoutVal;
TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads,
maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);
return invoker;
}
THsHaServer的優點:
與TNonblockingServer模式相比,THsHaServer在完成數據讀取之后,將業務處理過程交由一個線程池來完成,主線程直接返回進行下一次循環操作,效率大大提升;
THsHaServer的缺點:
主線程需要完成對所有socket的監聽以及數據讀寫的工作,當並發請求數較大時,且發送數據量較多時,監聽socket上新連接請求不能被及時接受。
6. TThreadedSelectorServer
TThreadedSelectorServer是對以上NonblockingServer的擴充, 其分離了Accept和Read/Write的Selector線程, 同時引入Worker工作線程池. 它也是種Half-sync/Half-async的服務模型
TThreadedSelectorServer模式是目前Thrift提供的最高級的模式,它內部有如果幾個部分構成:
(1) 一個AcceptThread線程對象,專門用於處理監聽socket上的新連接;
(2) 若干個SelectorThread對象專門用於處理業務socket的網絡I/O操作,所有網絡數據的讀寫均是有這些線程來完成;
(3) 一個負載均衡器SelectorThreadLoadBalancer對象,主要用於AcceptThread線程接收到一個新socket連接請求時,決定將這個新連接請求分配給哪個SelectorThread線程。
(4) 一個ExecutorService類型的工作線程池,在SelectorThread線程中,監聽到有業務socket中有調用請求過來,則將請求讀取之后,交個ExecutorService線程池中的線程完成此次調用的具體執行;主要用於處理每個rpc請求的handler回調處理(這部分是同步的).
工作模式圖:
TThreadedSelectorServer模式中有一個專門的線程AcceptThread用於處理新連接請求,因此能夠及時響應大量並發連接請求;另外它將網絡I/O操作分散到多個SelectorThread線程中來完成,因此能夠快速對網絡I/O進行讀寫操作,能夠很好地應對網絡I/O較多的情況
從accpect線程到selectorThreads關鍵代碼
protected boolean startThreads() {
try {
for (int i = 0; i < args.selectorThreads; ++i) {
selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));//建立事件選擇線程池
}
acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
createSelectorThreadLoadBalancer(selectorThreads));//建立accept接受請求線程
for (SelectorThread thread : selectorThreads) {
thread.start();
}
acceptThread.start();
return true;
} catch (IOException e) {
LOGGER.error("Failed to start threads!", e);
return false;
}
}
負載均衡器SelectorThreadLoadBalancer對象部分關鍵代碼:
protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) {
return new SelectorThreadLoadBalancer(threads);
}
/**
* A round robin load balancer for choosing selector threads for new
* connections.
*/
protected static class SelectorThreadLoadBalancer {
private final Collection<? extends SelectorThread> threads;
private Iterator<? extends SelectorThread> nextThreadIterator;
public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) {
if (threads.isEmpty()) {
throw new IllegalArgumentException("At least one selector thread is required");
}
this.threads = Collections.unmodifiableList(new ArrayList<T>(threads));
nextThreadIterator = this.threads.iterator();
}
//根據循環負載均衡策略獲取一個SelectorThread
public SelectorThread nextThread() {
// Choose a selector thread (round robin)
if (!nextThreadIterator.hasNext()) {
nextThreadIterator = threads.iterator();
}
return nextThreadIterator.next();
}
}
從SelectorThread線程中,監聽到有業務socket中有調用請求,轉到業務工作線程池關鍵代碼
private void handleAccept() {
final TNonblockingTransport client = doAccept();//取得客戶端的連接
if (client != null) {
// Pass this connection to a selector thread
final SelectorThread targetThread = threadChooser.nextThread();//獲取目標SelectorThread
if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
doAddAccept(targetThread, client);
} else {
// FAIR_ACCEPT
try {
invoker.submit(new Runnable() {// 提交client的業務給到工作線程
public void run() {
doAddAccept(targetThread, client);
}
});
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected accept registration!", rx);
// close immediately
client.close();
}
}
}
}
demo地址:
碼雲:http://git.oschina.net/shunyang/thrift-all/tree/master/thrift-demo
github:https://github.com/shunyang/thrift-all/tree/master/thrift-demo
本文參考文章: