以太坊源碼交易流程源碼解讀


和Bitcoin類似,以太坊的轉賬流程基本是這樣的:

1.發起交易:指定目標地址和交易金額,以及必需的gas/gasLimit

2.交易簽名:使用賬戶私鑰對交易進行簽名

3.提交交易:驗簽交易,並將交易提交到交易緩沖池

4.廣播交易:通知以太坊虛擬機吧交易信息廣播給其他節點

Eth Transaction結構

首先,在源碼中搜索到Transaction結構的定義之處:./core/types/transaction.go

//交易結構體 type Transaction struct { //交易數據 data txdata // caches hash atomic.Value size atomic.Value from atomic.Value } type txdata struct { //發送者發起的交易總數 AccountNonce uint64 `json:"nonce" gencodec:"required"` //交易的Gas價格 Price *big.Int `json:"gasPrice" gencodec:"required"` //交易允許消耗的最大Gas GasLimit uint64 `json:"gas" gencodec:"required"` //交易接收者地址 Recipient *common.Address `json:"to" rlp:"nil"` // nil means contract creation //交易額 Amount *big.Int `json:"value" gencodec:"required"` //其他數據 Payload []byte `json:"input" gencodec:"required"` // Signature values // 交易相關簽名數據 V *big.Int `json:"v" gencodec:"required"` R *big.Int `json:"r" gencodec:"required"` S *big.Int `json:"s" gencodec:"required"` // This is only used when marshaling to JSON. //交易HAsh Hash *common.Hash `json:"hash" rlp:"-"` } 

Eth Tx轉賬邏輯

1.創建交易

首先我們曾在之前的geth基本功能一篇中使用轉賬命令eth.sendTransaction()進行過轉賬操作。
當命令行輸入該指令時,geth內部實際是調用了PublicTransactionPoolAPI的sendTransaction接口:./internal/ethapi/api.go

// SendTransaction will create a transaction from the given arguments and // tries to sign it with the key associated with args.To. If the given passwd isn't // able to decrypt the key it fails. // 發起交易 func (s *PrivateAccountAPI) SendTransaction(ctx context.Context, args SendTxArgs, passwd string) (common.Hash, error) { //交易參數相關判斷 if args.Nonce == nil { // Hold the addresse's mutex around signing to prevent concurrent assignment of // the same nonce to multiple accounts. s.nonceLock.LockAddr(args.From) defer s.nonceLock.UnlockAddr(args.From) } //交易簽名 signed, err := s.signTransaction(ctx, args, passwd) if err != nil { return common.Hash{}, err } //提交交易 return submitTransaction(ctx, s.b, signed) } 

然后,我們看一下交易是怎么實現簽名的。

// signTransactions sets defaults and signs the given transaction // NOTE: the caller needs to ensure that the nonceLock is held, if applicable, // and release it after the transaction has been submitted to the tx pool // 交易簽名 func (s *PrivateAccountAPI) signTransaction(ctx context.Context, args SendTxArgs, passwd string) (*types.Transaction, error) { // Look up the wallet containing the requested signer //獲取交易發起方錢包 account := accounts.Account{Address: args.From} wallet, err := s.am.Find(account) if err != nil { return nil, err } // Set some sanity defaults and terminate on failure if err := args.setDefaults(ctx, s.b); err != nil { return nil, err } // Assemble the transaction and sign with the wallet //組裝交易 tx := args.toTransaction() var chainID *big.Int if config := s.b.ChainConfig(); config.IsEIP155(s.b.CurrentBlock().Number()) { chainID = config.ChainID } //對交易進行簽名 return wallet.SignTxWithPassphrase(account, passwd, tx, chainID) } 

繼續循着toTransaction線索去找創建交易的代碼:

