只用120行Java代碼寫一個自己的區塊鏈-4實現真正的p2p網絡


在之前的文章中,我們模擬了節點網絡通訊,很多朋友反饋說,他們想看真正的節點網絡通訊而不是單節點的模擬。本章將滿足你們。😌

我將本章的內容放在了com.v5ent.real.p2p包中,大家可以在源碼中找到我更新的代碼。

通過本文,你將可以做到:

  • 創建自己的真實peer-to-peer網絡
  • 多個節點通過p2p網絡同步區塊內容
  • 在自我節點實現RPC通訊,並向區塊中寫數據
  • 在自我節點查看整個區塊內容
  • 不含虛擬貨幣在內的其他區塊鏈知識

學習本章至少需要兩台可ping通的機器(虛擬機也可以),並且安裝了jdk8(如果只有jdk6,可以把代碼改為jdk6支持的形式)。

基於之前的文章,本章將重寫p2p部分的內容。我們首先要理清思路,提出區塊鏈通訊需要解決的問題。

1.如果我第一次作為全節點啟動,我需要干什么

2.如果我已經啟動,別的節點和我通訊,我需要交互哪些信息

3.如果我已經啟動,我怎么和自己的客戶端交互

這些問題有一個很好的參考對象,就是比特幣,我們先來看看比特幣的通訊過程

一、比特幣節點連接建立

1.尋找比特幣網絡中的有效節點,此步驟通常有兩種方法:

(1)使用“DNS種子”(DNS seeds),DNS種子提供比特幣節點的IP地址列表,Bitcoin Core客戶端提供五種不同的DNS種子,通常使用默認方式。這里不展開談論。

(2)手動通過-seednode命令指定一個比特幣節點的IP地址作為比特幣種子節點(為什么叫種子,我的理解就是,根據種子,得到更多)

這里我們使用簡單的方式來處理節點,用一個文件來存儲地址列表-peers.list,第一次連接的時候更新這個文件,獲取更多連接時也更新這個文件。簡單演示我們直接把要測試的機器ip和端口加進去。

我有兩台機,我把端口8015用作新的p2p網絡的默認監聽端口,那么我的peers.list的內容是:

10.16.0.205:8015

10.16.3.77:8015

2.與發現的有效比特幣節點進行初始“握手”,建立連接

節點發送一條包含基本認證內容的version消息開始“握手”通信過程,該消息包括如下內容:

  • nVersion:客戶端的比特幣P2P協議所采用的版本(例如:70002)。

  • nLocalServices:一組該節點支持的本地服務列表,當前僅支持NODE_NETWORK

  • nTime:當前時間

  • addrYou:當前節點可見的遠程節點的IP地址(上例中NodeB IP)

  • addrMe:當前節點的IP地址(上例中NodeA IP)

  • subver:指示當前節點運行的軟件類型的子版本號(例如:”/Satoshi:0.9.2.1/”)

  • BestHeight:當前節點區塊鏈的區塊高度(初始為0,即只包含創世區塊)

 我們簡化一下,這里我們最關心的就是區塊高度bestHeight,我們就傳遞這個好了。區塊高度就是區塊鏈的長度。

1 if ("VERSION".equalsIgnoreCase(cmd)) {
2                             // 對方發來握手信息,我方發給對方區塊高度和最新區塊的hash
3                             pt.peerWriter.write("VERACK " + blockChain.size() + " " + blockChain.get(blockChain.size() - 1).getHash());
4                         }else if ("VERACK".equalsIgnoreCase(cmd)) {
5                             // 獲取區塊高度
6                             String[] parts = payload.split(" ");
7                             bestHeight = Integer.parseInt(parts[0]);
8                             //哈希暫時不校驗
9                         }

3.新節點建立更多的連接,使節點在網絡中被更多節點接收,保證連接更穩定

