Skip to content

Commit

Permalink
piece doctor and sector state manager refactor (#1463)
Browse files Browse the repository at this point in the history
* fix timer.Reset and improve logs

* revert randomization

* piece doc: handle errors

* adjust piece check

* refactor unsealsectormanager

* refactor piece doctor

* add random ports

* ignore tests

* add version to boostd-data

* fix ctx in Start

* fix: add reader mock to fix tests

* fix: pass new piece directory to provider on test restart

* fix synchronisation

* note that panics are not propagated in tests

* carv1 panics piece directory

* print panics

* fix: use reader that supports Seek in piece reader mock

* fix: reset mock car reader on each invocation

* fix: TestOfflineDealDataCleanup

* add check for nil cancel func

* bump min check period for LevelDB to 5 minutes

* check if sector state mgr is initialised

* debug line for unflagging

* commenting out TestMultipleDealsConcurrent -- flaky test -- works locally

* add SectorStateUpdates pubsub

* add close for pubsub

* add mock sectorstatemgr

* add wrapper tests

* fixup

* cleanup

* cleanup

* better names

* t.Skip for test

* remove TODO above println for panic

* add unit tests for refreshState

* rename tests

* more cases

* more tests

* update description

* better comment

* better names and comments

---------

Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
  • Loading branch information
nonsense and dirkmc committed Jun 1, 2023
1 parent 93e3346 commit 5fe0bfd
Show file tree
Hide file tree
Showing 31 changed files with 1,420 additions and 1,013 deletions.
4 changes: 4 additions & 0 deletions cmd/boostd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func before(cctx *cli.Context) error {
_ = logging.SetLogLevel("piecedir", "INFO")
_ = logging.SetLogLevel("index-provider-wrapper", "INFO")
_ = logging.SetLogLevel("unsmgr", "INFO")
_ = logging.SetLogLevel("piecedoc", "INFO")
_ = logging.SetLogLevel("piecedirectory", "INFO")

if cliutil.IsVeryVerbose {
_ = logging.SetLogLevel("boostd", "DEBUG")
Expand All @@ -83,6 +85,8 @@ func before(cctx *cli.Context) error {
_ = logging.SetLogLevel("piecedir", "DEBUG")
_ = logging.SetLogLevel("fxlog", "DEBUG")
_ = logging.SetLogLevel("unsmgr", "DEBUG")
_ = logging.SetLogLevel("piecedoc", "DEBUG")
_ = logging.SetLogLevel("piecedirectory", "DEBUG")
}

return nil
Expand Down
9 changes: 5 additions & 4 deletions cmd/boostd/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/filecoin-project/boost/cmd/lib"
"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/piecedirectory"
bdclient "github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-commp-utils/writer"
Expand Down Expand Up @@ -190,14 +191,14 @@ func action(cctx *cli.Context) error {
if ignoreLID {
pd = nil
} else {
pdClient := piecedirectory.NewStore()
defer pdClient.Close(ctx)
err = pdClient.Dial(ctx, cctx.String("api-lid"))
cl := bdclient.NewStore()
defer cl.Close(ctx)
err = cl.Dial(ctx, cctx.String("api-lid"))
if err != nil {
return fmt.Errorf("connecting to local index directory service: %w", err)
}
pr := &piecedirectory.SectorAccessorAsPieceReader{SectorAccessor: sa}
pd = piecedirectory.NewPieceDirectory(pdClient, pr, cctx.Int("add-index-throttle"))
pd = piecedirectory.NewPieceDirectory(cl, pr, cctx.Int("add-index-throttle"))
pd.Start(ctx)
}

Expand Down
13 changes: 7 additions & 6 deletions cmd/booster-bitswap/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/filecoin-project/boost/cmd/lib/remoteblockstore"
"github.com/filecoin-project/boost/metrics"
"github.com/filecoin-project/boost/piecedirectory"
bdclient "github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/shared/tracing"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -157,9 +158,9 @@ var runCmd = &cli.Command{
defer storageCloser()

// Connect to the local index directory service
pdClient := piecedirectory.NewStore()
defer pdClient.Close(ctx)
err = pdClient.Dial(ctx, cctx.String("api-lid"))
cl := bdclient.NewStore()
defer cl.Close(ctx)
err = cl.Dial(ctx, cctx.String("api-lid"))
if err != nil {
return fmt.Errorf("connecting to local index directory service: %w", err)
}
Expand Down Expand Up @@ -196,8 +197,8 @@ var runCmd = &cli.Command{
return fmt.Errorf("starting block filter: %w", err)
}
pr := &piecedirectory.SectorAccessorAsPieceReader{SectorAccessor: sa}
piecedirectory := piecedirectory.NewPieceDirectory(pdClient, pr, cctx.Int("add-index-throttle"))
remoteStore := remoteblockstore.NewRemoteBlockstore(piecedirectory, &bitswapBlockMetrics)
pd := piecedirectory.NewPieceDirectory(cl, pr, cctx.Int("add-index-throttle"))
remoteStore := remoteblockstore.NewRemoteBlockstore(pd, &bitswapBlockMetrics)
server := NewBitswapServer(remoteStore, host, multiFilter)

var proxyAddrInfo *peer.AddrInfo
Expand All @@ -210,7 +211,7 @@ var runCmd = &cli.Command{
}

// Start the local index directory
piecedirectory.Start(ctx)
pd.Start(ctx)

// Start the bitswap server
log.Infof("Starting booster-bitswap node on port %d", port)
Expand Down
15 changes: 8 additions & 7 deletions cmd/booster-http/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/filecoin-project/boost/cmd/lib/remoteblockstore"
"github.com/filecoin-project/boost/metrics"
"github.com/filecoin-project/boost/piecedirectory"
bdclient "github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/shared/tracing"
"github.com/filecoin-project/dagstore/mount"
Expand Down Expand Up @@ -127,9 +128,9 @@ var runCmd = &cli.Command{

// Connect to the local index directory service
ctx := lcli.ReqContext(cctx)
pdClient := piecedirectory.NewStore()
defer pdClient.Close(ctx)
err := pdClient.Dial(ctx, cctx.String("api-lid"))
cl := bdclient.NewStore()
defer cl.Close(ctx)
err := cl.Dial(ctx, cctx.String("api-lid"))
if err != nil {
return fmt.Errorf("connecting to local index directory service: %w", err)
}
Expand Down Expand Up @@ -168,7 +169,7 @@ var runCmd = &cli.Command{

// Create the server API
pr := &piecedirectory.SectorAccessorAsPieceReader{SectorAccessor: sa}
piecedirectory := piecedirectory.NewPieceDirectory(pdClient, pr, cctx.Int("add-index-throttle"))
pd := piecedirectory.NewPieceDirectory(cl, pr, cctx.Int("add-index-throttle"))

opts := &HttpServerOptions{
ServePieces: servePieces,
Expand Down Expand Up @@ -199,11 +200,11 @@ var runCmd = &cli.Command{
GetSizeFailResponseCount: metrics.HttpRblsGetSizeFailResponseCount,
GetSizeSuccessResponseCount: metrics.HttpRblsGetSizeSuccessResponseCount,
}
rbs := remoteblockstore.NewRemoteBlockstore(piecedirectory, &httpBlockMetrics)
rbs := remoteblockstore.NewRemoteBlockstore(pd, &httpBlockMetrics)
filtered := filters.NewFilteredBlockstore(rbs, multiFilter)
opts.Blockstore = filtered
}
sapi := serverApi{ctx: ctx, piecedirectory: piecedirectory, sa: sa}
sapi := serverApi{ctx: ctx, piecedirectory: pd, sa: sa}
server := NewHttpServer(
cctx.String("base-path"),
cctx.Int("port"),
Expand All @@ -212,7 +213,7 @@ var runCmd = &cli.Command{
)

// Start the local index directory
piecedirectory.Start(ctx)
pd.Start(ctx)

// Start the server
log.Infof("Starting booster-http node on port %d with base path '%s'",
Expand Down
2 changes: 1 addition & 1 deletion extern/boostd-data/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ unexport GOFLAGS

GOCC?=go

ldflags=-X=github.com/filecoin-project/boost/build.CurrentCommit=+git.$(subst -,.,$(shell git describe --always --match=NeVeRmAtCh --dirty 2>/dev/null || git rev-parse --short HEAD 2>/dev/null))
ldflags=-X=github.com/filecoin-project/boostd-data/build.CurrentCommit=+git.$(subst -,.,$(shell git describe --always --match=NeVeRmAtCh --dirty 2>/dev/null || git rev-parse --short HEAD 2>/dev/null))
ifneq ($(strip $(LDFLAGS)),)
ldflags+=-extldflags=$(LDFLAGS)
endif
Expand Down
9 changes: 9 additions & 0 deletions extern/boostd-data/build/build.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package build

var CurrentCommit string

const BuildVersion = "1.4.0"

func UserVersion() string {
return BuildVersion + CurrentCommit
}
28 changes: 28 additions & 0 deletions extern/boostd-data/clientutil/clientutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package clientutil

import (
"context"
"fmt"

"github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/svc"
)

func NewTestStore(ctx context.Context) *client.Store {
bdsvc, err := svc.NewLevelDB("")
if err != nil {
panic(err)
}
ln, err := bdsvc.Start(ctx, "localhost:0")
if err != nil {
panic(err)
}

cl := client.NewStore()
err = cl.Dial(ctx, fmt.Sprintf("ws://%s", ln.String()))
if err != nil {
panic(err)
}

return cl
}
5 changes: 4 additions & 1 deletion extern/boostd-data/cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package main

import (
"os"

"github.com/filecoin-project/boostd-data/build"
"github.com/filecoin-project/boostd-data/shared/cliutil"
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"os"
)

var log = logging.Logger("boostd-data")
Expand All @@ -17,6 +19,7 @@ func main() {
app := &cli.App{
Name: "boostd-data",
Usage: "Service that implements boostd data API",
Version: build.UserVersion(),
EnableBashCompletion: true,
Flags: []cli.Flag{
cliutil.FlagVeryVerbose,
Expand Down
2 changes: 1 addition & 1 deletion extern/boostd-data/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func runAction(cctx *cli.Context, dbType string, store *svc.Service) error {

// Start the server
addr := cctx.String("addr")
err = store.Start(ctx, addr)
_, err = store.Start(ctx, addr)
if err != nil {
return fmt.Errorf("starting %s store: %w", dbType, err)
}
Expand Down
2 changes: 1 addition & 1 deletion extern/boostd-data/ldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (db *DB) GetOffsetSize(ctx context.Context, cursorPrefix string, m multihas

var (
// The minimum frequency with which to check pieces for errors (eg bad index)
MinPieceCheckPeriod = 30 * time.Second
MinPieceCheckPeriod = 5 * time.Minute

// in-memory cursor to the position we reached in the leveldb table with respect to piece cids to process for errors with the doctor
offset int
Expand Down
8 changes: 4 additions & 4 deletions extern/boostd-data/svc/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ func MakeLevelDBDir(repoPath string) (string, error) {
return repoPath, nil
}

func (s *Service) Start(ctx context.Context, addr string) error {
func (s *Service) Start(ctx context.Context, addr string) (net.Addr, error) {
ln, err := net.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("setting up listener for local index directory service: %w", err)
return nil, fmt.Errorf("setting up listener for local index directory service: %w", err)
}

err = s.Impl.Start(ctx)
if err != nil {
return fmt.Errorf("starting local index directory service: %w", err)
return nil, fmt.Errorf("starting local index directory service: %w", err)
}

server := jsonrpc.NewServer()
Expand Down Expand Up @@ -97,5 +97,5 @@ func (s *Service) Start(ctx context.Context, addr string) error {
<-done
}()

return nil
return ln.Addr(), nil
}
36 changes: 28 additions & 8 deletions extern/boostd-data/svc/svc_size_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"math/rand"
"testing"
"time"

"github.com/filecoin-project/boost/testutil"
"github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/model"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
"math"
"math/rand"
"testing"
"time"
)

var tlg = logging.Logger("tlg")
Expand All @@ -33,7 +36,7 @@ func TestSizeLimit(t *testing.T) {
bdsvc, err := NewLevelDB("")
require.NoError(t, err)

testSizeLimit(ctx, t, bdsvc, "localhost:8042")
testSizeLimit(ctx, t, bdsvc, "localhost:0")
})

t.Run("yugabyte", func(t *testing.T) {
Expand All @@ -43,17 +46,17 @@ func TestSizeLimit(t *testing.T) {

bdsvc := NewYugabyte(TestYugabyteSettings)

addr := "localhost:8044"
addr := "localhost:0"
testSizeLimit(ctx, t, bdsvc, addr)
})
}

func testSizeLimit(ctx context.Context, t *testing.T, bdsvc *Service, addr string) {
err := bdsvc.Start(ctx, addr)
ln, err := bdsvc.Start(ctx, addr)
require.NoError(t, err)

cl := client.NewStore()
err = cl.Dial(context.Background(), fmt.Sprintf("ws://%s", addr))
err = cl.Dial(context.Background(), fmt.Sprintf("ws://%s", ln.String()))
require.NoError(t, err)
defer cl.Close(ctx)

Expand Down Expand Up @@ -119,3 +122,20 @@ func generateRandomCid(baseCid []byte) (cid.Cid, error) {

return c, nil
}

func toEntries(idx index.Index) (map[string]uint64, bool) {
it, ok := idx.(index.IterableIndex)
if !ok {
return nil, false
}

entries := make(map[string]uint64)
err := it.ForEach(func(mh multihash.Multihash, o uint64) error {
entries[mh.String()] = o
return nil
})
if err != nil {
return nil, false
}
return entries, true
}
Loading

0 comments on commit 5fe0bfd

Please sign in to comment.