func (args *SendTxArgs) toTransaction() *types.Transaction {
    var input []byte

    //相關賦值 if args.Data != nil { input = *args.Data } else if args.Input != nil { input = *args.Input } //交易接收方地址為空,創建的交易為合約交易 if args.To == nil { return types.NewContractCreation(uint64(*args.Nonce), (*big.Int)(args.Value), uint64(*args.Gas), (*big.Int)(args.GasPrice), input) } //創建普通的轉賬交易 return types.NewTransaction(uint64(*args.Nonce), *args.To, (*big.Int)(args.Value), uint64(*args.Gas), (*big.Int)(args.GasPrice), input) } 

這里終於找到了創建交易的方法NewTransaction:./core/types/transaction.go

//創建普通交易 func NewTransaction(nonce uint64, to common.Address, amount *big.Int, gasLimit uint64, gasPrice *big.Int, data []byte) *Transaction { return newTransaction(nonce, &to, amount, gasLimit, gasPrice, data) } //創建合約交易 func NewContractCreation(nonce uint64, amount *big.Int, gasLimit uint64, gasPrice *big.Int, data []byte) *Transaction { return newTransaction(nonce, nil, amount, gasLimit, gasPrice, data) } //創建普通交易 func newTransaction(nonce uint64, to *common.Address, amount *big.Int, gasLimit uint64, gasPrice *big.Int, data []byte) *Transaction { if len(data) > 0 { data = common.CopyBytes(data) } d := txdata{ AccountNonce: nonce, Recipient: to, Payload: data, Amount: new(big.Int), GasLimit: gasLimit, Price: new(big.Int), V: new(big.Int), R: new(big.Int), S: new(big.Int), } if amount != nil { d.Amount.Set(amount) } if gasPrice != nil { d.Price.Set(gasPrice) } return &Transaction{data: d} } 

2.交易簽名

從上面創建交易的代碼細節我們已經知道對交易進行簽名是通過錢包類的一個方法實現的wallet.SignTxWithPassphrase。

源碼在./accounts/keystore/keystore_wallet.go

// SignTxWithPassphrase implements accounts.Wallet, attempting to sign the given // transaction with the given account using passphrase as extra authentication. // 交易簽名 func (w *keystoreWallet) SignTxWithPassphrase(account accounts.Account, passphrase string, tx *types.Transaction, chainID *big.Int) (*types.Transaction, error) { // Make sure the requested account is contained within //判斷賬戶合法性 if account.Address != w.account.Address { return nil, accounts.ErrUnknownAccount } if account.URL != (accounts.URL{}) && account.URL != w.account.URL { return nil, accounts.ErrUnknownAccount } // Account seems valid, request the keystore to sign //真正的簽名 return w.keystore.SignTxWithPassphrase(account, passphrase, tx, chainID) } 

繼續深入到簽名函數里。

// SignTxWithPassphrase signs the transaction if the private key matching the // given address can be decrypted with the given passphrase. func (ks *KeyStore) SignTxWithPassphrase(a accounts.Account, passphrase string, tx *types.Transaction, chainID *big.Int) (*types.Transaction, error) { // 判斷賬戶是否解鎖並獲取私鑰 _, key, err := ks.getDecryptedKey(a, passphrase) if err != nil { return nil, err } defer zeroKey(key.PrivateKey) // Depending on the presence of the chain ID, sign with EIP155 or homestead // EIP155規范需要chainID參數,即平時命令行使用的“--networkid”參數 if chainID != nil { return types.SignTx(tx, types.NewEIP155Signer(chainID), key.PrivateKey) } return types.SignTx(tx, types.HomesteadSigner{}, key.PrivateKey) } 

終於見到交易的簽名函數本尊了。

// SignTx signs the transaction using the given signer and private key func SignTx(tx *Transaction, s Signer, prv *ecdsa.PrivateKey) (*Transaction, error) { //1.對交易進行哈希 h := s.Hash(tx) //2.生成簽名 sig, err := crypto.Sign(h[:], prv) if err != nil { return nil, err } //3.將簽名數據填充到Tx信息中 return tx.WithSignature(s, sig) } 

找到這里后,就可以繼續深入crypto.Sign方法看下簽名是怎么根據交易哈希和私鑰生成的。

// Sign calculates an ECDSA signature. // // This function is susceptible to chosen plaintext attacks that can leak // information about the private key that is used for signing. Callers must // be aware that the given hash cannot be chosen by an adversery. Common // solution is to hash any input before calculating the signature. // // The produced signature is in the [R || S || V] format where V is 0 or 1. //根據ECDSA算法生成簽名,以字節數組的形式返回 按[R || S || V]格式 func Sign(hash []byte, prv *ecdsa.PrivateKey) (sig []byte, err error) { //哈希值判斷 if len(hash) != 32 { return nil, fmt.Errorf("hash is required to be exactly 32 bytes (%d)", len(hash)) } seckey := math.PaddedBigBytes(prv.D, prv.Params().BitSize/8) defer zeroBytes(seckey) return secp256k1.Sign(hash, seckey) } 

生成簽名后將簽名填充到交易信息的R,S,V字段。

// WithSignature returns a new transaction with the given signature. // This signature needs to be formatted as described in the yellow paper (v+27). // 生成簽名后將簽名填充到交易信息的R,S,V字段。 func (tx *Transaction) WithSignature(signer Signer, sig []byte) (*Transaction, error) { //獲取簽名信息 r, s, v, err := signer.SignatureValues(tx, sig) if err != nil { return nil, err } //將原有交易信息進行一份拷貝 cpy := &Transaction{data: tx.data} //簽名賦值 cpy.data.R, cpy.data.S, cpy.data.V = r, s, v return cpy, nil } 

3.交易提交

交易簽名后就可以提交到交易緩沖池,這里是通過submitTransaction()函數實現的。這里涉及到一個新的數據結構交易緩沖池TxPool,所以先來看下TxPool的結構:./core/tx_pool.go

// TxPool contains all currently known transactions. Transactions // enter the pool when they are received from the network or submitted // locally. They exit the pool when they are included in the blockchain. // // The pool separates processable transactions (which can be applied to the // current state) and future transactions. Transactions move between those // two states over time as they are received and processed. type TxPool struct { //交易緩沖池配置 config TxPoolConfig chainconfig *params.ChainConfig chain blockChain gasPrice *big.Int txFeed event.Feed scope event.SubscriptionScope chainHeadCh chan ChainHeadEvent chainHeadSub event.Subscription signer types.Signer mu sync.RWMutex currentState *state.StateDB // Current state in the blockchain head pendingState *state.ManagedState // Pending state tracking virtual nonces currentMaxGas uint64 // Current gas limit for transaction caps locals *accountSet // Set of local transaction to exempt from eviction rules journal *txJournal // Journal of local transaction to back up to disk //當前所有可被處理的交易列表 pending map[common.Address]*txList // All currently processable transactions //當前所有不可被處理的交易隊列 queue map[common.Address]*txList // Queued but non-processable transactions beats map[common.Address]time.Time // Last heartbeat from each known account //所有的交易列表 key為交易hash all *txLookup // All transactions to allow lookups //將all中的交易按照gas price進行排列的數組,gas price相同按noce升序排列 priced *txPricedList // All transactions sorted by price wg sync.WaitGroup // for shutdown sync homestead bool } 

這里涉及到兩個重要的屬性pending和queue,它們的類型都是txList,所以就繼續看下txList的結構。

// txList is a "list" of transactions belonging to an account, sorted by account // nonce. The same type can be used both for storing contiguous transactions for // the executable/pending queue; and for storing gapped transactions for the non- // executable/future queue, with minor behavioral changes. type txList struct { //交易的nonce值是否連續 strict bool // Whether nonces are strictly continuous or not //已排序的交易Map txs *txSortedMap // Heap indexed sorted hash map of the transactions //最高成本交易價格 costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance) //最高花費的gas限制 gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit) } ... // txSortedMap is a nonce->transaction hash map with a heap based index to allow // iterating over the contents in a nonce-incrementing way. type txSortedMap struct { //包含所有交易的字典,key是交易對應nonce items map[uint64]*types.Transaction // Hash map storing the transaction data //降序排列的Nonce值數組 index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode) //已經排序的交易緩存 cache types.Transactions // Cache of the transactions already sorted } 

交易緩沖池這里的邏輯大概是這樣的:交易提交后首先是進入到txPool的queue隊列緩存,然后再選擇一部分交易進入peending列表進行處理。當txPool滿了的時候,會根據priced的排序規則去除gas price廉價的交易來保證txPool正常運行。

我們可以看一下Eth默認的交易緩沖池配置:

// TxPoolConfig are the configuration parameters of the transaction pool. type TxPoolConfig struct { NoLocals bool // Whether local transaction handling should be disabled Journal string // Journal of local transactions to survive node restarts Rejournal time.Duration // Time interval to regenerate the local transaction journal PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce) AccountSlots uint64 // Minimum number of executable transaction slots guaranteed per account GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts Lifetime time.Duration // Maximum amount of time non-executable transaction are queued } // contains the default configurations for the transaction // pool. // TxPool默認配置 var DefaultTxPoolConfig = TxPoolConfig{ Journal: "transactions.rlp", Rejournal: time.Hour, //允許進入交易池的最低gas price PriceLimit: 1, //相同Nonce交易 gas price差值超過該值,則使用新的交易 PriceBump: 10, //pending列表中每個賬戶存儲的交易處閾值,超過該數可能被認為垃圾交易 AccountSlots: 16, //pending列表最大長度 GlobalSlots: 4096, //queue隊列中每個賬戶存儲的交易處閾值,超過該數可能被認為垃圾交易 AccountQueue: 64, //queue隊列最大長度 GlobalQueue: 1024, Lifetime: 3 * time.Hour, } 

現在了解了txPool結構之后,我們終於可以進入正題來看submitTransaction()函數的實現了:./internal/ethapi/api.go

// submitTransaction is a helper function that submits tx to txPool and logs a message. // 提交交易到交易池 func submitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) { //b Backend是在eth Service初始化時創建的,在ethapiBackend(./eth/api_backend.go) // 通過Backend類真正實現提交交易 if err := b.SendTx(ctx, tx); err != nil { return common.Hash{}, err } if tx.To() == nil { signer := types.MakeSigner(b.ChainConfig(), b.CurrentBlock().Number()) from, err := types.Sender(signer, tx) if err != nil { return common.Hash{}, err } addr := crypto.CreateAddress(from, tx.Nonce()) log.Info("Submitted contract creation", "fullhash", tx.Hash().Hex(), "contract", addr.Hex()) } else { log.Info("Submitted transaction", "fullhash", tx.Hash().Hex(), "recipient", tx.To()) } return tx.Hash(), nil } 

按圖索驥,深入到Bakend.sendTx函數:

func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
    return b.eth.txPool.AddLocal(signedTx) } 