這里我們就兩台機,如果你有更多機器,可以實現一下這個通訊,本章我們簡單實現一下。

 1 if ("ADDR".equalsIgnoreCase(cmd)) {
 2                             // 對方發來地址,建立連接並保存
 3                             if (!peers.contains(payload)) {
 4                                 String peerAddr = payload.substring(0, payload.indexOf(":"));
 5                                 int peerPort = Integer.parseInt(payload.substring(payload.indexOf(":") + 1));
 6                                 peerNetwork.connect(peerAddr, peerPort);
 7                                 peers.add(payload);
 8                                 PrintWriter out = new PrintWriter(peerFile);
 9                                 for (int k = 0; k < peers.size(); k++) {
10                                     out.println(peers.get(k));
11                                 }
12                                 out.close();
13                             }
14                         } else if ("GET_ADDR".equalsIgnoreCase(cmd)) {
15                             //對方請求更多peer地址,隨機給一個
16                             Random random = new Random();
17                             pt.peerWriter.write("ADDR " + peers.get(random.nextInt(peers.size())));
18                         } 

4.交換“區塊清單”(注:該步驟僅在全節點上會執行,且從與節點建立連接就開始進行)本系列內容只使用全節點。

全節點

全節點沿着區塊鏈按時間倒敘一直追溯到創世區塊,建立一個完整的UTXO數據庫,通過查詢UTXO是否未被支付來驗證交易的有效性。

SPV節點

SPV節點通過向其他節點請求某筆交易的Merkle路徑(Merkle樹我可能會在后續章節講到),如果路徑正確無誤,並且該交易之上已有6個或以上區塊被確認,則證明該交易不是雙重支付。

全節點在連接到其他節點后,需要構建完整的區塊鏈,如果是新節點,它僅包含靜態植入客戶端中的0號區塊(創世區塊)。注意了,創世區塊是靜態的(硬編碼)。

如前文所言,我們在區塊鏈中取最長的鏈,區塊高度比我高,我就向對方獲取區塊。

 1 else if ("BLOCK".equalsIgnoreCase(cmd)) {
 2                             //把對方給的塊存進鏈中
 3                             LOGGER.info("Attempting to add block...");
 4                             LOGGER.info("Block: " + payload);
 5                             Block newBlock = gson.fromJson(payload, Block.class);
 6                             if (!blockChain.contains(newBlock)) {
 7                                 // 校驗區塊,如果成功,將其寫入本地區塊鏈
 8                                 if (BlockUtils.isBlockValid(newBlock, blockChain.get(blockChain.size() - 1))) {
 9                                     if (blockChain.add(newBlock) && !catchupMode) {
10                                         LOGGER.info("Added block " + newBlock.getIndex() + " with hash: ["+ newBlock.getHash() + "]");
11                                         peerNetwork.broadcast("BLOCK " + payload);
12                                     }
13                                 }
14                             }
15                         } else if ("GET_BLOCK".equalsIgnoreCase(cmd)) {
16                             //把對方請求的塊給對方
17                             LOGGER.info("Sending block[" + payload + "] to peer");
18                             Block block = blockChain.get(Integer.parseInt(payload));
19                             if (block != null) {
20                                 LOGGER.info("Sending block " + payload + " to peer");
21                                 pt.peerWriter.write("BLOCK " + gson.toJson(block));
22                             }
23                         }

到這里,我們基本上完成了p2p網絡中關於區塊的通訊。

其實比特幣等虛擬貨幣中還有很多通訊,關於交易的,這里我們不需要,不做討論。

讓我們開始編碼吧!

