EOS生產區塊:解析插件producer_plugin


producer_plugin是控制區塊生產的關鍵插件。

關鍵字:producer_plugin,同步區塊的處理,pending區塊,生產區塊,最后不可逆區塊,生產循環,生產安排,水印輪次,計時器,確認數

producer_plugin生命周期

EOS的所有plugin都有共同的基類,因此每個plugin的研究都可以從生命周期入手。

①set_program_options

向config.ini文件中增加屬於producer_plugin的配置項。這些配置項如下表所示。

配置項 解釋
enable-stale-production 允許區塊生產,即使鏈是陳腐的,即鏈生產的區塊由於遲到未能被采納進鏈。
pause-on-startup 當生產暫停時啟動這個節點
max-transaction-time 執行已推送事務代碼的最長時間限制,過期則判定為無效,默認30毫秒
max-irreversible-block-age 當前節點生產區塊所在鏈的DPOS不可逆區塊的時間限制,按秒計算,默認-1無限制。
producer-name 當前節點的生產者ID,可以被多次指定。
private-key (已被丟棄,使用以下signature-provider替代)
signature-provider KV元組使用格式為<public-key>=<provider-spec>,等號前為公鑰,等號后為KEY:私鑰或KEOSD:私鑰。前者為以上公鑰對應的私鑰,后者為keosd的可用url並且相關錢包要被解鎖。
keosd-provider-timeout 發送區塊到keosd簽名的最大時間,按毫秒計算。
greylist-account 灰名單,記錄了無法繼承CPU/NET虛擬資源的賬戶列表。
produce-time-offset-us 非最后一個區塊產生時間的偏移量,按微秒計算。負值會導致塊更早出去,正值會導致塊更晚出去。
last-block-time-offset-us 最后一個區塊產生時間的偏移量,按微秒計算。負值會導致塊更早出去,正值會導致塊更晚出去。
incoming-defer-ratio 當兩者都被耗盡時,輸入交易和遞延交易之間的比率。
snapshots-dir 快照目錄的位置(絕對路徑或data目錄相對路徑)

②initialize

插件初始化,第一個階段是通過現有配置項初始化設置插件。現有配置項來自於配置文件config.ini中producer_plugin相關配置項與命令行參數中producer_plugin相關配置項的交集,同樣的配置項以命令行為准。現有配置以boost::program_options::variables_map&類型對象options為參數傳入初始化函數。配置過程操作的是producer_plugin的私有成員std::shared_ptr<class producer_plugin_impl> my。my指針擁有producer_plugin_impl對象的成員,這些成員都被設計為與傳入配置項對應,逐一設置即可。
第二個階段是4個遠程異步調用的聲明:

  • 前兩個通訊模式是訂閱一個channel綁定一個執行函數,一旦嗅到該頻道被發布則執行綁定的函數。
    • incoming::channels::block,接收區塊的頻道,該頻道將在bnet_plugin的on_message的on函數中被發布,觸發producer_plugin當前的訂閱函數on_incoming_block,下面詳述。
    • incoming::channels::transaction,接收事務的頻道,該頻道與上面相同,也將在bnet_plugin的on_message的on函數中被發布,觸發producer_plugin當前的訂閱函數on_incoming_transaction_async,下面詳述。
  • 后兩個通訊模式是注冊一個method,供外部程序調用。
    • incoming::methods::block_sync,接收區塊的同步方法。該method將在chain_plugin的read_write::push_block函數中被調用,這部分內容在chain_plugin的文章中有專門的分析。實際上執行的是producer_plugin當下注冊的方法on_incoming_block,同上。
    • incoming::methods::transaction_async,接收事務的同步方法。該method將在chain_plugin的read_write::push_transaction函數中被調用,實際上執行的是on_incoming_transaction_async,亦同上。

總結一下會發現,在producer_plugin的初始化階段:

  • 有兩個處理對象,
  • 有兩個通訊模式,
    • channel的方式,對接的是bnet_plugin
    • method的方式,對接的是chain_plugin

③startup

進入插件的啟動階段,首先設置日志,

const fc::string logger_name("producer_plugin");
fc::logger _log;

const fc::string trx_trace_logger_name("transaction_tracing");
fc::logger _trx_trace_log;

