Skip to content

Commit

Permalink
fix: create delegation reverse index over multiple blocks at 1000 ite…
Browse files Browse the repository at this point in the history
…ms per block (#622)

* fix: create delegation reverse index over multiple blocks at 1000 items per block

* chore: update migration to migrate 10000 items per block
  • Loading branch information
PaddyMc committed Aug 30, 2024
1 parent b5b1d9f commit c88c7c3
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 5 deletions.
2 changes: 1 addition & 1 deletion tests/integration/staking/keeper/determinstic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func TestGRPCValidatorDelegations(t *testing.T) {
ValidatorAddr: validator.OperatorAddress,
}

testdata.DeterministicIterations(f.ctx, t, req, f.queryClient.ValidatorDelegations, 14475, false)
testdata.DeterministicIterations(f.ctx, t, req, f.queryClient.ValidatorDelegations, 17484, false)
}

func TestGRPCValidatorUnbondingDelegations(t *testing.T) {
Expand Down
15 changes: 15 additions & 0 deletions x/staking/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
abci "github.com/cometbft/cometbft/abci/types"

"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/staking/types"
)

Expand All @@ -19,5 +20,19 @@ func (k *Keeper) BeginBlocker(ctx context.Context) error {
// EndBlocker called at every block, update validator set
func (k *Keeper) EndBlocker(ctx context.Context) ([]abci.ValidatorUpdate, error) {
defer telemetry.ModuleMeasureSince(types.ModuleName, telemetry.Now(), telemetry.MetricKeyEndBlocker)

// TODO: Remove migration code and panic catch in the next upgrade
// Wrap the migration call in a function that can recover from panics
func() {
defer func() {
if r := recover(); r != nil {
k.Logger(sdk.UnwrapSDKContext(ctx)).Error("Panic in MigrateDelegationsByValidatorIndex", "recover", r)
}
}()

// Only migrate 10000 items per block to make the migration as fast as possible
k.MigrateDelegationsByValidatorIndex(sdk.UnwrapSDKContext(ctx), 10000)
}()

return k.BlockValidatorUpdates(ctx)
}
7 changes: 7 additions & 0 deletions x/staking/keeper/grpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package keeper

import (
"context"
"fmt"
"strings"

"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -108,6 +109,12 @@ func (k Querier) ValidatorDelegations(ctx context.Context, req *types.QueryValid
pageRes *query.PageResponse
)
pageRes, err = query.Paginate(delStore, req.Pagination, func(delAddr, value []byte) error {
// Check the store to see if there is a value stored under the key
key := store.Get(types.NextMigrateDelegationsByValidatorIndexKey)
if key != nil {
// Users will never see this error as if there is an error the function defaults to the legacy implementation below
return fmt.Errorf("store migration is not finished, try again later")
}
bz := store.Get(types.GetDelegationKey(delAddr, valAddr))

var delegation types.Delegation
Expand Down
3 changes: 3 additions & 0 deletions x/staking/keeper/keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/suite"

"cosmossdk.io/core/store"
"cosmossdk.io/math"
storetypes "cosmossdk.io/store/types"

Expand Down Expand Up @@ -40,6 +41,7 @@ type KeeperTestSuite struct {
accountKeeper *stakingtestutil.MockAccountKeeper
queryClient stakingtypes.QueryClient
msgServer stakingtypes.MsgServer
storeService store.KVStoreService
}

func (s *KeeperTestSuite) SetupTest() {
Expand Down Expand Up @@ -73,6 +75,7 @@ func (s *KeeperTestSuite) SetupTest() {
s.stakingKeeper = keeper
s.bankKeeper = bankKeeper
s.accountKeeper = accountKeeper
s.storeService = storeService

stakingtypes.RegisterInterfaces(encCfg.InterfaceRegistry)
queryHelper := baseapp.NewQueryServerTestHelper(ctx, encCfg.InterfaceRegistry)
Expand Down
86 changes: 86 additions & 0 deletions x/staking/keeper/validator_index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package keeper

import (
"fmt"

"cosmossdk.io/store/prefix"

"github.com/cosmos/cosmos-sdk/runtime"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/staking/types"
)

// MigrateDelegationsByValidatorIndex is a migration that runs over multiple blocks,
// this is necessary as to build the reverse index we need to iterate over a large set
// of delegations.
func (k Keeper) MigrateDelegationsByValidatorIndex(ctx sdk.Context, iterationLimit int) error {
store := runtime.KVStoreAdapter(k.storeService.OpenKVStore(ctx))
valStore := prefix.NewStore(store, types.DelegationKey)

// Check the store to see if there is a value stored under the key
key := store.Get(types.NextMigrateDelegationsByValidatorIndexKey)
if key == nil {
return nil
}

// Initialize the counter to 0
iterationCounter := 0

// Start the iterator from the key that is in the store
iterator := valStore.Iterator(key, nil)
defer iterator.Close()

for ; iterator.Valid(); iterator.Next() {
key := iterator.Key()

// Parse the index to use setting the reverse index
del, val, err := ParseDelegationKey(key)
if err != nil {
return err
}

// Set the reverse index in the store
store.Set(types.GetDelegationsByValKey(val, del), []byte{})

iterationCounter++
if iterationCounter >= iterationLimit {
ctx.Logger().Info(fmt.Sprintf("Migrated %d delegations, next key %x", iterationLimit, key))

// Set the key in the store after it has been processed
store.Set(types.NextMigrateDelegationsByValidatorIndexKey, key)
break
}
}

// If the iterator is invalid we have processed the full store
if !iterator.Valid() {
ctx.Logger().Info("Migration completed")
store.Delete(types.NextMigrateDelegationsByValidatorIndexKey)
}

return nil
}

// ParseDelegationKey parses given key and returns delagator, validator address bytes
func ParseDelegationKey(bz []byte) (sdk.AccAddress, sdk.ValAddress, error) {
delAddrLen := bz[0]
bz = bz[1:] // remove the length byte of delegator address.
if len(bz) == 0 {
return nil, nil, fmt.Errorf("no bytes left to parse delegator address: %X", bz)
}

del := bz[:int(delAddrLen)]
bz = bz[int(delAddrLen):] // remove the length byte of a delegator address
if len(bz) == 0 {
return nil, nil, fmt.Errorf("no bytes left to parse delegator address: %X", bz)
}

bz = bz[1:] // remove the validator address bytes.
if len(bz) == 0 {
return nil, nil, fmt.Errorf("no bytes left to parse validator address: %X", bz)
}

val := bz

return del, val, nil
}
150 changes: 150 additions & 0 deletions x/staking/keeper/validator_index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package keeper_test

import (
"cosmossdk.io/core/store"
sdkmath "cosmossdk.io/math"
storetypes "cosmossdk.io/store/types"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/runtime"
"github.com/cosmos/cosmos-sdk/testutil/sims"
sdk "github.com/cosmos/cosmos-sdk/types"
moduletestutil "github.com/cosmos/cosmos-sdk/types/module/testutil"
"github.com/cosmos/cosmos-sdk/x/staking"
"github.com/cosmos/cosmos-sdk/x/staking/types"
)

// TestDelegationsByValidatorMigration tests the multi block migration of the reverse delegation index
func (s *KeeperTestSuite) TestDelegationsByValidatorMigration() {
require := s.Require()
ctx, keeper := s.ctx, s.stakingKeeper
store := s.storeService.OpenKVStore(ctx)
storeInit := runtime.KVStoreAdapter(store)
cdc := moduletestutil.MakeTestEncodingConfig(staking.AppModuleBasic{}).Codec

accAddrs := sims.CreateIncrementalAccounts(15)
valAddrs := sims.ConvertAddrsToValAddrs(accAddrs[0:1])
var addedDels []types.Delegation

// start at 1 as 0 addr is the validator addr
for i := 1; i < 11; i++ {
del1 := types.NewDelegation(accAddrs[i].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100))
store.Set(types.GetDelegationKey(accAddrs[i], valAddrs[0]), types.MustMarshalDelegation(cdc, del1))
addedDels = append(addedDels, del1)
}

// number of items we migrate per migration
migrationCadence := 6

// set the key in the store, this happens on the original migration
iterator := storetypes.KVStorePrefixIterator(storeInit, types.DelegationKey)
for ; iterator.Valid(); iterator.Next() {
key := iterator.Key()
store.Set(types.NextMigrateDelegationsByValidatorIndexKey, key[1:])
break
}

// before migration the state of delegations by val index should be empty
dels := getValDelegations(cdc, store, valAddrs[0])
require.Equal(len(dels), 0)

// run the first round of migrations first 6, 10 in store
err := keeper.MigrateDelegationsByValidatorIndex(ctx, migrationCadence)
require.NoError(err)

// after migration the state of delegations by val index should not be empty
dels = getValDelegations(cdc, store, valAddrs[0])
require.Equal(len(dels), migrationCadence)
require.NotEqual(len(dels), len(addedDels))

// check that the next value needed from the store is present
next, err := store.Get(types.NextMigrateDelegationsByValidatorIndexKey)
require.NoError(err)
require.NotNil(next)

// delegate to a validator while the migration is in progress
delagationWhileMigrationInProgress := types.NewDelegation(accAddrs[12].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100))
keeper.SetDelegation(ctx, delagationWhileMigrationInProgress)
addedDels = append(addedDels, delagationWhileMigrationInProgress)

// remove a delegation from a validator while the migration is in progress that has been processed
removeDelagationWhileMigrationInProgress := types.NewDelegation(accAddrs[3].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100))
keeper.RemoveDelegation(ctx, removeDelagationWhileMigrationInProgress)
// index in the array is 2
addedDels = deleteElement(addedDels, 2)

// remove the index on the off chance this happens during the migration
removeDelagationWhileMigrationInProgressNextIndex := types.NewDelegation(accAddrs[6].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100))
keeper.RemoveDelegation(ctx, removeDelagationWhileMigrationInProgressNextIndex)
// index in the array is 4, as we've removed one item
addedDels = deleteElement(addedDels, 4)

// remove a delegation from a validator while the migration is in progress that has not been processed
removeDelagationWhileMigrationInProgressNotProcessed := types.NewDelegation(accAddrs[10].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100))
keeper.RemoveDelegation(ctx, removeDelagationWhileMigrationInProgressNotProcessed)
// index in the array is 7, as we've removed 2 items
addedDels = deleteElement(addedDels, 7)

// while migrating get state of delegations by val index should be increased by 1
delagationWhileMigrationInProgressCount := getValDelegations(cdc, store, valAddrs[0])
require.Equal(len(delagationWhileMigrationInProgressCount), migrationCadence-1)

// run the second round of migrations
err = keeper.MigrateDelegationsByValidatorIndex(ctx, migrationCadence)
require.NoError(err)

// after migration the state of delegations by val index equal all delegations
dels = getValDelegations(cdc, store, valAddrs[0])
require.Equal(len(dels), len(addedDels))
require.Equal(dels, addedDels)

// check that the next value needed from the store is empty
next, err = store.Get(types.NextMigrateDelegationsByValidatorIndexKey)
require.NoError(err)
require.Nil(next)

// Iterate over the store by delegation key
delKeyCount := 0
iteratorDel := storetypes.KVStorePrefixIterator(storeInit, types.DelegationKey)
for ; iteratorDel.Valid(); iteratorDel.Next() {
delKeyCount++
}

// Iterate over the store by validator key
valKeyCount := 0
iteratorVal := storetypes.KVStorePrefixIterator(storeInit, types.DelegationByValIndexKey)
for ; iteratorVal.Valid(); iteratorVal.Next() {
valKeyCount++
}

// Make sure the store count is the same
require.Equal(valKeyCount, delKeyCount)
}

