Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Allow stream custom maxsize per batch #2063

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@
// Note: Calls to ChooseKey are concurrent.
ChooseKey func(item *Item) bool

// MaxSize is the maximum allowed size of a stream batch. This is a soft limit:
// a single list that is still over the limit has to be sent as is, since it
// cannot be split further. This limit prevents the framework from creating batches
// so big that sending them causes issues (e.g., running into the max size gRPC limit).
// If a custom value is needed, set it before the Stream starts synchronisation.
// This setting is not concurrency-safe.
MaxSize uint64

// KeyToList, similar to ChooseKey, is only invoked on the highest version of the value. It
// is up to the caller to iterate over the versions and generate zero, one or more KVs. It
// is expected that the user would advance the iterator to go through the versions of the
Expand Down Expand Up @@ -315,7 +323,7 @@
// Send the batch immediately if it already exceeds the maximum allowed size.
// If the size of the batch exceeds maxStreamSize, break from the loop to
// avoid creating a batch that is so big that certain limits are reached.
if batch.LenNoPadding() > int(maxStreamSize) {
if batch.LenNoPadding() > int(st.MaxSize) {

Check failure on line 326 in stream.go

View workflow job for this annotation

GitHub Actions / lint

G115: integer overflow conversion uint64 -> int (gosec)
break loop
}
select {
Expand Down Expand Up @@ -452,6 +460,7 @@
db: db,
NumGo: db.opt.NumGoroutines,
LogPrefix: "Badger.Stream",
MaxSize: maxStreamSize,
}
}

Expand Down
62 changes: 62 additions & 0 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,68 @@ func TestStream(t *testing.T) {
require.NoError(t, db.Close())
}


// TestStreamMaxSize checks that a Stream created by NewStreamAt starts with
// MaxSize equal to the package-level maxStreamSize, and that after overriding
// MaxSize the stream still delivers every written key/value pair unchanged.
// The test is skipped unless *manual is true.
func TestStreamMaxSize(t *testing.T) {
	if !*manual {
		// t.Skip stops the test immediately, so no explicit return is needed.
		t.Skip("Skipping test meant to be run manually.")
	}
	// Set the maxStreamSize to 1MB for the duration of the test so that it can use a smaller
	// dataset than it would otherwise need.
	originalMaxStreamSize := maxStreamSize
	maxStreamSize = 1 << 20
	defer func() {
		maxStreamSize = originalMaxStreamSize
	}()

	testSize := int(1e6)
	dir, err := os.MkdirTemp("", "badger-big-test")
	require.NoError(t, err)
	defer removeDir(dir)

	db, err := OpenManaged(DefaultOptions(dir))
	require.NoError(t, err)

	// Write testSize entries under each of the three prefixes at version 5.
	wb := db.NewWriteBatchAt(5)
	for _, prefix := range []string{"p0", "p1", "p2"} {
		for i := 1; i <= testSize; i++ {
			require.NoError(t, wb.SetEntry(NewEntry(keyWithPrefix(prefix, i), value(i))))
		}
	}
	require.NoError(t, wb.Flush())

	stream := db.NewStreamAt(math.MaxUint64)
	stream.LogPrefix = "Testing"
	c := &collector{}
	stream.Send = c.Send

	// A freshly created stream must pick up the package default.
	require.Equal(t, maxStreamSize, stream.MaxSize)

	// Override the per-stream limit before synchronisation starts.
	stream.MaxSize = 1024 * 1024 * 50

	// Test case 1. Retrieve everything.
	err = stream.Orchestrate(ctxb)
	require.NoError(t, err)
	require.Equal(t, 3*testSize, len(c.kv), "Expected %d. Got: %d", 3*testSize, len(c.kv))

	// Verify every value and tally how many keys were seen per prefix.
	m := make(map[string]int)
	for _, kv := range c.kv {
		prefix, ki := keyToInt(kv.Key)
		expected := value(ki)
		require.Equal(t, expected, kv.Value)
		m[prefix]++
	}
	require.Equal(t, 3, len(m))
	for pred, count := range m {
		require.Equal(t, testSize, count, "Count mismatch for pred: %s", pred)
	}
	require.NoError(t, db.Close())
}

func TestStreamWithThreadId(t *testing.T) {
dir, err := os.MkdirTemp("", "badger-test")
require.NoError(t, err)
Expand Down
Loading