然后繼續找到txPool的addLocal函數:

// AddLocal enqueues a single transaction into the pool if it is valid, marking // the sender as a local one in the mean time, ensuring it goes around the local // pricing constraints. func (pool *TxPool) AddLocal(tx *types.Transaction) error { return pool.addTx(tx, !pool.config.NoLocals) } ... // addTx enqueues a single transaction into the pool if it is valid. // 將一筆普通交易添加到TxPool中 func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { pool.mu.Lock() defer pool.mu.Unlock() // Try to inject the transaction and update any state // 將交易加入交易池queue隊列 replace, err := pool.add(tx, local) if err != nil { return err } // If we added a new transaction, run promotion checks and return // 通過promoteExecutables將queue中部分交易加入到pending列表中進行處理 if !replace { from, _ := types.Sender(pool.signer, tx) // already validated pool.promoteExecutables([]common.Address{from}) } return nil } 

首先,先去看看將交易加入到equeu隊列的方法add():

// add validates a transaction and inserts it into the non-executable queue for // later pending promotion and execution. If the transaction is a replacement for // an already pending or queued one, it overwrites the previous and returns this // so outer code doesn't uselessly call promote. // // If a newly added transaction is marked as local, its sending account will be // whitelisted, preventing any associated transaction from being dropped out of // the pool due to pricing constraints. func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { // If the transaction is already known, discard it //獲取交易hash並以此判斷交易池中是否已存在該筆交易 hash := tx.Hash() if pool.all.Get(hash) != nil { log.Trace("Discarding already known transaction", "hash", hash) return false, fmt.Errorf("known transaction: %x", hash) } // If the transaction fails basic validation, discard it // 驗證交易合法性 if err := pool.validateTx(tx, local); err != nil { log.Trace("Discarding invalid transaction", "hash", hash, "err", err) invalidTxCounter.Inc(1) return false, err } // If the transaction pool is full, discard underpriced transactions // 如果交易池已滿,按priced數組中gas price較低的交易剔除 if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue { // If the new transaction is underpriced, don't accept it if !local && pool.priced.Underpriced(tx, pool.locals) { log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice()) underpricedTxCounter.Inc(1) return false, ErrUnderpriced } // New transaction is better than our worse ones, make room for it drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals) for _, tx := range drop { log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice()) underpricedTxCounter.Inc(1) pool.removeTx(tx.Hash(), false) } } // If the transaction is replacing an already pending one, do directly // 如果交易已經存在於pending列表,比較新舊交易gasPrice的差值是否超過PriceBump // 若超過則使用新交易代替舊交易 from, _ := types.Sender(pool.signer, tx) // already validated if list := pool.pending[from]; list != nil && list.Overlaps(tx) { // Nonce already pending, check if required price bump is met inserted, old := list.Add(tx, pool.config.PriceBump) if !inserted { pendingDiscardCounter.Inc(1) return false, ErrReplaceUnderpriced } // New transaction is better, replace old one if old != nil { pool.all.Remove(old.Hash()) pool.priced.Removed() pendingReplaceCounter.Inc(1) } pool.all.Add(tx) pool.priced.Put(tx) pool.journalTx(from, tx) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) // We've directly injected a replacement transaction, notify subsystems go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}}) return old != nil, nil } // New transaction isn't replacing a pending one, push into queue // 將交易添加到equeu隊列 replace, err := pool.enqueueTx(hash, tx) if err != nil { return false, err } // Mark local addresses and journal local transactions // 判斷是否本地交易,保證本地交易優先被加入到TxPool if local { pool.locals.add(from) } pool.journalTx(from, tx) log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) return replace, nil } 

