diff --git a/.changeset/pretty-experts-unite.md b/.changeset/pretty-experts-unite.md new file mode 100644 index 00000000000..4a1f903d439 --- /dev/null +++ b/.changeset/pretty-experts-unite.md @@ -0,0 +1,7 @@ +--- +"chainlink": patch +--- + +Added log buffer v1 with improved performance, stability and control over scaling parameters. + +Added a feature flag for using log buffer v1. diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index a053b53992d..7b4200efd68 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -59,6 +59,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/autotelemetry21" ocr2keeper21core "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider" ocr2vrfconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2vrf/config" ocr2coordinator "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2vrf/coordinator" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2vrf/juelsfeecoin" @@ -1313,6 +1314,14 @@ func (d *Delegate) newServicesOCR2Keepers21( return nil, errors.New("could not coerce PluginProvider to AutomationProvider") } + // TODO: (AUTO-9355) remove once we remove v0 + if useBufferV1 := cfg.UseBufferV1 != nil && *cfg.UseBufferV1; useBufferV1 { + logProviderFeatures, ok := keeperProvider.LogEventProvider().(logprovider.LogEventProviderFeatures) + if ok { + logProviderFeatures.WithBufferVersion("v1") + } + } + services, err := ocr2keeper.EVMDependencies21(kb) if err != nil { return nil, errors.Wrap(err, "could not build dependencies for ocr2 keepers") diff --git a/core/services/ocr2/plugins/ocr2keeper/config.go b/core/services/ocr2/plugins/ocr2keeper/config.go index ec56f9c6993..4b41e5a0285 100644 --- a/core/services/ocr2/plugins/ocr2keeper/config.go +++ b/core/services/ocr2/plugins/ocr2keeper/config.go @@ -60,6 +60,9 @@ type PluginConfig struct { ContractVersion string `json:"contractVersion"` // CaptureAutomationCustomTelemetry is a bool flag to toggle Custom Telemetry Service CaptureAutomationCustomTelemetry *bool `json:"captureAutomationCustomTelemetry,omitempty"` + // UseBufferV1 is a bool flag to toggle the new log buffer implementation + // TODO: (AUTO-9355) remove once we have a single version + UseBufferV1 *bool `json:"useBufferV1,omitempty"` } func ValidatePluginConfig(cfg PluginConfig) error { diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go new file mode 100644 index 00000000000..fbc1da075df --- /dev/null +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -0,0 +1,426 @@ +package logprovider + +import ( + "math" + "math/big" + "sort" + "sync" + "sync/atomic" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics" +) + +type BufferedLog struct { + ID *big.Int + Log logpoller.Log +} + +type LogBuffer interface { + // Enqueue adds logs to the buffer and might also drop logs if the limit for the + // given upkeep was exceeded. Returns the number of logs that were added and number of logs that were dropped. + Enqueue(id *big.Int, logs ...logpoller.Log) (added int, dropped int) + // Dequeue pulls logs from the buffer that are within the given block window, + // with a maximum number of logs per upkeep and a total maximum number of logs to return. + // It also accepts a function to select upkeeps. + // Returns logs (associated to upkeeps) and the number of remaining + // logs in that window for the involved upkeeps. + Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) + // SetConfig sets the buffer size and the maximum number of logs to keep for each upkeep. + SetConfig(lookback, blockRate, logLimit uint32) + // NumOfUpkeeps returns the number of upkeeps that are being tracked by the buffer. + NumOfUpkeeps() int + // SyncFilters removes upkeeps that are not in the filter store. + SyncFilters(filterStore UpkeepFilterStore) error +} + +func DefaultUpkeepSelector(id *big.Int) bool { + return true +} + +type logBufferOptions struct { + // number of blocks to keep in the buffer + lookback *atomic.Uint32 + // blockRate is the number of blocks per window + blockRate *atomic.Uint32 + // max number of logs to keep in the buffer for each upkeep per window (LogLimit*10) + windowLimit *atomic.Uint32 +} + +func newLogBufferOptions(lookback, blockRate, logLimit uint32) *logBufferOptions { + opts := &logBufferOptions{ + windowLimit: new(atomic.Uint32), + lookback: new(atomic.Uint32), + blockRate: new(atomic.Uint32), + } + opts.override(lookback, blockRate, logLimit) + + return opts +} + +func (o *logBufferOptions) override(lookback, blockRate, logLimit uint32) { + o.windowLimit.Store(logLimit * 10) + o.lookback.Store(lookback) + o.blockRate.Store(blockRate) +} + +func (o *logBufferOptions) windows() int { + return int(math.Ceil(float64(o.lookback.Load()) / float64(o.blockRate.Load()))) +} + +type logBuffer struct { + lggr logger.Logger + opts *logBufferOptions + // last block number seen by the buffer + lastBlockSeen *atomic.Int64 + // map of upkeep id to its queue + queues map[string]*upkeepLogQueue + lock sync.RWMutex +} + +func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer { + return &logBuffer{ + lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"), + opts: newLogBufferOptions(lookback, blockRate, logLimit), + lastBlockSeen: new(atomic.Int64), + queues: make(map[string]*upkeepLogQueue), + } +} + +// Enqueue adds logs to the buffer and might also drop logs if the limit for the +// given upkeep was exceeded. It will create a new buffer if it does not exist. +// Returns the number of logs that were added and number of logs that were dropped. +func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { + buf, ok := b.getUpkeepQueue(uid) + if !ok || buf == nil { + buf = newUpkeepLogQueue(b.lggr, uid, b.opts) + b.setUpkeepQueue(uid, buf) + } + latestBlock := latestBlockNumber(logs...) + if b.lastBlockSeen.Load() < latestBlock { + b.lastBlockSeen.Store(latestBlock) + } + blockThreshold := b.lastBlockSeen.Load() - int64(b.opts.lookback.Load()) + if blockThreshold <= 0 { + blockThreshold = 1 + } + return buf.enqueue(blockThreshold, logs...) +} + +// Dequeue greedly pulls logs from the buffers. +// Returns logs and the number of remaining logs in the buffer. +func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) { + b.lock.RLock() + defer b.lock.RUnlock() + + start, end := getBlockWindow(block, blockRate) + return b.dequeue(start, end, upkeepLimit, maxResults, upkeepSelector) +} + +// dequeue pulls logs from the buffers, depends the given selector (upkeepSelector), +// in block range [start,end] with minimum number of results per upkeep (upkeepLimit) +// and the maximum number of results (capacity). +// Returns logs and the number of remaining logs in the buffer for the given range and selector. +// NOTE: this method is not thread safe and should be called within a lock. +func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) { + var result []BufferedLog + var remainingLogs int + for _, q := range b.queues { + if !upkeepSelector(q.id) { + // if the upkeep is not selected, skip it + continue + } + logsInRange := q.sizeOfRange(start, end) + if logsInRange == 0 { + // if there are no logs in the range, skip the upkeep + continue + } + if capacity == 0 { + // if there is no more capacity for results, just count the remaining logs + remainingLogs += logsInRange + continue + } + if upkeepLimit > capacity { + // adjust limit if it is higher than the actual capacity + upkeepLimit = capacity + } + logs, remaining := q.dequeue(start, end, upkeepLimit) + for _, l := range logs { + result = append(result, BufferedLog{ID: q.id, Log: l}) + capacity-- + } + remainingLogs += remaining + } + return result, remainingLogs +} + +func (b *logBuffer) SetConfig(lookback, blockRate, logLimit uint32) { + b.lock.Lock() + defer b.lock.Unlock() + + b.opts.override(lookback, blockRate, logLimit) +} + +func (b *logBuffer) NumOfUpkeeps() int { + b.lock.RLock() + defer b.lock.RUnlock() + + return len(b.queues) +} + +func (b *logBuffer) SyncFilters(filterStore UpkeepFilterStore) error { + b.lock.Lock() + defer b.lock.Unlock() + + for upkeepID := range b.queues { + uid := new(big.Int) + _, ok := uid.SetString(upkeepID, 10) + if ok && !filterStore.Has(uid) { + // remove upkeep that is not in the filter store + delete(b.queues, upkeepID) + } + } + + return nil +} + +func (b *logBuffer) getUpkeepQueue(uid *big.Int) (*upkeepLogQueue, bool) { + b.lock.RLock() + defer b.lock.RUnlock() + + ub, ok := b.queues[uid.String()] + return ub, ok +} + +func (b *logBuffer) setUpkeepQueue(uid *big.Int, buf *upkeepLogQueue) { + b.lock.Lock() + defer b.lock.Unlock() + + b.queues[uid.String()] = buf +} + +// TODO (AUTO-9256) separate files + +// logTriggerState represents the state of a log in the buffer. +type logTriggerState uint8 + +const ( + // the log was dropped due to buffer limits + logTriggerStateDropped logTriggerState = iota + // the log was enqueued by the buffer + logTriggerStateEnqueued + // the log was visited/dequeued from the buffer + logTriggerStateDequeued +) + +// logTriggerStateEntry represents the state of a log in the buffer and the block number of the log. +// TODO (AUTO-10013) handling of reorgs might require to store the block hash as well. +type logTriggerStateEntry struct { + state logTriggerState + block int64 +} + +// upkeepLogQueue is a priority queue for logs associated to a specific upkeep. +// It keeps track of the logs that were already visited and the capacity of the queue. +type upkeepLogQueue struct { + lggr logger.Logger + + id *big.Int + opts *logBufferOptions + + // logs is the buffer of logs for the upkeep + logs []logpoller.Log + // states keeps track of the state of the logs that are known to the queue + // and the block number they were seen at + states map[string]logTriggerStateEntry + lock sync.RWMutex +} + +func newUpkeepLogQueue(lggr logger.Logger, id *big.Int, opts *logBufferOptions) *upkeepLogQueue { + maxLogs := int(opts.windowLimit.Load()) * opts.windows() // limit per window * windows + return &upkeepLogQueue{ + lggr: lggr.With("upkeepID", id.String()), + id: id, + opts: opts, + logs: make([]logpoller.Log, 0, maxLogs), + states: make(map[string]logTriggerStateEntry), + } +} + +// sizeOfRange returns the number of logs in the buffer that are within the given block range. +func (q *upkeepLogQueue) sizeOfRange(start, end int64) int { + q.lock.RLock() + defer q.lock.RUnlock() + + size := 0 + for _, l := range q.logs { + if l.BlockNumber >= start && l.BlockNumber <= end { + size++ + } + } + return size +} + +// dequeue pulls logs from the buffer that are within the given block range, +// with a limit of logs to pull. Returns logs and the number of remaining logs in the buffer. +func (q *upkeepLogQueue) dequeue(start, end int64, limit int) ([]logpoller.Log, int) { + q.lock.Lock() + defer q.lock.Unlock() + + if len(q.logs) == 0 { + return nil, 0 + } + + var results []logpoller.Log + var remaining int + updatedLogs := make([]logpoller.Log, 0) + for _, l := range q.logs { + if l.BlockNumber >= start && l.BlockNumber <= end { + if len(results) < limit { + results = append(results, l) + lid := logID(l) + if s, ok := q.states[lid]; ok { + s.state = logTriggerStateDequeued + q.states[lid] = s + } + continue + } + remaining++ + } + updatedLogs = append(updatedLogs, l) + } + + if len(results) > 0 { + q.logs = updatedLogs + q.lggr.Debugw("Dequeued logs", "start", start, "end", end, "limit", limit, "results", len(results), "remaining", remaining) + } + + prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionEgress).Add(float64(len(results))) + + return results, remaining +} + +// enqueue adds logs to the buffer and might also drop logs if the limit for the +// given upkeep was exceeded. Additionally, it will drop logs that are older than blockThreshold. +// Returns the number of logs that were added and number of logs that were dropped. +func (q *upkeepLogQueue) enqueue(blockThreshold int64, logsToAdd ...logpoller.Log) (int, int) { + q.lock.Lock() + defer q.lock.Unlock() + + logs := q.logs + var added int + for _, log := range logsToAdd { + if log.BlockNumber < blockThreshold { + // q.lggr.Debugw("Skipping log from old block", "blockThreshold", blockThreshold, "logBlock", log.BlockNumber, "logIndex", log.LogIndex) + continue + } + lid := logID(log) + if _, ok := q.states[lid]; ok { + // q.lggr.Debugw("Skipping known log", "blockThreshold", blockThreshold, "logBlock", log.BlockNumber, "logIndex", log.LogIndex) + continue + } + q.states[lid] = logTriggerStateEntry{state: logTriggerStateEnqueued, block: log.BlockNumber} + added++ + logs = append(logs, log) + } + q.logs = logs + + var dropped int + if added > 0 { + q.orderLogs() + dropped = q.clean(blockThreshold) + q.lggr.Debugw("Enqueued logs", "added", added, "dropped", dropped, "blockThreshold", blockThreshold, "q size", len(q.logs), "visited size", len(q.states)) + } + + prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionIngress).Add(float64(added)) + prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionDropped).Add(float64(dropped)) + + return added, dropped +} + +// orderLogs sorts the logs in the buffer. +// NOTE: this method is not thread safe and should be called within a lock. +func (q *upkeepLogQueue) orderLogs() { + // sort logs by block number, tx hash and log index + // to keep the q sorted and to ensure that logs can be + // grouped by block windows for the cleanup + sort.SliceStable(q.logs, func(i, j int) bool { + return LogSorter(q.logs[i], q.logs[j]) + }) +} + +// clean removes logs that are older than blockThreshold and drops logs if the limit for the +// given upkeep was exceeded. Returns the number of logs that were dropped. +// NOTE: this method is not thread safe and should be called within a lock. +func (q *upkeepLogQueue) clean(blockThreshold int64) int { + var dropped, expired int + blockRate := int(q.opts.blockRate.Load()) + windowLimit := int(q.opts.windowLimit.Load()) + updated := make([]logpoller.Log, 0) + // helper variables to keep track of the current window capacity + currentWindowCapacity, currentWindowStart := 0, int64(0) + for _, l := range q.logs { + if blockThreshold > l.BlockNumber { // old log, removed + prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionExpired).Inc() + // q.lggr.Debugw("Expiring old log", "blockNumber", l.BlockNumber, "blockThreshold", blockThreshold, "logIndex", l.LogIndex) + logid := logID(l) + delete(q.states, logid) + expired++ + continue + } + start, _ := getBlockWindow(l.BlockNumber, blockRate) + if start != currentWindowStart { + // new window, reset capacity + currentWindowStart = start + currentWindowCapacity = 0 + } + currentWindowCapacity++ + // if capacity has been reached, drop the log + if currentWindowCapacity > windowLimit { + lid := logID(l) + if s, ok := q.states[lid]; ok { + s.state = logTriggerStateDropped + q.states[lid] = s + } + dropped++ + prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionDropped).Inc() + q.lggr.Debugw("Reached log buffer limits, dropping log", "blockNumber", l.BlockNumber, + "blockHash", l.BlockHash, "txHash", l.TxHash, "logIndex", l.LogIndex, "len updated", len(updated), + "currentWindowStart", currentWindowStart, "currentWindowCapacity", currentWindowCapacity, + "maxLogsPerWindow", windowLimit, "blockRate", blockRate) + continue + } + updated = append(updated, l) + } + + if dropped > 0 || expired > 0 { + q.lggr.Debugw("Cleaned logs", "dropped", dropped, "expired", expired, "blockThreshold", blockThreshold, "len updated", len(updated), "len before", len(q.logs)) + q.logs = updated + } + + q.cleanStates(blockThreshold) + + return dropped +} + +// cleanStates removes states that are older than blockThreshold. +// NOTE: this method is not thread safe and should be called within a lock. +func (q *upkeepLogQueue) cleanStates(blockThreshold int64) { + for lid, s := range q.states { + if s.block <= blockThreshold { + delete(q.states, lid) + } + } +} + +// getBlockWindow returns the start and end block of the window for the given block. +func getBlockWindow(block int64, blockRate int) (start int64, end int64) { + windowSize := int64(blockRate) + if windowSize == 0 { + return block, block + } + start = block - (block % windowSize) + end = start + windowSize - 1 + return +} diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go new file mode 100644 index 00000000000..19f806d35b9 --- /dev/null +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go @@ -0,0 +1,472 @@ +package logprovider + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +func TestLogEventBufferV1(t *testing.T) { + buf := NewLogBuffer(logger.TestLogger(t), 10, 20, 1) + + buf.Enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 0}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 1}, + ) + buf.Enqueue(big.NewInt(2), + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 2}, + ) + results, remaining := buf.Dequeue(int64(1), 10, 1, 2, DefaultUpkeepSelector) + require.Equal(t, 2, len(results)) + require.Equal(t, 2, remaining) + require.True(t, results[0].ID.Cmp(results[1].ID) != 0) + results, remaining = buf.Dequeue(int64(1), 10, 1, 2, DefaultUpkeepSelector) + require.Equal(t, 2, len(results)) + require.Equal(t, 0, remaining) +} + +func TestLogEventBufferV1_SyncFilters(t *testing.T) { + buf := NewLogBuffer(logger.TestLogger(t), 10, 20, 1) + + buf.Enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 0}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 1}, + ) + buf.Enqueue(big.NewInt(2), + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 2}, + ) + filterStore := NewUpkeepFilterStore() + filterStore.AddActiveUpkeeps(upkeepFilter{upkeepID: big.NewInt(1)}) + + require.Equal(t, 2, buf.NumOfUpkeeps()) + require.NoError(t, buf.SyncFilters(filterStore)) + require.Equal(t, 1, buf.NumOfUpkeeps()) +} + +func TestLogEventBufferV1_Dequeue(t *testing.T) { + tests := []struct { + name string + logsInBuffer map[*big.Int][]logpoller.Log + args dequeueArgs + lookback int + results []logpoller.Log + remaining int + }{ + { + name: "empty", + logsInBuffer: map[*big.Int][]logpoller.Log{}, + args: newDequeueArgs(10, 1, 1, 10, nil), + lookback: 20, + results: []logpoller.Log{}, + }, + { + name: "happy path", + logsInBuffer: map[*big.Int][]logpoller.Log{ + big.NewInt(1): { + {BlockNumber: 12, TxHash: common.HexToHash("0x12"), LogIndex: 0}, + {BlockNumber: 14, TxHash: common.HexToHash("0x15"), LogIndex: 1}, + }, + }, + args: newDequeueArgs(10, 5, 3, 10, nil), + lookback: 20, + results: []logpoller.Log{ + {}, {}, + }, + }, + { + name: "with upkeep limits", + logsInBuffer: map[*big.Int][]logpoller.Log{ + big.NewInt(1): { + {BlockNumber: 12, TxHash: common.HexToHash("0x12"), LogIndex: 1}, + {BlockNumber: 12, TxHash: common.HexToHash("0x12"), LogIndex: 0}, + {BlockNumber: 13, TxHash: common.HexToHash("0x13"), LogIndex: 0}, + {BlockNumber: 13, TxHash: common.HexToHash("0x13"), LogIndex: 1}, + {BlockNumber: 14, TxHash: common.HexToHash("0x14"), LogIndex: 1}, + {BlockNumber: 14, TxHash: common.HexToHash("0x14"), LogIndex: 2}, + }, + big.NewInt(2): { + {BlockNumber: 12, TxHash: common.HexToHash("0x12"), LogIndex: 11}, + {BlockNumber: 12, TxHash: common.HexToHash("0x12"), LogIndex: 10}, + {BlockNumber: 13, TxHash: common.HexToHash("0x13"), LogIndex: 10}, + {BlockNumber: 13, TxHash: common.HexToHash("0x13"), LogIndex: 11}, + {BlockNumber: 14, TxHash: common.HexToHash("0x14"), LogIndex: 11}, + {BlockNumber: 14, TxHash: common.HexToHash("0x14"), LogIndex: 12}, + }, + }, + args: newDequeueArgs(10, 5, 2, 10, nil), + lookback: 20, + results: []logpoller.Log{ + {}, {}, {}, {}, + }, + remaining: 8, + }, + { + name: "with max results", + logsInBuffer: map[*big.Int][]logpoller.Log{ + big.NewInt(1): append(createDummyLogSequence(2, 0, 12, common.HexToHash("0x12")), createDummyLogSequence(2, 0, 13, common.HexToHash("0x13"))...), + big.NewInt(2): append(createDummyLogSequence(2, 10, 12, common.HexToHash("0x12")), createDummyLogSequence(2, 10, 13, common.HexToHash("0x13"))...), + }, + args: newDequeueArgs(10, 5, 3, 4, nil), + lookback: 20, + results: []logpoller.Log{ + {}, {}, {}, {}, + }, + remaining: 4, + }, + { + name: "with upkeep selector", + logsInBuffer: map[*big.Int][]logpoller.Log{ + big.NewInt(1): { + {BlockNumber: 12, TxHash: common.HexToHash("0x12"), LogIndex: 0}, + {BlockNumber: 14, TxHash: common.HexToHash("0x15"), LogIndex: 1}, + }, + }, + args: newDequeueArgs(10, 5, 5, 10, func(id *big.Int) bool { return false }), + lookback: 20, + results: []logpoller.Log{}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + buf := NewLogBuffer(logger.TestLogger(t), uint32(tc.lookback), uint32(tc.args.blockRate), uint32(tc.args.upkeepLimit)) + for id, logs := range tc.logsInBuffer { + added, dropped := buf.Enqueue(id, logs...) + require.Equal(t, len(logs), added+dropped) + } + results, remaining := buf.Dequeue(tc.args.block, tc.args.blockRate, tc.args.upkeepLimit, tc.args.maxResults, tc.args.upkeepSelector) + require.Equal(t, len(tc.results), len(results)) + require.Equal(t, tc.remaining, remaining) + }) + } +} + +func TestLogEventBufferV1_Enqueue(t *testing.T) { + tests := []struct { + name string + logsToAdd map[*big.Int][]logpoller.Log + added, dropped map[string]int + sizeOfRange map[*big.Int]int + rangeStart, rangeEnd int64 + lookback, blockRate, upkeepLimit uint32 + }{ + { + name: "empty", + logsToAdd: map[*big.Int][]logpoller.Log{}, + added: map[string]int{}, + dropped: map[string]int{}, + sizeOfRange: map[*big.Int]int{}, + rangeStart: 0, + rangeEnd: 10, + blockRate: 1, + upkeepLimit: 1, + lookback: 20, + }, + { + name: "happy path", + logsToAdd: map[*big.Int][]logpoller.Log{ + big.NewInt(1): { + {BlockNumber: 12, TxHash: common.HexToHash("0x12"), LogIndex: 0}, + {BlockNumber: 14, TxHash: common.HexToHash("0x15"), LogIndex: 1}, + }, + big.NewInt(2): { + {BlockNumber: 12, TxHash: common.HexToHash("0x12"), LogIndex: 11}, + }, + }, + added: map[string]int{ + big.NewInt(1).String(): 2, + big.NewInt(2).String(): 1, + }, + dropped: map[string]int{ + big.NewInt(1).String(): 0, + big.NewInt(2).String(): 0, + }, + sizeOfRange: map[*big.Int]int{ + big.NewInt(1): 2, + big.NewInt(2): 1, + }, + rangeStart: 10, + rangeEnd: 20, + blockRate: 5, + upkeepLimit: 1, + lookback: 20, + }, + { + name: "above limits", + logsToAdd: map[*big.Int][]logpoller.Log{ + big.NewInt(1): createDummyLogSequence(11, 0, 12, common.HexToHash("0x12")), + big.NewInt(2): { + {BlockNumber: 12, TxHash: common.HexToHash("0x12"), LogIndex: 11}, + }, + }, + added: map[string]int{ + big.NewInt(1).String(): 11, + big.NewInt(2).String(): 1, + }, + dropped: map[string]int{ + big.NewInt(1).String(): 1, + big.NewInt(2).String(): 0, + }, + sizeOfRange: map[*big.Int]int{ + big.NewInt(1): 10, + big.NewInt(2): 1, + }, + rangeStart: 10, + rangeEnd: 20, + blockRate: 10, + upkeepLimit: 1, + lookback: 20, + }, + { + name: "out of block range", + logsToAdd: map[*big.Int][]logpoller.Log{ + big.NewInt(1): append(createDummyLogSequence(2, 0, 1, common.HexToHash("0x1")), createDummyLogSequence(2, 0, 100, common.HexToHash("0x1"))...), + }, + added: map[string]int{ + big.NewInt(1).String(): 2, + }, + dropped: map[string]int{ + big.NewInt(1).String(): 0, + }, + sizeOfRange: map[*big.Int]int{ + big.NewInt(1): 2, + }, + rangeStart: 1, + rangeEnd: 101, + blockRate: 10, + upkeepLimit: 10, + lookback: 20, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + buf := NewLogBuffer(logger.TestLogger(t), tc.lookback, tc.blockRate, tc.upkeepLimit) + for id, logs := range tc.logsToAdd { + added, dropped := buf.Enqueue(id, logs...) + sid := id.String() + if _, ok := tc.added[sid]; !ok { + tc.added[sid] = 0 + } + if _, ok := tc.dropped[sid]; !ok { + tc.dropped[sid] = 0 + } + require.Equal(t, tc.added[sid], added) + require.Equal(t, tc.dropped[sid], dropped) + } + for id, size := range tc.sizeOfRange { + q, ok := buf.(*logBuffer).getUpkeepQueue(id) + require.True(t, ok) + require.Equal(t, size, q.sizeOfRange(tc.rangeStart, tc.rangeEnd)) + } + }) + } +} + +func TestLogEventBufferV1_UpkeepQueue(t *testing.T) { + t.Run("enqueue dequeue", func(t *testing.T) { + q := newUpkeepLogQueue(logger.TestLogger(t), big.NewInt(1), newLogBufferOptions(10, 1, 1)) + + added, dropped := q.enqueue(10, logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 0}) + require.Equal(t, 0, dropped) + require.Equal(t, 1, added) + require.Equal(t, 1, q.sizeOfRange(1, 20)) + logs, remaining := q.dequeue(19, 21, 10) + require.Equal(t, 1, len(logs)) + require.Equal(t, 0, remaining) + }) + + t.Run("enqueue with limits", func(t *testing.T) { + q := newUpkeepLogQueue(logger.TestLogger(t), big.NewInt(1), newLogBufferOptions(10, 1, 1)) + + added, dropped := q.enqueue(10, + createDummyLogSequence(15, 0, 20, common.HexToHash("0x20"))..., + ) + require.Equal(t, 5, dropped) + require.Equal(t, 15, added) + }) + + t.Run("dequeue with limits", func(t *testing.T) { + q := newUpkeepLogQueue(logger.TestLogger(t), big.NewInt(1), newLogBufferOptions(10, 1, 3)) + + added, dropped := q.enqueue(10, + logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 0}, + logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 1}, + logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 10}, + ) + require.Equal(t, 0, dropped) + require.Equal(t, 3, added) + + logs, remaining := q.dequeue(19, 21, 2) + require.Equal(t, 2, len(logs)) + require.Equal(t, 1, remaining) + }) +} + +func TestLogEventBufferV1_UpkeepQueue_sizeOfRange(t *testing.T) { + t.Run("empty", func(t *testing.T) { + q := newUpkeepLogQueue(logger.TestLogger(t), big.NewInt(1), newLogBufferOptions(10, 1, 1)) + + require.Equal(t, 0, q.sizeOfRange(1, 10)) + }) + + t.Run("happy path", func(t *testing.T) { + q := newUpkeepLogQueue(logger.TestLogger(t), big.NewInt(1), newLogBufferOptions(10, 1, 1)) + + added, dropped := q.enqueue(10, logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 0}) + require.Equal(t, 0, dropped) + require.Equal(t, 1, added) + require.Equal(t, 0, q.sizeOfRange(1, 10)) + require.Equal(t, 1, q.sizeOfRange(1, 20)) + }) +} + +func TestLogEventBufferV1_UpkeepQueue_clean(t *testing.T) { + t.Run("empty", func(t *testing.T) { + q := newUpkeepLogQueue(logger.TestLogger(t), big.NewInt(1), newLogBufferOptions(10, 1, 1)) + + q.clean(10) + }) + + t.Run("happy path", func(t *testing.T) { + buf := NewLogBuffer(logger.TestLogger(t), 10, 5, 1) + + buf.Enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 0}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 1}, + ) + buf.Enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 11, TxHash: common.HexToHash("0x111"), LogIndex: 0}, + logpoller.Log{BlockNumber: 11, TxHash: common.HexToHash("0x111"), LogIndex: 1}, + ) + + q, ok := buf.(*logBuffer).getUpkeepQueue(big.NewInt(1)) + require.True(t, ok) + require.Equal(t, 4, q.sizeOfRange(1, 11)) + + buf.Enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 17, TxHash: common.HexToHash("0x171"), LogIndex: 0}, + logpoller.Log{BlockNumber: 17, TxHash: common.HexToHash("0x171"), LogIndex: 1}, + ) + + require.Equal(t, 4, q.sizeOfRange(1, 18)) + require.Equal(t, 0, q.clean(12)) + require.Equal(t, 2, q.sizeOfRange(1, 18)) + q.lock.Lock() + defer q.lock.Unlock() + require.Equal(t, 2, len(q.states)) + }) +} + +func TestLogEventBufferV1_BlockWindow(t *testing.T) { + tests := []struct { + name string + block int64 + blockRate int + wantStart int64 + wantEnd int64 + }{ + { + name: "block 0, blockRate 1", + block: 0, + blockRate: 1, + wantStart: 0, + wantEnd: 0, + }, + { + name: "block 81, blockRate 1", + block: 81, + blockRate: 1, + wantStart: 81, + wantEnd: 81, + }, + { + name: "block 0, blockRate 4", + block: 0, + blockRate: 4, + wantStart: 0, + wantEnd: 3, + }, + { + name: "block 81, blockRate 4", + block: 81, + blockRate: 4, + wantStart: 80, + wantEnd: 83, + }, + { + name: "block 83, blockRate 4", + block: 83, + blockRate: 4, + wantStart: 80, + wantEnd: 83, + }, + { + name: "block 84, blockRate 4", + block: 84, + blockRate: 4, + wantStart: 84, + wantEnd: 87, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + start, end := getBlockWindow(tc.block, tc.blockRate) + require.Equal(t, tc.wantStart, start) + require.Equal(t, tc.wantEnd, end) + }) + } +} + +type dequeueArgs struct { + block int64 + blockRate int + upkeepLimit int + maxResults int + upkeepSelector func(id *big.Int) bool +} + +func newDequeueArgs(block int64, blockRate int, upkeepLimit int, maxResults int, upkeepSelector func(id *big.Int) bool) dequeueArgs { + args := dequeueArgs{ + block: block, + blockRate: blockRate, + upkeepLimit: upkeepLimit, + maxResults: maxResults, + upkeepSelector: upkeepSelector, + } + + if upkeepSelector == nil { + args.upkeepSelector = DefaultUpkeepSelector + } + if blockRate == 0 { + args.blockRate = 1 + } + if maxResults == 0 { + args.maxResults = 10 + } + if upkeepLimit == 0 { + args.upkeepLimit = 1 + } + + return args +} + +func createDummyLogSequence(n, startIndex int, block int64, tx common.Hash) []logpoller.Log { + logs := make([]logpoller.Log, n) + for i := 0; i < n; i++ { + logs[i] = logpoller.Log{ + BlockNumber: block, + TxHash: tx, + LogIndex: int64(i + startIndex), + } + } + return logs +} diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/factory.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/factory.go index 263fa69223f..64833f9269b 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/factory.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/factory.go @@ -4,8 +4,6 @@ import ( "math/big" "time" - "golang.org/x/time/rate" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -17,7 +15,7 @@ import ( func New(lggr logger.Logger, poller logpoller.LogPoller, c client.Client, stateStore core.UpkeepStateReader, finalityDepth uint32, chainID *big.Int) (LogEventProvider, LogRecoverer) { filterStore := NewUpkeepFilterStore() packer := NewLogEventsPacker() - opts := NewOptions(int64(finalityDepth)) + opts := NewOptions(int64(finalityDepth), chainID) provider := NewLogProvider(lggr, poller, chainID, packer, filterStore, opts) recoverer := NewLogRecoverer(lggr, poller, c, stateStore, packer, filterStore, opts) @@ -27,22 +25,36 @@ func New(lggr logger.Logger, poller logpoller.LogPoller, c client.Client, stateS // LogTriggersOptions holds the options for the log trigger components. type LogTriggersOptions struct { + chainID *big.Int // LookbackBlocks is the number of blocks the provider will look back for logs. // The recoverer will scan for logs up to this depth. // NOTE: MUST be set to a greater-or-equal to the chain's finality depth. LookbackBlocks int64 // ReadInterval is the interval to fetch logs in the background. ReadInterval time.Duration - // BlockRateLimit is the rate limit on the range of blocks the we fetch logs for. - BlockRateLimit rate.Limit - // blockLimitBurst is the burst upper limit on the range of blocks the we fetch logs for. - BlockLimitBurst int // Finality depth is the number of blocks to wait before considering a block final. FinalityDepth int64 + + // TODO: (AUTO-9355) remove once we have a single version + BufferVersion BufferVersion + // LogLimit is the minimum number of logs to process in a single block window. + LogLimit uint32 + // BlockRate determines the block window for log processing. + BlockRate uint32 } -func NewOptions(finalityDepth int64) LogTriggersOptions { +// BufferVersion is the version of the log buffer. +// TODO: (AUTO-9355) remove once we have a single version +type BufferVersion string + +const ( + BufferVersionDefault BufferVersion = "" + BufferVersionV1 BufferVersion = "v1" +) + +func NewOptions(finalityDepth int64, chainID *big.Int) LogTriggersOptions { opts := new(LogTriggersOptions) + opts.chainID = chainID opts.Defaults(finalityDepth) return *opts } @@ -60,13 +72,35 @@ func (o *LogTriggersOptions) Defaults(finalityDepth int64) { if o.ReadInterval == 0 { o.ReadInterval = time.Second } - if o.BlockLimitBurst == 0 { - o.BlockLimitBurst = int(o.LookbackBlocks) - } - if o.BlockRateLimit == 0 { - o.BlockRateLimit = rate.Every(o.ReadInterval) - } if o.FinalityDepth == 0 { o.FinalityDepth = finalityDepth } + if o.BlockRate == 0 { + o.BlockRate = o.defaultBlockRate() + } + if o.LogLimit == 0 { + o.LogLimit = o.defaultLogLimit() + } +} + +func (o *LogTriggersOptions) defaultBlockRate() uint32 { + switch o.chainID.Int64() { + case 42161, 421613, 421614: // Arbitrum + return 4 + default: + return 1 + } +} + +func (o *LogTriggersOptions) defaultLogLimit() uint32 { + switch o.chainID.Int64() { + case 42161, 421613, 421614: // Arbitrum + return 1 + case 1, 4, 5, 42, 11155111: // Eth + return 20 + case 10, 420, 56, 97, 137, 80001, 43113, 43114, 8453, 84531: // Optimism, BSC, Polygon, Avax, Base + return 5 + default: + return 2 + } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/filter.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/filter.go index 44780cbc4b1..c0f204aa57b 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/filter.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/filter.go @@ -5,7 +5,6 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" - "golang.org/x/time/rate" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" ) @@ -21,9 +20,6 @@ type upkeepFilter struct { // lastPollBlock is the last block number the logs were fetched for this upkeep // used by log event provider. lastPollBlock int64 - // blockLimiter is used to limit the number of blocks to fetch logs for an upkeep. - // used by log event provider. - blockLimiter *rate.Limiter // lastRePollBlock is the last block number the logs were recovered for this upkeep // used by log recoverer. lastRePollBlock int64 @@ -42,7 +38,6 @@ func (f upkeepFilter) Clone() upkeepFilter { configUpdateBlock: f.configUpdateBlock, lastPollBlock: f.lastPollBlock, lastRePollBlock: f.lastRePollBlock, - blockLimiter: f.blockLimiter, } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go index 51cdeccafdf..8108f1a3466 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go @@ -2,7 +2,6 @@ package logprovider_test import ( "context" - "errors" "math/big" "testing" "time" @@ -15,15 +14,12 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/jmoiron/sqlx" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" - "golang.org/x/time/rate" ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_upkeep_counter_wrapper" @@ -37,90 +33,115 @@ import ( ) func TestIntegration_LogEventProvider(t *testing.T) { - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + tests := []struct { + name string + bufferVersion logprovider.BufferVersion + logLimit uint32 + }{ + { + name: "default buffer", + bufferVersion: logprovider.BufferVersionDefault, + logLimit: 10, + }, + { + name: "buffer v1", + bufferVersion: logprovider.BufferVersionV1, + logLimit: 10, + }, + } - backend, stopMining, accounts := setupBackend(t) - defer stopMining() - carrol := accounts[2] + for _, tc := range tests { + bufferVersion, logLimit := tc.bufferVersion, tc.logLimit + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() - db := setupDB(t) - defer db.Close() + backend, stopMining, accounts := setupBackend(t) + defer stopMining() + carrol := accounts[2] - opts := logprovider.NewOptions(200) - opts.ReadInterval = time.Second / 2 - lp, ethClient := setupDependencies(t, db, backend) - filterStore := logprovider.NewUpkeepFilterStore() - provider, _ := setup(logger.TestLogger(t), lp, nil, nil, filterStore, &opts) - logProvider := provider.(logprovider.LogEventProviderTest) + db := setupDB(t) + defer db.Close() - n := 10 + opts := logprovider.NewOptions(200, big.NewInt(1)) + opts.ReadInterval = time.Second / 2 + opts.BufferVersion = bufferVersion + opts.LogLimit = logLimit - backend.Commit() - lp.PollAndSaveLogs(ctx, 1) // Ensure log poller has a latest block + lp, ethClient := setupDependencies(t, db, backend) + filterStore := logprovider.NewUpkeepFilterStore() + provider, _ := setup(logger.TestLogger(t), lp, nil, nil, filterStore, &opts) + logProvider := provider.(logprovider.LogEventProviderTest) - ids, addrs, contracts := deployUpkeepCounter(ctx, t, n, ethClient, backend, carrol, logProvider) - lp.PollAndSaveLogs(ctx, int64(n)) + n := 10 - go func() { - if err := logProvider.Start(ctx); err != nil { - t.Logf("error starting log provider: %s", err) - t.Fail() - } - }() - defer logProvider.Close() + backend.Commit() + lp.PollAndSaveLogs(ctx, 1) // Ensure log poller has a latest block - logsRounds := 10 + ids, addrs, contracts := deployUpkeepCounter(ctx, t, n, ethClient, backend, carrol, logProvider) + lp.PollAndSaveLogs(ctx, int64(n)) - poll := pollFn(ctx, t, lp, ethClient) + go func() { + if err := logProvider.Start(ctx); err != nil { + t.Logf("error starting log provider: %s", err) + t.Fail() + } + }() + defer logProvider.Close() - triggerEvents(ctx, t, backend, carrol, logsRounds, poll, contracts...) + logsRounds := 10 - poll(backend.Commit()) + poll := pollFn(ctx, t, lp, ethClient) - waitLogPoller(ctx, t, backend, lp, ethClient) + triggerEvents(ctx, t, backend, carrol, logsRounds, poll, contracts...) - waitLogProvider(ctx, t, logProvider, 3) + poll(backend.Commit()) - allPayloads := collectPayloads(ctx, t, logProvider, n, 5) - require.GreaterOrEqual(t, len(allPayloads), n, - "failed to get logs after restart") + waitLogPoller(ctx, t, backend, lp, ethClient) - t.Run("Restart", func(t *testing.T) { - t.Log("restarting log provider") - // assuming that our service was closed and restarted, - // we should be able to backfill old logs and fetch new ones - filterStore := logprovider.NewUpkeepFilterStore() - logProvider2 := logprovider.NewLogProvider(logger.TestLogger(t), lp, big.NewInt(1), logprovider.NewLogEventsPacker(), filterStore, opts) + waitLogProvider(ctx, t, logProvider, 3) - poll(backend.Commit()) - go func() { - if err2 := logProvider2.Start(ctx); err2 != nil { - t.Logf("error starting log provider: %s", err2) - t.Fail() - } - }() - defer logProvider2.Close() - - // re-register filters - for i, id := range ids { - err := logProvider2.RegisterFilter(ctx, logprovider.FilterOptions{ - UpkeepID: id, - TriggerConfig: newPlainLogTriggerConfig(addrs[i]), - // using block number at which the upkeep was registered, - // before we emitted any logs - UpdateBlock: uint64(n), - }) - require.NoError(t, err) - } + allPayloads := collectPayloads(ctx, t, logProvider, n, logsRounds/2) + require.GreaterOrEqual(t, len(allPayloads), n, + "failed to get logs after restart") - waitLogProvider(ctx, t, logProvider2, 2) + t.Run("Restart", func(t *testing.T) { + t.Log("restarting log provider") + // assuming that our service was closed and restarted, + // we should be able to backfill old logs and fetch new ones + filterStore := logprovider.NewUpkeepFilterStore() + logProvider2 := logprovider.NewLogProvider(logger.TestLogger(t), lp, big.NewInt(1), logprovider.NewLogEventsPacker(), filterStore, opts) - t.Log("getting logs after restart") - logsAfterRestart := collectPayloads(ctx, t, logProvider2, n, 5) - require.GreaterOrEqual(t, len(logsAfterRestart), n, - "failed to get logs after restart") - }) + poll(backend.Commit()) + go func() { + if err2 := logProvider2.Start(ctx); err2 != nil { + t.Logf("error starting log provider: %s", err2) + t.Fail() + } + }() + defer logProvider2.Close() + + // re-register filters + for i, id := range ids { + err := logProvider2.RegisterFilter(ctx, logprovider.FilterOptions{ + UpkeepID: id, + TriggerConfig: newPlainLogTriggerConfig(addrs[i]), + // using block number at which the upkeep was registered, + // before we emitted any logs + UpdateBlock: uint64(n), + }) + require.NoError(t, err) + } + + waitLogProvider(ctx, t, logProvider2, 2) + + t.Log("getting logs after restart") + logsAfterRestart := collectPayloads(ctx, t, logProvider2, n, 5) + require.GreaterOrEqual(t, len(logsAfterRestart), n, + "failed to get logs after restart") + }) + }) + } } func TestIntegration_LogEventProvider_UpdateConfig(t *testing.T) { @@ -198,258 +219,79 @@ func TestIntegration_LogEventProvider_UpdateConfig(t *testing.T) { } func TestIntegration_LogEventProvider_Backfill(t *testing.T) { - ctx, cancel := context.WithTimeout(testutils.Context(t), time.Second*60) - defer cancel() - - backend, stopMining, accounts := setupBackend(t) - defer stopMining() - carrol := accounts[2] - - db := setupDB(t) - defer db.Close() - - opts := logprovider.NewOptions(200) - opts.ReadInterval = time.Second / 4 - lp, ethClient := setupDependencies(t, db, backend) - filterStore := logprovider.NewUpkeepFilterStore() - provider, _ := setup(logger.TestLogger(t), lp, nil, nil, filterStore, &opts) - logProvider := provider.(logprovider.LogEventProviderTest) - - n := 10 - - backend.Commit() - lp.PollAndSaveLogs(ctx, 1) // Ensure log poller has a latest block - _, _, contracts := deployUpkeepCounter(ctx, t, n, ethClient, backend, carrol, logProvider) - - poll := pollFn(ctx, t, lp, ethClient) - - rounds := 8 - for i := 0; i < rounds; i++ { - poll(backend.Commit()) - triggerEvents(ctx, t, backend, carrol, n, poll, contracts...) - poll(backend.Commit()) - } - - waitLogPoller(ctx, t, backend, lp, ethClient) - - // starting the log provider should backfill logs - go func() { - if startErr := logProvider.Start(ctx); startErr != nil { - t.Logf("error starting log provider: %s", startErr) - t.Fail() - } - }() - defer logProvider.Close() - - waitLogProvider(ctx, t, logProvider, 3) - - allPayloads := collectPayloads(ctx, t, logProvider, n, 5) - require.GreaterOrEqual(t, len(allPayloads), len(contracts), "failed to backfill logs") -} - -func TestIntegration_LogEventProvider_RateLimit(t *testing.T) { - setupTest := func( - t *testing.T, - opts *logprovider.LogTriggersOptions, - ) ( - context.Context, - *backends.SimulatedBackend, - func(blockHash common.Hash), - logprovider.LogEventProviderTest, - []*big.Int, - func(), - ) { - ctx, cancel := context.WithCancel(testutils.Context(t)) - backend, stopMining, accounts := setupBackend(t) - userContractAccount := accounts[2] - db := setupDB(t) - - deferFunc := func() { - cancel() - stopMining() - _ = db.Close() - } - lp, ethClient := setupDependencies(t, db, backend) - filterStore := logprovider.NewUpkeepFilterStore() - provider, _ := setup(logger.TestLogger(t), lp, nil, nil, filterStore, opts) - logProvider := provider.(logprovider.LogEventProviderTest) - backend.Commit() - lp.PollAndSaveLogs(ctx, 1) // Ensure log poller has a latest block - - rounds := 5 - numberOfUserContracts := 10 - poll := pollFn(ctx, t, lp, ethClient) - - // deployUpkeepCounter creates 'n' blocks and 'n' contracts - ids, _, contracts := deployUpkeepCounter( - ctx, - t, - numberOfUserContracts, - ethClient, - backend, - userContractAccount, - logProvider) - - // have log poller save logs for current blocks - lp.PollAndSaveLogs(ctx, int64(numberOfUserContracts)) - - for i := 0; i < rounds; i++ { - triggerEvents( - ctx, - t, - backend, - userContractAccount, - numberOfUserContracts, - poll, - contracts...) - - for dummyBlocks := 0; dummyBlocks < numberOfUserContracts; dummyBlocks++ { - _ = backend.Commit() - } - - poll(backend.Commit()) - } - + tests := []struct { + name string + bufferVersion logprovider.BufferVersion + logLimit uint32 + }{ { - // total block history at this point should be 566 - var minimumBlockCount int64 = 500 - latestBlock, _ := lp.LatestBlock(ctx) - - assert.GreaterOrEqual(t, latestBlock.BlockNumber, minimumBlockCount, "to ensure the integrety of the test, the minimum block count before the test should be %d but got %d", minimumBlockCount, latestBlock) - } - - require.NoError(t, logProvider.ReadLogs(ctx, ids...)) - - return ctx, backend, poll, logProvider, ids, deferFunc + name: "default buffer", + bufferVersion: logprovider.BufferVersionDefault, + logLimit: 10, + }, + { + name: "buffer v1", + bufferVersion: logprovider.BufferVersionV1, + logLimit: 10, + }, } - // polling for logs at approximately the same rate as a chain produces - // blocks should not encounter rate limits - t.Run("should allow constant polls within the rate and burst limit", func(t *testing.T) { - ctx, backend, poll, logProvider, ids, deferFunc := setupTest(t, &logprovider.LogTriggersOptions{ - LookbackBlocks: 200, - // BlockRateLimit is set low to ensure the test does not exceed the - // rate limit - BlockRateLimit: rate.Every(50 * time.Millisecond), - // BlockLimitBurst is just set to a non-zero value - BlockLimitBurst: 5, - }) - - defer deferFunc() + for _, tc := range tests { + bufferVersion, limitLow := tc.bufferVersion, tc.logLimit + t.Run(tc.name, func(t *testing.T) { - // set the wait time between reads higher than the rate limit - readWait := 50 * time.Millisecond - timer := time.NewTimer(readWait) + ctx, cancel := context.WithTimeout(testutils.Context(t), time.Second*60) + defer cancel() - for i := 0; i < 4; i++ { - <-timer.C + backend, stopMining, accounts := setupBackend(t) + defer stopMining() + carrol := accounts[2] - // advance 1 block for every read - poll(backend.Commit()) - - err := logProvider.ReadLogs(ctx, ids...) - if err != nil { - assert.False(t, errors.Is(err, logprovider.ErrBlockLimitExceeded), "error should not contain block limit exceeded") - } + db := setupDB(t) + defer db.Close() - timer.Reset(readWait) - } + opts := logprovider.NewOptions(200, big.NewInt(1)) + opts.ReadInterval = time.Second / 4 + opts.BufferVersion = bufferVersion + opts.LogLimit = limitLow - poll(backend.Commit()) + lp, ethClient := setupDependencies(t, db, backend) + filterStore := logprovider.NewUpkeepFilterStore() + provider, _ := setup(logger.TestLogger(t), lp, nil, nil, filterStore, &opts) + logProvider := provider.(logprovider.LogEventProviderTest) - _, err := logProvider.GetLatestPayloads(ctx) + n := 10 - require.NoError(t, err) - }) + backend.Commit() + lp.PollAndSaveLogs(ctx, 1) // Ensure log poller has a latest block + _, _, contracts := deployUpkeepCounter(ctx, t, n, ethClient, backend, carrol, logProvider) - t.Run("should produce a rate limit error for over burst limit", func(t *testing.T) { - ctx, backend, poll, logProvider, ids, deferFunc := setupTest(t, &logprovider.LogTriggersOptions{ - LookbackBlocks: 200, - // BlockRateLimit is set low to ensure the test does not exceed the - // rate limit - BlockRateLimit: rate.Every(50 * time.Millisecond), - // BlockLimitBurst is just set to a non-zero value - BlockLimitBurst: 5, - }) + poll := pollFn(ctx, t, lp, ethClient) - defer deferFunc() - - // set the wait time between reads higher than the rate limit - readWait := 50 * time.Millisecond - timer := time.NewTimer(readWait) - - for i := 0; i < 4; i++ { - <-timer.C - - // advance 4 blocks for every read - for x := 0; x < 4; x++ { + rounds := 8 + for i := 0; i < rounds; i++ { + poll(backend.Commit()) + triggerEvents(ctx, t, backend, carrol, n, poll, contracts...) poll(backend.Commit()) } - err := logProvider.ReadLogs(ctx, ids...) - if err != nil { - assert.True(t, errors.Is(err, logprovider.ErrBlockLimitExceeded), "error should not contain block limit exceeded") - } - - timer.Reset(readWait) - } - - poll(backend.Commit()) + waitLogPoller(ctx, t, backend, lp, ethClient) - _, err := logProvider.GetLatestPayloads(ctx) + // starting the log provider should backfill logs + go func() { + if startErr := logProvider.Start(ctx); startErr != nil { + t.Logf("error starting log provider: %s", startErr) + t.Fail() + } + }() + defer logProvider.Close() - require.NoError(t, err) - }) + waitLogProvider(ctx, t, logProvider, 3) - t.Run("should allow polling after lookback number of blocks have passed", func(t *testing.T) { - ctx, backend, poll, logProvider, ids, deferFunc := setupTest(t, &logprovider.LogTriggersOptions{ - // BlockRateLimit is set low to ensure the test does not exceed the - // rate limit - BlockRateLimit: rate.Every(50 * time.Millisecond), - // BlockLimitBurst is set low to ensure the test exceeds the burst limit - BlockLimitBurst: 5, - // LogBlocksLookback is set low to reduce the number of blocks required - // to reset the block limiter to maxBurst - LookbackBlocks: 50, + allPayloads := collectPayloads(ctx, t, logProvider, n*rounds, 5) + require.GreaterOrEqual(t, len(allPayloads), len(contracts), "failed to backfill logs") }) - - defer deferFunc() - - // simulate a burst in unpolled blocks - for i := 0; i < 20; i++ { - _ = backend.Commit() - } - - poll(backend.Commit()) - - // all entries should error at this point because there are too many - // blocks to processes - err := logProvider.ReadLogs(ctx, ids...) - if err != nil { - assert.True(t, errors.Is(err, logprovider.ErrBlockLimitExceeded), "error should not contain block limit exceeded") - } - - // progress the chain by the same number of blocks as the lookback limit - // to trigger the usage of maxBurst - for i := 0; i < 50; i++ { - _ = backend.Commit() - } - - poll(backend.Commit()) - - // all entries should reset to the maxBurst because they are beyond - // the log lookback - err = logProvider.ReadLogs(ctx, ids...) - if err != nil { - assert.True(t, errors.Is(err, logprovider.ErrBlockLimitExceeded), "error should not contain block limit exceeded") - } - - poll(backend.Commit()) - - _, err = logProvider.GetLatestPayloads(ctx) - - require.NoError(t, err) - }) + } } func TestIntegration_LogRecoverer_Backfill(t *testing.T) { @@ -533,7 +375,6 @@ func collectPayloads(ctx context.Context, t *testing.T, logProvider logprovider. for ctx.Err() == nil && len(allPayloads) < n && rounds > 0 { logs, err := logProvider.GetLatestPayloads(ctx) require.NoError(t, err) - require.LessOrEqual(t, len(logs), logprovider.AllowedLogsPerUpkeep, "failed to get all logs") allPayloads = append(allPayloads, logs...) rounds-- } @@ -670,10 +511,10 @@ func setupDependencies(t *testing.T, db *sqlx.DB, backend *backends.SimulatedBac return lp, ethClient } -func setup(lggr logger.Logger, poller logpoller.LogPoller, c client.Client, stateStore evmregistry21.UpkeepStateReader, filterStore logprovider.UpkeepFilterStore, opts *logprovider.LogTriggersOptions) (logprovider.LogEventProvider, logprovider.LogRecoverer) { +func setup(lggr logger.Logger, poller logpoller.LogPoller, c evmclient.Client, stateStore evmregistry21.UpkeepStateReader, filterStore logprovider.UpkeepFilterStore, opts *logprovider.LogTriggersOptions) (logprovider.LogEventProvider, logprovider.LogRecoverer) { packer := logprovider.NewLogEventsPacker() if opts == nil { - o := logprovider.NewOptions(200) + o := logprovider.NewOptions(200, big.NewInt(1)) opts = &o } provider := logprovider.NewLogProvider(lggr, poller, big.NewInt(1), packer, filterStore, *opts) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go new file mode 100644 index 00000000000..9156e341688 --- /dev/null +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go @@ -0,0 +1,69 @@ +package logprovider + +import ( + "encoding/hex" + + ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" +) + +// LogSorter sorts the logs based on block number, tx hash and log index. +// returns true if b should come before a. +func LogSorter(a, b logpoller.Log) bool { + return LogComparator(a, b) > 0 +} + +// LogComparator compares the logs based on block number, log index. +// tx hash is also checked in case the log index is not unique within a block. +// +// Returns: +// +// -1 if a < b +// 0 if a == b +// +1 if a > b +func LogComparator(a, b logpoller.Log) int { + blockDiff := int(a.BlockNumber - b.BlockNumber) + if blockDiff != 0 { + return normalizeCompareResult(blockDiff) + } + logIndexDiff := int(a.LogIndex - b.LogIndex) + if logIndexDiff != 0 { + return normalizeCompareResult(logIndexDiff) + } + return a.TxHash.Big().Cmp(b.TxHash.Big()) +} + +// normalizeCompareResult normalizes the result of a comparison to -1, 0, 1 +func normalizeCompareResult(res int) int { + switch { + case res < 0: + return -1 + case res > 0: + return 1 + default: + return 0 + } +} + +// logID returns a unique identifier for a log, which is an hex string +// of ocr2keepers.LogTriggerExtension.LogIdentifier() +func logID(l logpoller.Log) string { + ext := ocr2keepers.LogTriggerExtension{ + Index: uint32(l.LogIndex), + } + copy(ext.TxHash[:], l.TxHash[:]) + copy(ext.BlockHash[:], l.BlockHash[:]) + return hex.EncodeToString(ext.LogIdentifier()) +} + +// latestBlockNumber returns the latest block number from the given logs +func latestBlockNumber(logs ...logpoller.Log) int64 { + var latest int64 + for _, l := range logs { + if l.BlockNumber > latest { + latest = l.BlockNumber + } + } + return latest +} diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log_test.go new file mode 100644 index 00000000000..9ee8e98a996 --- /dev/null +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log_test.go @@ -0,0 +1,133 @@ +package logprovider + +import ( + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" +) + +func TestLogComparatorSorter(t *testing.T) { + tests := []struct { + name string + a logpoller.Log + b logpoller.Log + wantCmp int + wantSort bool + }{ + { + name: "a == b", + a: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 1, + }, + b: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 1, + }, + wantCmp: 0, + wantSort: false, + }, + { + name: "a < b: block number", + a: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 1, + }, + b: logpoller.Log{ + BlockNumber: 4, + TxHash: common.HexToHash("0x1"), + LogIndex: 1, + }, + wantCmp: -1, + wantSort: false, + }, + { + name: "a < b: log index", + a: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 1, + }, + b: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 2, + }, + wantCmp: -1, + wantSort: false, + }, + { + name: "a > b: block number", + a: logpoller.Log{ + BlockNumber: 3, + TxHash: common.HexToHash("0x1"), + LogIndex: 1, + }, + b: logpoller.Log{ + BlockNumber: 2, + TxHash: common.HexToHash("0x1"), + LogIndex: 1, + }, + wantCmp: 1, + wantSort: true, + }, + { + name: "a > b: log index", + a: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 4, + }, + b: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 2, + }, + wantCmp: 1, + wantSort: true, + }, + { + name: "a > b: tx hash", + a: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x21"), + LogIndex: 2, + }, + b: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 2, + }, + wantCmp: 1, + wantSort: true, + }, + { + name: "a < b: tx hash", + a: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 2, + }, + b: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x4"), + LogIndex: 2, + }, + wantCmp: -1, + wantSort: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.wantCmp, LogComparator(tc.a, tc.b)) + require.Equal(t, tc.wantSort, LogSorter(tc.a, tc.b)) + }) + } +} diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index 60505a2989e..b07b08d3354 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -46,6 +46,10 @@ var ( // reorgBuffer is the number of blocks to add as a buffer to the block range when reading logs. reorgBuffer = int64(32) readerThreads = 4 + + bufferSyncInterval = 10 * time.Minute + // logLimitMinimum is how low the log limit can go. + logLimitMinimum = 1 ) // LogTriggerConfig is an alias for log trigger config. @@ -79,8 +83,13 @@ type LogEventProviderTest interface { CurrentPartitionIdx() uint64 } +type LogEventProviderFeatures interface { + WithBufferVersion(v BufferVersion) +} + var _ LogEventProvider = &logEventProvider{} var _ LogEventProviderTest = &logEventProvider{} +var _ LogEventProviderFeatures = &logEventProvider{} // logEventProvider manages log filters for upkeeps and enables to read the log events. type logEventProvider struct { @@ -98,6 +107,7 @@ type logEventProvider struct { filterStore UpkeepFilterStore buffer *logEventBuffer + bufferV1 LogBuffer opts LogTriggersOptions @@ -107,18 +117,12 @@ type logEventProvider struct { } func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, chainID *big.Int, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider { - defaultBlockRate := defaultBlockRateForChain(chainID) - defaultLogLimit := defaultLogLimitForChain(chainID) - - // TODO apply these to the log buffer later - _ = defaultBlockRate - _ = defaultLogLimit - return &logEventProvider{ threadCtrl: utils.NewThreadControl(), lggr: lggr.Named("KeepersRegistry.LogEventProvider"), packer: packer, buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh), + bufferV1: NewLogBuffer(lggr, uint32(opts.LookbackBlocks), opts.BlockRate, opts.LogLimit), poller: poller, opts: opts, filterStore: filterStore, @@ -127,20 +131,38 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, chainID *big } func (p *logEventProvider) SetConfig(cfg ocr2keepers.LogEventProviderConfig) { + p.lock.Lock() + defer p.lock.Unlock() + blockRate := cfg.BlockRate logLimit := cfg.LogLimit if blockRate == 0 { - blockRate = defaultBlockRateForChain(p.chainID) + blockRate = p.opts.defaultBlockRate() } if logLimit == 0 { - logLimit = defaultLogLimitForChain(p.chainID) + logLimit = p.opts.defaultLogLimit() } p.lggr.With("where", "setConfig").Infow("setting config ", "bockRate", blockRate, "logLimit", logLimit) - // TODO set block rate and log limit on the buffer - //p.buffer.SetConfig(blockRate, logLimit) + atomic.StoreUint32(&p.opts.BlockRate, blockRate) + atomic.StoreUint32(&p.opts.LogLimit, logLimit) + + switch p.opts.BufferVersion { + case BufferVersionV1: + p.bufferV1.SetConfig(uint32(p.opts.LookbackBlocks), blockRate, logLimit) + default: + } +} + +func (p *logEventProvider) WithBufferVersion(v BufferVersion) { + p.lock.Lock() + defer p.lock.Unlock() + + p.lggr.Debugw("with buffer version", "version", v) + + p.opts.BufferVersion = v } func (p *logEventProvider) Start(context.Context) error { @@ -169,6 +191,24 @@ func (p *logEventProvider) Start(context.Context) error { }) }) + p.threadCtrl.Go(func(ctx context.Context) { + // sync filters with buffer periodically, + // to ensure that inactive upkeeps won't waste capacity. + ticker := time.NewTicker(bufferSyncInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := p.syncBufferFilters(); err != nil { + p.lggr.Warnw("failed to sync buffer filters", "err", err) + } + case <-ctx.Done(): + return + } + } + }) + return nil }) } @@ -190,33 +230,94 @@ func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers return nil, fmt.Errorf("%w: %s", ErrHeadNotAvailable, err) } prommetrics.AutomationLogProviderLatestBlock.Set(float64(latest.BlockNumber)) - start := latest.BlockNumber - p.opts.LookbackBlocks - if start <= 0 { - start = 1 + payloads := p.getLogsFromBuffer(latest.BlockNumber) + + if len(payloads) > 0 { + p.lggr.Debugw("Fetched payloads from buffer", "latestBlock", latest.BlockNumber, "payloads", len(payloads)) } - logs := p.buffer.dequeueRange(start, latest.BlockNumber, AllowedLogsPerUpkeep, MaxPayloads) - // p.lggr.Debugw("got latest logs from buffer", "latest", latest, "diff", diff, "logs", len(logs)) + return payloads, nil +} +func (p *logEventProvider) createPayload(id *big.Int, log logpoller.Log) (ocr2keepers.UpkeepPayload, error) { + trig := logToTrigger(log) + checkData, err := p.packer.PackLogData(log) + if err != nil { + p.lggr.Warnw("failed to pack log data", "err", err, "log", log, "id", id) + return ocr2keepers.UpkeepPayload{}, err + } + payload, err := core.NewUpkeepPayload(id, trig, checkData) + if err != nil { + p.lggr.Warnw("failed to create upkeep payload", "err", err, "id", id, "trigger", trig, "checkData", checkData) + return ocr2keepers.UpkeepPayload{}, err + } + return payload, nil +} + +// getBufferDequeueArgs returns the arguments for the buffer to dequeue logs. +// It adjust the log limit low based on the number of upkeeps to ensure that more upkeeps get slots in the result set. +func (p *logEventProvider) getBufferDequeueArgs() (blockRate, logLimitLow, maxResults, numOfUpkeeps int) { + blockRate, logLimitLow, maxResults, numOfUpkeeps = int(p.opts.BlockRate), int(p.opts.LogLimit), MaxPayloads, p.bufferV1.NumOfUpkeeps() + // in case we have more upkeeps than the max results, we reduce the log limit low + // so that more upkeeps will get slots in the result set. + for numOfUpkeeps > maxResults/logLimitLow { + if logLimitLow == logLimitMinimum { + // Log limit low can't go less than logLimitMinimum (1). + // If some upkeeps are not getting slots in the result set, they supposed to be picked up + // in the next iteration if the range is still applicable. + // TODO: alerts to notify the system is at full capacity. + // TODO: handle this case properly by distributing available slots across upkeeps to avoid + // starvation when log volume is high. + p.lggr.Warnw("The system is at full capacity", "maxResults", maxResults, "numOfUpkeeps", numOfUpkeeps, "logLimitLow", logLimitLow) + break + } + p.lggr.Debugw("Too many upkeeps, reducing the log limit low", "maxResults", maxResults, "numOfUpkeeps", numOfUpkeeps, "logLimitLow_before", logLimitLow) + logLimitLow-- + } + return +} + +func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.UpkeepPayload { var payloads []ocr2keepers.UpkeepPayload - for _, l := range logs { - log := l.log - trig := logToTrigger(log) - checkData, err := p.packer.PackLogData(log) - if err != nil { - p.lggr.Warnw("failed to pack log data", "err", err, "log", log) - continue + + start := latestBlock - p.opts.LookbackBlocks + if start <= 0 { // edge case when the chain is new (e.g. tests) + start = 1 + } + + switch p.opts.BufferVersion { + case BufferVersionV1: + // in v1, we use a greedy approach - we keep dequeuing logs until we reach the max results or cover the entire range. + blockRate, logLimitLow, maxResults, _ := p.getBufferDequeueArgs() + for len(payloads) < maxResults && start <= latestBlock { + logs, remaining := p.bufferV1.Dequeue(start, blockRate, logLimitLow, maxResults-len(payloads), DefaultUpkeepSelector) + if len(logs) > 0 { + p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs)) + } + for _, l := range logs { + payload, err := p.createPayload(l.ID, l.Log) + if err == nil { + payloads = append(payloads, payload) + } + } + if remaining > 0 { + p.lggr.Debugw("Remaining logs", "start", start, "latestBlock", latestBlock, "remaining", remaining) + // TODO: handle remaining logs in a better way than consuming the entire window, e.g. do not repeat more than x times + continue + } + start += int64(blockRate) } - payload, err := core.NewUpkeepPayload(l.upkeepID, trig, checkData) - if err != nil { - p.lggr.Warnw("failed to create upkeep payload", "err", err, "id", l.upkeepID, "trigger", trig, "checkData", checkData) - continue + default: + logs := p.buffer.dequeueRange(start, latestBlock, AllowedLogsPerUpkeep, MaxPayloads) + for _, l := range logs { + payload, err := p.createPayload(l.upkeepID, l.log) + if err == nil { + payloads = append(payloads, payload) + } } - - payloads = append(payloads, payload) } - return payloads, nil + return payloads } // ReadLogs fetches the logs for the given upkeeps. @@ -380,8 +481,6 @@ func (p *logEventProvider) readLogs(ctx context.Context, latest int64, filters [ // special case of a new blockchain (e.g. simulated chain) lookbackBlocks = latest - 1 } - // maxBurst will be used to increase the burst limit to allow a long range scan - maxBurst := int(lookbackBlocks + 1) for i, filter := range filters { if len(filter.addr) == 0 { @@ -391,13 +490,6 @@ func (p *logEventProvider) readLogs(ctx context.Context, latest int64, filters [ // range should not exceed [lookbackBlocks, latest] if start < latest-lookbackBlocks { start = latest - lookbackBlocks - filter.blockLimiter.SetBurst(maxBurst) - } - - resv := filter.blockLimiter.ReserveN(time.Now(), int(latest-start)) - if !resv.OK() { - merr = errors.Join(merr, fmt.Errorf("%w: %s", ErrBlockLimitExceeded, filter.upkeepID.String())) - continue } // adding a buffer to check for reorged logs. start = start - reorgBuffer @@ -408,8 +500,6 @@ func (p *logEventProvider) readLogs(ctx context.Context, latest int64, filters [ // query logs based on contract address, event sig, and blocks logs, err := p.poller.LogsWithSigs(ctx, start, latest, []common.Hash{filter.topics[0]}, common.BytesToAddress(filter.addr)) if err != nil { - // cancel limit reservation as we failed to get logs - resv.Cancel() if ctx.Err() != nil { // exit if the context was canceled return merr @@ -419,15 +509,12 @@ func (p *logEventProvider) readLogs(ctx context.Context, latest int64, filters [ } filteredLogs := filter.Select(logs...) - // if this limiter's burst was set to the max -> - // reset it and cancel the reservation to allow further processing - if filter.blockLimiter.Burst() == maxBurst { - resv.Cancel() - filter.blockLimiter.SetBurst(p.opts.BlockLimitBurst) + switch p.opts.BufferVersion { + case BufferVersionV1: + p.bufferV1.Enqueue(filter.upkeepID, filteredLogs...) + default: + p.buffer.enqueue(filter.upkeepID, filteredLogs...) } - - p.buffer.enqueue(filter.upkeepID, filteredLogs...) - // Update the lastPollBlock for filter in slice this is then // updated into filter store in updateFiltersLastPoll filters[i].lastPollBlock = latest @@ -436,24 +523,15 @@ func (p *logEventProvider) readLogs(ctx context.Context, latest int64, filters [ return merr } -func defaultBlockRateForChain(chainID *big.Int) uint32 { - switch chainID.Int64() { - case 42161, 421613, 421614: // Arbitrum - return 4 - default: - return 1 - } -} +func (p *logEventProvider) syncBufferFilters() error { + p.lock.RLock() + buffVersion := p.opts.BufferVersion + p.lock.RUnlock() -func defaultLogLimitForChain(chainID *big.Int) uint32 { - switch chainID.Int64() { - case 42161, 421613, 421614: // Arbitrum - return 1 - case 1, 4, 5, 42, 11155111: // Eth - return 20 - case 10, 420, 56, 97, 137, 80001, 43113, 43114, 8453, 84531: // Optimism, BSC, Polygon, Avax, Base - return 5 + switch buffVersion { + case BufferVersionV1: + return p.bufferV1.SyncFilters(p.filterStore) default: - return 1 + return nil } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_life_cycle.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_life_cycle.go index ae6a373ad22..db47ac2ecd8 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_life_cycle.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_life_cycle.go @@ -9,7 +9,6 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "golang.org/x/time/rate" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" ) @@ -84,8 +83,7 @@ func (p *logEventProvider) RegisterFilter(ctx context.Context, opts FilterOption filter = *currentFilter } else { // new filter filter = upkeepFilter{ - upkeepID: upkeepID, - blockLimiter: rate.NewLimiter(p.opts.BlockRateLimit, p.opts.BlockLimitBurst), + upkeepID: upkeepID, } } filter.lastPollBlock = 0 diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_life_cycle_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_life_cycle_test.go index 96a397827be..26e989c7466 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_life_cycle_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_life_cycle_test.go @@ -100,7 +100,7 @@ func TestLogEventProvider_LifeCycle(t *testing.T) { }, } - p := NewLogProvider(logger.TestLogger(t), nil, big.NewInt(1), &mockedPacker{}, NewUpkeepFilterStore(), NewOptions(200)) + p := NewLogProvider(logger.TestLogger(t), nil, big.NewInt(1), &mockedPacker{}, NewUpkeepFilterStore(), NewOptions(200, big.NewInt(1))) for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { @@ -152,7 +152,7 @@ func TestEventLogProvider_RefreshActiveUpkeeps(t *testing.T) { mp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{}, nil) mp.On("ReplayAsync", mock.Anything).Return(nil) - p := NewLogProvider(logger.TestLogger(t), mp, big.NewInt(1), &mockedPacker{}, NewUpkeepFilterStore(), NewOptions(200)) + p := NewLogProvider(logger.TestLogger(t), mp, big.NewInt(1), &mockedPacker{}, NewUpkeepFilterStore(), NewOptions(200, big.NewInt(1))) require.NoError(t, p.RegisterFilter(ctx, FilterOptions{ UpkeepID: core.GenUpkeepID(types.LogTrigger, "1111").BigInt(), @@ -231,7 +231,7 @@ func TestLogEventProvider_ValidateLogTriggerConfig(t *testing.T) { }, } - p := NewLogProvider(logger.TestLogger(t), nil, big.NewInt(1), &mockedPacker{}, NewUpkeepFilterStore(), NewOptions(200)) + p := NewLogProvider(logger.TestLogger(t), nil, big.NewInt(1), &mockedPacker{}, NewUpkeepFilterStore(), NewOptions(200, big.NewInt(1))) for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { err := p.validateLogTriggerConfig(tc.cfg) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go index ade2c630ebd..57da895403e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go @@ -11,7 +11,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "golang.org/x/time/rate" ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" @@ -22,7 +21,7 @@ import ( ) func TestLogEventProvider_GetFilters(t *testing.T) { - p := NewLogProvider(logger.TestLogger(t), nil, big.NewInt(1), &mockedPacker{}, NewUpkeepFilterStore(), NewOptions(200)) + p := NewLogProvider(logger.TestLogger(t), nil, big.NewInt(1), &mockedPacker{}, NewUpkeepFilterStore(), NewOptions(200, big.NewInt(1))) _, f := newEntry(p, 1) p.filterStore.AddActiveUpkeeps(f) @@ -64,7 +63,7 @@ func TestLogEventProvider_GetFilters(t *testing.T) { } func TestLogEventProvider_UpdateEntriesLastPoll(t *testing.T) { - p := NewLogProvider(logger.TestLogger(t), nil, big.NewInt(1), &mockedPacker{}, NewUpkeepFilterStore(), NewOptions(200)) + p := NewLogProvider(logger.TestLogger(t), nil, big.NewInt(1), &mockedPacker{}, NewUpkeepFilterStore(), NewOptions(200, big.NewInt(1))) n := 10 @@ -177,7 +176,7 @@ func TestLogEventProvider_ScheduleReadJobs(t *testing.T) { ctx := testutils.Context(t) readInterval := 10 * time.Millisecond - opts := NewOptions(200) + opts := NewOptions(200, big.NewInt(1)) opts.ReadInterval = readInterval p := NewLogProvider(logger.TestLogger(t), mp, big.NewInt(1), &mockedPacker{}, NewUpkeepFilterStore(), opts) @@ -255,7 +254,7 @@ func TestLogEventProvider_ReadLogs(t *testing.T) { }, nil) filterStore := NewUpkeepFilterStore() - p := NewLogProvider(logger.TestLogger(t), mp, big.NewInt(1), &mockedPacker{}, filterStore, NewOptions(200)) + p := NewLogProvider(logger.TestLogger(t), mp, big.NewInt(1), &mockedPacker{}, filterStore, NewOptions(200, big.NewInt(1))) var ids []*big.Int for i := 0; i < 10; i++ { @@ -310,10 +309,9 @@ func newEntry(p *logEventProvider, i int, args ...string) (LogTriggerConfig, upk topics := make([]common.Hash, len(filter.EventSigs)) copy(topics, filter.EventSigs) f := upkeepFilter{ - upkeepID: uid, - addr: filter.Addresses[0].Bytes(), - topics: topics, - blockLimiter: rate.NewLimiter(p.opts.BlockRateLimit, p.opts.BlockLimitBurst), + upkeepID: uid, + addr: filter.Addresses[0].Bytes(), + topics: topics, } return cfg, f } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index 26c56c23b8c..5ef321cbf7d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -100,8 +100,8 @@ func NewLogRecoverer(lggr logger.Logger, poller logpoller.LogPoller, client clie threadCtrl: utils.NewThreadControl(), - blockTime: &atomic.Int64{}, - lookbackBlocks: &atomic.Int64{}, + blockTime: new(atomic.Int64), + lookbackBlocks: new(atomic.Int64), interval: opts.ReadInterval * 5, pending: make([]ocr2keepers.UpkeepPayload, 0), diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer_test.go index 54338207190..65a05b2537e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer_test.go @@ -34,7 +34,7 @@ func TestLogRecoverer_GetRecoverables(t *testing.T) { ctx := testutils.Context(t) lp := &lpmocks.LogPoller{} lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: 100}, nil) - r := NewLogRecoverer(logger.TestLogger(t), lp, nil, nil, nil, nil, NewOptions(200)) + r := NewLogRecoverer(logger.TestLogger(t), lp, nil, nil, nil, nil, NewOptions(200, big.NewInt(1))) tests := []struct { name string @@ -1152,7 +1152,7 @@ func TestLogRecoverer_pending(t *testing.T) { maxPendingPayloadsPerUpkeep = origMaxPendingPayloadsPerUpkeep }() - r := NewLogRecoverer(logger.TestLogger(t), nil, nil, nil, nil, nil, NewOptions(200)) + r := NewLogRecoverer(logger.TestLogger(t), nil, nil, nil, nil, nil, NewOptions(200, big.NewInt(1))) r.lock.Lock() r.pending = tc.exist for i, p := range tc.new { @@ -1233,7 +1233,7 @@ func setupTestRecoverer(t *testing.T, interval time.Duration, lookbackBlocks int lp := new(lpmocks.LogPoller) statesReader := new(mocks.UpkeepStateReader) filterStore := NewUpkeepFilterStore() - opts := NewOptions(lookbackBlocks) + opts := NewOptions(lookbackBlocks, big.NewInt(1)) opts.ReadInterval = interval / 5 opts.LookbackBlocks = lookbackBlocks recoverer := NewLogRecoverer(logger.TestLogger(t), lp, nil, statesReader, &mockedPacker{}, filterStore, opts) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go index 6b68f5c6afd..682b8710c0c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go @@ -41,6 +41,7 @@ const ( LogBufferFlowDirectionIngress = "ingress" LogBufferFlowDirectionEgress = "egress" LogBufferFlowDirectionDropped = "dropped" + LogBufferFlowDirectionExpired = "expired" ) // Automation metrics diff --git a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go index 4aa9b0cb7dc..288e7e74fdb 100644 --- a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go @@ -54,6 +54,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" @@ -118,7 +119,7 @@ func TestIntegration_KeeperPluginConditionalUpkeep(t *testing.T) { require.NoError(t, err) registry := deployKeeper21Registry(t, steve, backend, linkAddr, linkFeedAddr, gasFeedAddr) - setupNodes(t, nodeKeys, registry, backend, steve) + setupNodes(t, nodeKeys, registry, backend, steve, false) <-time.After(time.Second * 5) @@ -172,311 +173,368 @@ func TestIntegration_KeeperPluginConditionalUpkeep(t *testing.T) { } func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { - g := gomega.NewWithT(t) - - // setup blockchain - sergey := testutils.MustNewSimTransactor(t) // owns all the link - steve := testutils.MustNewSimTransactor(t) // registry owner - carrol := testutils.MustNewSimTransactor(t) // upkeep owner - genesisData := core.GenesisAlloc{ - sergey.From: {Balance: assets.Ether(10000).ToInt()}, - steve.From: {Balance: assets.Ether(10000).ToInt()}, - carrol.From: {Balance: assets.Ether(10000).ToInt()}, - } - // Generate 5 keys for nodes (1 bootstrap + 4 ocr nodes) and fund them with ether - var nodeKeys [5]ethkey.KeyV2 - for i := int64(0); i < 5; i++ { - nodeKeys[i] = cltest.MustGenerateRandomKey(t) - genesisData[nodeKeys[i].Address] = core.GenesisAccount{Balance: assets.Ether(1000).ToInt()} + tests := []struct { + name string + logBufferVersion logprovider.BufferVersion + }{ + { + name: "default buffer", + logBufferVersion: logprovider.BufferVersionDefault, + }, + { + name: "buffer v1", + logBufferVersion: logprovider.BufferVersionV1, + }, } - backend := cltest.NewSimulatedBackend(t, genesisData, uint32(ethconfig.Defaults.Miner.GasCeil)) - stopMining := cltest.Mine(backend, 3*time.Second) // Should be greater than deltaRound since we cannot access old blocks on simulated blockchain - defer stopMining() + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + g := gomega.NewWithT(t) + + // setup blockchain + sergey := testutils.MustNewSimTransactor(t) // owns all the link + steve := testutils.MustNewSimTransactor(t) // registry owner + carrol := testutils.MustNewSimTransactor(t) // upkeep owner + genesisData := core.GenesisAlloc{ + sergey.From: {Balance: assets.Ether(10000).ToInt()}, + steve.From: {Balance: assets.Ether(10000).ToInt()}, + carrol.From: {Balance: assets.Ether(10000).ToInt()}, + } + // Generate 5 keys for nodes (1 bootstrap + 4 ocr nodes) and fund them with ether + var nodeKeys [5]ethkey.KeyV2 + for i := int64(0); i < 5; i++ { + nodeKeys[i] = cltest.MustGenerateRandomKey(t) + genesisData[nodeKeys[i].Address] = core.GenesisAccount{Balance: assets.Ether(1000).ToInt()} + } - // Deploy registry - linkAddr, _, linkToken, err := link_token_interface.DeployLinkToken(sergey, backend) - require.NoError(t, err) - gasFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(steve, backend, 18, big.NewInt(60000000000)) - require.NoError(t, err) - linkFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(steve, backend, 18, big.NewInt(2000000000000000000)) - require.NoError(t, err) + backend := cltest.NewSimulatedBackend(t, genesisData, uint32(ethconfig.Defaults.Miner.GasCeil)) + stopMining := cltest.Mine(backend, 3*time.Second) // Should be greater than deltaRound since we cannot access old blocks on simulated blockchain + defer stopMining() - registry := deployKeeper21Registry(t, steve, backend, linkAddr, linkFeedAddr, gasFeedAddr) - setupNodes(t, nodeKeys, registry, backend, steve) - upkeeps := 1 + // Deploy registry + linkAddr, _, linkToken, err := link_token_interface.DeployLinkToken(sergey, backend) + require.NoError(t, err) + gasFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(steve, backend, 18, big.NewInt(60000000000)) + require.NoError(t, err) + linkFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(steve, backend, 18, big.NewInt(2000000000000000000)) + require.NoError(t, err) - _, err = linkToken.Transfer(sergey, carrol.From, big.NewInt(0).Mul(oneHunEth, big.NewInt(int64(upkeeps+1)))) - require.NoError(t, err) + registry := deployKeeper21Registry(t, steve, backend, linkAddr, linkFeedAddr, gasFeedAddr) + setupNodes(t, nodeKeys, registry, backend, steve, tc.logBufferVersion == logprovider.BufferVersionV1) + upkeeps := 1 - backend.Commit() + _, err = linkToken.Transfer(sergey, carrol.From, big.NewInt(0).Mul(oneHunEth, big.NewInt(int64(upkeeps+1)))) + require.NoError(t, err) - ids, addrs, contracts := deployUpkeeps(t, backend, carrol, steve, linkToken, registry, upkeeps) - require.Equal(t, upkeeps, len(ids)) - require.Equal(t, len(ids), len(contracts)) - require.Equal(t, len(ids), len(addrs)) + backend.Commit() - backend.Commit() + ids, addrs, contracts := deployUpkeeps(t, backend, carrol, steve, linkToken, registry, upkeeps) + require.Equal(t, upkeeps, len(ids)) + require.Equal(t, len(ids), len(contracts)) + require.Equal(t, len(ids), len(addrs)) - emits := 1 - go emitEvents(testutils.Context(t), t, emits, contracts, carrol, func() { - backend.Commit() - }) - - listener, done := listenPerformed(t, backend, registry, ids, int64(1)) - g.Eventually(listener, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.BeTrue()) - done() + backend.Commit() - t.Run("recover logs", func(t *testing.T) { - addr, contract := addrs[0], contracts[0] - upkeepID := registerUpkeep(t, registry, addr, carrol, steve, backend) - backend.Commit() - t.Logf("Registered new upkeep %s for address %s", upkeepID.String(), addr.String()) - // Emit 100 logs in a burst - recoverEmits := 100 - i := 0 - emitEvents(testutils.Context(t), t, 100, []*log_upkeep_counter_wrapper.LogUpkeepCounter{contract}, carrol, func() { - i++ - if i%(recoverEmits/4) == 0 { + emits := 1 + go emitEvents(testutils.Context(t), t, emits, contracts, carrol, func() { backend.Commit() - time.Sleep(time.Millisecond * 250) // otherwise we get "invalid transaction nonce" errors - } - }) + }) - beforeDummyBlocks := backend.Blockchain().CurrentBlock().Number.Uint64() + listener, done := listenPerformed(t, backend, registry, ids, int64(1)) + g.Eventually(listener, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.BeTrue()) + done() - // Mine enough blocks to ensure these logs don't fall into log provider range - dummyBlocks := 500 - for i := 0; i < dummyBlocks; i++ { - backend.Commit() - time.Sleep(time.Millisecond * 10) - } + t.Run("recover logs", func(t *testing.T) { + addr, contract := addrs[0], contracts[0] + upkeepID := registerUpkeep(t, registry, addr, carrol, steve, backend) + backend.Commit() + t.Logf("Registered new upkeep %s for address %s", upkeepID.String(), addr.String()) + // Emit 100 logs in a burst + recoverEmits := 100 + i := 0 + emitEvents(testutils.Context(t), t, 100, []*log_upkeep_counter_wrapper.LogUpkeepCounter{contract}, carrol, func() { + i++ + if i%(recoverEmits/4) == 0 { + backend.Commit() + time.Sleep(time.Millisecond * 250) // otherwise we get "invalid transaction nonce" errors + } + }) - t.Logf("Mined %d blocks, waiting for logs to be recovered", dummyBlocks) + beforeDummyBlocks := backend.Blockchain().CurrentBlock().Number.Uint64() - listener, done := listenPerformedN(t, backend, registry, ids, int64(beforeDummyBlocks), recoverEmits) - g.Eventually(listener, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.BeTrue()) - done() - }) -} + // Mine enough blocks to ensure these logs don't fall into log provider range + dummyBlocks := 500 + for i := 0; i < dummyBlocks; i++ { + backend.Commit() + time.Sleep(time.Millisecond * 10) + } -func TestIntegration_KeeperPluginLogUpkeep_Retry(t *testing.T) { - g := gomega.NewWithT(t) + t.Logf("Mined %d blocks, waiting for logs to be recovered", dummyBlocks) - // setup blockchain - linkOwner := testutils.MustNewSimTransactor(t) // owns all the link - registryOwner := testutils.MustNewSimTransactor(t) // registry owner - upkeepOwner := testutils.MustNewSimTransactor(t) // upkeep owner - genesisData := core.GenesisAlloc{ - linkOwner.From: {Balance: assets.Ether(10000).ToInt()}, - registryOwner.From: {Balance: assets.Ether(10000).ToInt()}, - upkeepOwner.From: {Balance: assets.Ether(10000).ToInt()}, + listener, done := listenPerformedN(t, backend, registry, ids, int64(beforeDummyBlocks), recoverEmits) + defer done() + g.Eventually(listener, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.BeTrue()) + }) + }) } +} - // Generate 5 keys for nodes (1 bootstrap + 4 ocr nodes) and fund them with ether - var nodeKeys [5]ethkey.KeyV2 - for i := int64(0); i < 5; i++ { - nodeKeys[i] = cltest.MustGenerateRandomKey(t) - genesisData[nodeKeys[i].Address] = core.GenesisAccount{Balance: assets.Ether(1000).ToInt()} +func TestIntegration_KeeperPluginLogUpkeep_Retry(t *testing.T) { + tests := []struct { + name string + logBufferVersion logprovider.BufferVersion + }{ + { + name: "default buffer", + logBufferVersion: logprovider.BufferVersionDefault, + }, + { + name: "buffer v1", + logBufferVersion: logprovider.BufferVersionV1, + }, } - backend := cltest.NewSimulatedBackend(t, genesisData, uint32(ethconfig.Defaults.Miner.GasCeil)) - stopMining := cltest.Mine(backend, 3*time.Second) // Should be greater than deltaRound since we cannot access old blocks on simulated blockchain - defer stopMining() + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + g := gomega.NewWithT(t) + + // setup blockchain + linkOwner := testutils.MustNewSimTransactor(t) // owns all the link + registryOwner := testutils.MustNewSimTransactor(t) // registry owner + upkeepOwner := testutils.MustNewSimTransactor(t) // upkeep owner + genesisData := core.GenesisAlloc{ + linkOwner.From: {Balance: assets.Ether(10000).ToInt()}, + registryOwner.From: {Balance: assets.Ether(10000).ToInt()}, + upkeepOwner.From: {Balance: assets.Ether(10000).ToInt()}, + } - // Deploy registry - linkAddr, _, linkToken, err := link_token_interface.DeployLinkToken(linkOwner, backend) - require.NoError(t, err) + // Generate 5 keys for nodes (1 bootstrap + 4 ocr nodes) and fund them with ether + var nodeKeys [5]ethkey.KeyV2 + for i := int64(0); i < 5; i++ { + nodeKeys[i] = cltest.MustGenerateRandomKey(t) + genesisData[nodeKeys[i].Address] = core.GenesisAccount{Balance: assets.Ether(1000).ToInt()} + } - gasFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(registryOwner, backend, 18, big.NewInt(60000000000)) - require.NoError(t, err) + backend := cltest.NewSimulatedBackend(t, genesisData, uint32(ethconfig.Defaults.Miner.GasCeil)) + stopMining := cltest.Mine(backend, 3*time.Second) // Should be greater than deltaRound since we cannot access old blocks on simulated blockchain + defer stopMining() - linkFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(registryOwner, backend, 18, big.NewInt(2000000000000000000)) - require.NoError(t, err) + // Deploy registry + linkAddr, _, linkToken, err := link_token_interface.DeployLinkToken(linkOwner, backend) + require.NoError(t, err) - registry := deployKeeper21Registry(t, registryOwner, backend, linkAddr, linkFeedAddr, gasFeedAddr) + gasFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(registryOwner, backend, 18, big.NewInt(60000000000)) + require.NoError(t, err) - _, mercuryServer := setupNodes(t, nodeKeys, registry, backend, registryOwner) + linkFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(registryOwner, backend, 18, big.NewInt(2000000000000000000)) + require.NoError(t, err) - const upkeepCount = 10 - const mercuryFailCount = upkeepCount * 3 * 2 + registry := deployKeeper21Registry(t, registryOwner, backend, linkAddr, linkFeedAddr, gasFeedAddr) - // testing with the mercury server involves mocking responses. currently, - // there is not a way to connect a mercury call to an upkeep id (though we - // could add custom headers) so the test must be fairly basic and just - // count calls before switching to successes - var ( - mu sync.Mutex - count int - ) + _, mercuryServer := setupNodes(t, nodeKeys, registry, backend, registryOwner, tc.logBufferVersion == logprovider.BufferVersionV1) - mercuryServer.RegisterHandler(func(w http.ResponseWriter, r *http.Request) { - mu.Lock() - defer mu.Unlock() + const upkeepCount = 10 + const mercuryFailCount = upkeepCount * 3 * 2 - count++ + // testing with the mercury server involves mocking responses. currently, + // there is not a way to connect a mercury call to an upkeep id (though we + // could add custom headers) so the test must be fairly basic and just + // count calls before switching to successes + var ( + mu sync.Mutex + count int + ) - _ = r.ParseForm() + mercuryServer.RegisterHandler(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + defer mu.Unlock() - t.Logf("MercuryHTTPServe:RequestURI: %s", r.RequestURI) + count++ - for key, value := range r.Form { - t.Logf("MercuryHTTPServe:FormValue: key: %s; value: %s;", key, value) - } + _ = r.ParseForm() - // the streams lookup retries against the remote server 3 times before - // returning a result as retryable. - // the simulation here should force the streams lookup process to return - // retryable 2 times. - // the total count of failures should be (upkeepCount * 3 * tryCount) - if count <= mercuryFailCount { - w.WriteHeader(http.StatusNotFound) + t.Logf("MercuryHTTPServe:RequestURI: %s", r.RequestURI) - return - } + for key, value := range r.Form { + t.Logf("MercuryHTTPServe:FormValue: key: %s; value: %s;", key, value) + } - // start sending success messages - output := `{"chainlinkBlob":"0x0001c38d71fed6c320b90e84b6f559459814d068e2a1700adc931ca9717d4fe70000000000000000000000000000000000000000000000000000000001a80b52b4bf1233f9cb71144a253a1791b202113c4ab4a92fa1b176d684b4959666ff8200000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000260000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001004254432d5553442d415242495452554d2d544553544e4554000000000000000000000000000000000000000000000000000000000000000000000000645570be000000000000000000000000000000000000000000000000000002af2b818dc5000000000000000000000000000000000000000000000000000002af2426faf3000000000000000000000000000000000000000000000000000002af32dc209700000000000000000000000000000000000000000000000000000000012130f8df0a9745bb6ad5e2df605e158ba8ad8a33ef8a0acf9851f0f01668a3a3f2b68600000000000000000000000000000000000000000000000000000000012130f60000000000000000000000000000000000000000000000000000000000000002c4a7958dce105089cf5edb68dad7dcfe8618d7784eb397f97d5a5fade78c11a58275aebda478968e545f7e3657aba9dcbe8d44605e4c6fde3e24edd5e22c94270000000000000000000000000000000000000000000000000000000000000002459c12d33986018a8959566d145225f0c4a4e61a9a3f50361ccff397899314f0018162cf10cd89897635a0bb62a822355bd199d09f4abe76e4d05261bb44733d"}` + // the streams lookup retries against the remote server 3 times before + // returning a result as retryable. + // the simulation here should force the streams lookup process to return + // retryable 2 times. + // the total count of failures should be (upkeepCount * 3 * tryCount) + if count <= mercuryFailCount { + w.WriteHeader(http.StatusNotFound) - w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte(output)) - }) + return + } - defer mercuryServer.Stop() + // start sending success messages + output := `{"chainlinkBlob":"0x0001c38d71fed6c320b90e84b6f559459814d068e2a1700adc931ca9717d4fe70000000000000000000000000000000000000000000000000000000001a80b52b4bf1233f9cb71144a253a1791b202113c4ab4a92fa1b176d684b4959666ff8200000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000260000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001004254432d5553442d415242495452554d2d544553544e4554000000000000000000000000000000000000000000000000000000000000000000000000645570be000000000000000000000000000000000000000000000000000002af2b818dc5000000000000000000000000000000000000000000000000000002af2426faf3000000000000000000000000000000000000000000000000000002af32dc209700000000000000000000000000000000000000000000000000000000012130f8df0a9745bb6ad5e2df605e158ba8ad8a33ef8a0acf9851f0f01668a3a3f2b68600000000000000000000000000000000000000000000000000000000012130f60000000000000000000000000000000000000000000000000000000000000002c4a7958dce105089cf5edb68dad7dcfe8618d7784eb397f97d5a5fade78c11a58275aebda478968e545f7e3657aba9dcbe8d44605e4c6fde3e24edd5e22c94270000000000000000000000000000000000000000000000000000000000000002459c12d33986018a8959566d145225f0c4a4e61a9a3f50361ccff397899314f0018162cf10cd89897635a0bb62a822355bd199d09f4abe76e4d05261bb44733d"}` - _, err = linkToken.Transfer(linkOwner, upkeepOwner.From, big.NewInt(0).Mul(oneHunEth, big.NewInt(int64(upkeepCount+1)))) - require.NoError(t, err) + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(output)) + }) - backend.Commit() + defer mercuryServer.Stop() - feeds, err := newFeedLookupUpkeepController(backend, registryOwner) - require.NoError(t, err, "no error expected from creating a feed lookup controller") + _, err = linkToken.Transfer(linkOwner, upkeepOwner.From, big.NewInt(0).Mul(oneHunEth, big.NewInt(int64(upkeepCount+1)))) + require.NoError(t, err) - // deploy multiple upkeeps that listen to a log emitter and need to be - // performed for each log event - _ = feeds.DeployUpkeeps(t, backend, upkeepOwner, upkeepCount, func(int) bool { - return false - }) - _ = feeds.RegisterAndFund(t, registry, registryOwner, backend, linkToken) - _ = feeds.EnableMercury(t, backend, registry, registryOwner) - _ = feeds.VerifyEnv(t, backend, registry, registryOwner) + backend.Commit() - // start emitting events in a separate go-routine - // feed lookup relies on a single contract event log to perform multiple - // listener contracts - go func() { - // only 1 event is necessary to make all 10 upkeeps eligible - _ = feeds.EmitEvents(t, backend, 1, func() { - // pause per emit for expected block production time - time.Sleep(3 * time.Second) + feeds, err := newFeedLookupUpkeepController(backend, registryOwner) + require.NoError(t, err, "no error expected from creating a feed lookup controller") + + // deploy multiple upkeeps that listen to a log emitter and need to be + // performed for each log event + _ = feeds.DeployUpkeeps(t, backend, upkeepOwner, upkeepCount, func(int) bool { + return false + }) + _ = feeds.RegisterAndFund(t, registry, registryOwner, backend, linkToken) + _ = feeds.EnableMercury(t, backend, registry, registryOwner) + _ = feeds.VerifyEnv(t, backend, registry, registryOwner) + + // start emitting events in a separate go-routine + // feed lookup relies on a single contract event log to perform multiple + // listener contracts + go func() { + // only 1 event is necessary to make all 10 upkeeps eligible + _ = feeds.EmitEvents(t, backend, 1, func() { + // pause per emit for expected block production time + time.Sleep(3 * time.Second) + }) + }() + + listener, done := listenPerformed(t, backend, registry, feeds.UpkeepsIds(), int64(1)) + defer done() + g.Eventually(listener, testutils.WaitTimeout(t)-(5*time.Second), cltest.DBPollingInterval).Should(gomega.BeTrue()) }) - }() - - listener, done := listenPerformed(t, backend, registry, feeds.UpkeepsIds(), int64(1)) - g.Eventually(listener, testutils.WaitTimeout(t)-(5*time.Second), cltest.DBPollingInterval).Should(gomega.BeTrue()) - - done() + } } func TestIntegration_KeeperPluginLogUpkeep_ErrHandler(t *testing.T) { - g := gomega.NewWithT(t) - - // setup blockchain - linkOwner := testutils.MustNewSimTransactor(t) // owns all the link - registryOwner := testutils.MustNewSimTransactor(t) // registry owner - upkeepOwner := testutils.MustNewSimTransactor(t) // upkeep owner - genesisData := core.GenesisAlloc{ - linkOwner.From: {Balance: assets.Ether(10000).ToInt()}, - registryOwner.From: {Balance: assets.Ether(10000).ToInt()}, - upkeepOwner.From: {Balance: assets.Ether(10000).ToInt()}, + tests := []struct { + name string + logBufferVersion logprovider.BufferVersion + }{ + { + name: "default buffer", + logBufferVersion: logprovider.BufferVersionDefault, + }, + { + name: "buffer v1", + logBufferVersion: logprovider.BufferVersionV1, + }, } - // Generate 5 keys for nodes (1 bootstrap + 4 ocr nodes) and fund them with ether - var nodeKeys [5]ethkey.KeyV2 - for i := int64(0); i < 5; i++ { - nodeKeys[i] = cltest.MustGenerateRandomKey(t) - genesisData[nodeKeys[i].Address] = core.GenesisAccount{Balance: assets.Ether(1000).ToInt()} - } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + g := gomega.NewWithT(t) + + // setup blockchain + linkOwner := testutils.MustNewSimTransactor(t) // owns all the link + registryOwner := testutils.MustNewSimTransactor(t) // registry owner + upkeepOwner := testutils.MustNewSimTransactor(t) // upkeep owner + genesisData := core.GenesisAlloc{ + linkOwner.From: {Balance: assets.Ether(10000).ToInt()}, + registryOwner.From: {Balance: assets.Ether(10000).ToInt()}, + upkeepOwner.From: {Balance: assets.Ether(10000).ToInt()}, + } - backend := cltest.NewSimulatedBackend(t, genesisData, uint32(ethconfig.Defaults.Miner.GasCeil)) - stopMining := cltest.Mine(backend, 3*time.Second) // Should be greater than deltaRound since we cannot access old blocks on simulated blockchain - defer stopMining() + // Generate 5 keys for nodes (1 bootstrap + 4 ocr nodes) and fund them with ether + var nodeKeys [5]ethkey.KeyV2 + for i := int64(0); i < 5; i++ { + nodeKeys[i] = cltest.MustGenerateRandomKey(t) + genesisData[nodeKeys[i].Address] = core.GenesisAccount{Balance: assets.Ether(1000).ToInt()} + } - // Deploy registry - linkAddr, _, linkToken, err := link_token_interface.DeployLinkToken(linkOwner, backend) - require.NoError(t, err) + backend := cltest.NewSimulatedBackend(t, genesisData, uint32(ethconfig.Defaults.Miner.GasCeil)) + stopMining := cltest.Mine(backend, 3*time.Second) // Should be greater than deltaRound since we cannot access old blocks on simulated blockchain + defer stopMining() - gasFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(registryOwner, backend, 18, big.NewInt(60000000000)) - require.NoError(t, err) + // Deploy registry + linkAddr, _, linkToken, err := link_token_interface.DeployLinkToken(linkOwner, backend) + require.NoError(t, err) - linkFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(registryOwner, backend, 18, big.NewInt(2000000000000000000)) - require.NoError(t, err) + gasFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(registryOwner, backend, 18, big.NewInt(60000000000)) + require.NoError(t, err) - registry := deployKeeper21Registry(t, registryOwner, backend, linkAddr, linkFeedAddr, gasFeedAddr) + linkFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(registryOwner, backend, 18, big.NewInt(2000000000000000000)) + require.NoError(t, err) - _, mercuryServer := setupNodes(t, nodeKeys, registry, backend, registryOwner) + registry := deployKeeper21Registry(t, registryOwner, backend, linkAddr, linkFeedAddr, gasFeedAddr) - upkeepCount := 10 + _, mercuryServer := setupNodes(t, nodeKeys, registry, backend, registryOwner, tc.logBufferVersion == logprovider.BufferVersionV1) - errResponses := []int{ - http.StatusUnauthorized, - http.StatusBadRequest, - http.StatusInternalServerError, - } - startMercuryServer(t, mercuryServer, func(i int) (int, []byte) { - var resp int - if i < len(errResponses) { - resp = errResponses[i] - } - if resp == 0 { - resp = http.StatusNotFound - } - return resp, nil - }) - defer mercuryServer.Stop() + upkeepCount := 10 - _, err = linkToken.Transfer(linkOwner, upkeepOwner.From, big.NewInt(0).Mul(oneHunEth, big.NewInt(int64(upkeepCount+1)))) - require.NoError(t, err) + errResponses := []int{ + http.StatusUnauthorized, + http.StatusBadRequest, + http.StatusInternalServerError, + http.StatusNotFound, + http.StatusNotFound, + http.StatusNotFound, + http.StatusUnauthorized, + } + startMercuryServer(t, mercuryServer, func(i int) (int, []byte) { + var resp int + if i < len(errResponses) { + resp = errResponses[i] + } + if resp == 0 { + resp = http.StatusNotFound + } + return resp, nil + }) + defer mercuryServer.Stop() - backend.Commit() + _, err = linkToken.Transfer(linkOwner, upkeepOwner.From, big.NewInt(0).Mul(oneHunEth, big.NewInt(int64(upkeepCount+1)))) + require.NoError(t, err) - feeds, err := newFeedLookupUpkeepController(backend, registryOwner) - require.NoError(t, err, "no error expected from creating a feed lookup controller") + backend.Commit() - // deploy multiple upkeeps that listen to a log emitter and need to be - // performed for each log event - checkResultsProvider := func(i int) bool { - return i%2 == 1 - } - require.NoError(t, feeds.DeployUpkeeps(t, backend, upkeepOwner, upkeepCount, checkResultsProvider)) - require.NoError(t, feeds.RegisterAndFund(t, registry, registryOwner, backend, linkToken)) - require.NoError(t, feeds.EnableMercury(t, backend, registry, registryOwner)) - require.NoError(t, feeds.VerifyEnv(t, backend, registry, registryOwner)) - - startBlock := backend.Blockchain().CurrentBlock().Number.Int64() - // start emitting events in a separate go-routine - // feed lookup relies on a single contract event log to perform multiple - // listener contracts - go func() { - // only 1 event is necessary to make all 10 upkeeps eligible - _ = feeds.EmitEvents(t, backend, 1, func() { - // pause per emit for expected block production time - time.Sleep(3 * time.Second) - }) - }() + feeds, err := newFeedLookupUpkeepController(backend, registryOwner) + require.NoError(t, err, "no error expected from creating a feed lookup controller") - go makeDummyBlocks(t, backend, 3*time.Second, 1000) + // deploy multiple upkeeps that listen to a log emitter and need to be + // performed for each log event + checkResultsProvider := func(i int) bool { + return i%2 == 1 + } + require.NoError(t, feeds.DeployUpkeeps(t, backend, upkeepOwner, upkeepCount, checkResultsProvider)) + require.NoError(t, feeds.RegisterAndFund(t, registry, registryOwner, backend, linkToken)) + require.NoError(t, feeds.EnableMercury(t, backend, registry, registryOwner)) + require.NoError(t, feeds.VerifyEnv(t, backend, registry, registryOwner)) + + startBlock := backend.Blockchain().CurrentBlock().Number.Int64() + // start emitting events in a separate go-routine + // feed lookup relies on a single contract event log to perform multiple + // listener contracts + go func() { + // only 1 event is necessary to make all 10 upkeeps eligible + _ = feeds.EmitEvents(t, backend, 1, func() { + // pause per emit for expected block production time + time.Sleep(3 * time.Second) + }) + }() + + go makeDummyBlocks(t, backend, 3*time.Second, 1000) + + idsToCheck := make([]*big.Int, 0) + for i, uid := range feeds.UpkeepsIds() { + if checkResultsProvider(i) { + idsToCheck = append(idsToCheck, uid) + } + } - idsToCheck := make([]*big.Int, 0) - for i, uid := range feeds.UpkeepsIds() { - if checkResultsProvider(i) { - idsToCheck = append(idsToCheck, uid) - } + listener, done := listenPerformed(t, backend, registry, idsToCheck, startBlock) + defer done() + g.Eventually(listener, testutils.WaitTimeout(t)-(5*time.Second), cltest.DBPollingInterval).Should(gomega.BeTrue()) + }) } - - listener, done := listenPerformed(t, backend, registry, idsToCheck, startBlock) - g.Eventually(listener, testutils.WaitTimeout(t)-(5*time.Second), cltest.DBPollingInterval).Should(gomega.BeTrue()) - done() } func startMercuryServer(t *testing.T, mercuryServer *mercury.SimulatedMercuryServer, responder func(i int) (int, []byte)) { @@ -586,7 +644,7 @@ func listenPerformed(t *testing.T, backend *backends.SimulatedBackend, registry return listenPerformedN(t, backend, registry, ids, startBlock, 0) } -func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IKeeperRegistryMaster, backend *backends.SimulatedBackend, usr *bind.TransactOpts) ([]Node, *mercury.SimulatedMercuryServer) { +func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IKeeperRegistryMaster, backend *backends.SimulatedBackend, usr *bind.TransactOpts, useBufferV1 bool) ([]Node, *mercury.SimulatedMercuryServer) { lggr := logger.TestLogger(t) mServer := mercury.NewSimulatedMercuryServer() mServer.Start() @@ -660,7 +718,8 @@ func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IK cacheEvictionInterval = "1s" mercuryCredentialName = "%s" contractVersion = "v2.1" - `, i, registry.Address(), node.KeyBundle.ID(), node.Transmitter, fmt.Sprintf("%s@127.0.0.1:%d", bootstrapPeerID, bootstrapNodePort), MercuryCredName)) + useBufferV1 = %v + `, i, registry.Address(), node.KeyBundle.ID(), node.Transmitter, fmt.Sprintf("%s@127.0.0.1:%d", bootstrapPeerID, bootstrapNodePort), MercuryCredName, useBufferV1)) } // Setup config on contract