Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br/stream: Added toolkit for managing migrations #55665

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "storage",
srcs = [
"azblob.go",
"batch.go",
"compress.go",
"flags.go",
"gcs.go",
Expand All @@ -27,6 +28,7 @@ go_library(
deps = [
"//br/pkg/errors",
"//br/pkg/logutil",
"//br/pkg/utils/iter",
"//pkg/lightning/log",
"//pkg/sessionctx/variable",
"//pkg/util",
Expand Down Expand Up @@ -76,6 +78,7 @@ go_library(
"@org_golang_x_net//http2",
"@org_golang_x_oauth2//google",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
],
)
Expand All @@ -85,6 +88,7 @@ go_test(
timeout = "short",
srcs = [
"azblob_test.go",
"batch_test.go",
"compress_test.go",
"gcs_test.go",
"local_test.go",
Expand Down
165 changes: 165 additions & 0 deletions br/pkg/storage/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package storage

import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"path"
"sync"

"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"go.uber.org/multierr"
)

// Effect is a side effect recorded by the batch storage.
// It is declared as `any`; the values stored by this file are
// EffPut, EffDeleteFiles, EffDeleteFile and EffRename.
type Effect any

// EffPut is the side effect of a call to `WriteFile`.
type EffPut struct {
	// File is the name that was passed to `WriteFile`.
	File string `json:"file"`
	// Content is the data that was passed to `WriteFile`.
	Content []byte `json:"content"`
}

// EffDeleteFiles is the side effect of a call to `DeleteFiles`.
type EffDeleteFiles struct {
	// Files are the names that were passed to `DeleteFiles`.
	Files []string `json:"files"`
}

// EffDeleteFile is the side effect of a call to `DeleteFile`.
// Its value is the name that was passed to `DeleteFile`.
type EffDeleteFile string

// EffRename is the side effect of a call to `Rename`.
type EffRename struct {
	// From is the old name that was passed to `Rename`.
	From string `json:"from"`
	// To is the new name that was passed to `Rename`.
	To string `json:"to"`
}

// JSONEffects writes a slice of effects to output as one JSON array.
// Every element is a tagged union: `{"type": $go_type_name, "effect": $effect}`,
// where the type name is whatever the `%T` verb prints for the effect value.
func JSONEffects(es []Effect, output io.Writer) error {
	// tagged pairs an effect with the name of its dynamic Go type.
	type tagged struct {
		Type string `json:"type"`
		Eff  Effect `json:"effect"`
	}

	// Always a non-nil slice, so that "no effects" is encoded as `[]`.
	records := make([]tagged, len(es))
	for i := range es {
		records[i] = tagged{Type: fmt.Sprintf("%T", es[i]), Eff: es[i]}
	}

	enc := json.NewEncoder(output)
	return enc.Encode(records)
}

// SaveJSONEffectsToTmp encodes the effects with `JSONEffects` into a freshly
// created file named `br-effects-*.json` under the `tidb_br` subdirectory of
// the OS temporary directory, creating that subdirectory when it is missing.
//
// It returns the path of the written file. On failure the returned path is
// empty.
func SaveJSONEffectsToTmp(es []Effect) (string, error) {
	// Save the json to a subdir so user can redirect the output path by symlinking...
	dir := path.Join(os.TempDir(), "tidb_br")
	// os.CreateTemp doesn't create missing parent directories, so make sure
	// the subdir is there. An existing directory (or a symlink to one) is fine.
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return "", err
	}
	tmp, err := os.CreateTemp(dir, "br-effects-*.json")
	if err != nil {
		return "", err
	}
	if err := JSONEffects(es, tmp); err != nil {
		// The encoding error is the one worth reporting; just release the handle.
		_ = tmp.Close()
		return "", err
	}
	// Close explicitly: it releases the descriptor and surfaces late write errors.
	if err := tmp.Close(); err != nil {
		return "", err
	}
	return tmp.Name(), nil
}

// Batched is a wrapper of an external storage that suspends all write operations ("effects").
// If it is closed without `Commit()` being called, nothing will happen in the underlying
// external storage. In that case, we have done a "dry run".
//
// You may use `ReadOnlyEffects()` to get the history of the effects.
// But don't modify the returned slice!
//
// You may use `Commit()` to execute all suspended effects.
type Batched struct {
	// The wrapped storage. Methods that Batched doesn't define itself are
	// promoted from it.
	ExternalStorage
	// effectsMu guards effects.
	effectsMu sync.Mutex
	// The suspended effects, in the order they were recorded.
	// Each of them will be one of:
	// EffPut, EffDeleteFiles, EffDeleteFile, EffRename
	effects []Effect
}

// Batch wraps an external storage instance into its batched version.
// The returned wrapper starts without any recorded effect.
func Batch(s ExternalStorage) *Batched {
	batched := new(Batched)
	batched.ExternalStorage = s
	return batched
}

// ReadOnlyEffects fetches all effects recorded in the batched storage so far.
// The internal slice is handed out as is, without copying.
//
// **The returned slice should not be modified.**
func (d *Batched) ReadOnlyEffects() []Effect {
	d.effectsMu.Lock()
	current := d.effects
	d.effectsMu.Unlock()
	return current
}

// CleanEffects drops every suspended effect without executing it.
func (d *Batched) CleanEffects() {
	d.effectsMu.Lock()
	d.effects = nil
	d.effectsMu.Unlock()
}

// DeleteFiles records an `EffDeleteFiles` effect instead of deleting anything
// from the underlying storage. It always returns nil.
//
// The names are copied, so the caller may reuse or modify the slice after this
// call without changing what will be deleted when the effect is executed.
func (d *Batched) DeleteFiles(ctx context.Context, names []string) error {
	// Keep nil as nil so the recorded effect looks exactly like the request.
	var files []string
	if names != nil {
		files = make([]string, len(names))
		copy(files, names)
	}

	d.effectsMu.Lock()
	defer d.effectsMu.Unlock()
	d.effects = append(d.effects, EffDeleteFiles{Files: files})
	return nil
}

// DeleteFile records an `EffDeleteFile` effect instead of deleting anything
// from the underlying storage. It always returns nil.
func (d *Batched) DeleteFile(ctx context.Context, name string) error {
	pending := EffDeleteFile(name)

	d.effectsMu.Lock()
	d.effects = append(d.effects, pending)
	d.effectsMu.Unlock()
	return nil
}

// WriteFile records an `EffPut` effect instead of writing anything to the
// underlying storage. It always returns nil.
//
// The data is copied, so the caller may reuse or modify the buffer after this
// call without changing what will be written when the effect is executed.
func (d *Batched) WriteFile(ctx context.Context, name string, data []byte) error {
	// Keep nil as nil so the recorded effect looks exactly like the request.
	var content []byte
	if data != nil {
		content = make([]byte, len(data))
		copy(content, data)
	}

	d.effectsMu.Lock()
	defer d.effectsMu.Unlock()
	d.effects = append(d.effects, EffPut{File: name, Content: content})
	return nil
}

// Rename records an `EffRename` effect instead of renaming anything in the
// underlying storage. It always returns nil.
func (d *Batched) Rename(ctx context.Context, oldName, newName string) error {
	pending := EffRename{From: oldName, To: newName}

	d.effectsMu.Lock()
	d.effects = append(d.effects, pending)
	d.effectsMu.Unlock()
	return nil
}

// Create is rejected in batch mode: it never returns a writer, only an error
// annotating `berrors.ErrStorageUnknown`. Nothing is recorded as an effect.
func (d *Batched) Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error) {
	err := errors.Annotatef(berrors.ErrStorageUnknown, "ExternalStorage.Create isn't allowed in batch mode for now.")
	return nil, err
}

// Commit performs all effects recorded so far in the REAL external storage,
// in the order they were recorded.
//
// A failing effect doesn't stop the later ones from being executed; every
// failure is collected and the combined error is returned. The suspended
// effects are cleaned up once all of them have been tried, whether or not
// some of them failed.
//
// When an effect of an unknown type is met, Commit stops at once: it returns
// an error (together with the failures collected so far) and does not clean
// up the suspended effects.
func (d *Batched) Commit(ctx context.Context) error {
	d.effectsMu.Lock()
	defer d.effectsMu.Unlock()

	var err error
	for _, eff := range d.effects {
		switch e := eff.(type) {
		case EffPut:
			err = multierr.Append(err, d.ExternalStorage.WriteFile(ctx, e.File, e.Content))
		case EffDeleteFiles:
			err = multierr.Append(err, d.ExternalStorage.DeleteFiles(ctx, e.Files))
		case EffDeleteFile:
			err = multierr.Append(err, d.ExternalStorage.DeleteFile(ctx, string(e)))
		case EffRename:
			err = multierr.Append(err, d.ExternalStorage.Rename(ctx, e.From, e.To))
		default:
			unknown := errors.Annotatef(berrors.ErrStorageUnknown, "Unknown effect type %T", eff)
			return multierr.Append(err, unknown)
		}
	}

	d.effects = nil

	// Report the collected failures instead of silently swallowing them.
	return err
}
108 changes: 108 additions & 0 deletions br/pkg/storage/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package storage_test

import (
"context"
"io"
"os"
"testing"

. "github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/require"
)

// TestBatched checks that each write operation on a batched storage is only
// recorded as the matching effect, in call order.
//
// All cases share one batched storage; each case cleans the recorded effects
// when it is done, so the next one starts from an empty history.
func TestBatched(t *testing.T) {
	ctx := context.Background()
	// Passing nil as we don't need actual storage operations
	batched := Batch(nil)

	type testCase struct {
		name     string
		op       func() error
		expected []Effect
	}

	cases := []testCase{
		{
			name: "DeleteFiles",
			op: func() error {
				return batched.DeleteFiles(ctx, []string{"file1.txt", "file2.txt"})
			},
			expected: []Effect{
				EffDeleteFiles{Files: []string{"file1.txt", "file2.txt"}},
			},
		},
		{
			name: "DeleteFile",
			op: func() error {
				return batched.DeleteFile(ctx, "file3.txt")
			},
			expected: []Effect{
				EffDeleteFile("file3.txt"),
			},
		},
		{
			name: "WriteFile",
			op: func() error {
				return batched.WriteFile(ctx, "file4.txt", []byte("content"))
			},
			expected: []Effect{
				EffPut{File: "file4.txt", Content: []byte("content")},
			},
		},
		{
			name: "Rename",
			op: func() error {
				return batched.Rename(ctx, "oldName.txt", "newName.txt")
			},
			expected: []Effect{
				EffRename{From: "oldName.txt", To: "newName.txt"},
			},
		},
		{
			name: "SequenceOfOperations",
			op: func() error {
				err := batched.DeleteFile(ctx, "file5.txt")
				if err != nil {
					return err
				}
				err = batched.WriteFile(ctx, "file6.txt", []byte("new content"))
				if err != nil {
					return err
				}
				return batched.Rename(ctx, "file6.txt", "fileRenamed.txt")
			},
			expected: []Effect{
				EffDeleteFile("file5.txt"),
				EffPut{File: "file6.txt", Content: []byte("new content")},
				EffRename{From: "file6.txt", To: "fileRenamed.txt"},
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			require.NoError(t, tc.op())

			recorded := batched.ReadOnlyEffects()
			require.Len(t, recorded, len(tc.expected))
			for i := range recorded {
				require.Equal(t, tc.expected[i], recorded[i])
			}

			// Reset effects for the next test
			batched.CleanEffects()
		})
	}
}

// TestJSONEffects saves one effect of each kind with `SaveJSONEffectsToTmp`,
// then checks that the saved file holds the expected tagged-union JSON.
func TestJSONEffects(t *testing.T) {
	effects := []Effect{
		EffPut{File: "example.txt", Content: []byte("Hello, world")},
		EffDeleteFiles{Files: []string{"old_file.txt", "temp.txt"}},
		EffDeleteFile("obsolete.txt"),
		EffRename{From: "old_name.txt", To: "new_name.txt"},
	}

	tmp, err := SaveJSONEffectsToTmp(effects)
	require.NoError(t, err)
	// Don't leave the generated file behind once the test is done.
	t.Cleanup(func() {
		_ = os.Remove(tmp)
	})

	f, err := os.Open(tmp)
	require.NoError(t, err)
	defer func() {
		require.NoError(t, f.Close())
	}()
	buf, err := io.ReadAll(f)
	require.NoError(t, err)

	// `content` is a []byte, which encoding/json emits as base64.
	expectedJSON := `[
		{"type":"storage.EffPut","effect":{"file":"example.txt","content":"SGVsbG8sIHdvcmxk"}},
		{"type":"storage.EffDeleteFiles","effect":{"files":["old_file.txt","temp.txt"]}},
		{"type":"storage.EffDeleteFile","effect":"obsolete.txt"},
		{"type":"storage.EffRename","effect":{"from":"old_name.txt","to":"new_name.txt"}}
	]`

	require.JSONEq(t, expectedJSON, string(buf), "Output JSON should match expected JSON")
}
49 changes: 49 additions & 0 deletions br/pkg/storage/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"context"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/utils/iter"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
)

