Skip to content

Commit

Permalink
Sync ad chains by segment (#2464)
Browse files Browse the repository at this point in the history
Ad chains my be very long, so it is necessary to sync them by segment.
  • Loading branch information
gammazero committed Dec 30, 2023
1 parent 8b314f2 commit 8acc51b
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 25 deletions.
3 changes: 2 additions & 1 deletion e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,10 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) {
if testing.Verbose() {
logLevel = "debug"
}
outgc := string(e.Run(ipnigc, "provider", "-pid", providerID, "-ll", logLevel, "--commit",
outgc := string(e.Run(ipnigc, "provider", "-pid", providerID, "-ll", logLevel,
"-i", "http://localhost:3200",
"-i", "http://localhost:3000",
"-sync-segment-size", "2",
))
t.Logf("GC Results:\n%s\n", outgc)
require.Contains(t, outgc, `"count": 1043, "total": 1043, "source": "CAR"`)
Expand Down
7 changes: 7 additions & 0 deletions ipni-gc/cmd/ipnigc/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ var daemonFlags = []cli.Flag{
Usage: "Set log level for other loggers that are not ipni-gc",
Value: "error",
},
&cli.IntFlag{
Name: "sync-segment-size",
Usage: "Set advertisement chain sync segment size. This specifies how many ads to sync in each segment.",
Aliases: []string{"sync-ss"},
Value: 4096,
},
}

func daemonAction(cctx *cli.Context) error {
Expand Down Expand Up @@ -124,6 +130,7 @@ func daemonAction(cctx *cli.Context) error {
reaper.WithPCache(pc),
reaper.WithTopicName(cfg.Ingest.PubSubTopic),
reaper.WithHttpTimeout(time.Duration(cfg.Ingest.HttpSyncTimeout)),
reaper.WithSyncSegmentSize(cctx.Int("sync-segment-size")),
)
if err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions ipni-gc/cmd/ipnigc/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ var providerFlags = []cli.Flag{
Aliases: []string{"ss"},
Value: 16384,
},
&cli.IntFlag{
Name: "sync-segment-size",
Usage: "Set advertisement chain sync segment size. This specifies how many ads to sync in each segment.",
Aliases: []string{"sync-ss"},
Value: 4096,
},
}

func providerAction(cctx *cli.Context) error {
Expand Down Expand Up @@ -159,6 +165,7 @@ func providerAction(cctx *cli.Context) error {
reaper.WithSegmentSize(cctx.Int("segment-size")),
reaper.WithTopicName(cfg.Ingest.PubSubTopic),
reaper.WithHttpTimeout(time.Duration(cfg.Ingest.HttpSyncTimeout)),
reaper.WithSyncSegmentSize(cctx.Int("sync-segment-size")),
)
if err != nil {
return err
Expand Down
15 changes: 14 additions & 1 deletion ipni-gc/reaper/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
const (
defaultHttpTimeout = 10 * time.Second
defaultSegmentSize = 16384
defaultSyncSegSize = 4096
defaultTopic = "/indexer/ingest/mainnet"
)

Expand All @@ -29,6 +30,7 @@ type config struct {
p2pHost host.Host
pcache *pcache.ProviderCache
segmentSize int
syncSegSize int
topic string
}

Expand All @@ -43,6 +45,7 @@ func getOpts(opts []Option) (config, error) {
entsFromPub: true,
httpTimeout: defaultHttpTimeout,
segmentSize: defaultSegmentSize,
syncSegSize: defaultSyncSegSize,
topic: defaultTopic,
}

Expand Down Expand Up @@ -135,7 +138,7 @@ func WithPCache(pc *pcache.ProviderCache) Option {
}

// WithSegmentSize sets the size of the segments that the ad chain is broken
// into for processing.
// into for processing after syncing.
func WithSegmentSize(size int) Option {
return func(c *config) error {
if size > 0 {
Expand All @@ -145,6 +148,16 @@ func WithSegmentSize(size int) Option {
}
}

// WithSyncSegmentSize sets sice that the ad chain is broken into when syncing.
func WithSyncSegmentSize(size int) Option {
return func(c *config) error {
if size > 0 {
c.syncSegSize = size
}
return nil
}
}

// WithTopicName sets the topic name on which the provider announces advertised
// content. Defaults to '/indexer/ingest/mainnet'.
func WithTopicName(topic string) Option {
Expand Down
74 changes: 51 additions & 23 deletions ipni-gc/reaper/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type Reaper struct {
segmentSize int
stats GCStats
statsMutex sync.Mutex
syncSegSize int
topic string
}

Expand Down Expand Up @@ -192,6 +193,7 @@ func New(idxr indexer.Interface, fileStore filestore.Interface, options ...Optio
indexer: idxr,
pcache: opts.pcache,
segmentSize: opts.segmentSize,
syncSegSize: opts.syncSegSize,
topic: opts.topic,
}, nil
}
Expand Down Expand Up @@ -271,7 +273,7 @@ func (r *Reaper) Reap(ctx context.Context, providerID peer.ID) error {
defer dstoreTmp.Close()

// Create ipni dagsync Subscriber for the provider.
sub, err := makeSubscriber(r.host, dstoreTmp, r.topic, r.httpTimeout)
sub, err := r.makeSubscriber(dstoreTmp)
if err != nil {
return fmt.Errorf("failed to start dagsync subscriber: %w", err)
}
Expand Down Expand Up @@ -596,6 +598,53 @@ func (r *Reaper) deleteCarFile(ctx context.Context, adCid cid.Cid) (int64, error
return file.Size, nil
}

func (r *Reaper) makeSubscriber(dstoreTmp datastore.Batching) (*dagsync.Subscriber, error) {
linksys := cidlink.DefaultLinkSystem()

linksys.StorageReadOpener = func(lctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
c := lnk.(cidlink.Link).Cid
val, err := dstoreTmp.Get(lctx.Ctx, datastore.NewKey(c.String()))
if err != nil {
return nil, err
}
return bytes.NewBuffer(val), nil
}
linksys.StorageWriteOpener = func(lctx ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) {
buf := bytes.NewBuffer(nil)
return buf, func(lnk ipld.Link) error {
c := lnk.(cidlink.Link).Cid
return dstoreTmp.Put(lctx.Ctx, datastore.NewKey(c.String()), buf.Bytes())
}, nil
}

return dagsync.NewSubscriber(r.host, dstoreTmp, linksys, r.topic,
dagsync.HttpTimeout(r.httpTimeout),
dagsync.SegmentDepthLimit(int64(r.syncSegSize)))
}

func (s *scythe) generalDagsyncBlockHook(_ peer.ID, adCid cid.Cid, actions dagsync.SegmentSyncActions) {
// The only kind of block we should get by loading CIDs here should be
// Advertisement.
//
// Because:
// - the default subscription selector only selects advertisements.
// - explicit Ingester.Sync only selects advertisement.
// - entries are synced with an explicit selector separate from
// advertisement syncs and should use dagsync.ScopedBlockHook to
// override this hook and decode chunks instead.
//
// Therefore, we only attempt to load advertisements here and signal
// failure if the load fails.
ad, err := s.loadAd(adCid)
if err != nil {
actions.FailSync(err)
} else if ad.PreviousID != nil {
actions.SetNextSyncCid(ad.PreviousID.(cidlink.Link).Cid)
} else {
actions.SetNextSyncCid(cid.Undef)
}
}

func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error {
log.Infow("Starting GC for provider", "latestAd", latestAdCid, "provider", s.providerID)
if latestAdCid == cid.Undef {
Expand All @@ -617,6 +666,7 @@ func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error {
_, err = s.sub.SyncAdChain(ctx, s.publisher,
dagsync.WithHeadAdCid(latestAdCid),
dagsync.WithStopAdCid(gcState.LastProcessedAdCid),
dagsync.ScopedBlockHook(s.generalDagsyncBlockHook),
)
if err != nil {
return fmt.Errorf("failed to sync advertisement chain: %w", err)
Expand Down Expand Up @@ -873,28 +923,6 @@ func (s *scythe) deleteCarFile(ctx context.Context, adCid cid.Cid) error {
return nil
}

func makeSubscriber(host host.Host, dstoreTmp datastore.Batching, topic string, httpTimeout time.Duration) (*dagsync.Subscriber, error) {
linksys := cidlink.DefaultLinkSystem()

linksys.StorageReadOpener = func(lctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
c := lnk.(cidlink.Link).Cid
val, err := dstoreTmp.Get(lctx.Ctx, datastore.NewKey(c.String()))
if err != nil {
return nil, err
}
return bytes.NewBuffer(val), nil
}
linksys.StorageWriteOpener = func(lctx ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) {
buf := bytes.NewBuffer(nil)
return buf, func(lnk ipld.Link) error {
c := lnk.(cidlink.Link).Cid
return dstoreTmp.Put(lctx.Ctx, datastore.NewKey(c.String()), buf.Bytes())
}, nil
}

return dagsync.NewSubscriber(host, dstoreTmp, linksys, topic, dagsync.HttpTimeout(httpTimeout))
}

func (s *scythe) loadGCState(ctx context.Context) (GCState, error) {
if s.dstore == nil {
return GCState{}, nil
Expand Down

0 comments on commit 8acc51b

Please sign in to comment.