From dfd4bb11911e54bc233ecf54f5ae446a95067b11 Mon Sep 17 00:00:00 2001 From: Matthew Jaffee Date: Tue, 20 Dec 2022 07:45:32 -0600 Subject: [PATCH] Expose translation mvcc (#2381) * expose Transaction on TranslateStore for DAX Snapshotting * try to fix ramdisk nonsense: apparently, we were running in either a shell env or docker env randomly, so this could sometimes pass and sometimes fail since the shell env had the ramdisk set up and docker didn't. Now we force to run in docker always and set up ramdisk explicitly. * debug ramdisk issue? * fix tests... and a bunch of other stuff. The executor test I modified failed when I changed DefaultPartitionN to 8, but just because stuff was out of order, so I made it more robust. I edited some data gen stuff to make shorter lines because it was making grep results unusable. The actual fix is in translate_boltdb_test.go. * clean up, fix code review feedback (cherry picked from commit 3693b9950ad2bb32c28b6cc7877817078063b58c) --- Makefile | 4 +-- api.go | 29 +++++++++++------- executor_test.go | 64 +++++++++++++++++++++------------------- http_handler.go | 17 +++++++++-- translate.go | 14 +++++++-- translate_boltdb.go | 11 ++----- translate_boltdb_test.go | 19 ++++++++---- 7 files changed, 97 insertions(+), 61 deletions(-) diff --git a/Makefile b/Makefile index 130624e6d..fc0d26306 100644 --- a/Makefile +++ b/Makefile @@ -77,9 +77,9 @@ testvsub: echo; echo "999 done testing subpkg $$pkg"; \ done -# make a 2GB RAMDisk. Speed up tests by running them with RAMDISK=/mnt/ramfs +# make a 2GB RAMDisk. Speed up tests by running them with RAMDISK=/mnt/ramdisk ramdisk-linux: - mount -o size=2G -t tmpfs none /mnt/ramfs + mount -o size=2G -t tmpfs none /mnt/ramdisk # make a 2GB RAMDisk. 
Speed up tests by running them with RAMDISK=/Volumes/RAMDisk ramdisk-osx: diff --git a/api.go b/api.go index 4b2467eb2..4304ca4e8 100644 --- a/api.go +++ b/api.go @@ -967,7 +967,7 @@ func (r RedirectError) Error() string { } // TranslateData returns all translation data in the specified partition. -func (api *API) TranslateData(ctx context.Context, indexName string, partition int) (io.WriterTo, error) { +func (api *API) TranslateData(ctx context.Context, indexName string, partition int) (TranslateStore, error) { span, _ := tracing.StartSpanFromContext(ctx, "API.TranslateData") defer span.Finish() @@ -1022,7 +1022,7 @@ func (api *API) TranslateData(ctx context.Context, indexName string, partition i } // FieldTranslateData returns all translation data in the specified field. -func (api *API) FieldTranslateData(ctx context.Context, indexName, fieldName string) (io.WriterTo, error) { +func (api *API) FieldTranslateData(ctx context.Context, indexName, fieldName string) (TranslateStore, error) { span, _ := tracing.StartSpanFromContext(ctx, "API.FieldTranslateData") defer span.Finish() if err := api.validate(apiFieldTranslateData); err != nil { @@ -3136,18 +3136,22 @@ func (api *API) SnapshotTableKeys(ctx context.Context, req *dax.SnapshotTableKey qtid := req.TableKey.QualifiedTableID() // Create the snapshot for the current version. - wrTo, err := api.TranslateData(ctx, string(req.TableKey), int(req.PartitionNum)) + trans, err := api.TranslateData(ctx, string(req.TableKey), int(req.PartitionNum)) if err != nil { - return errors.Wrapf(err, "getting index/partition writeto: %s/%d", req.TableKey, req.PartitionNum) + return errors.Wrapf(err, "getting index/partition translate store: %s/%d", req.TableKey, req.PartitionNum) } + // get a write tx to ensure no other writes while incrementing WL version. 
+ wrTo, err := trans.Begin(true) + if err != nil { + return errors.Wrap(err, "beginning table translate write tx") + } + defer wrTo.Rollback() - // TODO(jaffee) need to ensure writes to translation data can't - // occur while this is happening. resource := api.serverlessStorage.GetTableKeyResource(qtid, req.PartitionNum) if err := resource.IncrementWLVersion(); err != nil { return errors.Wrap(err, "incrementing write log version") } - // TODO(jaffee) downgrade (currently non-existent) lock to read-only + // TODO(jaffee) downgrade write tx to read-only err = resource.SnapshotTo(wrTo) return errors.Wrap(err, "snapshotting table keys") } @@ -3158,17 +3162,22 @@ func (api *API) SnapshotFieldKeys(ctx context.Context, req *dax.SnapshotFieldKey qtid := req.TableKey.QualifiedTableID() // Create the snapshot for the current version. - // TODO(jaffee) change this to get write lock - wrTo, err := api.FieldTranslateData(ctx, string(req.TableKey), string(req.Field)) + trans, err := api.FieldTranslateData(ctx, string(req.TableKey), string(req.Field)) if err != nil { return errors.Wrap(err, "getting index/field writeto") } + // get a write tx to ensure no other writes while incrementing WL version. 
+ wrTo, err := trans.Begin(true) + if err != nil { + return errors.Wrap(err, "beginning field translate write tx") + } + defer wrTo.Rollback() resource := api.serverlessStorage.GetFieldKeyResource(qtid, req.Field) if err := resource.IncrementWLVersion(); err != nil { return errors.Wrap(err, "incrementing writelog version") } - // TODO(jaffee) downgrade to read lock + // TODO(jaffee) downgrade to read tx err = resource.SnapshotTo(wrTo) return errors.Wrap(err, "snapshotTo in FieldKeys") } diff --git a/executor_test.go b/executor_test.go index 2e38a6f98..33ea09ad9 100644 --- a/executor_test.go +++ b/executor_test.go @@ -5132,47 +5132,50 @@ func TestExecutor_Execute_Extract_Keyed(t *testing.T) { `) resp := c.Query(t, c.Idx(), `Extract(All(), Rows(set))`) - expect := []interface{}{ - pilosa.ExtractedTable{ - Fields: []pilosa.ExtractedTableField{ - { - Name: "set", - Type: "[]uint64", - }, + expect := pilosa.ExtractedTable{ + Fields: []pilosa.ExtractedTableField{ + { + Name: "set", + Type: "[]uint64", }, - // The order of these probably shouldn't matter, but currently depends indirectly on the - // index. - Columns: []pilosa.ExtractedTableColumn{ - { - Column: pilosa.KeyOrID{Keyed: true, Key: "h"}, - Rows: []interface{}{ - []uint64{ - 1, - 2, - }, + }, + // The order of these probably shouldn't matter, but currently depends indirectly on the + // index. 
+ Columns: []pilosa.ExtractedTableColumn{ + { + Column: pilosa.KeyOrID{Keyed: true, Key: "h"}, + Rows: []interface{}{ + []uint64{ + 1, + 2, }, }, - { - Column: pilosa.KeyOrID{Keyed: true, Key: "xyzzy"}, - Rows: []interface{}{ - []uint64{ - 2, - }, + }, + { + Column: pilosa.KeyOrID{Keyed: true, Key: "xyzzy"}, + Rows: []interface{}{ + []uint64{ + 2, }, }, - { - Column: pilosa.KeyOrID{Keyed: true, Key: "plugh"}, - Rows: []interface{}{ - []uint64{}, - }, + }, + { + Column: pilosa.KeyOrID{Keyed: true, Key: "plugh"}, + Rows: []interface{}{ + []uint64{}, }, }, }, } - if !reflect.DeepEqual(expect, resp.Results) { - t.Errorf("expected %v but got %v", expect, resp.Results) + if len(resp.Results) != 1 { + t.Fail() } + res := resp.Results[0].(pilosa.ExtractedTable) + if !reflect.DeepEqual(expect.Fields, res.Fields) { + t.Errorf("expected:\n%v\nbut got:\n%v", expect, resp.Results) + } + assert.ElementsMatch(t, expect.Columns, res.Columns) } func TestExecutor_Execute_MaxMemory(t *testing.T) { @@ -7429,6 +7432,7 @@ func backupCluster(t *testing.T, c *test.Cluster, index string) (backupDir strin buf := &bytes.Buffer{} backupLog := logger.NewStandardLogger(buf) + backupCommand := ctl.NewBackupCommand(backupLog) backupCommand.Host = c.Nodes[len(c.Nodes)-1].URL() // don't pick node 0 so we don't always get primary (better code coverage) backupCommand.Index = index diff --git a/http_handler.go b/http_handler.go index ecdcb623d..49a87dba3 100644 --- a/http_handler.go +++ b/http_handler.go @@ -2790,8 +2790,15 @@ func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request) http.Error(w, err.Error(), http.StatusNotFound) return } + tx, err := p.Begin(false) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer tx.Rollback() + // Stream translate data to response body. 
- if _, err := p.WriteTo(w); err != nil { + if _, err := tx.WriteTo(w); err != nil { h.logger.Errorf("error streaming translation data: %s", err) } return @@ -2816,8 +2823,14 @@ func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request) http.Error(w, err.Error(), http.StatusNotFound) return } + tx, err := p.Begin(false) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer tx.Rollback() // Stream translate partition to response body. - if _, err := p.WriteTo(w); err != nil { + if _, err := tx.WriteTo(w); err != nil { h.logger.Errorf("error streaming translation data: %s", err) } } diff --git a/translate.go b/translate.go index 0e8dee7c0..852420abc 100644 --- a/translate.go +++ b/translate.go @@ -76,9 +76,7 @@ type TranslateStore interface { // TODO: refactor this interface; readonly shoul // Returns a reader from the given ID offset. EntryReader(ctx context.Context, offset uint64) (TranslateEntryReader, error) - // WriteTo ensures that the TranslateStore implements io.WriterTo. - // It should write the contents of the store to the writer. - WriteTo(io.Writer) (int64, error) + Begin(write bool) (TranslatorTx, error) // ReadFrom ensures that the TranslateStore implements io.ReaderFrom. // It should read from the reader and replace the data store with @@ -88,6 +86,16 @@ type TranslateStore interface { // TODO: refactor this interface; readonly shoul Delete(records *roaring.Bitmap) (Commitor, error) } +// TranslatorTx reproduces a subset of the methods on the BoltDB Tx +// object. Others may be needed in the future and we should just add +// them here. The idea is not to scatter direct references to bolt +// stuff throughout the whole codebase. +type TranslatorTx interface { + WriteTo(io.Writer) (int64, error) + Rollback() error + // e.g. Commit() error +} + // OpenTranslateStoreFunc represents a function for instantiating and opening a TranslateStore. 
type OpenTranslateStoreFunc func(path, index, field string, partitionID, partitionN int, fsyncEnabled bool) (TranslateStore, error) diff --git a/translate_boltdb.go b/translate_boltdb.go index 0ac386ef1..9c5e3d303 100644 --- a/translate_boltdb.go +++ b/translate_boltdb.go @@ -396,14 +396,9 @@ func (s *BoltTranslateStore) MaxID() (max uint64, err error) { return max, nil } -// WriteTo writes the contents of the store to the writer. -func (s *BoltTranslateStore) WriteTo(w io.Writer) (int64, error) { - tx, err := s.db.Begin(false) - if err != nil { - return 0, err - } - defer func() { _ = tx.Rollback() }() - return tx.WriteTo(w) +// Begin starts and returns a transaction on the underlying store. +func (s *BoltTranslateStore) Begin(write bool) (TranslatorTx, error) { + return s.db.Begin(write) } // ReadFrom reads the content and overwrites the existing store. diff --git a/translate_boltdb_test.go b/translate_boltdb_test.go index 0f0848f2e..f9aeac3c7 100644 --- a/translate_boltdb_test.go +++ b/translate_boltdb_test.go @@ -458,12 +458,19 @@ func TestTranslateStore_ReadWrite(t *testing.T) { buf := bytes.NewBuffer(nil) expN := s.Size() - // After this, the buffer should contain batch0. - if n, err := s.WriteTo(buf); err != nil { - t.Fatalf("writing to buffer: %s", err) - } else if n != expN { - t.Fatalf("expected buffer size: %d, but got: %d", expN, n) - } + // wrap in a func so we can defer rollback. Need rollback to + // happen before the end of the test. I'm not entirely sure + // why, but it hangs if you don't. + func() { + tx, err := s.Begin(false) + require.NoError(t, err) + defer tx.Rollback() + + // After this, the buffer should contain batch0. + n, err := tx.WriteTo(buf) + require.NoError(t, err) + require.Equal(t, expN, n) + }() // Populate the store with the keys in batch1. batch1IDs, err := s.CreateKeys(batch1...)