Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: syncronize pruning manager #187

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pruning/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ var (
PruneSnapshotHeightsKey = pruneSnapshotHeightsKey

// functions
Int64SliceToBytes = int64SliceToBytes
ListToBytes = listToBytes
Int64SliceToBytes = int64SliceToBytes
ListToBytes = listToBytes
LoadPruningHeights = loadPruningHeights
LoadPruningSnapshotHeights = loadPruningSnapshotHeights
)
171 changes: 104 additions & 67 deletions pruning/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
)

type Manager struct {
logger log.Logger
opts types.PruningOptions
snapshotInterval uint64
pruneHeights []int64
pruneSnapshotHeights *list.List
mx sync.Mutex
db dbm.DB
logger log.Logger
opts types.PruningOptions
snapshotInterval uint64
pruneHeights []int64
pruneHeightsMx sync.Mutex
pruneSnapshotHeights *list.List
pruneSnapshotHeightsMx sync.Mutex
}

const errNegativeHeightsFmt = "failed to get pruned heights: %d"
Expand All @@ -28,15 +30,22 @@ var (
pruneSnapshotHeightsKey = []byte("s/pruneSnheights")
)

func NewManager(logger log.Logger) *Manager {
func NewManager(db dbm.DB, logger log.Logger) *Manager {
return &Manager{
logger: logger,
opts: types.NewPruningOptions(types.PruningNothing),
pruneHeights: []int64{},
db: db,
logger: logger,
opts: types.NewPruningOptions(types.PruningNothing),
pruneHeights: []int64{},
// Although pruneHeights happen in the same goroutine with the normal execution,
// we sync access to them to avoid soundness issues in the future if concurrency pattern changes.
pruneHeightsMx: sync.Mutex{},
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved
// These are the heights that are multiples of snapshotInterval and kept for state sync snapshots.
// The heights are added to this list to be pruned when a snapshot is complete.
pruneSnapshotHeights: list.New(),
mx: sync.Mutex{},
pruneSnapshotHeights: list.New(),
// Snapshots are taken in a separate goroutine fromt the regular execution
// and can be delivered asynchrounously via HandleHeightSnapshot.
// Therefore, we sync access to pruneSnapshotHeights with this mutex.
pruneSnapshotHeightsMx: sync.Mutex{},
}
}

Expand All @@ -50,15 +59,25 @@ func (m *Manager) GetOptions() types.PruningOptions {
return m.opts
}

// GetPruningHeights returns all heights to be pruned during the next call to Prune().
func (m *Manager) GetPruningHeights() []int64 {
return m.pruneHeights
}
// GetFlushAndResetPruningHeights returns all heights to be pruned during the next call to Prune().
// It also flushes and resets the pruning heights.
func (m *Manager) GetFlushAndResetPruningHeights() ([]int64, error) {
if m.opts.GetPruningStrategy() == types.PruningNothing {
return []int64{}, nil
}
m.pruneHeightsMx.Lock()
defer m.pruneHeightsMx.Unlock()

pruningHeights := m.pruneHeights

// flush the updates to disk so that it is not lost if crash happens.
if err := m.db.SetSync(pruneHeightsKey, int64SliceToBytes(pruningHeights)); err != nil {
return nil, err
}

// ResetPruningHeights resets the heights to be pruned.
func (m *Manager) ResetPruningHeights() {
// reuse previously allocated memory.
m.pruneHeights = m.pruneHeights[:0]
m.pruneHeights = make([]int64, 0, m.opts.Interval)

return pruningHeights, nil
}

// HandleHeight determines if pruneHeight height needs to be kept for pruning at the right interval prescribed by
Expand All @@ -71,9 +90,14 @@ func (m *Manager) HandleHeight(previousHeight int64) int64 {
}

defer func() {
// handle persisted snapshot heights
m.mx.Lock()
defer m.mx.Unlock()
m.pruneHeightsMx.Lock()
defer m.pruneHeightsMx.Unlock()

m.pruneSnapshotHeightsMx.Lock()
defer m.pruneSnapshotHeightsMx.Unlock()

// move persisted snapshot heights to pruneHeights which
// represent the heights to be pruned at the next pruning interval.
var next *list.Element
for e := m.pruneSnapshotHeights.Front(); e != nil; e = next {
snHeight := e.Value.(int64)
Expand All @@ -87,6 +111,11 @@ func (m *Manager) HandleHeight(previousHeight int64) int64 {
next = e.Next()
}
}

// flush the updates to disk so that they are not lost if crash happens.
if err := m.db.SetSync(pruneHeightsKey, int64SliceToBytes(m.pruneHeights)); err != nil {
panic(err)
}
}()

if int64(m.opts.KeepRecent) < previousHeight {
Expand All @@ -97,21 +126,38 @@ func (m *Manager) HandleHeight(previousHeight int64) int64 {
// - snapshotInterval % (height - KeepRecent) != 0 as that means the height is not
// a 'snapshot' height.
if m.snapshotInterval == 0 || pruneHeight%int64(m.snapshotInterval) != 0 {
m.pruneHeightsMx.Lock()
defer m.pruneHeightsMx.Unlock()

m.pruneHeights = append(m.pruneHeights, pruneHeight)

// flush the updates to disk so that they are not lost if crash happens.
if err := m.db.SetSync(pruneHeightsKey, int64SliceToBytes(m.pruneHeights)); err != nil {
panic(err)
}
return pruneHeight
}
}
return 0
}

// HandleHeightSnapshot persists the snapshot height to be pruned at the next appropriate
// height defined by the pruning strategy. Flushes the update to disk and panics if the flush fails
// The input height must be greater than 0 and pruning strategy any but pruning nothing.
// If one of these conditions is not met, this function does nothing.
func (m *Manager) HandleHeightSnapshot(height int64) {
if m.opts.GetPruningStrategy() == types.PruningNothing {
if m.opts.GetPruningStrategy() == types.PruningNothing || height <= 0 {
return
}
m.mx.Lock()
defer m.mx.Unlock()
m.pruneSnapshotHeightsMx.Lock()
defer m.pruneSnapshotHeightsMx.Unlock()
m.logger.Debug("HandleHeightSnapshot", "height", height)
m.pruneSnapshotHeights.PushBack(height)

// flush the updates to disk so that they are not lost if crash happens.
if err := m.db.SetSync(pruneSnapshotHeightsKey, listToBytes(m.pruneSnapshotHeights)); err != nil {
panic(err)
}
}

// SetSnapshotInterval sets the interval at which the snapshots are taken.
Expand All @@ -124,95 +170,86 @@ func (m *Manager) ShouldPruneAtHeight(height int64) bool {
return m.opts.Interval > 0 && m.opts.GetPruningStrategy() != types.PruningNothing && height%int64(m.opts.Interval) == 0
}

// FlushPruningHeights flushes the pruning heights to the database for crash recovery.
func (m *Manager) FlushPruningHeights(batch dbm.Batch) {
if m.opts.GetPruningStrategy() == types.PruningNothing {
return
}
m.flushPruningHeights(batch)
m.flushPruningSnapshotHeights(batch)
}

// LoadPruningHeights loads the pruning heights from the database as a crash recovery.
func (m *Manager) LoadPruningHeights(db dbm.DB) error {
if m.opts.GetPruningStrategy() == types.PruningNothing {
return nil
}
if err := m.loadPruningHeights(db); err != nil {
loadedPruneHeights, err := loadPruningHeights(db)
if err != nil {
return err
}

if len(loadedPruneHeights) > 0 {
m.pruneHeightsMx.Lock()
defer m.pruneHeightsMx.Unlock()
m.pruneHeights = loadedPruneHeights
}

loadedPruneSnapshotHeights, err := loadPruningSnapshotHeights(db)
if err != nil {
return err
}
return m.loadPruningSnapshotHeights(db)

if loadedPruneSnapshotHeights.Len() > 0 {
m.pruneSnapshotHeightsMx.Lock()
defer m.pruneSnapshotHeightsMx.Unlock()
m.pruneSnapshotHeights = loadedPruneSnapshotHeights
}

return nil
}

func (m *Manager) loadPruningHeights(db dbm.DB) error {
func loadPruningHeights(db dbm.DB) ([]int64, error) {
bz, err := db.Get(pruneHeightsKey)
if err != nil {
return fmt.Errorf("failed to get pruned heights: %w", err)
return []int64{}, fmt.Errorf("failed to get pruned heights: %w", err)
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved
}
if len(bz) == 0 {
return nil
return []int64{}, nil
}

prunedHeights := make([]int64, len(bz)/8)
i, offset := 0, 0
for offset < len(bz) {
h := int64(binary.BigEndian.Uint64(bz[offset : offset+8]))
if h < 0 {
return fmt.Errorf(errNegativeHeightsFmt, h)
return []int64{}, fmt.Errorf(errNegativeHeightsFmt, h)
}

prunedHeights[i] = h
i++
offset += 8
}

if len(prunedHeights) > 0 {
m.pruneHeights = prunedHeights
}

return nil
return prunedHeights, nil
}

func (m *Manager) loadPruningSnapshotHeights(db dbm.DB) error {
func loadPruningSnapshotHeights(db dbm.DB) (*list.List, error) {
bz, err := db.Get(pruneSnapshotHeightsKey)
pruneSnapshotHeights := list.New()
if err != nil {
return fmt.Errorf("failed to get post-snapshot pruned heights: %w", err)
return pruneSnapshotHeights, fmt.Errorf("failed to get post-snapshot pruned heights: %w", err)
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved
}
if len(bz) == 0 {
return nil
return pruneSnapshotHeights, nil
}

pruneSnapshotHeights := list.New()
i, offset := 0, 0
for offset < len(bz) {
h := int64(binary.BigEndian.Uint64(bz[offset : offset+8]))
if h < 0 {
return fmt.Errorf(errNegativeHeightsFmt, h)
return pruneSnapshotHeights, fmt.Errorf(errNegativeHeightsFmt, h)
}

pruneSnapshotHeights.PushBack(h)
i++
offset += 8
}

m.mx.Lock()
defer m.mx.Unlock()
m.pruneSnapshotHeights = pruneSnapshotHeights

return nil
}

func (m *Manager) flushPruningHeights(batch dbm.Batch) {
batch.Set(pruneHeightsKey, int64SliceToBytes(m.pruneHeights))
}

func (m *Manager) flushPruningSnapshotHeights(batch dbm.Batch) {
m.mx.Lock()
defer m.mx.Unlock()
batch.Set(pruneSnapshotHeightsKey, listToBytes(m.pruneSnapshotHeights))
return pruneSnapshotHeights, nil
}

// TODO: convert to a generic version with Go 1.18.
func int64SliceToBytes(slice []int64) []byte {
bz := make([]byte, 0, len(slice)*8)
for _, ph := range slice {
Expand Down
Loading