在之前的文章中,我們模擬了節點網絡通訊,很多朋友反饋說,他們想看真正的節點網絡通訊而不是單節點的模擬。本章將滿足你們。😌
我將本章的內容放在了com.v5ent.real.p2p包中,大家可以在源碼中找到我更新的代碼。
通過本文,你將可以做到:
- 創建自己的真實peer-to-peer網絡
- 多個節點通過p2p網絡同步區塊內容
- 在自我節點實現RPC通訊,並向區塊中寫數據
- 在自我節點查看整個區塊內容
- 不含虛擬貨幣在內的其他區塊鏈知識
學習本章至少需要兩台可ping通的機器(虛擬機也可以),並且安裝了jdk8(如果只有jdk6,可以把代碼改為jdk6支持的形式)。
基於之前的文章,本章將重寫p2p部分的內容。我們首先要理清思路,提出區塊鏈通訊需要解決的問題。
1.如果我第一次作為全節點啟動,我需要干什么
2.如果我已經啟動,別的節點和我通訊,我需要交互哪些信息
3.如果我已經啟動,我怎么和自己的客戶端交互
這些問題有一個很好的參考對象,就是比特幣,我們先來看看比特幣的通訊過程
一、比特幣節點連接建立
1.尋找比特幣網絡中的有效節點,此步驟通常有兩種方法:
(1)使用“DNS種子”(DNS seeds),DNS種子提供比特幣節點的IP地址列表,Bitcoin Core客戶端提供五種不同的DNS種子,通常使用默認方式。這里不展開談論。
(2)手動通過-seednode命令指定一個比特幣節點的IP地址作為比特幣種子節點(為什么叫種子,我的理解就是,根據種子,得到更多)
這里我們使用簡單的方式來處理節點,用一個文件來存儲地址列表-peers.list,第一次連接的時候更新這個文件,獲取更多連接時也更新這個文件。簡單演示我們直接把要測試的機器ip和端口加進去。
我有兩台機,我把端口8015用作新的p2p網絡的默認監聽端口,那么我的peers.list的內容是:
10.16.0.205:8015
10.16.3.77:8015
2.與發現的有效比特幣節點進行初始“握手”,建立連接
節點發送一條包含基本認證內容的version消息開始“握手”通信過程,該消息包括如下內容:
-
nVersion:客戶端的比特幣P2P協議所采用的版本(例如:70002)。
-
nLocalServices:一組該節點支持的本地服務列表,當前僅支持NODE_NETWORK
-
nTime:當前時間
-
addrYou:當前節點可見的遠程節點的IP地址(上例中NodeB IP)
-
addrMe:當前節點的IP地址(上例中NodeA IP)
-
subver:指示當前節點運行的軟件類型的子版本號(例如:”/Satoshi:0.9.2.1/”)
-
BestHeight:當前節點區塊鏈的區塊高度(初始為0,即只包含創世區塊)
我們簡化一下,這里我們最關心的就是區塊高度bestHeight,我們就傳遞這個好了。區塊高度就是區塊鏈的長度。
1 if ("VERSION".equalsIgnoreCase(cmd)) { 2 // 對方發來握手信息,我方發給對方區塊高度和最新區塊的hash 3 pt.peerWriter.write("VERACK " + blockChain.size() + " " + blockChain.get(blockChain.size() - 1).getHash()); 4 }else if ("VERACK".equalsIgnoreCase(cmd)) { 5 // 獲取區塊高度 6 String[] parts = payload.split(" "); 7 bestHeight = Integer.parseInt(parts[0]); 8 //哈希暫時不校驗 9 }
3.新節點建立更多的連接,使節點在網絡中被更多節點接收,保證連接更穩定
這里我們就兩台機,如果你有更多機器,可以實現一下這個通訊,本章我們簡單實現一下。
1 if ("ADDR".equalsIgnoreCase(cmd)) { 2 // 對方發來地址,建立連接並保存 3 if (!peers.contains(payload)) { 4 String peerAddr = payload.substring(0, payload.indexOf(":")); 5 int peerPort = Integer.parseInt(payload.substring(payload.indexOf(":") + 1)); 6 peerNetwork.connect(peerAddr, peerPort); 7 peers.add(payload); 8 PrintWriter out = new PrintWriter(peerFile); 9 for (int k = 0; k < peers.size(); k++) { 10 out.println(peers.get(k)); 11 } 12 out.close(); 13 } 14 } else if ("GET_ADDR".equalsIgnoreCase(cmd)) { 15 //對方請求更多peer地址,隨機給一個 16 Random random = new Random(); 17 pt.peerWriter.write("ADDR " + peers.get(random.nextInt(peers.size()))); 18 }
4.交換“區塊清單”(注:該步驟僅在全節點上會執行,且從與節點建立連接就開始進行)本系列內容只使用全節點。
全節點
全節點沿着區塊鏈按時間倒敘一直追溯到創世區塊,建立一個完整的UTXO數據庫,通過查詢UTXO是否未被支付來驗證交易的有效性。
SPV節點
SPV節點通過向其他節點請求某筆交易的Merkle路徑(Merkle樹我可能會在后續章節講到),如果路徑正確無誤,並且該交易之上已有6個或以上區塊被確認,則證明該交易不是雙重支付。
全節點在連接到其他節點后,需要構建完整的區塊鏈,如果是新節點,它僅包含靜態植入客戶端中的0號區塊(創世區塊)。注意了,創世區塊是靜態的(硬編碼)。
如前文所言,我們在區塊鏈中取最長的鏈,區塊高度比我高,我就向對方獲取區塊。
1 else if ("BLOCK".equalsIgnoreCase(cmd)) { 2 //把對方給的塊存進鏈中 3 LOGGER.info("Attempting to add block..."); 4 LOGGER.info("Block: " + payload); 5 Block newBlock = gson.fromJson(payload, Block.class); 6 if (!blockChain.contains(newBlock)) { 7 // 校驗區塊,如果成功,將其寫入本地區塊鏈 8 if (BlockUtils.isBlockValid(newBlock, blockChain.get(blockChain.size() - 1))) { 9 if (blockChain.add(newBlock) && !catchupMode) { 10 LOGGER.info("Added block " + newBlock.getIndex() + " with hash: ["+ newBlock.getHash() + "]"); 11 peerNetwork.broadcast("BLOCK " + payload); 12 } 13 } 14 } 15 } else if ("GET_BLOCK".equalsIgnoreCase(cmd)) { 16 //把對方請求的塊給對方 17 LOGGER.info("Sending block[" + payload + "] to peer"); 18 Block block = blockChain.get(Integer.parseInt(payload)); 19 if (block != null) { 20 LOGGER.info("Sending block " + payload + " to peer"); 21 pt.peerWriter.write("BLOCK " + gson.toJson(block)); 22 } 23 }
到這里,我們基本上完成了p2p網絡中關於區塊的通訊。
其實比特幣等虛擬貨幣中還有很多通訊,關於交易的,這里我們不需要,不做討論。
讓我們開始編碼吧!
整合上文提到的所有通訊,Node.java的代碼如下
1 private static final Logger LOGGER = LoggerFactory.getLogger(Node.class); 2 3 /** 本地區塊鏈 */ 4 private static List<Block> blockChain = new LinkedList<Block>(); 5 6 public static void main(String[] args) throws IOException, InterruptedException { 7 int port = 8015; 8 LOGGER.info("Starting peer network... "); 9 PeerNetwork peerNetwork = new PeerNetwork(port); 10 peerNetwork.start(); 11 LOGGER.info("[ Node is Started in port:"+port+" ]"); 17 ArrayList<String> peers = new ArrayList<>(); 18 File peerFile = new File("peers.list"); 19 if (!peerFile.exists()) { 20 String host = InetAddress.getLocalHost().toString(); 21 FileUtils.writeStringToFile(peerFile, host+":"+port); 22 } 23 for (Object peer : FileUtils.readLines(peerFile)) { 24 String[] addr = peer.toString().split(":"); 25 peerNetwork.connect(addr[0], Integer.parseInt(addr[1])); 26 } 27 TimeUnit.SECONDS.sleep(2); 28 29 peerNetwork.broadcast("VERSION"); 30 31 // hard code genesisBlock 32 Block genesisBlock = new Block(); 33 genesisBlock.setIndex(0); 34 genesisBlock.setTimestamp("2017-07-13 22:32:00");//my son's birthday 35 genesisBlock.setVac(0); 36 genesisBlock.setPrevHash(""); 37 genesisBlock.setHash(BlockUtils.calculateHash(genesisBlock)); 38 blockChain.add(genesisBlock); 39 40 final Gson gson = new GsonBuilder().create(); 41 LOGGER.info(gson.toJson(blockChain)); 42 int bestHeight = 0; 43 boolean catchupMode = true; 44 45 /** 46 * p2p 通訊 47 */ 48 while (true) { 49 //對新連接過的peer寫入文件,下次啟動直接連接 50 for (String peer : peerNetwork.peers) { 51 if (!peers.contains(peer)) { 52 peers.add(peer); 53 FileUtils.writeStringToFile(peerFile, peer); 54 } 55 } 56 peerNetwork.peers.clear(); 57 58 // 處理通訊 59 for (PeerThread pt : peerNetwork.peerThreads) { 60 if (pt == null || pt.peerReader == null) { 61 break; 62 } 63 List<String> dataList = pt.peerReader.readData(); 64 if (dataList == null) { 65 LOGGER.info("Null ret retry."); 66 System.exit(-5); 67 break; 68 } 69 70 for (String data:dataList) { 71 LOGGER.info("Got data: " + data); 72 int flag = data.indexOf(' '); 73 String cmd = flag >= 0 ? data.substring(0, flag) : data; 74 String payload = flag >= 0 ? data.substring(flag + 1) : ""; 75 if (StringUtils.isNotBlank(cmd)) { 76 if ("VERSION".equalsIgnoreCase(cmd)) { 77 // 對方發來握手信息,我方發給對方區塊高度和最新區塊的hash 78 pt.peerWriter.write("VERACK " + blockChain.size() + " " + blockChain.get(blockChain.size() - 1).getHash()); 79 }else if ("VERACK".equalsIgnoreCase(cmd)) { 80 // 獲取區塊高度 81 String[] parts = payload.split(" "); 82 bestHeight = Integer.parseInt(parts[0]); 83 //哈希暫時不校驗 84 } else if ("GET_BLOCK".equalsIgnoreCase(cmd)) { 85 //把對方請求的塊給對方 86 LOGGER.info("Sending block[" + payload + "] to peer"); 87 Block block = blockChain.get(Integer.parseInt(payload)); 88 if (block != null) { 89 LOGGER.info("Sending block " + payload + " to peer"); 90 pt.peerWriter.write("BLOCK " + gson.toJson(block)); 91 } 92 } else if ("BLOCK".equalsIgnoreCase(cmd)) { 93 //把對方給的塊存進鏈中 94 LOGGER.info("Attempting to add block..."); 95 LOGGER.info("Block: " + payload); 96 Block newBlock = gson.fromJson(payload, Block.class); 97 if (!blockChain.contains(newBlock)) { 98 // 校驗區塊,如果成功,將其寫入本地區塊鏈 99 if (BlockUtils.isBlockValid(newBlock, blockChain.get(blockChain.size() - 1))) { 100 if (blockChain.add(newBlock) && !catchupMode) { 101 LOGGER.info("Added block " + newBlock.getIndex() + " with hash: ["+ newBlock.getHash() + "]"); 102 peerNetwork.broadcast("BLOCK " + payload); 103 } 104 } 105 } 106 }else if ("GET_ADDR".equalsIgnoreCase(cmd)) { 107 //對方請求更多peer地址,隨機給一個 108 Random random = new Random(); 109 pt.peerWriter.write("ADDR " + peers.get(random.nextInt(peers.size()))); 110 } else if ("ADDR".equalsIgnoreCase(cmd)) { 111 // 對方發來地址,建立連接並保存 112 if (!peers.contains(payload)) { 113 String peerAddr = payload.substring(0, payload.indexOf(":")); 114 int peerPort = Integer.parseInt(payload.substring(payload.indexOf(":") + 1)); 115 peerNetwork.connect(peerAddr, peerPort); 116 peers.add(payload); 117 PrintWriter out = new PrintWriter(peerFile); 118 for (int k = 0; k < peers.size(); k++) { 119 out.println(peers.get(k)); 120 } 121 out.close(); 122 } 123 } 124 } 125 } 126 } 127 128 // ******************************** 129 // 比較區塊高度,同步區塊 130 // ******************************** 131 132 int localHeight = blockChain.size(); 133 134 if (bestHeight > localHeight) { 135 catchupMode = true; 136 LOGGER.info("Local chain height: " + localHeight); 137 LOGGER.info("Best chain Height: " + bestHeight); 138 TimeUnit.MILLISECONDS.sleep(300); 139 140 for (int i = localHeight; i < bestHeight; i++) { 141 LOGGER.info("請求塊 " + i + "..."); 142 peerNetwork.broadcast("GET_BLOCK " + i); 143 } 144 } else { 145 if (catchupMode) { 146 LOGGER.info("[p2p] - Caught up with network."); 147 } 148 catchupMode = false; 149 } 150 151 152 153 // **************** 154 // 循環結束 155 // **************** 156 TimeUnit.MILLISECONDS.sleep(200); 157 } 158 }
PeerNetwork簡單封裝了一下p2p通訊的細節,篇幅有限我這里只列出核心交互,具體實現可以去看我的github源碼中的類:PeerThread、PeerReader、PeerWriter。
RPC
接下來,我們將討論關於本地節點客戶端的概念。在比特幣中,除了bitcoin-core,還有bitcoin-cli,這是做什么的呢。
它其實是用來做本地節點交互的,比如我作為本地節點,我需要發起交易,需要查看我的資產等等,后來發展出gui界面,就是大家俗稱的錢包。
在本章中,我們也需要這樣一個客戶端通訊,用來將我們的vac寫入鏈中(之前的文章,我們是用控制台輸入的,實際的做法是提供加密的rpc調用)我們接下來實現RPC服務
首先我們要在Node.java中加入通訊邏輯
LOGGER.info("Starting RPC daemon... ");
RpcServer rpcAgent = new RpcServer(port+1);
rpcAgent.start();
LOGGER.info("[ RPC agent is Started in port:"+(port+1)+" ]");
for循環體中增加
/** * 處理RPC服務 */ for (RpcThread th:rpcAgent.rpcThreads) { String request = th.req; if (request != null) { String[] parts = request.split(" "); parts[0] = parts[0].toLowerCase(); if ("getinfo".equals(parts[0])) { String res = gson.toJson(blockChain); th.res = res; } else if ("send".equals(parts[0])) { try { int vac = Integer.parseInt(parts[1]); // 根據vac創建區塊 Block newBlock = BlockUtils.generateBlock(blockChain.get(blockChain.size() - 1), vac); if (BlockUtils.isBlockValid(newBlock, blockChain.get(blockChain.size() - 1))) { blockChain.add(newBlock); th.res = "write Success!"; peerNetwork.broadcast("BLOCK " + gson.toJson(newBlock)); } else { th.res = "RPC 500: Invalid vac Error\n"; } } catch (Exception e) { th.res = "Syntax (no '<' or '>'): send <vac> <privateKey>"; LOGGER.error("invalid vac", e); } } else { th.res = "Unknown command: \"" + parts[0] + "\""; } } }
獨立線程處理
RpcThread.java
package com.v5ent.real.p2p; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 處理單個rpc連接 * @author Mignet */ public class RpcThread extends Thread { private static final Logger LOGGER = LoggerFactory.getLogger(RpcThread.class); private Socket socket; String res; String req; /** * 默認構造函數 * @param socket */ public RpcThread(Socket socket){ this.socket = socket; } @Override public void run(){ try{ req = null; res = null; PrintWriter out = new PrintWriter(socket.getOutputStream(), true); BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); String input; out.println("================Welcome RPC Daemon=============="); while((input = in.readLine()) != null){ if ("HELP".equalsIgnoreCase(input)){ out.println("############################################## COMMANDS ###############################################"); out.println("# 1) getinfo - Gets block chain infomations. #"); out.println("# 2) send <vac> - Write <vac> to blockChain #"); out.println("#######################################################################################################"); } else { req = input; while (res == null){ TimeUnit.MILLISECONDS.sleep(25); } out.println(res); req = null; res = null; } } } catch (Exception e){ LOGGER.info("An RPC client has disconnected.",e); } } }
RpcServer.java
package com.v5ent.real.p2p; import java.net.ServerSocket; import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * RPC服務 * * 注意:不要把這個端口開放給外網 * @author Mignet */ public class RpcServer extends Thread { private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class); private int port; private boolean runFlag = true; List<RpcThread> rpcThreads; /** * 默認配置 */ public RpcServer() { this.port = 8016; this.rpcThreads = new ArrayList<>(); } /** * 指定端口 * @param port Port to listen on */ public RpcServer(int port) { this.port = port; this.rpcThreads = new ArrayList<>(); } @Override public void run() { try { ServerSocket socket = new ServerSocket(port); while (runFlag) { RpcThread thread = new RpcThread(socket.accept()); rpcThreads.add(thread); thread.start(); } socket.close(); } catch (Exception e){ LOGGER.error("rpc error in port:" + port,e); } } }
跑起來
1.使用mvn的install命令打包
2.新建peers.list,把要組建網絡的ip地址填入去,在本機執行jar命令,啟動第一個節點。注意,這時候它會嘗試連接別的節點,連接不上
3.把jar包和peers.list上傳到其他機器,啟動第二個節點
4.我們用cmd在本機打開新的窗口,執行nc 127.0.0.1 8016,連接到本機節點的rpc服務,輸入help查看支持的命令:
5.節點1(本機)增加一個區塊:在rpc命令中,我們實現了1,查看區塊鏈;2,寫入vac數據,來驗證一下
我們先輸入getinfo查看一下,然后send 88,看到寫入成功了,再輸入getinfo,果然看到了新的塊在鏈中。
我們也可以看到節點控制台的輸出中關於新增區塊的信息
6.驗證是不是同步了:我們看一下另一台機器
我們在這台機器上也使用nc localhost 8016來連接看看區塊
7.我們再從這個結點寫入一個塊(send 666)
看看本機接收到了沒
很完美。
到此基本演示了區塊鏈通訊真實的樣子。當然,這里有很多可以改進的地方,比如安全性,比如命令的模式,比如不用sleep而是用Future,比如使用netty等更高效更成熟的通訊框架。
如果想利用區塊鏈來發行數字貨幣,那么在此基礎上,還要有公私鑰簽名交易,交易通訊,校驗,使用共識算法來選舉及獎勵貨幣等。
還有什么區塊鏈知識是本系列沒有提到的嗎?有的,使用默克爾樹來快速驗證區塊和整個鏈.
關於幣的問題也可以問,比如UTXO模型可以講一講嗎?如果有問題請大家留言