整合上文提到的所有通訊,Node.java的代碼如下

  1 private static final Logger LOGGER = LoggerFactory.getLogger(Node.class);
  2 
  3     /** 本地區塊鏈 */
  4     private static List<Block> blockChain = new LinkedList<Block>();
  5 
  6     public static void main(String[] args) throws IOException, InterruptedException {
  7         int port = 8015;
  8         LOGGER.info("Starting peer network...  ");
  9         PeerNetwork peerNetwork = new PeerNetwork(port);
 10         peerNetwork.start();
 11         LOGGER.info("[  Node is Started in port:"+port+"  ]"); 17         ArrayList<String> peers = new ArrayList<>();
 18         File peerFile = new File("peers.list");
 19         if (!peerFile.exists()) {
 20             String host = InetAddress.getLocalHost().toString();
 21             FileUtils.writeStringToFile(peerFile, host+":"+port);
 22         }
 23         for (Object peer : FileUtils.readLines(peerFile)) {
 24             String[] addr = peer.toString().split(":");
 25             peerNetwork.connect(addr[0], Integer.parseInt(addr[1]));
 26         }
 27         TimeUnit.SECONDS.sleep(2);
 28 
 29         peerNetwork.broadcast("VERSION");
 30 
 31         // hard code genesisBlock
 32         Block genesisBlock = new Block();
 33         genesisBlock.setIndex(0);
 34         genesisBlock.setTimestamp("2017-07-13 22:32:00");//my son's birthday  35         genesisBlock.setVac(0);
 36         genesisBlock.setPrevHash("");
 37         genesisBlock.setHash(BlockUtils.calculateHash(genesisBlock));
 38         blockChain.add(genesisBlock);
 39 
 40         final Gson gson = new GsonBuilder().create();
 41         LOGGER.info(gson.toJson(blockChain));
 42         int bestHeight = 0;
 43         boolean catchupMode = true;
 44 
 45         /**
 46          * p2p 通訊
 47          */
 48         while (true) {
 49             //對新連接過的peer寫入文件,下次啟動直接連接
 50             for (String peer : peerNetwork.peers) {
 51                 if (!peers.contains(peer)) {
 52                     peers.add(peer);
 53                     FileUtils.writeStringToFile(peerFile, peer);
 54                 }
 55             }
 56             peerNetwork.peers.clear();
 57 
 58             // 處理通訊
 59             for (PeerThread pt : peerNetwork.peerThreads) {
 60                 if (pt == null || pt.peerReader == null) {
 61                     break;
 62                 }
 63                 List<String> dataList = pt.peerReader.readData();
 64                 if (dataList == null) {
 65                     LOGGER.info("Null ret retry.");
 66                     System.exit(-5);
 67                     break;
 68                 }
 69 
 70                 for (String data:dataList) {
 71                     LOGGER.info("Got data: " + data);
 72                     int flag = data.indexOf(' ');
 73                     String cmd = flag >= 0 ? data.substring(0, flag) : data;
 74                     String payload = flag >= 0 ? data.substring(flag + 1) : "";
 75                     if (StringUtils.isNotBlank(cmd)) {
 76                         if ("VERSION".equalsIgnoreCase(cmd)) {
 77                             // 對方發來握手信息,我方發給對方區塊高度和最新區塊的hash
 78                             pt.peerWriter.write("VERACK " + blockChain.size() + " " + blockChain.get(blockChain.size() - 1).getHash());
 79                         }else if ("VERACK".equalsIgnoreCase(cmd)) {
 80                             // 獲取區塊高度
 81                             String[] parts = payload.split(" ");
 82                             bestHeight = Integer.parseInt(parts[0]);
 83                             //哈希暫時不校驗
 84                         } else if ("GET_BLOCK".equalsIgnoreCase(cmd)) {
 85                             //把對方請求的塊給對方
 86                             LOGGER.info("Sending block[" + payload + "] to peer");
 87                             Block block = blockChain.get(Integer.parseInt(payload));
 88                             if (block != null) {
 89                                 LOGGER.info("Sending block " + payload + " to peer");
 90                                 pt.peerWriter.write("BLOCK " + gson.toJson(block));
 91                             }
 92                         } else if ("BLOCK".equalsIgnoreCase(cmd)) {
 93                             //把對方給的塊存進鏈中
 94                             LOGGER.info("Attempting to add block...");
 95                             LOGGER.info("Block: " + payload);
 96                             Block newBlock = gson.fromJson(payload, Block.class);
 97                             if (!blockChain.contains(newBlock)) {
 98                                 // 校驗區塊,如果成功,將其寫入本地區塊鏈
 99                                 if (BlockUtils.isBlockValid(newBlock, blockChain.get(blockChain.size() - 1))) {
100                                     if (blockChain.add(newBlock) && !catchupMode) {
101                                         LOGGER.info("Added block " + newBlock.getIndex() + " with hash: ["+ newBlock.getHash() + "]");
102                                         peerNetwork.broadcast("BLOCK " + payload);
103                                     }
104                                 }
105                             }
106                         }else if ("GET_ADDR".equalsIgnoreCase(cmd)) {
107                             //對方請求更多peer地址,隨機給一個
108                             Random random = new Random();
109                             pt.peerWriter.write("ADDR " + peers.get(random.nextInt(peers.size())));
110                         } else if ("ADDR".equalsIgnoreCase(cmd)) {
111                             // 對方發來地址,建立連接並保存
112                             if (!peers.contains(payload)) {
113                                 String peerAddr = payload.substring(0, payload.indexOf(":"));
114                                 int peerPort = Integer.parseInt(payload.substring(payload.indexOf(":") + 1));
115                                 peerNetwork.connect(peerAddr, peerPort);
116                                 peers.add(payload);
117                                 PrintWriter out = new PrintWriter(peerFile);
118                                 for (int k = 0; k < peers.size(); k++) {
119                                     out.println(peers.get(k));
120                                 }
121                                 out.close();
122                             }
123                         } 
124                     }
125                 }
126             }
127 
128             // ********************************
129             //         比較區塊高度,同步區塊
130             // ********************************
131 
132             int localHeight = blockChain.size();
133 
134             if (bestHeight > localHeight) {
135                 catchupMode = true;
136                 LOGGER.info("Local chain height: " + localHeight);
137                 LOGGER.info("Best chain Height: " + bestHeight);
138                 TimeUnit.MILLISECONDS.sleep(300);
139                 
140                 for (int i = localHeight; i < bestHeight; i++) {
141                     LOGGER.info("請求塊 " + i + "...");
142                     peerNetwork.broadcast("GET_BLOCK " + i);
143                 }
144             } else {
145                 if (catchupMode) {
146                     LOGGER.info("[p2p] - Caught up with network.");
147                 }
148                 catchupMode = false;
149             }
150 
151             
152 
153             // ****************
154             // 循環結束
155             // ****************
156             TimeUnit.MILLISECONDS.sleep(200);
157         }
158     }

