EOS多節點同步代碼分析


EOS version: 1.0.7

一. 配置文件的修改

  EOS的節點同步流程是通過p2p來完成,在nodeos的配置文件config.ini中填寫,其默認路徑為~/.local/share/eosio/nodeos/config目錄下,配置項及其格式如下:

p2p-peer-address = 10.186.11.223:9876
121 p2p-peer-address = 10.186.11.220:9876
122 p2p-peer-address = 10.186.11.141:9876

可以填寫多個p2p站點地址。

二.節點同步的chain_id

  每一個節點都唯一分配一個chain_id,如果兩個節點的chian_id不相等的話,是無法進行同步的,代碼中處理如下:

void net_plugin_impl::handle_message( connection_ptr c, const handshake_message &msg) {
         ...

if( msg.chain_id != chain_id) {
            elog( "Peer on a different chain. Closing connection");
            c->enqueue( go_away_message(go_away_reason::wrong_chain) );
            return;
         }

         ...
}

  那么這個chain_id是如何開成的?

  chain_id在chain_plugin中定義,在net_plugin中使用,在chain_plugin中如下定義

//controller.cpp 

chain_id( cfg.genesis.compute_chain_id() )


//genesis_state.cpp

chain::chain_id_type genesis_state::compute_chain_id() const {

digest_type::encoder enc;
fc::raw::pack( enc, *this );
return chain_id_type{enc.result()};
}

  這里相當於把整個genesis的數據做了一個類似hash的操作,默認情況下genesis的數據在代碼中填寫:

chain_config   initial_configuration = {
      .max_block_net_usage                  = config::default_max_block_net_usage,
      .target_block_net_usage_pct           = config::default_target_block_net_usage_pct,
      .max_transaction_net_usage            = config::default_max_transaction_net_usage,
      .base_per_transaction_net_usage       = config::default_base_per_transaction_net_usage,
      .net_usage_leeway                     = config::default_net_usage_leeway,
      .context_free_discount_net_usage_num  = config::default_context_free_discount_net_usage_num,
      .context_free_discount_net_usage_den  = config::default_context_free_discount_net_usage_den,

      .max_block_cpu_usage                  = config::default_max_block_cpu_usage,
      .target_block_cpu_usage_pct           = config::default_target_block_cpu_usage_pct,
      .max_transaction_cpu_usage            = config::default_max_transaction_cpu_usage,
      .min_transaction_cpu_usage            = config::default_min_transaction_cpu_usage,

      .max_transaction_lifetime             = config::default_max_trx_lifetime,
      .deferred_trx_expiration_window       = config::default_deferred_trx_expiration_window,
      .max_transaction_delay                = config::default_max_trx_delay,
      .max_inline_action_size               = config::default_max_inline_action_size,
      .max_inline_action_depth              = config::default_max_inline_action_depth,
      .max_authority_depth                  = config::default_max_auth_depth,
   };

  還可以通過nodeos命令行參數--genesis-json加載一個指定的配置文件genesis.json,其內容一般如下格式:

{
  "initial_timestamp": "2018-03-02T12:00:00.000",
  "initial_key": "EOS8Znrtgwt8TfpmbVpTKvA2oB8Nqey625CLN8bCN3TEbgx86Dsvr",
  "initial_configuration": {
    "max_block_net_usage": 1048576,
    "target_block_net_usage_pct": 1000,
    "max_transaction_net_usage": 524288,
    "base_per_transaction_net_usage": 12,
    "net_usage_leeway": 500,
    "context_free_discount_net_usage_num": 20,
    "context_free_discount_net_usage_den": 100,
    "max_block_cpu_usage": 100000,
    "target_block_cpu_usage_pct": 500,
    "max_transaction_cpu_usage": 50000,
    "min_transaction_cpu_usage": 100,
    "max_transaction_lifetime": 3600,
    "deferred_trx_expiration_window": 600,
    "max_transaction_delay": 3888000,
    "max_inline_action_size": 4096,
    "max_inline_action_depth": 4,
    "max_authority_depth": 6,
    "max_generated_transaction_count": 16
  },
  "initial_chain_id": "0000000000000000000000000000000000000000000000000000000000000000"
}

  所以,節點之間能同步的條件是參數配置需要完全相當的。

