死磕以太坊源碼分析之區塊上鏈入庫
配合以下代碼進行閱讀:https://github.com/blockchainGuide/
寫文不易,給個小關注,有什么問題可以指出,便於大家交流學習。
引言
不管是礦工挖礦還是Fetcher
同步,Downloader
同步,或者是導入本地文件等等,最中都是將區塊上鏈入庫。接下來我們就詳細分析這部分的動作。
幾處可能調用的地方
①:在Downloader同步最后會將區塊插入到區塊鏈中
func (d *Downloader) importBlockResults(results []*fetchResult) error {
...
if index, err := d.blockchain.InsertChain(blocks); err != nil {
....
}
}
②:創建一個新的以太坊協議管理器,也會將區塊插入到鏈中
func NewProtocolManager(...) (*ProtocolManager, error) {
...
n, err := manager.blockchain.InsertChain(blocks)
}
③:插入側鏈數據
func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, error) {
...
if _, err := bc.insertChain(blocks, false); err != nil {
....
}
}
④:從本地文件導入鏈
func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) {
if _, err := api.eth.BlockChain().InsertChain(blocks); err != nil {
....
}
}
⑤:fetcher同步導入塊
func (f *Fetcher) insert(peer string, block *types.Block) {
...
if _, err := f.insertChain(types.Blocks{block}); err != nil {
...
}
}
以上就是比較常見的需要將區塊上鏈的動作。調用的核心方法就是:
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, error) {}
獲取區塊鏈所有相關文章以及資料,請參閱:https://github.com/blockchainGuide/
插入數據到blockchain中
①:如果鏈正在中斷,直接返回
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
return 0, nil
}
②:開啟並行的簽名恢復
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
③:開啟並行校驗header
abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
校驗header
是共識引擎所要做的事情,我們這里只分析ethash
它的實現。
func (ethash *Ethash) VerifyHeaders(chain consensus.ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) {
....
errors[index] = ethash.verifyHeaderWorker(chain, headers, seals, index)
}
func (ethash *Ethash) verifyHeaderWorker(chain consensus.ChainReader, headers []*types.Header, seals []bool, index int) error {
var parent *types.Header
if index == 0 {
parent = chain.GetHeader(headers[0].ParentHash, headers[0].Number.Uint64()-1)
} else if headers[index-1].Hash() == headers[index].ParentHash {
parent = headers[index-1]
}
if parent == nil {
return consensus.ErrUnknownAncestor
}
if chain.GetHeader(headers[index].Hash(), headers[index].Number.Uint64()) != nil {
return nil // known block
}
return ethash.verifyHeader(chain, headers[index], parent, false, seals[index])
}
首先會調用verifyHeaderWorker
進行校驗,主要檢驗塊的祖先是否已知以及塊是否已知,接着會調用verifyHeader
進行更深的校驗,也是最核心的校驗,大概做了以下幾件事:
- header.Extra不可超過32字節
- header.Time不能超過15秒,15秒以后的就被認定為未來的塊
- 當前header的時間戳不可以等於父塊的時間戳
- 根據難度計算算法得出的expected必須和header.Difficulty 一致。
- Gas limit 要 <= 2 ^ 63-1
- gasUsed<= gasLimit
- Gas limit 要在允許范圍內
- 塊號必須是父塊加1
- 根據 ethash.VerifySeal去驗證塊是否滿足POW難度要求
到此驗證header的事情就做完了。
④:循環校驗body
block, err := it.next()
-> ValidateBody
-> VerifyUncles
包括以下錯誤:
- block已知
- uncle太多
- 重復的uncle
- uncle是祖先塊
- uncle哈希不匹配
- 交易哈希不匹配
- 未知祖先
- 祖先塊的狀態無法獲取
4.1 如果block
存在,且是已知塊,則寫入已知塊。
bc.writeKnownBlock(block)
4.2 如果是祖先塊的狀態無法獲取的錯誤,則作為側鏈插入:
bc.insertSideChain(block, it)
4.3 如果是未來塊或者未知祖先,則添加未來塊:
bc.addFutureBlock(block);
注意這里的添加 futureBlock,會被扔進futureBlocks里面去,在NewBlockChain的時候會開啟新的goroutine:
go bc.update()
func (bc *BlockChain) update() {
futureTimer := time.NewTicker(5 * time.Second)
for{
select{
case <-futureTimer.C:
bc.procFutureBlocks()
}
}
}
func (bc *BlockChain) procFutureBlocks() {
...
for _, hash := range bc.futureBlocks.Keys() {
if block, exist := bc.futureBlocks.Peek(hash); exist {
blocks = append(blocks, block.(*types.Block))
}
}
...
for i := range blocks {
bc.InsertChain(blocks[i : i+1])
}
}
}
會開啟一個計時器,每5秒就會去執行插入這些未來的塊。
4.4 如果是其他錯誤,直接中斷,並且報告壞塊。
bc.futureBlocks.Remove(block.Hash())
...
bc.reportBlock(block, nil, err)
⑤:沒有校驗錯誤
5.1 如果是壞塊,則報告;
if BadHashes[block.Hash()] {
bc.reportBlock(block, nil, ErrBlacklistedHash)
return it.index, ErrBlacklistedHash
}
5.2 如果是未知塊,則寫入未知塊;
if err == ErrKnownBlock {
logger := log.Debug
if bc.chainConfig.Clique == nil {
logger = log.Warn
}
...
if err := bc.writeKnownBlock(block); err != nil {
return it.index, err
}
stats.processed++
lastCanon = block
continue
}
5.3 根據給定trie,創建狀態;
parent := it.previous()
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.New(parent.Root, bc.stateCache)
5.4執行塊中的交易: (稍后會在下節對此進行詳細分析)
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
5.5 使用默認的validator校驗狀態:
bc.validator.ValidateState(block, statedb, receipts, usedGas);
5.6 將塊寫入到區塊鏈中並獲取狀態: (稍后會在下節對此進行詳細分析)
status, err := bc.writeBlockWithState(block, receipts, logs, statedb, false)
⑥:校驗寫入區塊的狀態
CanonStatTy
: 插入成功新的blockSideStatTy
:插入成功新的分叉區塊Default
:插入未知狀態的block
⑦:如果還有塊,並且是未來塊的話,那么將塊添加到未來塊的緩存中去
bc.addFutureBlock(block)
至此insertChain
大概介紹清楚。
執行塊中交易
在我們將區塊上鏈,有一個關鍵步驟就是執行區塊交易:
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
進入函數,具體分析:
①:准備要用的字段,循環執行交易
關鍵函數:ApplyTransaction
,根據此函數返回收據。
1.1 將交易結構轉成Message
結構
msg, err := tx.AsMessage(types.MakeSigner(config, header.Number))
1.2 創建要在EVM環境中使用的新上下文
context := NewEVMContext(msg, header, bc, author)
1.3 創建一個新環境,其中包含有關事務和調用機制的所有相關信息。
vmenv := vm.NewEVM(context, statedb, config, cfg)
1.4 將交易應用到當前狀態(包含在env中)
_, gas, failed, err := ApplyMessage(vmenv, msg, gp)
這部分代碼繼續跟進:
func ApplyMessage(evm *vm.EVM, msg Message, gp *GasPool) ([]byte, uint64, bool, error) {
return NewStateTransition(evm, msg, gp).TransitionDb()
}
NewStateTransition
是一個狀態轉換對象,TransitionDb()
負責轉換交易狀態,繼續跟進:
先進行preCheck
,用來校驗nonce
是否正確
st.preCheck()
if st.msg.CheckNonce() {
nonce := st.state.GetNonce(st.msg.From())
if nonce < st.msg.Nonce() {
return ErrNonceTooHigh
} else if nonce > st.msg.Nonce() {
return ErrNonceTooLow
}
}
計算所需gas
:
gas, err := IntrinsicGas(st.data, contractCreation, homestead, istanbul)
扣除gas
:
if err = st.useGas(gas); err != nil {
return nil, 0, false, err
}
func (st *StateTransition) useGas(amount uint64) error {
if st.gas < amount {
return vm.ErrOutOfGas
}
st.gas -= amount
return nil
}
如果是合約交易,則新建一個合約
ret, _, st.gas, vmerr = evm.Create(sender, st.data, st.gas, st.value)
如果不是合約交易,則增加nonce
st.state.SetNonce(msg.From(), st.state.GetNonce(sender.Address())+1)
ret, st.gas, vmerr = evm.Call(sender, st.to(), st.data, st.gas, st.value)
重點關注evm.call
方法:
檢查賬戶是否有足夠的氣體進行轉賬
if !evm.Context.CanTransfer(evm.StateDB, caller.Address(), value) {
return nil, gas, ErrInsufficientBalance
}
如果stateDb不存在此賬戶,則新建賬戶
if !evm.StateDB.Exist(addr) {
evm.StateDB.CreateAccount(addr)
}
執行轉賬操作
evm.Transfer(evm.StateDB, caller.Address(), to.Address(), value)
創建合約
contract := NewContract(caller, to, value, gas)
執行合約
ret, err = run(evm, contract, input, false)
添加余額
st.state.AddBalance(st.evm.Coinbase, new(big.Int).Mul(new(big.Int).SetUint64(st.gasUsed()), st.gasPrice))
回到ApplyTransaction
1.5 調用IntermediateRoot
計算狀態trie
的當前根哈希值。
最終確定所有骯臟的存儲狀態,並把它們寫進trie
s.Finalise(deleteEmptyObjects)
將trie根設置為當前的根哈希並將給定的object
寫入到trie
中
obj.updateRoot(s.db)
s.updateStateObject(obj)
1.6 創建收據
receipt := types.NewReceipt(root, failed, *usedGas)
receipt.TxHash = tx.Hash()
receipt.GasUsed = gas
if msg.To() == nil {
receipt.ContractAddress = crypto.CreateAddress(vmenv.Context.Origin, tx.Nonce())
}
// Set the receipt logs and create a bloom for filtering
receipt.Logs = statedb.GetLogs(tx.Hash())
receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
receipt.BlockHash = statedb.BlockHash()
receipt.BlockNumber = header.Number
receipt.TransactionIndex = uint(statedb.TxIndex())
②:最后完成區塊,應用任何共識引擎特定的額外功能(例如區塊獎勵)
p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles())
func (ethash *Ethash) Finalize(chain consensus.ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header) {
// Accumulate any block and uncle rewards and commit the final state root
//累積任何塊和叔叔的獎勵並提交最終狀態樹根
accumulateRewards(chain.Config(), state, header, uncles)
header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number))
}
到此為止bc.processor.Process
執行完畢,返回receipts
.
校驗狀態
大致包括4部分的校驗:
①:校驗使用的gas
是否相等
if block.GasUsed() != usedGas {
return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas)
}
②:校驗bloom是否相等
rbloom := types.CreateBloom(receipts)
if rbloom != header.Bloom {
return fmt.Errorf("invalid bloom (remote: %x local: %x)", header.Bloom, rbloom)
}
③:校驗收據哈希是否相等
receiptSha := types.DeriveSha(receipts)
if receiptSha != header.ReceiptHash {
return fmt.Errorf("invalid receipt root hash (remote: %x local: %x)", header.ReceiptHash, receiptSha)
}
④:校驗merkleroot 是否相等
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
return fmt.Errorf("invalid merkle root (remote: %x local: %x)", header.Root, root)
}
將塊和關聯狀態寫入到數據庫
函數:WriteBlockWithState
①:計算塊的total td
ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
②:添加待插入塊本身的td
,並將此時最新的total td
存儲到數據庫中。
bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd)
③:將塊的header
和body
分別序列化到數據庫
rawdb.WriteBlock(bc.db, block)
->WriteBody(db, block.Hash(), block.NumberU64(), block.Body())
->WriteHeader(db, block.Header())
④:將狀態寫入底層內存Trie
數據庫
state.Commit(bc.chainConfig.IsEIP158(block.Number()))
⑤:遍歷節點數據寫入到磁盤
triedb.Commit(header.Root, true)
⑥:存儲一個塊的所有交易數據
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
⑦:將新的head
塊注入到當前鏈中
if status == CanonStatTy {
bc.insert(block)
}
- 存儲分配給規范塊的哈希
- 存儲頭塊的哈希
- 存儲最新的快
- 更新
currentFastBlock
⑧:發送chainEvent
事件或者ChainSideEvent
事件或者ChainHeadEvent
事件
if status == CanonStatTy {
bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
if emitHeadEvent {
bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
}
} else {
bc.chainSideFeed.Send(ChainSideEvent{Block: block})
}
到此writeBlockWithState 結束,從上面可以知道,insertChain的最終還是調用了writeBlockWithState
的insert方法完成了最終的上鏈入庫動作。
最后整個insertChain
函數,如果已經完成了插入,就發送chain head
事件
defer func() {
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon})
}
}()
比較常見的有這么幾處會進行訂閱chain head
事件:
-
在tx_pool.go中,收到此事件會進行換head的操作
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
-
在worker.go中,其他節點的礦工收到此事件就會停止當前的挖礦,繼續下一個挖礦任務
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
到此整個區塊上鏈入庫就完成了,最后再送上一張總結的圖: