Skip to content

Commit

Permalink
feat(shwap/bitswap): add getter tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Sep 19, 2024
1 parent 89e8781 commit 5b9669e
Showing 1 changed file with 43 additions and 7 deletions.
50 changes: 43 additions & 7 deletions share/shwap/p2p/bitswap/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (

"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/celestiaorg/celestia-app/v2/pkg/wrapper"
"github.com/celestiaorg/rsmt2d"
Expand All @@ -16,6 +20,8 @@ import (
"github.com/celestiaorg/celestia-node/share/shwap"
)

var tracer = otel.Tracer("shwap/bitswap")

// Getter implements share.Getter.
type Getter struct {
exchange exchange.SessionExchange
Expand Down Expand Up @@ -74,19 +80,26 @@ func (g *Getter) GetShares(
return nil, fmt.Errorf("empty coordinates")
}

ctx, span := tracer.Start(ctx, "get-shares")
defer span.End()

blks := make([]Block, len(rowIdxs))
for i, rowIdx := range rowIdxs {
sid, err := NewEmptySampleBlock(hdr.Height(), rowIdx, colIdxs[i], len(hdr.DAH.RowRoots))
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "NewEmptySampleBlock")
return nil, err
}

blks[i] = sid
}

ses := g.session(hdr)
ses := g.session(ctx, hdr)
err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithStore(g.bstore), WithFetcher(ses))
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "Fetch")
return nil, err
}

Expand All @@ -95,6 +108,7 @@ func (g *Getter) GetShares(
shares[i] = blk.(*SampleBlock).Container.Share
}

span.SetStatus(codes.Ok, "")
return shares, nil
}

Expand Down Expand Up @@ -124,20 +138,27 @@ func (g *Getter) GetEDS(
ctx context.Context,
hdr *header.ExtendedHeader,
) (*rsmt2d.ExtendedDataSquare, error) {
ctx, span := tracer.Start(ctx, "get-eds")
defer span.End()

sqrLn := len(hdr.DAH.RowRoots)
blks := make([]Block, sqrLn/2)
for i := 0; i < sqrLn/2; i++ {
blk, err := NewEmptyRowBlock(hdr.Height(), i, sqrLn)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "NewEmptyRowBlock")
return nil, err
}

blks[i] = blk
}

ses := g.session(hdr)
ses := g.session(ctx, hdr)
err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(ses))
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "Fetch")
return nil, err
}

Expand All @@ -148,9 +169,12 @@ func (g *Getter) GetEDS(

square, err := edsFromRows(hdr.DAH, rows)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "edsFromRows")
return nil, err
}

span.SetStatus(codes.Ok, "")
return square, nil
}

Expand All @@ -166,19 +190,26 @@ func (g *Getter) GetSharesByNamespace(
return nil, err
}

ctx, span := tracer.Start(ctx, "get-shares-by-namespace")
defer span.End()

rowIdxs := share.RowsWithNamespace(hdr.DAH, ns)
blks := make([]Block, len(rowIdxs))
for i, rowNdIdx := range rowIdxs {
rndblk, err := NewEmptyRowNamespaceDataBlock(hdr.Height(), rowNdIdx, ns, len(hdr.DAH.RowRoots))
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "NewEmptyRowNamespaceDataBlock")
return nil, err
}
blks[i] = rndblk
}

ses := g.session(hdr)
ses := g.session(ctx, hdr)
err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(ses))
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "Fetch")
return nil, err
}

Expand All @@ -191,16 +222,21 @@ func (g *Getter) GetSharesByNamespace(
}
}

span.SetStatus(codes.Ok, "")
return nsShrs, nil
}

// session decides which fetching session to use for the given header.
func (g *Getter) session(hdr *header.ExtendedHeader) exchange.Fetcher {
if pruner.IsWithinAvailabilityWindow(hdr.Time(), g.availWndw) {
return g.availableSession
func (g *Getter) session(ctx context.Context, hdr *header.ExtendedHeader) exchange.Fetcher {
session := g.archivalSession

isWithinAvailability := pruner.IsWithinAvailabilityWindow(hdr.Time(), g.availWndw)
if isWithinAvailability {
session = g.availableSession
}

return g.archivalSession
trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("within_availability", isWithinAvailability))
return session
}

// edsFromRows imports given Rows and computes EDS out of them, assuming enough Rows were provided.
Expand Down

0 comments on commit 5b9669e

Please sign in to comment.