Skip to content

Commit

Permalink
crl-release-23.2: sstable: add LoadBlockSema option
Browse files Browse the repository at this point in the history
Add option to use a semaphore to limit the number of block loads in
parallel.
  • Loading branch information
RaduBerinde committed Jun 11, 2024
1 parent 600eaf7 commit a3d91f3
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 17 deletions.
104 changes: 104 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ import (
"fmt"
"io"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/fifo"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/internal/invariants"
Expand Down Expand Up @@ -1968,3 +1971,104 @@ func BenchmarkRotateMemtables(b *testing.B) {
}
}
}

type readTrackFS struct {
vfs.FS

currReadCount atomic.Int32
maxReadCount atomic.Int32
}

type readTrackFile struct {
vfs.File
fs *readTrackFS
}

func (fs *readTrackFS) Open(name string, opts ...vfs.OpenOption) (vfs.File, error) {
file, err := fs.FS.Open(name, opts...)
if err != nil || !strings.HasSuffix(name, ".sst") {
return file, err
}
return &readTrackFile{
File: file,
fs: fs,
}, nil
}

func (f *readTrackFile) ReadAt(p []byte, off int64) (n int, err error) {
val := f.fs.currReadCount.Add(1)
defer f.fs.currReadCount.Add(-1)
for maxVal := f.fs.maxReadCount.Load(); val > maxVal; maxVal = f.fs.maxReadCount.Load() {
if f.fs.maxReadCount.CompareAndSwap(maxVal, val) {
break
}
}
return f.File.ReadAt(p, off)
}

func TestLoadBlockSema(t *testing.T) {
fs := &readTrackFS{FS: vfs.NewMem()}
sema := fifo.NewSemaphore(100)
db, err := Open("", testingRandomized(t, &Options{
Cache: cache.New(1),
FS: fs,
LoadBlockSema: sema,
}))
require.NoError(t, err)

key := func(i, j int) []byte {
return []byte(fmt.Sprintf("%02d/%02d", i, j))
}

// Create 20 regions and compact them separately, so we end up with 20
// disjoint tables.
const numRegions = 20
const numKeys = 20
for i := 0; i < numRegions; i++ {
for j := 0; j < numKeys; j++ {
require.NoError(t, db.Set(key(i, j), []byte("value"), nil))
}
require.NoError(t, db.Compact(key(i, 0), key(i, numKeys-1), false))
}

// Read all regions to warm up the table cache.
for i := 0; i < numRegions; i++ {
val, closer, err := db.Get(key(i, 1))
require.NoError(t, err)
require.Equal(t, []byte("value"), val)
if closer != nil {
closer.Close()
}
}

for _, n := range []int64{1, 2, 4} {
t.Run(fmt.Sprintf("%d", n), func(t *testing.T) {
sema.UpdateCapacity(n)
fs.maxReadCount.Store(0)
var wg sync.WaitGroup
// Spin up workers that perform random reads.
const numWorkers = 20
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
const numQueries = 100
for i := 0; i < numQueries; i++ {
val, closer, err := db.Get(key(rand.Intn(numRegions), rand.Intn(numKeys)))
require.NoError(t, err)
require.Equal(t, []byte("value"), val)
if closer != nil {
closer.Close()
}
runtime.Gosched()
}
}()
}
wg.Wait()
// Verify the maximum read count did not exceed the limit.
maxReadCount := fs.maxReadCount.Load()
require.Greater(t, maxReadCount, int32(0))
require.LessOrEqual(t, maxReadCount, int32(n))
})
}
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0
github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f
github.com/cockroachdb/errors v1.11.1
github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce
github.com/cockroachdb/redact v1.1.5
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06
github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9
Expand All @@ -18,10 +19,10 @@ require (
github.com/prometheus/client_golang v1.12.0
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a
github.com/spf13/cobra v1.0.0
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.9.0
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df
golang.org/x/perf v0.0.0-20230113213139-801c7ef9e5c5
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
golang.org/x/sync v0.7.0
golang.org/x/sys v0.11.0
)

