diff --git a/Makefile b/Makefile index 130624e6d..fc0d26306 100644 --- a/Makefile +++ b/Makefile @@ -77,9 +77,9 @@ testvsub: echo; echo "999 done testing subpkg $$pkg"; \ done -# make a 2GB RAMDisk. Speed up tests by running them with RAMDISK=/mnt/ramfs +# make a 2GB RAMDisk. Speed up tests by running them with RAMDISK=/mnt/ramdisk ramdisk-linux: - mount -o size=2G -t tmpfs none /mnt/ramfs + mount -o size=2G -t tmpfs none /mnt/ramdisk # make a 2GB RAMDisk. Speed up tests by running them with RAMDISK=/Volumes/RAMDisk ramdisk-osx: diff --git a/api.go b/api.go index 4b2467eb2..4304ca4e8 100644 --- a/api.go +++ b/api.go @@ -967,7 +967,7 @@ func (r RedirectError) Error() string { } // TranslateData returns all translation data in the specified partition. -func (api *API) TranslateData(ctx context.Context, indexName string, partition int) (io.WriterTo, error) { +func (api *API) TranslateData(ctx context.Context, indexName string, partition int) (TranslateStore, error) { span, _ := tracing.StartSpanFromContext(ctx, "API.TranslateData") defer span.Finish() @@ -1022,7 +1022,7 @@ func (api *API) TranslateData(ctx context.Context, indexName string, partition i } // FieldTranslateData returns all translation data in the specified field. -func (api *API) FieldTranslateData(ctx context.Context, indexName, fieldName string) (io.WriterTo, error) { +func (api *API) FieldTranslateData(ctx context.Context, indexName, fieldName string) (TranslateStore, error) { span, _ := tracing.StartSpanFromContext(ctx, "API.FieldTranslateData") defer span.Finish() if err := api.validate(apiFieldTranslateData); err != nil { @@ -3136,18 +3136,22 @@ func (api *API) SnapshotTableKeys(ctx context.Context, req *dax.SnapshotTableKey qtid := req.TableKey.QualifiedTableID() // Create the snapshot for the current version. - wrTo, err := api.TranslateData(ctx, string(req.TableKey), int(req.PartitionNum)) + trans, err := api.TranslateData(ctx, string(req.TableKey), int(req.PartitionNum)) if err != nil { - return errors.Wrapf(err, "getting index/partition writeto: %s/%d", req.TableKey, req.PartitionNum) + return errors.Wrapf(err, "getting index/partition translate store: %s/%d", req.TableKey, req.PartitionNum) } + // get a write tx to ensure no other writes while incrementing WL version. + wrTo, err := trans.Begin(true) + if err != nil { + return errors.Wrap(err, "beginning table translate write tx") + } + defer wrTo.Rollback() - // TODO(jaffee) need to ensure writes to translation data can't - // occur while this is happening. resource := api.serverlessStorage.GetTableKeyResource(qtid, req.PartitionNum) if err := resource.IncrementWLVersion(); err != nil { return errors.Wrap(err, "incrementing write log version") } - // TODO(jaffee) downgrade (currently non-existent) lock to read-only + // TODO(jaffee) downgrade write tx to read-only err = resource.SnapshotTo(wrTo) return errors.Wrap(err, "snapshotting table keys") } @@ -3158,17 +3162,22 @@ func (api *API) SnapshotFieldKeys(ctx context.Context, req *dax.SnapshotFieldKey qtid := req.TableKey.QualifiedTableID() // Create the snapshot for the current version. - // TODO(jaffee) change this to get write lock - wrTo, err := api.FieldTranslateData(ctx, string(req.TableKey), string(req.Field)) + trans, err := api.FieldTranslateData(ctx, string(req.TableKey), string(req.Field)) if err != nil { return errors.Wrap(err, "getting index/field writeto") } + // get a write tx to ensure no other writes while incrementing WL version. + wrTo, err := trans.Begin(true) + if err != nil { + return errors.Wrap(err, "beginning field translate write tx") + } + defer wrTo.Rollback() resource := api.serverlessStorage.GetFieldKeyResource(qtid, req.Field) if err := resource.IncrementWLVersion(); err != nil { return errors.Wrap(err, "incrementing writelog version") } - // TODO(jaffee) downgrade to read lock + // TODO(jaffee) downgrade to read tx err = resource.SnapshotTo(wrTo) return errors.Wrap(err, "snapshotTo in FieldKeys") } diff --git a/executor_test.go b/executor_test.go index 2e38a6f98..33ea09ad9 100644 --- a/executor_test.go +++ b/executor_test.go @@ -5132,47 +5132,50 @@ func TestExecutor_Execute_Extract_Keyed(t *testing.T) { `) resp := c.Query(t, c.Idx(), `Extract(All(), Rows(set))`) - expect := []interface{}{ - pilosa.ExtractedTable{ - Fields: []pilosa.ExtractedTableField{ - { - Name: "set", - Type: "[]uint64", - }, + expect := pilosa.ExtractedTable{ + Fields: []pilosa.ExtractedTableField{ + { + Name: "set", + Type: "[]uint64", }, - // The order of these probably shouldn't matter, but currently depends indirectly on the - // index. - Columns: []pilosa.ExtractedTableColumn{ - { - Column: pilosa.KeyOrID{Keyed: true, Key: "h"}, - Rows: []interface{}{ - []uint64{ - 1, - 2, - }, + }, + // The order of these probably shouldn't matter, but currently depends indirectly on the + // index. + Columns: []pilosa.ExtractedTableColumn{ + { + Column: pilosa.KeyOrID{Keyed: true, Key: "h"}, + Rows: []interface{}{ + []uint64{ + 1, + 2, }, }, - { - Column: pilosa.KeyOrID{Keyed: true, Key: "xyzzy"}, - Rows: []interface{}{ - []uint64{ - 2, - }, + }, + { + Column: pilosa.KeyOrID{Keyed: true, Key: "xyzzy"}, + Rows: []interface{}{ + []uint64{ + 2, }, }, - { - Column: pilosa.KeyOrID{Keyed: true, Key: "plugh"}, - Rows: []interface{}{ - []uint64{}, - }, + }, + { + Column: pilosa.KeyOrID{Keyed: true, Key: "plugh"}, + Rows: []interface{}{ + []uint64{}, }, }, }, } - if !reflect.DeepEqual(expect, resp.Results) { - t.Errorf("expected %v but got %v", expect, resp.Results) + if len(resp.Results) != 1 { + t.Fail() } + res := resp.Results[0].(pilosa.ExtractedTable) + if !reflect.DeepEqual(expect.Fields, res.Fields) { + t.Errorf("expected:\n%v\nbut got:\n%v", expect, resp.Results) + } + assert.ElementsMatch(t, expect.Columns, res.Columns) } func TestExecutor_Execute_MaxMemory(t *testing.T) { @@ -7429,6 +7432,7 @@ func backupCluster(t *testing.T, c *test.Cluster, index string) (backupDir strin buf := &bytes.Buffer{} backupLog := logger.NewStandardLogger(buf) + backupCommand := ctl.NewBackupCommand(backupLog) backupCommand.Host = c.Nodes[len(c.Nodes)-1].URL() // don't pick node 0 so we don't always get primary (better code coverage) backupCommand.Index = index diff --git a/http_handler.go b/http_handler.go index ecdcb623d..49a87dba3 100644 --- a/http_handler.go +++ b/http_handler.go @@ -2790,8 +2790,15 @@ func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request) http.Error(w, err.Error(), http.StatusNotFound) return } + tx, err := p.Begin(false) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer tx.Rollback() + // Stream translate data to response body. - if _, err := p.WriteTo(w); err != nil { + if _, err := tx.WriteTo(w); err != nil { h.logger.Errorf("error streaming translation data: %s", err) } return @@ -2816,8 +2823,14 @@ func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request) http.Error(w, err.Error(), http.StatusNotFound) return } + tx, err := p.Begin(false) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer tx.Rollback() // Stream translate partition to response body. - if _, err := p.WriteTo(w); err != nil { + if _, err := tx.WriteTo(w); err != nil { h.logger.Errorf("error streaming translation data: %s", err) } } diff --git a/translate.go b/translate.go index 0e8dee7c0..852420abc 100644 --- a/translate.go +++ b/translate.go @@ -76,9 +76,7 @@ type TranslateStore interface { // TODO: refactor this interface; readonly shoul // Returns a reader from the given ID offset. EntryReader(ctx context.Context, offset uint64) (TranslateEntryReader, error) - // WriteTo ensures that the TranslateStore implements io.WriterTo. - // It should write the contents of the store to the writer. - WriteTo(io.Writer) (int64, error) + Begin(write bool) (TranslatorTx, error) // ReadFrom ensures that the TranslateStore implements io.ReaderFrom. // It should read from the reader and replace the data store with @@ -88,6 +86,16 @@ type TranslateStore interface { // TODO: refactor this interface; readonly shoul Delete(records *roaring.Bitmap) (Commitor, error) } +// TranslatorTx reproduces a subset of the methods on the BoltDB Tx +// object. Others may be needed in the future and we should just add +// them here. The idea is not to scatter direct references to bolt +// stuff throughout the whole codebase. +type TranslatorTx interface { + WriteTo(io.Writer) (int64, error) + Rollback() error + // e.g. Commit() error +} + // OpenTranslateStoreFunc represents a function for instantiating and opening a TranslateStore. type OpenTranslateStoreFunc func(path, index, field string, partitionID, partitionN int, fsyncEnabled bool) (TranslateStore, error) diff --git a/translate_boltdb.go b/translate_boltdb.go index 0ac386ef1..9c5e3d303 100644 --- a/translate_boltdb.go +++ b/translate_boltdb.go @@ -396,14 +396,9 @@ func (s *BoltTranslateStore) MaxID() (max uint64, err error) { return max, nil } -// WriteTo writes the contents of the store to the writer. -func (s *BoltTranslateStore) WriteTo(w io.Writer) (int64, error) { - tx, err := s.db.Begin(false) - if err != nil { - return 0, err - } - defer func() { _ = tx.Rollback() }() - return tx.WriteTo(w) +// Begin starts and returns a transaction on the underlying store. +func (s *BoltTranslateStore) Begin(write bool) (TranslatorTx, error) { + return s.db.Begin(write) } // ReadFrom reads the content and overwrites the existing store. diff --git a/translate_boltdb_test.go b/translate_boltdb_test.go index 0f0848f2e..f9aeac3c7 100644 --- a/translate_boltdb_test.go +++ b/translate_boltdb_test.go @@ -458,12 +458,19 @@ func TestTranslateStore_ReadWrite(t *testing.T) { buf := bytes.NewBuffer(nil) expN := s.Size() - // After this, the buffer should contain batch0. - if n, err := s.WriteTo(buf); err != nil { - t.Fatalf("writing to buffer: %s", err) - } else if n != expN { - t.Fatalf("expected buffer size: %d, but got: %d", expN, n) - } + // wrap in a func so we can defer rollback. Need rollback to + // happen before the end of the test. I'm not entirely sure + // why, but it hangs if you don't. + func() { + tx, err := s.Begin(false) + require.NoError(t, err) + defer tx.Rollback() + + // After this, the buffer should contain batch0. + n, err := tx.WriteTo(buf) + require.NoError(t, err) + require.Equal(t, expN, n) + }() // Populate the store with the keys in batch1. batch1IDs, err := s.CreateKeys(batch1...)