Skip to content

Commit

Permalink
Merge branch 'develop' into fix/functions-reorg-handling-logpoller
Browse files Browse the repository at this point in the history
  • Loading branch information
KuphJr committed Oct 31, 2023
2 parents e605645 + 921a89c commit 8a3479e
Show file tree
Hide file tree
Showing 12 changed files with 140 additions and 292 deletions.
2 changes: 1 addition & 1 deletion core/services/directrequest/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
if err != nil {
return nil, err
}
concreteSpec := job.LoadEnvConfigVarsDR(chain.Config().EVM(), *jb.DirectRequestSpec)
concreteSpec := job.SetDRMinIncomingConfirmations(chain.Config().EVM().MinIncomingConfirmations(), *jb.DirectRequestSpec)

oracle, err := operator_wrapper.NewOperator(concreteSpec.ContractAddress.Address(), chain.Client())
if err != nil {
Expand Down
74 changes: 31 additions & 43 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/chainlink-relay/pkg/types"

"github.com/smartcontractkit/chainlink/v2/core/assets"
"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/toml"
Expand Down Expand Up @@ -233,35 +234,25 @@ func (pr *PipelineRun) SetID(value string) error {

// OCROracleSpec defines the job spec for OCR jobs.
type OCROracleSpec struct {
ID int32 `toml:"-"`
ContractAddress ethkey.EIP55Address `toml:"contractAddress"`
P2PBootstrapPeers pq.StringArray `toml:"p2pBootstrapPeers" db:"p2p_bootstrap_peers"`
P2PV2Bootstrappers pq.StringArray `toml:"p2pv2Bootstrappers" db:"p2pv2_bootstrappers"`
IsBootstrapPeer bool `toml:"isBootstrapPeer"`
EncryptedOCRKeyBundleID *models.Sha256Hash `toml:"keyBundleID"`
EncryptedOCRKeyBundleIDEnv bool
TransmitterAddress *ethkey.EIP55Address `toml:"transmitterAddress"`
TransmitterAddressEnv bool
ObservationTimeout models.Interval `toml:"observationTimeout"`
ObservationTimeoutEnv bool
BlockchainTimeout models.Interval `toml:"blockchainTimeout"`
BlockchainTimeoutEnv bool
ContractConfigTrackerSubscribeInterval models.Interval `toml:"contractConfigTrackerSubscribeInterval"`
ContractConfigTrackerSubscribeIntervalEnv bool
ContractConfigTrackerPollInterval models.Interval `toml:"contractConfigTrackerPollInterval"`
ContractConfigTrackerPollIntervalEnv bool
ContractConfigConfirmations uint16 `toml:"contractConfigConfirmations"`
ContractConfigConfirmationsEnv bool
EVMChainID *utils.Big `toml:"evmChainID" db:"evm_chain_id"`
DatabaseTimeout *models.Interval `toml:"databaseTimeout"`
DatabaseTimeoutEnv bool
ObservationGracePeriod *models.Interval `toml:"observationGracePeriod"`
ObservationGracePeriodEnv bool
ContractTransmitterTransmitTimeout *models.Interval `toml:"contractTransmitterTransmitTimeout"`
ContractTransmitterTransmitTimeoutEnv bool
CaptureEATelemetry bool `toml:"captureEATelemetry"`
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
ID int32 `toml:"-"`
ContractAddress ethkey.EIP55Address `toml:"contractAddress"`
P2PBootstrapPeers pq.StringArray `toml:"p2pBootstrapPeers" db:"p2p_bootstrap_peers"`
P2PV2Bootstrappers pq.StringArray `toml:"p2pv2Bootstrappers" db:"p2pv2_bootstrappers"`
IsBootstrapPeer bool `toml:"isBootstrapPeer"`
EncryptedOCRKeyBundleID *models.Sha256Hash `toml:"keyBundleID"`
TransmitterAddress *ethkey.EIP55Address `toml:"transmitterAddress"`
ObservationTimeout models.Interval `toml:"observationTimeout"`
BlockchainTimeout models.Interval `toml:"blockchainTimeout"`
ContractConfigTrackerSubscribeInterval models.Interval `toml:"contractConfigTrackerSubscribeInterval"`
ContractConfigTrackerPollInterval models.Interval `toml:"contractConfigTrackerPollInterval"`
ContractConfigConfirmations uint16 `toml:"contractConfigConfirmations"`
EVMChainID *utils.Big `toml:"evmChainID" db:"evm_chain_id"`
DatabaseTimeout *models.Interval `toml:"databaseTimeout"`
ObservationGracePeriod *models.Interval `toml:"observationGracePeriod"`
ContractTransmitterTransmitTimeout *models.Interval `toml:"contractTransmitterTransmitTimeout"`
CaptureEATelemetry bool `toml:"captureEATelemetry"`
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
}

// GetID is a getter function that returns the ID of the spec.
Expand Down Expand Up @@ -438,15 +429,14 @@ func (w *WebhookSpec) SetID(value string) error {
}

type DirectRequestSpec struct {
ID int32 `toml:"-"`
ContractAddress ethkey.EIP55Address `toml:"contractAddress"`
MinIncomingConfirmations clnull.Uint32 `toml:"minIncomingConfirmations"`
MinIncomingConfirmationsEnv bool `toml:"minIncomingConfirmationsEnv"`
Requesters models.AddressCollection `toml:"requesters"`
MinContractPayment *assets.Link `toml:"minContractPaymentLinkJuels"`
EVMChainID *utils.Big `toml:"evmChainID"`
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
ID int32 `toml:"-"`
ContractAddress ethkey.EIP55Address `toml:"contractAddress"`
MinIncomingConfirmations clnull.Uint32 `toml:"minIncomingConfirmations"`
Requesters models.AddressCollection `toml:"requesters"`
MinContractPayment *assets.Link `toml:"minContractPaymentLinkJuels"`
EVMChainID *utils.Big `toml:"evmChainID"`
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
}

type CronSpec struct {
Expand Down Expand Up @@ -522,13 +512,11 @@ type VRFSpec struct {
CoordinatorAddress ethkey.EIP55Address `toml:"coordinatorAddress"`
PublicKey secp256k1.PublicKey `toml:"publicKey"`
MinIncomingConfirmations uint32 `toml:"minIncomingConfirmations"`
ConfirmationsEnv bool `toml:"-"`
EVMChainID *utils.Big `toml:"evmChainID"`
FromAddresses []ethkey.EIP55Address `toml:"fromAddresses"`
PollPeriod time.Duration `toml:"pollPeriod"` // For v2 jobs
PollPeriodEnv bool
RequestedConfsDelay int64 `toml:"requestedConfsDelay"` // For v2 jobs. Optional, defaults to 0 if not provided.
RequestTimeout time.Duration `toml:"requestTimeout"` // Optional, defaults to 24hr if not provided.
PollPeriod time.Duration `toml:"pollPeriod"` // For v2 jobs
RequestedConfsDelay int64 `toml:"requestedConfsDelay"` // For v2 jobs. Optional, defaults to 0 if not provided.
RequestTimeout time.Duration `toml:"requestTimeout"` // Optional, defaults to 24hr if not provided.

// GasLanePrice specifies the gas lane price for this VRF job.
// If the specified keys in FromAddresses do not have the provided gas price the job
Expand Down
62 changes: 16 additions & 46 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,61 +705,41 @@ func (o *orm) FindJobs(offset, limit int) (jobs []Job, count int, err error) {
return err
}
for i := range jobs {
err = multierr.Combine(err, o.LoadEnvConfigVars(&jobs[i]))
err = multierr.Combine(err, o.LoadConfigVars(&jobs[i]))
}
return nil
})
return jobs, int(count), err
}

func (o *orm) LoadEnvConfigVars(jb *Job) error {
func (o *orm) LoadConfigVars(jb *Job) error {
if jb.OCROracleSpec != nil {
ch, err := o.legacyChains.Get(jb.OCROracleSpec.EVMChainID.String())
if err != nil {
return err
}
newSpec, err := LoadEnvConfigVarsOCR(ch.Config().EVM().OCR(), ch.Config().OCR(), *jb.OCROracleSpec)
newSpec, err := LoadConfigVarsOCR(ch.Config().EVM().OCR(), ch.Config().OCR(), *jb.OCROracleSpec)
if err != nil {
return err
}
jb.OCROracleSpec = newSpec
} else if jb.VRFSpec != nil {
ch, err := o.legacyChains.Get(jb.VRFSpec.EVMChainID.String())
if err != nil {
return err
}
jb.VRFSpec = LoadEnvConfigVarsVRF(ch.Config().EVM(), *jb.VRFSpec)
} else if jb.DirectRequestSpec != nil {
ch, err := o.legacyChains.Get(jb.DirectRequestSpec.EVMChainID.String())
if err != nil {
return err
}
jb.DirectRequestSpec = LoadEnvConfigVarsDR(ch.Config().EVM(), *jb.DirectRequestSpec)
}
return nil
}

type DRSpecConfig interface {
MinIncomingConfirmations() uint32
}

func LoadEnvConfigVarsVRF(cfg DRSpecConfig, vrfs VRFSpec) *VRFSpec {
func LoadDefaultVRFPollPeriod(vrfs VRFSpec) *VRFSpec {
if vrfs.PollPeriod == 0 {
vrfs.PollPeriodEnv = true
vrfs.PollPeriod = 5 * time.Second
}

return &vrfs
}

func LoadEnvConfigVarsDR(cfg DRSpecConfig, drs DirectRequestSpec) *DirectRequestSpec {
// Take the largest of the global vs specific.
minIncomingConfirmations := cfg.MinIncomingConfirmations()
if !drs.MinIncomingConfirmations.Valid || drs.MinIncomingConfirmations.Uint32 < minIncomingConfirmations {
drs.MinIncomingConfirmationsEnv = true
drs.MinIncomingConfirmations = null.Uint32From(minIncomingConfirmations)
// SetDRMinIncomingConfirmations takes the largest of the global vs specific.
func SetDRMinIncomingConfirmations(defaultMinIncomingConfirmations uint32, drs DirectRequestSpec) *DirectRequestSpec {
if !drs.MinIncomingConfirmations.Valid || drs.MinIncomingConfirmations.Uint32 < defaultMinIncomingConfirmations {
drs.MinIncomingConfirmations = null.Uint32From(defaultMinIncomingConfirmations)
}

return &drs
}

Expand All @@ -773,54 +753,45 @@ type OCRConfig interface {
TransmitterAddress() (ethkey.EIP55Address, error)
}

// LoadEnvConfigVarsLocalOCR loads local OCR env vars into the OCROracleSpec.
func LoadEnvConfigVarsLocalOCR(evmOcrCfg evmconfig.OCR, os OCROracleSpec, ocrCfg OCRConfig) *OCROracleSpec {
// LoadConfigVarsLocalOCR loads local OCR vars into the OCROracleSpec.
func LoadConfigVarsLocalOCR(evmOcrCfg evmconfig.OCR, os OCROracleSpec, ocrCfg OCRConfig) *OCROracleSpec {
if os.ObservationTimeout == 0 {
os.ObservationTimeoutEnv = true
os.ObservationTimeout = models.Interval(ocrCfg.ObservationTimeout())
}
if os.BlockchainTimeout == 0 {
os.BlockchainTimeoutEnv = true
os.BlockchainTimeout = models.Interval(ocrCfg.BlockchainTimeout())
}
if os.ContractConfigTrackerSubscribeInterval == 0 {
os.ContractConfigTrackerSubscribeIntervalEnv = true
os.ContractConfigTrackerSubscribeInterval = models.Interval(ocrCfg.ContractSubscribeInterval())
}
if os.ContractConfigTrackerPollInterval == 0 {
os.ContractConfigTrackerPollIntervalEnv = true
os.ContractConfigTrackerPollInterval = models.Interval(ocrCfg.ContractPollInterval())
}
if os.ContractConfigConfirmations == 0 {
os.ContractConfigConfirmationsEnv = true
os.ContractConfigConfirmations = evmOcrCfg.ContractConfirmations()
}
if os.DatabaseTimeout == nil {
os.DatabaseTimeoutEnv = true
os.DatabaseTimeout = models.NewInterval(evmOcrCfg.DatabaseTimeout())
}
if os.ObservationGracePeriod == nil {
os.ObservationGracePeriodEnv = true
os.ObservationGracePeriod = models.NewInterval(evmOcrCfg.ObservationGracePeriod())
}
if os.ContractTransmitterTransmitTimeout == nil {
os.ContractTransmitterTransmitTimeoutEnv = true
os.ContractTransmitterTransmitTimeout = models.NewInterval(evmOcrCfg.ContractTransmitterTransmitTimeout())
}
os.CaptureEATelemetry = ocrCfg.CaptureEATelemetry()

return &os
}

// LoadEnvConfigVarsOCR loads OCR env vars into the OCROracleSpec.
func LoadEnvConfigVarsOCR(evmOcrCfg evmconfig.OCR, ocrCfg OCRConfig, os OCROracleSpec) (*OCROracleSpec, error) {
// LoadConfigVarsOCR loads OCR config vars into the OCROracleSpec.
func LoadConfigVarsOCR(evmOcrCfg evmconfig.OCR, ocrCfg OCRConfig, os OCROracleSpec) (*OCROracleSpec, error) {
if os.TransmitterAddress == nil {
ta, err := ocrCfg.TransmitterAddress()
if !errors.Is(errors.Cause(err), config.ErrEnvUnset) {
if err != nil {
return nil, err
}
os.TransmitterAddressEnv = true
os.TransmitterAddress = &ta
}
}
Expand All @@ -834,11 +805,10 @@ func LoadEnvConfigVarsOCR(evmOcrCfg evmconfig.OCR, ocrCfg OCRConfig, os OCROracl
if err != nil {
return nil, err
}
os.EncryptedOCRKeyBundleIDEnv = true
os.EncryptedOCRKeyBundleID = &encryptedOCRKeyBundleID
}

return LoadEnvConfigVarsLocalOCR(evmOcrCfg, os, ocrCfg), nil
return LoadConfigVarsLocalOCR(evmOcrCfg, os, ocrCfg), nil
}

func (o *orm) FindJobTx(id int32) (Job, error) {
Expand Down Expand Up @@ -872,7 +842,7 @@ func (o *orm) FindJobWithoutSpecErrors(id int32) (jb Job, err error) {
return jb, errors.Wrap(err, "FindJobWithoutSpecErrors failed")
}

return jb, o.LoadEnvConfigVars(&jb)
return jb, o.LoadConfigVars(&jb)
}

// FindSpecErrorsByJobIDs returns all jobs spec errors by jobs IDs
Expand Down Expand Up @@ -961,7 +931,7 @@ func (o *orm) findJob(jb *Job, col string, arg interface{}, qopts ...pg.QOpt) er
if err != nil {
return errors.Wrap(err, "findJob failed")
}
return o.LoadEnvConfigVars(jb)
return o.LoadConfigVars(jb)
}

func (o *orm) FindJobIDsWithBridge(name string) (jids []int32, err error) {
Expand Down Expand Up @@ -1205,7 +1175,7 @@ func (o *orm) FindJobsByPipelineSpecIDs(ids []int32) ([]Job, error) {
return err
}
for i := range jbs {
err = o.LoadEnvConfigVars(&jbs[i])
err = o.LoadConfigVars(&jbs[i])
//We must return the jobs even if the chainID is disabled
if err != nil && !errors.Is(err, chains.ErrNoSuchChainID) {
return err
Expand Down
32 changes: 17 additions & 15 deletions core/services/job/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
)

func NewTestORM(t *testing.T, db *sqlx.DB, legacyChains evm.LegacyChainContainer, pipelineORM pipeline.ORM, bridgeORM bridges.ORM, keyStore keystore.Master, cfg pg.QConfig) job.ORM {
Expand All @@ -27,26 +28,28 @@ func NewTestORM(t *testing.T, db *sqlx.DB, legacyChains evm.LegacyChainContainer
return o
}

func TestLoadEnvConfigVarsLocalOCR(t *testing.T) {
func TestLoadConfigVarsLocalOCR(t *testing.T) {
t.Parallel()

config := configtest.NewTestGeneralConfig(t)
chainConfig := evmtest.NewChainScopedConfig(t, config)
jobSpec := &job.OCROracleSpec{}

jobSpec = job.LoadEnvConfigVarsLocalOCR(chainConfig.EVM().OCR(), *jobSpec, chainConfig.OCR())
jobSpec = job.LoadConfigVarsLocalOCR(chainConfig.EVM().OCR(), *jobSpec, chainConfig.OCR())

require.True(t, jobSpec.ObservationTimeoutEnv)
require.True(t, jobSpec.BlockchainTimeoutEnv)
require.True(t, jobSpec.ContractConfigTrackerSubscribeIntervalEnv)
require.True(t, jobSpec.ContractConfigTrackerPollIntervalEnv)
require.True(t, jobSpec.ContractConfigConfirmationsEnv)
require.True(t, jobSpec.DatabaseTimeoutEnv)
require.True(t, jobSpec.ObservationGracePeriodEnv)
require.True(t, jobSpec.ContractTransmitterTransmitTimeoutEnv)
require.Equal(t, models.Interval(chainConfig.OCR().ObservationTimeout()), jobSpec.ObservationTimeout)
require.Equal(t, models.Interval(chainConfig.OCR().BlockchainTimeout()), jobSpec.BlockchainTimeout)
require.Equal(t, models.Interval(chainConfig.OCR().ContractSubscribeInterval()), jobSpec.ContractConfigTrackerSubscribeInterval)
require.Equal(t, models.Interval(chainConfig.OCR().ContractPollInterval()), jobSpec.ContractConfigTrackerPollInterval)
require.Equal(t, chainConfig.OCR().CaptureEATelemetry(), jobSpec.CaptureEATelemetry)

require.Equal(t, chainConfig.EVM().OCR().ContractConfirmations(), jobSpec.ContractConfigConfirmations)
require.Equal(t, models.Interval(chainConfig.EVM().OCR().DatabaseTimeout()), *jobSpec.DatabaseTimeout)
require.Equal(t, models.Interval(chainConfig.EVM().OCR().ObservationGracePeriod()), *jobSpec.ObservationGracePeriod)
require.Equal(t, models.Interval(chainConfig.EVM().OCR().ContractTransmitterTransmitTimeout()), *jobSpec.ContractTransmitterTransmitTimeout)
}

func TestLoadEnvConfigVarsDR(t *testing.T) {
func TestSetDRMinIncomingConfirmations(t *testing.T) {
t.Parallel()

config := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) {
Expand All @@ -59,15 +62,14 @@ func TestLoadEnvConfigVarsDR(t *testing.T) {
MinIncomingConfirmations: clnull.Uint32From(10),
}

drs10 := job.LoadEnvConfigVarsDR(chainConfig.EVM(), jobSpec10)
assert.True(t, drs10.MinIncomingConfirmationsEnv)
drs10 := job.SetDRMinIncomingConfirmations(chainConfig.EVM().MinIncomingConfirmations(), jobSpec10)
assert.Equal(t, uint32(100), drs10.MinIncomingConfirmations.Uint32)

jobSpec200 := job.DirectRequestSpec{
MinIncomingConfirmations: clnull.Uint32From(200),
}

drs200 := job.LoadEnvConfigVarsDR(chainConfig.EVM(), jobSpec200)
assert.False(t, drs200.MinIncomingConfirmationsEnv)
drs200 := job.SetDRMinIncomingConfirmations(chainConfig.EVM().MinIncomingConfirmations(), jobSpec200)
assert.True(t, drs200.MinIncomingConfirmations.Valid)
assert.Equal(t, uint32(200), drs200.MinIncomingConfirmations.Uint32)
}
2 changes: 1 addition & 1 deletion core/services/ocr/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Config interface {
}

func toLocalConfig(cfg ValidationConfig, evmOcrConfig evmconfig.OCR, insecureCfg insecureConfig, spec job.OCROracleSpec, ocrConfig job.OCRConfig) ocrtypes.LocalConfig {
concreteSpec := job.LoadEnvConfigVarsLocalOCR(evmOcrConfig, spec, ocrConfig)
concreteSpec := job.LoadConfigVarsLocalOCR(evmOcrConfig, spec, ocrConfig)
lc := ocrtypes.LocalConfig{
BlockchainTimeout: concreteSpec.BlockchainTimeout.Duration(),
ContractConfigConfirmations: concreteSpec.ContractConfigConfirmations,
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err e
if err != nil {
return nil, err
}
concreteSpec, err := job.LoadEnvConfigVarsOCR(chain.Config().EVM().OCR(), chain.Config().OCR(), *jb.OCROracleSpec)
concreteSpec, err := job.LoadConfigVarsOCR(chain.Config().EVM().OCR(), chain.Config().OCR(), *jb.OCROracleSpec)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/vrf/v1/listener_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (lsn *Listener) getLatestHead() uint64 {
// Start complies with job.Service
func (lsn *Listener) Start(context.Context) error {
return lsn.StartOnce("VRFListener", func() error {
spec := job.LoadEnvConfigVarsVRF(lsn.Cfg, *lsn.Job.VRFSpec)
spec := job.LoadDefaultVRFPollPeriod(*lsn.Job.VRFSpec)

unsubscribeLogs := lsn.LogBroadcaster.Register(lsn, log.ListenerOpts{
Contract: lsn.Coordinator.Address(),
Expand Down
2 changes: 1 addition & 1 deletion core/services/vrf/v2/listener_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (lsn *listenerV2) Start(ctx context.Context) error {
"proofVerificationGas", GasProofVerification)
}

spec := job.LoadEnvConfigVarsVRF(lsn.cfg, *lsn.job.VRFSpec)
spec := job.LoadDefaultVRFPollPeriod(*lsn.job.VRFSpec)

unsubscribeLogs := lsn.logBroadcaster.Register(lsn, log.ListenerOpts{
Contract: lsn.coordinator.Address(),
Expand Down
Loading

0 comments on commit 8a3479e

Please sign in to comment.