Skip to content

Commit

Permalink
feat(dql): add @unique constraint support in schema for new predicates (
Browse files Browse the repository at this point in the history
#8827)

Partially Fixes #8827
Closes: DGRAPHCORE-206
Docs PR: dgraph-io/dgraph-docs#638

This PR adds support for a uniqueness constraint using the @unique directive
in the DQL schema. The @unique directive ensures that all values of the
predicate are different across a Dgraph cluster. This completes phase 1, which
enables adding a new predicate with the @unique directive. In phase 2, we
will add support for the @unique directive on existing predicates.

## Performance
Live Loader on a 21 million dataset took 10m54s before this change and
11m02s after it. The change made no significant difference for
non-unique predicates.

## How to Use
You can now specify unique in the schema as follows: `email: string @unique
@index(hash) @upsert .`. Dgraph will then ensure that no mutation adds a
duplicate value for the predicate email.

## Phase 2 [TODO]
- [ ] Check whether @unique can be added to the schema, depending on whether
the existing data has any duplicates. If the existing data has
duplicates, we do not allow adding the @unique directive and instead return a
query that lets the user identify these UIDs.
- [ ] If index computation is in progress, we should not allow mutations
with predicates for which @unique is set
- [ ] Fix ACL to ensure that we do not end up adding duplicate users
- [ ] Ensure that unique constraint is not violated during Bulk loader

---------

Co-authored-by: Aman Mangal <aman@dgraph.io>
  • Loading branch information
shivaji-dgraph and mangalaman93 committed Aug 28, 2023
1 parent 9a964dd commit 92c5b7a
Show file tree
Hide file tree
Showing 12 changed files with 1,564 additions and 368 deletions.
4 changes: 2 additions & 2 deletions dgraph/cmd/live/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type loader struct {
reqNum uint64
reqs chan *request
zeroconn *grpc.ClientConn
schema *schema
schema *Schema
namespaces map[uint64]struct{}

upsertLock sync.RWMutex
Expand Down Expand Up @@ -240,7 +240,7 @@ func createValueEdge(nq *api.NQuad, sid uint64) (*pb.DirectedEdge, error) {
return p, nil
}

func fingerprintEdge(t *pb.DirectedEdge, pred *predicate) uint64 {
func fingerprintEdge(t *pb.DirectedEdge, pred *Predicate) uint64 {
var id uint64 = math.MaxUint64

// Value with a lang type.
Expand Down
21 changes: 11 additions & 10 deletions dgraph/cmd/live/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type options struct {
preserveNs bool
}

type predicate struct {
type Predicate struct {
Predicate string `json:"predicate,omitempty"`
Type string `json:"type,omitempty"`
Tokenizer []string `json:"tokenizer,omitempty"`
Expand All @@ -89,21 +89,22 @@ type predicate struct {
Upsert bool `json:"upsert,omitempty"`
Reverse bool `json:"reverse,omitempty"`
NoConflict bool `json:"no_conflict,omitempty"`
Unique bool `json:"unique,omitempty"`
ValueType types.TypeID
}

type schema struct {
Predicates []*predicate `json:"schema,omitempty"`
preds map[string]*predicate
// Schema is the set of predicates decoded from a JSON document whose
// top-level "schema" key holds the predicate list.
type Schema struct {
// Predicates is the list of predicate definitions, (un)marshalled under the
// "schema" JSON key.
Predicates []*Predicate `json:"schema,omitempty"`
// preds is an unexported lookup map from a string key to its Predicate. It is
// allocated by init and is not part of the JSON form.
// NOTE(review): lookups elsewhere use x.NamespaceAttr(namespace, predicate)
// as the key — confirm that init stores entries under the same key.
preds map[string]*Predicate
}

// request wraps a mutation together with a list of uint64 conflict values.
type request struct {
// The embedded mutation; its fields and methods are promoted onto request.
*api.Mutation
// NOTE(review): presumably fingerprints/keys used to detect conflicting
// in-flight requests — confirm against the code that fills this slice.
conflicts []uint64
}

func (l *schema) init(ns uint64, galaxyOperation bool) {
l.preds = make(map[string]*predicate)
func (l *Schema) init(ns uint64, galaxyOperation bool) {
l.preds = make(map[string]*Predicate)
for _, i := range l.Predicates {
i.ValueType, _ = types.TypeForName(i.Type)
if !galaxyOperation {
Expand All @@ -115,7 +116,7 @@ func (l *schema) init(ns uint64, galaxyOperation bool) {

var (
opt options
sch schema
sch Schema

// Live is the sub-command invoked when running "dgraph live".
Live x.SubCommand
Expand Down Expand Up @@ -185,7 +186,7 @@ func init() {
"specific namespace. Setting it to negative value will preserve the namespace.")
}

func getSchema(ctx context.Context, dgraphClient *dgo.Dgraph, galaxyOperation bool) (*schema, error) {
func getSchema(ctx context.Context, dgraphClient *dgo.Dgraph, galaxyOperation bool) (*Schema, error) {
txn := dgraphClient.NewTxn()
defer func() {
if err := txn.Discard(ctx); err != nil {
Expand Down Expand Up @@ -500,7 +501,7 @@ func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunk
sort.Slice(buffer, func(i, j int) bool {
iPred := sch.preds[x.NamespaceAttr(buffer[i].Namespace, buffer[i].Predicate)]
jPred := sch.preds[x.NamespaceAttr(buffer[j].Namespace, buffer[j].Predicate)]
t := func(a *predicate) int {
t := func(a *Predicate) int {
if a != nil && a.Count {
return 1
}
Expand Down Expand Up @@ -683,7 +684,7 @@ func (l *loader) populateNamespaces(ctx context.Context, dc *dgo.Dgraph, singleN
return err
}

var sch schema
var sch Schema
err = json.Unmarshal(res.GetJson(), &sch)
if err != nil {
return err
Expand Down
14 changes: 14 additions & 0 deletions dgraphtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,20 @@ func (gc *GrpcClient) Mutate(mu *api.Mutation) (*api.Response, error) {
return txn.Mutate(ctx, mu)
}

// Upsert sends query and the mutation mu as a single request in a new
// transaction, with CommitNow set so the request commits as part of the call.
// The call is bounded by requestTimeout. It returns whatever txn.Do returns.
func (gc *GrpcClient) Upsert(query string, mu *api.Mutation) (*api.Response, error) {
	txn := gc.NewTxn()
	// Always discard the transaction on the way out; the error returned by
	// Discard is explicitly ignored.
	defer func() {
		_ = txn.Discard(context.Background())
	}()

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	defer cancel()

	return txn.Do(ctx, &api.Request{
		Query:     query,
		Mutations: []*api.Mutation{mu},
		CommitNow: true,
	})
}

// Query performs a given query in a new txn
func (gc *GrpcClient) Query(query string) (*api.Response, error) {
txn := gc.NewTxn()
Expand Down
Loading

0 comments on commit 92c5b7a

Please sign in to comment.