FileCoin Lotus復制證明 PoRep 源碼梳理


流程圖

Incoming

lotus-miner-storage,首先調用 PledgeSector 通過類似微服務的方式調用

在 cmd/lotus-storage-miner/sectors.go 發出生成扇區的命令,通過微服務的方式調用

    var pledgeSectorCmd = &cli.Command{
    Name:  "pledge-sector",
    Usage: "store random data in a sector",
    Action: func(cctx *cli.Context) error {
        // 獲取miner網關地址
        nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
        if err != nil {
            return err
        }
        defer closer()
        ctx := lcli.ReqContext(cctx)

        return nodeApi.PledgeSector(ctx)
    },
}

在 storage/garbage.go 生成新的扇區,獲取分片數組大小,扇區id,該過程關鍵在調用內部方法 m.pledgeSector產生數據,填滿扇區數據。

func (m *Miner) PledgeSector() error {
    go func() {
        ctx := context.TODO() // we can't use the context from command which invokes
        // this, as we run everything here async, and it's cancelled when the
        // command exits

        // 一共多少個分片,是否跟生成默克爾書的分塊對應?
        size := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize())

        // 扇區id
        sid, err := m.sb.AcquireSectorId()
        if err != nil {
            log.Errorf("%+v", err)
            return
        }
        // 產生分片數組,該方法中會將生成的簽名信息提交到鏈上,重點方法
        pieces, err := m.pledgeSector(ctx, sid, []uint64{}, size)
        if err != nil {
            log.Errorf("%+v", err)
            return
        }
        // 產生新的扇區
        if err := m.newSector(context.TODO(), sid, pieces[0].DealID, pieces[0].ppi()); err != nil {
            log.Errorf("%+v", err)
            return
        }
    }()
    return nil
}

在重點查看m.pledgeSector,該方法主要作用是為每隔扇區生成一個憑據,並把每隔憑據封裝成一個交易信息,提交到鏈上,並解析出鏈上的提交信息進行判斷交易id是否一致,存儲數據;返回信息為分片信息數組

func (m *Miner) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) {
    ...
    //  將交易信息提交到鏈上
    params, aerr := actors.SerializeParams(&actors.PublishStorageDealsParams{
        Deals: deals,
    })
    ...
    //等待鏈上反饋消息
    r, err := m.api.StateWaitMsg(ctx, smsg.Cid())
    if err != nil {
        return nil, err
    }
    ...
    //從鏈上消息中解析出DealID,看是否一致
    var resp actors.PublishStorageDealResponse
    if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil {
        return nil, err
    }
    if len(resp.DealIDs) != len(sizes) {
        return nil, xerrors.New("got unexpected number of DealIDs from PublishStorageDeals")
    }
    ....
    out := make([]Piece, len(sizes))
  
    //根據鏈上確認的結果,首先將piece的信息存入到sector里
    for i, size := range sizes {
        //填充數據
        ppi, err := m.sb.AddPiece(size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), existingPieceSizes)
        if err != nil {
            return nil, err
        }

        existingPieceSizes = append(existingPieceSizes, size)
        out[i] = Piece{
            DealID: resp.DealIDs[i],
            Size:   ppi.Size,
            CommP:  ppi.CommP[:],
        }
    }
    return out, nil
}

扇區信息生成之后調用 /storage/secotrs.go

//扇區信息生成之后,調用該方法
func (m *Miner) onSectorIncoming(sector *SectorInfo) {
    // 判斷id是否存在
    has, err := m.sectors.Has(sector.SectorID)
    if err != nil {
        return
    }
    if has {
        log.Warnf("SealPiece called more than once for sector %d", sector.SectorID)
        return
    }

    // 把數據寫入 扇區 硬盤中
    if err := m.sectors.Begin(sector.SectorID, sector); err != nil {
        log.Errorf("sector tracking failed: %s", err)
        return
    }

    go func() {
        select {
        case m.sectorUpdated <- sectorUpdate{  //更改狀態
            newState: api.Packing,
            id:       sector.SectorID,
        }:
        case <-m.stop:
            log.Warn("failed to send incoming sector update, miner shutting down")
        }
    }()
}    

以上為 Incomeing 過程,主要作用是計算piece大小,產生扇區id信息;把每個piece的大小產生憑據(包含交易信息等),提交到鏈上,進行驗證;之后用piece數組,產生扇區信息;然后把扇區的信息寫入磁盤,將狀態更改 Packing狀態,此過程將消耗大量的 cpu 和內存

Packing

后續的操作主要在 /storage/sector_states.go 文件中
主要是判斷扇區數據是否完整,將沒填滿的扇區填充完整,之后將狀態更改為 Unsealed狀態

