Skip to content

Commit

Permalink
Fix flush during shutdown (#3306)
Browse files Browse the repository at this point in the history
  • Loading branch information
madaraszg-tulip authored Feb 20, 2024
1 parent 408f099 commit 55e0470
Showing 1 changed file with 20 additions and 11 deletions.
31 changes: 20 additions & 11 deletions modules/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,19 @@ func (i *Ingester) handleFlush(ctx context.Context, userID string, blockID uuid.
return false, nil
}

func (i *Ingester) enqueueExec(op *flushOp) {
// Check if shutdown initiated
if i.flushQueues.IsStopped() {
handleAbandonedOp(op)
return
}

err := i.flushQueues.Enqueue(op)
if err != nil {
handleFailedOp(op, err)
}
}

func (i *Ingester) enqueue(op *flushOp, jitter bool) {
delay := time.Duration(0)

Expand All @@ -349,19 +362,15 @@ func (i *Ingester) enqueue(op *flushOp, jitter bool) {

op.at = time.Now().Add(delay)

if !jitter {
// Execute synchronously to make sure we can flush during shutdown
i.enqueueExec(op)
return
}

go func() {
time.Sleep(delay)

// Check if shutdown initiated
if i.flushQueues.IsStopped() {
handleAbandonedOp(op)
return
}

err := i.flushQueues.Enqueue(op)
if err != nil {
handleFailedOp(op, err)
}
i.enqueueExec(op)
}()
}

Expand Down

0 comments on commit 55e0470

Please sign in to comment.