分別創建以producer_plugin插件為主的日志對象,以及事務追蹤"transaction_tracing"為主的日志對象。除了這兩個在插件內部新建的日志,還有程序自身日志,例如nodeos,日志信息將打印在nodeos的輸出位置,輸出插件啟動日志。接下來的工作列舉如下:

  • 校驗chain的db讀取模式以及本地生產者集合是否為空,根據不同情況輸出對應日志用於提示用戶。
  • 使用【信號槽技術】分別連接信號accepted_block(綁定本地處理函數on_block和信號irreversible_block(綁定本地處理函數on_irreversible_block,這兩個信號將在controller中被發射,從而觸發當前信號槽,這兩個處理函數將在下面詳述。

信號槽的方式,對接的都是controller。

之前討論過多次,由於解耦的模式,信號發射方和信號槽處理方互不認識,因此一個信號被發射,可以擁有多個信號槽處理方。

  • 是針對最后不可逆區塊的討論,下面詳述。
  • 如果本地生產者集合不為空時,輸出日志在當前為這些生產者啟動區塊生產工作。如果本地具備生產能力_production_enabled,如果當前鏈的頭區塊號為0,則調用new_chain_banner(chain),該函數下面詳述。
  • 執行定時生產循環函數schedule_production_loop,下面詳述。

④shutdown

釋放資源,代碼不多如下:

void producer_plugin::plugin_shutdown() {
   try {
      my->_timer.cancel(); // 停止倒計時器
   } catch(fc::exception& e) {
      edump((e.to_detail_string())); // 輸出錯誤日志
   }
   // 重置釋放連接槽
   my->_accepted_block_connection.reset();
   my->_irreversible_block_connection.reset();
}

插件關閉階段,取消計時器,后面會展開對計時器basic_deadline_timer的研究,重置(調用析構函數)清除上面startup階段啟動的兩個信號槽。

on_incoming_block 函數

/**
 * 處理incoming接收到的區塊。
 * @param block 已簽名區塊
 */
void on_incoming_block(const signed_block_ptr& block) {
   fc_dlog(_log, "received incoming block ${id}", ("id", block->id()));
   // 判斷區塊時間是否在當前節點的未來7秒之內,如果不是,則證明這個區塊還沒到處理的時間。
   EOS_ASSERT( block->timestamp < (fc::time_point::now() + fc::seconds(7)), block_from_the_future, "received a block from the future, ignoring it" );
   // 獲取鏈對象。
   chain::controller& chain = app().get_plugin<chain_plugin>().chain();
   /* 如果本地已經存在接收的區塊了,則不必處理,直接返回。*/
   auto id = block->id();
   auto existing = chain.fetch_block_by_id( id );
   if( existing ) { return; }
   // 啟動多線程驗證區塊。這個函數在下面有解釋
   auto bsf = chain.create_block_state_future( block );
   // 丟棄pending區塊
   chain.abort_block();
   // 拋出異常,保證重啟定時生產循環
   auto ensure = fc::make_scoped_exit([this](){
       schedule_production_loop();
   });
   // 向本地鏈推送新區塊
   bool except = false;
   try {
      chain.push_block(block);//推送區塊
   } catch ( const guard_exception& e ) {
      // 打印詳細錯誤日志,並跳出循環。
      app().get_plugin<chain_plugin>().handle_guard_exception(e);
      return;
   } catch( const fc::exception& e ) {
      elog((e.to_detail_string()));
      except = true;
   } catch ( boost::interprocess::bad_alloc& ) {
      chain_plugin::handle_db_exhaustion();
      return;
   }
   if( except ) {
      // rejected_block頻道發布某區塊已被拒絕的消息,該頻道已在bnet插件被訂閱,當消息發布,bnet插件會調用函數on_bad_block處理被拒區塊。
      app().get_channel<channels::rejected_block>().publish( block );
      return;
   }
   // 當鏈的頭塊狀態中時間戳的下一個點大於等於當前時間時,本地則具備生產能力。
   if( chain.head_block_state()->header.timestamp.next().to_time_point() >= fc::time_point::now() ) {
      _production_enabled = true;
   }
   if( fc::time_point::now() - block->timestamp < fc::minutes(5) || (block->block_num() % 1000 == 0) ) {
      //區塊時間點已流逝的時間在5分鍾之內的情況,或者區塊號是整千時。輸出日志模板並替換變量的值。
      ilog("Received block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, conf: ${confs}, latency: ${latency} ms]",
              // p是生產者,
              // id是區塊id截取中間的8到16位輸出,
              // n是區塊號,t是區塊時間,
              // count是區塊中事務的數量,
              // lib是鏈最后一個不可逆區塊號,
              // confs是區塊的確認數
              ("p",block->producer)("id",fc::variant(block->id()).as_string().substr(8,16))
                   ("n",block_header::num_from_id(block->id()))("t",block->timestamp)
                   // confirmed,是生產者在簽名一個區塊時向前確認的區塊數量,默認是1,則只確認前一個區塊。
                   // latency,潛伏因素的字面含義。值為當前區塊時間點已流逝的時間。
                   // count是時間庫中的一個特殊函數,返回某個時間按照某個單位來計數時的字面值,可以用做跨單位的運算。
                   // block_timestamp_type類型定義了區塊鏈的時間戳的默認間隔是500ms,一個周期是2000年。
                   ("count",block->transactions.size())("lib",chain.last_irreversible_block_num())("confs", block->confirmed)("latency", (fc::time_point::now() - block->timestamp).count()/1000 ) );
   }
}

關於block_timestamp_type類型的定義,源碼如下:

typedef block_timestamp<config::block_interval_ms,config::block_timestamp_epoch> block_timestamp_type; 

...
const static int      block_interval_ms = 500;
const static uint64_t block_timestamp_epoch = 946684800000ll; // epoch is year 2000.

接着進入函數create_block_state_future,

std::future<block_state_ptr> create_block_state_future( const signed_block_ptr& b ) {
  EOS_ASSERT( b, block_validate_exception, "null block" );//不能為空塊
  auto id = b->id();
  // 已存在區塊,終止並提示
  auto existing = fork_db.get_block( id );
  EOS_ASSERT( !existing, fork_database_exception, "we already know about this block: ${id}", ("id", id) );

  auto prev = fork_db.get_block( b->previous );// 獲得前一個區塊,不存在則報錯。
  EOS_ASSERT( prev, unlinkable_block_exception, "unlinkable block ${id}", ("id", id)("previous", b->previous) );

  return async_thread_pool( [b, prev]() {// 傳入具體task到異步線程池。
     const bool skip_validate_signee = false;
     return std::make_shared<block_state>( *prev, move( b ), skip_validate_signee );
  } );
}

異步線程池async_thread_pool。傳入task,由當前同步的待驗證區塊以及前一個區塊組成,返回的是block_state對象。

template<typename F>
auto async_thread_pool( F&& f ) {
  auto task = std::make_shared<std::packaged_task<decltype( f() )()>>( std::forward<F>( f ) );
  boost::asio::post( *thread_pool, [task]() { (*task)(); } );// 將任務上傳到線程池,通過boost::asio庫異步分配線程並行處理。
  return task->get_future();
}

on_incoming_transaction_async 函數

該函數的工作是處理接收到的事務的本地同步,聲明如下:

/**
 * 處理接收到的事務的本地同步工作
 * @param trx 接收的事務,是打包狀態的
 * @param persist_until_expired 標志位:事務是否在過期前被持久化了,bool類型
 * @param next 回調函數next方法。
 */
void on_incoming_transaction_async(const packed_transaction_ptr& trx, bool persist_until_expired, next_function<transaction_trace_ptr> next) {}

可以分為三個部分,第一部分是校驗工作。

如果鏈不存在pending區塊狀態,則在pending接收事務結合中增加接收的事務待start_block中處理,並中止函數返回。

接收到的事務要打包在本地的pending區塊中,如果不存在pending區塊,說明本地節點未開始生產區塊,所以要插入到pending事務集合_pending_incoming_transactions中等待start_block來處理。這部分的校驗代碼如下:

chain::controller& chain = app().get_plugin<chain_plugin>().chain();
    if (!chain.pending_block_state()) {
      _pending_incoming_transactions.emplace_back(trx, persist_until_expired, next);
      return;   
}

第二部分是該函數定義了一個lambda的內部函數send_response,用於異步發送響應,該內部函數源碼如下:

auto send_response = [this, &trx, &chain, &next](const fc::static_variant<fc::exception_ptr, transaction_trace_ptr>& response) {
       next(response);
       if (response.contains<fc::exception_ptr>()) {
          // 如果響應中包含異常指針,則發布異常信息以及事務對象到channels::transaction_ack
          _transaction_ack_channel.publish(std::pair<fc::exception_ptr, packed_transaction_ptr>(response.get<fc::exception_ptr>(), trx));
          if (_pending_block_mode == pending_block_mode::producing) {// 如果pending區塊的模式為生產中,則打印出對應的debug日志:區塊被拒絕。
             fc_dlog(_trx_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is REJECTING tx: ${txid} : ${why} ",
                     ("block_num", chain.head_block_num() + 1)
                             ("prod", chain.pending_block_state()->header.producer)
                             ("txid", trx->id())
                             ("why",response.get<fc::exception_ptr>()->what()));
          } else {// 如果pending區塊的模式為投機中,則打印出對應的debug日志:投機行為被拒絕。
             fc_dlog(_trx_trace_log, "[TRX_TRACE] Speculative execution is REJECTING tx: ${txid} : ${why} ",
                     ("txid", trx->id())
                             ("why",response.get<fc::exception_ptr>()->what()));
          }
       } else {// 響應中無異常。發布空異常信息以及事務對象到channels::transaction_ack
          _transaction_ack_channel.publish(std::pair<fc::exception_ptr, packed_transaction_ptr>(nullptr, trx));
          if (_pending_block_mode == pending_block_mode::producing) {// 仍舊區分pending區塊狀態生產中與投機行為的不同日志輸出。
             fc_dlog(_trx_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is ACCEPTING tx: ${txid}",
                     ("block_num", chain.head_block_num() + 1)
                             ("prod", chain.pending_block_state()->header.producer)
                             ("txid", trx->id()));
          } else {
             fc_dlog(_trx_trace_log, "[TRX_TRACE] Speculative execution is ACCEPTING tx: ${txid}",
                     ("txid", trx->id()));
          }
       }
   };

這部分代碼很容易拆解,通過publish/subscribe通訊模式,本地發布頻道信息,交由頻道訂閱者異步處理。
頻道:channels::transaction_ack
publisher:on_incoming_transaction_async ->send_response
subscriber:net_plugin::plugin_startup
binding function:net_plugin_impl::transaction_ack
dispatcher:rejected_transaction/bcast_transaction

在這個異步通訊過程中,要加入校驗代碼。函數體被調用時,send_response已經收到了處理后的事務響應,同時捕獲了事務源對象,鏈對象。鏈對象在當前程序中應該是單例的,不必在此校驗。校驗響應事務是否存在異常信息,如果存在則將異常信息附屬發布到頻道消息,如果不存在則附屬空異常。

if (response.contains<fc::exception_ptr>()) {
    _transaction_ack_channel.publish(std::pair<fc::exception_ptr, packed_transaction_ptr>(response.get<fc::exception_ptr>(), trx));
} else {
    _transaction_ack_channel.publish(std::pair<fc::exception_ptr, packed_transaction_ptr>(nullptr, trx));
}

注意,在發布完頻道消息以后,要給前台輸出事務跟蹤日志。

producer_plugin的startup啟動階段分析過,該插件包含三種日志,事務跟蹤日志就是其中之一。輸出日志要判斷pending區塊性質是否是正在生產,如果是生產中的區塊,則打印區塊號,生產者以及事務id,如果不是生產中的區塊而是投機區塊(可能被生產也可能被丟棄),則只打印事務id。

publish的消息是trx源事務對象,而不是響應對象response。

前兩部分完成以后,本地存在pending區塊有打包事務的條件,且發送響應的函數也有了,准備工作已經做好了。接下來進入第三部分,正式開始本地打包接收事務的工作。工作開始之前,仍舊要先校驗:

  • 接收的事務是否過期,通過比較待打包區塊時間和接收事務時間確定事務是否過期,如果過期則發送事務已過期的響應信息並終止程序。
  • 接收事的務是否已存在,在本地查找該事務如果查到則發送事務已存在的響應信息並終止程序。

這兩個校驗的源碼如下:

auto block_time = chain.pending_block_state()->header.timestamp.to_time_point();//獲得待打包區塊時間,即鏈pending區塊頭的時間戳轉換而來。
auto id = trx->id();
if( fc::time_point(trx->expiration()) < block_time ) {//如果事務的過期時間小於區塊時間,說明區塊開始打包時事務已過期。報錯並中止。
  send_response(std::static_pointer_cast<fc::exception>(std::make_shared<expired_tx_exception>(FC_LOG_MESSAGE(error, "expired transaction ${id}", ("id", id)) )));
  return;
}

if( chain.is_known_unexpired_transaction(id) ) {// 如果在鏈db中找到了該事務,說明已存在,報錯並中止。
  send_response(std::static_pointer_cast<fc::exception>(std::make_shared<tx_duplicate>(FC_LOG_MESSAGE(error, "duplicate transaction ${id}", ("id", id)) )));
  return;
}

兩個校驗工作結束以后,要確定接收事務的code執行截止時間。初始化的值是當前時間加上本地設置的最大事務執行時間。但如果本地設置未限制最大事務執行時間或者pending區塊是本地正在生產且區塊時間小於截止時間的,事務截止時間改為區塊時間。這段代碼如下:

auto block_time = chain.pending_block_state()->header.timestamp.to_time_point();//獲得待打包區塊時間,即鏈pending區塊頭的時間戳轉換而來。
auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms);// 算出事務的code執行的截止時間。
bool deadline_is_subjective = false; // 主觀截止日期標志位,事務截止時間為區塊時間
if (_max_transaction_time_ms < 0 || (_pending_block_mode == pending_block_mode::producing && block_time < deadline) ) {
  deadline_is_subjective = true; // 主觀截止日期標志位設置為true。
  deadline = block_time;// 截止時間改為區塊時間
}

接下來,確認了事務截止時間以后,執行推送接收的事務到區塊鏈。

// 調用chain推送事務,接收結果儲存在trace對象
auto trace = chain.push_transaction(std::make_shared<transaction_metadata>(*trx), deadline);

trace對象接收了chain的推送事務的處理結果。如果判斷該結果沒有異常則證明處理成功,則要先判斷標志位persist_until_expired是否為true,如果為true說明該事務在過期前已被成功持久化,需要在本地持久化事務集合對象中插入事務id,用來保證也能應用在未來的投機區塊。最后,將trace對象作為響應信息發送出去。源碼如下:

if (persist_until_expired) {// 標志位:事務過期前被持久化
    // 存儲事務ID,從而保證它也能應用在未來的投機區塊(可逆區塊)。
    _persistent_transactions.insert(transaction_id_with_expiry{trx->id(), trx->expiration()});
}
send_response(trace);// 將事務推送結果發送響應。

如果trace結果包含異常,則要判斷該異常是否是主觀異常。如果是的話,采用上面不存在pending區塊的處理方式,將事務插入到pending接收事務集合中,等待start_block處理,同時按照pending區塊性質輸出日志。如果不是主觀失敗,則直接丟棄事務,發送異常信息作為響應內容。源碼如下:

if (trace->except) {// 異常處理
    if (failure_is_subjective(*trace->except, deadline_is_subjective)) {
        // 主觀失敗,在pending接收事務結合中增加接收的事務待start_block中處理,並中止函數返回。
        _pending_incoming_transactions.emplace_back(trx, persist_until_expired, next);
        // 仍舊區分pending區塊狀態生產中與投機行為的不同日志輸出。
        if (_pending_block_mode == pending_block_mode::producing) {
           fc_dlog(_trx_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} COULD NOT FIT, tx: ${txid} RETRYING ",
                   ("block_num", chain.head_block_num() + 1)
                           ("prod", chain.pending_block_state()->header.producer)
                           ("txid", trx->id()));
        } else {
           fc_dlog(_trx_trace_log, "[TRX_TRACE] Speculative execution COULD NOT FIT tx: ${txid} RETRYING",
                   ("txid", trx->id()));
        }
    } else {
        // 增加異常信息,發送響應。
        auto e_ptr = trace->except->dynamic_copy_exception();
        send_response(e_ptr);
    }
}

