Skip to content

Commit

Permalink
handle future chain import and skip peer drop (ethereum#650)
Browse files Browse the repository at this point in the history
* handle future chain import and skip peer drop

* add block import metric

* params: bump version to v0.3.3-stable
  • Loading branch information
manav2401 committed Jan 7, 2023
1 parent 3eb234c commit b480db1
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 39 deletions.
6 changes: 6 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storage/reads", nil)
snapshotCommitTimer = metrics.NewRegisteredTimer("chain/snapshot/commits", nil)

blockImportTimer = metrics.NewRegisteredMeter("chain/imports", nil)
blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
Expand Down Expand Up @@ -1518,6 +1519,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
it := newInsertIterator(chain, results, bc.validator)
block, err := it.next()

// Update the block import meter; it will just record chains we've received
// from other peers. (Note that the actual chain which gets imported would be
// quite low).
blockImportTimer.Mark(int64(len(headers)))

// Check the validity of incoming chain
isValid, err1 := bc.forker.ValidateReorg(bc.CurrentBlock().Header(), headers)
if err1 != nil {
Expand Down
4 changes: 1 addition & 3 deletions core/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ func (f *ForkChoice) ReorgNeeded(current *types.Header, header *types.Header) (b
func (f *ForkChoice) ValidateReorg(current *types.Header, chain []*types.Header) (bool, error) {
// Call the bor chain validator service
if f.validator != nil {
if isValid := f.validator.IsValidChain(current, chain); !isValid {
return false, nil
}
return f.validator.IsValidChain(current, chain)
}

return true, nil
Expand Down
30 changes: 15 additions & 15 deletions core/forkchoice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import (

// chainValidatorFake is a mock for the chain validator service
type chainValidatorFake struct {
validate func(currentHeader *types.Header, chain []*types.Header) bool
validate func(currentHeader *types.Header, chain []*types.Header) (bool, error)
}

// chainReaderFake is a mock for the chain reader service
type chainReaderFake struct {
getTd func(hash common.Hash, number uint64) *big.Int
}

func newChainValidatorFake(validate func(currentHeader *types.Header, chain []*types.Header) bool) *chainValidatorFake {
func newChainValidatorFake(validate func(currentHeader *types.Header, chain []*types.Header) (bool, error)) *chainValidatorFake {
return &chainValidatorFake{validate: validate}
}

Expand All @@ -46,18 +46,18 @@ func TestPastChainInsert(t *testing.T) {
getTd := func(hash common.Hash, number uint64) *big.Int {
return big.NewInt(int64(number))
}
validate := func(currentHeader *types.Header, chain []*types.Header) bool {
validate := func(currentHeader *types.Header, chain []*types.Header) (bool, error) {
// Put all explicit conditions here
// If canonical chain is empty and we're importing a chain of 64 blocks
if currentHeader.Number.Uint64() == uint64(0) && len(chain) == 64 {
return true
return true, nil
}
// If canonical chain is of len 64 and we're importing a past chain from 54-64, then accept it
if currentHeader.Number.Uint64() == uint64(64) && chain[0].Number.Uint64() == 55 && len(chain) == 10 {
return true
return true, nil
}

return false
return false, nil
}
mockChainReader := newChainReaderFake(getTd)
mockChainValidator := newChainValidatorFake(validate)
Expand Down Expand Up @@ -116,18 +116,18 @@ func TestFutureChainInsert(t *testing.T) {
getTd := func(hash common.Hash, number uint64) *big.Int {
return big.NewInt(int64(number))
}
validate := func(currentHeader *types.Header, chain []*types.Header) bool {
validate := func(currentHeader *types.Header, chain []*types.Header) (bool, error) {
// Put all explicit conditions here
// If canonical chain is empty and we're importing a chain of 64 blocks
if currentHeader.Number.Uint64() == uint64(0) && len(chain) == 64 {
return true
return true, nil
}
// If length of future chains > some value, they should not be accepted
if currentHeader.Number.Uint64() == uint64(64) && len(chain) <= 10 {
return true
return true, nil
}

return false
return false, nil
}
mockChainReader := newChainReaderFake(getTd)
mockChainValidator := newChainValidatorFake(validate)
Expand Down Expand Up @@ -174,18 +174,18 @@ func TestOverlappingChainInsert(t *testing.T) {
getTd := func(hash common.Hash, number uint64) *big.Int {
return big.NewInt(int64(number))
}
validate := func(currentHeader *types.Header, chain []*types.Header) bool {
validate := func(currentHeader *types.Header, chain []*types.Header) (bool, error) {
// Put all explicit conditions here
// If canonical chain is empty and we're importing a chain of 64 blocks
if currentHeader.Number.Uint64() == uint64(0) && len(chain) == 64 {
return true
return true, nil
}
// If length of chain is > some fixed value then don't accept it
if currentHeader.Number.Uint64() == uint64(64) && len(chain) <= 20 {
return true
return true, nil
}

return false
return false, nil
}
mockChainReader := newChainReaderFake(getTd)
mockChainValidator := newChainValidatorFake(validate)
Expand Down Expand Up @@ -227,7 +227,7 @@ func (c *chainReaderFake) GetTd(hash common.Hash, number uint64) *big.Int {
func (w *chainValidatorFake) IsValidPeer(remoteHeader *types.Header, fetchHeadersByNumber func(number uint64, amount int, skip int, reverse bool) ([]*types.Header, []common.Hash, error)) (bool, error) {
return true, nil
}
func (w *chainValidatorFake) IsValidChain(current *types.Header, headers []*types.Header) bool {
func (w *chainValidatorFake) IsValidChain(current *types.Header, headers []*types.Header) (bool, error) {
return w.validate(current, headers)
}
func (w *chainValidatorFake) ProcessCheckpoint(endBlockNum uint64, endBlockHash common.Hash) {}
Expand Down
11 changes: 7 additions & 4 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,6 @@ func (d *Downloader) LegacySync(id string, head common.Hash, td, ttd *big.Int, m
return err // This is an expected fault, don't keep printing it in a spin-loop
}

if errors.Is(err, whitelist.ErrNoRemoteCheckoint) {
log.Warn("Doesn't have remote checkpoint yet", "peer", id, "err", err)
}

log.Warn("Synchronisation failed, retrying", "peer", id, "err", err)

return err
Expand Down Expand Up @@ -1581,6 +1577,13 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
// of the blocks delivered from the downloader, and the indexing will be off.
log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err)
}

// If we've received too long future chain error (from whitelisting service),
// return that as the root error and `errInvalidChain` as context.
if errors.Is(err, whitelist.ErrLongFutureChain) {
return fmt.Errorf("%v: %w", errInvalidChain, err)
}

return fmt.Errorf("%w: %v", errInvalidChain, err)
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1426,8 +1426,8 @@ func (w *whitelistFake) IsValidPeer(_ *types.Header, _ func(number uint64, amoun
return w.validate(w.count)
}

func (w *whitelistFake) IsValidChain(current *types.Header, headers []*types.Header) bool {
return true
func (w *whitelistFake) IsValidChain(current *types.Header, headers []*types.Header) (bool, error) {
return true, nil
}
func (w *whitelistFake) ProcessCheckpoint(_ uint64, _ common.Hash) {}

Expand Down
15 changes: 8 additions & 7 deletions eth/downloader/whitelist/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func NewService(maxCapacity uint) *Service {

var (
ErrCheckpointMismatch = errors.New("checkpoint mismatch")
ErrLongFutureChain = errors.New("received future chain of unacceptable length")
ErrNoRemoteCheckoint = errors.New("remote peer doesn't have a checkoint")
)

Expand Down Expand Up @@ -74,16 +75,16 @@ func (w *Service) IsValidPeer(remoteHeader *types.Header, fetchHeadersByNumber f

// IsValidChain checks the validity of chain by comparing it
// against the local checkpoint entries
func (w *Service) IsValidChain(currentHeader *types.Header, chain []*types.Header) bool {
func (w *Service) IsValidChain(currentHeader *types.Header, chain []*types.Header) (bool, error) {
// Check if we have checkpoints to validate incoming chain in memory
if len(w.checkpointWhitelist) == 0 {
// We don't have any entries, no additional validation will be possible
return true
return true, nil
}

// Return if we've received empty chain
if len(chain) == 0 {
return false
return false, nil
}

var (
Expand All @@ -95,7 +96,7 @@ func (w *Service) IsValidChain(currentHeader *types.Header, chain []*types.Heade
if chain[len(chain)-1].Number.Uint64() < oldestCheckpointNumber {
// We have future whitelisted entries, so no additional validation will be possible
// This case will occur when bor is in middle of sync, but heimdall is ahead/fully synced.
return true
return true, nil
}

// Split the chain into past and future chain
Expand All @@ -109,18 +110,18 @@ func (w *Service) IsValidChain(currentHeader *types.Header, chain []*types.Heade

// Don't accept future chain of unacceptable length (from current block)
if len(futureChain)+offset > int(w.checkpointInterval) {
return false
return false, ErrLongFutureChain
}

// Iterate over the chain and validate against the last checkpoint
// It will handle all cases where the incoming chain has atleast one checkpoint
for i := len(pastChain) - 1; i >= 0; i-- {
if _, ok := w.checkpointWhitelist[pastChain[i].Number.Uint64()]; ok {
return pastChain[i].Hash() == w.checkpointWhitelist[pastChain[i].Number.Uint64()]
return pastChain[i].Hash() == w.checkpointWhitelist[pastChain[i].Number.Uint64()], nil
}
}

return true
return true, nil
}

func splitChain(current uint64, chain []*types.Header) ([]*types.Header, []*types.Header) {
Expand Down
18 changes: 12 additions & 6 deletions eth/downloader/whitelist/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ func TestIsValidChain(t *testing.T) {
s := NewMockService(10, 10)
chainA := createMockChain(1, 20) // A1->A2...A19->A20
// case1: no checkpoint whitelist, should consider the chain as valid
res := s.IsValidChain(nil, chainA)
res, err := s.IsValidChain(nil, chainA)
require.Equal(t, res, true, "expected chain to be valid")
require.Equal(t, err, nil, "expected error to be nil")

tempChain := createMockChain(21, 22) // A21->A22

Expand All @@ -132,8 +133,9 @@ func TestIsValidChain(t *testing.T) {

// case2: We're behind the oldest whitelisted block entry, should consider
// the chain as valid as we're still far behind the latest blocks
res = s.IsValidChain(chainA[len(chainA)-1], chainA)
res, err = s.IsValidChain(chainA[len(chainA)-1], chainA)
require.Equal(t, res, true, "expected chain to be valid")
require.Equal(t, err, nil, "expected error to be nil")

// Clear checkpoint whitelist and add blocks A5 and A15 in whitelist
s.PurgeCheckpointWhitelist()
Expand All @@ -144,8 +146,9 @@ func TestIsValidChain(t *testing.T) {

// case3: Try importing a past chain having valid checkpoint, should
// consider the chain as valid
res = s.IsValidChain(chainA[len(chainA)-1], chainA)
res, err = s.IsValidChain(chainA[len(chainA)-1], chainA)
require.Equal(t, res, true, "expected chain to be valid")
require.Equal(t, err, nil, "expected error to be nil")

// Clear checkpoint whitelist and mock blocks in whitelist
tempChain = createMockChain(20, 20) // A20
Expand All @@ -156,22 +159,25 @@ func TestIsValidChain(t *testing.T) {
require.Equal(t, s.length(), 1, "expected 1 items in whitelist")

// case4: Try importing a past chain having invalid checkpoint
res = s.IsValidChain(chainA[len(chainA)-1], chainA)
res, _ = s.IsValidChain(chainA[len(chainA)-1], chainA)
require.Equal(t, res, false, "expected chain to be invalid")
// Not checking error here because we return nil in case of checkpoint mismatch

// create a future chain to be imported of length <= `checkpointInterval`
chainB := createMockChain(21, 30) // B21->B22...B29->B30

// case5: Try importing a future chain of acceptable length
res = s.IsValidChain(chainA[len(chainA)-1], chainB)
res, err = s.IsValidChain(chainA[len(chainA)-1], chainB)
require.Equal(t, res, true, "expected chain to be valid")
require.Equal(t, err, nil, "expected error to be nil")

// create a future chain to be imported of length > `checkpointInterval`
chainB = createMockChain(21, 40) // C21->C22...C39->C40

// case5: Try importing a future chain of unacceptable length
res = s.IsValidChain(chainA[len(chainA)-1], chainB)
res, err = s.IsValidChain(chainA[len(chainA)-1], chainB)
require.Equal(t, res, false, "expected chain to be invalid")
require.Equal(t, err, ErrLongFutureChain, "expected error")
}

func TestSplitChain(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ type StateSyncFilter struct {
// interface for whitelist service
type ChainValidator interface {
IsValidPeer(remoteHeader *types.Header, fetchHeadersByNumber func(number uint64, amount int, skip int, reverse bool) ([]*types.Header, []common.Hash, error)) (bool, error)
IsValidChain(currentHeader *types.Header, chain []*types.Header) bool
IsValidChain(currentHeader *types.Header, chain []*types.Header) (bool, error)
ProcessCheckpoint(endBlockNum uint64, endBlockHash common.Hash)
GetCheckpointWhitelist() map[uint64]common.Hash
PurgeCheckpointWhitelist()
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
const (
VersionMajor = 0 // Major version component of the current release
VersionMinor = 3 // Minor version component of the current release
VersionPatch = 2 // Patch version component of the current release
VersionPatch = 3 // Patch version component of the current release
VersionMeta = "stable" // Version metadata to append to the version string
)

Expand Down

0 comments on commit b480db1

Please sign in to comment.