Skip to content

Commit

Permalink
feat(vector): fix live loader and add tests for dropall, drop namespa…
Browse files Browse the repository at this point in the history
…ce, live load (#9063)

Fix live loader, and float32vector typo in dql.
  • Loading branch information
shivaji-dgraph committed Apr 5, 2024
1 parent 5b69c8b commit 2aeef65
Show file tree
Hide file tree
Showing 14 changed files with 1,020 additions and 375 deletions.
1 change: 1 addition & 0 deletions chunker/rdf_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ var typeMap = map[string]types.TypeID{
"xs:float": types.FloatID,
"xs:base64Binary": types.BinaryID,
"geo:geojson": types.GeoID,
"xs:[]float32": types.VFloatID,
"http://www.w3.org/2001/XMLSchema#string": types.StringID,
"http://www.w3.org/2001/XMLSchema#dateTime": types.DateTimeID,
"http://www.w3.org/2001/XMLSchema#date": types.DateTimeID,
Expand Down
4 changes: 2 additions & 2 deletions dgraphtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ loop2:
return errors.Errorf("restore wasn't started on at least 1 alpha")
}

func (hc *HTTPClient) Export(dest string, namespace int) error {
func (hc *HTTPClient) Export(dest, format string, namespace int) error {
const exportRequest = `mutation export($dest: String!, $f: String!, $ns: Int) {
export(input: {destination: $dest, format: $f, namespace: $ns}) {
response {
Expand All @@ -540,7 +540,7 @@ func (hc *HTTPClient) Export(dest string, namespace int) error {
Query: exportRequest,
Variables: map[string]interface{}{
"dest": dest,
"f": "rdf",
"f": format,
"ns": namespace,
},
}
Expand Down
10 changes: 8 additions & 2 deletions dgraphtest/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
}()

// .rdf.gz, .schema.gz,.gql_schema.gz
var rdfFiles, schemaFiles, gqlSchemaFiles []string
var rdfFiles, schemaFiles, gqlSchemaFiles, jsonFiles []string
tr := tar.NewReader(ts)
for {
header, err := tr.Next()
Expand All @@ -404,6 +404,8 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
switch {
case strings.HasSuffix(fileName, ".rdf.gz"):
rdfFiles = append(rdfFiles, hostFile)
case strings.HasSuffix(fileName, ".json.gz"):
jsonFiles = append(jsonFiles, hostFile)
case strings.HasSuffix(fileName, ".schema.gz"):
schemaFiles = append(schemaFiles, hostFile)
case strings.HasSuffix(fileName, ".gql_schema.gz"):
Expand Down Expand Up @@ -441,10 +443,14 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
}

opts := LiveOpts{
DataFiles: rdfFiles,
SchemaFiles: schemaFiles,
GqlSchemaFiles: gqlSchemaFiles,
}
if len(rdfFiles) == 0 {
opts.DataFiles = jsonFiles
} else {
opts.DataFiles = rdfFiles
}
if err := c.LiveLoad(opts); err != nil {
return errors.Wrapf(err, "error running live loader: %v", err)
}
Expand Down
22 changes: 21 additions & 1 deletion dgraphtest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ func (c *LocalCluster) Upgrade(version string, strategy UpgradeStrategy) error {
}
}
// using -1 as namespace exports all the namespaces
if err := hc.Export(DefaultExportDir, -1); err != nil {
if err := hc.Export(DefaultExportDir, "rdf", -1); err != nil {
return errors.Wrap(err, "error taking export during upgrade")
}
if err := c.Stop(); err != nil {
Expand Down Expand Up @@ -747,6 +747,26 @@ func (c *LocalCluster) Client() (*GrpcClient, func(), error) {
return &GrpcClient{Dgraph: client}, cleanup, nil
}

func (c *LocalCluster) AlphaClient(id int) (*GrpcClient, func(), error) {
alpha := c.alphas[id]
url, err := alpha.alphaURL(c)
if err != nil {
return nil, nil, errors.Wrap(err, "error getting health URL")
}
conn, err := grpc.Dial(url, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, nil, errors.Wrap(err, "error connecting to alpha")
}

client := dgo.NewDgraphClient(api.NewDgraphClient(conn))
cleanup := func() {
if err := conn.Close(); err != nil {
log.Printf("[WARNING] error closing connection: %v", err)
}
}
return &GrpcClient{Dgraph: client}, cleanup, nil
}

// HTTPClient creates an HTTP client
func (c *LocalCluster) HTTPClient() (*HTTPClient, error) {
adminURL, err := c.serverURL("alpha", "/admin")
Expand Down
106 changes: 106 additions & 0 deletions dgraphtest/vector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2023 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dgraphtest

import (
"encoding/json"
"fmt"
"math/rand"
"strings"

"github.com/dgraph-io/dgo/v230/protos/api"
)

func GenerateRandomVector(size int) []float32 {
vector := make([]float32, size)
for i := 0; i < size; i++ {
vector[i] = rand.Float32() * 10
}
return vector
}

func formatVector(label string, vector []float32, index int) string {
vectorString := fmt.Sprintf(`"[%s]"`, strings.Trim(strings.Join(strings.Fields(fmt.Sprint(vector)), ", "), "[]"))
return fmt.Sprintf("<0x%x> <%s> %s . \n", index+10, label, vectorString)
}

func GenerateRandomVectors(lowerLimit, uppermLimit, vectorSize int, label string) (string, [][]float32) {
var builder strings.Builder
var vectors [][]float32
// builder.WriteString("`")
for i := lowerLimit; i < uppermLimit; i++ {
randomVector := GenerateRandomVector(vectorSize)
vectors = append(vectors, randomVector)
formattedVector := formatVector(label, randomVector, i)
builder.WriteString(formattedVector)
}

return builder.String(), vectors
}

func (gc *GrpcClient) QueryMultipleVectorsUsingSimilarTo(vector []float32, pred string, topK int) ([][]float32, error) {
vectorQuery := fmt.Sprintf(`
{
vector(func: similar_to(%v, %v, "%v")) {
uid
%v
}
}`, pred, topK, vector, pred)
resp, err := gc.Query(vectorQuery)

if err != nil {
return [][]float32{}, err
}

return UnmarshalVectorResp(resp)
}

func (gc *GrpcClient) QuerySingleVectorsUsingUid(uid, pred string) ([][]float32, error) {
vectorQuery := fmt.Sprintf(`
{
vector(func: uid(%v)) {
uid
%v
}
}`, uid[1:len(uid)-1], pred)

resp, err := gc.Query(vectorQuery)
if err != nil {
return [][]float32{}, err
}

return UnmarshalVectorResp(resp)
}

func UnmarshalVectorResp(resp *api.Response) ([][]float32, error) {
type Data struct {
Vector []struct {
UID string `json:"uid"`
ProjectDescriptionV []float32 `json:"project_discription_v"`
} `json:"vector"`
}
var data Data
if err := json.Unmarshal(resp.Json, &data); err != nil {
return nil, err
}

var vectors [][]float32
for _, item := range data.Vector {
vectors = append(vectors, item.ProjectDescriptionV)
}
return vectors, nil
}
8 changes: 4 additions & 4 deletions dql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,10 @@ func parseValue(v varInfo) (types.Val, error) {
}, nil
}
}
case "vector32float":
case "float32vector":
{
if i, err := types.ParseVFloat(v.Value); err != nil {
return types.Val{}, errors.Wrapf(err, "Expected a vfloat but got %v", v.Value)
return types.Val{}, errors.Wrapf(err, "Expected a float32vector but got %v", v.Value)
} else {
return types.Val{
Tid: types.VFloatID,
Expand Down Expand Up @@ -415,10 +415,10 @@ func checkValueType(vm varMap) error {
return errors.Wrapf(err, "Expected a bool but got %v", v.Value)
}
}
case "vfloat":
case "float32vector":
{
if _, err := types.ParseVFloat(v.Value); err != nil {
return errors.Wrapf(err, "Expected a vfloat but got %v", v.Value)
return errors.Wrapf(err, "Expected a vector32float but got %v", v.Value)
}
}
case "string": // Value is a valid string. No checks required.
Expand Down
2 changes: 2 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,8 @@ message SchemaNode {
bool lang = 9;
bool no_conflict = 10;
bool unique = 11;
repeated VectorIndexSpec index_specs = 12;

}

message SchemaResult {
Expand Down
Loading

0 comments on commit 2aeef65

Please sign in to comment.