on_block 函數

該函數在controller accepted_block信號處理時有過介紹。下面進行詳細分析,首先是三個校驗:

if( bsp->header.timestamp <= _last_signed_block_time ) return;
if( bsp->header.timestamp <= _start_time ) return;
if( bsp->block_num <= _last_signed_block_num ) return;
  • 如果區塊時間小於等於最后簽名區塊時間,則終止退出當前函數。
  • 如果區塊時間小於等於開始時間(初始化為當前時間),則終止退出當前函數。
  • 如果區塊號小於等於最后簽名區塊號,則退出當前函數。

通過以上三個校驗,可以得知新區塊要在最后簽名區塊之后(生產時間要在它之后,區塊號也要在它之后),另外新區塊的生產時間不能是比現在早的時間,必須是之后的時間才可能被當下所處理。校驗通過以后,新建活躍生產者賬戶集合active_producers,插入計划出塊的生產者。

const auto& active_producer_to_signing_key = bsp->active_schedule.producers;

flat_set<account_name> active_producers;
active_producers.reserve(bsp->active_schedule.producers.size());
for (const auto& p: bsp->active_schedule.producers) {
  active_producers.insert(p.producer_name);
}

接下來處理接收到的由它節點生產的區塊。

// 利用set\_intersection取本地生產者與集合active\_producers的交集
std::set_intersection( _producers.begin(), _producers.end(),
                      active_producers.begin(), active_producers.end(),
                      // 將結果存入一個迭代器make_function_output_iterator,迭代執行內部函數
                      boost::make_function_output_iterator( [&]( const chain::account_name& producer )
                      // 如果結果為空,說明本地生產者沒有出塊權利不屬於活躍生產者的一份子
{
    if( producer != bsp->header.producer ) { // 如果交集生產者不等於接收區塊的生產者,說明是校驗別人生產的區塊,如果是相等的不必做特殊處理。
        // 在活躍生產者的key中找到匹配的key(本地生產者賬戶公鑰)
       auto itr = std::find_if( active_producer_to_signing_key.begin(), active_producer_to_signing_key.end(),
                                [&](const producer_key& k){ return k.producer_name == producer; } );
       if( itr != active_producer_to_signing_key.end() ) {// 成功找到,否則說明該區塊不是合法生產者簽名拋棄不處理。
          auto private_key_itr = _signature_providers.find( itr->block_signing_key );
          // 獲取本地生產者私鑰
          if( private_key_itr != _signature_providers.end() ) {
             auto d = bsp->sig_digest();
             auto sig = private_key_itr->second( d );
             // 更新producer插件本地標志位
             _last_signed_block_time = bsp->header.timestamp;
             _last_signed_block_num  = bsp->block_num;

             // 組裝生產確認數據字段,包括區塊id,區塊摘要,生產者,簽名。發射信號confirmed\_block。但經過搜索,項目中目前沒有對該信號設置槽connection
             _self->confirmed_block( { bsp->id, d, producer, sig } );
          }
       }
    }
} ) );

