Skip to content

Commit

Permalink
polygon/p2p: fix p2p fetcher peer errors on chain tip (#11927)
Browse files Browse the repository at this point in the history
More issues surfaced on chain tip when testing astrid:
1. context deadline exceeded when requesting new block event at tip from
peer - can happen, safe to ignore event and continue instead of
crashing:
```
[EROR] [09-06|03:41:00.183] [2/6 PolygonSync] stopping node          err="await *eth.BlockHeadersPacket66 response interrupted: context deadline exceeded"
```
2. Noticed we do not penalise peers for penalize-able errors when
calling `FetchBlocks` - added that in
3. We got another error that crashed the process -
`ErrNonSequentialHeaderNumbers` - it is safe to ignore new block event
if this happens and continue
```
EROR[09-05|20:26:35.141] [2/6 PolygonSync] stopping node          err="non sequential header numbers in fetch headers response: current=11608859, expected=11608860"
```
4. Added all other p2p errors that may happen and are safe to ignore at
tip event processing
5. Added debug logging for better visibility into chain tip events
6. Fixed missing check for whether we have already processed a new block
event (ie if its hash is already contained in the canonical chain
builder)
  • Loading branch information
taratorio committed Sep 10, 2024
1 parent d599dc8 commit 4acd212
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 114 deletions.
42 changes: 21 additions & 21 deletions polygon/p2p/fetcher_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,27 +543,27 @@ func newFetcherTest(t *testing.T, requestIdGenerator RequestIdGenerator) *fetche
messageSender := NewMessageSender(sentryClient)
fetcher := newFetcher(fetcherConfig, messageListener, messageSender, requestIdGenerator)
return &fetcherTest{
ctx: ctx,
ctxCancel: cancel,
t: t,
fetcher: fetcher,
logger: logger,
sentryClient: sentryClient,
messageListener: messageListener,
headersRequestResponseMocks: map[uint64]requestResponseMock{},
ctx: ctx,
ctxCancel: cancel,
t: t,
fetcher: fetcher,
logger: logger,
sentryClient: sentryClient,
messageListener: messageListener,
requestResponseMocks: map[uint64]requestResponseMock{},
}
}

type fetcherTest struct {
ctx context.Context
ctxCancel context.CancelFunc
t *testing.T
fetcher *fetcher
logger log.Logger
sentryClient *direct.MockSentryClient
messageListener MessageListener
headersRequestResponseMocks map[uint64]requestResponseMock
peerEvents chan *delayedMessage[*sentryproto.PeerEvent]
ctx context.Context
ctxCancel context.CancelFunc
t *testing.T
fetcher *fetcher
logger log.Logger
sentryClient *direct.MockSentryClient
messageListener MessageListener
requestResponseMocks map[uint64]requestResponseMock
peerEvents chan *delayedMessage[*sentryproto.PeerEvent]
}

func (ft *fetcherTest) run(f func(ctx context.Context, t *testing.T)) {
Expand Down Expand Up @@ -611,7 +611,7 @@ func (ft *fetcherTest) mockSentryInboundMessagesStream(mocks ...requestResponseM
var numInboundMessages int
for _, mock := range mocks {
numInboundMessages += len(mock.mockResponseInboundMessages)
ft.headersRequestResponseMocks[mock.requestId] = mock
ft.requestResponseMocks[mock.requestId] = mock
}

inboundMessageStreamChan := make(chan *delayedMessage[*sentryproto.InboundMessage], numInboundMessages)
Expand Down Expand Up @@ -643,7 +643,7 @@ func (ft *fetcherTest) mockSentryInboundMessagesStream(mocks ...requestResponseM
return nil, err
}

delete(ft.headersRequestResponseMocks, mock.requestId)
delete(ft.requestResponseMocks, mock.requestId)
for _, inboundMessage := range mock.mockResponseInboundMessages {
inboundMessageStreamChan <- &delayedMessage[*sentryproto.InboundMessage]{
message: inboundMessage,
Expand All @@ -668,7 +668,7 @@ func (ft *fetcherTest) mockSendMessageByIdForHeaders(req *sentryproto.SendMessag
return requestResponseMock{}, err
}

mock, ok := ft.headersRequestResponseMocks[pkt.RequestId]
mock, ok := ft.requestResponseMocks[pkt.RequestId]
if !ok {
return requestResponseMock{}, fmt.Errorf("unexpected request id %d", pkt.RequestId)
}
Expand Down Expand Up @@ -699,7 +699,7 @@ func (ft *fetcherTest) mockSendMessageByIdForBodies(req *sentryproto.SendMessage
return requestResponseMock{}, err
}

mock, ok := ft.headersRequestResponseMocks[pkt.RequestId]
mock, ok := ft.requestResponseMocks[pkt.RequestId]
if !ok {
return requestResponseMock{}, fmt.Errorf("unexpected request id %d", pkt.RequestId)
}
Expand Down
42 changes: 35 additions & 7 deletions polygon/p2p/fetcher_penalizing.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,42 @@ func NewPenalizingFetcher(logger log.Logger, fetcher Fetcher, peerPenalizer Peer
}

func newPenalizingFetcher(logger log.Logger, fetcher Fetcher, peerPenalizer PeerPenalizer) *penalizingFetcher {
fetchHeadersPenalizeErrs := []error{
&ErrTooManyHeaders{},
&ErrNonSequentialHeaderNumbers{},
}

fetchBodiesPenalizeErrs := []error{
&ErrTooManyBodies{},
}

fetchBlocksPenalizeErrs := make([]error, 0, len(fetchHeadersPenalizeErrs)+len(fetchBodiesPenalizeErrs))
fetchBlocksPenalizeErrs = append(fetchBlocksPenalizeErrs, fetchHeadersPenalizeErrs...)
fetchBlocksPenalizeErrs = append(fetchBlocksPenalizeErrs, fetchBodiesPenalizeErrs...)

return &penalizingFetcher{
Fetcher: fetcher,
logger: logger,
peerPenalizer: peerPenalizer,
Fetcher: fetcher,
logger: logger,
peerPenalizer: peerPenalizer,
fetchHeadersPenalizeErrs: fetchHeadersPenalizeErrs,
fetchBodiesPenalizeErrs: fetchBodiesPenalizeErrs,
fetchBlocksPenalizeErrs: fetchBlocksPenalizeErrs,
}
}

type penalizingFetcher struct {
Fetcher
logger log.Logger
peerPenalizer PeerPenalizer
logger log.Logger
peerPenalizer PeerPenalizer
fetchHeadersPenalizeErrs []error
fetchBodiesPenalizeErrs []error
fetchBlocksPenalizeErrs []error
}

func (pf *penalizingFetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) {
headers, err := pf.Fetcher.FetchHeaders(ctx, start, end, peerId)
if err != nil {
return FetcherResponse[[]*types.Header]{}, pf.maybePenalize(ctx, peerId, err, &ErrTooManyHeaders{}, &ErrNonSequentialHeaderNumbers{})
return FetcherResponse[[]*types.Header]{}, pf.maybePenalize(ctx, peerId, err, pf.fetchHeadersPenalizeErrs...)
}

return headers, nil
Expand All @@ -56,12 +75,21 @@ func (pf *penalizingFetcher) FetchHeaders(ctx context.Context, start uint64, end
func (pf *penalizingFetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) {
bodies, err := pf.Fetcher.FetchBodies(ctx, headers, peerId)
if err != nil {
return FetcherResponse[[]*types.Body]{}, pf.maybePenalize(ctx, peerId, err, &ErrTooManyBodies{})
return FetcherResponse[[]*types.Body]{}, pf.maybePenalize(ctx, peerId, err, pf.fetchBodiesPenalizeErrs...)
}

return bodies, nil
}

func (pf *penalizingFetcher) FetchBlocks(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error) {
blocks, err := pf.Fetcher.FetchBlocks(ctx, start, end, peerId)
if err != nil {
return FetcherResponse[[]*types.Block]{}, pf.maybePenalize(ctx, peerId, err, pf.fetchBlocksPenalizeErrs...)
}

return blocks, nil
}

func (pf *penalizingFetcher) maybePenalize(ctx context.Context, peerId *PeerId, err error, penalizeErrs ...error) error {
var shouldPenalize bool
for _, penalizeErr := range penalizeErrs {
Expand Down
Loading

0 comments on commit 4acd212

Please sign in to comment.