流程圖
Incoming
lotus-miner-storage,首先調用 PledgeSector 通過類似微服務的方式調用
在 cmd/lotus-storage-miner/sectors.go 發出生成扇區的命令,通過微服務的方式調用
var pledgeSectorCmd = &cli.Command{
Name: "pledge-sector",
Usage: "store random data in a sector",
Action: func(cctx *cli.Context) error {
// 獲取miner網關地址
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
return nodeApi.PledgeSector(ctx)
},
}
在 storage/garbage.go 生成新的扇區,獲取分片數組大小,扇區id,該過程關鍵在調用內部方法 m.pledgeSector產生數據,填滿扇區數據。
func (m *Miner) PledgeSector() error {
go func() {
ctx := context.TODO() // we can't use the context from command which invokes
// this, as we run everything here async, and it's cancelled when the
// command exits
// 一共多少個分片,是否跟生成默克爾書的分塊對應?
size := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize())
// 扇區id
sid, err := m.sb.AcquireSectorId()
if err != nil {
log.Errorf("%+v", err)
return
}
// 產生分片數組,該方法中會將生成的簽名信息提交到鏈上,重點方法
pieces, err := m.pledgeSector(ctx, sid, []uint64{}, size)
if err != nil {
log.Errorf("%+v", err)
return
}
// 產生新的扇區
if err := m.newSector(context.TODO(), sid, pieces[0].DealID, pieces[0].ppi()); err != nil {
log.Errorf("%+v", err)
return
}
}()
return nil
}
在重點查看m.pledgeSector,該方法主要作用是為每隔扇區生成一個憑據,並把每隔憑據封裝成一個交易信息,提交到鏈上,並解析出鏈上的提交信息進行判斷交易id是否一致,存儲數據;返回信息為分片信息數組
func (m *Miner) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) {
...
// 將交易信息提交到鏈上
params, aerr := actors.SerializeParams(&actors.PublishStorageDealsParams{
Deals: deals,
})
...
//等待鏈上反饋消息
r, err := m.api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return nil, err
}
...
//從鏈上消息中解析出DealID,看是否一致
var resp actors.PublishStorageDealResponse
if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil {
return nil, err
}
if len(resp.DealIDs) != len(sizes) {
return nil, xerrors.New("got unexpected number of DealIDs from PublishStorageDeals")
}
....
out := make([]Piece, len(sizes))
//根據鏈上確認的結果,首先將piece的信息存入到sector里
for i, size := range sizes {
//填充數據
ppi, err := m.sb.AddPiece(size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), existingPieceSizes)
if err != nil {
return nil, err
}
existingPieceSizes = append(existingPieceSizes, size)
out[i] = Piece{
DealID: resp.DealIDs[i],
Size: ppi.Size,
CommP: ppi.CommP[:],
}
}
return out, nil
}
扇區信息生成之后調用 /storage/secotrs.go
//扇區信息生成之后,調用該方法
func (m *Miner) onSectorIncoming(sector *SectorInfo) {
// 判斷id是否存在
has, err := m.sectors.Has(sector.SectorID)
if err != nil {
return
}
if has {
log.Warnf("SealPiece called more than once for sector %d", sector.SectorID)
return
}
// 把數據寫入 扇區 硬盤中
if err := m.sectors.Begin(sector.SectorID, sector); err != nil {
log.Errorf("sector tracking failed: %s", err)
return
}
go func() {
select {
case m.sectorUpdated <- sectorUpdate{ //更改狀態
newState: api.Packing,
id: sector.SectorID,
}:
case <-m.stop:
log.Warn("failed to send incoming sector update, miner shutting down")
}
}()
}
以上為 Incomeing 過程,主要作用是計算piece大小,產生扇區id信息;把每個piece的大小產生憑據(包含交易信息等),提交到鏈上,進行驗證;之后用piece數組,產生扇區信息;然后把扇區的信息寫入磁盤,將狀態更改 Packing狀態,此過程將消耗大量的 cpu 和內存
Packing
后續的操作主要在 /storage/sector_states.go 文件中
主要是判斷扇區數據是否完整,將沒填滿的扇區填充完整,之后將狀態更改為 Unsealed狀態
// 打包的狀態,將沒喲填滿數據的扇區填滿
func (m *Miner) handlePacking(ctx context.Context, sector SectorInfo) *sectorUpdate {
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID)
var allocated uint64
for _, piece := range sector.Pieces {
allocated += piece.Size
}
ubytes := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize())
if allocated > ubytes {
return sector.upd().fatal(xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes))
}
//fillers From Remaining
fillerSizes, err := fillersFromRem(ubytes - allocated)
if err != nil {
return sector.upd().fatal(err)
}
if len(fillerSizes) > 0 {
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID)
}
//此處調用 pledgeSector將扇區填滿
pieces, err := m.pledgeSector(ctx, sector.SectorID, sector.existingPieces(), fillerSizes...)
if err != nil {
return sector.upd().fatal(xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err))
}
//數據填充完畢后,扇區的狀態轉換到了Unsealed狀態
return sector.upd().to(api.Unsealed).state(func(info *SectorInfo) {
info.Pieces = append(info.Pieces, pieces...)
})
}
Unsealed
func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUpdate {
log.Infow("performing sector replication...", "sector", sector.SectorID)
// 調用隨機函數返回一個隨機選票(包含區塊高度,和票據)
// 隨機函數在初始化礦工生成的,運用的反射,具體需要詳細查看 ?
ticket, err := m.tktFn(ctx)
if err != nil {
return sector.upd().fatal(err)
}
// 開始進行密封的操作,主要根據源數據產生加密數據,產生一份副本
rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos())
if err != nil {
return sector.upd().to(api.SealFailed).error(xerrors.Errorf("seal pre commit failed: %w", err))
}
// 更改狀態,把數據的唯一復制憑據信息,和隨機數相關更新
return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) {
info.CommD = rspco.CommD[:]
info.CommR = rspco.CommR[:]
info.Ticket = SealTicket{
BlockHeight: ticket.BlockHeight,
TicketBytes: ticket.TicketBytes[:],
}
})
}
/lib/sectorbuilder/sectorbuilder.go 文件中
判斷在 .lotusstorage 文件下幾個目錄是存在 cache,staged,sealed;調用rust庫的代碼生成相關的憑據
func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) {
...
// 底層是rust部分的代碼生成憑據信息
rspco, err := sectorbuilder.SealPreCommit(
sb.ssize,
PoRepProofPartitions,
cacheDir,
stagedPath,
sealedPath,
sectorID,
addressToProverID(sb.Miner),
ticket.TicketBytes,
pieces,
)
log.Warn(xerrors.Errorf("[qz2.4]: time to precommit %v at :%v", sectorID, time.Since(start).Milliseconds()))
start = time.Now()
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err)
}
// 返會相關的憑據信息
return RawSealPreCommitOutput(rspco), nil
}
此過程會產生大量的緩存文件用於計算,產生加密后數據的唯一副本相關的憑據,此時並沒有產生復制證明
PreCommitting
主要是講消息廣播到鏈上去,並把該消息cid存起來;主要是讓區塊到了指定的高度驗證數據的有效性
func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate {
// 要發到鏈上的消息
params := &actors.SectorPreCommitInfo{
SectorNumber: sector.SectorID,
CommR: sector.CommR,
SealEpoch: sector.Ticket.BlockHeight,
DealIDs: sector.deals(),
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr))
}
// 封裝消息體
msg := &types.Message{
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.PreCommitSector,
Params: enc,
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
GasLimit: types.NewInt(1000000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
log.Info("submitting precommit for sector: ", sector.SectorID)
// 廣播
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err))
}
// 將受到消息cid 保存
return sector.upd().to(api.PreCommitted).state(func(info *SectorInfo) {
mcid := smsg.Cid()
info.PreCommitMessage = &mcid
})
}
PreCommitted
func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sectorUpdate {
// 等待鏈上的消息
mw, err := m.api.StateWaitMsg(ctx, *sector.PreCommitMessage)
if err != nil {
return sector.upd().to(api.PreCommitFailed).error(err)
}
...
// 區塊的高度+定義的延時量(8)
randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay - 1 // -1 because of how the messages are applied
log.Infof("precommit for sector %d made it on chain, will start proof computation at height %d", sector.SectorID, randHeight)
updateNonce := sector.Nonce
// 一個是在區塊到達一定的高度執行的方法和回滾的方法
err = m.events.ChainAt(func(ctx context.Context, ts *types.TipSet, curH uint64) error {
// 根據區塊高度和ts key生成隨機數
rand, err := m.api.ChainGetRandomness(ctx, ts.Key(), int64(randHeight))
if err != nil {
err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)
m.sectorUpdated <- *sector.upd().fatal(err)
return err
}
// 更改狀態
m.sectorUpdated <- *sector.upd().to(api.Committing).setNonce(updateNonce).state(func(info *SectorInfo) {
// 將密封 seed更新
info.Seed = SealSeed{
BlockHeight: randHeight,
TicketBytes: rand,
}
})
updateNonce++
return nil
}, func(ctx context.Context, ts *types.TipSet) error {
log.Warn("revert in interactive commit sector step")
// TODO: need to cancel running process and restart...
return nil
}, build.InteractivePoRepConfidence, mw.TipSet.Height()+build.InteractivePoRepDelay)
if err != nil {
log.Warn("waitForPreCommitMessage ChainAt errored: ", err)
}
return nil
}
該過程主要是等待之前 生成扇區唯一副本和憑據廣播到鏈上的消息,等待之后,根據當前的區塊的高度加上一個延時變量(預估5分鍾左右),生成在該區塊時執行的方法,和回滾的方法。狀態更改 Committing
Committing
func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate {
...
// 產生復制證明憑據
proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco())
if err != nil {
return sector.upd().to(api.SealCommitFailed).error(xerrors.Errorf("computing seal proof failed: %w", err))
}
...
// 把包含證明文件的消息廣播
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err))
}
// 更改狀態
return sector.upd().to(api.CommitWait).state(func(info *SectorInfo) {
mcid := smsg.Cid()
info.CommitMessage = &mcid
info.Proof = proof
})
}
// 這個是重點關注的方法,產生復制證明的證明憑據
func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
// 產生一個工作任務
call := workerCall{
task: WorkerTask{
Type: WorkerCommit,
TaskID: atomic.AddUint64(&sb.taskCtr, 1),
SectorID: sectorID,
SealTicket: ticket,
Pieces: pieces,
SealSeed: seed,
Rspco: rspco,
},
ret: make(chan SealRes),
}
atomic.AddInt32(&sb.commitWait, 1)
select { // prefer remote
case sb.commitTasks <- call:
proof, err = sb.sealCommitRemote(call)
default:
sb.checkRateLimit()
rl := sb.rateLimit
if sb.noCommit {
rl = make(chan struct{})
}
start := time.Now()
log.Warn(xerrors.Errorf("[qz2.6]: start to commit :%v", start))
select { // use whichever is available
case sb.commitTasks <- call: // 遠程work產生復制證明憑據
proof, err = sb.sealCommitRemote(call)
log.Warn(xerrors.Errorf("[qz2.7]: remote commit :%v", time.Since(start).Milliseconds()))
case rl <- struct{}{}: // 默認本地work產生復制證明的憑據,內部主要是調用 rust部分的代碼
proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco)
log.Warn(xerrors.Errorf("[qz2.8]: local commit time :%v", time.Since(start).Milliseconds()))
}
}
if err != nil {
return nil, xerrors.Errorf("commit: %w", err)
}
return proof, nil
}
等待鏈上的消息,之后產生復制證明的憑據,並廣播到鏈上去
CommitWait
主要是接受鏈上的消息,判斷狀態,將扇區狀態更改為 proving,存儲成功
func (m *Miner) handleCommitWait(ctx context.Context, sector SectorInfo) *sectorUpdate {
...
// 等待鏈上廣播來的消息
mw, err := m.api.StateWaitMsg(ctx, *sector.CommitMessage)
if err != nil {
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("failed to wait for porep inclusion: %w", err))
}
// 判斷狀態
if mw.Receipt.ExitCode != 0 {
log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, sector.Proof)
return sector.upd().fatal(xerrors.Errorf("UNHANDLED: submitting sector proof failed (exit: %d)", mw.Receipt.ExitCode))
}
// 最終產生算力,更改扇區狀態
return sector.upd().to(api.Proving).state(func(info *SectorInfo) {
})
}