Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: twap key refactor #7472

Merged
merged 17 commits into from
Feb 17, 2024
Merged
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ go-mock-update:
mockgen -source=x/poolmanager/types/pool.go -destination=tests/mocks/pool.go -package=mocks
mockgen -source=x/gamm/types/pool.go -destination=tests/mocks/cfmm_pool.go -package=mocks
mockgen -source=x/concentrated-liquidity/types/cl_pool_extensionI.go -destination=tests/mocks/cl_pool.go -package=mocks
mockgen -source=ingest/sqs/domain/pools.go -destination=tests/mocks/sqs_pool.go -package=mocks -mock_names=PoolI=MockSQSPoolI
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drive by change, this folder no longer lives in Osmosis repo


###############################################################################
### Release ###
Expand Down
16 changes: 0 additions & 16 deletions app/upgrades/v17/upgrades_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,6 @@ func (s *UpgradeTestSuite) TestUpgrade() {
clPool2TwapRecordHistoricalPoolIndexPreUpgrade, err := keepers.TwapKeeper.GetAllHistoricalPoolIndexedTWAPsForPoolId(s.Ctx, lastPoolIdMinusOne)
s.Require().NoError(err)

clPoolsTwapRecordHistoricalTimeIndexPreUpgrade, err := keepers.TwapKeeper.GetAllHistoricalTimeIndexedTWAPs(s.Ctx)
s.Require().NoError(err)

// Run upgrade handler.
dummyUpgrade(s)
s.Require().NotPanics(func() {
Expand All @@ -239,15 +236,11 @@ func (s *UpgradeTestSuite) TestUpgrade() {
clPool2TwapRecordHistoricalPoolIndexPostUpgrade, err := keepers.TwapKeeper.GetAllHistoricalPoolIndexedTWAPsForPoolId(s.Ctx, lastPoolIdMinusOne)
s.Require().NoError(err)

clPoolsTwapRecordHistoricalTimeIndexPostUpgrade, err := keepers.TwapKeeper.GetAllHistoricalTimeIndexedTWAPs(s.Ctx)
s.Require().NoError(err)

// check that all TWAP records aren't empty
s.Require().NotEmpty(clPool1TwapRecordPostUpgrade)
s.Require().NotEmpty(clPool1TwapRecordHistoricalPoolIndexPostUpgrade)
s.Require().NotEmpty(clPool2TwapRecordPostUpgrade)
s.Require().NotEmpty(clPool2TwapRecordHistoricalPoolIndexPostUpgrade)
s.Require().NotEmpty(clPoolsTwapRecordHistoricalTimeIndexPostUpgrade)

for _, data := range []struct {
pre, post []types.TwapRecord
Expand All @@ -262,15 +255,6 @@ func (s *UpgradeTestSuite) TestUpgrade() {
}
}

for i := range clPoolsTwapRecordHistoricalTimeIndexPostUpgrade {
record := clPoolsTwapRecordHistoricalTimeIndexPostUpgrade[i]
if record.PoolId == lastPoolIdMinusOne || record.PoolId == lastPoolIdMinusTwo {
assertTwapFlipped(s, clPoolsTwapRecordHistoricalTimeIndexPreUpgrade[i], record)
} else if record.PoolId == lastPoolID {
assertEqual(s, clPoolsTwapRecordHistoricalTimeIndexPreUpgrade[i], record)
}
}

// Retrieve the community pool balance (and the feePool balance) after the upgrade
communityPoolBalancePost := s.App.BankKeeper.GetAllBalances(s.Ctx, communityPoolAddress)
feePoolCommunityPoolPost := s.App.DistrKeeper.GetFeePool(s.Ctx).CommunityPool
Expand Down
13 changes: 7 additions & 6 deletions proto/osmosis/twap/v1beta1/twap_record.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ syntax = "proto3";
package osmosis.twap.v1beta1;

import "gogoproto/gogo.proto";
import "google/protobuf/any.proto";
import "cosmos_proto/cosmos.proto";
import "cosmos/base/v1beta1/coin.proto";
import "google/protobuf/timestamp.proto";

option go_package = "github.com/osmosis-labs/osmosis/v23/x/twap/types";
Expand Down Expand Up @@ -90,7 +87,11 @@ message PruningState {
(gogoproto.stdtime) = true,
(gogoproto.moretags) = "yaml:\"last_kept_time\""
];
// last_key_seen is the last key of the TWAP records that were pruned
// before reaching the block's prune limit
bytes last_key_seen = 3;
// Deprecated: This field is deprecated.
bytes last_key_seen = 3 [ deprecated = true ];
// last_seen_pool_id is the pool_id that we will begin pruning in the next
// block. This value starts at the highest pool_id at time of epoch, and
// decreases until it reaches 1. When it reaches 1, the pruning
// process is complete.
uint64 last_seen_pool_id = 4;
}
4 changes: 1 addition & 3 deletions x/twap/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ func (k Keeper) InitGenesis(ctx sdk.Context, genState *types.GenesisState) {

// ExportGenesis returns the twap module's exported genesis.
func (k Keeper) ExportGenesis(ctx sdk.Context) *types.GenesisState {
// These are ordered in increasing order, guaranteed by the iterator
// that is prefixed by time.
twapRecords, err := k.GetAllHistoricalTimeIndexedTWAPs(ctx)
twapRecords, err := k.getAllHistoricalPoolIndexedTWAPs(ctx)
if err != nil {
panic(err)
}
Expand Down
6 changes: 0 additions & 6 deletions x/twap/keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,6 @@ func (s *TestSuite) getAllHistoricalRecordsForPool(poolId uint64) []types.TwapRe
func (s *TestSuite) validateExpectedRecords(expectedRecords []types.TwapRecord) {
twapKeeper := s.twapkeeper

// validate that the time indexed TWAPs are cleared.
timeIndexedTwaps, err := twapKeeper.GetAllHistoricalTimeIndexedTWAPs(s.Ctx)
s.Require().NoError(err)
s.Require().Len(timeIndexedTwaps, len(expectedRecords))
s.Require().Equal(timeIndexedTwaps, expectedRecords)

// validate that the pool indexed TWAPs are cleared.
poolIndexedTwaps, err := twapKeeper.GetAllHistoricalPoolIndexedTWAPs(s.Ctx)
s.Require().NoError(err)
Expand Down
13 changes: 8 additions & 5 deletions x/twap/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ func (k Keeper) EpochHooks() epochtypes.EpochHooks {
func (hook *epochhook) AfterEpochEnd(ctx sdk.Context, epochIdentifier string, epochNumber int64) error {
if epochIdentifier == hook.k.PruneEpochIdentifier(ctx) {
lastKeptTime := ctx.BlockTime().Add(-hook.k.RecordHistoryKeepPeriod(ctx))
hook.k.SetPruningState(ctx, types.PruningState{
IsPruning: true,
LastKeptTime: lastKeptTime,
LastKeySeen: types.FormatHistoricalTimeIndexTWAPKey(lastKeptTime, 0, "", ""),
})
poolIdToStartFrom := hook.k.poolmanagerKeeper.GetNextPoolId(ctx) - 1
if poolIdToStartFrom > 0 {
hook.k.SetPruningState(ctx, types.PruningState{
IsPruning: true,
LastKeptTime: lastKeptTime,
LastSeenPoolId: poolIdToStartFrom,
})
}
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion x/twap/listeners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (s *TestSuite) TestAfterEpochEnd() {

s.twapkeeper.StoreNewRecord(s.Ctx, newestRecord)

twapsBeforeEpoch, err := s.twapkeeper.GetAllHistoricalTimeIndexedTWAPs(s.Ctx)
twapsBeforeEpoch, err := s.twapkeeper.GetAllHistoricalPoolIndexedTWAPs(s.Ctx)
s.Require().NoError(err)
s.Require().Equal(2, len(twapsBeforeEpoch))

Expand Down
97 changes: 40 additions & 57 deletions x/twap/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,8 @@ func (k Keeper) getChangedPools(ctx sdk.Context) []uint64 {
// storeHistoricalTWAP writes a twap to the store, in all needed indexing.
func (k Keeper) StoreHistoricalTWAP(ctx sdk.Context, twap types.TwapRecord) {
store := ctx.KVStore(k.storeKey)
key1 := types.FormatHistoricalTimeIndexTWAPKey(twap.Time, twap.PoolId, twap.Asset0Denom, twap.Asset1Denom)
key2 := types.FormatHistoricalPoolIndexTWAPKey(twap.PoolId, twap.Asset0Denom, twap.Asset1Denom, twap.Time)
osmoutils.MustSet(store, key1, &twap)
osmoutils.MustSet(store, key2, &twap)
key := types.FormatHistoricalPoolIndexTWAPKey(twap.PoolId, twap.Asset0Denom, twap.Asset1Denom, twap.Time)
osmoutils.MustSet(store, key, &twap)
}

// pruneRecordsBeforeTimeButNewest prunes all records for each pool before the given time but the newest
Expand All @@ -93,77 +91,68 @@ func (k Keeper) StoreHistoricalTWAP(ctx sdk.Context, twap types.TwapRecord) {
func (k Keeper) pruneRecordsBeforeTimeButNewest(ctx sdk.Context, state types.PruningState) error {
czarcas7ic marked this conversation as resolved.
Show resolved Hide resolved
store := ctx.KVStore(k.storeKey)

// Reverse iterator guarantees that we iterate through the newest per pool first.
// Due to how it is indexed, we will only iterate times starting from
// lastKeptTime exclusively down to the oldest record.
iter := store.ReverseIterator(
[]byte(types.HistoricalTWAPTimeIndexPrefix),
state.LastKeySeen)
defer iter.Close()

// We mark what (pool id, asset 0, asset 1) triplets we've seen.
// We prune all records for a triplet that we haven't already seen.
type uniqueTriplet struct {
poolId uint64
asset0 string
asset1 string
}
seenPoolAssetTriplets := map[uniqueTriplet]struct{}{}

var numPruned uint16
var lastPoolIdCompleted uint64

for ; iter.Valid(); iter.Next() {
timeIndexKey := iter.Key()
timeS, poolId, asset0, asset1, err := types.ParseFieldsFromHistoricalTimeKey(timeIndexKey)
for poolId := state.LastSeenPoolId; poolId > 0; poolId-- {
denoms, err := k.poolmanagerKeeper.RouteGetPoolDenoms(ctx, poolId)
if err != nil {
return err
}

poolKey := uniqueTriplet{
poolId,
asset0,
asset1,
// Notice, even if ranging over denomPairs takes us over the prune per block limit,
// we still continue to iterate over all denom pairs of the pool.
// This simplifies logic so that we can consider a pool "done" once we start it.
ValarDragon marked this conversation as resolved.
Show resolved Hide resolved
// It also prevents choosing between keeping more records for a pool than we need to,
// and having to store more state in the pruning state.
denomPairs := types.GetAllUniqueDenomPairs(denoms)
for _, denomPair := range denomPairs {
// Reverse iterator guarantees that we iterate through the newest per pool first.
// Due to how it is indexed, we will only iterate times starting from
// lastKeptTime exclusively down to the oldest record.
iter := store.ReverseIterator(
types.FormatHistoricalPoolIndexDenomPairTWAPKey(poolId, denomPair.Denom0, denomPair.Denom1),
types.FormatHistoricalPoolIndexTWAPKey(poolId, denomPair.Denom0, denomPair.Denom1, state.LastKeptTime))
defer iter.Close()

firstIteration := true
for ; iter.Valid(); iter.Next() {
if !firstIteration {
// We have stored the newest record, so we can prune the rest.
timeIndexKey := iter.Key()
store.Delete(timeIndexKey)
numPruned += 1
} else {
// If this is the first iteration after we have gotten through the records after lastKeptTime, we
// still keep the record in order to allow interpolation (see function description for more details).
firstIteration = false
}
}
}
_, hasSeenPoolRecord := seenPoolAssetTriplets[poolKey]
if !hasSeenPoolRecord {
seenPoolAssetTriplets[poolKey] = struct{}{}
continue
}

// Now we need to delete the historical record, formatted by both historical time and pool index.
// We already are iterating over the historical time index, so we delete that key. Then we
// reformat the key to delete the historical pool index key.
store.Delete(timeIndexKey)
poolIndexKey := types.FormatHistoricalPoolIndexTWAPKeyFromStrTime(poolId, asset0, asset1, timeS)
store.Delete(poolIndexKey)

// Increment the number of records pruned by 2, since we delete two records per iteration.
numPruned += 2
lastPoolIdCompleted = poolId

if numPruned >= NumRecordsToPrunePerBlock {
// We have hit the limit, so we stop pruning.
break
}
}

if !iter.Valid() {
// The iterator is exhausted, so we have pruned all records.
if lastPoolIdCompleted == 1 {
// We have pruned all records.
state.IsPruning = false
k.SetPruningState(ctx, state)
} else {
// We have not pruned all records, so we update the last key seen.
state.LastKeySeen = iter.Key()
// We have not pruned all records, so we update the last seen pool id as the pool ID after the last completed pool.
state.LastSeenPoolId = lastPoolIdCompleted - 1
k.SetPruningState(ctx, state)
}
return nil
}

func (k Keeper) DeleteHistoricalRecord(ctx sdk.Context, twap types.TwapRecord) {
store := ctx.KVStore(k.storeKey)
key1 := types.FormatHistoricalTimeIndexTWAPKey(twap.Time, twap.PoolId, twap.Asset0Denom, twap.Asset1Denom)
key2 := types.FormatHistoricalPoolIndexTWAPKey(twap.PoolId, twap.Asset0Denom, twap.Asset1Denom, twap.Time)
store.Delete(key1)
store.Delete(key2)
key := types.FormatHistoricalPoolIndexTWAPKey(twap.PoolId, twap.Asset0Denom, twap.Asset1Denom, twap.Time)
store.Delete(key)
}

// getMostRecentRecordStoreRepresentation returns the most recent twap record in the store
Expand Down Expand Up @@ -211,13 +200,7 @@ func (k Keeper) GetAllMostRecentRecordsForPoolWithDenoms(ctx sdk.Context, poolId
return []types.TwapRecord{record}, err
}

// getAllHistoricalTimeIndexedTWAPs returns all historical TWAPs indexed by time.
func (k Keeper) GetAllHistoricalTimeIndexedTWAPs(ctx sdk.Context) ([]types.TwapRecord, error) {
return osmoutils.GatherValuesFromStorePrefix(ctx.KVStore(k.storeKey), []byte(types.HistoricalTWAPTimeIndexPrefix), types.ParseTwapFromBz)
}

// getAllHistoricalPoolIndexedTWAPs returns all historical TWAPs indexed by pool id.
// nolint: unused
func (k Keeper) getAllHistoricalPoolIndexedTWAPs(ctx sdk.Context) ([]types.TwapRecord, error) {
return osmoutils.GatherValuesFromStorePrefix(ctx.KVStore(k.storeKey), []byte(types.HistoricalTWAPPoolIndexPrefix), types.ParseTwapFromBz)
}
Expand Down
Loading