關鍵字:區塊鏈 可靠信道 BFT-SMaRt Socket SSL/TLS 網絡通信
信道的可靠是BFT的前提。(參見兩軍問題)
本文通過跟蹤BFT-SMaRt通信層源碼,研究節點間可靠信道的實現原理。本文涉及區塊鏈方面的內容較少,重點研究使用Java語言建立可靠網絡通道的技術,請選擇性閱讀。
通信層系統,是分布式網絡中獲得可靠且認證的點對點通道的保證。BFT-SMaRt的安全通信是基於SSL/TLS標准。
- 節點之間建立互為信任的Socket IO連接,實現點對點的消息處理。
- 節點與客戶端之間建立健壯性、可用性更高的Netty NIO連接,實現大規模的消息處理。
本文主要介紹第一種情況:在BFT-SMaRt中,作為服務端的節點之間的連接構建方法。
一、引子
接上一篇 BFT-SMaRt的理論與實踐 ,啟動分布式計數器服務示例程序時,需輸入命令:
runscripts/smartrun.sh bftsmart.demo.counter.CounterServer 0
命令調用的是CounterServer類的內容,先查看CounterServer的類結構。

接着再看一下CounterServer的類圖。

通過CounterServer的類圖可以清晰地展示它的關系結構。其父類DefaultSingleRecoverable實現了Recoverable和SingleExecutable接口,而SingleExecutable繼承了Executable。追本溯源,根部是以下兩個接口:
- Recoverable:恢復程序,實現此接口的類應該實現狀態轉移協議。通常,類應該同時實現這個接口和一個Executable。
- Executable:執行程序,實現此接口,可接收無序的客戶端請求。如果要支持有序的請求,可以選擇其子類接口FIFOExecutable、BatchExecutable或SingleExecutable。
回到CounterServer的源碼,它是所有官方示例中架構最簡單的,這會提高我們的學習效率。CounterServer有兩個類屬性:
- counter,計數字段,保存計數器的狀態值。
- iterations,操作次數,日志記錄以及數據恢復時使用。
下面開始項目調試,我們為命令手動配置添加請求參數“0”作為節點id,然后啟動命令進入CounterServer的main入口函數:
public static void main(String[] args){
if(args.length < 1) {
System.out.println("Use: java CounterServer <processId>");
System.exit(-1);
}
new CounterServer(Integer.parseInt(args[0]));
}
進來先校驗參數個數,然后調用CounterServer構造函數。函數內創建了一個ServiceReplica對象。
public CounterServer(int id) {
new ServiceReplica(id, this, this);
}
傳入的第一個參數是命令帶入的唯一參數,即節點id。后面兩個參數的值都是this,是將CounterServer分別作為執行程序和恢復程序。
二、名詞統一
在本文的研究中,會涉及到一些由本系統提出,並且非常重要的名詞概念。為避免后續發生同一件事的稱呼混亂,造成困擾,在這里統一聲明。
1. 節點id
replicaId、processId、remoteId、TTPid指的都是節點id,但包含以下幾種情況:
- replicaId:節點作為一個副本的時候。
- processId:一個處理單元作為節點的時候。
- remoteId:外部的節點id。
- TTPid:設置的TTP節點的id。
2. 節點
Replica是分布式系統中的副本,在區塊鏈網絡中代表一個服務節點,節點不一定是一台機器,也可能是一個處理單元,下面統一稱作節點。注意,所有節點都是用一套代碼編譯部署的環境。
3. 本地節點
本文依據命令runscripts/smartrun.sh bftsmart.demo.counter.CounterServer 0,因此本地節點都指的是CounterServer,id為0的節點。本文只研究本地作為節點的情況。而本地作為客戶端的情況(CounterClient作為入口),在后續文章介紹。
4. 配置域
組網配置文件host.config所描述的,由確定數量且順序編號的節點所組成的網絡,我們可以稱之為配置域。下面就是示例節點的host.config的內容。
#server id, address and port (the ids from 0 to n-1 are the service replicas)
0 127.0.0.1 11000 11001
1 127.0.0.1 11010 11011
2 127.0.0.1 11020 11021
3 127.0.0.1 11030 11031
5. TTP
在system.config系統配置文件中有相關配置項:
system.ttp.id
該配置項的值是一個節點id,所以只能配置一個,我們稱之為TTPid。一般是在配置域外,可用作向系統添加和刪除節點。注意,根據約定,TTPid一定大於配置域任意id。
6. 陌生域
那么如果一個節點id既不屬於配置域,又不是ttpid,我們稱之為陌生id。除配置域和ttp以外的所有空間,我們稱之為陌生域。陌生id的加入,在主流區塊鏈產品例如比特幣等,都會有完整的解決方案。BFT-SMaRt的分布式網絡中除了啟動時的配置域和TTP,也允許陌生id的接入,后面會有相關介紹。換句話講,P2P網絡最精彩的部分就是與陌生域的自由聯系。當然了,如果是成熟的聯盟鏈產品,會通過權限控制管理配置域、TTP和陌生域。
三、節點服務類
進入ServiceReplica的構造函數就意味着離開了示例程序,深入到了BFT-SMaRt標准庫內容。ServiceReplica類可以被稱為本地節點服務類,主要用作管理本地作為節點的基礎服務,包括網絡通信和節點間消息共識。這個類從DeliveryThread接收消息,並管理應用程序的執行和對客戶端的回復。對於順序消息逐個執行的應用程序,ServiceReplica接收一致決定的批處理,逐個交付,並使用批處理回復。在應用程序成批執行消息的情況下,該批消息被交付給應用程序,ServiceReplica不需要成批組織應答。
/**
* Constructor
*
* @param id 節點(副本)ID
* @param configHome 配置文件
* @param executor 執行器
* @param recoverer 恢復器
* @param verifier 請求校驗器
* @param replier 請求回復器
* @param loader 加載簽名器
*/
public ServiceReplica(int id, String configHome, Executable executor, Recoverable recoverer, RequestVerifier verifier, Replier replier, KeyLoader loader) {
this.id = id;
// 讀取配置文件,構建配置域視圖實例。
this.SVController = new ServerViewController(id, configHome, loader); // NEXT TODO
this.executor = executor; // 傳入執行程序
this.recoverer = recoverer; // 傳入恢復程序
this.replier = (replier != null ? replier : new DefaultReplier()); // 回復的包裝類
this.verifier = verifier; // null,判斷請求有效性,只是true/flase,未展開有效判斷的邏輯。
this.init(); // 節點初始化(重點)
// 節點環境,上下文內容,屬共識層內容,在恢復程序和回復消息時都被需要。
this.recoverer.setReplicaContext(replicaCtx); // NEXT TODO
this.replier.setReplicaContext(replicaCtx); // NEXT TODO
}
回復器接口Replier只有一個實現類DefaultReplier。它的功能就是在正常的回復開始前,保證對節點上下文ReplicaContext的校驗。當ReplicaContext為空時,會掛起等待,直到有值時,才會走正常的回復。 ServerViewController是共識層的內容,本篇不展開。接下來,進入執行init初始化函數。
private void init() {
try {
cs = new ServerCommunicationSystem(this.SVController, this); // 創建本地節點通信系統
} catch (Exception ex) {
logger.error("Failed to initialize replica-to-replica communication system", ex);
throw new RuntimeException("Unable to build a communication system.");
}
if (this.SVController.isInCurrentView()) {
logger.info("In current view: " + this.SVController.getCurrentView());
initTOMLayer(); // NEXT TODO:初始化共識協議層,BFT-SMaRt的要達成的共識對象是TOM,即total ordered message,追求的是所有節點上面的全部消息有序且一致。
} else { // 該節點不屬於當前配置域
logger.info("Not in current view: " + this.SVController.getCurrentView());
logger.info("Waiting for the TTP: " + this.SVController.getCurrentView());
waitTTPJoinMsgLock.lock();
try {
// 等待“TTP節點”流程(ServerConnection -> run -> replica.joinMsgReceived)
canProceed.awaitUninterruptibly();
} finally {
waitTTPJoinMsgLock.unlock();
}
}
initReplica(); // 啟動本地節點通信系統
}
這段代碼是初始化節點的全部動作,其中涉及TTP節點的流程參照下文。
節點初始化的主流程分為三步:創建本地節點通信系統,初始化共識協議層,啟動本地節點通信系統。本文焦點在可靠通信,主要介紹本地節點通信系統的創建以及啟動。初始化共識協議會放在下一篇來講。
四、節點通信系統概覽
本地節點通信系統的工作主要靠類ServerCommunicationSystem來維護。接着上面的代碼進入它的構造函數。
public ServerCommunicationSystem(ServerViewController controller, ServiceReplica replica) throws Exception {
super("Server Comm. System");
this.controller = controller;
// 初始化消息處理工具
messageHandler = new MessageHandler();
// 請求消息容器,是一個阻塞消息隊列,大小根據配置文件。
inQueue = new LinkedBlockingQueue<SystemMessage>(controller.getStaticConf().getInQueueSize());
// 本地節點與遠端節點的連接對象(本文重點部分)
serversConn = new ServersCommunicationLayer(controller, inQueue, replica);
// 本地節點與外部客戶端的連接對象(本文不介紹)
clientsConn = CommunicationSystemServerSideFactory.getCommunicationSystemServerSide(controller);
}
節點通信系統類ServerCommunicationSystem承擔了所有本地節點的通信工作,它是一個統一的管理類。從業務角度來考慮,節點的通信可分為兩種:
- 與其他若干節點通信的負責對象是 serversConn。
- 與外部客戶端通信的負責對象是 clientsConn。
下面,我們梳理源碼中的線索,主要研究負責節點通信的對象的構建。
五、節點通信層准備
本地節點與遠端節點的通信是通過ServersCommunicationLayer類完成的。ServersCommunicationLayer類本身是一個線程類,在啟動該線程之前,要通過配置文件做一些准備工作。前面提到了配置域的概念,這是在配置文件中設置完成的,因此在啟動ServersCommunicationLayer線程之前,可預先完成配置域的本地和遠端節點的連接。
1. 創建socket服務端
進入ServersCommunicationLayer構造函數,首先創造本地的基於SSL/TLS安全協議的socket服務端對象serverSocketSSLTLS。
// 通信層屬性設置
this.controller = controller;
this.inQueue = inQueue;
this.me = controller.getStaticConf().getProcessId();
this.replica = replica;
this.ssltlsProtocolVersion = controller.getStaticConf().getSSLTLSProtocolVersion();
//獲取訪問地址,IP加端口。
String myAddress;
String confAddress = controller.getStaticConf().getRemoteAddress(controller.getStaticConf().getProcessId())
.getAddress().getHostAddress();
if (InetAddress.getLoopbackAddress().getHostAddress().equals(confAddress)) {
myAddress = InetAddress.getLoopbackAddress().getHostAddress();
} else if (controller.getStaticConf().getBindAddress().equals("")) {
myAddress = InetAddress.getLocalHost().getHostAddress();
if (InetAddress.getLoopbackAddress().getHostAddress().equals(myAddress) && !myAddress.equals(confAddress)) {
myAddress = confAddress;
}
} else {
myAddress = controller.getStaticConf().getBindAddress();
}
int myPort = controller.getStaticConf().getServerToServerPort(controller.getStaticConf().getProcessId());
// 開始構建基於SSL/TLS協議的Socket服務端對象。
FileInputStream fis = null;
try {
fis = new FileInputStream("config/keysSSL_TLS/" + controller.getStaticConf().getSSLTLSKeyStore());
ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(fis, SECRET.toCharArray());
} finally {
if (fis != null) {
fis.close();
}
}
String algorithm = Security.getProperty("ssl.KeyManagerFactory.algorithm");
kmf = KeyManagerFactory.getInstance(algorithm);
kmf.init(ks, SECRET.toCharArray());
trustMgrFactory = TrustManagerFactory.getInstance(algorithm);
trustMgrFactory.init(ks);
context = SSLContext.getInstance(this.ssltlsProtocolVersion);
context.init(kmf.getKeyManagers(), trustMgrFactory.getTrustManagers(), new SecureRandom());
serverSocketFactory = context.getServerSocketFactory();
// 獲得socket服務端對象
this.serverSocketSSLTLS = (SSLServerSocket) serverSocketFactory.createServerSocket(myPort, 100,
InetAddress.getByName(myAddress));
serverSocketSSLTLS.setEnabledCipherSuites(this.controller.getStaticConf().getEnabledCiphers());
String[] ciphers = serverSocketFactory.getSupportedCipherSuites();
for (int i = 0; i < ciphers.length; i++) {
logger.trace("Supported Cipher: {} ", ciphers[i]);
}
serverSocketSSLTLS.setEnableSessionCreation(true);
serverSocketSSLTLS.setReuseAddress(true);
serverSocketSSLTLS.setNeedClientAuth(true);
serverSocketSSLTLS.setWantClientAuth(true);
// 用戶密碼部分,待應用程序去實現,目前SECRET是寫死的。
SecretKeyFactory fac = TOMUtil.getSecretFactory();
PBEKeySpec spec = TOMUtil.generateKeySpec(SECRET.toCharArray());
selfPwd = fac.generateSecret(spec);
通過上面的源碼,我們梳理出來建立基於SSL_TLS的socket服務端對象的步驟:
- 讀取SSL_TLS秘鑰配置文件,構建KeyStore實例並加載秘鑰。
- 構建基於SunX509算法的KeyManagerFactory實例,傳入KeyStore完成初始化。
- 構建TrustManagerFactory實例,傳入KeyStore完成初始化。
- 構建SSL上下文SSLContext實例,傳入KeyManagerFactory和TrustManagerFactory。
- 通過上下文創建基於SSLTLS的socket連接實例serverSocketSSLTLS。
- 為serverSocketSSLTLS設置CipherSuites,加密套件。
- 為serverSocketSSLTLS設置支持創建多會話。
- 為serverSocketSSLTLS設置支持地址重用。
- 為serverSocketSSLTLS設置想要want客戶端認證,如果客戶端不提供認證也繼續。
- 為serverSocketSSLTLS設置必須need客戶端認證,如果客戶端不提供認證不繼續。
以上所有步驟如再進一步深入就下探到了rt.jar的javax.net.ssl包的內容,超出本文研究范圍。到目前為止,我們獲得了一個在節點間通信的基於SSLTLS的socket連接serverSocketSSLTLS。
private SSLServerSocket serverSocketSSLTLS;
用戶密碼
通信層還提供了節點的本地密碼加密功能selfPwd,這個密碼目前是由SECRET寫死在當前類中的,如果是基於BFT-SMaRt開發,可以繼承SECRET,通過節點管理員輸入,記錄本地的用戶密碼。不同於SSL/TLS安全協議中的各種秘鑰,以及區塊鏈必要的公私鑰,用戶密碼在用戶體驗方面更人性化,通過用戶密碼來管理整個節點的權利,是有益的。
2. [種類組合]本地節點與遠端節點
本地節點與遠端節點的通信包含一種情況,如下表所示:
| 本地節點\遠端節點 | 配置域 | TTP | 陌生域 |
|---|---|---|---|
| 配置域 | [A]本地和遠端節點均為配置域 | [D]配置域接收TTP | [G]配置域接收陌生節點 |
| TTP | [B]本地節點作為TTP接入配置域 | [E] TTP只能配置一個 | [H]TTP接入陌生節點 |
| 陌生域 | [C]陌生節點接入配置域 | [F]陌生節點接入TTP | [I]陌生節點接入陌生節點 |
根據表格所示,共有九種情況,我分別用大寫字母作為標識將他們區分。下面會在相應章節分析所有的情況,為便於查找,會使用這些標識關聯。其中[E]是悖論,因此,
[E]:悖論。
下面進入ServersCommunicationLayer的構造函數。通信層類ServersCommunicationLayer,是一個線程類,構建時傳入配置域視圖實例、節點對象、消息隊列。
3. 本地和遠端節點均為配置域
ServersCommunicationLayer擁有一個私有屬性的HashMap容器connections,對應的是網絡中每個節點都有一個自己的connections容器。該容器用來儲存本地節點的所有對外連接,以遠端id為key,連接對象為值。目前為止,該容器仍舊是空的,接下來,首先存儲域內其他3個節點的連接對象,id分別為1,2,3。
if (controller.isInCurrentView()) { // 本地節點在配置域
int[] initialV = controller.getCurrentViewAcceptors(); // 配置域節點id數組,[0,3]
for (int i = 0; i < initialV.length; i++) {
if (initialV[i] != me) { // 配置域中的非本地節點,找出其他3個
getConnection(initialV[i]);
}
}
}
遍歷配置域的節點id,篩選出非本地節點的遠端節點作為網絡內遠程節點id。
private ServerConnection getConnection(int remoteId) { // 其他3個節點id就是remoteId了。
connectionsLock.lock();
ServerConnection ret = this.connections.get(remoteId);
if (ret == null) { // 如果remoteId未曾建立連接,則新建,並按id檢索更新其連接對象。
// 建立連接(重點),第二個參數socket為null
ret = new ServerConnection(controller, null,
remoteId, this.inQueue, this.replica);
this.connections.put(remoteId, ret); // 以id為key,連接對象為值,存儲到HashMap容器。
}
connectionsLock.unlock();
return ret;
}
4. 建立socket連接
本地節點與配置域內的其他3個節點建立連接,是通過ServerConnection類的構造函數。注意此時傳入的socket對象為null。
ServerConnection是系統首次創建連接。
public ServerConnection(ServerViewController controller,
SSLSocket socket, int remoteId,
LinkedBlockingQueue<SystemMessage> inQueue,
ServiceReplica replica) {
this.controller = controller;
this.socket = socket;
this.remoteId = remoteId;
this.inQueue = inQueue;
this.outQueue = new LinkedBlockingQueue<byte[]>(this.controller.getStaticConf().getOutQueueSize());
if (isToConnect()) { // 校驗規則
ssltlsCreateConnection(); // 安全握手,建立通道。
}
if (this.socket != null) {
try {
// socket不為空時,建立socket IO通信流,用於發送和讀取
socketOutStream = new DataOutputStream(this.socket.getOutputStream());
socketInStream = new DataInputStream(this.socket.getInputStream());
} catch (IOException ex) {
logger.error("Error creating connection to " + remoteId, ex);
}
}
// 系統配置:指定通信系統是否應該使用獨立線程來發送數據
this.useSenderThread = this.controller.getStaticConf().isUseSenderThread();
if (useSenderThread && (this.controller.getStaticConf().getTTPId() != remoteId)) {
new SenderThread().start(); // 默認使用獨立發送線程,啟動SenderThread。
} else {
sendLock = new ReentrantLock(); // 使用當前線程發送,加重入鎖
}
if (!this.controller.getStaticConf().isTheTTP()) { // 當前節點不是TTP
if (this.controller.getStaticConf().getTTPId() == remoteId) { // 對方節點是TTP
new TTPReceiverThread(replica).start(); // 啟用TTP接入線程
} else { // remoteId不是TTP
new ReceiverThread().start(); // 啟動接收線程。
}
}
}
① 校驗規則
isToConnect()返回true或false。默認情況不連接,校驗規則有三條:
- 當遠端節點是TTP時,不連接。
- 當本地節點是TTP是,連接。
- 當本地節點在配置域,同時遠端節點id小於本地節點的id,連接。(id分配從小到大,如果比本地節點小,說明也在配置域里)
如果本地節點和遠端節點均為配置域,則前兩條與TTP相關的規則都沒用了。接着看第三條,本地節點此時是0,遠端節點為1,則不連接。這條規則很有趣,它規定了節點id從小到大去觸發連接。我們通過例子來分析:
- 本地啟動id為0的節點,此時 isToConnect = false,不執行ssltlsCreateConnection建立連接。
- 本地啟動id為1的節點,那么假定id為0的遠端節點已經啟動完成,則此時 isToConnect = true,執行ssltlsCreateConnection建立連接。如果遠端節點為2或3,仍舊 isToConnect = false。
所以,系統期待節點能夠按照配置域中的順序編號依次啟動,完成首次連接。但如果節點不按照配置域的序號啟動會發生什么?例如:
- 本地啟動id為1的節點,那么假定id為0的遠端節點已經啟動完成,則此時 isToConnect = true,執行ssltlsCreateConnection建立連接。但此時遠端節點0並未啟動,執行ssltlsCreateConnection會報錯:
17:25:06.985 [main] ERROR bftsmart.communication.server.ServerConnection - Connection refused (SocketException)
但這個錯是被捕捉到的,因此不影響后續流程。節點如果滿足條件的話,仍舊會創建獨立發送線程以及接收線程。
② 安全握手
安全連接的建立首先需要有連接請求,然后握手。如果沒有連接,在握手邏輯時就會被檢查到,那么就會中斷當前流程,等待直到獲得遠端節點的連接請求。然后封裝SSL/TLS協議,最后得到Socket服務端對象。通過socket服務端始終監聽socket客戶端,可以達到基於socket連接的兩方的在安全加密的通道內持續通信的目的。
這一次調用ServerConnection函數,是建立與配置域內的3個節點的連接,因此符合校驗規則。下面直接進入ssltlsCreateConnection方法。根據SSL/TLS協議,建立Socket服務端與Socket客戶端的通信連接的第一步,就是雙方進行安全握手。
基於SSL/TLS的Socket服務端對象創建流程參照下文創建socket服務端,同樣的,我們為Socket客戶端對象增加SSL/TLS協議。
拿到對象以后,為其增加異步的握手完成監聽HandshakeCompletedListener,當握手完成時,會在日志中打印相關信息,便於調試及流程展示。真實場景的日志信息如下:
-- SSL/TLS handshake complete!, Id:0 # CipherSuite: TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256.
-- SSL/TLS handshake complete!, Id:1 # CipherSuite: TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256.
-- SSL/TLS handshake complete!, Id:2 # CipherSuite: TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256.
接下來,啟動握手。這部分內容是與本項目無關的底層邏輯,握手分為:
- performInitialHandshake,連接狀態為握手時,發送初始化消息,包括握手hash,hello版本,格式檢查。
- kickstartHandshake,處理其他連接狀態,
握手的過程就是正式建立數據流之前,通道先進行安全預熱,經過幾次預熱以后,就完成了握手過程,雙方就建立了信任通道。
若配置域內遠端節點未啟動,則握手失敗。這里的握手代碼只是執行一次,但在啟動本地節點通信系統后會不斷地向connections的節點發起定時任務requestsTimer,發送多次的握手請求。這里我們假定其他3個節點均已啟動,遠端節點與本地節點都有連接請求,可以完成安全握手。
握手成功,開啟數據通信。將本地節點id發送給新連接的數據輸出流,作為新連接接入的身份介紹。
new DataOutputStream(this.socket.getOutputStream())
.writeInt(this.controller.getStaticConf().getProcessId());
本地節點向輸出流寫入本地id。注意所有節點用的是同一套代碼,所以作為遠端節點來講,就是獲得了一個輸入流,內容為remoteId(為0)。
③ IO開啟
回到ServerConnection,ssltlsCreateConnection方法已經讓this.socket獲得了有效的連接對象,下面就開啟IO。
if (this.socket != null) {
try {
// socket不為空時,建立socket IO通信流,用於發送和讀取
socketOutStream = new DataOutputStream(this.socket.getOutputStream());
socketInStream = new DataInputStream(this.socket.getInputStream());
} catch (IOException ex) {
logger.error("Error creating connection to " + remoteId, ex);
}
}
this.socket有兩種情況可以獲得有效對象:
- ServerConnection構造函數的參數中socket字段傳入為有效對象。
- isToConnect()返回true,ssltlsCreateConnection安全握手構建有效對象。
如果以上兩種情況均不滿足,則this.socket為null,則不運行該分支代碼,無法開啟IO。
④ 獨立發送線程
繼續往下跟蹤源碼。
// UseSenderThread是一個系統配置。
this.useSenderThread = this.controller.getStaticConf().isUseSenderThread();
if (useSenderThread && (this.controller.getStaticConf().getTTPId() != remoteId)) {
new SenderThread().start(); // 啟動獨立發送線程
} else {
sendLock = new ReentrantLock();
}
如果在system.config中,配置了system.communication.useSenderThread為true,且遠端節點是屬於配置域或陌生域。則在建立連接時,會啟動獨立線程SenderThread來發送數據。
如果系統配置useSenderThread為false,或者遠端節點是TTP,則不啟動獨立發送線程,創建重進入鎖,用於主線程發送消息時上鎖使用。
SenderThread線程類源碼如下。
private class SenderThread extends Thread {
public SenderThread() {
super("Sender for " + remoteId);
}
@Override
public void run() {
byte[] data = null;
while (doWork) {
try {
// 從out隊列(保存待發送消息)獲取並移除隊頭的元素
data = outQueue.poll(POOL_TIME, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
}
if (data != null) {
logger.trace("Sending data to, RemoteId:{}", remoteId);
sendBytes(data); // 發送數據
}
}
logger.debug("Sender for " + remoteId + " stopped!");
}
}
outQueue發送隊列保存了所有待發送消息。
protected LinkedBlockingQueue<byte[]> outQueue;
獨立發送線程通過調用發送隊列的poll方法獲取並移除隊列中隊頭的元素,作為發送數據,通過sendBytes方法發送出去。上一小節我們已經創建了IO流對象,獨立發送線程使用socket輸出流對象,在sendBytes方法體中將字節化的數據放入通道發送出去。
private final void sendBytes(byte[] messageData) {
boolean abort = false;
do {
if (abort)
return; // 如果需要重連,則終止方法
if (socket != null && socketOutStream != null) {
try {
byte[] data = new byte[5 + messageData.length];// 沒有包含MAC,消息驗證碼
int value = messageData.length; // 長度
System.arraycopy(new byte[] { (byte) (value >>> 24), (byte) (value >>> 16), (byte) (value >>> 8),
(byte) value }, 0, data, 0, 4); // data前個字節,用數據長度做位運算,給主數據做混淆
// 從data第5個字節開始,復制一份原數據。
System.arraycopy(messageData, 0, data, 4, messageData.length);
// data的最后一個字節,寫入一個0
System.arraycopy(new byte[] { (byte) 0 }, 0, data, 4 + messageData.length, 1);
socketOutStream.write(data); // 發送數據
return;
} catch (IOException ex) {
closeSocket();
waitAndConnect(); // IO異常,重連
abort = true;
}
} else {
waitAndConnect(); // socket丟失,重連
abort = true;
}
} while (doWork);
}
如果過程中出現任何連接異常,則嘗試重連。
⑤ 獨立接收線程
回到ServerConnection,繼續跟蹤代碼。
if (!this.controller.getStaticConf().isTheTTP()) {
if (this.controller.getStaticConf().getTTPId() == remoteId) {
new TTPReceiverThread(replica).start(); // TTP部分
} else {
new ReceiverThread().start();
}
}
如果本地節點不是TTP,遠端節點是TTP,則啟動TTPReceiverThread獨立線程。如果遠端節點不是TTP,可能是配置域或陌生域,則啟動獨立接收線程ReceiverThread。目前還未涉及到TTP的連接,因此進入接收線程。
protected class ReceiverThread extends Thread {
public ReceiverThread() {
super("Receiver for " + remoteId);
}
@Override
public void run() {
while (doWork) {
if (socket != null && socketInStream != null) {
try {
// 先讀取消息長度,分配空間。
int dataLength = socketInStream.readInt();
byte[] data = new byte[dataLength];
// 讀入數據
int read = 0;
do {
read += socketInStream.read(data, read, dataLength - read);
} while (read < dataLength);
byte hasMAC = socketInStream.readByte(); // 輸入流繼續讀入一個字節,判斷是否存在消息驗證碼
logger.trace("Read: {}, HasMAC: {}", read, hasMAC);
SystemMessage sm = (SystemMessage) (new ObjectInputStream(new ByteArrayInputStream(data))
.readObject()); // 包裝消息為SystemMessage對象
// SSL/TLS協議的驗證已完成
sm.authenticated = true;
if (sm.getSender() == remoteId) { // 發送者為遠端id
if (!inQueue.offer(sm)) { // 將消息添加至讀入隊列
logger.warn("Inqueue full (message from " + remoteId + " discarded).");
}
}
} catch (ClassNotFoundException ex) {
logger.info("Invalid message received. Ignoring!");
} catch (IOException ex) {
if (doWork) {
logger.debug("Closing socket and reconnecting");
closeSocket();
waitAndConnect(); // IO異常,重連
}
}
} else {
waitAndConnect(); // socket丟失,重連
}
}
}
}
讀入的消息都會被轉化為統一的SystemMessage對象,被放入讀入隊列inQueue等待后續處理。
private LinkedBlockingQueue<SystemMessage> inQueue;
5. 配置域的本地和遠端節點連接完成
到目前為止,ServersCommunicationLayer構造函數就差最后一行就執行完畢。
start(); // ServersCommunicationLayer構造函數的最后一行用於啟動主線程
如果假定配置域中四個節點的代碼均執行掛起在ServersCommunicationLayer構造函數的最后一行,也就是還沒有啟動ServersCommunicationLayer構造函數主線程。那么每個節點系統內都是有且只有6條線程存在。

結合上一節的源碼分析,本地節點與遠端節點建立了獨立的發送線程和獨立的接收線程。共有3個遠端節點,因此就產生了6條線程。這也符合軟件設計原則中的單一職責原則。如上圖所示,線程服務對象分別對應着節點id為1,2,3。本地節點是0。本地節點與配置域的3個遠端節點建立了ServerConnection。並且將連接對象ServerConnection緩存到了本地容器connections中。未來可以直接通過remoteId得到連接對象。
那么,配置域節點間是否真正建立起有效連接了呢?
答案是否定的,不一定。這取決於這4個節點是否按照順序執行。如果節點0首先啟動程序掛起到start(),然后節點1再啟動程序同樣掛起到start()。這時候,根據前文介紹的isToConnect()規則校驗,節點1會與節點0成功建立連接。以此類推,配置域內所有節點都會成功建立與其他3個節點的連接,所以此時配置域內有1-0、2-1、2-0、3-2、3-1、3-0根據公式
共6個連接,共24個線程。如果不按照以上順序執行,由於我們假定所有節點都掛起在start(),因此其實只有一次機會去建立連接。節點1在啟動時去找節點0但失敗了,就失去了這次連接的機會。同樣的,配置域內其他節點也是這樣。這時候配置域內就不一定是幾個連接了,因為可能有的節點按順序了,有的沒有,例如啟動順序為1,0,2,3 就會建立5條連接,仍舊24個線程,這里正好缺了一條節點0和節點1的連接。而值得注意的是,無論是否建立連接,每個節點都會有6個線程,所以配置域中4個節點24個線程是不會改變的。
如果,配置域間未成功建立連接,后續怎么辦?
那就需要啟動ServerCommunicationSystem主線程start(),通過代碼編寫定時任務,每個節點都會啟動獨立的線程,讓節點之間擁有持續的連接機會,而不是僅這一次。
6. 重連
socket連接在建立過程或IO通信過程中,出現IO異常或socket丟失的時候,都會做重連處理。重連處理通過執行方法waitAndConnect,以及方法reconnect。
private void waitAndConnect() {
if (doWork) {
try {
Thread.sleep(POOL_TIME); // 阻塞線程5s
} catch (InterruptedException ie) {
}
outQueue.clear(); // 清空發送隊列
reconnect(null); // 調用重連
}
}
該方法首先會等待5秒鍾,然后再清空發送隊列,調用重連方法reconnect。
protected void reconnect(SSLSocket newSocket) {
connectLock.lock();
if (socket == null || !socket.isConnected()) {
if (isToConnect()) { // 校驗規則
ssltlsCreateConnection(); // 安全握手
} else {
socket = newSocket;
}
if (socket != null) {
try {
// 建立連接的標志就是啟動socket IO流,與首次連接ServerConnection是一樣的。
socketOutStream = new DataOutputStream(socket.getOutputStream());
socketInStream = new DataInputStream(socket.getInputStream());
} catch (IOException ex) {
logger.error("Failed to authenticate to replica", ex);
}
}
}
connectLock.unlock();
}
前面分析過的ServerConnection是首次連接,reconnect是重連,因此建立連接的代碼差不多,ssltlsCreateConnection只會在這兩個方法被調用。建立連接的標志就是啟動socket IO流,校驗后完成握手即可創建IO流,重連完成。
六、啟動節點通信層
通過前面幾節的詳細論述,ServersCommunicationLayer構造函數最后一行之前的所有流程我們都已研究清楚。本節將執行start()啟動ServersCommunicationLayer的主線程。下面進入run函數。
public void run() {
while (doWork) {
try {
// 啟動已有連接的通信監聽
SSLSocket newSocket = (SSLSocket) serverSocketSSLTLS.accept();
setSSLSocketOptions(newSocket);
// 讀入連接建立時存有本地節點id的輸出流。現在的角色變了,
// 那個本地節點成為了遠端節點,那么輸出流就是當前本地節點的輸入流。
int remoteId = new DataInputStream(newSocket.getInputStream()).readInt();
if (!this.controller.isInCurrentView() &&
(this.controller.getStaticConf().getTTPId() != remoteId)) {
// 本地節點不是配置域且遠端節點不是TTP,對應情況[B][C][H][I]
waitViewLock.lock();
pendingConn.add(new PendingConnection(newSocket, remoteId));
waitViewLock.unlock();
} else { // 遠端節點為TTP或本地節點為配置域,對應情況[A][D][F][G]
logger.debug("Trying establish connection with Replica: {}", remoteId);
establishConnection(newSocket, remoteId);
}
} catch (SocketTimeoutException ex) {
logger.trace("Server socket timed out, retrying");
} catch (SSLHandshakeException sslex) {
sslex.printStackTrace();
} catch (IOException ex) {
logger.error("Problem during thread execution", ex);
}
}
try {
serverSocket.close();
} catch (IOException ex) {
logger.error("Failed to close server socket", ex);
}
logger.info("ServerCommunicationLayer stopped.");
}
ServersCommunicationLayer主線程,首先如果連接已成功,開始監聽,建立通道。通道內第一個消息是遠端節點id,分析遠端id和本地id的情況,得到:
- 本地節點不是配置域且遠端節點不是TTP,對應情況[B][C][H][I],放入容器pendingConn,等待后續處理。
- 遠端節點為TTP或本地節點為配置域,對應情況[A][D][F][G],直接調用establishConnection建立連接。
1. IO開啟
第二種情況比較簡單,
else { // 遠端節點為TTP或本地節點為配置域,對應情況[A][D][F][G]
logger.debug("Trying establish connection with Replica: {}", remoteId);
establishConnection(newSocket, remoteId);
}
我們直接進入establishConnection函數。
private void establishConnection(SSLSocket newSocket, int remoteId) throws IOException {
if ((this.controller.getStaticConf().getTTPId() == remoteId) || this.controller.isCurrentViewMember(remoteId)) {
// 遠端ID為TTP或配置域,從[A][D][F][G]中篩選,剩余[A][D][F]
connectionsLock.lock();
if (this.connections.get(remoteId) == null) { // connections中不存在,說明首次連接
this.connections.put(remoteId, // 傳入newSocket調用ServerConnection
new ServerConnection(controller, newSocket, remoteId, inQueue, replica));
} else { // connections中已有連接對象,重連即可。
logger.debug("ReConnecting with replica: {}", remoteId);
this.connections.get(remoteId).reconnect(newSocket);
}
connectionsLock.unlock();
} else { // 本地節點為配置域接收遠端陌生節點,屬於[G]情況的處理,直接關閉。
logger.debug("Closing connection with replica: {}", remoteId);
newSocket.close();
}
}
參考以上代碼中的注釋,首先情況[G]會被直接關閉。
[G]:關閉連接
剩余[A][D][F],如果connections容器中能查到,則直接傳入socket調用reconnect方法重連。前文分析了reconnect方法,但與這一次分支不同。
if (isToConnect()) {
ssltlsCreateConnection(); // [A]
} else {
socket = newSocket; // [D][F]
}
決定權又來到了isToConnect()。我們再次對比規則(默認不連接):
- 遠端TTP,不連接 [D]
- 本地TTP,連接
- 本地配置域且id大於遠端id,TTPid肯定大於配置域,所以遠端id一定是配置域。雙雙配置域[A]
所以,[D][F]為不連接傳入,[A]為連接。
[A]:如果失去了准備階段的首次配置域連接機會,這里可以再次建立連接。構建IO通道是建立連接的標志。
[D][F]:使用當前socket,既然讀到了remoteId,說明連接已成功。那么就使用該socket建立IO通道。
2. 待處理連接
回到第一種情況,本地節點不是配置域且遠端節點不是TTP,對應情況[B][C][H][I],放入容器pendingConn,等待后續處理。
if (!this.controller.isInCurrentView() && (this.controller.getStaticConf().getTTPId() != remoteId)) {
// 本地節點不是配置域且遠端節點不是TTP,對應情況[B][C][H][I]
waitViewLock.lock();
pendingConn.add(new PendingConnection(newSocket, remoteId));
waitViewLock.unlock();
}
PendingConnection類是pendingConn 的元素,用於存儲掛起連接。陌生節點在接收到連接的響應之后,只有學習了當前視圖后才可接受連接。這主要是為了處理陌生域的情況。
private List<PendingConnection> pendingConn = new LinkedList<PendingConnection>();
3. 節點連接對象完成
ServerCommunicationSystem構造函數中ServersCommunicationLayer的對象serversConn完成。下面我們通過4個維度進行總結。
① 線程狀況
此時本地線程除了上面的6條以外,又增加了一條剛剛啟動的ServersCommunicationLayer的線程。

② 已涉及類圖
接着,我們梳理一下到目前為止,涉及到的幾個類的類圖關系。

實現Runnable接口的都是線程類,目前ServerCommunicationSystem還未構建完成,因此其他3個線程類共創建了7條線程,如上所示。
③ [種類組合]結果現狀
這里再次更新一下前面本地節點與遠端節點的種類組合的結果情況,參見五-2。
| 本地節點\遠端節點 | 配置域 | TTP | 陌生域 |
|---|---|---|---|
| 配置域 | [A] 構建IO通道(終態) | [D] 構建IO通道 | [G] 關閉連接(終態) |
| TTP | [B] 待處理 | [E] 悖論(終態) | [H] 待處理 |
| 陌生域 | [C] 待處理 | [F] 構建IO通道 | [I] 待處理 |
如上表所示,[A][E][G]情況已經處理完畢。剩余的部分,其中[D][F]還有待TTP流程處理。而[B][C][H][I]還需進一步處理。
④ 代碼調試進程
到目前為止,代碼調試的執行情況進展到了ServerCommunicationSystem構造函數。

下面繼續ServerCommunicationSystem的構造,進入clientConn的構造。
七、后記
經過第五、第六兩章的論述,節點通信對象serversConn已完成。節點間的通信是通過原生socket建立的,其中也涉及到我們手動對socket進行SSL/TLS安全協議的構建。前文也分析了線程的情況,由於我們假定在聯盟鏈的場景中,作為服務端的節點並不多,因此維護長連接的線程開銷可以承受。然而如果從公鏈角度或者客戶端請求連接來考慮,原生socket是不足以支撐這種情況的。我們看到節點間的通信甚至沒有使用到線程池以及后續的一系列改善方案,這是隨着場景的變化需要不斷升級適配的。本章開始介紹客戶端的通信,客戶端通信的特點是需要維護更大規模的連接以及IO通信。因此,客戶端的通信采用了目前較主流更健壯的Netty NIO的方式。下面一篇文章將繼續分析BFT-SMaRt在底層網絡通信架構上對於Netty的使用。
更多文章請轉到一面千人的博客園。
