Skip to content

Commit

Permalink
Clear event log db if previous version
Browse files Browse the repository at this point in the history
  • Loading branch information
martonp committed Jul 17, 2024
1 parent e124cb9 commit 81fe4d4
Showing 1 changed file with 41 additions and 0 deletions.
41 changes: 41 additions & 0 deletions client/mm/event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package mm

import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
Expand Down Expand Up @@ -139,6 +140,7 @@ var _ eventLogDB = (*boltEventLogDB)(nil)
* Schema:
*
* - botRuns
* - version
* - runBucket (<startTime><baseID><quoteID><host>)
* - startTime
* - endTime
Expand All @@ -153,6 +155,7 @@ var _ eventLogDB = (*boltEventLogDB)(nil)

var (
botRunsBucket = []byte("botRuns")
versionKey = []byte("version")
eventsBucket = []byte("events")
cfgsBucket = []byte("cfgs")

Expand All @@ -163,6 +166,8 @@ var (
noPendingKey = []byte("np")
)

const balanceStateDBVersion uint32 = 1

func newBoltEventLogDB(ctx context.Context, path string, log dex.Logger) (*boltEventLogDB, error) {
db, err := bbolt.Open(path, 0600, nil)
if err != nil {
Expand All @@ -184,6 +189,11 @@ func newBoltEventLogDB(ctx context.Context, path string, log dex.Logger) (*boltE
eventUpdates: eventUpdates,
}

err = eventLogDB.upgradeDB()
if err != nil {
return nil, err
}

go func() {
eventLogDB.listenForStoreEvents(ctx)
db.Close()
Expand Down Expand Up @@ -244,6 +254,33 @@ func calcFinalStateBasedOnEventDiff(runBucket, eventsBucket *bbolt.Bucket, event
return finalState, nil
}

func (db *boltEventLogDB) upgradeDB() error {
return db.Update(func(tx *bbolt.Tx) error {
botRuns := tx.Bucket(botRunsBucket)
versionB := botRuns.Get(versionKey)

var version uint32
if versionB != nil {
version = encode.BytesToUint32(versionB)
}

if version < balanceStateDBVersion {
err := botRuns.ForEachBucket(func(k []byte) error {
err := botRuns.DeleteBucket(k)
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
}

return botRuns.Put(versionKey, encode.Uint32Bytes(balanceStateDBVersion))
})
}

// updateEvent is called for each event that is popped off the updateEvent. If
// the event already exists, it is updated. If it does not exist, it is added.
// The stats for the run are also updated based on the event.
Expand Down Expand Up @@ -448,6 +485,7 @@ func (db *boltEventLogDB) runs(n uint64, refStartTime *uint64, refMkt *MarketWit
var runs []*MarketMakingRun
err := db.View(func(tx *bbolt.Tx) error {
botRuns := tx.Bucket(botRunsBucket)

runs = make([]*MarketMakingRun, 0, botRuns.Stats().BucketN)
cursor := botRuns.Cursor()

Expand All @@ -459,6 +497,9 @@ func (db *boltEventLogDB) runs(n uint64, refStartTime *uint64, refMkt *MarketWit
}

for ; k != nil; k, _ = cursor.Prev() {
if bytes.Equal(k, versionKey) {
continue
}
startTime, mkt, err := parseRunKey(k)
if err != nil {
return err
Expand Down

0 comments on commit 81fe4d4

Please sign in to comment.