Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly flush unique queues on startup #23154

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion modules/queue/queue_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ func (q *ChannelQueue) Shutdown() {
log.Trace("ChannelQueue: %s Flushing", q.name)
// We can't use Cleanup here because that will close the channel
if err := q.FlushWithContext(q.terminateCtx); err != nil {
log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
count := atomic.LoadInt64(&q.numInQueue)
if count > 0 {
log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
}
return
}
log.Debug("ChannelQueue: %s Flushed", q.name)
Expand Down
29 changes: 22 additions & 7 deletions modules/queue/queue_disk_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
},
Workers: 0,
},
DataDir: config.DataDir,
DataDir: config.DataDir,
QueueName: config.Name + "-level",
}

levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar)
Expand Down Expand Up @@ -172,16 +173,18 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
atShutdown(q.Shutdown)
atTerminate(q.Terminate)

if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 {
if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.terminateCtx) != 0 {
// Just run the level queue - we shut it down once it's flushed
go q.internal.Run(func(_ func()) {}, func(_ func()) {})
go func() {
for !q.IsEmpty() {
_ = q.internal.Flush(0)
for !lq.IsEmpty() {
_ = lq.Flush(0)
select {
case <-time.After(100 * time.Millisecond):
case <-q.internal.(*LevelQueue).shutdownCtx.Done():
log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
case <-lq.shutdownCtx.Done():
if lq.byteFIFO.Len(lq.terminateCtx) > 0 {
log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
}
return
}
}
Expand Down Expand Up @@ -316,10 +319,22 @@ func (q *PersistableChannelQueue) Shutdown() {
// Redirect all remaining data in the chan to the internal channel
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
close(q.channelQueue.dataChan)
countOK, countLost := 0, 0
for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data)
err := q.internal.Push(data)
if err != nil {
log.Error("PersistableChannelQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err)
countLost++
} else {
countOK++
}
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
}
if countLost > 0 {
log.Warn("PersistableChannelQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost)
} else if countOK > 0 {
log.Warn("PersistableChannelQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK)
}
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)

log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
Expand Down
8 changes: 4 additions & 4 deletions modules/queue/queue_disk_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPersistableChannelQueue(t *testing.T) {
Workers: 1,
BoostWorkers: 0,
MaxWorkers: 10,
Name: "first",
Name: "test-queue",
}, &testData{})
assert.NoError(t, err)

Expand Down Expand Up @@ -135,7 +135,7 @@ func TestPersistableChannelQueue(t *testing.T) {
Workers: 1,
BoostWorkers: 0,
MaxWorkers: 10,
Name: "second",
Name: "test-queue",
}, &testData{})
assert.NoError(t, err)

Expand Down Expand Up @@ -227,7 +227,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
Workers: 1,
BoostWorkers: 0,
MaxWorkers: 10,
Name: "first",
Name: "test-queue",
}, &testData{})
assert.NoError(t, err)

Expand Down Expand Up @@ -433,7 +433,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
Workers: 1,
BoostWorkers: 0,
MaxWorkers: 10,
Name: "second",
Name: "test-queue",
}, &testData{})
assert.NoError(t, err)
pausable, ok = queue.(Pausable)
Expand Down
4 changes: 3 additions & 1 deletion modules/queue/unique_queue_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ func (q *ChannelUniqueQueue) Shutdown() {
go func() {
log.Trace("ChannelUniqueQueue: %s Flushing", q.name)
if err := q.FlushWithContext(q.terminateCtx); err != nil {
log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
if !q.IsEmpty() {
log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
}
return
}
log.Debug("ChannelUniqueQueue: %s Flushed", q.name)
Expand Down
7 changes: 7 additions & 0 deletions modules/queue/unique_queue_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import (
"testing"
"time"

"code.gitea.io/gitea/modules/log"

"github.com/stretchr/testify/assert"
)

func TestChannelUniqueQueue(t *testing.T) {
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
handleChan := make(chan *testData)
handle := func(data ...Data) []Data {
for _, datum := range data {
Expand Down Expand Up @@ -52,6 +55,8 @@ func TestChannelUniqueQueue(t *testing.T) {
}

func TestChannelUniqueQueue_Batch(t *testing.T) {
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)

handleChan := make(chan *testData)
handle := func(data ...Data) []Data {
for _, datum := range data {
Expand Down Expand Up @@ -98,6 +103,8 @@ func TestChannelUniqueQueue_Batch(t *testing.T) {
}

func TestChannelUniqueQueue_Pause(t *testing.T) {
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)

lock := sync.Mutex{}
var queue Queue
var err error
Expand Down
41 changes: 33 additions & 8 deletions modules/queue/unique_queue_disk_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
},
Workers: 0,
},
DataDir: config.DataDir,
DataDir: config.DataDir,
QueueName: config.Name + "-level",
}

queue.channelQueue = channelUniqueQueue.(*ChannelUniqueQueue)
Expand Down Expand Up @@ -209,17 +210,29 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())
atTerminate(q.Terminate)
_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)

if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 {
if luq, ok := q.internal.(*LevelUniqueQueue); ok && !luq.IsEmpty() {
lunny marked this conversation as resolved.
Show resolved Hide resolved
// Just run the level queue - we shut it down once it's flushed
go q.internal.Run(func(_ func()) {}, func(_ func()) {})
go luq.Run(func(_ func()) {}, func(_ func()) {})
go func() {
_ = q.internal.Flush(0)
log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelUniqueQueue).Name())
q.internal.(*LevelUniqueQueue).Shutdown()
GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
_ = luq.Flush(0)
for !luq.IsEmpty() {
_ = luq.Flush(0)
select {
case <-time.After(100 * time.Millisecond):
case <-luq.shutdownCtx.Done():
if luq.byteFIFO.Len(luq.terminateCtx) > 0 {
log.Warn("LevelUniqueQueue: %s shut down before completely flushed", luq.Name())
}
return
}
}
log.Debug("LevelUniqueQueue: %s flushed so shutting down", luq.Name())
luq.Shutdown()
GetManager().Remove(luq.qid)
}()
} else {
log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
_ = q.internal.Flush(0)
q.internal.(*LevelUniqueQueue).Shutdown()
GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
}
Expand Down Expand Up @@ -285,8 +298,20 @@ func (q *PersistableChannelUniqueQueue) Shutdown() {
// Redirect all remaining data in the chan to the internal channel
close(q.channelQueue.dataChan)
log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
countOK, countLost := 0, 0
for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data)
err := q.internal.(*LevelUniqueQueue).Push(data)
if err != nil {
log.Error("PersistableChannelUniqueQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err)
countLost++
} else {
countOK++
}
}
if countLost > 0 {
log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost)
} else if countOK > 0 {
log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK)
}
log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)

Expand Down
Loading