這里對交易合法性的驗證必須滿足8個條件:

// validateTx checks whether a transaction is valid according to the consensus // rules and adheres to some heuristic limits of the local node (price and size). // 交易合法性驗證 func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { // Heuristic limit, reject transactions over 32KB to prevent DOS attacks // 1.交易數據量必須 < 32KB if tx.Size() > 32*1024 { return ErrOversizedData } // Transactions can't be negative. This may never happen using RLP decoded // transactions but may occur if you create a transaction using the RPC. // 2.交易金額必須非負值 if tx.Value().Sign() < 0 { return ErrNegativeValue } // Ensure the transaction doesn't exceed the current block limit gas. // 3.交易的gasLimit必須 < 交易池當前規定最大gas if pool.currentMaxGas < tx.Gas() { return ErrGasLimit } // Make sure the transaction is signed properly // 4.交易簽名必須有效 from, err := types.Sender(pool.signer, tx) if err != nil { return ErrInvalidSender } // Drop non-local transactions under our own minimal accepted gas price // 5.交易的gas price必須大於交易池設置的gas price local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { return ErrUnderpriced } // Ensure the transaction adheres to nonce ordering // 6.交易的Nonce值必須大於鏈上該賬戶的Nonce if pool.currentState.GetNonce(from) > tx.Nonce() { return ErrNonceTooLow } // Transactor should have enough funds to cover the costs // cost == V + GP * GL // 7.交易賬戶余額必須 > 交易額 + gasPrice * gasLimit if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { return ErrInsufficientFunds } // 8.交易的gasLimit必須 > 對應數據量所需要的最低gas水平 intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead) if err != nil { return err } if tx.Gas() < intrGas { return ErrIntrinsicGas } return nil } 

