Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: optimize pending packets storage on consumer + migration #1037

Merged
merged 29 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5bb3e14
wip
shaspitz Jun 19, 2023
6b6eabe
tests
shaspitz Jun 19, 2023
84e0250
tests
shaspitz Jun 19, 2023
0eafb05
update genesis tests
shaspitz Jun 19, 2023
1af896f
comments
shaspitz Jun 19, 2023
cf2359d
migration and changelog
shaspitz Jun 19, 2023
1c7e7df
migration test
shaspitz Jun 19, 2023
84d4a75
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jun 20, 2023
98dd226
lints
shaspitz Jun 20, 2023
35a464b
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jun 21, 2023
d4393e9
merge fixes
shaspitz Jun 21, 2023
18f852a
clean
shaspitz Jun 21, 2023
0e1bd28
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jun 22, 2023
9737d0f
Update ccv.pb.go
shaspitz Jun 22, 2023
189c1da
add to ADR
shaspitz Jun 22, 2023
1c33106
address some PR comments
shaspitz Jun 23, 2023
394f709
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jun 23, 2023
a42229a
comment
shaspitz Jun 26, 2023
05acaba
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jun 29, 2023
d918fb4
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jul 1, 2023
f517201
Update ccv.pb.go
shaspitz Jul 1, 2023
e3f2133
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jul 6, 2023
747e8aa
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jul 10, 2023
d84b29d
lint
shaspitz Jul 11, 2023
8084b49
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jul 13, 2023
ab23828
Update x/ccv/consumer/keeper/keeper.go
shaspitz Jul 13, 2023
98c64d4
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jul 13, 2023
db04b8f
byte wise
shaspitz Jul 13, 2023
e6f696e
Merge branch 'shawn/optimize-pending-packets-storage' of https://gith…
shaspitz Jul 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

Add an entry to the unreleased section whenever merging a PR to main that is not targeted at a specific release. These entries will eventually be included in a release.