// deleteElement is a simple helper function to remove items from a slice
func deleteElement(slice []types.Delegation, index int) []types.Delegation {
return append(slice[:index], slice[index+1:]...)
}

// getValidatorDelegations is a helper function to get all delegations using the new v5 staking reverse index
func getValDelegations(cdc codec.Codec, keeperStore store.KVStore, valAddr sdk.ValAddress) []types.Delegation {
var delegations []types.Delegation

store := runtime.KVStoreAdapter(keeperStore)
iterator := storetypes.KVStorePrefixIterator(store, types.GetDelegationsByValPrefixKey(valAddr))

for ; iterator.Valid(); iterator.Next() {
var delegation types.Delegation
valAddr, delAddr, err := types.ParseDelegationsByValKey(iterator.Key())
if err != nil {
panic(err)
}

bz := store.Get(types.GetDelegationKey(delAddr, valAddr))
cdc.MustUnmarshal(bz, &delegation)

delegations = append(delegations, delegation)
}

return delegations
}
7 changes: 4 additions & 3 deletions x/staking/migrations/v5/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ const (
)

var (
DelegationKey = []byte{0x31} // key for a delegation
HistoricalInfoKey = []byte{0x50} // prefix for the historical info
DelegationByValIndexKey = []byte{0x71} // key for delegations by a validator
DelegationKey = []byte{0x31} // key for a delegation
HistoricalInfoKey = []byte{0x50} // prefix for the historical info
DelegationByValIndexKey = []byte{0x71} // key for delegations by a validator
NextMigrateDelegationsByValidatorIndexKey = []byte{0x81} // key used to migrate to the new validator index
)

