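After two days of debugging, I finally have a rough picture of the code behind EOS block production and the DPOS consensus mechanism, so here is a short summary. This is one of the core parts of EOS and is somewhat complex. If I have misunderstood anything, please bear with me and point it out. I may revise this content at any time to keep it as accurate as possible.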
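While debugging, I modified some of the source code and set up the environment as follows:
1. Configured a multi-node, multi-host environment (see my earlier article). There are four machines in total: three super nodes, plus one machine running as the original eosio account that only receives data;
2. Changed EOS to produce one block every 5 seconds, purely to make debugging and reading the log output easier;
3. Added extra log output in various files.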
I. Block scheduling
EOS block production lives in the producer_plugin plugin. After a series of initialization steps, we reach this code:
void producer_plugin::plugin_startup() {
   // .........
   my->schedule_production_loop();
}
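Under normal circumstances a node never leaves the scheduling driven by schedule_production_loop. The whole function is built on a timer and is fully asynchronous. Here is a detailed analysis of it: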
void producer_plugin_impl::schedule_production_loop() {
   chain::controller& chain = app().get_plugin<chain_plugin>().chain();

   // Cancel all previous timers, i.e. abort every pending asynchronous wait
   _timer.cancel();
   std::weak_ptr<producer_plugin_impl> weak_this = shared_from_this();

   bool last_block;
   // Refresh all of this node's scheduling information
   auto result = start_block(last_block);

   // Decide which action this node should take based on the result.
   // Every action is asynchronous. There are four main cases:
   /* 1. Fetching the scheduling information failed: fetch it again later and reschedule;
      2. Waiting (another node is producing): do nothing until more blocks arrive;
      3. It is this node's turn to produce: produce the block;
      4. Compute when the next local producer's slot starts, then reschedule.
   */
   if (result == start_block_result::failed) {
      elog("Failed to start a pending block, will try again later");
      _timer.expires_from_now( boost::posix_time::microseconds( config::block_interval_us / 10 ));

      // we failed to start a block, so try again later?
      _timer.async_wait([weak_this,cid=++_timer_corelation_id](const boost::system::error_code& ec) {
         auto self = weak_this.lock();
         if (self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id) {
            self->schedule_production_loop();
         }
      });
   } else if (result == start_block_result::waiting) {
      // nothing to do until more blocks arrive
   } else if (_pending_block_mode == pending_block_mode::producing) {
      // we succeeded but block may be exhausted
      if (result == start_block_result::succeeded) {
         // ship this block off no later than its deadline
         static const boost::posix_time::ptime epoch(boost::gregorian::date(1970, 1, 1));
         _timer.expires_at(epoch + boost::posix_time::microseconds(chain.pending_block_time().time_since_epoch().count() + (last_block ? _last_block_time_offset_us : _produce_time_offset_us)));
         fc_dlog(_log, "Scheduling Block Production on Normal Block #${num} for ${time}", ("num", chain.pending_block_state()->block_num)("time",chain.pending_block_time()));
      } else {
         // ship this block off immediately
         _timer.expires_from_now( boost::posix_time::microseconds( 0 ));
         fc_dlog(_log, "Scheduling Block Production on Exhausted Block #${num} immediately", ("num", chain.pending_block_state()->block_num));
      }

      _timer.async_wait([&chain,weak_this,cid=++_timer_corelation_id](const boost::system::error_code& ec) {
         auto self = weak_this.lock();
         if (self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id) {
            auto res = self->maybe_produce_block();
            fc_dlog(_log, "Producing Block #${num} returned: ${res}", ("num", chain.pending_block_state()->block_num)("res", res) );
         }
      });
   } else if (_pending_block_mode == pending_block_mode::speculating && !_producers.empty() && !production_disabled_by_policy()){
      // if we have any producers then we should at least set a timer for our next available slot
      optional<fc::time_point> wake_up_time;
      for (const auto&p: _producers) {
         auto next_producer_block_time = calculate_next_block_time(p);
         if (next_producer_block_time) {
            auto producer_wake_up_time = *next_producer_block_time - fc::microseconds(config::block_interval_us);
            if (wake_up_time) {
               // wake up with a full block interval to the deadline
               wake_up_time = std::min<fc::time_point>(*wake_up_time, producer_wake_up_time);
            } else {
               wake_up_time = producer_wake_up_time;
            }
         }
      }

      if (wake_up_time) {
         fc_dlog(_log, "Specualtive Block Created; Scheduling Speculative/Production Change at ${time}", ("time", wake_up_time));
         static const boost::posix_time::ptime epoch(boost::gregorian::date(1970, 1, 1));
         _timer.expires_at(epoch + boost::posix_time::microseconds(wake_up_time->time_since_epoch().count()));
         _timer.async_wait([weak_this,cid=++_timer_corelation_id](const boost::system::error_code& ec) {
            auto self = weak_this.lock();
            if (self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id) {
               self->schedule_production_loop();
            }
         });
      } else {
         fc_dlog(_log, "Speculative Block Created; Not Scheduling Speculative/Production, no local producers had valid wake up times");
      }
   } else {
      fc_dlog(_log, "Speculative Block Created");
   }
}
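The function cancels the timer and then re-arms it, with each wait tagged by `_timer_corelation_id`. This is what lets the loop run "forever" without stacking up duplicate loops. Each callback re-enters the loop, and any callback armed before the latest re-arm is ignored. The sketch below models only that pattern and is not EOS code. `fake_timer` is a hypothetical stand-in for the boost::asio timer whose handlers are run by hand, and `production_loop` is an illustrative name.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for boost::asio::deadline_timer: handlers are queued and
// run by hand; cancel() completes every queued handler with aborted == true.
struct fake_timer {
   std::vector<std::function<void(bool)>> pending;

   void async_wait(std::function<void(bool)> handler) { pending.push_back(std::move(handler)); }
   void cancel() { run_all(true); }
   void fire_all() { run_all(false); }

private:
   void run_all(bool aborted) {
      auto handlers = std::move(pending);
      pending.clear();
      for (auto& h : handlers) h(aborted);
   }
};

// Hypothetical model of the loop: each call cancels the old wait, does its work,
// then arms a new wait tagged with a fresh correlation id.
struct production_loop : std::enable_shared_from_this<production_loop> {
   fake_timer timer;
   uint32_t timer_corelation_id = 0;  // spelled like EOS's _timer_corelation_id
   int iterations = 0;                // how many times the loop body ran

   void schedule() {
      timer.cancel();
      ++iterations;  // stands in for start_block() and the decision logic
      std::weak_ptr<production_loop> weak_this = shared_from_this();
      timer.async_wait([weak_this, cid = ++timer_corelation_id](bool aborted) {
         auto self = weak_this.lock();
         // Re-enter only if the object is alive, the wait was not cancelled,
         // and no newer wait has been armed since this one.
         if (self && !aborted && cid == self->timer_corelation_id) self->schedule();
      });
   }
};
```

A handler that slips through after a newer wait was armed sees a stale `cid` and does nothing. In the test this is simulated by keeping a copy of the first handler. As a result, only one loop is ever live.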
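The production rule is as follows. The 21 super nodes take turns producing, each one producing 12 consecutive blocks, and each block takes 0.5 s. Barring anomalies, one full round therefore takes 21 × 12 × 0.5 = 126 s. If another node is voted in as a super node during a round, it does not start producing immediately. It has to wait until the round ends and then pushes out the 21st super node. (Note: voting happens in real time, but the tallies are cached in the database. Once the super nodes have completed a full round, the data is re-read from the database to decide which nodes produce, and the next round starts.) Also, if a node's production is interrupted for some reason during a round, no blocks can be produced during that node's time slot, so there will be a stretch of time with no blocks.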
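The slot arithmetic behind this rotation can be sketched as follows. The index formula follows block_header_state::get_scheduled_producer, and the last-block test follows the `last_block` line in start_block below. The standalone constants and the use of plain strings for producer names are assumptions made for illustration. Note that the author's test setup used a 5 s interval and only three producers.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Assumed values mirroring mainnet defaults (not read from EOS headers here).
constexpr uint32_t producer_repetitions = 12;   // consecutive blocks per producer
constexpr uint64_t block_interval_ms    = 500;  // one block every 0.5 s

// Length of one full round for a schedule of n producers.
constexpr uint64_t round_duration_ms(uint64_t n_producers) {
   return n_producers * producer_repetitions * block_interval_ms;
}

// Same arithmetic as get_scheduled_producer: the block timestamp's slot picks a
// producer, who then keeps the turn for producer_repetitions slots in a row.
const std::string& scheduled_producer(const std::vector<std::string>& schedule, uint32_t slot) {
   assert(!schedule.empty());
   uint32_t index = slot % (static_cast<uint32_t>(schedule.size()) * producer_repetitions);
   index /= producer_repetitions;
   return schedule[index];
}

// Same test as last_block in start_block(): the last of a producer's 12 slots.
bool is_last_block_of_turn(uint32_t slot) {
   return slot % producer_repetitions == producer_repetitions - 1;
}
```

With three producers, slots 0 to 11 belong to the first producer, slots 12 to 23 to the second, and slot 36 wraps back to the first.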
producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool &last_block) {
   chain::controller& chain = app().get_plugin<chain_plugin>().chain();

   // Block state of the highest block this node currently has
   const auto& hbs = chain.head_block_state();

   //Schedule for the next second's tick regardless of chain state
   // If we would wait less than 50ms (1/10 of block_interval), wait for the whole block interval.
   fc::time_point now = fc::time_point::now();
   fc::time_point base = std::max<fc::time_point>(now, chain.head_block_time());
   int64_t min_time_to_next_block = (config::block_interval_us) - (base.time_since_epoch().count() % (config::block_interval_us) );
   fc::time_point block_time = base + fc::microseconds(min_time_to_next_block);

   if((block_time - now) < fc::microseconds(config::block_interval_us/10) ) {     // we must sleep for at least 50ms
      // ilog("Less than ${t}us to next block time, time_to_next_block_time ${bt}",
      //      ("t", config::block_interval_us/10)("bt", block_time));
      block_time += fc::microseconds(config::block_interval_us);
   }

   _pending_block_mode = pending_block_mode::producing;

   // Not our turn
   // Is this the last of the current producer's 12 consecutive blocks? Returned through last_block.
   last_block = ((block_timestamp_type(block_time).slot % config::producer_repetitions) == config::producer_repetitions - 1);

   // Work out which producer is scheduled to produce the next block
   const auto& scheduled_producer = hbs->get_scheduled_producer(block_time);
   auto currrent_watermark_itr = _producer_watermarks.find(scheduled_producer.producer_name);
   auto signature_provider_itr = _signature_providers.find(scheduled_producer.block_signing_key);
   auto irreversible_block_age = get_irreversible_block_age();

   // If the next block production opportunity is in the present or future, we're synced.
   // The next block time computed by this node must be later than the time of the newest block it has received,
   // i.e. if this node produces a block, that block's timestamp is necessarily later than the current newest block.
   if( !_production_enabled ) {
      _pending_block_mode = pending_block_mode::speculating;
   } else if( _producers.find(scheduled_producer.producer_name) == _producers.end()) {
      _pending_block_mode = pending_block_mode::speculating;
   } else if (signature_provider_itr == _signature_providers.end()) {
      elog("Not producing block because I don't have the private key for ${scheduled_key}", ("scheduled_key", scheduled_producer.block_signing_key));
      _pending_block_mode = pending_block_mode::speculating;
   } else if ( _pause_production ) {
      elog("Not producing block because production is explicitly paused");
      _pending_block_mode = pending_block_mode::speculating;
   } else if ( _max_irreversible_block_age_us.count() >= 0 && irreversible_block_age >= _max_irreversible_block_age_us ) {
      elog("Not producing block because the irreversible block is too old [age:${age}s, max:${max}s]", ("age", irreversible_block_age.count() / 1'000'000)( "max", _max_irreversible_block_age_us.count() / 1'000'000 ));
      _pending_block_mode = pending_block_mode::speculating;
   }

   if (_pending_block_mode == pending_block_mode::producing) {
      // determine if our watermark excludes us from producing at this point
      if (currrent_watermark_itr != _producer_watermarks.end()) {
         if (currrent_watermark_itr->second >= hbs->block_num + 1) {
            elog("Not producing block because \"${producer}\" signed a BFT confirmation OR block at a higher block number (${watermark}) than the current fork's head (${head_block_num})",
                 ("producer", scheduled_producer.producer_name)
                 ("watermark", currrent_watermark_itr->second)
                 ("head_block_num", hbs->block_num));
            _pending_block_mode = pending_block_mode::speculating;
         }
      }
   }

   if (_pending_block_mode == pending_block_mode::speculating) {
      auto head_block_age = now - chain.head_block_time();
      if (head_block_age > fc::seconds(5))
         return start_block_result::waiting;
   }

   try {
      uint16_t blocks_to_confirm = 0;

      if (_pending_block_mode == pending_block_mode::producing) {
         // determine how many blocks this producer can confirm
         // 1) if it is not a producer from this node, assume no confirmations (we will discard this block anyway)
         // 2) if it is a producer on this node that has never produced, the conservative approach is to assume no
         //    confirmations to make sure we don't double sign after a crash TODO: make these watermarks durable?
         // 3) if it is a producer on this node where this node knows the last block it produced, safely set it -UNLESS-
         // 4) the producer on this node's last watermark is higher (meaning on a different fork)
         if (currrent_watermark_itr != _producer_watermarks.end()) {
            auto watermark = currrent_watermark_itr->second;
            if (watermark < hbs->block_num) {
               blocks_to_confirm = std::min<uint16_t>(std::numeric_limits<uint16_t>::max(), (uint16_t)(hbs->block_num - watermark));
            }
         }
      }

      chain.abort_block();
      chain.start_block(block_time, blocks_to_confirm);
   } FC_LOG_AND_DROP();

   const auto& pbs = chain.pending_block_state();
   if (pbs) {

      if (_pending_block_mode == pending_block_mode::producing && pbs->block_signing_key != scheduled_producer.block_signing_key) {
         elog("Block Signing Key is not expected value, reverting to speculative mode! [expected: \"${expected}\", actual: \"${actual\"", ("expected", scheduled_producer.block_signing_key)("actual", pbs->block_signing_key));
         _pending_block_mode = pending_block_mode::speculating;
      }

      // attempt to play persisted transactions first
      bool exhausted = false;
      auto unapplied_trxs = chain.get_unapplied_transactions();

      // remove all persisted transactions that have now expired
      auto& persisted_by_id = _persistent_transactions.get<by_id>();
      auto& persisted_by_expiry = _persistent_transactions.get<by_expiry>();
      while(!persisted_by_expiry.empty() && persisted_by_expiry.begin()->expiry <= pbs->header.timestamp.to_time_point()) {
         persisted_by_expiry.erase(persisted_by_expiry.begin());
      }

      try {
         for (auto itr = unapplied_trxs.begin(); itr != unapplied_trxs.end(); ++itr) {
            const auto& trx = *itr;
            if (persisted_by_id.find(trx->id) != persisted_by_id.end()) {
               // this is a persisted transaction, push it into the block (even if we are speculating) with
               // no deadline as it has already passed the subjective deadlines once and we want to represent
               // the state of the chain including this transaction
               try {
                  chain.push_transaction(trx, fc::time_point::maximum());
               } catch ( const guard_exception& e ) {
                  app().get_plugin<chain_plugin>().handle_guard_exception(e);
                  return start_block_result::failed;
               } FC_LOG_AND_DROP();

               // remove it from further consideration as it is applied
               *itr = nullptr;
            }
         }

         if (_pending_block_mode == pending_block_mode::producing) {
            for (const auto& trx : unapplied_trxs) {
               if (exhausted) {
                  break;
               }

               if (!trx) {
                  // nulled in the loop above, skip it
                  continue;
               }

               if (trx->packed_trx.expiration() < pbs->header.timestamp.to_time_point()) {
                  // expired, drop it
                  chain.drop_unapplied_transaction(trx);
                  continue;
               }

               try {
                  auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms);
                  bool deadline_is_subjective = false;
                  if (_max_transaction_time_ms < 0 || (_pending_block_mode == pending_block_mode::producing && block_time < deadline)) {
                     deadline_is_subjective = true;
                     deadline = block_time;
                  }

                  auto trace = chain.push_transaction(trx, deadline);
                  if (trace->except) {
                     if (failure_is_subjective(*trace->except, deadline_is_subjective)) {
                        exhausted = true;
                     } else {
                        // this failed our configured maximum transaction time, we don't want to replay it
                        chain.drop_unapplied_transaction(trx);
                     }
                  }
               } catch ( const guard_exception& e ) {
                  app().get_plugin<chain_plugin>().handle_guard_exception(e);
                  return start_block_result::failed;
               } FC_LOG_AND_DROP();
            }

            auto& blacklist_by_id = _blacklisted_transactions.get<by_id>();
            auto& blacklist_by_expiry = _blacklisted_transactions.get<by_expiry>();
            auto now = fc::time_point::now();
            while (!blacklist_by_expiry.empty() && blacklist_by_expiry.begin()->expiry <= now) {
               blacklist_by_expiry.erase(blacklist_by_expiry.begin());
            }

            auto scheduled_trxs = chain.get_scheduled_transactions();

            for (const auto& trx : scheduled_trxs) {
               if (exhausted) {
                  break;
               }

               if (blacklist_by_id.find(trx) != blacklist_by_id.end()) {
                  continue;
               }

               try {
                  auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms);
                  bool deadline_is_subjective = false;
                  if (_max_transaction_time_ms < 0 || (_pending_block_mode == pending_block_mode::producing && block_time < deadline)) {
                     deadline_is_subjective = true;
                     deadline = block_time;
                  }

                  auto trace = chain.push_scheduled_transaction(trx, deadline);
                  if (trace->except) {
                     if (failure_is_subjective(*trace->except, deadline_is_subjective)) {
                        exhausted = true;
                     } else {
                        auto expiration = fc::time_point::now() + fc::seconds(chain.get_global_properties().configuration.deferred_trx_expiration_window);
                        // this failed our configured maximum transaction time, we don't want to replay it add it to a blacklist
                        _blacklisted_transactions.insert(transaction_id_with_expiry{trx, expiration});
                     }
                  }
               } catch ( const guard_exception& e ) {
                  app().get_plugin<chain_plugin>().handle_guard_exception(e);
                  return start_block_result::failed;
               } FC_LOG_AND_DROP();
            }
         }

         if (exhausted) {
            return start_block_result::exhausted;
         } else {
            // attempt to apply any pending incoming transactions
            if (!_pending_incoming_transactions.empty()) {
               auto old_pending = std::move(_pending_incoming_transactions);
               _pending_incoming_transactions.clear();
               for (auto& e: old_pending) {
                  on_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e));
               }
            }
            return start_block_result::succeeded;
         }

      } catch ( boost::interprocess::bad_alloc& ) {
         raise(SIGUSR1);
         return start_block_result::failed;
      }
   }

   return start_block_result::failed;
}
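The timing rule at the top of start_block and the blocks_to_confirm rule can each be isolated into a small function. This is a sketch that assumes a 0.5 s interval and uses plain integer microsecond timestamps. `has_watermark` is a hypothetical stand-in for the `_producer_watermarks` lookup.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <limits>

constexpr int64_t block_interval_us = 500'000;  // assumed 0.5 s interval

// Round up to the next interval boundary after max(now, head_time); if that
// boundary is less than 1/10 of an interval away, use the following one so the
// producer has at least 50 ms to fill the block.
int64_t next_block_time_us(int64_t now_us, int64_t head_time_us) {
   int64_t base = std::max(now_us, head_time_us);
   int64_t min_time_to_next_block = block_interval_us - (base % block_interval_us);
   int64_t block_time = base + min_time_to_next_block;
   if (block_time - now_us < block_interval_us / 10)
      block_time += block_interval_us;
   return block_time;
}

// A producer whose watermark (last block number it signed) is known and below
// the head confirms the blocks in between; otherwise it confirms nothing.
// Note: the original casts to uint16_t before std::min; this sketch clamps first.
uint16_t blocks_to_confirm(bool has_watermark, uint32_t watermark, uint32_t head_block_num) {
   if (!has_watermark || watermark >= head_block_num) return 0;
   return static_cast<uint16_t>(std::min<uint32_t>(
      std::numeric_limits<uint16_t>::max(), head_block_num - watermark));
}
```

For example, at 1.48 s the next boundary (1.5 s) is only 20 ms away, which is below the 50 ms minimum, so the block is scheduled for 2.0 s instead.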
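Functionally, a super node has two parts. First, it receives blocks from the other super nodes and validates them. Second, when its own turn comes, it produces blocks and broadcasts them to the other super nodes. Let's start with the super node acting as a witness.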
II. The super node as a witness
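When a super node acts as a witness, it receives blocks sent by the producing node from the underlying network. The entry point is this function in net_plugin.cpp: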
void net_plugin_impl::handle_message( connection_ptr c, const signed_block &msg)
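I analyzed this function in my earlier article <<EOS多節點同步代碼分析>> (an analysis of the EOS multi-node sync code), so I won't repeat it here. After the calls and checks there, execution reaches on_incoming_block in producer_plugin.cpp, which is where the block data is actually processed: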
void on_incoming_block(const signed_block_ptr& block) {
   fc_dlog(_log, "received incoming block ${id}", ("id", block->id()));

   EOS_ASSERT( block->timestamp < (fc::time_point::now() + fc::seconds(7)), block_from_the_future, "received a block from the future, ignoring it" );

   chain::controller& chain = app().get_plugin<chain_plugin>().chain();

   /* de-dupe here... no point in aborting block if we already know the block */
   auto id = block->id();
   auto existing = chain.fetch_block_by_id( id );
   if( existing ) { return; }

   // abort the pending block
   chain.abort_block();

   // exceptions throw out, make sure we restart our loop
   auto ensure = fc::make_scoped_exit([this](){
      schedule_production_loop();
   });

   // push the new block
   bool except = false;
   try {
      chain.push_block(block);
   } catch ( const guard_exception& e ) {
      app().get_plugin<chain_plugin>().handle_guard_exception(e);
      return;
   } catch( const fc::exception& e ) {
      elog((e.to_detail_string()));
      except = true;
   } catch ( boost::interprocess::bad_alloc& ) {
      raise(SIGUSR1);
      return;
   }

   if( except ) {
      app().get_channel<channels::rejected_block>().publish( block );
      return;
   }

   // One of the factors deciding whether this node may produce: for it to produce,
   // the time of the block after the head must not be earlier than the current time
   if( chain.head_block_state()->header.timestamp.next().to_time_point() >= fc::time_point::now() ) {
      _production_enabled = true;
   }

   if( fc::time_point::now() - block->timestamp < fc::minutes(5) || (block->block_num() % 1000 == 0) ) {
      ilog("Received block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, conf: ${confs}, latency: ${latency} ms]",
           ("p",block->producer)("id",fc::variant(block->id()).as_string().substr(8,16))
           ("n",block_header::num_from_id(block->id()))("t",block->timestamp)
           ("count",block->transactions.size())("lib",chain.last_irreversible_block_num())("confs", block->confirmed)("latency", (fc::time_point::now() - block->timestamp).count()/1000 ) );
   }
}
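The gatekeeping that on_incoming_block performs around push_block can be summarized as below. This is a simplified, hypothetical model. Times are plain microsecond integers, the "already known" lookup is a std::set of ids, and a fixed 0.5 s slot stands in for block_timestamp::next().

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <string>

// Hypothetical outcome of the checks done before controller::push_block().
enum class incoming_result { rejected_future, duplicate, push };

incoming_result classify_incoming(int64_t block_time_us, int64_t now_us,
                                  const std::set<std::string>& known_ids,
                                  const std::string& id) {
   // Blocks stamped 7 s or more ahead of local time are refused (block_from_the_future).
   if (!(block_time_us < now_us + 7'000'000)) return incoming_result::rejected_future;
   // De-dupe: a known block is ignored without aborting the pending block.
   if (known_ids.count(id) != 0) return incoming_result::duplicate;
   return incoming_result::push;
}

// After a successful push, production is enabled once the slot after the new
// head is not in the past, i.e. the node has caught up with the chain.
bool caught_up(int64_t head_time_us, int64_t now_us, int64_t slot_us = 500'000) {
   return head_time_us + slot_us >= now_us;
}
```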
Now look at push_block in controller.cpp. The second overload is the more important one. Its code:
void push_block( const signed_block_ptr& b, controller::block_status s ) {
   // idump((fc::json::to_pretty_string(*b)));
   EOS_ASSERT(!pending, block_validate_exception, "it is not valid to push a block when there is a pending block");
   try {
      EOS_ASSERT( b, block_validate_exception, "trying to push empty block" );
      EOS_ASSERT( s != controller::block_status::incomplete, block_validate_exception, "invalid block status for a completed block" );

      // Check the block id against the checkpoints
      emit( self.pre_accepted_block, b );
      bool trust = !conf.force_all_checks && (s == controller::block_status::irreversible || s == controller::block_status::validated);

      // Update the data in fork_db
      auto new_header_state = fork_db.add( b, trust );
      emit( self.accepted_block_header, new_header_state );
      // on replay irreversible is not emitted by fork database, so emit it explicitly here
      if( s == controller::block_status::irreversible )
         emit( self.irreversible_block, new_header_state );

      // This condition holds here (the node is not running in IRREVERSIBLE read mode)
      if ( read_mode != db_read_mode::IRREVERSIBLE ) {
         maybe_switch_forks( s );
      }
   } FC_LOG_AND_RETHROW( )
}
First, look at fork_database::add, which is also a key piece of code:
block_state_ptr fork_database::add( signed_block_ptr b, bool trust ) {
   EOS_ASSERT( b, fork_database_exception, "attempt to add null block" );
   EOS_ASSERT( my->head, fork_db_block_not_found, "no head block set" );

   // Does fork_db already cache this block?
   const auto& by_id_idx = my->index.get<by_block_id>();
   auto existing = by_id_idx.find( b->id() );
   EOS_ASSERT( existing == by_id_idx.end(), fork_database_exception, "we already know about this block" );

   // Check that the previous block referenced by the received block exists in the local fork_db.
   // If it does not, an exception is thrown: the block is not accepted and validation fails.
   auto prior = by_id_idx.find( b->previous );
   EOS_ASSERT( prior != by_id_idx.end(), unlinkable_block_exception, "unlinkable block", ("id", string(b->id()))("previous", string(b->previous)) );

   // There is a lot of code behind this that I won't expand on. For every received block, a new block_state is
   // allocated to record the block's information, the block itself is stored in that block_state, and the
   // block_state is inserted into fork_db. Cached blocks whose block_num is smaller than
   // dpos_irreversible_blocknum are then removed from fork_db.
   auto result = std::make_shared<block_state>( **prior, move(b), trust );
   EOS_ASSERT( result, fork_database_exception , "fail to add new block state" );
   return add(result);
}
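The two checks in add() (reject duplicates, reject unlinkable blocks), together with the pruning below the irreversible block number, can be modelled with a toy container. This sketch is hypothetical and deliberately simplified. The real fork_database stores block_state objects in a boost::multi_index container, and its head selection also accounts for irreversibility, which is omitted here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical toy block: only the fields the checks need.
struct toy_block {
   std::string id;
   std::string previous;
   uint32_t num;
};

class toy_fork_db {
public:
   explicit toy_fork_db(const toy_block& root) : head_id_(root.id) { blocks_[root.id] = root; }

   // Mirrors the two EOS_ASSERTs in add(); returns the new block's number.
   uint32_t add(const std::string& id, const std::string& previous) {
      if (blocks_.count(id) != 0) throw std::runtime_error("we already know about this block");
      auto prior = blocks_.find(previous);
      if (prior == blocks_.end()) throw std::runtime_error("unlinkable block");
      toy_block b{id, previous, prior->second.num + 1};
      blocks_[id] = b;
      // Assumed head rule for this sketch: a strictly higher block number wins.
      if (b.num > blocks_.at(head_id_).num) head_id_ = id;
      return b.num;
   }

   // Drop cached blocks below the irreversible block number (never the head).
   void prune_below(uint32_t irreversible_num) {
      for (auto it = blocks_.begin(); it != blocks_.end();) {
         if (it->second.num < irreversible_num && it->first != head_id_) it = blocks_.erase(it);
         else ++it;
      }
   }

   const toy_block& head() const { return blocks_.at(head_id_); }
   std::size_t size() const { return blocks_.size(); }

private:
   std::map<std::string, toy_block> blocks_;
   std::string head_id_;
};
```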
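To sum up: after receiving block data, the node first runs various checks on it, then builds the corresponding block_state, and finally stores the block together with its scheduling information in fork_db. Continuing on, let's look at maybe_switch_forks: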
void maybe_switch_forks( controller::block_status s = controller::block_status::complete ) {
   auto new_head = fork_db.head();

   // new_head is the newest block_header_state in fork_db, i.e. the one we just stored above, while head is the
   // older block_header_state held by the controller (in abnormal situations head is not necessarily older than
   // new_head; this is left for the reader to work out). Normally we therefore take the first branch. The abnormal
   // case is what is usually called a chain fork.
   if( new_head->header.previous == head->id ) {
      try {
         apply_block( new_head->block, s );
         fork_db.mark_in_current_chain( new_head, true );
         fork_db.set_validity( new_head, true );
         head = new_head;
      } catch ( const fc::exception& e ) {
         fork_db.set_validity( new_head, false ); // Removes new_head from fork_db index, so no need to mark it as not in the current chain.
         throw;
      }
   // Handling a chain fork
   } else if( new_head->id != head->id ) {
      ilog("switching forks from ${current_head_id} (block number ${current_head_num}) to ${new_head_id} (block number ${new_head_num})",
           ("current_head_id", head->id)("current_head_num", head->block_num)("new_head_id", new_head->id)("new_head_num", new_head->block_num) );
      auto branches = fork_db.fetch_branch_from( new_head->id, head->id );

      for( auto itr = branches.second.begin(); itr != branches.second.end(); ++itr ) {
         fork_db.mark_in_current_chain( *itr , false );
         pop_block();
      }
      EOS_ASSERT( self.head_block_id() == branches.second.back()->header.previous, fork_database_exception,
                  "loss of sync between fork_db and chainbase during fork switch" ); // _should_ never fail

      for( auto ritr = branches.first.rbegin(); ritr != branches.first.rend(); ++ritr) {
         optional<fc::exception> except;
         try {
            apply_block( (*ritr)->block, (*ritr)->validated ? controller::block_status::validated : controller::block_status::complete );
            head = *ritr;
            fork_db.mark_in_current_chain( *ritr, true );
            (*ritr)->validated = true;
         }
         catch (const fc::exception& e) { except = e; }
         if (except) {
            elog("exception thrown while switching forks ${e}", ("e",except->to_detail_string()));

            // ritr currently points to the block that threw
            // if we mark it invalid it will automatically remove all forks built off it.
            fork_db.set_validity( *ritr, false );

            // pop all blocks from the bad fork
            // ritr base is a forward itr to the last block successfully applied
            auto applied_itr = ritr.base();
            for( auto itr = applied_itr; itr != branches.first.end(); ++itr ) {
               fork_db.mark_in_current_chain( *itr , false );
               pop_block();
            }
            EOS_ASSERT( self.head_block_id() == branches.second.back()->header.previous, fork_database_exception,
                        "loss of sync between fork_db and chainbase during fork switch reversal" ); // _should_ never fail

            // re-apply good blocks
            for( auto ritr = branches.second.rbegin(); ritr != branches.second.rend(); ++ritr ) {
               apply_block( (*ritr)->block, controller::block_status::validated /* we previously validated these blocks*/ );
               head = *ritr;
               fork_db.mark_in_current_chain( *ritr, true );
            }
            throw *except;
         } // end if exception
      } /// end for each block in branch
      ilog("successfully switched fork to new head ${new_head_id}", ("new_head_id", new_head->id));
   }
} /// push_block
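In the fork branch above, `fetch_branch_from(new_head, head)` returns two lists of blocks: the new branch and the old branch. Each list runs from its tip back to, but not including, the common ancestor. The old branch is popped, and the new branch is then applied oldest-first through `rbegin()`. The sketch below reproduces that walk on a hypothetical id-to-parent map. It is an illustration of the idea, not EOS's implementation.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical chain model: block id -> (previous id, block number).
struct node {
   std::string previous;
   unsigned num;
};
using chain_map = std::map<std::string, node>;

// Returns {branch of `first`, branch of `second`}, each ordered tip-first and
// stopping just before the common ancestor.
std::pair<std::vector<std::string>, std::vector<std::string>>
fetch_branch_from(const chain_map& blocks, std::string first, std::string second) {
   std::vector<std::string> a, b;
   // Bring both tips down to the same height.
   while (blocks.at(first).num > blocks.at(second).num) { a.push_back(first); first = blocks.at(first).previous; }
   while (blocks.at(second).num > blocks.at(first).num) { b.push_back(second); second = blocks.at(second).previous; }
   // Step back in lockstep until both walks meet at the common ancestor.
   while (first != second) {
      a.push_back(first);
      first = blocks.at(first).previous;
      b.push_back(second);
      second = blocks.at(second).previous;
   }
   return {a, b};
}
```

For a fork where a2 has two children, a3 and b3, and b4 extends b3, switching from head a3 to b4 pops a3 and then applies b3 followed by b4.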
There are two cases here: normal processing (the first branch), and the abnormal case in which the chain has forked:
1. Normal case
First, look at apply_block:
void apply_block( const signed_block_ptr& b, controller::block_status s ) { try {
   try {
      EOS_ASSERT( b->block_extensions.size() == 0, block_validate_exception, "no supported extensions" );
      start_block( b->timestamp, b->confirmed, s );

      transaction_trace_ptr trace;

      // This part should be validating every transaction in the block. I don't fully understand it yet;
      // transactions will get their own analysis later.
      for( const auto& receipt : b->transactions ) {
         auto num_pending_receipts = pending->_pending_block_state->block->transactions.size();
         if( receipt.trx.contains<packed_transaction>() ) {
            auto& pt = receipt.trx.get<packed_transaction>();
            auto mtrx = std::make_shared<transaction_metadata>(pt);
            trace = push_transaction( mtrx, fc::time_point::maximum(), false, receipt.cpu_usage_us);
         } else if( receipt.trx.contains<transaction_id_type>() ) {
            trace = push_scheduled_transaction( receipt.trx.get<transaction_id_type>(), fc::time_point::maximum(), receipt.cpu_usage_us );
         } else {
            EOS_ASSERT( false, block_validate_exception, "encountered unexpected receipt type" );
         }

         bool transaction_failed = trace && trace->except;
         bool transaction_can_fail = receipt.status == transaction_receipt_header::hard_fail && receipt.trx.contains<transaction_id_type>();
         if( transaction_failed && !transaction_can_fail) {
            edump((*trace));
            throw *trace->except;
         }

         EOS_ASSERT( pending->_pending_block_state->block->transactions.size() > 0,
                     block_validate_exception, "expected a receipt",
                     ("block", *b)("expected_receipt", receipt) );
         EOS_ASSERT( pending->_pending_block_state->block->transactions.size() == num_pending_receipts + 1,
                     block_validate_exception, "expected receipt was not added",
                     ("block", *b)("expected_receipt", receipt) );
         const transaction_receipt_header& r = pending->_pending_block_state->block->transactions.back();
         EOS_ASSERT( r == static_cast<const transaction_receipt_header&>(receipt),
                     block_validate_exception, "receipt does not match",
                     ("producer_receipt", receipt)("validator_receipt", pending->_pending_block_state->block->transactions.back()) );
      }

      finalize_block();

      // Verify the producer's signature over the block data to check that it has not been tampered with
      sign_block( [&]( const auto& ){ return b->producer_signature; }, false ); //trust );

      // this is implied by the signature passing
      //FC_ASSERT( b->id() == pending->_pending_block_state->block->id(),
      //           "applying block didn't produce expected block id" );

      commit_block(false);
      return;
   } catch ( const fc::exception& e ) {
      edump((e.to_detail_string()));
      abort_block();
      throw;
   }
} FC_CAPTURE_AND_RETHROW() } /// apply_block
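The loop above replays each transaction and insists on two things. First, a replay may fail only when the producer itself recorded `hard_fail` for a deferred transaction, i.e. one referenced by id. Second, the receipt the validator produces must match the producer's receipt exactly. Both rules can be sketched with a cut-down receipt type. The field set mirrors transaction_receipt_header (status, CPU time, NET words), but the types here are hypothetical simplifications.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical cut-down receipt header; EOS's status_enum has these five values.
enum class trx_status { executed, soft_fail, hard_fail, delayed, expired };

struct receipt_header {
   trx_status status;
   uint32_t cpu_usage_us;
   uint32_t net_usage_words;
   bool operator==(const receipt_header& o) const {
      return status == o.status && cpu_usage_us == o.cpu_usage_us && net_usage_words == o.net_usage_words;
   }
};

// A failed replay is tolerated only for a hard_fail deferred (by-id) transaction.
bool failure_is_acceptable(const receipt_header& producer_receipt, bool is_deferred_by_id) {
   return producer_receipt.status == trx_status::hard_fail && is_deferred_by_id;
}

// The validator's receipts must reproduce the producer's one-for-one.
bool receipts_match(const std::vector<receipt_header>& producer,
                    const std::vector<receipt_header>& validator) {
   return producer == validator;
}
```

Even a one-microsecond difference in billed CPU makes the receipts differ, which is why validators replay with the producer's `cpu_usage_us` rather than measuring their own.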
apply_block calls start_block in controller.cpp:
void start_block( block_timestamp_type when, uint16_t confirm_block_count, controller::block_status s ) {
   EOS_ASSERT( !pending, block_validate_exception, "pending block is not available" );

   EOS_ASSERT( db.revision() == head->block_num, database_exception, "db revision is not on par with head block",
               ("db.revision()", db.revision())("controller_head_block", head->block_num)("fork_db_head_block", fork_db.head()->block_num) );

   auto guard_pending = fc::make_scoped_exit([this](){
      pending.reset();
   });

   // Open a session in the db to hold the data related to this block; if the block fails validation,
   // the data in the db is rolled back
   pending = db.start_undo_session(true);

   pending->_block_status = s;

   // Build a new block_state from head and the new block's timestamp, computing the scheduling information for
   // the block that follows head. Normally this is the same as the newest block in fork_db.
   pending->_pending_block_state = std::make_shared<block_state>( *head, when ); // promotes pending schedule (if any) to active
   pending->_pending_block_state->in_current_chain = true;

   // This is the much-talked-about DPOS consensus step: record how many earlier blocks this block confirms
   pending->_pending_block_state->set_confirmed(confirm_block_count);

   auto was_pending_promoted = pending->_pending_block_state->maybe_promote_pending();

   //modify state in speculative block only if we are speculative reads mode (other wise we need clean state for head or irreversible reads)
   if ( read_mode == db_read_mode::SPECULATIVE || pending->_block_status != controller::block_status::incomplete ) {

      const auto& gpo = db.get<global_property_object>();
      if( gpo.proposed_schedule_block_num.valid() && // if there is a proposed schedule that was proposed in a block ...
          ( *gpo.proposed_schedule_block_num <= pending->_pending_block_state->dpos_irreversible_blocknum ) && // ... that has now become irreversible ...
          pending->_pending_block_state->pending_schedule.producers.size() == 0 && // ... and there is room for a new pending schedule ...
          !was_pending_promoted // ... and not just because it was promoted to active at the start of this block, then:
        )
      {
         // Promote proposed schedule to pending schedule.
         if( !replaying ) {
            ilog( "promoting proposed schedule (set in block ${proposed_num}) to pending; current block: ${n} lib: ${lib} schedule: ${schedule} ",
                  ("proposed_num", *gpo.proposed_schedule_block_num)("n", pending->_pending_block_state->block_num)
                  ("lib", pending->_pending_block_state->dpos_irreversible_blocknum)
                  ("schedule", static_cast<producer_schedule_type>(gpo.proposed_schedule) ) );
         }
         pending->_pending_block_state->set_new_producers( gpo.proposed_schedule );
         db.modify( gpo, [&]( auto& gp ) {
            gp.proposed_schedule_block_num = optional<block_num_type>();
            gp.proposed_schedule.clear();
         });
      }

      try {
         // Build the implicit onblock transaction (an eosio::onblock action carrying the head block's header)
         // and execute it on this node
         auto onbtrx = std::make_shared<transaction_metadata>( get_on_block_transaction() );
         auto reset_in_trx_requiring_checks = fc::make_scoped_exit([old_value=in_trx_requiring_checks,this](){
            in_trx_requiring_checks = old_value;
         });
         in_trx_requiring_checks = true;

         // push_transaction is called here to execute the transaction on this node, which includes:
         /*
            1. checks against this node's whitelists and blacklists (if you configure them, mind the many system accounts EOS relies on)
            2. adding notifications for the transaction's recipients
            3. computing and updating the users' NET, CPU and RAM resources
            4. permission checks
         */
         push_transaction( onbtrx, fc::time_point::maximum(), true, self.get_global_properties().configuration.min_transaction_cpu_usage );
      } catch( const boost::interprocess::bad_alloc& e ) {
         elog( "on block transaction failed due to a bad allocation" );
         throw;
      } catch( const fc::exception& e ) {
         wlog( "on block transaction failed, but shouldn't impact block generation, system contract needs update" );
         edump((e.to_detail_string()));
      } catch( ... ) {
      }

      clear_expired_input_transactions();

      // Update the permission of the system account eosio.prods; its threshold changes with the number of producers
      update_producers_authority();
   }

   guard_pending.cancel();
} // start_block
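The `if` that promotes the proposed schedule to "pending" is a four-part predicate. Its own comments spell it out: a schedule was proposed, the proposing block is now irreversible, no pending schedule is occupying the slot, and nothing was just promoted to active in this block. The sketch below lifts it into a standalone function over a hypothetical struct of its inputs. It shows that a newly elected producer set waits on irreversibility, not only on a round boundary.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>

// Hypothetical condensed view of what controller::start_block() checks.
struct schedule_inputs {
   std::optional<uint32_t> proposed_schedule_block_num;  // block that proposed the schedule
   uint32_t dpos_irreversible_blocknum;                  // LIB as seen by the pending block
   std::size_t pending_schedule_size;                    // 0 means no pending schedule
   bool was_pending_promoted;                            // pending became active in this block
};

bool should_promote_proposed(const schedule_inputs& in) {
   return in.proposed_schedule_block_num.has_value()
       && *in.proposed_schedule_block_num <= in.dpos_irreversible_blocknum
       && in.pending_schedule_size == 0
       && !in.was_pending_promoted;
}
```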
Next, look at push_transaction:
transaction_trace_ptr push_transaction( const transaction_metadata_ptr& trx,
                                        fc::time_point deadline,
                                        bool implicit,
                                        uint32_t billed_cpu_time_us)
{
   EOS_ASSERT(deadline != fc::time_point(), transaction_exception, "deadline cannot be uninitialized");

   transaction_trace_ptr trace;
   try {
      transaction_context trx_context(self, trx->trx, trx->id);
      if ((bool)subjective_cpu_leeway && pending->_block_status == controller::block_status::incomplete) {
         trx_context.leeway = *subjective_cpu_leeway;
      }
      trx_context.deadline = deadline;
      trx_context.billed_cpu_time_us = billed_cpu_time_us;
      trace = trx_context.trace;
      try {
         if( implicit ) {
            trx_context.init_for_implicit_trx();
            trx_context.can_subjectively_fail = false;
         } else {
            // Validate the trx data
            trx_context.init_for_input_trx( trx->packed_trx.get_unprunable_size(),
                                            trx->packed_trx.get_prunable_size(),
                                            trx->trx.signatures.size());
         }

         if( trx_context.can_subjectively_fail && pending->_block_status == controller::block_status::incomplete ) {
            check_actor_list( trx_context.bill_to_accounts ); // Assumes bill_to_accounts is the set of actors authorizing the transaction
         }

         trx_context.delay = fc::seconds(trx->trx.delay_sec);

         if( !self.skip_auth_check() && !implicit ) {
            authorization.check_authorization(
                    trx->trx.actions,
                    trx->recover_keys( chain_id ),
                    {},
                    trx_context.delay,
                    [](){}
                    /*std::bind(&transaction_context::add_cpu_usage_and_check_time, &trx_context,
                                std::placeholders::_1)*/,
                    false
            );
         }

         /* Processing every action of the transaction includes, but is not limited to:
            1. checks against this node's whitelists and blacklists (if you configure them, mind the many system accounts EOS relies on)
            2. adding notifications for the transaction's recipients
            3. computing and updating the users' NET, CPU and RAM resources
         */
         trx_context.exec();
         trx_context.finalize(); // Automatically rounds up network and CPU usage in trace and bills payers if successful

         auto restore = make_block_restore_point();

         if (!implicit) {
            transaction_receipt::status_enum s = (trx_context.delay == fc::seconds(0))
                                                 ? transaction_receipt::executed
                                                 : transaction_receipt::delayed;
            trace->receipt = push_receipt(trx->packed_trx, s, trx_context.billed_cpu_time_us, trace->net_usage);
            pending->_pending_block_state->trxs.emplace_back(trx);
         } else {
            transaction_receipt_header r;
            r.status = transaction_receipt::executed;
            r.cpu_usage_us = trx_context.billed_cpu_time_us;
            r.net_usage_words = trace->net_usage / 8;
            trace->receipt = r;
         }

         fc::move_append(pending->_actions, move(trx_context.executed));

         // call the accept signal but only once for this transaction
         if (!trx->accepted) {
            emit( self.accepted_transaction, trx);
            trx->accepted = true;
         }

         emit(self.applied_transaction, trace);

         if ( read_mode != db_read_mode::SPECULATIVE && pending->_block_status == controller::block_status::incomplete ) {
            //this may happen automatically in destructor, but I prefere make it more explicit
            trx_context.undo();
         } else {
            restore.cancel();
            trx_context.squash();
         }

         if (!implicit) {
            unapplied_transactions.erase( trx->signed_id );
         }
         return trace;
      } catch (const fc::exception& e) {
         trace->except = e;
         trace->except_ptr = std::current_exception();
      }

      if (!failure_is_subjective(*trace->except)) {
         unapplied_transactions.erase( trx->signed_id );
      }

      return trace;
   } FC_CAPTURE_AND_RETHROW((trace))
} /// push_transaction
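Two levels of rollback are involved here. The block-level session opened by `db.start_undo_session(true)` in start_block is the first. The per-transaction changes, which push_transaction either `undo()`es or `squash()`es into the block session, are the second. The toy key/value store below models that nesting. It is a hypothetical illustration of the idea only, not chainbase's API. Each session remembers the old value of every key it first touches.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <vector>

// Hypothetical toy of nested undo sessions over an int-valued key/value store.
class toy_db {
public:
   void set(const std::string& k, int v) {
      // Record the pre-session value the first time this session touches k.
      if (!stack_.empty() && stack_.back().count(k) == 0) {
         auto it = data_.find(k);
         stack_.back()[k] = (it == data_.end()) ? std::nullopt : std::optional<int>(it->second);
      }
      data_[k] = v;
   }

   std::optional<int> get(const std::string& k) const {
      auto it = data_.find(k);
      if (it == data_.end()) return std::nullopt;
      return it->second;
   }

   void start_undo_session() { stack_.emplace_back(); }

   // Roll back everything the innermost session changed.
   void undo() {
      for (const auto& [k, old] : stack_.back()) {
         if (old) data_[k] = *old;
         else data_.erase(k);
      }
      stack_.pop_back();
   }

   // Fold the innermost session into its parent (or make it permanent).
   void squash() {
      auto top = std::move(stack_.back());
      stack_.pop_back();
      if (stack_.empty()) return;
      // emplace keeps the parent's older recorded value when both touched a key.
      for (const auto& [k, old] : top) stack_.back().emplace(k, old);
   }

private:
   std::map<std::string, int> data_;
   std::vector<std::map<std::string, std::optional<int>>> stack_;
};
```

A transaction that succeeds is squashed into the block session. If the whole block is then rejected, undoing the block session also removes that transaction's writes.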
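Finally, apply_block() calls commit_block(false). commit_block emits the controller's accepted_block signal, which producer_plugin handles in on_block: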
void on_block( const block_state_ptr& bsp ) {
   if( bsp->header.timestamp <= _last_signed_block_time ) return;
   if( bsp->header.timestamp <= _start_time ) return;
   if( bsp->block_num <= _last_signed_block_num ) return;

   const auto& active_producer_to_signing_key = bsp->active_schedule.producers;

   flat_set<account_name> active_producers;
   active_producers.reserve(bsp->active_schedule.producers.size());
   for (const auto& p: bsp->active_schedule.producers) {
      active_producers.insert(p.producer_name);
   }

   std::set_intersection( _producers.begin(), _producers.end(),
                          active_producers.begin(), active_producers.end(),
                          boost::make_function_output_iterator( [&]( const chain::account_name& producer )
   {
      if( producer != bsp->header.producer ) {
         auto itr = std::find_if( active_producer_to_signing_key.begin(), active_producer_to_signing_key.end(),
                                  [&](const producer_key& k){ return k.producer_name == producer; } );
         if( itr != active_producer_to_signing_key.end() ) {
            auto private_key_itr = _signature_providers.find( itr->block_signing_key );
            if( private_key_itr != _signature_providers.end() ) {
               auto d = bsp->sig_digest();
               auto sig = private_key_itr->second( d );
               _last_signed_block_time = bsp->header.timestamp;
               _last_signed_block_num  = bsp->block_num;

               // ilog( "${n} confirmed", ("n",name(producer)) );
               _self->confirmed_block( { bsp->id, d, producer, sig } );
            }
         }
      }
   } ) );

   // since the watermark has to be set before a block is created, we are looking into the future to
   // determine the new schedule to identify producers that have become active
   chain::controller& chain = app().get_plugin<chain_plugin>().chain();
   const auto hbn = bsp->block_num;
   auto new_block_header = bsp->header;
   new_block_header.timestamp = new_block_header.timestamp.next();
   new_block_header.previous = bsp->id;
   auto new_bs = bsp->generate_next(new_block_header.timestamp);

   // for newly installed producers we can set their watermarks to the block they became active
   if (new_bs.maybe_promote_pending() && bsp->active_schedule.version != new_bs.active_schedule.version) {
      flat_set<account_name> new_producers;
      new_producers.reserve(new_bs.active_schedule.producers.size());
      for( const auto& p: new_bs.active_schedule.producers) {
         if (_producers.count(p.producer_name) > 0)
            new_producers.insert(p.producer_name);
      }

      for( const auto& p: bsp->active_schedule.producers) {
         new_producers.erase(p.producer_name);
      }

      for (const auto& new_producer: new_producers) {
         _producer_watermarks[new_producer] = hbn;
      }
   }
}
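on_block does two set computations. The first finds which local producers should sign a confirmation: those configured on this node, present in the active schedule, and not the block's own producer. The second finds which local producers are newly active in the next schedule, so that their watermark can be set to the current head. The sketch below restates both with std::set and std::set_intersection. The function names are hypothetical, and plain strings stand in for account_name.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iterator>
#include <map>
#include <set>
#include <string>

using producer_set = std::set<std::string>;

// Local, active producers other than the block's producer sign a confirmation.
producer_set producers_to_confirm(const producer_set& local, const producer_set& active,
                                  const std::string& block_producer) {
   producer_set out;
   std::set_intersection(local.begin(), local.end(), active.begin(), active.end(),
                         std::inserter(out, out.end()));
   out.erase(block_producer);
   return out;
}

// Local producers present in the new schedule but absent from the old one get
// their watermark set to the current head block number.
void set_new_producer_watermarks(const producer_set& local, const producer_set& old_active,
                                 const producer_set& new_active, uint32_t head_block_num,
                                 std::map<std::string, uint32_t>& watermarks) {
   producer_set fresh;
   std::set_intersection(local.begin(), local.end(), new_active.begin(), new_active.end(),
                         std::inserter(fresh, fresh.end()));
   for (const auto& p : old_active) fresh.erase(p);
   for (const auto& p : fresh) watermarks[p] = head_block_num;
}
```

With a watermark recorded, start_block later lets that producer confirm `head - watermark` blocks. Without one, it falls back to confirming nothing.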
The fork case I will analyze another time when I get the chance.