四.區塊同步數據流

  數據同步涉及幾個消息:

  handshake_message,  //hello握手信息,
  chain_size_message,  //暫未看到使用
  go_away_message //停止同步消息
  time_message,  // 時間戳相關
  notice_message,  //區塊和事務狀態同步
  request_message,  //請求發送區塊同步,帶有區塊的num數據
  sync_request_message,  //在request_message基礎上加了一個定時器做超時處理
  signed_block,      // 具體的區塊數據
  packed_transaction    //事務同步處理

 

  現在假設有一個節點M,它的p2p-peer-address對就有三個地址a、b、c,現在數據同步的流程基本上有下面幾個步驟.

 

  1.handshake_message處理流程

    首先,M結點會向a、b、c循環發起連接並發送一條握手信息,這條信息是一個名為struct handshake_message,定義如下:

struct handshake_message {
      uint16_t                   network_version = 0; //net version, require  M == a == b == c      chain_id_type              chain_id; // M == a == b == c      fc::sha256                 node_id; ///< used to identify peers and prevent self-connect
      chain::public_key_type     key; ///< authentication key; may be a producer or peer key, or empty
      tstamp                     time;
      fc::sha256                 token; ///< digest of time to prove we own the private key of the key above
      chain::signature_type      sig; ///< signature for the digest
      string                     p2p_address;
      uint32_t                   last_irreversible_block_num = 0;
      block_id_type              last_irreversible_block_id;
      uint32_t                   head_num = 0;
      block_id_type              head_id;
      string                     os;
      string                     agent;
      int16_t                    generation;
   };

包括了對通信的基本要求的參數,該消息初始化后會將其放入名為write_queue的消息隊列中,最后消息是使用asio::async_write進行發送,發送消息的成功與否是通過回調來處理的。

