Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BCF-2463] Add generic job type for lightweight OCR plugins #10665

Merged
merged 5 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 157 additions & 12 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log"
"time"

"google.golang.org/grpc"
"gopkg.in/guregu/null.v4"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -30,6 +31,7 @@ import (

relaylogger "github.com/smartcontractkit/chainlink-relay/pkg/logger"
"github.com/smartcontractkit/chainlink-relay/pkg/loop"
"github.com/smartcontractkit/chainlink-relay/pkg/loop/reportingplugins"
"github.com/smartcontractkit/chainlink-relay/pkg/types"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
Expand All @@ -43,6 +45,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/dkg"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/dkg/persistence"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/functions"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/generic"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/median"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper"
Expand Down Expand Up @@ -70,7 +73,28 @@ import (
"github.com/smartcontractkit/chainlink/v2/plugins"
)

var ErrJobSpecNoRelayer = errors.New("OCR2 job spec could not get relayer id")
type ErrJobSpecNoRelayer struct {
PluginName string
Err error
}

func (e ErrJobSpecNoRelayer) Unwrap() error { return e.Err }

func (e ErrJobSpecNoRelayer) Error() string {
return fmt.Sprintf("%s services: OCR2 job spec could not get relayer ID: %s", e.PluginName, e.Err)
}

type ErrRelayNotEnabled struct {
PluginName string
Relay string
Err error
}

func (e ErrRelayNotEnabled) Unwrap() error { return e.Err }

func (e ErrRelayNotEnabled) Error() string {
return fmt.Sprintf("%s services: failed to get relay %s, is it enabled? %s", e.PluginName, e.Relay, e.Err)
}

type RelayGetter interface {
Get(id relay.ID) (loop.Relayer, error)
Expand Down Expand Up @@ -245,7 +269,7 @@ func (d *Delegate) OnDeleteJob(jb job.Job, q pg.Queryer) error {

rid, err := spec.RelayID()
if err != nil {
d.lggr.Errorw("DeleteJob: "+ErrJobSpecNoRelayer.Error(), "err", err)
d.lggr.Errorw("DeleteJob", "err", ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)})
return nil
}
// we only have clean to do for the EVM
Expand Down Expand Up @@ -337,7 +361,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {

rid, err := spec.RelayID()
if err != nil {
return nil, fmt.Errorf("ServicesForSpec: %w: %w", ErrJobSpecNoRelayer, err)
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)}
}

if rid.Network == relay.EVM {
Expand Down Expand Up @@ -428,6 +452,9 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
s4PluginDB := NewDB(d.db, spec.ID, s4PluginId, lggr, d.cfg.Database())
return d.newServicesOCR2Functions(lggr, jb, runResults, bootstrapPeers, kb, ocrDB, thresholdPluginDB, s4PluginDB, lc, ocrLogger)

case types.GenericPlugin:
return d.newServicesGenericPlugin(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)

default:
return nil, errors.Errorf("plugin type %s not supported", spec.PluginType)
}
Expand Down Expand Up @@ -473,6 +500,124 @@ func GetEVMEffectiveTransmitterID(jb *job.Job, chain evm.Chain, lggr logger.Suga
return spec.TransmitterID.String, nil
}

type connProvider interface {
ClientConn() grpc.ClientConnInterface
}

func defaultPathFromPluginName(pluginName string) string {
// By default we install the command on the system path, in the
// form: `chainlink-<plugin name>`
return fmt.Sprintf("chainlink-%s", pluginName)
}

func (d *Delegate) newServicesGenericPlugin(
ctx context.Context,
lggr logger.SugaredLogger,
jb job.Job,
bootstrapPeers []commontypes.BootstrapperLocator,
kb ocr2key.KeyBundle,
ocrDB *db,
lc ocrtypes.LocalConfig,
ocrLogger commontypes.Logger,
) (srvs []job.ServiceCtx, err error) {
spec := jb.OCR2OracleSpec

p := validate.OCR2GenericPluginConfig{}
err = json.Unmarshal(spec.PluginConfig.Bytes(), &p)
if err != nil {
return nil, err
}
cconf := p.CoreConfig

command := cconf.Command
if command == "" {
command = defaultPathFromPluginName(cconf.PluginName)
}

// NOTE: we don't need to validate this config, since that happens as part of creating the job.
// See: validate/validate.go's `validateSpec`.

rid, err := spec.RelayID()
if err != nil {
return nil, ErrJobSpecNoRelayer{PluginName: cconf.PluginName, Err: err}
}

relayer, err := d.RelayGetter.Get(rid)
if err != nil {
return nil, ErrRelayNotEnabled{Err: err, Relay: spec.Relay, PluginName: p.CoreConfig.PluginName}
}

provider, err := relayer.NewPluginProvider(ctx, types.RelayArgs{
ExternalJobID: jb.ExternalJobID,
JobID: spec.ID,
ContractID: spec.ContractID,
New: d.isNewlyCreatedJob,
RelayConfig: spec.RelayConfig.Bytes(),
ProviderType: cconf.ProviderType,
}, types.PluginArgs{
TransmitterID: spec.TransmitterID.String,
PluginConfig: spec.PluginConfig.Bytes(),
})
if err != nil {
return nil, err
}
srvs = append(srvs, provider)

oracleEndpoint := d.monitoringEndpointGen.GenMonitoringEndpoint(
spec.ContractID,
synchronization.TelemetryType(cconf.TelemetryType),
rid.Network,
rid.ChainID,
)
oracleArgs := libocr2.OCR2OracleArgs{
BinaryNetworkEndpointFactory: d.peerWrapper.Peer2,
V2Bootstrappers: bootstrapPeers,
Database: ocrDB,
LocalConfig: lc,
Logger: ocrLogger,
MonitoringEndpoint: oracleEndpoint,
OffchainKeyring: kb,
OnchainKeyring: kb,
ContractTransmitter: provider.ContractTransmitter(),
ContractConfigTracker: provider.ContractConfigTracker(),
OffchainConfigDigester: provider.OffchainConfigDigester(),
}

pluginLggr := lggr.Named(cconf.PluginName).Named(spec.ContractID).Named(spec.GetID())
cmdFn, grpcOpts, err := d.cfg.RegisterLOOP(fmt.Sprintf("%s-%s-%s", cconf.PluginName, spec.ContractID, spec.GetID()), command)
if err != nil {
return nil, fmt.Errorf("failed to register loop: %w", err)
}

errorLog := &errorLog{jobID: jb.ID, recordError: d.jobORM.RecordError}
providerConn, ok := provider.(connProvider)
if !ok {
return nil, errors.New("provider not supported: the provider is not a LOOPP provider")
}

pluginConfig := types.ReportingPluginServiceConfig{
PluginName: cconf.PluginName,
Command: command,
ProviderType: cconf.ProviderType,
PluginConfig: string(p.PluginConfig),
}

pr := generic.NewPipelineRunnerAdapter(pluginLggr, jb, d.pipelineRunner)
ta := generic.NewTelemetryAdapter(d.monitoringEndpointGen)

plugin := reportingplugins.NewLOOPPService(pluginLggr, grpcOpts, cmdFn, pluginConfig, providerConn.ClientConn(), pr, ta, errorLog)
oracleArgs.ReportingPluginFactory = plugin
srvs = append(srvs, plugin)

oracle, err := libocr2.NewOracle(oracleArgs)
if err != nil {
return nil, err
}

srvs = append(srvs, job.NewServiceAdapter(oracle))
return srvs, nil
}

