Skip to content
This repository has been archived by the owner on Jun 6, 2023. It is now read-only.

Commit

Permalink
improve deal accounting performance
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Apr 21, 2020
1 parent c64b47b commit 40e0feb
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 328 deletions.
156 changes: 2 additions & 154 deletions actors/builtin/market/cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

133 changes: 50 additions & 83 deletions actors/builtin/market/market_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ func (a Actor) Exports() []interface{} {
builtin.MethodConstructor: a.Constructor,
2: a.AddBalance,
3: a.WithdrawBalance,
4: a.HandleExpiredDeals,
5: a.PublishStorageDeals,
6: a.VerifyDealsOnSectorProveCommit,
7: a.OnMinerSectorsTerminate,
8: a.ComputeDataCommitment,
9: a.HandleInitTimeoutDeals,
10: a.CronTick,
}
}

Expand Down Expand Up @@ -82,7 +81,6 @@ func (a Actor) WithdrawBalance(rt Runtime, params *WithdrawBalanceParams) *adt.E
rt.State().Transaction(&st, func() interface{} {
// Before any operations that check the balance tables for funds, execute all deferred
// deal state updates.
amountSlashedTotal = big.Add(amountSlashedTotal, st.updatePendingDealStatesForParty(rt, nominal))

minBalance := st.GetLockedBalance(rt, nominal)

Expand Down Expand Up @@ -148,7 +146,6 @@ type PublishStorageDealsReturn struct {

// Publish a new set of storage deals (not yet included in a sector).
func (a Actor) PublishStorageDeals(rt Runtime, params *PublishStorageDealsParams) *PublishStorageDealsReturn {
amountSlashedTotal := abi.NewTokenAmount(0)

// Deal message must have a From field identical to the provider of all the deals.
// This allows us to retain and verify only the client's signature in each deal proposal itself.
Expand Down Expand Up @@ -195,7 +192,7 @@ func (a Actor) PublishStorageDeals(rt Runtime, params *PublishStorageDealsParams
rt.Abortf(exitcode.ErrIllegalState, "failed to load proposals array: %s", err)
}

dbp, err := AsSetMultimap(adt.AsStore(rt), st.DealIDsByParty)
dbp, err := AsSetMultimap(adt.AsStore(rt), st.DealOpsByEpoch)
if err != nil {
rt.Abortf(exitcode.ErrIllegalState, "failed to load deal ids set: %s", err)
}
Expand All @@ -216,29 +213,17 @@ func (a Actor) PublishStorageDeals(rt Runtime, params *PublishStorageDealsParams
deal.Proposal.Provider = provider
deal.Proposal.Client = client

// Before any operations that check the balance tables for funds, execute all deferred
// deal state updates.
//
// Note: as an optimization, implementations may cache efficient data structures indicating
// which of the following set of updates are redundant and can be skipped.
amountSlashedTotal = big.Add(amountSlashedTotal, st.updatePendingDealStatesForParty(rt, client))
amountSlashedTotal = big.Add(amountSlashedTotal, st.updatePendingDealStatesForParty(rt, provider))

st.lockBalanceOrAbort(rt, client, deal.Proposal.ClientBalanceRequirement())
st.lockBalanceOrAbort(rt, provider, deal.Proposal.ProviderBalanceRequirement())

id := st.generateStorageDealID()

err := proposals.Set(id, &deal.Proposal)
if err != nil {
if err := proposals.Set(id, &deal.Proposal); err != nil {
rt.Abortf(exitcode.ErrIllegalState, "set deal: %v", err)
}

if err = dbp.Put(client, id); err != nil {
rt.Abortf(exitcode.ErrIllegalState, "set client deal id: %v", err)
}
if err = dbp.Put(provider, id); err != nil {
rt.Abortf(exitcode.ErrIllegalState, "set provider deal id: %v", err)
if err := dbp.Put(deal.Proposal.StartEpoch, id); err != nil {
rt.Abortf(exitcode.ErrIllegalState, "set deal in ops set: %v", err)
}

newDealIds = append(newDealIds, id)
Expand All @@ -254,12 +239,10 @@ func (a Actor) PublishStorageDeals(rt Runtime, params *PublishStorageDealsParams
rt.Abortf(exitcode.ErrIllegalState, "failed to flush deal ids map: %w", err)
}

st.DealIDsByParty = dipc
st.DealOpsByEpoch = dipc
return nil
})

_, code := rt.Send(builtin.BurntFundsActorAddr, builtin.MethodSend, nil, amountSlashedTotal)
builtin.RequireSuccess(rt, code, "failed to burn funds")
return &PublishStorageDealsReturn{newDealIds}
}

Expand Down Expand Up @@ -427,78 +410,62 @@ func (a Actor) OnMinerSectorsTerminate(rt Runtime, params *OnMinerSectorsTermina
return nil
}

type HandleExpiredDealsParams struct {
Deals []abi.DealID // TODO: RLE
}
func (a Actor) CronTick(rt Runtime, params *adt.EmptyValue) *adt.EmptyValue {
rt.ValidateImmediateCallerIs(builtin.CronActorAddr)
amountSlashed := big.Zero()

func (a Actor) HandleExpiredDeals(rt Runtime, params *HandleExpiredDealsParams) *adt.EmptyValue {
rt.ValidateImmediateCallerType(builtin.CallerTypesSignable...)
var slashed abi.TokenAmount
var st State
rt.State().Transaction(&st, func() interface{} {
slashed = st.updatePendingDealStates(rt, params.Deals, rt.CurrEpoch())
return nil
})
dbe, err := AsSetMultimap(adt.AsStore(rt), st.DealOpsByEpoch)
if err != nil {
rt.Abortf(exitcode.ErrIllegalState, "failed to load deal opts set: %s", err)
}

// TODO: award some small portion of slashed to caller as incentive
updatesNeeded := make(map[abi.ChainEpoch][]abi.DealID)

_, code := rt.Send(builtin.BurntFundsActorAddr, builtin.MethodSend, nil, slashed)
builtin.RequireSuccess(rt, code, "failed to burn funds")
return nil
}
for i := st.LastCron; i <= rt.CurrEpoch(); i++ {
if err := dbe.ForEach(i, func(deal abi.DealID) error {
slashAmount, nextEpoch := st.updatePendingDealState(rt, deal, rt.CurrEpoch())
if !slashAmount.IsZero() {
amountSlashed = big.Add(amountSlashed, slashAmount)
}

type HandleInitTimeoutDealsParams struct {
Deals []abi.DealID // TODO: RLE
}
if nextEpoch != 0 {
Assert(nextEpoch <= rt.CurrEpoch())

func (a Actor) HandleInitTimeoutDeals(rt Runtime, params *HandleInitTimeoutDealsParams) *adt.EmptyValue {
rt.ValidateImmediateCallerType(builtin.CallerTypesSignable...)
var st State
var verifiedDeals []*DealProposal
slashedAmount := rt.State().Transaction(&st, func() interface{} {
slashed := abi.NewTokenAmount(0)
for _, dealID := range params.Deals {
deal := st.mustGetDeal(rt, dealID)
state := st.mustGetDealState(rt, dealID)

// Deal has not been activated.
if state.SectorStartEpoch == epochUndefined {
// Now is after StartEpoch when the Deal should have been activated, hence clean up.
if rt.CurrEpoch() > deal.StartEpoch {
// Store VerifiedDeal to restore bytes for VerifiedClient.
if deal.VerifiedDeal {
verifiedDeals = append(verifiedDeals, deal)
}
newlySlashed := st.processDealInitTimedOut(rt, dealID)
big.Add(slashed, newlySlashed)
} else {
// All deals must have timed out.
rt.Abortf(exitcode.ErrIllegalArgument, "not all deals have timed out: %d", dealID)
updatesNeeded[nextEpoch] = append(updatesNeeded[nextEpoch], deal)
}

return nil
}); err != nil {
rt.Abortf(exitcode.ErrIllegalState, "failed to iterate deals for epoch: %s", err)
}
if err := dbe.RemoveAll(i); err != nil {
rt.Abortf(exitcode.ErrIllegalState, "failed to delete deals from set: %s", err)
}
}

return &slashed
}).(abi.TokenAmount)

// TODO: award some small portion of slashed to caller as incentive

// Restore verified dataset allowance for verified clients.
for _, deal := range verifiedDeals {
_, code := rt.Send(
builtin.VerifiedRegistryActorAddr,
builtin.MethodsVerifiedRegistry.RestoreBytes,
&verifreg.RestoreBytesParams{
Address: deal.Client,
DealSize: big.NewIntUnsigned(uint64(deal.PieceSize)),
},
abi.NewTokenAmount(0),
)
builtin.RequireSuccess(rt, code, "failed to restore bytes for verified client: %v", deal.Client)
}
// NB: its okay that we're doing a 'random' golang map iteration here
// because HAMTs and AMTs are insertion order independent, the same set of
// data inserted will always produce the same structure, no matter the order
for epoch, deals := range updatesNeeded {
if err := dbe.PutMany(epoch, deals); err != nil {
rt.Abortf(exitcode.ErrIllegalState, "failed to reinsert deal IDs into epoch set: %s", err)
}
}

ndbec, err := dbe.Root()
if err != nil {
rt.Abortf(exitcode.ErrIllegalState, "failed to get root of deals by epoch set: %s", err)
}

st.DealOpsByEpoch = ndbec

return nil
})

_, code := rt.Send(builtin.BurntFundsActorAddr, builtin.MethodSend, nil, slashedAmount)
builtin.RequireSuccess(rt, code, "failed to burn funds")
_, e := rt.Send(builtin.BurntFundsActorAddr, 0, nil, amountSlashed)
builtin.RequireSuccess(rt, e, "expected send to burnt funds actor to succeed")
return nil
}

Expand Down
Loading

0 comments on commit 40e0feb

Please sign in to comment.