// 打包的狀態,將沒喲填滿數據的扇區填滿
func (m *Miner) handlePacking(ctx context.Context, sector SectorInfo) *sectorUpdate {
    log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID)

    var allocated uint64
    for _, piece := range sector.Pieces {
        allocated += piece.Size
    }

    ubytes := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize())

    if allocated > ubytes {
        return sector.upd().fatal(xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes))
    }

    //fillers From Remaining
    fillerSizes, err := fillersFromRem(ubytes - allocated)
    if err != nil {
        return sector.upd().fatal(err)
    }

    if len(fillerSizes) > 0 {
        log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID)
    }
    //此處調用  pledgeSector將扇區填滿
    pieces, err := m.pledgeSector(ctx, sector.SectorID, sector.existingPieces(), fillerSizes...)
    if err != nil {
        return sector.upd().fatal(xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err))
    }

    //數據填充完畢后,扇區的狀態轉換到了Unsealed狀態
    return sector.upd().to(api.Unsealed).state(func(info *SectorInfo) {
        info.Pieces = append(info.Pieces, pieces...)
    })
}

Unsealed

func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUpdate {
    log.Infow("performing sector replication...", "sector", sector.SectorID)
    // 調用隨機函數返回一個隨機選票(包含區塊高度,和票據)
    // 隨機函數在初始化礦工生成的,運用的反射,具體需要詳細查看 ?
    ticket, err := m.tktFn(ctx)
    if err != nil {
        return sector.upd().fatal(err)
    }

    // 開始進行密封的操作,主要根據源數據產生加密數據,產生一份副本
    rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos())
    if err != nil {
        return sector.upd().to(api.SealFailed).error(xerrors.Errorf("seal pre commit failed: %w", err))
    }
    // 更改狀態,把數據的唯一復制憑據信息,和隨機數相關更新
    return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) {
        info.CommD = rspco.CommD[:]
        info.CommR = rspco.CommR[:]
        info.Ticket = SealTicket{
            BlockHeight: ticket.BlockHeight,
            TicketBytes: ticket.TicketBytes[:],
        }
    })
}

/lib/sectorbuilder/sectorbuilder.go 文件中
判斷在 .lotusstorage 文件下幾個目錄是存在 cache,staged,sealed;調用rust庫的代碼生成相關的憑據

func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) {
   
    ...
        // 底層是rust部分的代碼生成憑據信息
        rspco, err := sectorbuilder.SealPreCommit(
        sb.ssize,
        PoRepProofPartitions,
        cacheDir,
        stagedPath,
        sealedPath,
        sectorID,
        addressToProverID(sb.Miner),
        ticket.TicketBytes,
        pieces,
    )

    log.Warn(xerrors.Errorf("[qz2.4]: time to precommit %v at :%v", sectorID, time.Since(start).Milliseconds()))
    start = time.Now()
    if err != nil {
        return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err)
    }
    // 返會相關的憑據信息
    return RawSealPreCommitOutput(rspco), nil
    
}

此過程會產生大量的緩存文件用於計算,產生加密后數據的唯一副本相關的憑據,此時並沒有產生復制證明

PreCommitting

主要是講消息廣播到鏈上去,並把該消息cid存起來;主要是讓區塊到了指定的高度驗證數據的有效性

func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate {
    // 要發到鏈上的消息
    params := &actors.SectorPreCommitInfo{
        SectorNumber: sector.SectorID,

        CommR:     sector.CommR,
        SealEpoch: sector.Ticket.BlockHeight,
        DealIDs:   sector.deals(),
    }
    enc, aerr := actors.SerializeParams(params)
    if aerr != nil {
        return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr))
    }
    // 封裝消息體
    msg := &types.Message{
        To:       m.maddr,
        From:     m.worker,
        Method:   actors.MAMethods.PreCommitSector,
        Params:   enc,
        Value:    types.NewInt(0), // TODO: need to ensure sufficient collateral
        GasLimit: types.NewInt(1000000 /* i dont know help */),
        GasPrice: types.NewInt(1),
    }

    log.Info("submitting precommit for sector: ", sector.SectorID)
    // 廣播
    smsg, err := m.api.MpoolPushMessage(ctx, msg)
    if err != nil {
        return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err))
    }
    // 將受到消息cid 保存
    return sector.upd().to(api.PreCommitted).state(func(info *SectorInfo) {
        mcid := smsg.Cid()
        info.PreCommitMessage = &mcid
    })
}

PreCommitted