在區塊創建之前要為該區塊的生產者設置水印用來標示該區塊的生產者是誰。水印就是一個kv結構對象,例如 _producer_watermarks[new_producer] = new_block_num;

chain::controller& chain = app().get_plugin<chain_plugin>().chain();
const auto hbn = bsp->block_num;
// 設置新區塊頭信息,水印信息,包括時間戳
auto new_block_header = bsp->header; 
new_block_header.timestamp = new_block_header.timestamp.next();
new_block_header.previous = bsp->id;
auto new_bs = bsp->generate_next(new_block_header.timestamp);

接下來,對於新安裝的生產者,可以設置他們的水印使他們變為活躍生產者。

if (new_bs.maybe_promote_pending() && bsp->active_schedule.version != new_bs.active_schedule.version) {
    flat_set<account_name> new_producers;
    new_producers.reserve(new_bs.active_schedule.producers.size());
    for( const auto& p: new_bs.active_schedule.producers) {
        if (_producers.count(p.producer_name) > 0)
            new_producers.insert(p.producer_name);
    }
    
    for( const auto& p: bsp->active_schedule.producers) {
        new_producers.erase(p.producer_name);
    }
    
    for (const auto& new_producer: new_producers) {
        _producer_watermarks[new_producer] = hbn;// 水印map,本地變量,用於指揮計划出塊的生產者。
    }
}

