Skip to content

Commit

Permalink
Expose translation mvcc (FeatureBaseDB#2381)
Browse files Browse the repository at this point in the history
* expose Transaction on TranslateStore for DAX Snapshotting

* try to fix ramdisk nonsense

apparently, we were running in either a shell env or docker env
randomly, so this could sometimes pass and sometimes fail since the
shell env had the ramdisk set up and docker didn't.

Now we force to run in docker always and set up ramdisk explicitly.

* debug ramdisk issue?

* fix tests... and a bunch of other stuff

The executor test I modified failed when I changed DefaultPartitionN
to 8, but just because stuff was out of order so I made it more
robust.

I edited some data gen stuff to make shorter lines because it was
making grep results unusable.

the actual fix is in translate_boltdb_test.go

* clean up, fix code review feedback

(cherry picked from commit 3693b99)
  • Loading branch information
jaffee authored and ch7ck committed Jan 10, 2023
1 parent a6c165d commit dfd4bb1
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 61 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ testvsub:
echo; echo "999 done testing subpkg $$pkg"; \
done

# make a 2GB RAMDisk. Speed up tests by running them with RAMDISK=/mnt/ramfs
# make a 2GB RAMDisk. Speed up tests by running them with RAMDISK=/mnt/ramdisk
ramdisk-linux:
mount -o size=2G -t tmpfs none /mnt/ramfs
mount -o size=2G -t tmpfs none /mnt/ramdisk

# make a 2GB RAMDisk. Speed up tests by running them with RAMDISK=/Volumes/RAMDisk
ramdisk-osx:
Expand Down
29 changes: 19 additions & 10 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ func (r RedirectError) Error() string {
}

// TranslateData returns all translation data in the specified partition.
func (api *API) TranslateData(ctx context.Context, indexName string, partition int) (io.WriterTo, error) {
func (api *API) TranslateData(ctx context.Context, indexName string, partition int) (TranslateStore, error) {
span, _ := tracing.StartSpanFromContext(ctx, "API.TranslateData")
defer span.Finish()

Expand Down Expand Up @@ -1022,7 +1022,7 @@ func (api *API) TranslateData(ctx context.Context, indexName string, partition i
}

// FieldTranslateData returns all translation data in the specified field.
func (api *API) FieldTranslateData(ctx context.Context, indexName, fieldName string) (io.WriterTo, error) {
func (api *API) FieldTranslateData(ctx context.Context, indexName, fieldName string) (TranslateStore, error) {
span, _ := tracing.StartSpanFromContext(ctx, "API.FieldTranslateData")
defer span.Finish()
if err := api.validate(apiFieldTranslateData); err != nil {
Expand Down Expand Up @@ -3136,18 +3136,22 @@ func (api *API) SnapshotTableKeys(ctx context.Context, req *dax.SnapshotTableKey
qtid := req.TableKey.QualifiedTableID()

// Create the snapshot for the current version.
wrTo, err := api.TranslateData(ctx, string(req.TableKey), int(req.PartitionNum))
trans, err := api.TranslateData(ctx, string(req.TableKey), int(req.PartitionNum))
if err != nil {
return errors.Wrapf(err, "getting index/partition writeto: %s/%d", req.TableKey, req.PartitionNum)
return errors.Wrapf(err, "getting index/partition translate store: %s/%d", req.TableKey, req.PartitionNum)
}
// get a write tx to ensure no other writes while incrementing WL version.
wrTo, err := trans.Begin(true)
if err != nil {
return errors.Wrap(err, "beginning table translate write tx")
}
defer wrTo.Rollback()

// TODO(jaffee) need to ensure writes to translation data can't
// occur while this is happening.
resource := api.serverlessStorage.GetTableKeyResource(qtid, req.PartitionNum)
if err := resource.IncrementWLVersion(); err != nil {
return errors.Wrap(err, "incrementing write log version")
}
// TODO(jaffee) downgrade (currently non-existent) lock to read-only
// TODO(jaffee) downgrade write tx to read-only
err = resource.SnapshotTo(wrTo)
return errors.Wrap(err, "snapshotting table keys")
}
Expand All @@ -3158,17 +3162,22 @@ func (api *API) SnapshotFieldKeys(ctx context.Context, req *dax.SnapshotFieldKey
qtid := req.TableKey.QualifiedTableID()

// Create the snapshot for the current version.
// TODO(jaffee) change this to get write lock
wrTo, err := api.FieldTranslateData(ctx, string(req.TableKey), string(req.Field))
trans, err := api.FieldTranslateData(ctx, string(req.TableKey), string(req.Field))
if err != nil {
return errors.Wrap(err, "getting index/field writeto")
}
// get a write tx to ensure no other writes while incrementing WL version.
wrTo, err := trans.Begin(true)
if err != nil {
return errors.Wrap(err, "beginning field translate write tx")
}
defer wrTo.Rollback()

resource := api.serverlessStorage.GetFieldKeyResource(qtid, req.Field)
if err := resource.IncrementWLVersion(); err != nil {
return errors.Wrap(err, "incrementing writelog version")
}
// TODO(jaffee) downgrade to read lock
// TODO(jaffee) downgrade to read tx
err = resource.SnapshotTo(wrTo)
return errors.Wrap(err, "snapshotTo in FieldKeys")
}
Expand Down
64 changes: 34 additions & 30 deletions executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5132,47 +5132,50 @@ func TestExecutor_Execute_Extract_Keyed(t *testing.T) {
`)

resp := c.Query(t, c.Idx(), `Extract(All(), Rows(set))`)
expect := []interface{}{
pilosa.ExtractedTable{
Fields: []pilosa.ExtractedTableField{
{
Name: "set",
Type: "[]uint64",
},
expect := pilosa.ExtractedTable{
Fields: []pilosa.ExtractedTableField{
{
Name: "set",
Type: "[]uint64",
},
// The order of these probably shouldn't matter, but currently depends indirectly on the
// index.
Columns: []pilosa.ExtractedTableColumn{
{
Column: pilosa.KeyOrID{Keyed: true, Key: "h"},
Rows: []interface{}{
[]uint64{
1,
2,
},
},
// The order of these probably shouldn't matter, but currently depends indirectly on the
// index.
Columns: []pilosa.ExtractedTableColumn{
{
Column: pilosa.KeyOrID{Keyed: true, Key: "h"},
Rows: []interface{}{
[]uint64{
1,
2,
},
},
{
Column: pilosa.KeyOrID{Keyed: true, Key: "xyzzy"},
Rows: []interface{}{
[]uint64{
2,
},
},
{
Column: pilosa.KeyOrID{Keyed: true, Key: "xyzzy"},
Rows: []interface{}{
[]uint64{
2,
},
},
{
Column: pilosa.KeyOrID{Keyed: true, Key: "plugh"},
Rows: []interface{}{
[]uint64{},
},
},
{
Column: pilosa.KeyOrID{Keyed: true, Key: "plugh"},
Rows: []interface{}{
[]uint64{},
},
},
},
}

if !reflect.DeepEqual(expect, resp.Results) {
t.Errorf("expected %v but got %v", expect, resp.Results)
if len(resp.Results) != 1 {
t.Fail()
}
res := resp.Results[0].(pilosa.ExtractedTable)
if !reflect.DeepEqual(expect.Fields, res.Fields) {
t.Errorf("expected:\n%v\nbut got:\n%v", expect, resp.Results)
}
assert.ElementsMatch(t, expect.Columns, res.Columns)
}

func TestExecutor_Execute_MaxMemory(t *testing.T) {
Expand Down Expand Up @@ -7429,6 +7432,7 @@ func backupCluster(t *testing.T, c *test.Cluster, index string) (backupDir strin

buf := &bytes.Buffer{}
backupLog := logger.NewStandardLogger(buf)

backupCommand := ctl.NewBackupCommand(backupLog)
backupCommand.Host = c.Nodes[len(c.Nodes)-1].URL() // don't pick node 0 so we don't always get primary (better code coverage)
backupCommand.Index = index
Expand Down
17 changes: 15 additions & 2 deletions http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2790,8 +2790,15 @@ func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request)
http.Error(w, err.Error(), http.StatusNotFound)
return
}
tx, err := p.Begin(false)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer tx.Rollback()

// Stream translate data to response body.
if _, err := p.WriteTo(w); err != nil {
if _, err := tx.WriteTo(w); err != nil {
h.logger.Errorf("error streaming translation data: %s", err)
}
return
Expand All @@ -2816,8 +2823,14 @@ func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request)
http.Error(w, err.Error(), http.StatusNotFound)
return
}
tx, err := p.Begin(false)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer tx.Rollback()
// Stream translate partition to response body.
if _, err := p.WriteTo(w); err != nil {
if _, err := tx.WriteTo(w); err != nil {
h.logger.Errorf("error streaming translation data: %s", err)
}
}
Expand Down
14 changes: 11 additions & 3 deletions translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ type TranslateStore interface { // TODO: refactor this interface; readonly shoul
// Returns a reader from the given ID offset.
EntryReader(ctx context.Context, offset uint64) (TranslateEntryReader, error)

// WriteTo ensures that the TranslateStore implements io.WriterTo.
// It should write the contents of the store to the writer.
WriteTo(io.Writer) (int64, error)
Begin(write bool) (TranslatorTx, error)

// ReadFrom ensures that the TranslateStore implements io.ReaderFrom.
// It should read from the reader and replace the data store with
Expand All @@ -88,6 +86,16 @@ type TranslateStore interface { // TODO: refactor this interface; readonly shoul
Delete(records *roaring.Bitmap) (Commitor, error)
}

// TranslatorTx mirrors the subset of the BoltDB Tx API that the rest
// of the codebase needs. Extend this interface as new requirements
// appear rather than referencing bolt types directly, so that BoltDB
// details stay confined to the translate store implementation.
type TranslatorTx interface {
	// WriteTo streams a consistent snapshot of the store's contents
	// to w, returning the number of bytes written.
	WriteTo(io.Writer) (int64, error)

	// Rollback ends the transaction without applying any changes.
	Rollback() error

	// Further bolt.Tx methods (e.g. Commit() error) can be added
	// here when callers need them.
}

// OpenTranslateStoreFunc represents a function for instantiating and opening a TranslateStore.
type OpenTranslateStoreFunc func(path, index, field string, partitionID, partitionN int, fsyncEnabled bool) (TranslateStore, error)

Expand Down
11 changes: 3 additions & 8 deletions translate_boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,14 +396,9 @@ func (s *BoltTranslateStore) MaxID() (max uint64, err error) {
return max, nil
}

// WriteTo writes the contents of the store to the writer.
func (s *BoltTranslateStore) WriteTo(w io.Writer) (int64, error) {
tx, err := s.db.Begin(false)
if err != nil {
return 0, err
}
defer func() { _ = tx.Rollback() }()
return tx.WriteTo(w)
// Begin starts a transaction on the underlying BoltDB store and
// returns it as a TranslatorTx. Pass write=true for a writable
// transaction. The caller is responsible for ending the transaction
// (via Rollback, or Commit once exposed on TranslatorTx).
func (s *BoltTranslateStore) Begin(write bool) (TranslatorTx, error) {
	tx, err := s.db.Begin(write)
	if err != nil {
		// Return a literal nil interface. Returning s.db.Begin's
		// results directly would box the typed-nil *bolt.Tx into a
		// non-nil TranslatorTx, so callers comparing the returned
		// tx against nil would be misled (nil-interface trap).
		return nil, err
	}
	return tx, nil
}

// ReadFrom reads the content and overwrites the existing store.
Expand Down
19 changes: 13 additions & 6 deletions translate_boltdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,12 +458,19 @@ func TestTranslateStore_ReadWrite(t *testing.T) {
buf := bytes.NewBuffer(nil)
expN := s.Size()

// After this, the buffer should contain batch0.
if n, err := s.WriteTo(buf); err != nil {
t.Fatalf("writing to buffer: %s", err)
} else if n != expN {
t.Fatalf("expected buffer size: %d, but got: %d", expN, n)
}
// wrap in a func so we can defer rollback. Need rollback to
// happen before the end of the test. I'm not entirely sure
// why, but it hangs if you don't.
func() {
tx, err := s.Begin(false)
require.NoError(t, err)
defer tx.Rollback()

// After this, the buffer should contain batch0.
n, err := tx.WriteTo(buf)
require.NoError(t, err)
require.Equal(t, expN, n)
}()

// Populate the store with the keys in batch1.
batch1IDs, err := s.CreateKeys(batch1...)
Expand Down

0 comments on commit dfd4bb1

Please sign in to comment.