Expand Down Expand Up @@ -37,3 +39,50 @@ var activeUploadWorkerCnt atomic.Int64
func GetActiveUploadWorkerCount() int64 {
	// activeUploadWorkerCnt is an atomic counter; Load reads its current value.
	current := activeUploadWorkerCnt.Load()
	return current
}

// UnmarshalDir iterates over a prefix, then "unmarshals" the content of each file it meets with the unmarshal function.
// It returns an iterator that yields the unmarshaled content.
// The "unmarshal" function should put the result of unmarshalling into the `target` argument.
//
// The files are walked and read by a background goroutine, which is started
// right away. That goroutine hands items over an unbuffered channel, so it
// only ends once the walk has finished (or failed) or `ctx` has been
// canceled: cancel `ctx` if the iterator is abandoned before it is drained.
//
// A failure of walking, reading or unmarshalling is yielded by the iterator
// as an error; items that were produced before the failure are still yielded.
func UnmarshalDir[T any](ctx context.Context, walkOpt *WalkOption, s ExternalStorage, unmarshal func(target *T, name string, content []byte) error) iter.TryNextor[*T] {
	ch := make(chan *T)
	// Buffered and written at most once, so the walker can always leave its
	// error behind without blocking, even if nobody is receiving.
	errCh := make(chan error, 1)
	reader := func() {
		// Deferred, so `ch` is closed only AFTER the error (if any) was put into `errCh`.
		defer close(ch)
		err := s.WalkDir(ctx, walkOpt, func(path string, size int64) error {
			metaBytes, err := s.ReadFile(ctx, path)
			if err != nil {
				return errors.Annotatef(err, "failed during reading file %s", path)
			}
			var meta T
			if err := unmarshal(&meta, path, metaBytes); err != nil {
				return errors.Annotatef(err, "failed to parse subcompaction meta of file %s", path)
			}
			select {
			case ch <- &meta:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})
		if err != nil {
			errCh <- err
		}
	}
	go reader()
	return iter.Func(func(ctx context.Context) iter.IterResult[*T] {
		select {
		case <-ctx.Done():
			return iter.Throw[*T](ctx.Err())
		case err := <-errCh:
			return iter.Throw[*T](err)
		case meta, ok := <-ch:
			if !ok {
				// When the walker fails, both `errCh` and the closed `ch` are ready
				// and `select` picks one of them at random. Check `errCh` once more
				// so the failure isn't mistaken for a normal end of the iteration.
				select {
				case err := <-errCh:
					return iter.Throw[*T](err)
				default:
				}
				return iter.Done[*T]()
			}
			return iter.Emit(meta)
		}
	})
}
Loading
Loading