Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BCI-3492 [LogPoller]: Allow withObservedExecAndRowsAffected to report non-zero rows affected #14057

Merged
merged 5 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/sweet-pumas-refuse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#bugfix Addresses 2 minor issues with the pruning of LogPoller's db tables: logs not matching any filter will now be pruned, and rows deleted are now properly reported for observability
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func withObservedExecAndRowsAffected(o *ObservedORM, queryName string, queryType
WithLabelValues(o.chainId, queryName, string(queryType)).
Observe(float64(time.Since(queryStarted)))

if err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch!

if err == nil {
o.datasetSize.
WithLabelValues(o.chainId, queryName, string(queryType)).
Set(float64(rowsAffected))
Expand Down
11 changes: 11 additions & 0 deletions core/chains/evm/logpoller/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prometheus/client_golang/prometheus/testutil"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
Expand Down Expand Up @@ -117,6 +118,16 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) {
assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420")))
assert.Equal(t, float64(2), testutil.ToFloat64(orm.blocksInserted.WithLabelValues("420")))

rowsAffected, err := orm.DeleteExpiredLogs(ctx, 3)
require.NoError(t, err)
require.Equal(t, int64(3), rowsAffected)
assert.Equal(t, 3, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete"))

rowsAffected, err = orm.DeleteBlocksBefore(ctx, 30, 0)
require.NoError(t, err)
require.Equal(t, int64(2), rowsAffected)
assert.Equal(t, 2, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteBlocksBefore", "delete"))

// Don't update counters in case of an error
require.Error(t, orm.InsertLogsWithBlock(ctx, logs, NewLogPollerBlock(utils.RandomBytes32(), 0, time.Now(), 0)))
assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420")))
Expand Down
30 changes: 13 additions & 17 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types/query"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
Expand Down Expand Up @@ -313,34 +314,29 @@ type Exp struct {
ShouldDelete bool
}

// DeleteExpiredLogs removes any logs which either:
// - don't match any currently registered filters, or
// - have a timestamp older than any matching filter's retention, UNLESS there is at
// least one matching filter with retention=0
func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to accidentally remove logs when job is updated? Let's imagine the following scenario

  • We have a job, that registered multiple LogPoller filters during the init
  • CLO proposes a job replacement, a slight change to the job definition, but contracts stay the same so filters would be the same, so the logs required for plugin to work
  • NOP accepts the job which triggers removing the old job and creating a new one. It includes unregistering old filters and registering new ones
    • In the meantime LogPoller PruneExpiredLogs kicks in trying to remove stale logs. It sees a new set of "abandoned" logs, because filters were just removed. It removes these logs immediately.
  • Plugin init registers new filters (which are the same), but logs are gone so we might need to run replay

I wonder if this is something that could happen or impact the system. How does it work for other products? What if job is removed and added in a two way process? (via API). AFAIK CLO does this entire flow in a large single db transaction, but I guess it might change at some point because of the loopps architecture, right?

Copy link
Contributor Author

@reductionista reductionista Aug 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question! I'm pretty sure it's okay for now, since as you say CLO does the whole flow in a single db transaction. Someone could manually remove a job, and then add a similar one later... but I think it would be a natural user expectation to assume that there are no guarantees about events the old job cared about still being there for the new job to use. If CLO changes so that it's no longer in a single db transaction, that seems more problematic since it's presented to the user as an "Update" rather than a "Remove" followed by a separate "Create". But I think this would cause similar problems with or without this change. One way or another, we'll have to solve some issues if we can no longer guarantee the ability for CLO to make atomic job updates.

Plugin init registers new filters (which are the same), but logs are gone so we might need to run replay

Any plugin which cares about past logs already does a replay to the appropriate block when it's initialized. (For example, going back to pick up an OCR2 configuration event.) So I think the only issue is with missing logs that get emitted around the time the old job is getting swapped out with the new job. But even without this change, there is a potential for missing some logs while that's happening. This would just increase the likelihood of it happening, since it would apply not just to events emitted between the delete and create operations but also those emitted shortly before the delete where the old job hadn't had time to process them before being deleted.

One way I can think of for how we might be able to solve this issue is with ChainReader. If it can expose a method for updating a filter that does UnregisterFilter and then RegisterFilter within the same db transaction, that should solve it I think. We might already even be doing this with Bind(), I'm not sure but @ilija42 should know.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. One thing that comes to my mind that might work as an additional safety check is to wait for some "sane" threshold before deleting logs without the filter. When retention is null we match only logs older than X days. Not sure what that threshold should be (maybe configurable?), but that might prevent us from accidentally losing logs right after deleting the job in some weird way. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was thinking that might be what we have to do, if we can't solve things with ChainReader.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I just thought of a simple solution which I think would be pretty robust: in UnregisterFilter(), we could return successful but delay the actual removal of the filter by 1 hour (or any long enough time, it could even be 24 hours but that seems like overkill). As long as any call to RegisterFilter() arrives within that 1 hour grace period that matches the logs that would have been deleted, there is no time at which the pruner would be allowed to delete them.

var err error
var result sql.Result
if limit > 0 {
result, err = o.ds.ExecContext(ctx, `
DELETE FROM evm.logs
query := `DELETE FROM evm.logs
WHERE (evm_chain_id, address, event_sig, block_number) IN (
SELECT l.evm_chain_id, l.address, l.event_sig, l.block_number
FROM evm.logs l
INNER JOIN (
SELECT address, event, MAX(retention) AS retention
LEFT JOIN (
SELECT address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention
FROM evm.log_poller_filters
WHERE evm_chain_id = $1
GROUP BY evm_chain_id, address, event
HAVING NOT 0 = ANY(ARRAY_AGG(retention))
) r ON l.evm_chain_id = $1 AND l.address = r.address AND l.event_sig = r.event
AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')
LIMIT $2
)`, ubig.New(o.chainID), limit)
WHERE r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')) %s)`

if limit > 0 {
result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, "LIMIT $2"), ubig.New(o.chainID), limit)
} else {
result, err = o.ds.ExecContext(ctx, `WITH r AS
( SELECT address, event, MAX(retention) AS retention
FROM evm.log_poller_filters WHERE evm_chain_id=$1
GROUP BY evm_chain_id,address, event HAVING NOT 0 = ANY(ARRAY_AGG(retention))
) DELETE FROM evm.logs l USING r
WHERE l.evm_chain_id = $1 AND l.address=r.address AND l.event_sig=r.event
AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')`, // retention is in nanoseconds (time.Duration aka BIGINT)
ubig.New(o.chainID))
result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, ""), ubig.New(o.chainID))
}

if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,20 +457,21 @@ func TestORM(t *testing.T) {
time.Sleep(2 * time.Millisecond) // just in case we haven't reached the end of the 1ms retention period
deleted, err := o1.DeleteExpiredLogs(ctx, 0)
require.NoError(t, err)
assert.Equal(t, int64(1), deleted)
assert.Equal(t, int64(4), deleted)

logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber)
require.NoError(t, err)
// The only log which should be deleted is the one which matches filter1 (ret=1ms) but not filter12 (ret=1 hour)
// Importantly, it shouldn't delete any logs matching only filter0 (ret=0 meaning permanent retention). Anything
// matching filter12 should be kept regardless of what other filters it matches.
assert.Len(t, logs, 7)
// It should have retained the log matching filter0 (due to ret=0 meaning permanent retention) as well as all
// 3 logs matching filter12 (ret=1 hour). It should have deleted 3 logs not matching any filter, as well as 1
// of the 2 logs matching filter1 (ret=1ms)--the one that doesn't also match filter12.
assert.Len(t, logs, 4)

// Delete logs after should delete all logs.
err = o1.DeleteLogsAndBlocksAfter(ctx, 1)
require.NoError(t, err)
logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber)
require.NoError(t, err)
require.Zero(t, len(logs))
assert.Zero(t, len(logs))
}

type PgxLogger struct {
Expand Down
Loading