Skip to content

Commit

Permalink
Merge branch 'master' of github.com:google/badwolf
Browse files Browse the repository at this point in the history
  • Loading branch information
xllora committed Jan 8, 2017
2 parents 3a20b36 + a988658 commit 6a0bd91
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 94 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# BadWolf

[![Build Status](https://travis-ci.org/google/badwolf.svg?branch=master)](https://travis-ci.org/google/badwolf) [![GoDoc](https://godoc.org/github.com/google/badwolf?status.svg)](https://godoc.org/github.com/google/badwolf)
[![Build Status](https://travis-ci.org/google/badwolf.svg?branch=master)](https://travis-ci.org/google/badwolf) [![Go Report Card](https://goreportcard.com/badge/github.com/google/badwolf)](https://goreportcard.com/report/github.com/google/badwolf) [![GoDoc](https://godoc.org/github.com/google/badwolf?status.svg)](https://godoc.org/github.com/google/badwolf)

BadWolf is a temporal graph store loosely modeled after the concepts introduced
by the
Expand Down
135 changes: 108 additions & 27 deletions bql/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (
"bytes"
"errors"
"fmt"
"io"
"reflect"
"strconv"
"strings"
"sync"
"time"

"golang.org/x/net/context"

Expand All @@ -43,11 +46,28 @@ type Executor interface {
String() string
}

// trace attemps to write a trace if a valid writer is provided. The
// tracer is lazy on the string generation to avoid adding too much
// overhead when tracing ins not on.
func trace(w io.Writer, msgs func() []string) {
if w == nil {
return
}
for _, msg := range msgs() {
w.Write([]byte("["))
w.Write([]byte(time.Now().String()))
w.Write([]byte("] "))
w.Write([]byte(msg))
w.Write([]byte("\n"))
}
}

// createPlan encapsulates the sequence of instructions that need to be
// executed in order to satisfy the execution of a valid create BQL statement.
type createPlan struct {
stm *semantic.Statement
store storage.Store
stm *semantic.Statement
store storage.Store
tracer io.Writer
}

// Execute creates the indicated graphs.
Expand All @@ -58,6 +78,9 @@ func (p *createPlan) Execute(ctx context.Context) (*table.Table, error) {
}
errs := []string{}
for _, g := range p.stm.GraphNames() {
trace(p.tracer, func() []string {
return []string{"Creating new graph \"" + g + "\""}
})
if _, err := p.store.NewGraph(ctx, g); err != nil {
errs = append(errs, err.Error())
}
Expand All @@ -76,8 +99,9 @@ func (p *createPlan) String() string {
// dropPlan encapsulates the sequence of instructions that need to be
// executed in order to satisfy the execution of a valid drop BQL statement.
type dropPlan struct {
stm *semantic.Statement
store storage.Store
stm *semantic.Statement
store storage.Store
tracer io.Writer
}

// Execute drops the indicated graphs.
Expand All @@ -88,6 +112,9 @@ func (p *dropPlan) Execute(ctx context.Context) (*table.Table, error) {
}
errs := []string{}
for _, g := range p.stm.GraphNames() {
trace(p.tracer, func() []string {
return []string{"Deleting graph \"" + g + "\""}
})
if err := p.store.DeleteGraph(ctx, g); err != nil {
errs = append(errs, err.Error())
}
Expand All @@ -106,8 +133,9 @@ func (p *dropPlan) String() string {
// insertPlan encapsulates the sequence of instructions that need to be
// executed in order to satisfy the execution of a valid insert BQL statement.
type insertPlan struct {
stm *semantic.Statement
store storage.Store
stm *semantic.Statement
store storage.Store
tracer io.Writer
}

type updater func(storage.Graph, []*triple.Triple) error
Expand Down Expand Up @@ -153,6 +181,9 @@ func (p *insertPlan) Execute(ctx context.Context) (*table.Table, error) {
return nil, err
}
return t, update(ctx, p.stm, p.store, func(g storage.Graph, d []*triple.Triple) error {
trace(p.tracer, func() []string {
return []string{"Inserting triples to graph \"" + g.ID(ctx) + "\""}
})
return g.AddTriples(ctx, d)
})
}
Expand All @@ -175,8 +206,9 @@ func (p *insertPlan) String() string {
// deletePlan encapsulates the sequence of instructions that need to be
// executed in order to satisfy the execution of a valid delete BQL statement.
type deletePlan struct {
stm *semantic.Statement
store storage.Store
stm *semantic.Statement
store storage.Store
tracer io.Writer
}

// Execute deletes the provided data into the indicated graphs.
Expand All @@ -186,6 +218,9 @@ func (p *deletePlan) Execute(ctx context.Context) (*table.Table, error) {
return nil, err
}
return t, update(ctx, p.stm, p.store, func(g storage.Graph, d []*triple.Triple) error {
trace(p.tracer, func() []string {
return []string{"Removing triples from graph \"" + g.ID(ctx) + "\""}
})
return g.RemoveTriples(ctx, d)
})
}
Expand Down Expand Up @@ -218,10 +253,11 @@ type queryPlan struct {
cls []*semantic.GraphClause
tbl *table.Table
chanSize int
tracer io.Writer
}

// newQueryPlan returns a new query plan ready to be executed.
func newQueryPlan(ctx context.Context, store storage.Store, stm *semantic.Statement, chanSize int) (*queryPlan, error) {
func newQueryPlan(ctx context.Context, store storage.Store, stm *semantic.Statement, chanSize int, w io.Writer) (*queryPlan, error) {
bs := []string{}
for _, b := range stm.Bindings() {
bs = append(bs, b)
Expand All @@ -230,20 +266,16 @@ func newQueryPlan(ctx context.Context, store storage.Store, stm *semantic.Statem
if err != nil {
return nil, err
}
qp := &queryPlan{
return &queryPlan{
stm: stm,
store: store,
bndgs: bs,
grfsNames: stm.GraphNames(),
cls: stm.SortedGraphPatternClauses(),
tbl: t,
chanSize: chanSize,
}
if err := stm.Init(ctx, store); err != nil {
return nil, err
}
qp.grfs = stm.Graphs()
return qp, nil
tracer: w,
}, nil
}

// processClause retrieves the triples for the provided triple given the
Expand Down Expand Up @@ -503,6 +535,9 @@ func (p *queryPlan) filterOnExistence(ctx context.Context, cls *semantic.GraphCl
// data from the specified graphs.
func (p *queryPlan) processGraphPattern(ctx context.Context, lo *storage.LookupOptions) error {
for _, cls := range p.cls {
trace(p.tracer, func() []string {
return []string{"Processing graph clause " + cls.String()}
})
// The current planner is based on naively executing clauses by
// specificity.
unresolvable, err := p.processClause(ctx, cls, lo)
Expand All @@ -522,15 +557,24 @@ func (p *queryPlan) processGraphPattern(ctx context.Context, lo *storage.LookupO
func (p *queryPlan) projectAndGroupBy() error {
grp := p.stm.GroupByBindings()
if len(grp) == 0 { // The table only needs to be projected.
trace(p.tracer, func() []string {
return []string{fmt.Sprintf("Running projection for %v", grp)}
})
p.tbl.AddBindings(p.stm.OutputBindings())
// For each row, copy each input binding value to its appropriate alias.
for _, prj := range p.stm.Projections() {
for _, row := range p.tbl.Rows() {
row[prj.Alias] = row[prj.Binding]
}
}
trace(p.tracer, func() []string {
return []string{fmt.Sprintf("Output bindings projected %v", p.stm.OutputBindings())}
})
return p.tbl.ProjectBindings(p.stm.OutputBindings())
}
trace(p.tracer, func() []string {
return []string{"Starting roup reduce and projection"}
})
// The table needs to be group reduced.
// Project only binding involved in the group operation.
tmpBindings := []string{}
Expand All @@ -539,6 +583,9 @@ func (p *queryPlan) projectAndGroupBy() error {
cfg := table.SortConfig{}
aaps := []table.AliasAccPair{}
for _, prj := range p.stm.Projections() {
trace(p.tracer, func() []string {
return []string{"Analysing projection " + prj.String()}
})
// Only include used incoming bindings.
tmpBindings = append(tmpBindings, prj.Binding)
// Update sorting configuration.
Expand Down Expand Up @@ -584,22 +631,38 @@ func (p *queryPlan) projectAndGroupBy() error {
}
aaps = append(aaps, aap)
}
trace(p.tracer, func() []string {
return []string{fmt.Sprintf("Projecting %v", tmpBindings)}
})
if err := p.tbl.ProjectBindings(tmpBindings); err != nil {
return err
}
trace(p.tracer, func() []string {
return []string{"Reducing the table using configuration " + cfg.String()}
})
p.tbl.Reduce(cfg, aaps)
return nil
}

// orderBy takes the resulting table and sorts its contents according to the
// specifications of the ORDER BY clause.
func (p *queryPlan) orderBy() {
p.tbl.Sort(p.stm.OrderByConfig())
order := p.stm.OrderByConfig()
if len(order) <= 0 {
return
}
trace(p.tracer, func() []string {
return []string{"Ordering by " + order.String()}
})
p.tbl.Sort(order)
}

// having runs the filtering based on the having clause if needed.
func (p *queryPlan) having() error {
if p.stm.HasHavingClause() {
trace(p.tracer, func() []string {
return []string{"Having filtering"}
})
eval := p.stm.HavingEvaluator()
ok := true
var eErr error
Expand All @@ -620,14 +683,28 @@ func (p *queryPlan) having() error {
// limit truncates the table if the limit clause if available.
func (p *queryPlan) limit() {
if p.stm.IsLimitSet() {
trace(p.tracer, func() []string {
return []string{"Limit results to " + strconv.Itoa(int(p.stm.Limit()))}
})
p.tbl.Limit(p.stm.Limit())
}
}

// Execute queries the indicated graphs.
func (p *queryPlan) Execute(ctx context.Context) (*table.Table, error) {
// Fetch and catch graph instances.
trace(p.tracer, func() []string {
return []string{fmt.Sprintf("Caching graph instances for graphs %v", p.stm.GraphNames())}
})
if err := p.stm.Init(ctx, p.store); err != nil {
return nil, err
}
p.grfs = p.stm.Graphs()
// Retrieve the data.
lo := p.stm.GlobalLookupOptions()
trace(p.tracer, func() []string {
return []string{"Setting global lookup options to " + lo.String()}
})
if err := p.processGraphPattern(ctx, lo); err != nil {
return nil, err
}
Expand Down Expand Up @@ -698,29 +775,33 @@ func (p *queryPlan) String() string {
}

// New create a new executable plan given a semantic BQL statement.
func New(ctx context.Context, store storage.Store, stm *semantic.Statement, chanSize int) (Executor, error) {
func New(ctx context.Context, store storage.Store, stm *semantic.Statement, chanSize int, w io.Writer) (Executor, error) {
switch stm.Type() {
case semantic.Query:
return newQueryPlan(ctx, store, stm, chanSize)
return newQueryPlan(ctx, store, stm, chanSize, w)
case semantic.Insert:
return &insertPlan{
stm: stm,
store: store,
stm: stm,
store: store,
tracer: w,
}, nil
case semantic.Delete:
return &deletePlan{
stm: stm,
store: store,
stm: stm,
store: store,
tracer: w,
}, nil
case semantic.Create:
return &createPlan{
stm: stm,
store: store,
stm: stm,
store: store,
tracer: w,
}, nil
case semantic.Drop:
return &dropPlan{
stm: stm,
store: store,
stm: stm,
store: store,
tracer: w,
}, nil
default:
return nil, fmt.Errorf("planner.New: unknown statement type in statement %v", stm)
Expand Down
16 changes: 8 additions & 8 deletions bql/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func insertTest(t *testing.T) {
if err = p.Parse(grammar.NewLLk(bql, 1), stm); err != nil {
t.Errorf("Parser.consume: failed to accept BQL %q with error %v", bql, err)
}
pln, err := New(ctx, memory.DefaultStore, stm, 0)
pln, err := New(ctx, memory.DefaultStore, stm, 0, nil)
if err != nil {
t.Errorf("planner.New: should have not failed to create a plan using memory.DefaultStorage for statement %v with error %v", stm, err)
}
Expand Down Expand Up @@ -82,7 +82,7 @@ func deleteTest(t *testing.T) {
if err = p.Parse(grammar.NewLLk(bql, 1), stm); err != nil {
t.Errorf("Parser.consume: failed to accept BQL %q with error %v", bql, err)
}
pln, err := New(ctx, memory.DefaultStore, stm, 0)
pln, err := New(ctx, memory.DefaultStore, stm, 0, nil)
if err != nil {
t.Errorf("planner.New: should have not failed to create a plan using memory.DefaultStorage for statement %v with error %v", stm, err)
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestPlannerCreateGraph(t *testing.T) {
if err = p.Parse(grammar.NewLLk(bql, 1), stm); err != nil {
t.Errorf("Parser.consume: failed to accept BQL %q with error %v", bql, err)
}
pln, err := New(ctx, memory.DefaultStore, stm, 0)
pln, err := New(ctx, memory.DefaultStore, stm, 0, nil)
if err != nil {
t.Errorf("planner.New: should have not failed to create a plan using memory.DefaultStorage for statement %v with error %v", stm, err)
}
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestPlannerDropGraph(t *testing.T) {
if err = p.Parse(grammar.NewLLk(bql, 1), stm); err != nil {
t.Errorf("Parser.consume: failed to accept BQL %q with error %v", bql, err)
}
pln, err := New(ctx, memory.DefaultStore, stm, 0)
pln, err := New(ctx, memory.DefaultStore, stm, 0, nil)
if err != nil {
t.Errorf("planner.New: should have not failed to create a plan using memory.DefaultStorage for statement %v with error %v", stm, err)
}
Expand Down Expand Up @@ -495,7 +495,7 @@ func TestPlannerQuery(t *testing.T) {
if err := p.Parse(grammar.NewLLk(entry.q, 1), st); err != nil {
t.Errorf("Parser.consume: failed to parse query %q with error %v", entry.q, err)
}
plnr, err := New(ctx, s, st, 0)
plnr, err := New(ctx, s, st, 0, nil)
if err != nil {
t.Errorf("planner.New failed to create a valid query plan with error %v", err)
}
Expand Down Expand Up @@ -546,7 +546,7 @@ func TestTreeTraversalToRoot(t *testing.T) {
if err := p.Parse(grammar.NewLLk(traversalQuery, 1), st); err != nil {
t.Errorf("Parser.consume: failed to parse query %q with error %v", traversalQuery, err)
}
plnr, err := New(ctx, s, st, 0)
plnr, err := New(ctx, s, st, 0, nil)
if err != nil {
t.Errorf("planner.New failed to create a valid query plan with error %v", err)
}
Expand Down Expand Up @@ -593,7 +593,7 @@ func TestChaining(t *testing.T) {
if err := p.Parse(grammar.NewLLk(traversalQuery, 1), st); err != nil {
t.Errorf("Parser.consume: failed to parse query %q with error %v", traversalQuery, err)
}
plnr, err := New(ctx, s, st, 0)
plnr, err := New(ctx, s, st, 0, nil)
if err != nil {
t.Errorf("planner.New failed to create a valid query plan with error %v", err)
}
Expand Down Expand Up @@ -624,7 +624,7 @@ func benchmarkQuery(query string, b *testing.B) {
if err := p.Parse(grammar.NewLLk(query, 1), st); err != nil {
b.Errorf("Parser.consume: failed to parse query %q with error %v", query, err)
}
plnr, err := New(ctx, s, st, 0)
plnr, err := New(ctx, s, st, 0, nil)
if err != nil {
b.Errorf("planner.New failed to create a valid query plan with error %v", err)
}
Expand Down
Loading

0 comments on commit 6a0bd91

Please sign in to comment.