func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sectorUpdate {
   // 等待鏈上的消息
    mw, err := m.api.StateWaitMsg(ctx, *sector.PreCommitMessage)
    if err != nil {
        return sector.upd().to(api.PreCommitFailed).error(err)
    }

    ...
    // 區塊的高度+定義的延時量(8)
    randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay - 1 // -1 because of how the messages are applied
    log.Infof("precommit for sector %d made it on chain, will start proof computation at height %d", sector.SectorID, randHeight)

    updateNonce := sector.Nonce
    // 一個是在區塊到達一定的高度執行的方法和回滾的方法
    err = m.events.ChainAt(func(ctx context.Context, ts *types.TipSet, curH uint64) error {
        // 根據區塊高度和ts key生成隨機數
        rand, err := m.api.ChainGetRandomness(ctx, ts.Key(), int64(randHeight))
        if err != nil {
            err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)

            m.sectorUpdated <- *sector.upd().fatal(err)
            return err
        }
        // 更改狀態
        m.sectorUpdated <- *sector.upd().to(api.Committing).setNonce(updateNonce).state(func(info *SectorInfo) {
            // 將密封 seed更新
            info.Seed = SealSeed{
                BlockHeight: randHeight,
                TicketBytes: rand,
            }
        })

        updateNonce++

        return nil
    }, func(ctx context.Context, ts *types.TipSet) error {
        log.Warn("revert in interactive commit sector step")
        // TODO: need to cancel running process and restart...
        return nil
    }, build.InteractivePoRepConfidence, mw.TipSet.Height()+build.InteractivePoRepDelay)
    if err != nil {
        log.Warn("waitForPreCommitMessage ChainAt errored: ", err)
    }

    return nil
}

該過程主要是等待之前 生成扇區唯一副本和憑據廣播到鏈上的消息,等待之后,根據當前的區塊的高度加上一個延時變量(預估5分鍾左右),生成在該區塊時執行的方法,和回滾的方法。狀態更改 Committing

Committing

func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate {
    ...
    // 產生復制證明憑據
    proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco())
    if err != nil {
        return sector.upd().to(api.SealCommitFailed).error(xerrors.Errorf("computing seal proof failed: %w", err))
    }
    ...
    // 把包含證明文件的消息廣播
    smsg, err := m.api.MpoolPushMessage(ctx, msg)
    if err != nil {
        return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err))
    }

    // 更改狀態
    return sector.upd().to(api.CommitWait).state(func(info *SectorInfo) {
        mcid := smsg.Cid()
        info.CommitMessage = &mcid
        info.Proof = proof
    })
}


// 這個是重點關注的方法,產生復制證明的證明憑據
func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
    // 產生一個工作任務
    call := workerCall{
        task: WorkerTask{
            Type:       WorkerCommit,
            TaskID:     atomic.AddUint64(&sb.taskCtr, 1),
            SectorID:   sectorID,
            SealTicket: ticket,
            Pieces:     pieces,

            SealSeed: seed,
            Rspco:    rspco,
        },
        ret: make(chan SealRes),
    }

    atomic.AddInt32(&sb.commitWait, 1)

    select { // prefer remote
    case sb.commitTasks <- call:
        proof, err = sb.sealCommitRemote(call)
    default:
        sb.checkRateLimit()

        rl := sb.rateLimit
        if sb.noCommit {
            rl = make(chan struct{})
        }
        start := time.Now()
        log.Warn(xerrors.Errorf("[qz2.6]: start to commit :%v", start))

        select { // use whichever is available
        case sb.commitTasks <- call: // 遠程work產生復制證明憑據
            proof, err = sb.sealCommitRemote(call)
            log.Warn(xerrors.Errorf("[qz2.7]: remote commit :%v", time.Since(start).Milliseconds()))

        case rl <- struct{}{}:  // 默認本地work產生復制證明的憑據,內部主要是調用 rust部分的代碼
            proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco)
            log.Warn(xerrors.Errorf("[qz2.8]: local commit time :%v", time.Since(start).Milliseconds()))

        }
    }
    if err != nil {
        return nil, xerrors.Errorf("commit: %w", err)
    }

    return proof, nil
}

等待鏈上的消息,之后產生復制證明的憑據,並廣播到鏈上去

CommitWait

主要是接受鏈上的消息,判斷狀態,將扇區狀態更改為 proving,存儲成功

func (m *Miner) handleCommitWait(ctx context.Context, sector SectorInfo) *sectorUpdate {
    ...
    // 等待鏈上廣播來的消息
    mw, err := m.api.StateWaitMsg(ctx, *sector.CommitMessage)
    if err != nil {
        return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("failed to wait for porep inclusion: %w", err))
    }

   // 判斷狀態
    if mw.Receipt.ExitCode != 0 {
        log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, sector.Proof)
        return sector.upd().fatal(xerrors.Errorf("UNHANDLED: submitting sector proof failed (exit: %d)", mw.Receipt.ExitCode))
    }
    // 最終產生算力,更改扇區狀態
    return sector.upd().to(api.Proving).state(func(info *SectorInfo) {
    })
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM