Skip to content

Commit

Permalink
core/services/chainlink: start using sqlutil.DB instead of pg.Q (#12386)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Mar 13, 2024
1 parent 8823302 commit 942f687
Show file tree
Hide file tree
Showing 47 changed files with 214 additions and 180 deletions.
5 changes: 2 additions & 3 deletions core/chains/evm/forwarders/forwarder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import (
"github.com/ethereum/go-ethereum/core/types"
pkgerrors "github.com/pkg/errors"

"github.com/jmoiron/sqlx"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
Expand Down Expand Up @@ -55,7 +54,7 @@ type FwdMgr struct {
wg sync.WaitGroup
}

func NewFwdMgr(db *sqlx.DB, client evmclient.Client, logpoller evmlogpoller.LogPoller, l logger.Logger, cfg Config) *FwdMgr {
func NewFwdMgr(db sqlutil.DB, client evmclient.Client, logpoller evmlogpoller.LogPoller, l logger.Logger, cfg Config) *FwdMgr {
lggr := logger.Sugared(logger.Named(l, "EVMForwarderManager"))
fwdMgr := FwdMgr{
logger: lggr,
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/forwarders/forwarder_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestFwdMgr_MaybeForwardTransaction(t *testing.T) {
require.NoError(t, err)

cleanupCalled := false
cleanup := func(tx sqlutil.Queryer, evmChainId int64, addr common.Address) error {
cleanup := func(tx sqlutil.DB, evmChainId int64, addr common.Address) error {
require.Equal(t, testutils.FixtureChainID.Int64(), evmChainId)
require.Equal(t, forwarderAddr, addr)
require.NotNil(t, tx)
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/forwarders/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions core/chains/evm/forwarders/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ type ORM interface {
CreateForwarder(ctx context.Context, addr common.Address, evmChainId big.Big) (fwd Forwarder, err error)
FindForwarders(ctx context.Context, offset, limit int) ([]Forwarder, int, error)
FindForwardersByChain(ctx context.Context, evmChainId big.Big) ([]Forwarder, error)
DeleteForwarder(ctx context.Context, id int64, cleanup func(tx sqlutil.Queryer, evmChainId int64, addr common.Address) error) error
DeleteForwarder(ctx context.Context, id int64, cleanup func(tx sqlutil.DB, evmChainId int64, addr common.Address) error) error
FindForwardersInListByChain(ctx context.Context, evmChainId big.Big, addrs []common.Address) ([]Forwarder, error)
}

type DbORM struct {
db sqlutil.Queryer
db sqlutil.DB
}

var _ ORM = &DbORM{}

func NewORM(db sqlutil.Queryer) *DbORM {
func NewORM(db sqlutil.DB) *DbORM {
return &DbORM{db: db}
}

Expand All @@ -38,7 +38,7 @@ func (o *DbORM) Transaction(ctx context.Context, fn func(*DbORM) error) (err err
}

// new returns a NewORM like o, but backed by q.
func (o *DbORM) new(q sqlutil.Queryer) *DbORM { return NewORM(q) }
func (o *DbORM) new(q sqlutil.DB) *DbORM { return NewORM(q) }

// CreateForwarder creates the Forwarder address associated with the current EVM chain id.
func (o *DbORM) CreateForwarder(ctx context.Context, addr common.Address, evmChainId big.Big) (fwd Forwarder, err error) {
Expand All @@ -50,7 +50,7 @@ func (o *DbORM) CreateForwarder(ctx context.Context, addr common.Address, evmCha
// DeleteForwarder removes a forwarder address.
// If cleanup is non-nil, it can be used to perform any chain- or contract-specific cleanup that need to happen atomically
// on forwarder deletion. If cleanup returns an error, forwarder deletion will be aborted.
func (o *DbORM) DeleteForwarder(ctx context.Context, id int64, cleanup func(tx sqlutil.Queryer, evmChainID int64, addr common.Address) error) (err error) {
func (o *DbORM) DeleteForwarder(ctx context.Context, id int64, cleanup func(tx sqlutil.DB, evmChainID int64, addr common.Address) error) (err error) {
return o.Transaction(ctx, func(orm *DbORM) error {
var dest struct {
EvmChainId int64
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/forwarders/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

type TestORM struct {
ORM
db sqlutil.Queryer
db sqlutil.DB
}

func setupORM(t *testing.T) *TestORM {
Expand Down Expand Up @@ -54,7 +54,7 @@ func Test_DeleteForwarder(t *testing.T) {
rets := []error{ErrCleaningUp, nil, nil, ErrCleaningUp}
expected := []error{ErrCleaningUp, nil, sql.ErrNoRows, sql.ErrNoRows}

testCleanupFn := func(q sqlutil.Queryer, evmChainID int64, addr common.Address) error {
testCleanupFn := func(q sqlutil.DB, evmChainID int64, addr common.Address) error {
require.Less(t, cleanupCalled, len(rets))
cleanupCalled++
return rets[cleanupCalled-1]
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/headtracker/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ var _ ORM = &DbORM{}

type DbORM struct {
chainID ubig.Big
db sqlutil.Queryer
db sqlutil.DB
}

// NewORM creates an ORM scoped to chainID.
func NewORM(chainID big.Int, db sqlutil.Queryer) *DbORM {
func NewORM(chainID big.Int, db sqlutil.DB) *DbORM {
return &DbORM{
chainID: ubig.Big(chainID),
db: db,
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
)

type queryType string
Expand Down Expand Up @@ -76,7 +76,7 @@ type ObservedORM struct {

// NewObservedORM creates an observed version of log poller's ORM created by NewORM
// Please see ObservedLogPoller for more details on how latencies are measured
func NewObservedORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger) *ObservedORM {
func NewObservedORM(chainID *big.Int, db sqlutil.DB, lggr logger.Logger) *ObservedORM {
return &ObservedORM{
ORM: NewORM(chainID, db, lggr),
queryDuration: lpQueryDuration,
Expand Down
13 changes: 6 additions & 7 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
pkgerrors "github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
Expand Down Expand Up @@ -62,14 +61,14 @@ type ORM interface {

type DbORM struct {
chainID *big.Int
db sqlutil.Queryer
db sqlutil.DB
lggr logger.Logger
}

var _ ORM = &DbORM{}

// NewORM creates an DbORM scoped to chainID.
func NewORM(chainID *big.Int, db sqlutil.Queryer, lggr logger.Logger) *DbORM {
func NewORM(chainID *big.Int, db sqlutil.DB, lggr logger.Logger) *DbORM {
return &DbORM{
chainID: chainID,
db: db,
Expand All @@ -82,7 +81,7 @@ func (o *DbORM) Transaction(ctx context.Context, fn func(*DbORM) error) (err err
}

// new returns a NewORM like o, but backed by q.
func (o *DbORM) new(q sqlutil.Queryer) *DbORM { return NewORM(o.chainID, q, o.lggr) }
func (o *DbORM) new(q sqlutil.DB) *DbORM { return NewORM(o.chainID, q, o.lggr) }

// InsertBlock is idempotent to support replays.
func (o *DbORM) InsertBlock(ctx context.Context, blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64) error {
Expand Down Expand Up @@ -333,7 +332,7 @@ func (o *DbORM) InsertLogs(ctx context.Context, logs []Log) error {
return err
}
return o.Transaction(ctx, func(orm *DbORM) error {
return orm.insertLogsWithinTx(ctx, logs, orm.db.(*sqlx.Tx))
return orm.insertLogsWithinTx(ctx, logs, orm.db)
})
}

Expand All @@ -353,11 +352,11 @@ func (o *DbORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block LogPo
if err != nil {
return err
}
return orm.insertLogsWithinTx(ctx, logs, orm.db.(*sqlx.Tx))
return orm.insertLogsWithinTx(ctx, logs, orm.db)
})
}

func (o *DbORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.Queryer) error {
func (o *DbORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.DB) error {
batchInsertSize := 4000
for i := 0; i < len(logs); i += batchInsertSize {
start, end := i, i+batchInsertSize
Expand Down
6 changes: 4 additions & 2 deletions core/chains/evm/txmgr/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/jmoiron/sqlx"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink/v2/common/txmgr"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
Expand All @@ -20,7 +21,8 @@ import (

// NewTxm constructs the necessary dependencies for the EvmTxm (broadcaster, confirmer, etc) and returns a new EvmTxManager
func NewTxm(
db *sqlx.DB,
sqlxDB *sqlx.DB,
db sqlutil.DB,
chainConfig ChainConfig,
fCfg FeeConfig,
txConfig config.Transactions,
Expand All @@ -44,7 +46,7 @@ func NewTxm(
checker := &CheckerFactory{Client: client}
// create tx attempt builder
txAttemptBuilder := NewEvmTxAttemptBuilder(*client.ConfiguredChainID(), fCfg, keyStore, estimator)
txStore := NewTxStore(db, lggr, dbConfig)
txStore := NewTxStore(sqlxDB, lggr, dbConfig)
txNonceSyncer := NewNonceSyncer(txStore, lggr, client)

txmCfg := NewEvmTxmConfig(chainConfig) // wrap Evm specific config
Expand Down
1 change: 1 addition & 0 deletions core/chains/evm/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func makeTestEvmTxm(
)

return txmgr.NewTxm(
db,
db,
ccfg,
fcfg,
Expand Down
16 changes: 10 additions & 6 deletions core/chains/legacyevm/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

common "github.com/smartcontractkit/chainlink-common/pkg/chains"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"

Expand Down Expand Up @@ -166,7 +167,8 @@ type ChainOpts struct {
MailMon *mailbox.Monitor
GasEstimator gas.EvmFeeEstimator

*sqlx.DB
SqlxDB *sqlx.DB // Deprecated: use DB instead
DB sqlutil.DB

// TODO BCF-2513 remove test code from the API
// Gen-functions are useful for dependency injection by tests
Expand All @@ -187,6 +189,9 @@ func (o ChainOpts) Validate() error {
if o.MailMon == nil {
err = errors.Join(err, errors.New("nil MailMon"))
}
if o.SqlxDB == nil {
err = errors.Join(err, errors.New("nil SqlxDB"))
}
if o.DB == nil {
err = errors.Join(err, errors.New("nil DB"))
}
Expand Down Expand Up @@ -223,14 +228,13 @@ func newChain(ctx context.Context, cfg *evmconfig.ChainScoped, nodes []*toml.Nod
client = opts.GenEthClient(chainID)
}

db := opts.DB
headBroadcaster := headtracker.NewHeadBroadcaster(l)
headSaver := headtracker.NullSaver
var headTracker httypes.HeadTracker
if !cfg.EVMRPCEnabled() {
headTracker = headtracker.NullTracker
} else if opts.GenHeadTracker == nil {
orm := headtracker.NewORM(*chainID, db)
orm := headtracker.NewORM(*chainID, opts.DB)
headSaver = headtracker.NewHeadSaver(l, orm, cfg.EVM(), cfg.EVM().HeadTracker())
headTracker = headtracker.NewHeadTracker(l, client, cfg.EVM(), cfg.EVM().HeadTracker(), headBroadcaster, headSaver, opts.MailMon)
} else {
Expand All @@ -252,12 +256,12 @@ func newChain(ctx context.Context, cfg *evmconfig.ChainScoped, nodes []*toml.Nod
LogPrunePageSize: int64(cfg.EVM().LogPrunePageSize()),
BackupPollerBlockDelay: int64(cfg.EVM().BackupLogPollerBlockDelay()),
}
logPoller = logpoller.NewLogPoller(logpoller.NewObservedORM(chainID, db, l), client, l, lpOpts)
logPoller = logpoller.NewLogPoller(logpoller.NewObservedORM(chainID, opts.DB, l), client, l, lpOpts)
}
}

// note: gas estimator is started as a part of the txm
txm, gasEstimator, err := newEvmTxm(db, cfg.EVM(), cfg.EVMRPCEnabled(), cfg.Database(), cfg.Database().Listener(), client, l, logPoller, opts)
txm, gasEstimator, err := newEvmTxm(opts.SqlxDB, opts.DB, cfg.EVM(), cfg.EVMRPCEnabled(), cfg.Database(), cfg.Database().Listener(), client, l, logPoller, opts)
if err != nil {
return nil, fmt.Errorf("failed to instantiate EvmTxm for chain with ID %s: %w", chainID.String(), err)
}
Expand All @@ -280,7 +284,7 @@ func newChain(ctx context.Context, cfg *evmconfig.ChainScoped, nodes []*toml.Nod
if !cfg.EVMRPCEnabled() {
logBroadcaster = &log.NullBroadcaster{ErrMsg: fmt.Sprintf("Ethereum is disabled for chain %d", chainID)}
} else if opts.GenLogBroadcaster == nil {
logORM := log.NewORM(db, l, cfg.Database(), *chainID)
logORM := log.NewORM(opts.SqlxDB, l, cfg.Database(), *chainID)
logBroadcaster = log.NewBroadcaster(logORM, client, cfg.EVM(), l, highestSeenHead, opts.MailMon)
} else {
logBroadcaster = opts.GenLogBroadcaster(chainID)
Expand Down
1 change: 1 addition & 0 deletions core/chains/legacyevm/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestChainOpts_Validate(t *testing.T) {
o := legacyevm.ChainOpts{
AppConfig: tt.fields.AppConfig,
MailMon: tt.fields.MailMon,
SqlxDB: tt.fields.DB,
DB: tt.fields.DB,
}
if err := o.Validate(); (err != nil) != tt.wantErr {
Expand Down
5 changes: 4 additions & 1 deletion core/chains/legacyevm/evm_txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/jmoiron/sqlx"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
evmconfig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
Expand All @@ -14,7 +15,8 @@ import (
)

func newEvmTxm(
db *sqlx.DB,
sqlxDB *sqlx.DB,
db sqlutil.DB,
cfg evmconfig.EVM,
evmRPCEnabled bool,
databaseConfig txmgr.DatabaseConfig,
Expand Down Expand Up @@ -51,6 +53,7 @@ func newEvmTxm(

if opts.GenTxManager == nil {
txm, err = txmgr.NewTxm(
sqlxDB,
db,
cfg,
txmgr.NewEvmTxmFeeConfig(cfg.GasEstimator()),
Expand Down
4 changes: 2 additions & 2 deletions core/cmd/ocr2vrf_configure_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (s *Shell) ConfigureOCR2VRFNode(c *cli.Context, owner *bind.TransactOpts, e
if err != nil {
return nil, err
}
err = s.authorizeForwarder(c, ldb.DB(), lggr, chainID, ec, owner, sendingKeysAddresses)
err = s.authorizeForwarder(c, ldb.DB(), chainID, ec, owner, sendingKeysAddresses)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -319,7 +319,7 @@ func (s *Shell) appendForwarders(ctx context.Context, chainID int64, ks keystore
return sendingKeys, sendingKeysAddresses, nil
}

func (s *Shell) authorizeForwarder(c *cli.Context, db *sqlx.DB, lggr logger.Logger, chainID int64, ec *ethclient.Client, owner *bind.TransactOpts, sendingKeysAddresses []common.Address) error {
func (s *Shell) authorizeForwarder(c *cli.Context, db *sqlx.DB, chainID int64, ec *ethclient.Client, owner *bind.TransactOpts, sendingKeysAddresses []common.Address) error {
ctx := s.ctx()
// Replace the transmitter ID with the forwarder address.
forwarderAddress := c.String("forwarder-address")
Expand Down
Loading

0 comments on commit 942f687

Please sign in to comment.