From 5451b77fee962726bb3991e8a507ffd8f4f7440a Mon Sep 17 00:00:00 2001
From: ShivajiKharse <115525374+shivaji-dgraph@users.noreply.github.com>
Date: Wed, 8 May 2024 17:11:09 +0530
Subject: [PATCH] fix(restore): fix incr restore and normal restore for vector predicates (#9078)

---
 systest/vector/backup_test.go | 237 ++++++++++++++++++++++++++++++++++
 worker/backup_ee.go           |  23 ++++
 worker/restore_map.go         |  10 ++
 3 files changed, 270 insertions(+)
 create mode 100644 systest/vector/backup_test.go

diff --git a/systest/vector/backup_test.go b/systest/vector/backup_test.go
new file mode 100644
index 00000000000..f11eb8b75ff
--- /dev/null
+++ b/systest/vector/backup_test.go
@@ -0,0 +1,237 @@
+//go:build !oss && integration
+
+/*
+ * Copyright 2023 Dgraph Labs, Inc. and Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"slices"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/dgraph-io/dgo/v230/protos/api"
+	"github.com/dgraph-io/dgraph/dgraphtest"
+	"github.com/dgraph-io/dgraph/x"
+	"github.com/stretchr/testify/require"
+)
+
+func TestVectorIncrBackupRestore(t *testing.T) {
+	conf := dgraphtest.NewClusterConfig().WithNumAlphas(1).WithNumZeros(1).WithReplicas(1).WithACL(time.Hour)
+	c, err := dgraphtest.NewLocalCluster(conf)
+	require.NoError(t, err)
+	defer func() { c.Cleanup(t.Failed()) }()
+	require.NoError(t, c.Start())
+
+	gc, cleanup, err := c.Client()
+	require.NoError(t, err)
+	defer cleanup()
+	require.NoError(t, gc.LoginIntoNamespace(context.Background(),
+		dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace))
+
+	hc, err := c.HTTPClient()
+	require.NoError(t, err)
+	require.NoError(t, hc.LoginIntoNamespace(dgraphtest.DefaultUser,
+		dgraphtest.DefaultPassword, x.GalaxyNamespace))
+
+	require.NoError(t, gc.SetupSchema(testSchema))
+
+	numVectors := 500
+	pred := "project_discription_v"
+	allVectors := make([][][]float32, 0, 5)
+	allRdfs := make([]string, 0, 5)
+	for i := 1; i <= 5; i++ {
+		var rdfs string
+		var vectors [][]float32
+		rdfs, vectors = dgraphtest.GenerateRandomVectors(numVectors*(i-1), numVectors*i, 1, pred)
+		allVectors = append(allVectors, vectors)
+		allRdfs = append(allRdfs, rdfs)
+		mu := &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true}
+		_, err := gc.Mutate(mu)
+		require.NoError(t, err)
+
+		t.Logf("taking backup #%v\n", i)
+		require.NoError(t, hc.Backup(c, i == 1, dgraphtest.DefaultBackupDir))
+	}
+
+	for i := 1; i <= 5; i++ {
+		t.Logf("restoring backup #%v\n", i)
+
+		incrFrom := i - 1
+		require.NoError(t, hc.Restore(c, dgraphtest.DefaultBackupDir, "", incrFrom, i))
+		require.NoError(t, dgraphtest.WaitForRestore(c))
+		query := `{
+			vector(func: has(project_discription_v)) {
+				count(uid)
+			}
+		}`
+		result, err := gc.Query(query)
+		require.NoError(t, err)
+
+		require.JSONEq(t, fmt.Sprintf(`{"vector":[{"count":%v}]}`, numVectors*i), string(result.GetJson()))
+		var allSpredVec [][]float32
+		for j, vecArr := range allVectors {
+			if j < i { // only the batches covered by restore #i should be considered
+				allSpredVec = append(allSpredVec, vecArr...)
+			}
+		}
+		for p, vector := range allVectors[i-1] {
+			triple := strings.Split(allRdfs[i-1], "\n")[p]
+			uid := strings.Split(triple, " ")[0]
+			queriedVector, err := gc.QuerySingleVectorsUsingUid(uid, pred)
+			require.NoError(t, err)
+
+			require.Equal(t, allVectors[i-1][p], queriedVector[0])
+
+			similarVectors, err := gc.QueryMultipleVectorsUsingSimilarTo(vector, pred, numVectors)
+			require.NoError(t, err)
+			require.GreaterOrEqual(t, len(similarVectors), 10)
+			for _, similarVector := range similarVectors {
+				require.Contains(t, allSpredVec, similarVector)
+			}
+		}
+	}
+}
+
+func TestVectorBackupRestore(t *testing.T) {
+	conf := dgraphtest.NewClusterConfig().WithNumAlphas(1).WithNumZeros(1).WithReplicas(1).WithACL(time.Hour)
+	c, err := dgraphtest.NewLocalCluster(conf)
+	require.NoError(t, err)
+	defer func() { c.Cleanup(t.Failed()) }()
+	require.NoError(t, c.Start())
+
+	gc, cleanup, err := c.Client()
+	require.NoError(t, err)
+	defer cleanup()
+	require.NoError(t, gc.LoginIntoNamespace(context.Background(),
+		dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace))
+
+	hc, err := c.HTTPClient()
+	require.NoError(t, err)
+	require.NoError(t, hc.LoginIntoNamespace(dgraphtest.DefaultUser,
+		dgraphtest.DefaultPassword, x.GalaxyNamespace))
+
+	require.NoError(t, gc.SetupSchema(testSchema))
+
+	numVectors := 1000
+	pred := "project_discription_v"
+	rdfs, vectors := dgraphtest.GenerateRandomVectors(0, numVectors, 10, pred)
+
+	mu := &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true}
+	_, err = gc.Mutate(mu)
+	require.NoError(t, err)
+
+	t.Log("taking backup \n")
+	require.NoError(t, hc.Backup(c, false, dgraphtest.DefaultBackupDir))
+
+	t.Log("restoring backup \n")
+	require.NoError(t, hc.Restore(c, dgraphtest.DefaultBackupDir, "", 0, 0))
+	require.NoError(t, dgraphtest.WaitForRestore(c))
+
+	testVectorQuery(t, gc, vectors, rdfs, pred, numVectors)
+}
+
+func TestVectorBackupRestoreDropIndex(t *testing.T) {
+	// setup cluster
+	conf := dgraphtest.NewClusterConfig().WithNumAlphas(1).WithNumZeros(1).WithReplicas(1).WithACL(time.Hour)
+	c, err := dgraphtest.NewLocalCluster(conf)
+	require.NoError(t, err)
+	defer func() { c.Cleanup(t.Failed()) }()
+	require.NoError(t, c.Start())
+
+	gc, cleanup, err := c.Client()
+	require.NoError(t, err)
+	defer cleanup()
+	require.NoError(t, gc.LoginIntoNamespace(context.Background(),
+		dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace))
+
+	hc, err := c.HTTPClient()
+	require.NoError(t, err)
+	require.NoError(t, hc.LoginIntoNamespace(dgraphtest.DefaultUser,
+		dgraphtest.DefaultPassword, x.GalaxyNamespace))
+
+	// add vector predicate + index
+	require.NoError(t, gc.SetupSchema(testSchema))
+	// add data to the vector predicate
+	numVectors := 3
+	pred := "project_discription_v"
+	rdfs, vectors := dgraphtest.GenerateRandomVectors(0, numVectors, 1, pred)
+	mu := &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true}
+	_, err = gc.Mutate(mu)
+	require.NoError(t, err)
+
+	t.Log("taking full backup \n")
+	require.NoError(t, hc.Backup(c, false, dgraphtest.DefaultBackupDir))
+
+	// drop index
+	require.NoError(t, gc.SetupSchema(testSchemaWithoutIndex))
+
+	// add more data to the vector predicate
+	rdfs, vectors2 := dgraphtest.GenerateRandomVectors(3, numVectors+3, 1, pred)
+	mu = &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true}
+	_, err = gc.Mutate(mu)
+	require.NoError(t, err)
+
+	// delete some entries
+	mu = &api.Mutation{DelNquads: []byte(strings.Split(rdfs, "\n")[1]),
+		CommitNow: true}
+	_, err = gc.Mutate(mu)
+	require.NoError(t, err)
+
+	vectors2 = slices.Delete(vectors2, 1, 2)
+
+	mu = &api.Mutation{DelNquads: []byte(strings.Split(rdfs, "\n")[0]), CommitNow: true}
+	_, err = gc.Mutate(mu)
+	require.NoError(t, err)
+	vectors2 = slices.Delete(vectors2, 0, 1)
+
+	t.Log("taking first incr backup \n")
+	require.NoError(t, hc.Backup(c, false, dgraphtest.DefaultBackupDir))
+
+	// add index
+	require.NoError(t, gc.SetupSchema(testSchema))
+
+	t.Log("taking second incr backup \n")
+	require.NoError(t, hc.Backup(c, false, dgraphtest.DefaultBackupDir))
+
+	// restore backup
+	t.Log("restoring backup \n")
+	require.NoError(t, hc.Restore(c, dgraphtest.DefaultBackupDir, "", 0, 0))
+	require.NoError(t, dgraphtest.WaitForRestore(c))
+
+	query := `{
+		vectors(func: has(project_discription_v)) {
+			count(uid)
+		}
+	}`
+	resp, err := gc.Query(query)
+	require.NoError(t, err)
+	require.JSONEq(t, `{"vectors":[{"count":4}]}`, string(resp.GetJson()))
+
+	allVec := append(vectors, vectors2...)
+
+	for _, vector := range allVec {
+		similarVectors, err := gc.QueryMultipleVectorsUsingSimilarTo(vector, pred, 4)
+		require.NoError(t, err)
+		for _, similarVector := range similarVectors {
+			require.Contains(t, allVec, similarVector)
+		}
+	}
+}
diff --git a/worker/backup_ee.go b/worker/backup_ee.go
index 31b0156faf6..dc77652dba5 100644
--- a/worker/backup_ee.go
+++ b/worker/backup_ee.go
@@ -37,6 +37,7 @@ import (
 	"github.com/dgraph-io/dgraph/ee/enc"
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/protos/pb"
+	"github.com/dgraph-io/dgraph/tok/hnsw"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/dgraph-io/ristretto/z"
 )
@@ -194,6 +195,28 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error {
 		for pred := range group.Tablets {
 			predMap[gid] = append(predMap[gid], pred)
 		}
+
+	}
+
+	// see if any of the predicates are vector predicates and add the supporting
+	// vector predicates to the backup request.
+	vecPredMap := make(map[uint32][]string)
+	for gid, preds := range predMap {
+		schema, err := GetSchemaOverNetwork(ctx, &pb.SchemaRequest{Predicates: preds})
+		if err != nil {
+			return err
+		}
+
+		for _, pred := range schema {
+			if pred.Type == "float32vector" && len(pred.IndexSpecs) != 0 {
+				vecPredMap[gid] = append(vecPredMap[gid], pred.Predicate+hnsw.VecEntry, pred.Predicate+hnsw.VecKeyword,
+					pred.Predicate+hnsw.VecDead)
+			}
+		}
+	}
+
+	for gid, preds := range vecPredMap {
+		predMap[gid] = append(predMap[gid], preds...)
 	}
 	glog.Infof(
diff --git a/worker/restore_map.go b/worker/restore_map.go
index 4a962f18fac..5829bf03bb0 100644
--- a/worker/restore_map.go
+++ b/worker/restore_map.go
@@ -27,6 +27,7 @@ import (
 	"os"
 	"path/filepath"
 	"strconv"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -44,6 +45,7 @@ import (
 	"github.com/dgraph-io/dgraph/ee/enc"
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/protos/pb"
+	"github.com/dgraph-io/dgraph/tok/hnsw"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/dgraph-io/ristretto/z"
 )
@@ -470,6 +472,7 @@ func (m *mapper) processReqCh(ctx context.Context) error {
 			}
 			return nil
 		}
+
 		// We changed the format of predicate in 2103 and 2105. SchemaUpdate and TypeUpdate have
 		// predicate stored within them, so they also need to be updated accordingly.
 		switch in.version {
@@ -488,6 +491,13 @@ func (m *mapper) processReqCh(ctx context.Context) error {
 		default:
 			// for manifest versions >= 2015, do nothing.
 		}
+
+		// If the predicate is a vector indexing predicate, skip further processing.
+		// currently we don't store vector supporting predicates in the schema.
+		if strings.HasSuffix(parsedKey.Attr, hnsw.VecEntry) || strings.HasSuffix(parsedKey.Attr, hnsw.VecKeyword) ||
+			strings.HasSuffix(parsedKey.Attr, hnsw.VecDead) {
+			return nil
+		}
 		// Reset the StreamId to prevent ordering issues while writing to stream writer.
 		kv.StreamId = 0
 		// Schema and type keys are not stored in an intermediate format so their