PeerNetwork簡單封裝了一下p2p通訊的細節,篇幅有限我這里只列出核心交互,具體實現可以去看我的github源碼中的類:PeerThread、PeerReader、PeerWriter。

RPC

接下來,我們將討論關於本地節點客戶端的概念。在比特幣中,除了bitcoin-core,還有bitcoin-cli,這是做什么的呢。

它其實是用來做本地節點交互的,比如我作為本地節點,我需要發起交易,需要查看我的資產等等,后來發展出gui界面,就是大家俗稱的錢包。

在本章中,我們也需要這樣一個客戶端通訊,用來將我們的vac寫入鏈中(之前的文章,我們是用控制台輸入的,實際的做法是提供加密的rpc調用)我們接下來實現RPC服務

首先我們要在Node.java中加入通訊邏輯

LOGGER.info("Starting RPC daemon... ");
RpcServer rpcAgent = new RpcServer(port+1);
rpcAgent.start();
LOGGER.info("[ RPC agent is Started in port:"+(port+1)+" ]");

for循環體中增加
/**
* 處理RPC服務 */ for (RpcThread th:rpcAgent.rpcThreads) { String request = th.req; if (request != null) { String[] parts = request.split(" "); parts[0] = parts[0].toLowerCase(); if ("getinfo".equals(parts[0])) { String res = gson.toJson(blockChain); th.res = res; } else if ("send".equals(parts[0])) { try { int vac = Integer.parseInt(parts[1]); // 根據vac創建區塊 Block newBlock = BlockUtils.generateBlock(blockChain.get(blockChain.size() - 1), vac); if (BlockUtils.isBlockValid(newBlock, blockChain.get(blockChain.size() - 1))) { blockChain.add(newBlock); th.res = "write Success!"; peerNetwork.broadcast("BLOCK " + gson.toJson(newBlock)); } else { th.res = "RPC 500: Invalid vac Error\n"; } } catch (Exception e) { th.res = "Syntax (no '<' or '>'): send <vac> <privateKey>"; LOGGER.error("invalid vac", e); } } else { th.res = "Unknown command: \"" + parts[0] + "\""; } } }

獨立線程處理

RpcThread.java

package com.v5ent.real.p2p;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 處理單個rpc連接
 * @author Mignet
 */
public class RpcThread extends Thread {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcThread.class);
    
    private Socket socket;
    String res;
    String req;

    /**
     * 默認構造函數
     * @param socket
     */
    public RpcThread(Socket socket){
        this.socket = socket;
    }

    @Override
    public void run(){
        try{
            req = null;
            res = null;
            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String input;
            out.println("================Welcome RPC Daemon==============");
            while((input = in.readLine()) != null){
                if ("HELP".equalsIgnoreCase(input)){
                    out.println("############################################## COMMANDS ###############################################");
                    out.println("#     1) getinfo       - Gets block chain infomations.                                                #");
                    out.println("#     2) send <vac>    - Write <vac> to blockChain                                                    #");
                    out.println("#######################################################################################################");
                } else {
                    req = input;
                    while (res == null){
                        TimeUnit.MILLISECONDS.sleep(25);
                    }
                    out.println(res);
                    req = null;
                    res = null;
                }
            }
        } catch (Exception e){
            LOGGER.info("An RPC client has disconnected.",e);
        }
    }
}

RpcServer.java

package com.v5ent.real.p2p;

import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * RPC服務
 *
 * 注意:不要把這個端口開放給外網
 * @author Mignet
 */
public class RpcServer extends Thread
{
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class);
    private int port;
    private boolean runFlag = true;

    List<RpcThread> rpcThreads;

    /**
     * 默認配置
     */
    public RpcServer()
    {
        this.port = 8016;
        this.rpcThreads = new ArrayList<>();
    }

    /**
     * 指定端口
     * @param port Port to listen on
     */
    public RpcServer(int port)
    {
        this.port = port;
        this.rpcThreads = new ArrayList<>();
    }

    @Override
    public void run()
    {
        try
        {
            ServerSocket socket = new ServerSocket(port);
            while (runFlag)
            {
                RpcThread thread = new RpcThread(socket.accept());
                rpcThreads.add(thread);
                thread.start();
            }
            socket.close();
        } catch (Exception e){
            LOGGER.error("rpc error in port:" + port,e);
        }
    }
}

跑起來

1.使用mvn的install命令打包

2.新建peers.list,把要組建網絡的ip地址填入去,在本機執行jar命令,啟動第一個節點。注意,這時候它會嘗試連接別的節點,連接不上

3.把jar包和peers.list上傳到其他機器,啟動第二個節點

4.我們用cmd在本機打開新的窗口,執行nc 127.0.0.1 8016,連接到本機節點的rpc服務,輸入help查看支持的命令:

5.節點1(本機)增加一個區塊:在rpc命令中,我們實現了1,查看區塊鏈;2,寫入vac數據,來驗證一下

我們先輸入getinfo查看一下,然后send 88,看到寫入成功了,再輸入getinfo,果然看到了新的塊在鏈中。

我們也可以看到節點控制台的輸出中關於新增區塊的信息

6.驗證是不是同步了:我們看一下另一台機器

我們在這台機器上也使用nc localhost 8016來連接看看區塊

7.我們再從這個結點寫入一個塊(send 666)

看看本機接收到了沒

很完美。

到此基本演示了區塊鏈通訊真實的樣子。當然,這里有很多可以改進的地方,比如安全性,比如命令的模式,比如不用sleep而是用Future,比如使用netty等更高效更成熟的通訊框架。

如果想利用區塊鏈來發行數字貨幣,那么在此基礎上,還要有公私鑰簽名交易,交易通訊,校驗,使用共識算法來選舉及獎勵貨幣等。

還有什么區塊鏈知識是本系列沒有提到的嗎?有的,使用默克爾樹來快速驗證區塊和整個鏈.

關於幣的問題也可以問,比如UTXO模型可以講一講嗎?如果有問題請大家留言


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM