diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index ef1ae7c5888..0acb726cba7 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -8,6 +8,7 @@ import ( "log" "time" + "google.golang.org/grpc" "gopkg.in/guregu/null.v4" "github.com/ethereum/go-ethereum/common" @@ -30,6 +31,7 @@ import ( relaylogger "github.com/smartcontractkit/chainlink-relay/pkg/logger" "github.com/smartcontractkit/chainlink-relay/pkg/loop" + "github.com/smartcontractkit/chainlink-relay/pkg/loop/reportingplugins" "github.com/smartcontractkit/chainlink-relay/pkg/types" "github.com/smartcontractkit/chainlink/v2/core/bridges" @@ -428,6 +430,9 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) { s4PluginDB := NewDB(d.db, spec.ID, s4PluginId, lggr, d.cfg.Database()) return d.newServicesOCR2Functions(lggr, jb, runResults, bootstrapPeers, kb, ocrDB, thresholdPluginDB, s4PluginDB, lc, ocrLogger) + case types.GenericPlugin: + return d.newServicesGenericPlugin(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger, d.RelayGetter, d.cfg) + default: return nil, errors.Errorf("plugin type %s not supported", spec.PluginType) } @@ -473,6 +478,135 @@ func GetEVMEffectiveTransmitterID(jb *job.Job, chain evm.Chain, lggr logger.Suga return spec.TransmitterID.String, nil } +type coreConfig struct { + Command string `json:"command"` + ProviderType string `json:"provider_type"` + PluginName string `json:"plugin_name"` +} + +type PluginConfig struct { + CoreConfig coreConfig `json:"core_config"` + PluginConfig json.RawMessage +} + +func validateGenericPluginSpec(c coreConfig) error { + if c.Command == "" { + return errors.New("generic config invalid: must provide command path") + } + + if c.PluginName == "" { + return errors.New("generic config invalid: must provide plugin name") + } + + return nil +} + +type connProvider interface { + ClientConn() grpc.ClientConnInterface +} + +func (d *Delegate) newServicesGenericPlugin( + ctx context.Context, + lggr logger.SugaredLogger, + jb job.Job, + bootstrapPeers []commontypes.BootstrapperLocator, + kb ocr2key.KeyBundle, + ocrDB *db, + lc ocrtypes.LocalConfig, + ocrLogger commontypes.Logger, + relayGetter RelayGetter, + registrar plugins.RegistrarConfig, +) (srvs []job.ServiceCtx, err error) { + spec := jb.OCR2OracleSpec + + p := PluginConfig{} + err = json.Unmarshal(spec.PluginConfig.Bytes(), &p) + if err != nil { + return nil, err + } + cconf := p.CoreConfig + + err = validateGenericPluginSpec(cconf) + if err != nil { + return nil, err + } + + rid, err := spec.RelayID() + if err != nil { + return nil, fmt.Errorf("%s services: %w: %w", cconf.PluginName, ErrJobSpecNoRelayer, err) + } + + relayer, err := relayGetter.Get(rid) + if err != nil { + return nil, fmt.Errorf("%s services; failed to get relay %s is it enabled?: %w", p.CoreConfig.PluginName, spec.Relay, err) + } + + provider, err := relayer.NewPluginProvider(ctx, types.RelayArgs{ + ExternalJobID: jb.ExternalJobID, + JobID: spec.ID, + ContractID: spec.ContractID, + New: d.isNewlyCreatedJob, + RelayConfig: spec.RelayConfig.Bytes(), + ProviderType: cconf.ProviderType, + }, types.PluginArgs{ + TransmitterID: spec.TransmitterID.String, + PluginConfig: spec.PluginConfig.Bytes(), + }) + if err != nil { + return nil, err + } + srvs = append(srvs, provider) + + // TODO: figure out what we want to do for telemetry + // TODO: map telemetry type + var t synchronization.TelemetryType + oracleArgs := libocr2.OCR2OracleArgs{ + BinaryNetworkEndpointFactory: d.peerWrapper.Peer2, + V2Bootstrappers: bootstrapPeers, + Database: ocrDB, + LocalConfig: lc, + Logger: ocrLogger, + MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, t, rid.Network, rid.ChainID), + OffchainKeyring: kb, + OnchainKeyring: kb, + ContractTransmitter: provider.ContractTransmitter(), + ContractConfigTracker: provider.ContractConfigTracker(), + OffchainConfigDigester: provider.OffchainConfigDigester(), + } + + pluginLggr := lggr.Named(cconf.PluginName).Named(spec.ContractID).Named(spec.GetID()) + cmdFn, grpcOpts, err := registrar.RegisterLOOP(fmt.Sprintf("%s-%s-%s", cconf.PluginName, spec.ContractID, spec.GetID()), cconf.Command) + if err != nil { + return nil, fmt.Errorf("failed to register loop: %w", err) + } + + errorLog := &errorLog{jobID: jb.ID, recordError: d.jobORM.RecordError} + providerConn, ok := provider.(connProvider) + if !ok { + return nil, errors.New("provider not supported: the provider is not a LOOPP provider") + } + + pluginConfig := types.ReportingPluginServiceConfig{ + PluginName: cconf.PluginName, + Command: cconf.Command, + ProviderType: cconf.ProviderType, + PluginConfig: string(p.PluginConfig), + } + + // TODO: Add the pipeline runner and telemetry service + plugin := reportingplugins.NewLOOPPService(pluginLggr, grpcOpts, cmdFn, pluginConfig, providerConn.ClientConn(), errorLog) + oracleArgs.ReportingPluginFactory = plugin + srvs = append(srvs, plugin) + + oracle, err := libocr2.NewOracle(oracleArgs) + if err != nil { + return nil, err + } + + srvs = append(srvs, job.NewServiceAdapter(oracle)) + return srvs, nil +} + func (d *Delegate) newServicesMercury( ctx context.Context, lggr logger.SugaredLogger, diff --git a/go.mod b/go.mod index fbcfcfc6e87..c7f9d477561 100644 --- a/go.mod +++ b/go.mod @@ -100,6 +100,7 @@ require ( golang.org/x/time v0.3.0 golang.org/x/tools v0.14.0 gonum.org/v1/gonum v0.13.0 + google.golang.org/grpc v1.58.3 google.golang.org/protobuf v1.31.0 gopkg.in/guregu/null.v2 v2.1.2 gopkg.in/guregu/null.v4 v4.0.0 @@ -364,7 +365,6 @@ require ( google.golang.org/genproto v0.0.0-20230717213848-3f92550aa753 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230717213848-3f92550aa753 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230717213848-3f92550aa753 // indirect - google.golang.org/grpc v1.58.3 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/yaml.v2 v2.4.0 // indirect