func (d *Delegate) newServicesMercury(
ctx context.Context,
lggr logger.SugaredLogger,
Expand All @@ -498,14 +643,14 @@ func (d *Delegate) newServicesMercury(

rid, err := spec.RelayID()
if err != nil {
return nil, fmt.Errorf("mercury services: %w: %w", ErrJobSpecNoRelayer, err)
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: "mercury"}
}
if rid.Network != relay.EVM {
return nil, fmt.Errorf("mercury services: expected EVM relayer got %s", rid.Network)
}
relayer, err := d.RelayGetter.Get(rid)
if err != nil {
return nil, fmt.Errorf("failed to get relay %s is it enabled?: %w", spec.Relay, err)
return nil, ErrRelayNotEnabled{Err: err, Relay: spec.Relay, PluginName: "mercury"}
}
chain, err := d.legacyChains.Get(rid.ChainID)
if err != nil {
Expand Down Expand Up @@ -574,7 +719,7 @@ func (d *Delegate) newServicesMedian(

rid, err := spec.RelayID()
if err != nil {
return nil, fmt.Errorf("median services: %w: %w", ErrJobSpecNoRelayer, err)
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: "median"}
}

oracleArgsNoPlugin := libocr2.OCR2OracleArgs{
Expand All @@ -593,7 +738,7 @@ func (d *Delegate) newServicesMedian(

relayer, err := d.RelayGetter.Get(rid)
if err != nil {
return nil, fmt.Errorf("median services; failed to get relay %s is it enabled?: %w", spec.Relay, err)
return nil, ErrRelayNotEnabled{Err: err, PluginName: "median", Relay: spec.Relay}
}

medianServices, err2 := median.NewMedianServices(ctx, jb, d.isNewlyCreatedJob, relayer, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, mConfig, enhancedTelemChan, errorLog)
Expand All @@ -618,7 +763,7 @@ func (d *Delegate) newServicesDKG(
spec := jb.OCR2OracleSpec
rid, err := spec.RelayID()
if err != nil {
return nil, fmt.Errorf("DKG services: %w: %w", ErrJobSpecNoRelayer, err)
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: "DKG"}
}
if rid.Network != relay.EVM {
return nil, fmt.Errorf("DKG services: expected EVM relayer got %s", rid.Network)
Expand Down Expand Up @@ -687,7 +832,7 @@ func (d *Delegate) newServicesOCR2VRF(

rid, err := spec.RelayID()
if err != nil {
return nil, fmt.Errorf("VRF services: %w: %w", ErrJobSpecNoRelayer, err)
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: "VRF"}
}
if rid.Network != relay.EVM {
return nil, fmt.Errorf("VRF services: expected EVM relayer got %s", rid.Network)
Expand Down Expand Up @@ -912,7 +1057,7 @@ func (d *Delegate) newServicesOCR2Keepers21(
mc := d.cfg.Mercury().Credentials(credName)
rid, err := spec.RelayID()
if err != nil {
return nil, fmt.Errorf("keeper2 services: %w: %w", ErrJobSpecNoRelayer, err)
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: "keeper2"}
}
if rid.Network != relay.EVM {
return nil, fmt.Errorf("keeper2 services: expected EVM relayer got %s", rid.Network)
Expand Down Expand Up @@ -1026,7 +1171,7 @@ func (d *Delegate) newServicesOCR2Keepers20(

rid, err := spec.RelayID()
if err != nil {
return nil, fmt.Errorf("keepers2.0 services: %w: %w", ErrJobSpecNoRelayer, err)
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: "keepers2.0"}
}
if rid.Network != relay.EVM {
return nil, fmt.Errorf("keepers2.0 services: expected EVM relayer got %s", rid.Network)
Expand Down Expand Up @@ -1161,7 +1306,7 @@ func (d *Delegate) newServicesOCR2Functions(

rid, err := spec.RelayID()
if err != nil {
return nil, fmt.Errorf("functions services: %w: %w", ErrJobSpecNoRelayer, err)
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: "functions"}
}
if rid.Network != relay.EVM {
return nil, fmt.Errorf("functions services: expected EVM relayer got %s", rid.Network)
Expand Down
7 changes: 7 additions & 0 deletions core/services/ocr2/plugins/generic/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package generic

import "github.com/smartcontractkit/libocr/commontypes"

func (t *TelemetryAdapter) Endpoints() map[[4]string]commontypes.MonitoringEndpoint {
return t.endpoints
}
32 changes: 32 additions & 0 deletions core/services/ocr2/plugins/generic/merge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package generic

import (
"reflect"
"testing"

"github.com/stretchr/testify/assert"
)

func TestMerge(t *testing.T) {
vars := map[string]interface{}{
"jb": map[string]interface{}{
"databaseID": "some-job-id",
},
}
addedVars := map[string]interface{}{
"jb": map[string]interface{}{
"some-other-var": "foo",
},
"val": 0,
}

merge(vars, addedVars)

assert.True(t, reflect.DeepEqual(vars, map[string]interface{}{
"jb": map[string]interface{}{
"databaseID": "some-job-id",
"some-other-var": "foo",
},
"val": 0,
}), vars)
}
Loading
Loading