Expand Down
14 changes: 6 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaY
github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU=
github.com/cockroachdb/errors v1.11.1 h1:xSEW75zKaKCWzR3OfxXUxgrk/NtT4G1MiOv5lWZazG8=
github.com/cockroachdb/errors v1.11.1/go.mod h1:8MUxA3Gi6b25tYlFEBGLf+D8aISL+M4MIpiWMSNRfxw=
github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce h1:giXvy4KSc/6g/esnpM7Geqxka4WSqI1SZc7sMJFd3y4=
github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M=
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE=
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30=
Expand Down Expand Up @@ -310,16 +312,12 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
Expand Down Expand Up @@ -439,8 +437,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
7 changes: 7 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/fifo"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/internal/humanize"
Expand Down Expand Up @@ -481,6 +482,11 @@ type Options struct {
// The default cache size is 8 MB.
Cache *cache.Cache

// LoadBlockSema, if set, is used to limit the number of blocks that can be
// loaded (i.e. read from the filesystem) in parallel. Each load acquires one
// unit from the semaphore for the duration of the read.
LoadBlockSema *fifo.Semaphore

// Cleaner cleans obsolete files.
//
// The default cleaner uses the DeleteCleaner.
Expand Down Expand Up @@ -1713,6 +1719,7 @@ func (o *Options) MakeReaderOptions() sstable.ReaderOptions {
var readerOpts sstable.ReaderOptions
if o != nil {
readerOpts.Cache = o.Cache
readerOpts.LoadBlockSema = o.LoadBlockSema
readerOpts.Comparer = o.Comparer
readerOpts.Filters = o.Filters
if o.Merger != nil {
Expand Down
6 changes: 6 additions & 0 deletions sstable/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package sstable

import (
"github.com/cockroachdb/fifo"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
)
Expand Down Expand Up @@ -111,6 +112,11 @@ type ReaderOptions struct {
// The default cache size is a zero-size cache.
Cache *cache.Cache

// LoadBlockSema, if set, is used to limit the number of blocks that can be
// loaded (i.e. read from the filesystem) in parallel. Each load acquires one
// unit from the semaphore for the duration of the read.
LoadBlockSema *fifo.Semaphore

// User properties specified in this map will not be added to sst.Properties.UserProperties.
DeniedUserProperties map[string]struct{}

Expand Down
9 changes: 9 additions & 0 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,15 @@ func (r *Reader) readBlock(
}

// Cache miss.

if sema := r.opts.LoadBlockSema; sema != nil {
if err := sema.Acquire(ctx, 1); err != nil {
// An error here can only come from the context.
return bufferHandle{}, err
}
defer sema.Release(1)
}

var compressed cacheValueOrBuf
if bufferPool != nil {
compressed = cacheValueOrBuf{
Expand Down
1 change: 0 additions & 1 deletion testdata/batch_reader
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,3 @@ scan
----
Count: 1
err: decoding user key: pebble: invalid batch

4 changes: 2 additions & 2 deletions testdata/event_listener
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ Zombie tables: 0 (0B)
Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Block cache: 6 entries (1.1KB) hit rate: 11.1%
Table cache: 1 entries (800B) hit rate: 40.0%
Table cache: 1 entries (808B) hit rate: 40.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down Expand Up @@ -388,7 +388,7 @@ Zombie tables: 0 (0B)
Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Block cache: 12 entries (2.3KB) hit rate: 14.3%
Table cache: 1 entries (800B) hit rate: 50.0%
Table cache: 1 entries (808B) hit rate: 50.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down
2 changes: 1 addition & 1 deletion testdata/ingest
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Zombie tables: 0 (0B)
Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Block cache: 6 entries (1.2KB) hit rate: 35.7%
Table cache: 1 entries (800B) hit rate: 50.0%
Table cache: 1 entries (808B) hit rate: 50.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down
6 changes: 3 additions & 3 deletions testdata/metrics
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Zombie tables: 0 (0B)
Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Block cache: 3 entries (556B) hit rate: 0.0%
Table cache: 1 entries (800B) hit rate: 0.0%
Table cache: 1 entries (808B) hit rate: 0.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 1
Expand Down Expand Up @@ -197,7 +197,7 @@ Zombie tables: 1 (661B)
Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Block cache: 3 entries (556B) hit rate: 42.9%
Table cache: 1 entries (800B) hit rate: 66.7%
Table cache: 1 entries (808B) hit rate: 66.7%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 1
Expand Down Expand Up @@ -445,7 +445,7 @@ Zombie tables: 0 (0B)
Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Block cache: 12 entries (2.4KB) hit rate: 31.1%
Table cache: 3 entries (2.3KB) hit rate: 57.9%
Table cache: 3 entries (2.4KB) hit rate: 57.9%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down

0 comments on commit a3d91f3

Please sign in to comment.