Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KS-79] Remote trigger setup #12845

Merged
merged 1 commit into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/orange-squids-kick.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal Remote Trigger setup
24 changes: 17 additions & 7 deletions core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
sync "sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/services"
Expand Down Expand Up @@ -87,17 +88,21 @@ func (p *triggerPublisher) Receive(msg *types.MessageBody) {
key := registrationKey{msg.CallerDonId, req.Metadata.WorkflowID}
nowMs := time.Now().UnixMilli()
p.mu.Lock()
defer p.mu.Unlock()
p.messageCache.Insert(key, sender, nowMs, msg.Payload)
_, exists := p.registrations[key]
if exists {
p.lggr.Debugw("trigger registration already exists", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID)
return
}
// NOTE: require 2F+1 by default, introduce different strategies later (KS-76)
minRequired := uint32(2*callerDon.F + 1)
ready, payloads := p.messageCache.Ready(key, minRequired, nowMs-int64(p.config.RegistrationExpiryMs), false)
p.mu.Unlock()
if !ready {
p.lggr.Debugw("not ready to aggregate yet", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "minRequired", minRequired)
return
}
agg := NewDefaultModeAggregator(uint32(callerDon.F + 1))
aggregated, err := agg.Aggregate("", payloads)
aggregated, err := AggregateModeRaw(payloads, uint32(callerDon.F+1))
if err != nil {
p.lggr.Errorw("failed to aggregate trigger registrations", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "err", err)
return
Expand All @@ -107,7 +112,6 @@ func (p *triggerPublisher) Receive(msg *types.MessageBody) {
p.lggr.Errorw("failed to unmarshal request", "capabilityId", p.capInfo.ID, "err", err)
return
}
p.mu.Lock()
callbackCh := make(chan commoncap.CapabilityResponse)
ctx, cancel := p.stopCh.NewCtx()
err = p.underlying.RegisterTrigger(ctx, callbackCh, unmarshaled)
Expand All @@ -123,7 +127,6 @@ func (p *triggerPublisher) Receive(msg *types.MessageBody) {
} else {
p.lggr.Errorw("failed to register trigger", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "err", err)
}
p.mu.Unlock()
} else {
p.lggr.Errorw("received trigger request with unknown method", "method", msg.Method, "sender", sender)
}
Expand Down Expand Up @@ -171,7 +174,13 @@ func (p *triggerPublisher) triggerEventLoop(callbackCh chan commoncap.Capability
p.lggr.Infow("triggerEventLoop channel closed", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId)
return
}
p.lggr.Debugw("received trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId)
triggerEvent := capabilities.TriggerEvent{}
err := response.Value.UnwrapTo(&triggerEvent)
if err != nil {
p.lggr.Errorw("can't unwrap trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId, "err", err)
break
}
p.lggr.Debugw("received trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId, "triggerEventID", triggerEvent.ID)
marshaled, err := pb.MarshalCapabilityResponse(response)
if err != nil {
p.lggr.Debugw("can't marshal trigger event", "err", err)
Expand All @@ -186,7 +195,8 @@ func (p *triggerPublisher) triggerEventLoop(callbackCh chan commoncap.Capability
Metadata: &types.MessageBody_TriggerEventMetadata{
TriggerEventMetadata: &types.TriggerEventMetadata{
// NOTE: optionally introduce batching across workflows as an optimization
WorkflowIds: []string{key.workflowId},
WorkflowIds: []string{key.workflowId},
TriggerEventId: triggerEvent.ID,
},
},
}
Expand Down
15 changes: 6 additions & 9 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, callback chan<-
callback: callback,
rawRequest: rawRequest,
}
s.lggr.Infow("RegisterTrigger called", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID)
return nil
}

Expand All @@ -114,8 +115,8 @@ func (s *triggerSubscriber) registrationLoop() {
case <-s.stopCh:
return
case <-ticker.C:
s.lggr.Infow("register trigger for remote capability", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "nMembers", len(s.capDonInfo.Members))
s.mu.RLock()
s.lggr.Infow("register trigger for remote capability", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "nMembers", len(s.capDonInfo.Members), "nWorkflows", len(s.registeredWorkflows))
for _, registration := range s.registeredWorkflows {
// NOTE: send to all by default, introduce different strategies later (KS-76)
for _, peerID := range s.capDonInfo.Members {
Expand Down Expand Up @@ -180,18 +181,14 @@ func (s *triggerSubscriber) Receive(msg *types.MessageBody) {
continue
}
if ready {
s.lggr.Debugw("trigger event ready to aggregate", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId)
aggregatedResponse, err := s.aggregator.Aggregate(meta.TriggerEventId, payloads)
if err != nil {
s.lggr.Errorw("failed to aggregate responses", "capabilityId", s.capInfo.ID, "workflowId", workflowId, "err", err)
s.lggr.Errorw("failed to aggregate responses", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId, "err", err)
continue
}
unmarshaled, err := pb.UnmarshalCapabilityResponse(aggregatedResponse)
if err != nil {
s.lggr.Errorw("failed to unmarshal responses", "capabilityId", s.capInfo.ID, "workflowId", workflowId, "err", err)
continue
}
s.lggr.Info("remote trigger event aggregated", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId)
registration.callback <- unmarshaled
s.lggr.Infow("remote trigger event aggregated", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId)
registration.callback <- aggregatedResponse
}
}
} else {
Expand Down
3 changes: 2 additions & 1 deletion core/capabilities/remote/types/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package types

import (
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

Expand All @@ -23,7 +24,7 @@ type Receiver interface {
}

type Aggregator interface {
Aggregate(eventID string, responses [][]byte) ([]byte, error)
Aggregate(eventID string, responses [][]byte) (commoncap.CapabilityResponse, error)
}

// NOTE: this type will become part of the Registry (KS-108)
Expand Down
25 changes: 20 additions & 5 deletions core/capabilities/remote/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"google.golang.org/protobuf/proto"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)
Expand Down Expand Up @@ -60,16 +62,29 @@ func NewDefaultModeAggregator(minIdenticalResponses uint32) *defaultModeAggregat
}
}

func (a *defaultModeAggregator) Aggregate(_ string, responses [][]byte) ([]byte, error) {
func (a *defaultModeAggregator) Aggregate(_ string, responses [][]byte) (commoncap.CapabilityResponse, error) {
found, err := AggregateModeRaw(responses, a.minIdenticalResponses)
if err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to aggregate responses, err: %w", err)
}

unmarshaled, err := pb.UnmarshalCapabilityResponse(found)
if err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to unmarshal aggregated responses, err: %w", err)
}
return unmarshaled, nil
}

func AggregateModeRaw(elemList [][]byte, minIdenticalResponses uint32) ([]byte, error) {
hashToCount := make(map[string]uint32)
var found []byte
for _, resp := range responses {
for _, elem := range elemList {
hasher := sha256.New()
hasher.Write(resp)
hasher.Write(elem)
sha := hex.EncodeToString(hasher.Sum(nil))
hashToCount[sha]++
if hashToCount[sha] >= a.minIdenticalResponses {
found = resp
if hashToCount[sha] >= minIdenticalResponses {
found = elem
break
}
}
Expand Down
40 changes: 21 additions & 19 deletions core/capabilities/remote/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package remote_test

import (
"bytes"
"crypto/ed25519"
"crypto/rand"
"testing"
Expand Down Expand Up @@ -90,29 +89,32 @@ func TestToPeerID(t *testing.T) {
}

func TestDefaultModeAggregator_Aggregate(t *testing.T) {
capResponse1 := marshalCapabilityResponse(t, triggerEvent1, nil)
capResponse2 := marshalCapabilityResponse(t, triggerEvent2, nil)
val, err := values.Wrap(triggerEvent1)
require.NoError(t, err)
capResponse1 := commoncap.CapabilityResponse{
Value: val,
Err: nil,
}
marshaled1, err := pb.MarshalCapabilityResponse(capResponse1)
require.NoError(t, err)

val2, err := values.Wrap(triggerEvent2)
require.NoError(t, err)
capResponse2 := commoncap.CapabilityResponse{
Value: val2,
Err: nil,
}
marshaled2, err := pb.MarshalCapabilityResponse(capResponse2)
require.NoError(t, err)

agg := remote.NewDefaultModeAggregator(2)
_, err := agg.Aggregate("", [][]byte{capResponse1})
_, err = agg.Aggregate("", [][]byte{marshaled1})
require.Error(t, err)

_, err = agg.Aggregate("", [][]byte{capResponse1, capResponse2})
_, err = agg.Aggregate("", [][]byte{marshaled1, marshaled2})
require.Error(t, err)

res, err := agg.Aggregate("", [][]byte{capResponse1, capResponse2, capResponse1})
require.NoError(t, err)
require.True(t, bytes.Equal(res, capResponse1))
}

func marshalCapabilityResponse(t *testing.T, capValue any, capError error) []byte {
val, err := values.Wrap(capValue)
require.NoError(t, err)
capResponse := commoncap.CapabilityResponse{
Value: val,
Err: capError,
}
marshaled, err := pb.MarshalCapabilityResponse(capResponse)
res, err := agg.Aggregate("", [][]byte{marshaled1, marshaled2, marshaled1})
require.NoError(t, err)
return marshaled
require.Equal(t, res, capResponse1)
}
Loading
Loading