* (feat!) optimize pending packets storage on consumer, with migration! [#1037](https://github.com/cosmos/interchain-security/pull/1037)

## v3.0.0

Date: June 21st, 2023
Expand Down
16 changes: 16 additions & 0 deletions docs/docs/adrs/adr-008-throttle-retries.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ title: Throttle with retries
## Changelog

* 6/9/23: Initial draft
* 6/22/23: added note on consumer pending packets storage optimization

## Status

Expand Down Expand Up @@ -46,6 +47,21 @@ With the behavior described, we maintain very similar behavior to the current th

In the normal case, when no or a few slash packets are being sent, the VSCMaturedPackets will not be delayed, and hence unbonding will not be delayed.

### Consumer pending packets storage optimization

In addition to the mentioned consumer changes above. An optimization will need to be made to the consumer's pending packets storage to properly implement the feature from this ADR.

The consumer ccv module previously queued "pending packets" to be sent on each endblocker in [SendPackets](https://github.com/cosmos/interchain-security/blob/3bc4e7135066d848aac60b0787364c07157fd36d/x/ccv/consumer/keeper/relay.go#L178). These packets are queued in state with a protobuf list of `ConsumerPacketData`. For a single append operation, the entire list is deserialized, then a packet is appended to that list, and the list is serialized again. See older version of [AppendPendingPacket](https://github.com/cosmos/interchain-security/blob/05c2dae7c6372b1252b9e97215d07c6aa7618f33/x/ccv/consumer/keeper/keeper.go#L606). That is, a single append operation has O(N) complexity, where N is the size of the list.

This poor append performance isn't a problem when the pending packets list is small. But with this ADR being implemented, the pending packets list could potentially grow to the order of thousands of entries, in the scenario that a slash packet is bouncing.

We can improve the append time for this queue by converting it from a protobuf-esq list, to a queue implemented with sdk-esq code. The idea is to persist an uint64 index that will be incremented each time you queue up a packet. You can think of this as storing the tail of the queue. Then, packet data will be keyed by that index, making the data naturally ordered byte-wite for sdk's iterator. The index will also be stored in the packet data value bytes, so that the index can later be used to delete certain packets from the queue.
shaspitz marked this conversation as resolved.
Show resolved Hide resolved

Two things are achieved with this approach:

* More efficient packet append/enqueue times
* The ability to delete select packets from the queue (previously all packets were deleted at once)

### Provider changes

The main change needed for the provider is the removal of queuing logic for slash and vsc matured packets upon being received.
Expand Down
5 changes: 4 additions & 1 deletion proto/interchain_security/ccv/v1/ccv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,20 @@ message SlashPacketData {
// unbonding operations.
message MaturedUnbondingOps { repeated uint64 ids = 1; }

// ConsumerPacketData contains a consumer packet data and a type tag
// ConsumerPacketData contains a consumer packet data, type tag, and index for storage.
message ConsumerPacketData {
ConsumerPacketDataType type = 1;

oneof data {
SlashPacketData slashPacketData = 2;
VSCMaturedPacketData vscMaturedPacketData = 3;
}
uint64 idx = 4;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra care should be taken in reviewing this PR, as it does mutate a proto file. Note this index field was added to the ConsumerPacketData type so that we can use that index in PendingDataPacketsKey, to implement a queue with constant append time. See AppendPendingPacket

}


// ConsumerPacketDataList is a list of consumer packet data packets.
// NOTE: It is only used for exporting / importing state in InitGenesis and ExportGenesis.
message ConsumerPacketDataList {
repeated ConsumerPacketData list = 1 [ (gogoproto.nullable) = false ];
}
Expand Down
5 changes: 2 additions & 3 deletions tests/integration/expired_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *CCVTestSuite) TestConsumerPacketSendExpiredClient() {
// check that the packets were added to the list of pending data packets
consumerPackets := consumerKeeper.GetPendingPackets(s.consumerCtx())
s.Require().NotEmpty(consumerPackets)
s.Require().Equal(2, len(consumerPackets.GetList()), "unexpected number of pending data packets")
s.Require().Len(consumerPackets, 2, "unexpected number of pending data packets")

// try to send slash packet for downtime infraction
addr := ed25519.GenPrivKey().PubKey().Address()
Expand All @@ -137,7 +137,7 @@ func (s *CCVTestSuite) TestConsumerPacketSendExpiredClient() {
// check that the packets were added to the list of pending data packets
consumerPackets = consumerKeeper.GetPendingPackets(s.consumerCtx())
s.Require().NotEmpty(consumerPackets)
s.Require().Equal(4, len(consumerPackets.GetList()), "unexpected number of pending data packets")
s.Require().Len(consumerPackets, 4, "unexpected number of pending data packets")

// upgrade expired client to the consumer
upgradeExpiredClient(s, Provider)
Expand All @@ -148,7 +148,6 @@ func (s *CCVTestSuite) TestConsumerPacketSendExpiredClient() {
// check that the list of pending data packets is emptied
consumerPackets = consumerKeeper.GetPendingPackets(s.consumerCtx())
s.Require().Empty(consumerPackets)
s.Require().Equal(0, len(consumerPackets.GetList()), "unexpected number of pending data packets")

// relay all packet from consumer to provider
relayAllCommittedPackets(s, s.consumerChain, s.path, ccv.ConsumerPortID, s.path.EndpointA.ChannelID, 4)
Expand Down
24 changes: 12 additions & 12 deletions tests/integration/slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,15 +484,15 @@ func (suite *CCVTestSuite) TestValidatorDowntime() {

// check that slash packet is queued
pendingPackets := consumerKeeper.GetPendingPackets(ctx)
suite.Require().NotEmpty(pendingPackets.List, "pending packets empty")
suite.Require().Len(pendingPackets.List, 1, "pending packets len should be 1 is %d", len(pendingPackets.List))
suite.Require().NotEmpty(pendingPackets, "pending packets empty")
suite.Require().Len(pendingPackets, 1, "pending packets len should be 1 is %d", len(pendingPackets))

// clear queue, commit packets
suite.consumerApp.GetConsumerKeeper().SendPackets(ctx)

// check queue was cleared
pendingPackets = suite.consumerApp.GetConsumerKeeper().GetPendingPackets(ctx)
suite.Require().Empty(pendingPackets.List, "pending packets NOT empty")
suite.Require().Empty(pendingPackets, "pending packets NOT empty")

// verify that the slash packet was sent
gotCommit := consumerIBCKeeper.ChannelKeeper.GetPacketCommitment(ctx, ccv.ConsumerPortID, channelID, seq)
Expand Down Expand Up @@ -571,15 +571,15 @@ func (suite *CCVTestSuite) TestValidatorDoubleSigning() {

// check slash packet is queued
pendingPackets := suite.consumerApp.GetConsumerKeeper().GetPendingPackets(ctx)
suite.Require().NotEmpty(pendingPackets.List, "pending packets empty")
suite.Require().Len(pendingPackets.List, 1, "pending packets len should be 1 is %d", len(pendingPackets.List))
suite.Require().NotEmpty(pendingPackets, "pending packets empty")
suite.Require().Len(pendingPackets, 1, "pending packets len should be 1 is %d", len(pendingPackets))

// clear queue, commit packets
suite.consumerApp.GetConsumerKeeper().SendPackets(ctx)

// check queue was cleared
pendingPackets = suite.consumerApp.GetConsumerKeeper().GetPendingPackets(ctx)
suite.Require().Empty(pendingPackets.List, "pending packets NOT empty")
suite.Require().Empty(pendingPackets, "pending packets NOT empty")

// check slash packet is sent
gotCommit := suite.consumerApp.GetIBCKeeper().ChannelKeeper.GetPacketCommitment(ctx, ccv.ConsumerPortID, channelID, seq)
Expand Down Expand Up @@ -634,7 +634,7 @@ func (suite *CCVTestSuite) TestQueueAndSendSlashPacket() {
// the downtime slash request duplicates
dataPackets := consumerKeeper.GetPendingPackets(ctx)
suite.Require().NotEmpty(dataPackets)
suite.Require().Len(dataPackets.GetList(), 12)
suite.Require().Len(dataPackets, 12)

// save consumer next sequence
seq, _ := consumerIBCKeeper.ChannelKeeper.GetNextSequenceSend(ctx, ccv.ConsumerPortID, channelID)
Expand All @@ -661,7 +661,7 @@ func (suite *CCVTestSuite) TestQueueAndSendSlashPacket() {
// check that pending data packets got cleared
dataPackets = consumerKeeper.GetPendingPackets(ctx)
suite.Require().Empty(dataPackets)
suite.Require().Len(dataPackets.GetList(), 0)
suite.Require().Len(dataPackets, 0)
}

// TestCISBeforeCCVEstablished tests that the consumer chain doesn't panic or
Expand All @@ -672,14 +672,14 @@ func (suite *CCVTestSuite) TestCISBeforeCCVEstablished() {

// Check pending packets is empty
pendingPackets := consumerKeeper.GetPendingPackets(suite.consumerCtx())
suite.Require().Len(pendingPackets.List, 0)
suite.Require().Len(pendingPackets, 0)

consumerKeeper.SlashWithInfractionReason(suite.consumerCtx(), []byte{0x01, 0x02, 0x3},
66, 4324, sdk.MustNewDecFromStr("0.05"), stakingtypes.Infraction_INFRACTION_DOWNTIME)

// Check slash packet was queued
pendingPackets = consumerKeeper.GetPendingPackets(suite.consumerCtx())
suite.Require().Len(pendingPackets.List, 1)
suite.Require().Len(pendingPackets, 1)

// Pass 5 blocks, confirming the consumer doesn't panic
for i := 0; i < 5; i++ {
Expand All @@ -688,7 +688,7 @@ func (suite *CCVTestSuite) TestCISBeforeCCVEstablished() {

// Check packet is still queued
pendingPackets = consumerKeeper.GetPendingPackets(suite.consumerCtx())
suite.Require().Len(pendingPackets.List, 1)
suite.Require().Len(pendingPackets, 1)

// establish ccv channel
suite.SetupCCVChannel(suite.path)
Expand All @@ -697,5 +697,5 @@ func (suite *CCVTestSuite) TestCISBeforeCCVEstablished() {
// Pass one more block, and confirm the packet is sent now that ccv channel is established
suite.consumerChain.NextBlock()
pendingPackets = consumerKeeper.GetPendingPackets(suite.consumerCtx())
suite.Require().Len(pendingPackets.List, 0)
suite.Require().Len(pendingPackets, 0)
}
16 changes: 12 additions & 4 deletions x/ccv/consumer/keeper/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,12 @@ func (k Keeper) InitGenesis(ctx sdk.Context, state *consumertypes.GenesisState)
k.SetLastTransmissionBlockHeight(ctx, state.LastTransmissionBlockHeight)
}

// set pending consumer pending packets
// Set pending consumer packets, using the depreciated ConsumerPacketDataList type
// that exists for genesis.
// note that the list includes pending mature VSC packet only if the handshake is completed
k.AppendPendingPacket(ctx, state.PendingConsumerPackets.List...)
for _, packet := range state.PendingConsumerPackets.List {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Genesis methods still expect the depreciated type ConsumerPacketDataList to reduce the complexity of how we'd actually deploy this PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment on ccv.proto

k.AppendPendingPacket(ctx, packet.Type, packet.Data)
}

// set height to valset update id mapping
for _, h2v := range state.HeightToValsetUpdateId {
Expand Down Expand Up @@ -121,6 +124,11 @@ func (k Keeper) ExportGenesis(ctx sdk.Context) (genesis *consumertypes.GenesisSt
// export the current validator set
valset := k.MustGetCurrentValidatorsAsABCIUpdates(ctx)

// export pending packets using the depreciated ConsumerPacketDataList type
pendingPackets := k.GetPendingPackets(ctx)
pendingPacketsDepreciated := ccv.ConsumerPacketDataList{}
pendingPacketsDepreciated.List = append(pendingPacketsDepreciated.List, pendingPackets...)

// export all the states created after a provider channel got established
if channelID, ok := k.GetProviderChannel(ctx); ok {
clientID, found := k.GetProviderClientID(ctx)
Expand All @@ -135,7 +143,7 @@ func (k Keeper) ExportGenesis(ctx sdk.Context) (genesis *consumertypes.GenesisSt
k.GetAllPacketMaturityTimes(ctx),
valset,
k.GetAllHeightToValsetUpdateIDs(ctx),
k.GetPendingPackets(ctx),
pendingPacketsDepreciated,
k.GetAllOutstandingDowntimes(ctx),
k.GetLastTransmissionBlockHeight(ctx),
params,
Expand All @@ -155,7 +163,7 @@ func (k Keeper) ExportGenesis(ctx sdk.Context) (genesis *consumertypes.GenesisSt
nil,
valset,
k.GetAllHeightToValsetUpdateIDs(ctx),
k.GetPendingPackets(ctx),
pendingPacketsDepreciated,
nil,
consumertypes.LastTransmissionBlockHeight{},
params,
Expand Down
27 changes: 23 additions & 4 deletions x/ccv/consumer/keeper/genesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,12 @@ func TestInitGenesis(t *testing.T) {
func(ctx sdk.Context, ck consumerkeeper.Keeper, gs *consumertypes.GenesisState) {
assertConsumerPortIsBound(t, ctx, &ck)

require.Equal(t, pendingDataPackets, ck.GetPendingPackets(ctx))
obtainedPendingPackets := ck.GetPendingPackets(ctx)
for idx, expectedPacketData := range pendingDataPackets.List {
require.Equal(t, expectedPacketData.Type, obtainedPendingPackets[idx].Type)
require.Equal(t, expectedPacketData.Data, obtainedPendingPackets[idx].Data)
}

assertHeightValsetUpdateIDs(t, ctx, &ck, defaultHeightValsetUpdateIDs)
assertProviderClientID(t, ctx, &ck, provClientID)
require.Equal(t, validator.Address.Bytes(), ck.GetAllCCValidator(ctx)[0].Address)
Expand Down Expand Up @@ -186,7 +191,12 @@ func TestInitGenesis(t *testing.T) {
require.Equal(t, provChannelID, gotChannelID)

require.True(t, ck.PacketMaturityTimeExists(ctx, matPackets[0].VscId, matPackets[0].MaturityTime))
require.Equal(t, pendingDataPackets, ck.GetPendingPackets(ctx))

obtainedPendingPackets := ck.GetPendingPackets(ctx)
for idx, expectedPacketData := range pendingDataPackets.List {
require.Equal(t, expectedPacketData.Type, obtainedPendingPackets[idx].Type)
require.Equal(t, expectedPacketData.Data, obtainedPendingPackets[idx].Data)
}

require.Equal(t, gs.OutstandingDowntimeSlashing, ck.GetAllOutstandingDowntimes(ctx))

Expand Down Expand Up @@ -252,12 +262,16 @@ func TestExportGenesis(t *testing.T) {
Data: &ccv.ConsumerPacketData_SlashPacketData{
SlashPacketData: ccv.NewSlashPacketData(abciValidator, vscID, stakingtypes.Infraction_INFRACTION_DOWNTIME),
},
Idx: 0,
},
{
Type: ccv.VscMaturedPacket,
Data: &ccv.ConsumerPacketData_VscMaturedPacketData{
VscMaturedPacketData: ccv.NewVSCMaturedPacketData(vscID),
},
// This idx is a part of the expected genesis state.
// If the keeper is correctly storing consumer packet data, indexes should be populated.
Idx: 1,
},
},
}
Expand Down Expand Up @@ -291,7 +305,10 @@ func TestExportGenesis(t *testing.T) {
ck.SetCCValidator(ctx, cVal)
ck.SetParams(ctx, params)

ck.AppendPendingPacket(ctx, consPackets.List...)
for _, packet := range consPackets.List {
ck.AppendPendingPacket(ctx, packet.Type, packet.Data)
}

ck.SetHeightValsetUpdateID(ctx, defaultHeightValsetUpdateIDs[0].Height, defaultHeightValsetUpdateIDs[0].ValsetUpdateId)
},
consumertypes.NewRestartGenesisState(
Expand Down Expand Up @@ -321,7 +338,9 @@ func TestExportGenesis(t *testing.T) {
ck.SetHeightValsetUpdateID(ctx, updatedHeightValsetUpdateIDs[0].Height, updatedHeightValsetUpdateIDs[0].ValsetUpdateId)
ck.SetHeightValsetUpdateID(ctx, updatedHeightValsetUpdateIDs[1].Height, updatedHeightValsetUpdateIDs[1].ValsetUpdateId)

ck.AppendPendingPacket(ctx, consPackets.List...)
for _, packet := range consPackets.List {
ck.AppendPendingPacket(ctx, packet.Type, packet.Data)
}

// populate the required states for an established CCV channel
ck.SetPacketMaturityTime(ctx, matPackets[0].VscId, matPackets[0].MaturityTime)
Expand Down
90 changes: 61 additions & 29 deletions x/ccv/consumer/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,48 +592,80 @@ func (k Keeper) GetAllValidators(ctx sdk.Context) (validators []stakingtypes.Val
return validators
}

// SetPendingPackets sets the pending CCV packets
func (k Keeper) SetPendingPackets(ctx sdk.Context, packets ccv.ConsumerPacketDataList) {
// getAndIncrementPendingPacketsIdx returns the current pending packets index and increments it.
// This index is used for implementing a FIFO queue of pending packets in the KV store.
func (k Keeper) getAndIncrementPendingPacketsIdx(ctx sdk.Context) (toReturn uint64) {
store := ctx.KVStore(k.storeKey)
bz, err := packets.Marshal()
if err != nil {
// This should never happen
panic(fmt.Errorf("failed to marshal ConsumerPacketDataList: %w", err))
bz := store.Get(types.PendingPacketsIndexKey())
if bz == nil {
toReturn = 0
} else {
toReturn = sdk.BigEndianToUint64(bz)
}
shaspitz marked this conversation as resolved.
Show resolved Hide resolved
store.Set(types.PendingDataPacketsKey(), bz)
toStore := toReturn + 1
store.Set(types.PendingPacketsIndexKey(), sdk.Uint64ToBigEndian(toStore))
return toReturn
}

// GetPendingPackets returns the pending CCV packets from the store
func (k Keeper) GetPendingPackets(ctx sdk.Context) ccv.ConsumerPacketDataList {
var packets ccv.ConsumerPacketDataList

// GetPendingPackets returns ALL the pending CCV packets from the store
func (k Keeper) GetPendingPackets(ctx sdk.Context) []ccv.ConsumerPacketData {
var packets []ccv.ConsumerPacketData
store := ctx.KVStore(k.storeKey)
bz := store.Get(types.PendingDataPacketsKey())
if bz == nil {
return packets
// Note: PendingDataPacketsBytePrefix is the correct prefix, NOT PendingDataPacketsByteKey.
// See consistency with PendingDataPacketsKey().
iterator := sdk.KVStorePrefixIterator(store, []byte{types.PendingDataPacketsBytePrefix})
defer iterator.Close()
for ; iterator.Valid(); iterator.Next() {
var packet ccv.ConsumerPacketData
bz := iterator.Value()
err := packet.Unmarshal(bz)
if err != nil {
// An error here would indicate something is very wrong,
panic(fmt.Errorf("failed to unmarshal pending data packet: %w", err))
}
packets = append(packets, packet)
}
return packets
}

err := packets.Unmarshal(bz)
if err != nil {
// An error here would indicate something is very wrong,
// the PendingPackets are assumed to be correctly serialized in SetPendingPackets.
panic(fmt.Errorf("failed to unmarshal pending data packets: %w", err))
// DeletePendingDataPackets deletes pending data packets with given indexes
func (k Keeper) DeletePendingDataPackets(ctx sdk.Context, idxs ...uint64) {
store := ctx.KVStore(k.storeKey)
for _, idx := range idxs {
store.Delete(types.PendingDataPacketsKey(idx))
}

return packets
}

// DeletePendingDataPackets clears the pending data packets in store
func (k Keeper) DeletePendingDataPackets(ctx sdk.Context) {
func (k Keeper) DeleteAllPendingDataPackets(ctx sdk.Context) {
shaspitz marked this conversation as resolved.
Show resolved Hide resolved
store := ctx.KVStore(k.storeKey)
store.Delete(types.PendingDataPacketsKey())
// Note: PendingDataPacketsBytePrefix is the correct prefix, NOT PendingDataPacketsByteKey.
// See consistency with PendingDataPacketsKey().
iterator := sdk.KVStorePrefixIterator(store, []byte{types.PendingDataPacketsBytePrefix})
keysToDel := [][]byte{}
defer iterator.Close()
for ; iterator.Valid(); iterator.Next() {
keysToDel = append(keysToDel, iterator.Key())
}
for _, key := range keysToDel {
store.Delete(key)
}
}

// AppendPendingDataPacket appends the given data packet to the pending data packets in store
func (k Keeper) AppendPendingPacket(ctx sdk.Context, packet ...ccv.ConsumerPacketData) {
pending := k.GetPendingPackets(ctx)
list := append(pending.GetList(), packet...)
k.SetPendingPackets(ctx, ccv.ConsumerPacketDataList{List: list})
// AppendPendingPacket enqueues the given data packet to the end of the pending data packets queue
func (k Keeper) AppendPendingPacket(ctx sdk.Context, packetType ccv.ConsumerPacketDataType, data ccv.ExportedIsConsumerPacketData_Data) {
cpd := ccv.NewConsumerPacketData(
packetType,
data,
k.getAndIncrementPendingPacketsIdx(ctx),
)
key := types.PendingDataPacketsKey(cpd.Idx)
store := ctx.KVStore(k.storeKey)
bz, err := cpd.Marshal()
if err != nil {
// This should never happen
panic(fmt.Errorf("failed to marshal ConsumerPacketData: %w", err))
}
store.Set(key, bz)
}

func (k Keeper) MarkAsPrevStandaloneChain(ctx sdk.Context) {
Expand Down
Loading
Loading