Skip to content

Commit

Permalink
Merge branch 'devel' into e35
Browse files Browse the repository at this point in the history
  • Loading branch information
awskii committed Apr 15, 2024
2 parents 8f11915 + 6a9ce5a commit b03d25f
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 42 deletions.
2 changes: 1 addition & 1 deletion cl/phase1/execution_client/execution_client_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (cc *ExecutionClientDirect) SupportInsertion() bool {
}

func (cc *ExecutionClientDirect) InsertBlocks(ctx context.Context, blocks []*types.Block, wait bool) error {
if !wait {
if wait {
return cc.chainRW.InsertBlocksAndWait(ctx, blocks)
}
return cc.chainRW.InsertBlocks(ctx, blocks)
Expand Down
8 changes: 4 additions & 4 deletions cmd/downloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ var torrentMagnet = &cobra.Command{

func manifestVerify(ctx context.Context, logger log.Logger) error {
webseedsList := common.CliString2Array(webseeds)
if len(webseedsList) == 0 { //fallback to default if exact list not passed
if len(webseedsList) == 0 { // fallback to default if exact list not passed
if known, ok := snapcfg.KnownWebseeds[chain]; ok {
webseedsList = append(webseedsList, known...)
}
Expand Down Expand Up @@ -440,9 +440,9 @@ func manifestVerify(ctx context.Context, logger log.Logger) error {
continue
}
}

_ = webseedFileProviders // todo add support of file providers
logger.Warn("file providers are not supported yet", "fileProviders", webseedFileProviders)
if len(webseedFileProviders) > 0 {
logger.Warn("file providers are not supported yet", "fileProviders", webseedFileProviders)
}

wseed := downloader.NewWebSeeds(webseedHttpProviders, log.LvlDebug, logger)
return wseed.VerifyManifestedBuckets(ctx, verifyFailfast)
Expand Down
9 changes: 5 additions & 4 deletions erigon-lib/downloader/webseed.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,10 +454,11 @@ func (d *WebSeeds) retrieveFileEtag(ctx context.Context, file *url.URL) (string,
if etag == "" {
return "", fmt.Errorf("webseed.http: file has no etag, url=%s", file.String())
}
etag = strings.Trim(etag, "\"")
if strings.Contains(etag, "-") {
return etag, ErrInvalidEtag
}
// Todo(awskii): figure out reason why multipart etags contains "-" and remove this check
//etag = strings.Trim(etag, "\"")
//if strings.Contains(etag, "-") {
// return etag, ErrInvalidEtag
//}
return etag, nil
}

Expand Down
14 changes: 6 additions & 8 deletions polygon/heimdall/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,16 @@ func (c *Checkpoint) UnmarshalJSON(b []byte) error {

type Checkpoints []*Checkpoint

func (cs *Checkpoints) Len() int {
return len(*cs)
func (cs Checkpoints) Len() int {
return len(cs)
}

func (cs *Checkpoints) Less(i, j int) bool {
v := *cs
return v[i].StartBlock().Uint64() < v[j].StartBlock().Uint64()
func (cs Checkpoints) Less(i, j int) bool {
return cs[i].StartBlock().Uint64() < cs[j].StartBlock().Uint64()
}

func (cs *Checkpoints) Swap(i, j int) {
v := *cs
v[i], v[j] = v[j], v[i]
func (cs Checkpoints) Swap(i, j int) {
cs[i], cs[j] = cs[j], cs[i]
}

type CheckpointResponse struct {
Expand Down
2 changes: 1 addition & 1 deletion polygon/heimdall/heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ func (h *heimdall) batchFetchCheckpoints(
page++
}

sort.Sort(&checkpoints)
sort.Sort(checkpoints)

for i, checkpoint := range checkpoints[lastStored:] {
err := store.PutCheckpoint(ctx, CheckpointId(i+1), checkpoint)
Expand Down
3 changes: 2 additions & 1 deletion polygon/p2p/fetcher_tracking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ func TestTrackingFetcherFetchBodiesUpdatesPeerTracker(t *testing.T) {

func newTrackingFetcherTest(t *testing.T, requestIdGenerator RequestIdGenerator) *trackingFetcherTest {
fetcherTest := newFetcherTest(t, requestIdGenerator)
logger := fetcherTest.logger
peerTracker := newPeerTracker(PreservingPeerShuffle)
unregister := fetcherTest.messageListener.RegisterPeerEventObserver(NewPeerEventObserver(peerTracker))
unregister := fetcherTest.messageListener.RegisterPeerEventObserver(NewPeerEventObserver(logger, peerTracker))
t.Cleanup(unregister)
trackingFetcher := newTrackingFetcher(fetcherTest.fetcher, peerTracker)
return &trackingFetcherTest{
Expand Down
17 changes: 0 additions & 17 deletions polygon/p2p/peer_penalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,6 @@ type PeerPenalizer interface {
Penalize(ctx context.Context, peerId *PeerId) error
}

func NewTrackingPeerPenalizer(peerPenalizer PeerPenalizer, peerTracker PeerTracker) PeerPenalizer {
return &trackingPeerPenalizer{
PeerPenalizer: peerPenalizer,
peerTracker: peerTracker,
}
}

type trackingPeerPenalizer struct {
PeerPenalizer
peerTracker PeerTracker
}

func (p *trackingPeerPenalizer) Penalize(ctx context.Context, peerId *PeerId) error {
p.peerTracker.PeerDisconnected(peerId)
return p.PeerPenalizer.Penalize(ctx, peerId)
}

func NewPeerPenalizer(sentryClient direct.SentryClient) PeerPenalizer {
return &peerPenalizer{
sentryClient: sentryClient,
Expand Down
12 changes: 9 additions & 3 deletions polygon/p2p/peer_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package p2p
import (
"sync"

"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
)

Expand Down Expand Up @@ -90,13 +92,17 @@ func (pt *peerTracker) updatePeerSyncProgress(peerId *PeerId, update func(psp *p
update(peerSyncProgress)
}

func NewPeerEventObserver(peerTracker PeerTracker) MessageObserver[*sentry.PeerEvent] {
func NewPeerEventObserver(logger log.Logger, peerTracker PeerTracker) MessageObserver[*sentry.PeerEvent] {
return func(message *sentry.PeerEvent) {
peerId := PeerIdFromH512(message.PeerId)

logger.Debug("[p2p.peerEventObserver] received new peer event", "id", message.EventId, "peerId", peerId)

switch message.EventId {
case sentry.PeerEvent_Connect:
peerTracker.PeerConnected(PeerIdFromH512(message.PeerId))
peerTracker.PeerConnected(peerId)
case sentry.PeerEvent_Disconnect:
peerTracker.PeerDisconnected(PeerIdFromH512(message.PeerId))
peerTracker.PeerDisconnected(peerId)
}
}
}
5 changes: 4 additions & 1 deletion polygon/p2p/peer_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"testing"
"time"

"github.com/ledgerwatch/log/v3"
"github.com/stretchr/testify/require"

"github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
"github.com/ledgerwatch/erigon/turbo/testlog"
)

func TestPeerTracker(t *testing.T) {
Expand Down Expand Up @@ -48,8 +50,9 @@ func TestPeerTracker(t *testing.T) {
func TestPeerTrackerPeerEventObserver(t *testing.T) {
t.Parallel()

logger := testlog.Logger(t, log.LvlInfo)
peerTracker := newPeerTracker(PreservingPeerShuffle)
peerTrackerPeerEventObserver := NewPeerEventObserver(peerTracker)
peerTrackerPeerEventObserver := NewPeerEventObserver(logger, peerTracker)
messageListenerTest := newMessageListenerTest(t)
messageListenerTest.mockSentryStreams()
messageListenerTest.run(func(ctx context.Context, t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions polygon/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func newService(
requestIdGenerator RequestIdGenerator,
) *service {
peerTracker := NewPeerTracker()
peerPenalizer := NewTrackingPeerPenalizer(NewPeerPenalizer(sentryClient), peerTracker)
peerPenalizer := NewPeerPenalizer(sentryClient)
messageListener := NewMessageListener(logger, sentryClient, statusDataFactory, peerPenalizer)
messageListener.RegisterPeerEventObserver(NewPeerEventObserver(peerTracker))
messageListener.RegisterPeerEventObserver(NewPeerEventObserver(logger, peerTracker))
messageSender := NewMessageSender(sentryClient)
fetcher := NewFetcher(fetcherConfig, messageListener, messageSender, requestIdGenerator)
fetcher = NewPenalizingFetcher(logger, fetcher, peerPenalizer)
Expand Down

0 comments on commit b03d25f

Please sign in to comment.