Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
awskii committed Apr 18, 2024
1 parent 7c3e45e commit dd5f3eb
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 29 deletions.
35 changes: 35 additions & 0 deletions erigon-lib/etl/etl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ package etl

import (
"bytes"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/ledgerwatch/erigon-lib/common"
"io"
"os"
"sort"
"strings"
"testing"

Expand Down Expand Up @@ -514,6 +517,38 @@ func TestReuseCollectorAfterLoad(t *testing.T) {
require.Equal(t, 1, see)
}

func TestAppendAndSortPrefixes(t *testing.T) {
collector := NewCollector(t.Name(), "", NewAppendBuffer(4), log.New())
defer collector.Close()
require := require.New(t)

key := common.FromHex("ed7229d50cde8de174cc64a882a0833ca5f11669")
key1 := append(common.Copy(key), make([]byte, 16)...)

keys := make([]string, 0)
for i := 10; i >= 0; i-- {
binary.BigEndian.PutUint64(key1[len(key):], uint64(i))
binary.BigEndian.PutUint64(key1[len(key)+8:], uint64(i))
kl := len(key1)
if i%5 == 0 && i != 0 {
kl = len(key) + 8
}
keys = append(keys, fmt.Sprintf("%x", key1[:kl]))
require.NoError(collector.Collect(key1[:kl], key1[len(key):]))
}

sort.Strings(keys)
i := 0

err := collector.Load(nil, "", func(k, v []byte, table CurrentTableReader, next LoadNextFunc) error {
t.Logf("collated %x %x\n", k, v)
require.EqualValuesf(keys[i], fmt.Sprintf("%x", k), "i=%d", i)
i++
return nil
}, TransformArgs{})
require.NoError(err)
}

func TestAppend(t *testing.T) {
// append buffer doesn't support nil values
collector := NewCollector(t.Name(), "", NewAppendBuffer(4), log.New())
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/recsplit/recsplit.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ func (rs *RecSplit) Build(ctx context.Context) error {
return fmt.Errorf("already built")
}
if rs.keysAdded != rs.keyExpectedCount {
return fmt.Errorf("expected keys %d, got %d", rs.keyExpectedCount, rs.keysAdded)
return fmt.Errorf("rs %s expected keys %d, got %d", rs.indexFileName, rs.keyExpectedCount, rs.keysAdded)
}
var err error
if rs.indexF, err = os.Create(rs.tmpFilePath); err != nil {
Expand Down
7 changes: 4 additions & 3 deletions erigon-lib/state/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1517,9 +1517,10 @@ func generateUpdates(r *rand.Rand, totalTx, keyTxsLimit uint64) []upd {
txNum := generateRandomTxNum(r, totalTx, usedTxNums)
value := make([]byte, 10)
r.Read(value)
if r.Intn(100) > 85 {
value = value[:0]
}
// TODO (awskii) this should be fixed in separate PR
//if r.Intn(100) > 85 {
// value = value[:0]
//}

updates = append(updates, upd{txNum: txNum, value: value})
usedTxNums[txNum] = true
Expand Down
52 changes: 27 additions & 25 deletions erigon-lib/state/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,17 +696,20 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k
collector := etl.NewCollector(h.historyValsTable, h.iiCfg.dirs.Tmp, etl.NewSortableBuffer(CollateETLRAM), h.logger)
defer collector.Close()

for k, v, err := keysCursor.Seek(txKey[:]); err == nil && k != nil; k, v, err = keysCursor.Next() {
var collateCount uint64

for txnmb, k, err := keysCursor.Seek(txKey[:]); err == nil && txnmb != nil; txnmb, k, err = keysCursor.Next() {
if err != nil {
return HistoryCollation{}, fmt.Errorf("iterate over %s history cursor: %w", h.filenameBase, err)
}
txNum := binary.BigEndian.Uint64(k)
txNum := binary.BigEndian.Uint64(txnmb)
if txNum >= txTo { // [txFrom; txTo)
break
}
if err := collector.Collect(append(v, k...), k); err != nil {
return HistoryCollation{}, fmt.Errorf("collect %s history key [%x]=>txn %d [%x]: %w", h.filenameBase, v, txNum, k, err)
if err := collector.Collect(k, txnmb); err != nil {
return HistoryCollation{}, fmt.Errorf("collect %s history key [%x]=>txn %d [%x]: %w", h.filenameBase, k, txNum, txnmb, err)
}
collateCount++

select {
case <-ctx.Done():
Expand Down Expand Up @@ -738,40 +741,44 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k
if h.noFsync {
efComp.DisableFsync()
}
efHistoryComp = NewArchiveWriter(efComp, CompressNone)

var prevKey, prevEf []byte
bitmap := bitmapdb.NewBitmap64()
var prevKey, prevValue []byte
efHistoryComp = NewArchiveWriter(efComp, CompressNone)
var indexedCount uint64

indexAddKV := func() error {
// fmt.Printf("indexAddKV(%v) | %x -> %x %p\n", bitmap.GetCardinality(), prevKey, k[:lk], bitmap)
//fmt.Printf("indexAddKV(%d) | %x -> %x %p\n", bitmap.GetCardinality(), prevKey, prevEf, bitmap)
ef := eliasfano32.NewEliasFano(bitmap.GetCardinality(), bitmap.Maximum())
it := bitmap.Iterator()
for it.HasNext() {
ef.AddOffset(it.Next())
indexedCount++
}
//count := bitmap.GetCardinality()
bitmap.Clear()
ef.Build()

prevValue = ef.AppendBytes(prevValue[:0])
prevEf = ef.AppendBytes(prevEf[:0])
//fmt.Printf("indexAddKV(%d) | %x -> %x %p\n", count, prevKey, prevEf, bitmap)

if err = efHistoryComp.AddWord(prevKey); err != nil {
return fmt.Errorf("add %s ef history key [%x]: %w", h.filenameBase, prevKey, err)
}
if err = efHistoryComp.AddWord(prevValue); err != nil {
if err = efHistoryComp.AddWord(prevEf); err != nil {
return fmt.Errorf("add %s ef history val: %w", h.filenameBase, err)
}
return nil
}

keyBuf := make([]byte, 0, 256)
var histCount uint64
loadCollatedFunc := func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
lk := len(k) - 8
kTxNum, txNum := binary.BigEndian.Uint64(k[lk:]), binary.BigEndian.Uint64(v)
if kTxNum != txNum {
return fmt.Errorf("loadCollatedFunc: key+TxNum [%x] != TxNum [%x]", k, v)
}
//fmt.Printf("loadCollatedFunc %x -> %x\n", k, v)
txNum := binary.BigEndian.Uint64(v)
if h.historyLargeValues {
key, val, err := c.SeekExact(k)
keyBuf = append(append(keyBuf[:0], k...), v...)
key, val, err := c.SeekExact(keyBuf)
if err != nil {
return fmt.Errorf("seekExact %s history val [%x]: %w", h.filenameBase, key, err)
}
Expand All @@ -782,12 +789,11 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k
return fmt.Errorf("add %s history val [%x]=>[%x]: %w", h.filenameBase, key, val, err)
}
} else {
val, err := cd.SeekBothRange(k[:lk], v)
val, err := cd.SeekBothRange(k, v)
if err != nil {
return fmt.Errorf("seekBothRange %s history val [%x]: %w", h.filenameBase, k, err)
}
if val != nil && binary.BigEndian.Uint64(val) == txNum {
// fmt.Printf("HistCollate [%x]=>[%x]\n", []byte(key), val)
val = val[8:]
} else {
val = nil
Expand All @@ -797,17 +803,13 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k
}
}

if prevKey == nil {
prevKey = append(prevKey[:0], k[:lk]...)
}

if !bytes.Equal(prevKey, k[:lk]) {
bitmap.Add(txNum)
if !bytes.Equal(prevKey, k) {
if err := indexAddKV(); err != nil {
return err
}
prevKey = append(prevKey[:0], k[:lk]...)
prevKey = append(prevKey[:0], k...)
}
bitmap.Add(txNum)

return nil
}
Expand All @@ -822,6 +824,7 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k
}
}

fmt.Printf("%s collate %d indexed %d history: %d/%d\n", historyPath, collateCount, indexedCount, histCount, historyComp.Count())
closeComp = false
mxCollationSizeHist.SetUint64(uint64(historyComp.Count()))

Expand All @@ -831,7 +834,6 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k
historyPath: historyPath,
historyComp: historyComp,
historyCount: historyComp.Count(),
//indexBitmaps: indexBitmaps,
}, nil
}

Expand Down

0 comments on commit dd5f3eb

Please sign in to comment.