on_irreversible_block 函數

在producer_plugin中,該函數是用來更新不可逆區塊時間的,這個時間在系統中由一個時間變量_irreversible_block_time控制。

void on_irreversible_block( const signed_block_ptr& lib ) {
   _irreversible_block_time = lib->timestamp.to_time_point();
}

這個時間變量將用來計算不可逆區塊時間的流逝時間,即當前時間減去該時間變量的結果,如果結果為正數且不小說明很久沒有出現不可逆區塊了,反之則是剛剛出現不可逆區塊。

fc::microseconds get_irreversible_block_age() {
   auto now = fc::time_point::now();
   if (now < _irreversible_block_time) {
      return fc::microseconds(0);
   } else {
      return now - _irreversible_block_time;
   }
}

last_irreversible 的討論

在producer_plugin的啟動階段,包含一段關於最后不可逆區塊的代碼:

const auto lib_num = chain.last_irreversible_block_num();// 獲取當前最后不可逆區塊號
const auto lib = chain.fetch_block_by_number(lib_num); // 獲取最后不可逆區塊
if (lib) { // 如果最后不可逆區塊存在
  my->on_irreversible_block(lib); // 執行函數同步更新本地區塊的不可逆時間
} else { // 如果最后不可逆區塊不存在
  my->_irreversible_block_time = fc::time_point::maximum();// 區塊不可逆時間設置為最大值。
}

通常來講,最后不可逆區塊的存在是被用來定位本地事務被打包至某個區塊后是否成功上鏈變為不可逆狀態,只需要這個區塊號小於最后不可逆區塊即可確定。

以上代碼段中if-else語句比較容易理解,是根據最后不可逆區塊是否存在對本地區塊不可逆時間變量_irreversible_block_time的設置,存在則更新為最后不可逆區塊的時間,不存在則將其設置為時間最大值。不存在最后不可逆區塊意味着鏈數據完全是孤立的未經任何確認的,區塊鏈的特性也不再存在,因此本地時間變量設置為了時間的最大值。那么令人費解的是上面兩行代碼,首先獲取最后不可逆區塊號,接着通過該區塊號獲得區塊。

controller::last_irreversible_block_num

uint32_t controller::last_irreversible_block_num() const {
   return std::max(std::max(my->head->bft_irreversible_blocknum, my->head->dpos_irreversible_blocknum), my->snapshot_head_block);
}

以上獲取最后不可逆區塊號函數的源碼,可以看出是從當前區塊頭的bft不可逆區塊號、dpos不可逆區塊號以及快照頭塊的區塊號三者中選擇最大的一個作為結果返回。分別來看這三個區塊號的含義:

  • bft不可逆區塊號,在區塊頭狀態結構中的generate_next函數中有初始化的操作,這個函數主要是用來通過一個給定的時間生成一個模板的區塊頭狀態對象,不包含事務Merkle根、action Merkle根以及新生產者字段數據,因為這些組件是派生自鏈狀態的。總之,在代碼中查找,發現bft不可逆區塊號只有一個初始化為0的賦值動作,原因可能與EOS計划引入bft而目前還沒有bft有關系。因此該值為0。
  • dpos不可逆區塊號,controller初始化為0。仍舊在generate_next函數中找到該字段的初始化值為calc_dpos_last_irreversible()函數的結果。
  • 快照的頭塊號,初始化是0,如果有快照讀入的話,就是快照的頭區塊號。

calc_dpos_last_irreversible函數

該函數用來計算dpos最后不可逆區塊。

uint32_t block_header_state::calc_dpos_last_irreversible()const {
  vector<uint32_t> blocknums; blocknums.reserve( producer_to_last_implied_irb.size() );
  for( auto& i : producer_to_last_implied_irb ) {
     blocknums.push_back(i.second);
  }
  if( blocknums.size() == 0 ) return 0;
  std::sort( blocknums.begin(), blocknums.end() );//默認從小到大排序。less<int>()
  return blocknums[ (blocknums.size()-1) / 3 ];// dpos最后不可逆區塊的判斷條件是必須在池子里面保持有2/3個區塊號是大於自己的。
}

fetch_block_by_number

signed_block_ptr controller::fetch_block_by_number( uint32_t block_num )const  { try {
   auto blk_state = my->fork_db.get_block_in_current_chain_by_num( block_num );// 從分叉庫中根據塊號獲取狀態區塊。
   if( blk_state && blk_state->block ) {//狀態區塊存在且其block成員也存在
      return blk_state->block; // 返回其block成員對象。
   }

   return my->blog.read_block_by_num(block_num);// 否則的話從block.log日志中獲取區塊返回。
} FC_CAPTURE_AND_RETHROW( (block_num) ) }

