Skip to content

Commit

Permalink
Adding load command for bulk triple upload
Browse files Browse the repository at this point in the history
  • Loading branch information
xllora committed Mar 6, 2016
1 parent ef3e736 commit a2077d2
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 6 deletions.
8 changes: 5 additions & 3 deletions tools/vcli/bw/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/google/badwolf/storage"
"github.com/google/badwolf/tools/vcli/bw/assert"
"github.com/google/badwolf/tools/vcli/bw/command"
"github.com/google/badwolf/tools/vcli/bw/load"
"github.com/google/badwolf/tools/vcli/bw/repl"
"github.com/google/badwolf/tools/vcli/bw/run"
"github.com/google/badwolf/tools/vcli/bw/version"
Expand Down Expand Up @@ -95,9 +96,10 @@ func InitializeDriver(driverName string, drivers map[string]StoreGenerator) (sto

// InitializeCommands intializes the avaialbe commands with the given storage
// instance.
func InitializeCommands(driver storage.Store, chanSize int) []*command.Command {
func InitializeCommands(driver storage.Store, chanSize, bulkTripleOpSize, builderSize int) []*command.Command {
return []*command.Command{
assert.New(driver, literal.DefaultBuilder(), chanSize),
load.New(driver, bulkTripleOpSize, builderSize),
run.New(driver, chanSize),
repl.New(driver, chanSize),
version.New(),
Expand Down Expand Up @@ -132,7 +134,7 @@ func Eval(ctx context.Context, args []string, cmds []*command.Command) int {
}

// Run executes the main of the command line tool.
func Run(driverName string, drivers map[string]StoreGenerator, chanSize int) int {
func Run(driverName string, drivers map[string]StoreGenerator, chanSize, bulkTripleOpSize, builderSize int) int {
driver, err := InitializeDriver(driverName, drivers)
if err != nil {
fmt.Fprintln(os.Stderr, err)
Expand All @@ -145,5 +147,5 @@ func Run(driverName string, drivers map[string]StoreGenerator, chanSize int) int
}
args = append(args, s)
}
return Eval(context.Background(), args, InitializeCommands(driver, chanSize))
return Eval(context.Background(), args, InitializeCommands(driver, chanSize, bulkTripleOpSize, builderSize))
}
26 changes: 26 additions & 0 deletions tools/vcli/bw/io/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,29 @@ func ReadLines(path string) ([]string, error) {
}
return lines, scanner.Err()
}

// ProcessLines from a file using the provied call back. The error of the
// callback will be passed through. Returns the number of processed errors
// before the error. Returns the line where the error occurred or the total
// numbers of lines processed.
func ProcessLines(path string, fp func(line string) error) (int, error) {
f, err := os.Open(path)
if err != nil {
return 0, err
}
defer f.Close()

scanner := bufio.NewScanner(f)
cnt := 0
for scanner.Scan() {
l := strings.TrimSpace(scanner.Text())
cnt++
if len(l) == 0 || strings.Index(l, "#") == 0 {
continue
}
if err := fp(l); err != nil {
return cnt, err
}
}
return cnt, scanner.Err()
}
118 changes: 118 additions & 0 deletions tools/vcli/bw/load/load.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2016 Google Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package load contains the command allowing to run a sequence of BQL commands
// from the provided file.
package load

import (
"fmt"
"os"
"strings"

"golang.org/x/net/context"

"github.com/google/badwolf/storage"
"github.com/google/badwolf/tools/vcli/bw/command"
"github.com/google/badwolf/tools/vcli/bw/io"
"github.com/google/badwolf/triple"
"github.com/google/badwolf/triple/literal"
)

// New creates the help command.
func New(store storage.Store, bulkSize, builderSize int) *command.Command {
cmd := &command.Command{
UsageLine: "load <file_path> <graph_names_separated_by_commas>",
Short: "load triples in bulk stored in a file.",
Long: `Loads all the triples stored in a file into the provided graphs.
Graph names need to be separated by commans with no whitespaces. Each triple
needs to placed in a single line. Each triple needs to be formated so it can be
parsed as indicated in the documetation (see https://github.com/google/badwolf).
All data in the file will be treated as triples. A line starting with # will
be treated as a commented line. If the load fails you may end up with partially
loaded data.
`,
}
cmd.Run = func(ctx context.Context, args []string) int {
return loadCommand(ctx, cmd, args, store, bulkSize, builderSize)
}
return cmd
}

func loadCommand(ctx context.Context, cmd *command.Command, args []string, store storage.Store, bulkSize, builderSize int) int {
if len(args) <= 3 {
fmt.Fprintf(os.Stderr, "Missing required file path and/or graph names.\n\n")
cmd.Usage()
return 2
}
graphs, lb := strings.Split(args[len(args)-1], ","), literal.NewBoundedBuilder(builderSize)
trplsChan, errChan, doneChan := make(chan *triple.Triple), make(chan error), make(chan bool)
path := args[len(args)-2]
go storeTriple(ctx, store, graphs, bulkSize, trplsChan, errChan, doneChan)
cnt, err := io.ProcessLines(path, func(line string) error {
t, err := triple.Parse(line, lb)
if err != nil {
return err
}
trplsChan <- t
return <-errChan
})
flush(ctx, graphs, store)
close(trplsChan)
close(errChan)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to process file %q. Ivalid triple on line %d.", path, cnt)
return 2
}
fmt.Printf("Successfully processed %d lines from file %q.\nTriples loaded into graphs:\n\t- %s\n", cnt, path, strings.Join(graphs, "\n\t- "))
return 0
}

var workingTrpls []*triple.Triple

func flush(ctx context.Context, graphs []string, store storage.Store) error {
defer func() {
workingTrpls = nil
}()
if len(workingTrpls) > 0 {
for _, graph := range graphs {
g, err := store.Graph(ctx, graph)
if err != nil {
return err
}
if err := g.AddTriples(ctx, workingTrpls); err != nil {
return err
}
}
}
return nil
}

func storeTriple(ctx context.Context, store storage.Store, graphs []string, bulkSize int, trplChan <-chan *triple.Triple, errChan chan<- error, doneChan <-chan bool) {
for {
select {
case <-doneChan:
return
case trpl := <-trplChan:
if len(workingTrpls) < bulkSize {
workingTrpls = append(workingTrpls, trpl)
errChan <- nil
} else {
err := flush(ctx, graphs, store)
workingTrpls = append(workingTrpls, trpl)
errChan <- err
}
}
}
}
8 changes: 5 additions & 3 deletions tools/vcli/bw/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ var (
// drivers contains the registed drivers available for this command line tool.
registeredDrivers map[string]common.StoreGenerator
// Available flags.
driver = flag.String("driver", "VOLATILE", "The storage driver to use {VOLATILE}.")
bqlChannelSize = flag.Int("bql_channel_size", 0, "Internal channel size to use on BQL queries.")
driver = flag.String("driver", "VOLATILE", "The storage driver to use {VOLATILE}.")
bqlChannelSize = flag.Int("bql_channel_size", 0, "Internal channel size to use on BQL queries.")
bulkTripleOpSize = flag.Int("bulk_triple_op_size", 1000, "Number of triples to use in bulk load operations.")
bulkTriplBuildersize = flag.Int("bulk_triple_builder_size_in_bytes", 1000, "Maximum size of literals when parsing a triple.")
// Add your driver flags below.
)

Expand All @@ -50,5 +52,5 @@ func registerDrivers() {
func main() {
flag.Parse()
registerDrivers()
os.Exit(common.Run(*driver, registeredDrivers, *bqlChannelSize))
os.Exit(common.Run(*driver, registeredDrivers, *bqlChannelSize, *bulkTripleOpSize, *bulkTriplBuildersize))
}

0 comments on commit a2077d2

Please sign in to comment.