Skip to content

Commit

Permalink
Merge pull request #9858 from adlrocha/adlrocha/consistent-bcast
Browse files Browse the repository at this point in the history
feat:networking:  (Synchronous) Consistent Broadcast for Filecoin EC
  • Loading branch information
arajasek committed Mar 30, 2023
2 parents 2db6b12 + 682ddf6 commit 8b2208f
Show file tree
Hide file tree
Showing 11 changed files with 481 additions and 2 deletions.
5 changes: 5 additions & 0 deletions build/params_2k.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package build
import (
"os"
"strconv"
"time"

"github.com/ipfs/go-cid"

Expand Down Expand Up @@ -139,3 +140,7 @@ const BootstrapPeerThreshold = 1
const Eip155ChainId = 31415926

var WhitelistedBlock = cid.Undef

// Reducing the delivery delay for equivocation of
// consistent broadcast to just half a second.
var CBDeliveryDelay = 500 * time.Milisecond
7 changes: 7 additions & 0 deletions build/params_butterfly.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package build

import (
"time"

"github.com/ipfs/go-cid"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -87,3 +89,8 @@ const BootstrapPeerThreshold = 2
const Eip155ChainId = 3141592

var WhitelistedBlock = cid.Undef

// CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast.
// This determines the wait time for the detection of potential equivocations.
// It is a variable instead of a constant so it can be conveniently configured in tests
var CBDeliveryDelay = 2 * time.Second
6 changes: 6 additions & 0 deletions build/params_calibnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package build
import (
"os"
"strconv"
"time"

"github.com/ipfs/go-cid"

Expand Down Expand Up @@ -122,3 +123,8 @@ const BootstrapPeerThreshold = 4
const Eip155ChainId = 314159

var WhitelistedBlock = cid.Undef

// CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast.
// This determines the wait time for the detection of potential equivocations.
// It is a variable instead of a constant so it can be conveniently configured in tests
var CBDeliveryDelay = 2 * time.Second
6 changes: 6 additions & 0 deletions build/params_interop.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package build
import (
"os"
"strconv"
"time"

"github.com/ipfs/go-cid"

Expand Down Expand Up @@ -128,3 +129,8 @@ const BootstrapPeerThreshold = 2
const Eip155ChainId = 3141592

var WhitelistedBlock = cid.Undef

// CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast.
// This determines the wait time for the detection of potential equivocations.
// It is a variable instead of a constant so it can be conveniently configured in tests
var CBDeliveryDelay = 2 * time.Second
6 changes: 6 additions & 0 deletions build/params_mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"os"
"strconv"
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
Expand Down Expand Up @@ -137,3 +138,8 @@ const Eip155ChainId = 314

// we skip checks on message validity in this block to sidestep the zero-bls signature
var WhitelistedBlock = MustParseCid("bafy2bzaceapyg2uyzk7vueh3xccxkuwbz3nxewjyguoxvhx77malc2lzn2ybi")

// CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast.
// This determines the wait time for the detection of potential equivocations.
// It is a variable instead of a constant so it can be conveniently configured in tests
var CBDeliveryDelay = 2 * time.Second
197 changes: 197 additions & 0 deletions chain/sub/bcast/consistent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package bcast

import (
"context"
"sync"
"time"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/chain/types"
)

var log = logging.Logger("sub-cb")

const (
// GcSanityCheck determines the number of epochs in the past
// that will be garbage collected from the current epoch.
GcSanityCheck = 100
// GcLookback determines the number of epochs kept in the consistent
// broadcast cache.
GcLookback = 5
// GcDeepCheck determines the number of epochs in the past that we
// we try cleaning in the deep garbage collection round.
GcDeepCheck = 2880 // (24h*60m*60s)/30s per epoch
// GcDeepInterval determines after the number of epochs for which
// we are going to start a deeper garbage collection round.
GcDeepInterval = 1000
)

type blksInfo struct {
ctx context.Context
cancel context.CancelFunc
blks []cid.Cid
}

type bcastDict struct {
m map[string]*blksInfo
}

func (bd *bcastDict) load(key []byte) (*blksInfo, bool) {
v, ok := bd.m[string(key)]
if !ok {
return nil, ok
}
return v, ok
}

func (bd *bcastDict) blkLen(key []byte) int {
return len(bd.m[string(key)].blks)
}

func (bd *bcastDict) store(key []byte, d *blksInfo) {
bd.m[string(key)] = d
}

// ConsistentBCast tracks recent information about the
// blocks and tickets received at different epochs
type ConsistentBCast struct {
lk sync.RWMutex
delay time.Duration
m map[abi.ChainEpoch]*bcastDict
lastDeepGc abi.ChainEpoch
}

func newBcastDict() *bcastDict {
return &bcastDict{m: make(map[string]*blksInfo)}
}

func BCastKey(bh *types.BlockHeader) []byte {
return bh.Ticket.VRFProof
}

func NewConsistentBCast(delay time.Duration) *ConsistentBCast {
return &ConsistentBCast{
delay: delay,
m: make(map[abi.ChainEpoch]*bcastDict),
}
}

func cidExists(cids []cid.Cid, c cid.Cid) bool {
for _, v := range cids {
if v == c {
return true
}
}
return false
}

func (bInfo *blksInfo) eqErr() error {
bInfo.cancel()
return xerrors.Errorf("different blocks with the same ticket already seen")
}

func (cb *ConsistentBCast) Len() int {
cb.lk.RLock()
defer cb.lk.RUnlock()
return len(cb.m)
}

// RcvBlock is called every time a new block is received through the network.
//
// This function keeps track of all the blocks with a specific VRFProof received
// for the same height. Every time a new block with a VRFProof not seen at certain
// height is received, a new timer is triggered to wait for the delay time determined by
// the consistent broadcast before informing the syncer. During this time, if a new
// block with the same VRFProof for that height is received, it means a miner is
// trying to equivocate, and both blocks are discarded.
//
// The delay time should be set to a value high enough to allow any block sent for
// certain epoch to be propagated to a large amount of miners in the network.
func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) {
cb.lk.Lock()
defer cb.lk.Unlock()
bcastDict, ok := cb.m[blk.Header.Height]
if !ok {
bcastDict = newBcastDict()
cb.m[blk.Header.Height] = bcastDict
}

key := BCastKey(blk.Header)
blkCid := blk.Cid()

bInfo, ok := bcastDict.load(key)
if ok {
if len(bInfo.blks) > 1 {
log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr())
return
}

if !cidExists(bInfo.blks, blkCid) {
bcastDict.store(key, &blksInfo{bInfo.ctx, bInfo.cancel, append(bInfo.blks, blkCid)})
// By calling bInfo.eqErr() inside this log we cancel the context for all blocks waiting for
// the epoch-ticket combination making them to fail and not be sent to the syncer, as
// a potential equivocation is detected.
log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr())
return
}
return
}

ctx, cancel := context.WithTimeout(ctx, cb.delay)
bcastDict.store(key, &blksInfo{ctx, cancel, []cid.Cid{blkCid}})
}

// WaitForDelivery is called before informing the syncer about a new block
// to check if the consistent broadcast delay triggered or if the block should
// be held off for a bit more time.
func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error {
cb.lk.RLock()
bcastDict := cb.m[bh.Height]
key := BCastKey(bh)
bInfo, ok := bcastDict.load(key)
cb.lk.RUnlock()
if !ok {
return xerrors.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key)
}

// Wait for the timeout
<-bInfo.ctx.Done()
if bcastDict.blkLen(key) > 1 {
return xerrors.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height)
}
return nil
}

// GarbageCollect cleans the consistent broadcast cache periodically.
//
// A light garbage collection is triggered before every block delivery
// while a deeper one is triggered once every GcDeepCheck to ensure
// that nothing was left behind.
func (cb *ConsistentBCast) GarbageCollect(currEpoch abi.ChainEpoch) {
cb.lk.Lock()
defer cb.lk.Unlock()

// perform a deeper sanity check every now and then
gcRange := GcSanityCheck
if cb.lastDeepGc+GcDeepInterval > currEpoch {
gcRange = GcDeepCheck
cb.lastDeepGc = currEpoch
}

// keep currEpoch-gcRange and delete a few more in the past
// as a sanity-check
// Garbage collection is triggered before block delivery,
// and we use the sanity-check in case there were a few rounds
// without delivery, and the garbage collection wasn't triggered
// for a few epochs.
for i := 0; i < gcRange; i++ {
if currEpoch > GcLookback {
delete(cb.m, currEpoch-abi.ChainEpoch(GcLookback+i))
}
}
}
Loading

0 comments on commit 8b2208f

Please sign in to comment.