重新回到producer_plugin的啟動階段的last_irreversible 的討論。首先通過函數last_irreversible_block_num從bft和dpos以及快照三個區塊號中獲取最大的一個,由於目前未引進bft且有快照進入的概率不高,所以暫定該最后不可逆區塊號為dpos的那個號。接着用這個區塊號通過fetch_block_by_number中查找,先在fork_db中查找,如果沒有則在block.log中查找獲得區塊對象。不過一般fork_db中不應該存在不可逆區塊,如果區塊變為不可逆狀態應該被立即持久化到block.log,並從fork_db中刪除。

new_chain_banner(chain)

該函數翻譯過來就是新鏈的條幅,條幅是顯示在日志中的,源碼如下:

void new_chain_banner(const eosio::chain::controller& db)
{
   std::cerr << "\n"
      "*******************************\n"
      "*                             *\n"
      "*   ------ NEW CHAIN ------   *\n"
      "*   -  Welcome to EOSIO!  -   *\n"
      "*   -----------------------   *\n"
      "*                             *\n"
      "*******************************\n"
      "\n";

   if( db.head_block_state()->header.timestamp.to_time_point() < (fc::time_point::now() - fc::milliseconds(200 * config::block_interval_ms)))
   {
      std::cerr << "Your genesis seems to have an old timestamp\n"
         "Please consider using the --genesis-timestamp option to give your genesis a recent timestamp\n"
         "\n"
         ;
   }
   return;
}

傳入一個鏈對象(controller實例),輸出一個字符圖案在日志中,接着校驗genesis的時間戳,如果小於當前時間200個間隔周期,則報錯重新設置genesis的時間戳配置為一個就近的時間。

schedule_production_loop

這是一個對於producer_plugin非常重要的函數,是出塊節點按計划出塊的循環函數。在系統多個功能函數中涉及處理恢復繼續按計划出塊時,多次被調用到。該函數中大量使用到了_timer對象,下面先研究_timer。

basic_deadline_timer 的研究

對producer_plugin_impl類的共有成員_timer的追蹤,可以發現它是basic_deadline_timer類的對象。

該函數提供了可等待的計時器功能。basic_deadline_timer類模板提供了執行阻塞(blocking)或異步等待(asynchronous wait)定時器期滿的能力。截止日期計時器總是處於兩種狀態之一:“過期”或“未過期”。如果在過期計時器上調用wait()或async_wait()函數,則等待操作將立即完成。

使用實例:

①阻塞等待(blocking wait)

為計時器設置一個相對時間。

timer.expires_from_now(boost::posix_time::seconds(5));// 從現在開始計時5秒鍾。

等待計時器過期。

timer.wait();

②異步等待(asynchronous wait)

首先要創建一個處理器handler。

void handler(const boost::system::error_code& error)
{
    if (!error)
    {
        // Timer expired.
    }
}

構建一個絕對過期時間的計時器。

boost::asio::deadline_timer timer(io_context, boost::posix_time::time_from_string("2005-12-07 23:59:59.000"));

啟動一個異步等待。

timer.async_wait(handler);

③改變過期時間

當存在掛起的異步等待時,更改計時器的過期時間會導致這些等待操作被取消。要確保與計時器關聯的操作只執行一次,請使用類似的方法:

// boost::asio::basic\_deadline\_timer::expires\_from\_now() 函數取消任何掛起的異步等待,並返回已取消的異步等待的數量。如果返回0,則太遲了,且等待處理器已經被執行,或者即將被執行。如果它返回1,那么等待程序會被成功取消。
void on_some_event() // 模擬某事件處理函數
{
    if (my_timer.expires_from_now(seconds(5)) > 0) {
      // 取消計時器,啟動一個新的異步等待
      my_timer.async_wait(on_timeout);
    } else {
      // 計時器已過期。
    }
}
// 如果一個等待處理程序被取消,傳遞給它的boost::system::error\_code包含值boost::asio::error::operation\_aborted。
void on_timeout(const boost::system::error_code& e) // 超時事件處理函數
{
    if (e != boost::asio::error::operation_aborted) {
      計時器未取消,繼續執行操作。
    }
}

回到producer_plugin的shutdown階段中_timer的使用。

my->_timer.cancel();

進入basic_deadline_timer::cancel()函數:

std::size_t cancel()
{
    boost::system::error_code ec;
    std::size_t s = this->get_service().cancel(this->get_implementation(), ec);
    boost::asio::detail::throw_error(ec, "cancel");
    return s;
}

該函數將取消所有正在等待計時器的異步操作。

回到schedule_production_loop函數。

這一部分是對計時器的設置。首先重置計時器,獲得鏈對象chain以及弱指針producer_plugin_impl實例。執行start_block,並接收結果,根據結果的不同做不同的處理,該結果為一個枚舉類型:

enum class start_block_result {
    succeeded, // 成功
    failed, // 失敗
    waiting, // 等待
    exhausted // 耗盡,該狀態在producer插件中並沒有顯式使用,而是其他狀態處理完畢剩余的情況。
};
  • 如果是failed,啟動區塊的返回值是失敗的,那么要輸出提醒日志,同時計時器啟動50毫秒倒計時,異步等待到期以后再次嘗試重新調用自己schedule_production_loop函數。
  • 如果是waiting,等待中。判斷生產者如果不是空且啟用了生產能力,則調用延時計划生產循環schedule_delayed_production_loop函數。
bool production_disabled_by_policy() { // 確定生產能力是否被禁用的方式。有以下三種判斷條件,滿足其一即可。
 return !_production_enabled || _pause_production || (_max_irreversible_block_age_us.count() >= 0 && get_irreversible_block_age() >= _max_irreversible_block_age_us);
}

延時計划生產循環schedule_delayed_production_loop函數,主要操作對象是wake_up_time,即該延時操作的喚醒時間。一些列校驗判斷探測出喚醒時間已到達時,就會調用回schedule_production_loop函數。

  • 接下來,start_block結果其他的狀態情況,即succeeded或者exhausted。當pending區塊模式為生產中時,pending區塊的模式分為:
enum class pending_block_mode {
   producing, // 本地生產
   speculating // 外部確認有可能確認失敗不一定能成為不可逆,因此是投機性的。
};

到目前這個分支下,換句話講就是啟動區塊start_block已經成功succeeded了,但是也有可能耗盡exhausted,這兩種情況要通過另外一種判斷,即是否pending區塊處於生產中的狀態來做區分。

pending區塊模式為生產中producing

  • start_block成功succeeded。這部分代碼的工作主要是用來保證區塊要在截止時間之前被裝運上鏈。先校驗一下是否存在pending區塊。接着計算截止時間並按照該時間啟動計時器:
// 計算截止時間,epoch默認是從1970/1/1,從epoch開始計算到pending區塊的時間加上預設的區塊生產時間偏移量。
auto deadline = chain.pending_block_time().time_since_epoch().count() + (last_block ? _last_block_time_offset_us : _produce_time_offset_us);
_timer.expires_at( epoch + boost::posix_time::microseconds( deadline ));// 截止時間加上epoch的時間初始量,按此時間啟動計時器。

現行西歷即格里歷,又譯國瑞歷、額我略歷、格列高利歷、格里高利歷,稱西元。地球每天的自轉是有些不規則的,而且正在緩慢減速。所以,格林尼治時間已經不再被作為標准時間使用。現在的標准時間──協調世界時(UTC)──由原子鍾提供。

  • start_block成功exhausted。仍舊要先檢查是否存在pending區塊。接着計算預期時間expect_time,是penging區塊時間減去一個區塊間隔時間0.5秒(現在是設置的0.5秒出一個塊,在config.hpp中可以查到)
auto expect_time = chain.pending_block_time() - fc::microseconds(config::block_interval_us);

... config.hpp
const static int      block_interval_ms = 500;
const static int      block_interval_us = block_interval_ms*1000;

下面是判斷預期時間和現在時間的對比,如果預期時間已過,則將計時器時間調節為0(立即執行出塊)。如果預期時間未到,則設置計時器到預期時間,等待計時完成。

if (fc::time_point::now() >= expect_time) { // 預期時間已過
    _timer.expires_from_now( boost::posix_time::microseconds( 0 )); // 將計時器時間調節為0
    fc_dlog(_log, "Scheduling Block Production on Exhausted Block #${num} immediately", ("num", chain.pending_block_state()->block_num));
} else { // 預期時間未到
    _timer.expires_at(epoch + boost::posix_time::microseconds(expect_time.time_since_epoch().count()));
    fc_dlog(_log, "Scheduling Block Production on Exhausted Block #${num} at ${time}", ("num", chain.pending_block_state()->block_num)("time",expect_time)); // 設置計時器到預期時間
}

分別將succeeded以及exhausted狀態的_timer設置完畢以后,下面要處理當計時器到時的事件處理,即_timer.async_wait函數。該函數的參數為匿名內部類組成的異步回調函數。

_timer.async_wait([&chain,weak_this,cid=++_timer_corelation_id](const boost::system::error_code& ec) {
    auto self = weak_this.lock(); // 獲得鎖。
    if (self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id) { // 滿足生產區塊的條件:有鎖且操作未被終止且計時器關聯id匹配。
        // 內部要校驗一遍pending區塊是否存在。
        auto block_num = chain.pending_block_state() ? chain.pending_block_state()->block_num : 0; // 區塊號的設置,若pending區塊存在則設置為pending區塊號,若不存在,則設置為0。
        auto res = self->maybe_produce_block(); // 調用maybe_produce_block()函數(下面分析)執行區塊的生產,返回生產結果。
        fc_dlog(_log, "Producing Block #${num} returned: ${res}", ("num", block_num)("res", res));
    }
});

計時器關聯id匹配。_timer_corelation_id的存在源自一個攻擊警報:Boost計時器可能處於一個處理程序尚未執行但不可中止的狀態,這個狀態給外部攻擊提供了可能。關聯id的設置可以有效防止,處理程序被改變。在處理程序捕獲相關性ID設置時,他們必須執行檢查匹配全局變量_timer_corelation_id。如果不匹配,則意味着該方法已被調用,處理程序處於應該取消但無法取消的狀態。

pending區塊模式為投機中speculating

這個狀態下,分兩種情況處理:

  • 如果生產者存在且具備生產能力(有可能是備用節點)時,校驗一番以后最終會調用延時計划出塊循環schedule_delayed_production_loop。
  • 其他情況則只打印日志,說明創建了投機區塊。

maybe_produce_block()函數

前面提到,schedule_production_loop函數是出塊者生產區塊時,調用start_block函數並根據返回結果設置計時器_timer,並處理計時完成的處理程序,而最終只有start_block結果為succeeded以及exhausted狀態,計時完成以后同時滿足有鎖且操作未被終止且計時器關聯id匹配。這全部條件的滿足,最后調用區塊生產執行函數maybe_produce_block。

簡單來講,schedule_production_loop函數就是通過調用start_block設置timer,計時完成執行maybe_produce_block。所以schedule_production_loop函數的核心是處理_timer。

下面分析函數maybe_produce_block:

bool producer_plugin_impl::maybe_produce_block() {
   // 當前作用域退出時回調schedule_production_loop()繼續循環處理出塊工作。
   auto reschedule = fc::make_scoped_exit([this]{ 
      schedule_production_loop();
   });

   try {
      try {
         produce_block(); // 實際調用函數produce_block生產區塊。
         return true; // 返回true,代表區塊生產成功,其他異常狀態均返回false,代表出塊失敗。
      } catch ( const guard_exception& e ) { // 處理守衛異常
         app().get_plugin<chain_plugin>().handle_guard_exception(e);
         return false;
      } FC_LOG_AND_DROP();
   } catch ( boost::interprocess::bad_alloc&) { // 處理內部線程內存錯誤異常
      raise(SIGUSR1);
      return false;
   }
   // 區塊生產出錯,丟其區塊。
   fc_dlog(_log, "Aborting block due to produce_block error");
   chain::controller& chain = app().get_plugin<chain_plugin>().chain();
   chain.abort_block(); // 丟其區塊。
   return false;
}