接下來繼續看,交易從queue隊列到pending列表又是怎么一個過程:

// promoteExecutables moves transactions that have become processable from the // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. func (pool *TxPool) promoteExecutables(accounts []common.Address) { // Track the promoted transactions to broadcast them at once var promoted []*types.Transaction // Gather all the accounts potentially needing updates if accounts == nil { accounts = make([]common.Address, 0, len(pool.queue)) for addr := range pool.queue { accounts = append(accounts, addr) } } // Iterate over all accounts and promote any executable transactions for _, addr := range accounts { list := pool.queue[addr] if list == nil { continue // Just in case someone calls with a non existing account } // Drop all transactions that are deemed too old (low nonce) // 1.1丟棄交易nonce值 < 賬戶當前nonce的交易 for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) { hash := tx.Hash() log.Trace("Removed old queued transaction", "hash", hash) pool.all.Remove(hash) pool.priced.Removed() } // Drop all transactions that are too costly (low balance or out of gas) // 1.2.丟棄賬戶余額不足的 drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() log.Trace("Removed unpayable queued transaction", "hash", hash) pool.all.Remove(hash) pool.priced.Removed() queuedNofundsCounter.Inc(1) } // Gather all executable transactions and promote them // 3.將交易添加到pending列表 for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { hash := tx.Hash() if pool.promoteTx(addr, hash, tx) { log.Trace("Promoting queued transaction", "hash", hash) promoted = append(promoted, tx) } } // Drop all transactions over the allowed limit if !pool.locals.contains(addr) { for _, tx := range list.Cap(int(pool.config.AccountQueue)) { hash := tx.Hash() pool.all.Remove(hash) pool.priced.Removed() queuedRateLimitCounter.Inc(1) log.Trace("Removed cap-exceeding queued transaction", "hash", hash) } } // Delete the entire queue entry if it became empty. if list.Empty() { delete(pool.queue, addr) } } // Notify subsystem for new promoted transactions. if len(promoted) > 0 { go pool.txFeed.Send(NewTxsEvent{promoted}) } // If the pending limit is overflown, start equalizing allowances pending := uint64(0) for _, list := range pool.pending { pending += uint64(list.Len()) } //2 pending列表達到最大限量 if pending > pool.config.GlobalSlots { pendingBeforeCap := pending // Assemble a spam order to penalize large transactors first spammers := prque.New() for addr, list := range pool.pending { // Only evict transactions from high rollers // 統計高額交易 if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots { spammers.Push(addr, float32(list.Len())) } } // Gradually drop transactions from offenders // 逐漸驅逐高額交易 offenders := []common.Address{} for pending > pool.config.GlobalSlots && !spammers.Empty() { // Retrieve the next offender if not local address offender, _ := spammers.Pop() offenders = append(offenders, offender.(common.Address)) // Equalize balances until all the same or below threshold // 均衡各賬戶存儲的交易數直到交易數相同 /* 均衡交易數時采取的策略是: 2.1.在超出交易數的賬戶里以交易數最少的為標准,將其他賬戶的交易數削減至該標准 eg:10個賬戶交易數超過了AccountSlots(16),其中交易數最少的為18,則將其他9個賬戶的交易數削減至18 2.2.經過1后,pengding長度依舊超過GlobalSlots,此時按照AccountSlots標准將超標的賬戶里交易數削減至AccountSlots eg:將2.1里的10個賬戶的交易數都削減至AccountSlots(16) **/ // 2.1 if len(offenders) > 1 { // Calculate the equalization threshold for all current offenders // 超標賬戶的最低交易數 threshold := pool.pending[offender.(common.Address)].Len() // Iteratively reduce all offenders until below limit or threshold reached // 將其他賬戶的交易數削減至threshold for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold { for i := 0; i < len(offenders)-1; i++ { list := pool.pending[offenders[i]] for _, tx := range list.Cap(list.Len() - 1) { // Drop the transaction from the global pools too hash := tx.Hash() pool.all.Remove(hash) pool.priced.Removed() // Update the account nonce to the dropped transaction if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce { pool.pendingState.SetNonce(offenders[i], nonce) } log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } pending-- } } } } // If still above threshold, reduce to limit or min allowance // 2.2 經過1的交易數均衡后,pengding長度依舊超過GlobalSlots 此時按照AccountSlots標准將超標的賬戶里交易數削減至AccountSlots if pending > pool.config.GlobalSlots && len(offenders) > 0 { for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots { for _, addr := range offenders { list := pool.pending[addr] for _, tx := range list.Cap(list.Len() - 1) { // Drop the transaction from the global pools too hash := tx.Hash() pool.all.Remove(hash) pool.priced.Removed() // Update the account nonce to the dropped transaction if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce { pool.pendingState.SetNonce(addr, nonce) } log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } pending-- } } } pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending)) } // If we've queued more transactions than the hard limit, drop oldest ones queued := uint64(0) for _, list := range pool.queue { queued += uint64(list.Len()) } // 3.eqeue隊列長度大於queue隊列最大長度 if queued > pool.config.GlobalQueue { // Sort all accounts with queued transactions by heartbeat // 對隊列里的所有賬戶按最近一次心跳時間排序 addresses := make(addresssByHeartbeat, 0, len(pool.queue)) for addr := range pool.queue { if !pool.locals.contains(addr) { // don't drop locals addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) } } sort.Sort(addresses) // Drop transactions until the total is below the limit or only locals remain // 按順序刪除相關賬戶的交易,直到queue隊列長度符合條件 for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; { addr := addresses[len(addresses)-1] list := pool.queue[addr.address] addresses = addresses[:len(addresses)-1] // Drop all transactions if they are less than the overflow if size := uint64(list.Len()); size <= drop { for _, tx := range list.Flatten() { pool.removeTx(tx.Hash(), true) } drop -= size queuedRateLimitCounter.Inc(int64(size)) continue } // Otherwise drop only last few transactions txs := list.Flatten() for i := len(txs) - 1; i >= 0 && drop > 0; i-- { pool.removeTx(txs[i].Hash(), true) drop-- queuedRateLimitCounter.Inc(1) } } } } 

在這里promoteExecutables主要有三個作用:

1.將queue中選出符合條件的交易加入到pending中。在這之前需要對交易進行一些判斷:

1.1丟棄交易nonce值 < 賬戶當前nonce的交易
1.2.丟棄賬戶余額不足的

2.對pending列表進行清理,以使其滿足相關配置條件。

2.1在超出交易數的賬戶里以交易數最少的為標准,將其他賬戶的交易數削減至該標准 eg:10個賬戶交易數超過了AccountSlots(16),其中交易數最少的為18,則將其他9個賬戶的交易數削減至18
2.2經過1后,pengding長度依舊超過GlobalSlots,此時按照AccountSlots標准將超標的賬戶里交易數削減至AccountSlots eg:將2.1里的10個賬戶的交易數都削減至AccountSlots(16)

3.對queue隊列進行清理,以使其滿足相關配置條件。

eqeue隊列長度大於queue隊列最大長度,按順序刪除相關賬戶的交易,直到queue隊列長度符合條件

執行和廣播交易

接着pool.txFeed.Send發送一個TxPreEvent事件,外部呢會通過SubscribeNewTxsEvent()函數來訂閱該事件:

// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and // starts sending event to the given channel. func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription { return pool.scope.Track(pool.txFeed.Subscribe(ch)) } 

在源碼中全局搜索這個函數,在./miner/worker.go中發現一次SubscribeNewTxsEvent的訂閱。

 
SubscribeNewTxsEvent訂閱

我們發現這里訂閱了TxPreEvent事件后,開啟了一個goroutine來處理該事件,進一步分析update函數,可以看到,如果當前節點不挖礦會調用commitTransactions函數提交交易;否則會調用commitNewWork函數,但其內部依然會調用commitTransactions函數提交交易。

func (self *worker) update() { defer self.txsSub.Unsubscribe() defer self.chainHeadSub.Unsubscribe() defer self.chainSideSub.Unsubscribe() for { // A real event arrived, process interesting content select { // Handle ChainHeadEvent case <-self.chainHeadCh: self.commitNewWork() // Handle ChainSideEvent case ev := <-self.chainSideCh: self.uncleMu.Lock() self.possibleUncles[ev.Block.Hash()] = ev.Block self.uncleMu.Unlock() // Handle NewTxsEvent case ev := <-self.txsCh: // Apply transactions to the pending state if we're not mining. // // Note all transactions received may not be continuous with transactions // already included in the current mining block. These transactions will // be automatically eliminated. if atomic.LoadInt32(&self.mining) == 0 { self.currentMu.Lock() txs := make(map[common.Address]types.Transactions) for _, tx := range ev.Txs { acc, _ := types.Sender(self.current.signer, tx) txs[acc] = append(txs[acc], tx) } txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs) //當前節點不挖礦,提交交易 self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase) self.updateSnapshot() self.currentMu.Unlock() } else { // If we're mining, but nothing is being processed, wake on new transactions // 當前節點為礦工節點,commitNewWork進行挖礦 if self.config.Clique != nil && self.config.Clique.Period == 0 { self.commitNewWork() } } // System stopped case <-self.txsSub.Err(): return case <-self.chainHeadSub.Err(): return case <-self.chainSideSub.Err(): return } } } 

在上面搜索SubscribeNewTxsEvent函數時,另一個調用的地方便是./eth/handler.go。這里和上面一樣也是創建了一個gorountine來處理TxPreEvent事件。

func (pm *ProtocolManager) Start(maxPeers int) { pm.maxPeers = maxPeers // broadcast transactions pm.txsCh = make(chan core.NewTxsEvent, txChanSize) pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) go pm.txBroadcastLoop() // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() // start sync handlers go pm.syncer() go pm.txsyncLoop() } ... func (pm *ProtocolManager) txBroadcastLoop() { for { select { case event := <-pm.txsCh: pm.BroadcastTxs(event.Txs) // Err() channel will be closed when unsubscribing. case <-pm.txsSub.Err(): return } } } ... // BroadcastTxs will propagate a batch of transactions to all peers which are not known to // already have the given transaction. func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { var txset = make(map[*peer]types.Transactions) // Broadcast transactions to a batch of peers not knowing about it for _, tx := range txs { peers := pm.peers.PeersWithoutTx(tx.Hash()) for _, peer := range peers { txset[peer] = append(txset[peer], tx) } log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers)) } // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] for peer, txs := range txset { peer.AsyncSendTransactions(txs) } } 

至此,一筆交易從發起到構建到簽名驗證以及緩存到交易池然后廣播給其他節點的整個流程的邏輯就看完了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM