Skip to content

Commit

Permalink
Reducing unnecessary attestation processing from gossip (#11906)
Browse files Browse the repository at this point in the history
Don't need to process attestations from gossip if the committee index
associated with the attestation is not subscribed or doesn't require
aggregation.
  • Loading branch information
domiwei committed Sep 6, 2024
1 parent 375b29e commit 55eb461
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 20 deletions.
5 changes: 4 additions & 1 deletion cl/phase1/network/gossip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,10 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss
if err := att.DecodeSSZ(data.Data, int(version)); err != nil {
return err
}
return g.attestationService.ProcessMessage(ctx, data.SubnetId, att)
if g.committeeSub.NeedToAggregate(att.AttestantionData().CommitteeIndex()) {
return g.attestationService.ProcessMessage(ctx, data.SubnetId, att)
}
return nil
default:
return fmt.Errorf("unknown topic %s", data.Name)
}
Expand Down
48 changes: 29 additions & 19 deletions cl/validator/committee_subscription/committee_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,8 @@ func NewCommitteeSubscribeManagement(
}

type validatorSub struct {
subnetId uint64
aggregate bool
latestTargetSlot uint64
aggregate bool
largestTargetSlot uint64
}

func (c *CommitteeSubscribeMgmt) AddAttestationSubscription(ctx context.Context, p *cltypes.BeaconCommitteeSubscription) error {
Expand All @@ -111,28 +110,30 @@ func (c *CommitteeSubscribeMgmt) AddAttestationSubscription(ctx context.Context,

if _, ok := c.validatorSubs[cIndex]; !ok {
c.validatorSubs[cIndex] = &validatorSub{
subnetId: subnetId,
aggregate: p.IsAggregator,
latestTargetSlot: slot,
aggregate: p.IsAggregator,
largestTargetSlot: slot,
}
} else {
// set aggregator to true if any validator in the committee is an aggregator
c.validatorSubs[cIndex].aggregate = (c.validatorSubs[cIndex].aggregate || p.IsAggregator)
// update latest target slot
if c.validatorSubs[cIndex].latestTargetSlot < slot {
c.validatorSubs[cIndex].latestTargetSlot = slot
if c.validatorSubs[cIndex].largestTargetSlot < slot {
c.validatorSubs[cIndex].largestTargetSlot = slot
}
}

c.validatorSubsMutex.Unlock()

// set sentinel gossip expiration by subnet id
request := sentinel.RequestSubscribeExpiry{
Topic: gossip.TopicNameBeaconAttestation(subnetId),
ExpiryUnixSecs: uint64(time.Now().Add(30 * time.Minute).Unix()), // temporarily set to 30 minutes
}
if _, err := c.sentinel.SetSubscribeExpiry(ctx, &request); err != nil {
return err
if p.IsAggregator {
epochDuration := time.Duration(c.beaconConfig.SlotsPerEpoch) * time.Duration(c.beaconConfig.SecondsPerSlot) * time.Second
// set sentinel gossip expiration by subnet id
request := sentinel.RequestSubscribeExpiry{
Topic: gossip.TopicNameBeaconAttestation(subnetId),
ExpiryUnixSecs: uint64(time.Now().Add(epochDuration).Unix()), // expire after epoch
}
if _, err := c.sentinel.SetSubscribeExpiry(ctx, &request); err != nil {
return err
}
}
return nil
}
Expand All @@ -150,6 +151,15 @@ func (c *CommitteeSubscribeMgmt) CheckAggregateAttestation(att *solid.Attestatio
return nil
}

func (c *CommitteeSubscribeMgmt) NeedToAggregate(committeeIndex uint64) bool {
c.validatorSubsMutex.RLock()
defer c.validatorSubsMutex.RUnlock()
if sub, ok := c.validatorSubs[committeeIndex]; ok {
return sub.aggregate
}
return false
}

func (c *CommitteeSubscribeMgmt) sweepByStaleSlots(ctx context.Context) {
slotIsStale := func(curSlot, targetSlot uint64) bool {
if curSlot <= targetSlot {
Expand All @@ -158,8 +168,8 @@ func (c *CommitteeSubscribeMgmt) sweepByStaleSlots(ctx context.Context) {
}
return curSlot-targetSlot > c.netConfig.AttestationPropagationSlotRange
}
// sweep every minute
ticker := time.NewTicker(time.Duration(c.beaconConfig.SecondsPerSlot) * time.Second)
// sweep every 3 seconds
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
Expand All @@ -170,11 +180,11 @@ func (c *CommitteeSubscribeMgmt) sweepByStaleSlots(ctx context.Context) {
toRemoves := make([]uint64, 0)
c.validatorSubsMutex.Lock()
for committeeIdx, sub := range c.validatorSubs {
if slotIsStale(curSlot, sub.latestTargetSlot) {
if slotIsStale(curSlot, sub.largestTargetSlot) {
toRemoves = append(toRemoves, committeeIdx)
}
// try remove aggregator flag to avoid unnecessary aggregation
if curSlot > sub.latestTargetSlot {
if curSlot > sub.largestTargetSlot {
sub.aggregate = false
}
}
Expand Down
1 change: 1 addition & 0 deletions cl/validator/committee_subscription/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ import (
type CommitteeSubscribe interface {
AddAttestationSubscription(ctx context.Context, p *cltypes.BeaconCommitteeSubscription) error
CheckAggregateAttestation(att *solid.Attestation) error
NeedToAggregate(committeeIndex uint64) bool
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 55eb461

Please sign in to comment.