From 55eb4612bffc887130786b4197e4b3a0688c92ce Mon Sep 17 00:00:00 2001 From: Kewei Date: Fri, 6 Sep 2024 20:18:32 +0800 Subject: [PATCH] Reducing unnecessary attestation processing from gossip (#11906) Don't need to process attestations from gossip if the committee index associated with the attestation is not subscribed or doesn't require aggregation. --- cl/phase1/network/gossip_manager.go | 5 +- .../committee_subscription.go | 48 +++++++++++-------- .../committee_subscription/interface.go | 1 + .../mock_services/committee_subscribe_mock.go | 38 +++++++++++++++ 4 files changed, 72 insertions(+), 20 deletions(-) diff --git a/cl/phase1/network/gossip_manager.go b/cl/phase1/network/gossip_manager.go index 8e56bd851b0..ee5c0feff9e 100644 --- a/cl/phase1/network/gossip_manager.go +++ b/cl/phase1/network/gossip_manager.go @@ -228,7 +228,10 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss if err := att.DecodeSSZ(data.Data, int(version)); err != nil { return err } - return g.attestationService.ProcessMessage(ctx, data.SubnetId, att) + if g.committeeSub.NeedToAggregate(att.AttestantionData().CommitteeIndex()) { + return g.attestationService.ProcessMessage(ctx, data.SubnetId, att) + } + return nil default: return fmt.Errorf("unknown topic %s", data.Name) } diff --git a/cl/validator/committee_subscription/committee_subscription.go b/cl/validator/committee_subscription/committee_subscription.go index 5b4a9134443..4744078afb1 100644 --- a/cl/validator/committee_subscription/committee_subscription.go +++ b/cl/validator/committee_subscription/committee_subscription.go @@ -88,9 +88,8 @@ func NewCommitteeSubscribeManagement( } type validatorSub struct { - subnetId uint64 - aggregate bool - latestTargetSlot uint64 + aggregate bool + largestTargetSlot uint64 } func (c *CommitteeSubscribeMgmt) AddAttestationSubscription(ctx context.Context, p *cltypes.BeaconCommitteeSubscription) error { @@ -111,28 +110,30 @@ func (c *CommitteeSubscribeMgmt) AddAttestationSubscription(ctx context.Context, if _, ok := c.validatorSubs[cIndex]; !ok { c.validatorSubs[cIndex] = &validatorSub{ - subnetId: subnetId, - aggregate: p.IsAggregator, - latestTargetSlot: slot, + aggregate: p.IsAggregator, + largestTargetSlot: slot, } } else { // set aggregator to true if any validator in the committee is an aggregator c.validatorSubs[cIndex].aggregate = (c.validatorSubs[cIndex].aggregate || p.IsAggregator) // update latest target slot - if c.validatorSubs[cIndex].latestTargetSlot < slot { - c.validatorSubs[cIndex].latestTargetSlot = slot + if c.validatorSubs[cIndex].largestTargetSlot < slot { + c.validatorSubs[cIndex].largestTargetSlot = slot } } c.validatorSubsMutex.Unlock() - // set sentinel gossip expiration by subnet id - request := sentinel.RequestSubscribeExpiry{ - Topic: gossip.TopicNameBeaconAttestation(subnetId), - ExpiryUnixSecs: uint64(time.Now().Add(30 * time.Minute).Unix()), // temporarily set to 30 minutes - } - if _, err := c.sentinel.SetSubscribeExpiry(ctx, &request); err != nil { - return err + if p.IsAggregator { + epochDuration := time.Duration(c.beaconConfig.SlotsPerEpoch) * time.Duration(c.beaconConfig.SecondsPerSlot) * time.Second + // set sentinel gossip expiration by subnet id + request := sentinel.RequestSubscribeExpiry{ + Topic: gossip.TopicNameBeaconAttestation(subnetId), + ExpiryUnixSecs: uint64(time.Now().Add(epochDuration).Unix()), // expire after epoch + } + if _, err := c.sentinel.SetSubscribeExpiry(ctx, &request); err != nil { + return err + } } return nil } @@ -150,6 +151,15 @@ func (c *CommitteeSubscribeMgmt) CheckAggregateAttestation(att *solid.Attestatio return nil } +func (c *CommitteeSubscribeMgmt) NeedToAggregate(committeeIndex uint64) bool { + c.validatorSubsMutex.RLock() + defer c.validatorSubsMutex.RUnlock() + if sub, ok := c.validatorSubs[committeeIndex]; ok { + return sub.aggregate + } + return false +} + func (c *CommitteeSubscribeMgmt) sweepByStaleSlots(ctx context.Context) { slotIsStale := func(curSlot, targetSlot uint64) bool { if curSlot <= targetSlot { @@ -158,8 +168,8 @@ func (c *CommitteeSubscribeMgmt) sweepByStaleSlots(ctx context.Context) { } return curSlot-targetSlot > c.netConfig.AttestationPropagationSlotRange } - // sweep every minute - ticker := time.NewTicker(time.Duration(c.beaconConfig.SecondsPerSlot) * time.Second) + // sweep every 3 seconds + ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() for { select { @@ -170,11 +180,11 @@ func (c *CommitteeSubscribeMgmt) sweepByStaleSlots(ctx context.Context) { toRemoves := make([]uint64, 0) c.validatorSubsMutex.Lock() for committeeIdx, sub := range c.validatorSubs { - if slotIsStale(curSlot, sub.latestTargetSlot) { + if slotIsStale(curSlot, sub.largestTargetSlot) { toRemoves = append(toRemoves, committeeIdx) } // try remove aggregator flag to avoid unnecessary aggregation - if curSlot > sub.latestTargetSlot { + if curSlot > sub.largestTargetSlot { sub.aggregate = false } } diff --git a/cl/validator/committee_subscription/interface.go b/cl/validator/committee_subscription/interface.go index 1c2f3bf0149..0dc47b4f10f 100644 --- a/cl/validator/committee_subscription/interface.go +++ b/cl/validator/committee_subscription/interface.go @@ -27,4 +27,5 @@ import ( type CommitteeSubscribe interface { AddAttestationSubscription(ctx context.Context, p *cltypes.BeaconCommitteeSubscription) error CheckAggregateAttestation(att *solid.Attestation) error + NeedToAggregate(committeeIndex uint64) bool } diff --git a/cl/validator/committee_subscription/mock_services/committee_subscribe_mock.go b/cl/validator/committee_subscription/mock_services/committee_subscribe_mock.go index 58dd1c84e69..a92d667e24e 100644 --- a/cl/validator/committee_subscription/mock_services/committee_subscribe_mock.go +++ b/cl/validator/committee_subscription/mock_services/committee_subscribe_mock.go @@ -116,3 +116,41 @@ func (c *MockCommitteeSubscribeCheckAggregateAttestationCall) DoAndReturn(f func c.Call = c.Call.DoAndReturn(f) return c } + +// NeedToAggregate mocks base method. +func (m *MockCommitteeSubscribe) NeedToAggregate(arg0 uint64) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NeedToAggregate", arg0) + ret0, _ := ret[0].(bool) + return ret0 +} + +// NeedToAggregate indicates an expected call of NeedToAggregate. +func (mr *MockCommitteeSubscribeMockRecorder) NeedToAggregate(arg0 any) *MockCommitteeSubscribeNeedToAggregateCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NeedToAggregate", reflect.TypeOf((*MockCommitteeSubscribe)(nil).NeedToAggregate), arg0) + return &MockCommitteeSubscribeNeedToAggregateCall{Call: call} +} + +// MockCommitteeSubscribeNeedToAggregateCall wrap *gomock.Call +type MockCommitteeSubscribeNeedToAggregateCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockCommitteeSubscribeNeedToAggregateCall) Return(arg0 bool) *MockCommitteeSubscribeNeedToAggregateCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockCommitteeSubscribeNeedToAggregateCall) Do(f func(uint64) bool) *MockCommitteeSubscribeNeedToAggregateCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockCommitteeSubscribeNeedToAggregateCall) DoAndReturn(f func(uint64) bool) *MockCommitteeSubscribeNeedToAggregateCall { + c.Call = c.Call.DoAndReturn(f) + return c +}