Skip to content

Commit

Permalink
Remove index counter functionality (#2101)
Browse files Browse the repository at this point in the history
* Remove index counter functionality

This counter is very expensive to maintain and ends up offering little value. If we want it in the future, it should be implemented in dhstore.

- Remove index logic
- Clean up datastore to remove index count records
  • Loading branch information
gammazero committed Jul 13, 2023
1 parent c9a0a61 commit e46f6e6
Show file tree
Hide file tree
Showing 20 changed files with 93 additions and 759 deletions.
20 changes: 1 addition & 19 deletions command/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/ipni/go-libipni/mautil"
"github.com/ipni/storetheindex/config"
"github.com/ipni/storetheindex/fsutil"
"github.com/ipni/storetheindex/internal/counter"
"github.com/ipni/storetheindex/internal/ingest"
"github.com/ipni/storetheindex/internal/registry"
httpadmin "github.com/ipni/storetheindex/server/admin"
Expand All @@ -39,12 +38,6 @@ import (
"github.com/urfave/cli/v2"
)

const (
dsInfoPrefix = "/dsInfo/"
dsVersionKey = "version"
dsVersion = "001"
)

// Recognized valuestore type names.
const (
vstoreDHStore = "dhstore"
Expand Down Expand Up @@ -170,12 +163,6 @@ func daemonAction(cctx *cli.Context) error {
// Create indexer core
indexerCore := engine.New(valueStore, engine.WithCache(resultCache))

var indexCounts *counter.IndexCounts
if cfg.Indexer.IndexCountEnabled {
indexCounts = counter.NewIndexCounts(dstore)
indexCounts.SetTotalAddend(cfg.Indexer.IndexCountTotalAddend)
}

// Create registry
reg, err := registry.New(cctx.Context, cfg.Discovery, dstore,
registry.WithFreezer(freezeDirs, cfg.Indexer.FreezeAtPercent))
Expand All @@ -200,7 +187,6 @@ func daemonAction(cctx *cli.Context) error {
httpfind.WithWriteTimeout(time.Duration(cfg.Finder.ApiWriteTimeout)),
httpfind.WithMaxConnections(cfg.Finder.MaxConnections),
httpfind.WithHomepage(cfg.Finder.Webpage),
httpfind.WithIndexCounts(indexCounts),
httpfind.WithVersion(cctx.App.Version),
)
if err != nil {
Expand Down Expand Up @@ -251,7 +237,7 @@ func daemonAction(cctx *cli.Context) error {
}

// Initialize ingester.
ingester, err = ingest.NewIngester(cfg.Ingest, p2pHost, indexerCore, reg, dstore, dsTmp, ingest.WithIndexCounts(indexCounts))
ingester, err = ingest.NewIngester(cfg.Ingest, p2pHost, indexerCore, reg, dstore, dsTmp)
if err != nil {
return err
}
Expand Down Expand Up @@ -426,10 +412,6 @@ func daemonAction(cctx *cli.Context) error {
}
}

if indexCounts != nil {
indexCounts.SetTotalAddend(cfg.Indexer.IndexCountTotalAddend)
}

if errChan != nil {
errChan <- nil
}
Expand Down
70 changes: 45 additions & 25 deletions command/update_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@ import (
"github.com/ipfs/go-datastore/query"
)

// updateBatchSize is the number of records to update at a time.
const updateBatchSize = 500000
const (
dsInfoPrefix = "/dsInfo/"
dsVersionKey = "version"
dsVersion = "002"

// updateBatchSize is the number of records to update at a time.
updateBatchSize = 500000
)

func updateDatastore(ctx context.Context, ds datastore.Batching) error {
dsVerKey := datastore.NewKey(dsInfoPrefix + dsVersionKey)
curVerData, err := ds.Get(ctx, dsVerKey)
if err != nil && !errors.Is(err, datastore.ErrNotFound) {
return fmt.Errorf("cannot check datastore: %w", err)
}
var curVer string
curVer := "000"
if len(curVerData) != 0 {
curVer = string(curVerData)
}
Expand All @@ -31,12 +37,29 @@ func updateDatastore(ctx context.Context, ds datastore.Batching) error {
return fmt.Errorf("unknown datastore verssion: %s", curVer)
}

log.Infof("Updating datastore to version %s", dsVersion)
if err = rmDtFsmRecords(ctx, ds); err != nil {
return err
var count int

log.Infof("Updating datastore from version %s to %s", curVer, dsVersion)
if curVer < "001" {
count, err = deletePrefix(ctx, ds, "/data-transfer-v2")
if err != nil {
return err
}
if count != 0 {
log.Infow("Datastore update removed data-transfer fsm records", "count", count)
}
if err = rmOldTempRecords(ctx, ds); err != nil {
return err
}
}
if err = rmOldTempRecords(ctx, ds); err != nil {
return err
if curVer < "002" {
count, err = deletePrefix(ctx, ds, "/indexCounts")
if err != nil {
return err
}
if count != 0 {
log.Infow("Datastore update removed index count records", "count", count)
}
}

if err = ds.Put(ctx, dsVerKey, []byte(dsVersion)); err != nil {
Expand Down Expand Up @@ -128,36 +151,36 @@ func rmOldTempRecords(ctx context.Context, ds datastore.Batching) error {
return nil
}

func rmDtFsmRecords(ctx context.Context, ds datastore.Batching) error {
func deletePrefix(ctx context.Context, ds datastore.Batching, prefix string) (int, error) {
q := query.Query{
KeysOnly: true,
Prefix: "/data-transfer-v2",
Prefix: prefix,
}
results, err := ds.Query(ctx, q)
if err != nil {
return fmt.Errorf("cannot query datastore: %w", err)
return 0, fmt.Errorf("cannot query datastore: %w", err)
}
defer results.Close()

batch, err := ds.Batch(ctx)
if err != nil {
return fmt.Errorf("cannot create datastore batch: %w", err)
return 0, fmt.Errorf("cannot create datastore batch: %w", err)
}

var dtKeyCount, writeCount int
var keyCount, writeCount int
for result := range results.Next() {
if ctx.Err() != nil {
return ctx.Err()
return 0, ctx.Err()
}
if writeCount >= updateBatchSize {
writeCount = 0
if err = batch.Commit(ctx); err != nil {
return fmt.Errorf("cannot commit datastore: %w", err)
return 0, fmt.Errorf("cannot commit datastore: %w", err)
}
log.Infow("Datastore update removed data-transfer fsm records", "count", dtKeyCount)
log.Infow("Datastore update removed records", "count", keyCount)
}
if result.Error != nil {
return fmt.Errorf("cannot read query result from datastore: %w", result.Error)
return 0, fmt.Errorf("cannot read query result from datastore: %w", result.Error)
}
ent := result.Entry
if len(ent.Key) == 0 {
Expand All @@ -166,21 +189,18 @@ func rmDtFsmRecords(ctx context.Context, ds datastore.Batching) error {
}

if err = batch.Delete(ctx, datastore.NewKey(ent.Key)); err != nil {
return fmt.Errorf("cannot delete dt state key from datastore: %w", err)
return 0, fmt.Errorf("cannot delete key from datastore: %w", err)
}
writeCount++
dtKeyCount++
keyCount++
}

if err = batch.Commit(ctx); err != nil {
return fmt.Errorf("cannot commit datastore: %w", err)
return 0, fmt.Errorf("cannot commit datastore: %w", err)
}
if err = ds.Sync(context.Background(), datastore.NewKey(q.Prefix)); err != nil {
return err
return 0, err
}

if dtKeyCount != 0 {
log.Infow("Datastore update removed data-transfer fsm records", "count", dtKeyCount)
}
return nil
return keyCount, nil
}
7 changes: 0 additions & 7 deletions config/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@ type Indexer struct {
// ValueStoreDir is on, at which to trigger the indexer to enter frozen
// mode. A zero value uses the default. A negative value disables freezing.
FreezeAtPercent float64
// IndexCountEnabled sets whether ingest process should count the number of index records per provider and
// finder should show the number of index counts per provider.
IndexCountEnabled bool
// IndexCountTotalAddend is a value that is added to the index count total,
// to account for uncounted indexes that have existed in the value store
// before provider index counts were tracked. This value is reloadable.
IndexCountTotalAddend uint64
// ShutdownTimeout is the duration that a graceful shutdown has to complete
// before the daemon process is terminated. If unset or zero, configures no
// shutdown timeout. This value is reloadable.
Expand Down
8 changes: 0 additions & 8 deletions e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,6 @@ func TestEndToEndWithReferenceProvider(t *testing.T) {
require.Equal(t, 1, carCount)
require.Equal(t, 1, headCount)

outProvider := e.run(ipni, "provider", "-pid", providerID, "--indexer", "localhost:3000")
// Check that IndexCount with correct value appears in providers output.
require.Contains(t, string(outProvider), "IndexCount: 1043")

root2 := filepath.Join(e.dir, ".storetheindex2")
e.env = append(e.env, fmt.Sprintf("%s=%s", config.EnvDir, root2))
e.run(indexer, "init", "--store", "dhstore", "--pubsub-topic", "/indexer/ingest/mainnet", "--no-bootstrap", "--dhstore", "http://127.0.0.1:40080",
Expand Down Expand Up @@ -421,10 +417,6 @@ func TestEndToEndWithReferenceProvider(t *testing.T) {
return true
}, 10*time.Second, time.Second)

outProvider = e.run(ipni, "provider", "-pid", providerID, "--indexer", "localhost:3000")
// Check that IndexCount is back to zero after removing car.
require.Contains(t, string(outProvider), "IndexCount: 0")

// Check that status is not frozen.
outStatus := e.run(indexer, "admin", "status", "--indexer", "localhost:3202")
require.Contains(t, string(outStatus), "Frozen: false", "expected indexer to be frozen")
Expand Down
Loading

0 comments on commit e46f6e6

Please sign in to comment.