Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize filestore verify #5286

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 108 additions & 104 deletions core/commands/filestore.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package commands

import (
"context"
"fmt"
"io"
"os"
"sort"

oldCmds "github.com/ipfs/go-ipfs/commands"
lgc "github.com/ipfs/go-ipfs/commands/legacy"
Expand All @@ -23,7 +22,7 @@ var FileStoreCmd = &cmds.Command{
},
Subcommands: map[string]*cmds.Command{
"ls": lsFileStore,
"verify": lgc.NewCommand(verifyFileStore),
"verify": verifyFileStore,
"dups": lgc.NewCommand(dupsFileStore),
},
}
Expand All @@ -48,36 +47,8 @@ The output is:
Options: []cmdkit.Option{
cmdkit.BoolOption("file-order", "sort the results based on the path of the backing file"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
_, fs, err := getFilestore(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
args := req.Arguments
if len(args) > 0 {
out := perKeyActionToChan(req.Context, args, func(c *cid.Cid) *filestore.ListRes {
return filestore.List(fs, c)
})

err = res.Emit(out)
if err != nil {
log.Error(err)
}
} else {
fileOrder, _ := req.Options["file-order"].(bool)
next, err := filestore.ListAll(fs, fileOrder)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

out := listResToChan(req.Context, next)
err = res.Emit(out)
if err != nil {
log.Error(err)
}
}
Run: func(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment) {
listOrVerify(req, re, env, false)
},
PostRun: cmds.PostRunMap{
cmds.CLI: func(req *cmds.Request, re cmds.ResponseEmitter) cmds.ResponseEmitter {
Expand Down Expand Up @@ -118,7 +89,7 @@ The output is:
Type: filestore.ListRes{},
}

var verifyFileStore = &oldCmds.Command{
var verifyFileStore = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Verify objects in filestore.",
LongDescription: `
Expand Down Expand Up @@ -148,46 +119,41 @@ For ERROR entries the error will also be printed to stderr.
Options: []cmdkit.Option{
cmdkit.BoolOption("file-order", "verify the objects based on the order of the backing file"),
},
Run: func(req oldCmds.Request, res oldCmds.Response) {
_, fs, err := getFilestore(req.InvocContext())
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
args := req.Arguments()
if len(args) > 0 {
out := perKeyActionToChan(req.Context(), args, func(c *cid.Cid) *filestore.ListRes {
return filestore.Verify(fs, c)
})
res.SetOutput(out)
} else {
fileOrder, _, _ := req.Option("file-order").Bool()
next, err := filestore.VerifyAll(fs, fileOrder)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
out := listResToChan(req.Context(), next)
res.SetOutput(out)
}
Run: func(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment) {
listOrVerify(req, re, env, true)
},
Marshalers: oldCmds.MarshalerMap{
oldCmds.Text: func(res oldCmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}
PostRun: cmds.PostRunMap{
cmds.CLI: func(req *cmds.Request, re cmds.ResponseEmitter) cmds.ResponseEmitter {
reNext, res := cmds.NewChanResponsePair(req)

r, ok := v.(*filestore.ListRes)
if !ok {
return nil, e.TypeErr(r, v)
}
go func() {
defer re.Close()

if r.Status == filestore.StatusOtherError {
fmt.Fprintf(res.Stderr(), "%s\n", r.ErrorMsg)
}
fmt.Fprintf(res.Stdout(), "%s %s\n", r.Status.Format(), r.FormatLong())
return nil, nil
var errors bool
for {
v, err := res.Next()
if !cmds.HandleError(err, res, re) {
break
}

r, ok := v.(*filestore.ListRes)
if !ok {
log.Error(e.New(e.TypeErr(r, v)))
return
}

if r.Status == filestore.StatusOtherError {
fmt.Fprintf(os.Stderr, "%s\n", r.ErrorMsg)
}
fmt.Fprintf(os.Stdout, "%s %s\n", r.Status.Format(), r.FormatLong())
}

if errors {
re.SetError("errors while displaying some entries", cmdkit.ErrNormal)
}
}()

return reNext
},
},
Type: filestore.ListRes{},
Expand Down Expand Up @@ -242,45 +208,83 @@ func getFilestore(env interface{}) (*core.IpfsNode, *filestore.Filestore, error)
return n, fs, err
}

func listResToChan(ctx context.Context, next func() *filestore.ListRes) <-chan interface{} {
out := make(chan interface{}, 128)
go func() {
defer close(out)
for {
r := next()
if r == nil {
func listOrVerify(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment, verify bool) {
_, fs, err := getFilestore(env)
if err != nil {
re.SetError(err, cmdkit.ErrNormal)
return
}

single := filestore.List
all := filestore.ListAll
if verify {
single = filestore.Verify
all = filestore.VerifyAll
}

if len(req.Arguments) > 0 {
cids := make([]*cid.Cid, len(req.Arguments))
var err error
for i, cs := range req.Arguments {
cids[i], err = cid.Decode(cs)
if err != nil {
re.SetError(fmt.Errorf("%s is not a valid cid: %s", cs, err), cmdkit.ErrClient)
return
}
select {
case out <- r:
case <-ctx.Done():
}
for _, c := range cids {
if err := re.Emit(single(fs, c)); err != nil {
re.SetError(err, cmdkit.ErrNormal)
return
}
if err := req.Context.Err(); err != nil {
re.SetError(err, cmdkit.ErrNormal)
return
}
}
}()
return out
}

func perKeyActionToChan(ctx context.Context, args []string, action func(*cid.Cid) *filestore.ListRes) <-chan interface{} {
out := make(chan interface{}, 128)
go func() {
defer close(out)
for _, arg := range args {
c, err := cid.Decode(arg)
if err != nil {
out <- &filestore.ListRes{
Status: filestore.StatusOtherError,
ErrorMsg: fmt.Sprintf("%s: %v", arg, err),
return
}
fileOrder, _ := req.Options["file-order"].(bool)
ch, err := all(req.Context, fs)
if err != nil {
re.SetError(err, cmdkit.ErrNormal)
return
}
if fileOrder {
var results []*filestore.ListRes
for r := range ch {
results = append(results, r)
}
// TODO: Should be handled by the cmdkit library.
if err := req.Context.Err(); err != nil {
re.SetError(err, cmdkit.ErrNormal)
return
}
sort.Slice(results, func(i, j int) bool {
if results[i].FilePath == results[j].FilePath {
// XXX: is this case even possible?
if results[i].Offset == results[j].Offset {
return results[i].Key.KeyString() < results[j].Key.KeyString()
}
continue
}
r := action(c)
select {
case out <- r:
case <-ctx.Done():
return
return results[i].Offset < results[j].Offset
}
return results[i].FilePath < results[j].FilePath
})
for _, r := range results {
re.Emit(r)
}
return
}

for r := range ch {
if err := re.Emit(r); err != nil {
re.SetError(err, cmdkit.ErrNormal)
return
}
}()
return out
}
// TODO: Should be handled by the cmdkit library.
if err := req.Context.Err(); err != nil {
re.SetError(err, cmdkit.ErrNormal)
return
}
}
Loading