Skip to content

Commit

Permalink
[R4R]add sharedStorage for prefetching to L1 (ethereum#792)
Browse files Browse the repository at this point in the history
* add sharedStorage for prefetching to L1

* remote originStorage in stateObjects

* fix core

* fix bug of sync map

* remove read lock when get & set keys

* statedb copy use CopyWithSharedStorage

* reduce lock access

* fix comment

* avoid sharedPool effects on other modules

* remove  tryPreload

* fix comment

* fix var name

* fix lint

* fix L1 miss data && data condition

* fix comment
  • Loading branch information
flywukong authored and unclezoro committed Mar 29, 2022
1 parent 5f1aabe commit 4ff9697
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 89 deletions.
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2101,7 +2101,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
statedb, err := state.NewWithSharedPool(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return it.index, err
}
Expand Down
39 changes: 39 additions & 0 deletions core/state/shared_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package state

import (
"sync"

"github.com/ethereum/go-ethereum/common"
)

// sharedPool is used to store maps of originStorage of stateObjects
type StoragePool struct {
sync.RWMutex
sharedMap map[common.Address]*sync.Map
}

func NewStoragePool() *StoragePool {
sharedMap := make(map[common.Address]*sync.Map)
return &StoragePool{
sync.RWMutex{},
sharedMap,
}
}

// getStorage Check whether the storage exist in pool,
// new one if not exist, the content of storage will be fetched in stateObjects.GetCommittedState()
func (s *StoragePool) getStorage(address common.Address) *sync.Map {
s.RLock()
storageMap, ok := s.sharedMap[address]
s.RUnlock()
if !ok {
s.Lock()
defer s.Unlock()
if storageMap, ok = s.sharedMap[address]; !ok {
m := new(sync.Map)
s.sharedMap[address] = m
return m
}
}
return storageMap
}
56 changes: 45 additions & 11 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -79,7 +80,9 @@ type StateObject struct {
trie Trie // storage trie, which becomes non-nil on first access
code Code // contract bytecode, which gets set when code is loaded

originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction
sharedOriginStorage *sync.Map // Storage cache of original entries to dedup rewrites, reset for every transaction
originStorage Storage

pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block
dirtyStorage Storage // Storage entries that have been modified in the current transaction execution
fakeStorage Storage // Fake storage which constructed by caller for debugging purpose.
Expand Down Expand Up @@ -120,14 +123,21 @@ func newObject(db *StateDB, address common.Address, data Account) *StateObject {
if data.Root == (common.Hash{}) {
data.Root = emptyRoot
}
var storageMap *sync.Map
// Check whether the storage exist in pool, new originStorage if not exist
if db != nil && db.storagePool != nil {
storageMap = db.GetStorage(address)
}

return &StateObject{
db: db,
address: address,
addrHash: crypto.Keccak256Hash(address[:]),
data: data,
originStorage: make(Storage),
pendingStorage: make(Storage),
dirtyStorage: make(Storage),
db: db,
address: address,
addrHash: crypto.Keccak256Hash(address[:]),
data: data,
sharedOriginStorage: storageMap,
originStorage: make(Storage),
pendingStorage: make(Storage),
dirtyStorage: make(Storage),
}
}

Expand Down Expand Up @@ -194,6 +204,29 @@ func (s *StateObject) GetState(db Database, key common.Hash) common.Hash {
return s.GetCommittedState(db, key)
}

func (s *StateObject) getOriginStorage(key common.Hash) (common.Hash, bool) {
if value, cached := s.originStorage[key]; cached {
return value, true
}
// if L1 cache miss, try to get it from shared pool
if s.sharedOriginStorage != nil {
val, ok := s.sharedOriginStorage.Load(key)
if !ok {
return common.Hash{}, false
}
s.originStorage[key] = val.(common.Hash)
return val.(common.Hash), true
}
return common.Hash{}, false
}

func (s *StateObject) setOriginStorage(key common.Hash, value common.Hash) {
if s.db.writeOnSharedStorage && s.sharedOriginStorage != nil {
s.sharedOriginStorage.Store(key, value)
}
s.originStorage[key] = value
}

// GetCommittedState retrieves a value from the committed account storage trie.
func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Hash {
// If the fake storage is set, only lookup the state here(in the debugging mode)
Expand All @@ -204,7 +237,8 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has
if value, pending := s.pendingStorage[key]; pending {
return value
}
if value, cached := s.originStorage[key]; cached {

if value, cached := s.getOriginStorage(key); cached {
return value
}
// If no live objects are available, attempt to use snapshots
Expand Down Expand Up @@ -263,7 +297,7 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has
}
value.SetBytes(content)
}
s.originStorage[key] = value
s.setOriginStorage(key, value)
return value
}

Expand Down Expand Up @@ -320,6 +354,7 @@ func (s *StateObject) finalise(prefetch bool) {
slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure
}
}

if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot {
s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch, s.addrHash)
}
Expand Down Expand Up @@ -356,7 +391,6 @@ func (s *StateObject) updateTrie(db Database) Trie {
continue
}
s.originStorage[key] = value