void connection::do_queue_write() {

...
while (write_queue.size() > 0) {
         auto& m = write_queue.front();
         bufs.push_back(boost::asio::buffer(*m.buff));
         out_queue.push_back(m);
         write_queue.pop_front();
      }
      boost::asio::async_write(*socket, bufs, [c](boost::system::error_code ec, std::size_t w) {
 try {
        for (auto& m: conn->out_queue) {
                  m.callback(ec, w);
               }

while (conn->out_queue.size() > 0) {
                  conn->out_queue.pop_front();
               }
               conn->enqueue_sync_block();
               conn->do_queue_write();

    }
...
}

對端收到handshake_message的消息后處理如下代碼:

void sync_manager::recv_handshake (connection_ptr c, const handshake_message &msg) {
    controller& cc = chain_plug->chain();
      uint32_t lib_num = cc.last_irreversible_block_num( );
      uint32_t peer_lib = msg.last_irreversible_block_num;
      reset_lib_num(c);
      c->syncing = false;

      //--------------------------------
      // sync need checks; (lib == last irreversible block)
      //
      // 0. my head block id == peer head id means we are all caugnt up block wise
      // 1. my head block num < peer lib - start sync locally
      // 2. my lib > peer head num - send an last_irr_catch_up notice if not the first generation
      //
      // 3  my head block num <= peer head block num - update sync state and send a catchup request
      // 4  my head block num > peer block num ssend a notice catchup if this is not the first generation
      //
      //-----------------------------

      uint32_t head = cc.head_block_num( );
      block_id_type head_id = cc.head_block_id();
      if (head_id == msg.head_id) {
      ...  
    }
        
    ...
}

梳理流程:

  • 兩個節點歷史區塊id相等,不進行同步;
  • A節點區塊的head_block_num小於B節點不可逆區塊的head_block_num,則B給A發送消息notice_message,消息中包含A節點所需要同步的區塊范圍,每次同步塊數為sync_req_span,此參數在genesis.json中設置或者是程度初始的;
  • A節點不可逆區塊的head_block_num大於B節點區塊的head_block_num,則A給B發送消息notice_message,消息中包含可逆與不可逆區塊的block_num;
  • A節點區塊的head_block_num小於B節點的head_block_num,A節點會產生一個request_message消息發送給B; 

2.go_away_message

  一般在某些異常情況下節點A會斷開與其它節點的同步,會發送一個go_away_message,會帶有一個錯誤碼:

  enum go_away_reason {
    no_reason, ///< no reason to go away
    self, ///< the connection is to itself
    duplicate, ///< the connection is redundant
    wrong_chain, ///< the peer's chain id doesn't match
    wrong_version, ///< the peer's network version doesn't match
    forked, ///< the peer's irreversible blocks are different
    unlinkable, ///< the peer sent a block we couldn't use
    bad_transaction, ///< the peer sent a transaction that failed verification
    validation, ///< the peer sent a block that failed validation
    benign_other, ///< reasons such as a timeout. not fatal but warrant resetting
    fatal_other, ///< a catch-all for errors we don't have discriminated
    authentication ///< peer failed authenicatio
  };

3.time_message

  這個消息應該是發送一個帶有幾個時間標志的keeplive消息包,目前設置的是每32秒發送一次。

 

4.notice_message

  這個消息定義如下:

struct notice_message {
    notice_message () : known_trx(), known_blocks() {}
    ordered_txn_ids known_trx;
    ordered_blk_ids known_blocks;
  };

  它包含了區塊的信息和交易信息,也即對可逆區塊,可逆事務,不可逆區塊,不可逆事務都可以通過這個消息處理。比如,節點A把本地節點最新區塊和事務信息(block_num)發送給節點B,節點B收到后會將本地的區塊和事務信息(block_num)進行比較,根據比較的結果決定誰給誰傳輸數據。

 

5.request_message

  A節點請求端分為四種,節點B做為接收端,分別給予的應答如下:

   對於區塊:

  • catch_up:B節點把本地的所有可逆的區塊打包發給節點A; 
  • normal:根據A節點vector里面的區塊id,在本地(B節點)不可逆的區塊中進行查找,如果找到了就把該區塊就發給A;

  對於事務:

  • catch_up:B節點把A節點所需要的可逆的transaction id 並且自己本地有的數據發送給A;
  • normal:  B節點把A節點所需要的不可逆的transaction id 並且自己本地有的數據發送給A;

6.sync_request_message

  此消息是在request_message實現基礎上加了一個5S的定時器,同步消息在5S內沒有得到應答會取消當前同步后再重新要求同步;

7.signed_block

  這里發送的是具體的區塊數據,一般是收到request_message或者 sync_request_message消息后把本節點的區塊發給對方;

 bool connection::enqueue_sync_block() {
      controller& cc = app().find_plugin<chain_plugin>()->chain();
      if (!peer_requested)
         return false;
      uint32_t num = ++peer_requested->last;
      bool trigger_send = num == peer_requested->start_block;
      if(num == peer_requested->end_block) {
         peer_requested.reset();
      }
      try {
      //從本地取出區塊數據 signed_block_ptr sb
= cc.fetch_block_by_number(num); if(sb) {
       //放入消息隊列並異步發送 enqueue(
*sb, trigger_send); return true; } } catch ( ... ) { wlog( "write loop exception" ); } return false; }

 

8.packed_transaction

  節點A把多個transacton放在一起進行打包發送,收到packed_transaction消息的節點會對其進行各種校驗,如果校驗結果正確,會把數據緩存到本地,然后再對本端所有p2p-peer-address的地址進行廣播。所以對於多個transaction的數據,在這里就實現了在多個地址之間相互快速傳播的功能。

void net_plugin_impl::handle_message( connection_ptr c, const packed_transaction &msg) {
      fc_dlog(logger, "got a packed transaction, cancel wait");
      peer_ilog(c, "received packed_transaction");
      if( sync_master->is_active(c) ) {
         fc_dlog(logger, "got a txn during sync - dropping");
         return;
      }
      transaction_id_type tid = msg.id();

    //收到數據后取異步定時器 c
->cancel_wait(); if(local_txns.get<by_id>().find(tid) != local_txns.end()) { fc_dlog(logger, "got a duplicate transaction - dropping"); return; }
    //將數據保存到本地的緩存中 dispatcher
->recv_transaction(c, tid); uint64_t code = 0;

    //對數據進行校驗,然后把結果傳遞給回調函數 chain_plug
->accept_transaction(msg, [=](const static_variant<fc::exception_ptr, transaction_trace_ptr>& result) { if (result.contains<fc::exception_ptr>()) { auto e_ptr = result.get<fc::exception_ptr>(); if (e_ptr->code() != tx_duplicate::code_value && e_ptr->code() != expired_tx_exception::code_value) elog("accept txn threw ${m}",("m",result.get<fc::exception_ptr>()->to_detail_string())); peer_elog(c, "bad packed_transaction : ${m}", ("m",result.get<fc::exception_ptr>()->what())); } else { auto trace = result.get<transaction_trace_ptr>();
         if (!trace->except) { fc_dlog(logger, "chain accepted transaction");
        
          //對其它p2p-peer-address進行廣播,數據互傳 dispatcher
->bcast_transaction(msg); return; } peer_elog(c, "bad packed_transaction : ${m}", ("m",trace->except->what())); }       //數據校給失敗,本地緩存數據回滾 dispatcher->rejected_transaction(tid); }); }

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM