Skip to content

Commit

Permalink
feat(dql): add @unique constraint support in schema for new predicates (
Browse files Browse the repository at this point in the history
#50)

cherry picked from commit
92c5b7a

Co-authored-by: Aman Mangal <aman@dgraph.io>
  • Loading branch information
shivaji-dgraph and mangalaman93 committed Oct 25, 2023
1 parent 7b1f473 commit 77d71a7
Show file tree
Hide file tree
Showing 12 changed files with 1,564 additions and 369 deletions.
4 changes: 2 additions & 2 deletions dgraph/cmd/live/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type loader struct {
reqNum uint64
reqs chan *request
zeroconn *grpc.ClientConn
schema *schema
schema *Schema
namespaces map[uint64]struct{}

upsertLock sync.RWMutex
Expand Down Expand Up @@ -240,7 +240,7 @@ func createValueEdge(nq *api.NQuad, sid uint64) (*pb.DirectedEdge, error) {
return p, nil
}

func fingerprintEdge(t *pb.DirectedEdge, pred *predicate) uint64 {
func fingerprintEdge(t *pb.DirectedEdge, pred *Predicate) uint64 {
var id uint64 = math.MaxUint64

// Value with a lang type.
Expand Down
21 changes: 11 additions & 10 deletions dgraph/cmd/live/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type options struct {
preserveNs bool
}

type predicate struct {
type Predicate struct {
Predicate string `json:"predicate,omitempty"`
Type string `json:"type,omitempty"`
Tokenizer []string `json:"tokenizer,omitempty"`
Expand All @@ -89,21 +89,22 @@ type predicate struct {
Upsert bool `json:"upsert,omitempty"`
Reverse bool `json:"reverse,omitempty"`
NoConflict bool `json:"no_conflict,omitempty"`
Unique bool `json:"unique,omitempty"`
ValueType types.TypeID
}

type schema struct {
Predicates []*predicate `json:"schema,omitempty"`
preds map[string]*predicate
type Schema struct {
Predicates []*Predicate `json:"schema,omitempty"`
preds map[string]*Predicate
}

type request struct {
*api.Mutation
conflicts []uint64
}

func (l *schema) init(ns uint64, galaxyOperation bool) {
l.preds = make(map[string]*predicate)
func (l *Schema) init(ns uint64, galaxyOperation bool) {
l.preds = make(map[string]*Predicate)
for _, i := range l.Predicates {
i.ValueType, _ = types.TypeForName(i.Type)
if !galaxyOperation {
Expand All @@ -115,7 +116,7 @@ func (l *schema) init(ns uint64, galaxyOperation bool) {

var (
opt options
sch schema
sch Schema

// Live is the sub-command invoked when running "dgraph live".
Live x.SubCommand
Expand Down Expand Up @@ -185,7 +186,7 @@ func init() {
"specific namespace. Setting it to negative value will preserve the namespace.")
}

func getSchema(ctx context.Context, dgraphClient *dgo.Dgraph, galaxyOperation bool) (*schema, error) {
func getSchema(ctx context.Context, dgraphClient *dgo.Dgraph, galaxyOperation bool) (*Schema, error) {
txn := dgraphClient.NewTxn()
defer func() {
if err := txn.Discard(ctx); err != nil {
Expand Down Expand Up @@ -500,7 +501,7 @@ func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunk
sort.Slice(buffer, func(i, j int) bool {
iPred := sch.preds[x.NamespaceAttr(buffer[i].Namespace, buffer[i].Predicate)]
jPred := sch.preds[x.NamespaceAttr(buffer[j].Namespace, buffer[j].Predicate)]
t := func(a *predicate) int {
t := func(a *Predicate) int {
if a != nil && a.Count {
return 1
}
Expand Down Expand Up @@ -683,7 +684,7 @@ func (l *loader) populateNamespaces(ctx context.Context, dc *dgo.Dgraph, singleN
return err
}

var sch schema
var sch Schema
err = json.Unmarshal(res.GetJson(), &sch)
if err != nil {
return err
Expand Down
14 changes: 14 additions & 0 deletions dgraphtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,20 @@ func (gc *GrpcClient) Mutate(mu *api.Mutation) (*api.Response, error) {
return txn.Mutate(ctx, mu)
}

func (gc *GrpcClient) Upsert(query string, mu *api.Mutation) (*api.Response, error) {
txn := gc.NewTxn()
defer func() { _ = txn.Discard(context.Background()) }()

ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
req := &api.Request{
Query: query,
Mutations: []*api.Mutation{mu},
CommitNow: true,
}
return txn.Do(ctx, req)
}

// Query performa a given query in a new txn
func (gc *GrpcClient) Query(query string) (*api.Response, error) {
txn := gc.NewTxn()
Expand Down
Loading

0 comments on commit 77d71a7

Please sign in to comment.