Skip to content

Commit

Permalink
calculate tx the lifecycle duration (#839)
Browse files Browse the repository at this point in the history
* feat: add transactions len metrics of block processer

* calculate tx lifecycle median duration

* chore: auto version bump [bot]

* fix

* fix

* fix

* chore: auto version bump [bot]

* address comments

* remove error code

* address comment

* update

* Update core/tx_pool.go

Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>

* Update core/types/transaction.go

Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>

* Update core/tx_pool.go

Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>

* Update core/tx_pool.go

---------

Co-authored-by: georgehao <georgehao@users.noreply.github.com>
Co-authored-by: Ömer Faruk Irmak <omerfirmak@gmail.com>
Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>
  • Loading branch information
4 people committed Aug 13, 2024
1 parent 096b98a commit 4b85bbc
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 1 deletion.
24 changes: 24 additions & 0 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ var (
slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil)

reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil)

txLifecycleTimer = metrics.NewRegisteredTimer("txpool/txfifecycle", nil)
)

// TxStatus is the current status of a transaction as seen by the pool.
Expand Down Expand Up @@ -795,6 +797,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
// New transaction is better, replace old one
if old != nil {
pool.all.Remove(old.Hash())
pool.calculateTxsLifecycle(types.Transactions{old}, time.Now())
pool.priced.Removed(1)
pendingReplaceMeter.Mark(1)
}
Expand Down Expand Up @@ -848,6 +851,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local boo
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
pool.calculateTxsLifecycle(types.Transactions{old}, time.Now())
queuedReplaceMeter.Mark(1)
} else {
// Nothing was replaced, bump the queued counter
Expand Down Expand Up @@ -896,13 +900,15 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
if !inserted {
// An older transaction was better, discard this
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())
pool.priced.Removed(1)
pendingDiscardMeter.Mark(1)
return false
}
// Otherwise discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
pool.calculateTxsLifecycle(types.Transactions{old}, time.Now())
pool.priced.Removed(1)
pendingReplaceMeter.Mark(1)
} else {
Expand Down Expand Up @@ -1084,6 +1090,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {

// Remove it from the list of known transactions
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())
if outofbound {
pool.priced.Removed(1)
}
Expand Down Expand Up @@ -1418,6 +1425,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
for _, tx := range forwards {
hash := tx.Hash()
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())
}
log.Trace("Removed old queued transactions", "count", len(forwards))

Expand All @@ -1427,6 +1435,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
for _, tx := range drops {
hash := tx.Hash()
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())
}
log.Trace("Removed unpayable queued transactions", "count", len(drops))
queuedNofundsMeter.Mark(int64(len(drops)))
Expand All @@ -1449,6 +1458,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
for _, tx := range caps {
hash := tx.Hash()
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
queuedRateLimitMeter.Mark(int64(len(caps)))
Expand Down Expand Up @@ -1531,6 +1541,7 @@ func (pool *TxPool) truncatePending() {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())

// Update the account nonce to the dropped transaction
pool.pendingNonces.setIfLower(offenders[i], tx.Nonce())
Expand Down Expand Up @@ -1558,6 +1569,7 @@ func (pool *TxPool) truncatePending() {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())

// Update the account nonce to the dropped transaction
pool.pendingNonces.setIfLower(addr, tx.Nonce())
Expand Down Expand Up @@ -1637,6 +1649,7 @@ func (pool *TxPool) demoteUnexecutables() {
for _, tx := range olds {
hash := tx.Hash()
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())
log.Trace("Removed old pending transaction", "hash", hash)
}
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
Expand All @@ -1646,6 +1659,7 @@ func (pool *TxPool) demoteUnexecutables() {
hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash)
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())
}
pendingNofundsMeter.Mark(int64(len(drops)))

Expand Down Expand Up @@ -1681,6 +1695,16 @@ func (pool *TxPool) demoteUnexecutables() {
}
}

// calculateTxsLifecycle calculates the lifecycle of given txs
func (pool *TxPool) calculateTxsLifecycle(txs types.Transactions, t time.Time) {
for _, tx := range txs {
if tx.Time().Before(t) {
txLifecycle := t.Sub(tx.Time())
txLifecycleTimer.Update(txLifecycle)
}
}
}

// PauseReorgs stops any new reorg jobs to be started but doesn't interrupt any existing ones that are in flight
// Keep in mind this function might block, although it is not expected to block for any significant amount of time
func (pool *TxPool) PauseReorgs() {
Expand Down
5 changes: 5 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ func (tx *Transaction) UnmarshalBinary(b []byte) error {
return nil
}

// Time retrieves the time first seen locally
func (tx *Transaction) Time() time.Time {
return tx.time
}

// decodeTyped decodes a typed transaction from the canonical format.
func (tx *Transaction) decodeTyped(b []byte) (TxData, error) {
if len(b) <= 1 {
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
VersionMajor = 5 // Major version component of the current release
VersionMinor = 6 // Minor version component of the current release
VersionPatch = 0 // Patch version component of the current release
VersionPatch = 1 // Patch version component of the current release
VersionMeta = "mainnet" // Version metadata to append to the version string
)

Expand Down

0 comments on commit 4b85bbc

Please sign in to comment.