var v []byte
if (value == common.Hash{}) {
s.setError(tr.TryDelete(key[:]))
Expand Down
98 changes: 22 additions & 76 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ import (
"github.com/ethereum/go-ethereum/trie"
)

const (
preLoadLimit = 128
defaultNumOfSlots = 100
)
const defaultNumOfSlots = 100

type revision struct {
id int
Expand Down Expand Up @@ -101,6 +98,8 @@ type StateDB struct {
stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie
stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution

storagePool *StoragePool // sharedPool to store L1 originStorage of stateObjects
writeOnSharedStorage bool // Write to the shared origin storage of a stateObject while reading from the underlying storage layer.
// DB error.
// State objects are used by the consensus core and VM which are
// unable to deal with database-level errors. Any error that occurs
Expand Down Expand Up @@ -147,6 +146,16 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
return newStateDB(root, db, snaps)
}

// NewWithSharedPool creates a new state with sharedStorge on layer 1.5
func NewWithSharedPool(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) {
statedb, err := newStateDB(root, db, snaps)
if err != nil {
return nil, err
}
statedb.storagePool = NewStoragePool()
return statedb, nil
}

func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) {
sdb := &StateDB{
db: db,
Expand Down Expand Up @@ -178,6 +187,10 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB,
return sdb, nil
}

func (s *StateDB) EnableWriteOnSharedStorage() {
s.writeOnSharedStorage = true
}

// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot.
Expand Down Expand Up @@ -591,78 +604,6 @@ func (s *StateDB) getStateObject(addr common.Address) *StateObject {
return nil
}

func (s *StateDB) TryPreload(block *types.Block, signer types.Signer) {
accounts := make(map[common.Address]bool, block.Transactions().Len())
accountsSlice := make([]common.Address, 0, block.Transactions().Len())
for _, tx := range block.Transactions() {
from, err := types.Sender(signer, tx)
if err != nil {
break
}
accounts[from] = true
if tx.To() != nil {
accounts[*tx.To()] = true
}
}
for account := range accounts {
accountsSlice = append(accountsSlice, account)
}
if len(accountsSlice) >= preLoadLimit && len(accountsSlice) > runtime.NumCPU() {
objsChan := make(chan []*StateObject, runtime.NumCPU())
for i := 0; i < runtime.NumCPU(); i++ {
start := i * len(accountsSlice) / runtime.NumCPU()
end := (i + 1) * len(accountsSlice) / runtime.NumCPU()
if i+1 == runtime.NumCPU() {
end = len(accountsSlice)
}
go func(start, end int) {
objs := s.preloadStateObject(accountsSlice[start:end])
objsChan <- objs
}(start, end)
}
for i := 0; i < runtime.NumCPU(); i++ {
objs := <-objsChan
for _, obj := range objs {
s.SetStateObject(obj)
}
}
}
}

func (s *StateDB) preloadStateObject(address []common.Address) []*StateObject {
// Prefer live objects if any is available
if s.snap == nil {
return nil
}
hasher := crypto.NewKeccakState()
objs := make([]*StateObject, 0, len(address))
for _, addr := range address {
// If no live objects are available, attempt to use snapshots
if acc, err := s.snap.Account(crypto.HashData(hasher, addr.Bytes())); err == nil {
if acc == nil {
continue
}
data := &Account{
Nonce: acc.Nonce,
Balance: acc.Balance,
CodeHash: acc.CodeHash,
Root: common.BytesToHash(acc.Root),
}
if len(data.CodeHash) == 0 {
data.CodeHash = emptyCodeHash
}
if data.Root == (common.Hash{}) {
data.Root = emptyRoot
}
// Insert into the live set
obj := newObject(s, addr, *data)
objs = append(objs, obj)
}
// Do not enable this feature when snapshot is not enabled.
}
return objs
}

// getDeletedStateObject is similar to getStateObject, but instead of returning
// nil for a deleted state object, it returns the actual object with the deleted
// flag set. This is needed by the state journal to revert to the correct s-
Expand Down Expand Up @@ -828,6 +769,7 @@ func (s *StateDB) Copy() *StateDB {
stateObjects: make(map[common.Address]*StateObject, len(s.journal.dirties)),
stateObjectsPending: make(map[common.Address]struct{}, len(s.stateObjectsPending)),
stateObjectsDirty: make(map[common.Address]struct{}, len(s.journal.dirties)),
storagePool: s.storagePool,
refund: s.refund,
logs: make(map[common.Hash][]*types.Log, len(s.logs)),
logSize: s.logSize,
Expand Down Expand Up @@ -1626,3 +1568,7 @@ func (s *StateDB) GetDirtyAccounts() []common.Address {
}
return accounts
}

func (s *StateDB) GetStorage(address common.Address) *sync.Map {
return s.storagePool.getStorage(address)
}
1 change: 1 addition & 0 deletions core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
for i := 0; i < prefetchThread; i++ {
go func(idx int) {
newStatedb := statedb.Copy()
newStatedb.EnableWriteOnSharedStorage()
gaspool := new(GasPool).AddGas(block.GasLimit())
blockContext := NewEVMBlockContext(header, p.bc, nil)
evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
Expand Down
1 change: 0 additions & 1 deletion core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,6 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
gp = new(GasPool).AddGas(block.GasLimit())
)
signer := types.MakeSigner(p.bc.chainConfig, block.Number())
statedb.TryPreload(block, signer)
var receipts = make([]*types.Receipt, 0)
// Mutate the block and state according to any hard-fork specs
if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 {
Expand Down

0 comments on commit 4ff9697

Please sign in to comment.