// ParseDelegationKey parses given key and returns delagator, validator address bytes
Expand Down
13 changes: 12 additions & 1 deletion x/staking/migrations/v5/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,28 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
)

func migrateDelegationsByValidatorIndex(ctx sdk.Context, store storetypes.KVStore, cdc codec.BinaryCodec) error {
func migrateDelegationsByValidatorIndex(ctx sdk.Context, store storetypes.KVStore, _ codec.BinaryCodec) error {
iterator := storetypes.KVStorePrefixIterator(store, DelegationKey)
iterationLimit := 1000
iterationCounter := 0

for ; iterator.Valid(); iterator.Next() {
key := iterator.Key()

del, val, err := ParseDelegationKey(key)
if err != nil {
return err
}

store.Set(GetDelegationsByValKey(val, del), []byte{})

iterationCounter++
if iterationCounter >= iterationLimit {
ctx.Logger().Info(fmt.Sprintf("Migrated 1000 delegations, next key %x", key[1:]))
// we set the store to the key sans the DelgationKey as we create a prefix store to iterate per block
store.Set(NextMigrateDelegationsByValidatorIndexKey, key[1:])
break
}
}

return nil
Expand Down
2 changes: 2 additions & 0 deletions x/staking/types/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ var (
ParamsKey = []byte{0x51} // prefix for parameters for module x/staking

DelegationByValIndexKey = []byte{0x71} // key for delegations by a validator

NextMigrateDelegationsByValidatorIndexKey = []byte{0x81} // key used to migrate to the new validator index
)

// UnbondingType defines the type of unbonding operation
Expand Down

0 comments on commit c88c7c3

Please sign in to comment.