在學校期間大家都寫過不少程序,比如寫個hello world服務類,然后本地調用下,如下所示。這些程序的特點是服務消費方和服務提供方是本地調用關系。
1
2
3
4
5
6
|
public
class
Test {
public
static
void
main(String[] args) {
HelloWorldService helloWorldService =
new
HelloWorldServiceImpl();
helloWorldService.sayHello(
"test"
);
}
}
|
而一旦踏入公司尤其是大型互聯網公司就會發現,公司的系統都由成千上萬大大小小的服務組成,各服務部署在不同的機器上,由不同的團隊負責。
這時就會遇到兩個問題:
- 要搭建一個新服務,免不了需要依賴他人的服務,而現在他人的服務都在遠端,怎么調用?
- 其它團隊要使用我們的新服務,我們的服務該怎么發布以便他人調用?下文將對這兩個問題展開探討。
1. 如何調用他人的遠程服務?
由於各服務部署在不同機器,服務間的調用免不了網絡通信過程,服務消費方每調用一個服務都要寫一坨網絡通信相關的代碼,不僅復雜而且極易出錯。
如果有一種方式能讓我們像調用本地服務一樣調用遠程服務,而讓調用者對網絡通信這些細節透明,那么將大大提高生產力,比如服務消費方在執行helloWorldService.sayHello(“test”)時,實質上調用的是遠端的服務。這種方式其實就是RPC(Remote Procedure Call Protocol),在各大互聯網公司中被廣泛使用,如阿里巴巴的hsf、dubbo(開源)、Facebook的thrift(開源)、Google grpc(開源)、Twitter的finagle(開源)等。
要讓網絡通信細節對使用者透明,我們需要對通信細節進行封裝,我們先看下一個RPC調用的流程涉及到哪些通信細節:
- 服務消費方(client)調用以本地調用方式調用服務;
- client stub接收到調用后負責將方法、參數等組裝成能夠進行網絡傳輸的消息體;
- client stub找到服務地址,並將消息發送到服務端;
- server stub收到消息后進行解碼;
- server stub根據解碼結果調用本地的服務;
- 本地服務執行並將結果返回給server stub;
- server stub將返回結果打包成消息並發送至消費方;
- client stub接收到消息,並進行解碼;
- 服務消費方得到最終結果。
RPC的目標就是要2~8這些步驟都封裝起來,讓用戶對這些細節透明。
1.1 怎么做到透明化遠程服務調用?
怎么封裝通信細節才能讓用戶像以本地調用方式調用遠程服務呢?對java來說就是使用代理!java代理有兩種方式:
- jdk 動態代理
- 字節碼生成
盡管字節碼生成方式實現的代理更為強大和高效,但代碼維護不易,大部分公司實現RPC框架時還是選擇動態代理方式。
下面簡單介紹下動態代理怎么實現我們的需求。我們需要實現RPCProxyClient代理類,代理類的invoke方法中封裝了與遠端服務通信的細節,消費方首先從RPCProxyClient獲得服務提供方的接口,當執行helloWorldService.sayHello(“test”)方法時就會調用invoke方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
public
class
RPCProxyClient
implements
java.lang.reflect.InvocationHandler{
private
Object obj;
public
RPCProxyClient(Object obj){
this
.obj=obj;
}
/**
* 得到被代理對象;
*/
public
static
Object getProxy(Object obj){
return
java.lang.reflect.Proxy.newProxyInstance(obj.getClass().getClassLoader(),
obj.getClass().getInterfaces(),
new
RPCProxyClient(obj));
}
/**
* 調用此方法執行
*/
public
Object invoke(Object proxy, Method method, Object[] args)
throws
Throwable {
//結果參數;
Object result =
new
Object();
// ...執行通信相關邏輯
// ...
return
result;
}
}
|
1
2
3
4
5
6
|
public
class
Test {
public
static
void
main(String[] args) {
HelloWorldService helloWorldService = (HelloWorldService)RPCProxyClient.getProxy(HelloWorldService.
class
);
helloWorldService.sayHello(
"test"
);
}
}
|
1.2 怎么對消息進行編碼和解碼?
1.2.1 確定消息數據結構
上節講了invoke里需要封裝通信細節(通信細節再后面幾章詳細探討),而通信的第一步就是要確定客戶端和服務端相互通信的消息結構。客戶端的請求消息結構一般需要包括以下內容:
1)接口名稱
在我們的例子里接口名是“HelloWorldService”,如果不傳,服務端就不知道調用哪個接口了;
2)方法名
一個接口內可能有很多方法,如果不傳方法名服務端也就不知道調用哪個方法;
3)參數類型&參數值
參數類型有很多,比如有bool、int、long、double、string、map、list,甚至如struct(class);以及相應的參數值;
4)超時時間
5)requestID,標識唯一請求id,在下面一節會詳細描述requestID的用處。
同理服務端返回的消息結構一般包括以下內容。
1)返回值
2)狀態code
3)requestID
1.2.2 序列化
一旦確定了消息的數據結構后,下一步就是要考慮序列化與反序列化了。
什么是序列化?序列化就是將數據結構或對象轉換成二進制串的過程,也就是編碼的過程。
什么是反序列化?將在序列化過程中所生成的二進制串轉換成數據結構或者對象的過程。
為什么需要序列化?轉換為二進制串后才好進行網絡傳輸嘛!
為什么需要反序列化?將二進制轉換為對象才好進行后續處理!
現如今序列化的方案越來越多,每種序列化方案都有優點和缺點,它們在設計之初有自己獨特的應用場景,那到底選擇哪種呢?從RPC的角度上看,主要看三點:
- 通用性,比如是否能支持Map等復雜的數據結構;
- 性能,包括時間復雜度和空間復雜度,由於RPC框架將會被公司幾乎所有服務使用,如果序列化上能節約一點時間,對整個公司的收益都將非常可觀,同理如果序列化上能節約一點內存,網絡帶寬也能省下不少;
- 可擴展性,對互聯網公司而言,業務變化飛快,如果序列化協議具有良好的可擴展性,支持自動增加新的業務字段,而不影響老的服務,這將大大提供系統的靈活度。
目前互聯網公司廣泛使用Protobuf、Thrift、Avro等成熟的序列化解決方案來搭建RPC框架,這些都是久經考驗的解決方案。
1.3 通信
消息數據結構被序列化為二進制串后,下一步就要進行網絡通信了。目前有兩種常用IO通信模型:1)BIO;2)NIO。一般RPC框架需要支持這兩種IO模型。
如何實現RPC的IO通信框架呢?
- 使用java nio方式自研,這種方式較為復雜,而且很有可能出現隱藏bug,但也見過一些互聯網公司使用這種方式;
- 基於mina,mina在早幾年比較火熱,不過這些年版本更新緩慢;
- 基於netty,現在很多RPC框架都直接基於netty這一IO通信框架,省力又省心,比如阿里巴巴的HSF、dubbo,Twitter的finagle等。
1.4 消息里為什么要有requestID?
如果使用netty的話,一般會用channel.writeAndFlush()方法來發送消息二進制串,這個方法調用后對於整個遠程調用(從發出請求到接收到結果)來說是一個異步的,即對於當前線程來說,將請求發送出來后,線程就可以往后執行了,至於服務端的結果,是服務端處理完成后,再以消息的形式發送給客戶端的。於是這里出現以下兩個問題:
- 怎么讓當前線程“暫停”,等結果回來后,再向后執行?
- 如果有多個線程同時進行遠程方法調用,這時建立在client server之間的socket連接上會有很多雙方發送的消息傳遞,前后順序也可能是隨機的,server處理完結果后,將結果消息發送給client,client收到很多消息,怎么知道哪個消息結果是原先哪個線程調用的?
如下圖所示,線程A和線程B同時向client socket發送請求requestA和requestB,socket先后將requestB和requestA發送至server,而server可能將responseA先返回,盡管requestA請求到達時間更晚。我們需要一種機制保證responseA丟給ThreadA,responseB丟給ThreadB。
怎么解決呢?
- client線程每次通過socket調用一次遠程接口前,生成一個唯一的ID,即requestID(requestID必需保證在一個Socket連接里面是唯一的),一般常常使用AtomicLong從0開始累計數字生成唯一ID;
- 將處理結果的回調對象callback,存放到全局ConcurrentHashMap里面put(requestID, callback);
- 當線程調用channel.writeAndFlush()發送消息后,緊接着執行callback的get()方法試圖獲取遠程返回的結果。在get()內部,則使用synchronized獲取回調對象callback的鎖,再先檢測是否已經獲取到結果,如果沒有,然后調用callback的wait()方法,釋放callback上的鎖,讓當前線程處於等待狀態。
- 服務端接收到請求並處理后,將response結果(此結果中包含了前面的requestID)發送給客戶端,客戶端socket連接上專門監聽消息的線程收到消息,分析結果,取到requestID,再從前面的ConcurrentHashMap里面get(requestID),從而找到callback對象,再用synchronized獲取callback上的鎖,將方法調用結果設置到callback對象里,再調用callback.notifyAll()喚醒前面處於等待狀態的線程。
1
2
3
4
5
6
7
|
public
Object get() {
synchronized
(
this
) {
// 旋鎖
while
(!isDone) {
// 是否有結果了
wait();
//沒結果是釋放鎖,讓當前線程處於等待狀態
}
}
}
|
1
2
3
4
5
6
7
|
private
void
setDone(Response res) {
this
.res = res;
isDone =
true
;
synchronized
(
this
) {
//獲取鎖,因為前面wait()已經釋放了callback的鎖了
notifyAll();
// 喚醒處於等待的線程
}
}
|
2 如何發布自己的服務?
如何讓別人使用我們的服務呢?有同學說很簡單嘛,告訴使用者服務的IP以及端口就可以了啊。確實是這樣,這里問題的關鍵在於是自動告知還是人肉告知。
人肉告知的方式:如果你發現你的服務一台機器不夠,要再添加一台,這個時候就要告訴調用者我現在有兩個ip了,你們要輪詢調用來實現負載均衡;調用者咬咬牙改了,結果某天一台機器掛了,調用者發現服務有一半不可用,他又只能手動修改代碼來刪除掛掉那台機器的ip。現實生產環境當然不會使用人肉方式。
有沒有一種方法能實現自動告知,即機器的增添、剔除對調用方透明,調用者不再需要寫死服務提供方地址?當然可以,現如今zookeeper被廣泛用於實現服務自動注冊與發現功能!
簡單來講,zookeeper可以充當一個服務注冊表
(Service Registry),讓多個服務提供者
形成一個集群,讓服務消費者
通過服務注冊表獲取具體的服務訪問地址(ip+端口)去訪問具體的服務提供者。如下圖所示:
具體來說,zookeeper就是個分布式文件系統,每當一個服務提供者部署后都要將自己的服務注冊到zookeeper的某一路徑上: /{service}/{version}/{ip:port}, 比如我們的HelloWorldService部署到兩台機器,那么zookeeper上就會創建兩條目錄:分別為/HelloWorldService/1.0.0/100.19.20.01:16888 /HelloWorldService/1.0.0/100.19.20.02:16888。
zookeeper提供了“心跳檢測”功能,它會定時向各個服務提供者發送一個請求(實際上建立的是一個 Socket 長連接),如果長期沒有響應,服務中心就認為該服務提供者已經“掛了”,並將其剔除,比如100.19.20.02這台機器如果宕機了,那么zookeeper上的路徑就會只剩/HelloWorldService/1.0.0/100.19.20.01:16888。
服務消費者會去監聽相應路徑(/HelloWorldService/1.0.0),一旦路徑上的數據有任務變化(增加或減少),zookeeper都會通知服務消費方服務提供者地址列表已經發生改變,從而進行更新。
更為重要的是zookeeper與生俱來的容錯容災能力(比如leader選舉),可以確保服務注冊表的高可用性。
3.Hadoop中RPC實例分析
ipc.RPC類中有一些內部類,為了大家對RPC類有個初步的印象,就先羅列幾個我們感興趣的分析一下吧:
Invocation :用於封裝方法名和參數,作為數據傳輸層。
ClientCache :用於存儲client對象,用socket factory作為hash key,存儲結構為hashMap <SocketFactory, Client>。
Invoker :是動態代理中的調用實現類,繼承了InvocationHandler.
Server :是ipc.Server的實現類。
1
2
3
4
5
6
7
8
|
public
Object invoke(Object proxy, Method method, Object[] args)
throws
Throwable {
•••
ObjectWritable value = (ObjectWritable)
client.call(
new
Invocation(method, args), remoteId);
•••
return
value.get();
}
|
如果你發現這個invoke()方法實現的有些奇怪的話,那你就對了。一般我們看到的動態代理的invoke()方法中總會有 method.invoke(ac, arg); 這句代碼。而上面代碼中卻沒有,這是為什么呢?其實使用 method.invoke(ac, arg); 是在本地JVM中調用;而在hadoop中,是將數據發送給服務端,服務端將處理的結果再返回給客戶端,所以這里的invoke()方法必然需要進行網絡通信。而網絡通信就是下面的這段代碼實現的:
1
2
|
ObjectWritable value = (ObjectWritable)
client.call(
new
Invocation(method, args), remoteId);
|
Invocation類在這里封裝了方法名和參數。其實這里網絡通信只是調用了Client類的call()方法。那我們接下來分析一下ipc.Client源碼吧。和第一章一樣,同樣是3個問題
- 客戶端和服務端的連接是怎樣建立的?
- 客戶端是怎樣給服務端發送數據的?
- 客戶端是怎樣獲取服務端的返回數據的?
3.1 客戶端和服務端的連接是怎樣建立的?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public
Writable call(Writable param, ConnectionId remoteId)
throws
InterruptedException, IOException {
Call call =
new
Call(param);
//將傳入的數據封裝成call對象
Connection connection = getConnection(remoteId, call);
//獲得一個連接
connection.sendParam(call);
// 向服務端發送call對象
boolean
interrupted =
false
;
synchronized
(call) {
while
(!call.done) {
try
{
call.wait();
// 等待結果的返回,在Call類的callComplete()方法里有notify()方法用於喚醒線程
}
catch
(InterruptedException ie) {
// 因中斷異常而終止,設置標志interrupted為true
interrupted =
true
;
}
}
if
(interrupted) {
Thread.currentThread().interrupt();
}
if
(call.error !=
null
) {
if
(call.error
instanceof
RemoteException) {
call.error.fillInStackTrace();
throw
call.error;
}
else
{
// 本地異常
throw
wrapException(remoteId.getAddress(), call.error);
}
}
else
{
return
call.value;
//返回結果數據
}
}
}
|
具體代碼的作用我已做了注釋,所以這里不再贅述。但到目前為止,你依然不知道RPC機制底層的網絡連接是怎么建立的。分析代碼后,我們會發現和網絡通信有關的代碼只會是下面的兩句了:
1
2
|
Connection connection = getConnection(remoteId, call);
//獲得一個連接
connection.sendParam(call);
// 向服務端發送call對象
|
先看看是怎么獲得一個到服務端的連接吧,下面貼出ipc.Client類中的getConnection()方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
private
Connection getConnection(ConnectionId remoteId,
Call call)
throws
IOException, InterruptedException {
if
(!running.get()) {
// 如果client關閉了
throw
new
IOException(
"The client is stopped"
);
}
Connection connection;
//如果connections連接池中有對應的連接對象,就不需重新創建了;如果沒有就需重新創建一個連接對象。
//但請注意,該//連接對象只是存儲了remoteId的信息,其實還並沒有和服務端建立連接。
do
{
synchronized
(connections) {
connection = connections.get(remoteId);
if
(connection ==
null
) {
connection =
new
Connection(remoteId);
connections.put(remoteId, connection);
}
}
}
while
(!connection.addCall(call));
//將call對象放入對應連接中的calls池,就不貼出源碼了
//這句代碼才是真正的完成了和服務端建立連接哦~
connection.setupIOstreams();
return
connection;
}
|
下面貼出Client.Connection類中的setupIOstreams()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
private
synchronized
void
setupIOstreams()
throws
InterruptedException {
•••
try
{
•••
while
(
true
) {
setupConnection();
//建立連接
InputStream inStream = NetUtils.getInputStream(socket);
//獲得輸入流
OutputStream outStream = NetUtils.getOutputStream(socket);
//獲得輸出流
writeRpcHeader(outStream);
•••
this
.in =
new
DataInputStream(
new
BufferedInputStream
(
new
PingInputStream(inStream)));
//將輸入流裝飾成DataInputStream
this
.out =
new
DataOutputStream
(
new
BufferedOutputStream(outStream));
//將輸出流裝飾成DataOutputStream
writeHeader();
// 跟新活動時間
touch();
//當連接建立時,啟動接受線程等待服務端傳回數據,注意:Connection繼承了Tread
start();
return
;
}
}
catch
(IOException e) {
markClosed(e);
close();
}
}
|
再有一步我們就知道客戶端的連接是怎么建立的啦,下面貼出Client.Connection類中的setupConnection()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
private
synchronized
void
setupConnection()
throws
IOException {
short
ioFailures =
0
;
short
timeoutFailures =
0
;
while
(
true
) {
try
{
this
.socket = socketFactory.createSocket();
//終於看到創建socket的方法了
this
.socket.setTcpNoDelay(tcpNoDelay);
•••
// 設置連接超時為20s
NetUtils.connect(
this
.socket, remoteId.getAddress(),
20000
);
this
.socket.setSoTimeout(pingInterval);
return
;
}
catch
(SocketTimeoutException toe) {
/* 設置最多連接重試為45次。
* 總共有20s*45 = 15 分鍾的重試時間。
*/
handleConnectionFailure(timeoutFailures++,
45
, toe);
}
catch
(IOException ie) {
handleConnectionFailure(ioFailures++, maxRetries, ie);
}
}
}
|
終於,我們知道了客戶端的連接是怎樣建立的了,其實就是創建一個普通的socket進行通信。
3.2 客戶端是怎樣給服務端發送數據的?
下面貼出Client.Connection類的sendParam()方法吧:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public
void
sendParam(Call call) {
if
(shouldCloseConnection.get()) {
return
;
}
DataOutputBuffer d=
null
;
try
{
synchronized
(
this
.out) {
if
(LOG.isDebugEnabled())
LOG.debug(getName() +
" sending #"
+ call.id);
//創建一個緩沖區
d =
new
DataOutputBuffer();
d.writeInt(call.id);
call.param.write(d);
byte
[] data = d.getData();
int
dataLength = d.getLength();
out.writeInt(dataLength);
//首先寫出數據的長度
out.write(data,
0
, dataLength);
//向服務端寫數據
out.flush();
}
}
catch
(IOException e) {
markClosed(e);
}
finally
{
IOUtils.closeStream(d);
}
}
|
3.3 客戶端是怎樣獲取服務端的返回數據的?
下面貼出Client.Connection類和Client.Call類中的相關方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
方法一:
public
void
run() {
•••
while
(waitForWork()) {
receiveResponse();
//具體的處理方法
}
close();
•••
}
方法二:
private
void
receiveResponse() {
if
(shouldCloseConnection.get()) {
return
;
}
touch();
try
{
int
id = in.readInt();
// 阻塞讀取id
if
(LOG.isDebugEnabled())
LOG.debug(getName() +
" got value #"
+ id);
Call call = calls.get(id);
//在calls池中找到發送時的那個對象
int
state = in.readInt();
// 阻塞讀取call對象的狀態
if
(state == Status.SUCCESS.state) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in);
// 讀取數據
//將讀取到的值賦給call對象,同時喚醒Client等待線程,貼出setValue()代碼方法三
call.setValue(value);
calls.remove(id);
//刪除已處理的call
}
else
if
(state == Status.ERROR.state) {
•••
}
else
if
(state == Status.FATAL.state) {
•••
}
}
catch
(IOException e) {
markClosed(e);
}
}
方法三:
public
synchronized
void
setValue(Writable value) {
this
.value = value;
callComplete();
//具體實現
}
protected
synchronized
void
callComplete() {
this
.done =
true
;
notify();
// 喚醒client等待線程
}
|
完成的功能主要是:啟動一個處理線程,讀取從服務端傳來的call對象,將call對象讀取完畢后,喚醒client處理線程。就這么簡單,客戶端就獲取了服務端返回的數據了哦~。客戶端的源碼分析就到這里了哦,下面我們來分析Server端的源碼吧。
3.4 ipc.Server源碼分析
為了讓大家對ipc.Server有個初步的了解,我們先分析一下它的幾個內部類吧:
Call :用於存儲客戶端發來的請求
Listener : 監聽類,用於監聽客戶端發來的請求,同時Listener內部還有一個靜態類,Listener.Reader,當監聽器監聽到用戶請求,便讓Reader讀取用戶請求。
Responder :響應RPC請求類,請求處理完畢,由Responder發送給請求客戶端。
Connection :連接類,真正的客戶端請求讀取邏輯在這個類中。
Handler :請求處理類,會循環阻塞讀取callQueue中的call對象,並對其進行操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
private
void
initialize(Configuration conf)
throws
IOException {
•••
// 創建 rpc server
InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
if
(dnSocketAddr !=
null
) {
int
serviceHandlerCount =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
//獲得serviceRpcServer
this
.serviceRpcServer = RPC.getServer(
this
, dnSocketAddr.getHostName(),
dnSocketAddr.getPort(), serviceHandlerCount,
false
, conf, namesystem.getDelegationTokenSecretManager());
this
.serviceRPCAddress =
this
.serviceRpcServer.getListenerAddress();
setRpcServiceServerAddress(conf);
}
//獲得server
this
.server = RPC.getServer(
this
, socAddr.getHostName(),
socAddr.getPort(), handlerCount,
false
, conf, namesystem
.getDelegationTokenSecretManager());
•••
this
.server.start();
//啟動 RPC server Clients只允許連接該server
if
(serviceRpcServer !=
null
) {
serviceRpcServer.start();
//啟動 RPC serviceRpcServer 為HDFS服務的server
}
startTrashEmptier(conf);
}
|
查看Namenode初始化源碼得知:RPC的server對象是通過ipc.RPC類的getServer()方法獲得的。下面咱們去看看ipc.RPC類中的getServer()源碼吧:
1
2
3
4
5
6
7
|
public
static
Server getServer(
final
Object instance,
final
String bindAddress,
final
int
port,
final
int
numHandlers,
final
boolean
verbose, Configuration conf,
SecretManager<?
extends
TokenIdentifier> secretManager)
throws
IOException {
return
new
Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
}
|
這時我們發現getServer()是一個創建Server對象的工廠方法,但創建的卻是RPC.Server類的對象。哈哈,現在你明白了我前面說的“RPC.Server是ipc.Server的實現類”了吧。不過RPC.Server的構造函數還是調用了ipc.Server類的構造函數的,因篇幅所限,就不貼出相關源碼了。
初始化Server后,Server端就運行起來了,看看ipc.Server的start()源碼吧:
1
2
3
4
5
6
7
8
9
10
11
|
/** 啟動服務 */
public
synchronized
void
start() {
responder.start();
//啟動responder
listener.start();
//啟動listener
handlers =
new
Handler[handlerCount];
for
(
int
i =
0
; i < handlerCount; i++) {
handlers[i] =
new
Handler(i);
handlers[i].start();
//逐個啟動Handler
}
}
|
分析過ipc.Client源碼后,我們知道Client端的底層通信直接采用了阻塞式IO編程,當時我們曾做出猜測:Server端是不是也采用了阻塞式IO。現在我們仔細地分析一下吧,如果Server端也采用阻塞式IO,當連接進來的Client端很多時,勢必會影響Server端的性能。hadoop的實現者們考慮到了這點,所以他們采用了java NIO來實現Server端,那Server端采用java NIO是怎么建立連接的呢?分析源碼得知,Server端采用Listener監聽客戶端的連接,下面先分析一下Listener的構造函數吧:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public
Listener()
throws
IOException {
address =
new
InetSocketAddress(bindAddress, port);
// 創建ServerSocketChannel,並設置成非阻塞式
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(
false
);
// 將server socket綁定到本地端口
bind(acceptChannel.socket(), address, backlogLength);
port = acceptChannel.socket().getLocalPort();
// 獲得一個selector
selector= Selector.open();
readers =
new
Reader[readThreads];
readPool = Executors.newFixedThreadPool(readThreads);
//啟動多個reader線程,為了防止請求多時服務端響應延時的問題
for
(
int
i =
0
; i < readThreads; i++) {
Selector readSelector = Selector.open();
Reader reader =
new
Reader(readSelector);
readers[i] = reader;
readPool.execute(reader);
}
// 注冊連接事件
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this
.setName(
"IPC Server listener on "
+ port);
this
.setDaemon(
true
);
}
|
在啟動Listener線程時,服務端會一直等待客戶端的連接,下面貼出Server.Listener類的run()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public
void
run() {
•••
while
(running) {
SelectionKey key =
null
;
try
{
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while
(iter.hasNext()) {
key = iter.next();
iter.remove();
try
{
if
(key.isValid()) {
if
(key.isAcceptable())
doAccept(key);
//具體的連接方法
}
}
catch
(IOException e) {
}
key =
null
;
}
}
catch
(OutOfMemoryError e) {
•••
}
|
下面貼出Server.Listener類中doAccept()方法中的關鍵源碼吧:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
void
doAccept(SelectionKey key)
throws
IOException, OutOfMemoryError {
Connection c =
null
;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while
((channel = server.accept()) !=
null
) {
//建立連接
channel.configureBlocking(
false
);
channel.socket().setTcpNoDelay(tcpNoDelay);
Reader reader = getReader();
//從readers池中獲得一個reader
try
{
reader.startAdd();
// 激活readSelector,設置adding為true
SelectionKey readKey = reader.registerChannel(channel);
//將讀事件設置成興趣事件
c =
new
Connection(readKey, channel, System.currentTimeMillis());
//創建一個連接對象
readKey.attach(c);
//將connection對象注入readKey
synchronized
(connectionList) {
connectionList.add(numConnections, c);
numConnections++;
}
•••
}
finally
{
//設置adding為false,采用notify()喚醒一個reader,其實代碼十三中啟動的每個reader都使
//用了wait()方法等待。因篇幅有限,就不貼出源碼了。
reader.finishAdd();
}
}
}
|
當reader被喚醒,reader接着執行doRead()方法。
下面貼出Server.Listener.Reader類中的doRead()方法和Server.Connection類中的readAndProcess()方法源碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
方法一:
void
doRead(SelectionKey key)
throws
InterruptedException {
int
count =
0
;
Connection c = (Connection)key.attachment();
//獲得connection對象
if
(c ==
null
) {
return
;
}
c.setLastContact(System.currentTimeMillis());
try
{
count = c.readAndProcess();
// 接受並處理請求
}
catch
(InterruptedException ieo) {
•••
}
•••
}
方法二:
public
int
readAndProcess()
throws
IOException, InterruptedException {
while
(
true
) {
•••
if
(!rpcHeaderRead) {
if
(rpcHeaderBuffer ==
null
) {
rpcHeaderBuffer = ByteBuffer.allocate(
2
);
}
//讀取請求頭
count = channelRead(channel, rpcHeaderBuffer);
if
(count <
0
|| rpcHeaderBuffer.remaining() >
0
) {
return
count;
}
// 讀取請求版本號
int
version = rpcHeaderBuffer.get(
0
);
byte
[] method =
new
byte
[] {rpcHeaderBuffer.get(
1
)};
•••
data = ByteBuffer.allocate(dataLength);
}
// 讀取請求
count = channelRead(channel, data);
if
(data.remaining() ==
0
) {
•••
if
(useSasl) {
•••
}
else
{
processOneRpc(data.array());
//處理請求
}
•••
}
}
return
count;
}
}
|
下面貼出Server.Connection類中的processOneRpc()方法和processData()方法的源碼。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
方法一:
private
void
processOneRpc(
byte
[] buf)
throws
IOException,
InterruptedException {
if
(headerRead) {
processData(buf);
}
else
{
processHeader(buf);
headerRead =
true
;
if
(!authorizeConnection()) {
throw
new
AccessControlException(
"Connection from "
+
this
+
" for protocol "
+ header.getProtocol()
+
" is unauthorized for user "
+ user);
}
}
}
方法二:
private
void
processData(
byte
[] buf)
throws
IOException, InterruptedException {
DataInputStream dis =
new
DataInputStream(
new
ByteArrayInputStream(buf));
int
id = dis.readInt();
// 嘗試讀取id
Writable param = ReflectionUtils.newInstance(paramClass, conf);
//讀取參數
param.readFields(dis);
Call call =
new
Call(id, param,
this
);
//封裝成call
callQueue.put(call);
// 將call存入callQueue
incRpcCount();
// 增加rpc請求的計數
}
|
4. RPC與web service
RPC:
Web service
web service接口就是RPC中的stub組件,規定了server能夠提供的服務(web service),這在server和client上是一致的,但是也是跨語言跨平台的。同時,由於web service規范中的WSDL文件的存在,現在各平台的web service框架,都可以基於WSDL文件,自動生成web service接口 。
其實兩者差不多,只是傳輸的協議不同。
(轉載:http://www.importnew.com/22003.html)
Reference:
1. http://www.cnblogs.com/LBSer/p/4853234.html
2. http://weixiaolu.iteye.com/blog/1504898
3. http://kyfxbl.iteye.com/blog/1745550