Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p): cover Exchange with traces #150

Merged
merged 11 commits into from
Feb 1, 2024
49 changes: 41 additions & 8 deletions p2p/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,19 @@ import (

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/celestiaorg/go-header"
p2p_pb "github.com/celestiaorg/go-header/p2p/pb"
)

var (
tracerSession = otel.Tracer("header/p2p-session")
)

// errEmptyResponse means that server side closes the connection without sending at least 1
// response.
var errEmptyResponse = errors.New("empty response")
Expand Down Expand Up @@ -77,9 +85,15 @@ func newSession[H header.Header[H]](
func (s *session[H]) getRangeByHeight(
ctx context.Context,
from, amount, headersPerPeer uint64,
) ([]H, error) {
) (_ []H, err error) {
log.Debugw("requesting headers", "from", from, "to", from+amount-1) // -1 need to exclude to+1 height

ctx, span := tracerSession.Start(ctx, "get-range-by-height", trace.WithAttributes(
attribute.Int64("from", int64(from)),
attribute.Int64("to", int64(from+amount-1)),
))
defer span.End()

requests := prepareRequests(from, amount, headersPerPeer)
result := make(chan []H, len(requests))
s.reqCh = make(chan *p2p_pb.HeaderRequest, len(requests))
Expand All @@ -94,8 +108,11 @@ LOOP:
for {
select {
case <-s.ctx.Done():
return nil, errors.New("header/p2p: exchange is closed")
err = errors.New("header/p2p: exchange is closed")
span.SetStatus(codes.Error, err.Error())
return nil, err
case <-ctx.Done():
span.SetStatus(codes.Error, ctx.Err().Error())
return nil, ctx.Err()
case res := <-result:
headers = append(headers, res...)
Expand All @@ -113,6 +130,7 @@ LOOP:
"from", headers[0].Height(),
"to", headers[len(headers)-1].Height(),
)
span.SetStatus(codes.Ok, "")
return headers, nil
}

Expand Down Expand Up @@ -152,19 +170,32 @@ func (s *session[H]) doRequest(
req *p2p_pb.HeaderRequest,
headers chan []H,
) {
ctx, span := tracerSession.Start(ctx, "request-headers-from-peer", trace.WithAttributes(
attribute.String("peerID", stat.peerID.String()),
attribute.Int64("from", int64(req.GetOrigin())),
attribute.Int64("amount", int64(req.Amount)),
))

defer span.SetStatus(codes.Ok, "")
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
defer span.End()

ctx, cancel := context.WithTimeout(ctx, s.requestTimeout)
defer cancel()

r, size, duration, err := sendMessage(ctx, s.host, stat.peerID, s.protocolID, req)
s.metrics.response(ctx, size, duration, err)
if err != nil {
span.AddEvent("error during range fetching", trace.WithAttributes(
attribute.String("error", err.Error())))
// we should not punish peer at this point and should try to parse responses, despite that error
// was received.
log.Debugw("requesting headers from peer failed", "peer", stat.peerID, "err", err)
}

h, err := s.processResponses(r)
if err != nil {
span.AddEvent("processing response failed", trace.WithAttributes(
attribute.String("error", err.Error())))
logFn := log.Errorw

switch err {
Expand Down Expand Up @@ -195,21 +226,23 @@ func (s *session[H]) doRequest(
"requestedAmount", req.Amount,
)

remainingHeaders := req.Amount - uint64(len(h))

span.AddEvent("request succeed", trace.WithAttributes(
attribute.Int64("remaining headers", int64(remainingHeaders))))

// update peer stats
stat.updateStats(size, duration)

responseLn := uint64(len(h))
// ensure that we received the correct amount of headers.
if responseLn < req.Amount {
from := h[responseLn-1].Height()
amount := req.Amount - responseLn

if remainingHeaders > 0 {
from := h[uint64(len(h))-1].Height()
select {
case <-s.ctx.Done():
return
// create a new request with the remaining headers.
// prepareRequests will return a slice with 1 element at this point
case s.reqCh <- prepareRequests(from+1, amount, req.Amount)[0]:
case s.reqCh <- prepareRequests(from+1, remainingHeaders, req.Amount)[0]:
log.Debugw("sending additional request to get remaining headers")
}
}
Expand Down
Loading