produce_block函數

區塊生產函數用於處理區塊生產,前面maybe_produce_block函數的主要功能集中在“maybe”,所以實際出塊任務仍舊交由produce_block處理。

void producer_plugin_impl::produce_block() {
   // 區塊生產必須是pending區塊狀態為producing,否則輸出錯誤日志:實際上並沒有真正生產區塊。
   EOS_ASSERT(_pending_block_mode == pending_block_mode::producing, producer_exception, "called produce_block while not actually producing");
   chain::controller& chain = app().get_plugin<chain_plugin>().chain(); // 獲取chain實例
   const auto& pbs = chain.pending_block_state(); // 從chain實例獲取當前pending區塊,如果獲取為空,則輸出錯誤日志:不存在pending區塊,可能被其他插件毀壞。
   const auto& hbs = chain.head_block_state(); // 從chain實例獲取當前頭區塊
   EOS_ASSERT(pbs, missing_pending_block_state, "pending_block_state does not exist but it should, another plugin may have corrupted it");
   auto signature_provider_itr = _signature_providers.find( pbs->block_signing_key ); // 通過pending區塊的區塊簽名公鑰去內存多索引表_signature_providers中差找signature_provider。
   // 如果未查到有效signature_provider,則輸出錯誤日志:正在嘗試生產一個區塊,是由一個我們不擁有的私鑰所簽名。
   EOS_ASSERT(signature_provider_itr != _signature_providers.end(), producer_priv_key_not_found, "Attempting to produce a block for which we don't have the private key");
   chain.finalize_block(); // 執行chain的區塊完成操作,重置資源(調用的為controller的finalize_block函數)。
   chain.sign_block( [&]( const digest_type& d ) { // 調用controller的sign_block函數進行函數簽名,參數為一個回調函數。區塊簽名最終是由block_header_state來做的實際工作。
      auto debug_logger = maybe_make_debug_time_logger();
      return signature_provider_itr->second(d);
   } );
   chain.commit_block(); // 仍舊是執行controller的commit_block函數進行區塊提交。
   block_state_ptr new_bs = chain.head_block_state(); 
   _producer_watermarks[new_bs->header.producer] = chain.head_block_num(); // 設置水印

   // 打印生產結果日志。
   ilog("Produced block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, confirmed: ${confs}]",
        ("p",new_bs->header.producer)("id",fc::variant(new_bs->id).as_string().substr(0,16))
        ("n",new_bs->block_num)("t",new_bs->header.timestamp)
        ("count",new_bs->block->transactions.size())("lib",chain.last_irreversible_block_num())("confs", new_bs->header.confirmed));

}

水印的意義重申一下,是用來給計划出塊預先設置出塊安排的,即安排下一個區塊的生產者,管理出塊輪次。

produce_block函數屬於producer_plugin,然而其中核心區塊處理,例如重置資源准備、區塊簽名、提交區塊都時通過chain_plugin調用了controller的相關函數,而controller只是負責管理與數據層的交互,數據層包括block.log以及及與chainbase的db,區塊簽名的內容是區塊頭block_header_state來處理。結構拆分如下圖所示:

image

start_block 函數

該函數是producer插件對出塊管理的核心函數。該函數通過對時間的控制管理了出塊節奏,管理出塊輪次。到這里可以得出producer插件的操作對象是pending區塊,所以該函數對pending區塊是本地生產還是外部同步進來的做了區分處理。這其中涉及到一個區塊同步確認的處理,即生產者生產當前區塊時要確認多少個區塊:

  • 如果區塊的生產者不是當前節點的,則假設沒有確認(丟棄這個塊)。
  • 如果區塊的生產者是當前節點上從未產生過的生產者,那么保守的方法就是假定沒有確認,確保不會在crash之后重復簽名。(不過此處有個問題是crash的話,是否要保證水印持久化?否則crash會丟失,答案是肯定的)
  • 如果區塊的生產者是這個節點上的生產者,這個節點是知道它生成的最后一個塊的,則安全地設置它:unless
  • 如果區塊的生產者在該節點的最后水印中的位置較高,則意味着該區塊時在一個不同的分叉上。

本函數大約包含三百多行代碼,用於處理pending區塊不同情況下的校驗以及動作,包括對區塊中打包事務的校驗和處理,最終返回的時start_block_result狀態,前面有介紹過。

start_block的代碼不在此詳細分析,但總結下來可以得出是對pending區塊的區塊頭校驗,包括是否是本地生產抑或是外部同步,然后是對pending區塊內事務的處理,包括如何重置打包接收的事務。到這部分相當於將一個區塊的頭部信息構成以及校驗工作和區塊體的事務打包內容工作完成了。最后返回一個處理狀態,如果通過了層層校驗以及無異常的順利處理,則返回啟動區塊成功的狀態,如果是時間超時,耗盡了規定時間則返回exhausted,其他情況則時failed。

總結

本文分析介紹了producer_plugin的重點功能,研究了其大量內部函數。最初的研究路先是分析該插件的生命周期,然后引申到各個未知或以前未仔細研究過的調用的函數細節。其中,涉及到了出塊安排水印、pending區塊處理、區塊生產循環、區塊的生產者校驗、是否本地或是同步、計時器的相關知識和應用、最后不可逆塊的研究、區塊生產、區塊簽名等,另外還涉及到新版本的多線程校驗簽名區塊的內容。研究過程中,也梳理了producer插件與chain插件的交互以及延伸到controller的內部函數的使用。總之,內容較多篇幅較長,整體研究脈絡似乎仍舊不算清晰,但也算是自身知識圖譜的“大數據”的一部分,量變引發質變。

更多文章請轉到醒者呆的博客園


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM