diff --git a/erigon-lib/etl/etl_test.go b/erigon-lib/etl/etl_test.go index 11771356138..522c09f239e 100644 --- a/erigon-lib/etl/etl_test.go +++ b/erigon-lib/etl/etl_test.go @@ -17,11 +17,14 @@ package etl import ( "bytes" + "encoding/binary" "encoding/hex" "encoding/json" "fmt" + "github.com/ledgerwatch/erigon-lib/common" "io" "os" + "sort" "strings" "testing" @@ -514,6 +517,38 @@ func TestReuseCollectorAfterLoad(t *testing.T) { require.Equal(t, 1, see) } +func TestAppendAndSortPrefixes(t *testing.T) { + collector := NewCollector(t.Name(), "", NewAppendBuffer(4), log.New()) + defer collector.Close() + require := require.New(t) + + key := common.FromHex("ed7229d50cde8de174cc64a882a0833ca5f11669") + key1 := append(common.Copy(key), make([]byte, 16)...) + + keys := make([]string, 0) + for i := 10; i >= 0; i-- { + binary.BigEndian.PutUint64(key1[len(key):], uint64(i)) + binary.BigEndian.PutUint64(key1[len(key)+8:], uint64(i)) + kl := len(key1) + if i%5 == 0 && i != 0 { + kl = len(key) + 8 + } + keys = append(keys, fmt.Sprintf("%x", key1[:kl])) + require.NoError(collector.Collect(key1[:kl], key1[len(key):])) + } + + sort.Strings(keys) + i := 0 + + err := collector.Load(nil, "", func(k, v []byte, table CurrentTableReader, next LoadNextFunc) error { + t.Logf("collated %x %x\n", k, v) + require.EqualValuesf(keys[i], fmt.Sprintf("%x", k), "i=%d", i) + i++ + return nil + }, TransformArgs{}) + require.NoError(err) +} + func TestAppend(t *testing.T) { // append buffer doesn't support nil values collector := NewCollector(t.Name(), "", NewAppendBuffer(4), log.New()) diff --git a/erigon-lib/recsplit/recsplit.go b/erigon-lib/recsplit/recsplit.go index 092db31b63a..4ddd9b72916 100644 --- a/erigon-lib/recsplit/recsplit.go +++ b/erigon-lib/recsplit/recsplit.go @@ -580,7 +580,7 @@ func (rs *RecSplit) Build(ctx context.Context) error { return fmt.Errorf("already built") } if rs.keysAdded != rs.keyExpectedCount { - return fmt.Errorf("expected keys %d, got %d", rs.keyExpectedCount, rs.keysAdded) + return fmt.Errorf("rs %s expected keys %d, got %d", rs.indexFileName, rs.keyExpectedCount, rs.keysAdded) } var err error if rs.indexF, err = os.Create(rs.tmpFilePath); err != nil { diff --git a/erigon-lib/state/domain_test.go b/erigon-lib/state/domain_test.go index 727591a5fca..7cf6b0938b8 100644 --- a/erigon-lib/state/domain_test.go +++ b/erigon-lib/state/domain_test.go @@ -1517,9 +1517,10 @@ func generateUpdates(r *rand.Rand, totalTx, keyTxsLimit uint64) []upd { txNum := generateRandomTxNum(r, totalTx, usedTxNums) value := make([]byte, 10) r.Read(value) - if r.Intn(100) > 85 { - value = value[:0] - } + // TODO (awskii) this should be fixed in separate PR + //if r.Intn(100) > 85 { + // value = value[:0] + //} updates = append(updates, upd{txNum: txNum, value: value}) usedTxNums[txNum] = true diff --git a/erigon-lib/state/history.go b/erigon-lib/state/history.go index 82fcc9634a4..edb63a78ab9 100644 --- a/erigon-lib/state/history.go +++ b/erigon-lib/state/history.go @@ -696,17 +696,20 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k collector := etl.NewCollector(h.historyValsTable, h.iiCfg.dirs.Tmp, etl.NewSortableBuffer(CollateETLRAM), h.logger) defer collector.Close() - for k, v, err := keysCursor.Seek(txKey[:]); err == nil && k != nil; k, v, err = keysCursor.Next() { + var collateCount uint64 + + for txnmb, k, err := keysCursor.Seek(txKey[:]); err == nil && txnmb != nil; txnmb, k, err = keysCursor.Next() { if err != nil { return HistoryCollation{}, fmt.Errorf("iterate over %s history cursor: %w", h.filenameBase, err) } - txNum := binary.BigEndian.Uint64(k) + txNum := binary.BigEndian.Uint64(txnmb) if txNum >= txTo { // [txFrom; txTo) break } - if err := collector.Collect(append(v, k...), k); err != nil { - return HistoryCollation{}, fmt.Errorf("collect %s history key [%x]=>txn %d [%x]: %w", h.filenameBase, v, txNum, k, err) + if err := collector.Collect(k, txnmb); err != nil { + return HistoryCollation{}, fmt.Errorf("collect %s history key [%x]=>txn %d [%x]: %w", h.filenameBase, k, txNum, txnmb, err) } + collateCount++ select { case <-ctx.Done(): @@ -738,40 +741,44 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k if h.noFsync { efComp.DisableFsync() } - efHistoryComp = NewArchiveWriter(efComp, CompressNone) + var prevKey, prevEf []byte bitmap := bitmapdb.NewBitmap64() - var prevKey, prevValue []byte + efHistoryComp = NewArchiveWriter(efComp, CompressNone) + var indexedCount uint64 indexAddKV := func() error { - // fmt.Printf("indexAddKV(%v) | %x -> %x %p\n", bitmap.GetCardinality(), prevKey, k[:lk], bitmap) + //fmt.Printf("indexAddKV(%d) | %x -> %x %p\n", bitmap.GetCardinality(), prevKey, prevEf, bitmap) ef := eliasfano32.NewEliasFano(bitmap.GetCardinality(), bitmap.Maximum()) it := bitmap.Iterator() for it.HasNext() { ef.AddOffset(it.Next()) + indexedCount++ } + //count := bitmap.GetCardinality() bitmap.Clear() ef.Build() - prevValue = ef.AppendBytes(prevValue[:0]) + prevEf = ef.AppendBytes(prevEf[:0]) + //fmt.Printf("indexAddKV(%d) | %x -> %x %p\n", count, prevKey, prevEf, bitmap) if err = efHistoryComp.AddWord(prevKey); err != nil { return fmt.Errorf("add %s ef history key [%x]: %w", h.filenameBase, prevKey, err) } - if err = efHistoryComp.AddWord(prevValue); err != nil { + if err = efHistoryComp.AddWord(prevEf); err != nil { return fmt.Errorf("add %s ef history val: %w", h.filenameBase, err) } return nil } + keyBuf := make([]byte, 0, 256) + var histCount uint64 loadCollatedFunc := func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { - lk := len(k) - 8 - kTxNum, txNum := binary.BigEndian.Uint64(k[lk:]), binary.BigEndian.Uint64(v) - if kTxNum != txNum { - return fmt.Errorf("loadCollatedFunc: key+TxNum [%x] != TxNum [%x]", k, v) - } + //fmt.Printf("loadCollatedFunc %x -> %x\n", k, v) + txNum := binary.BigEndian.Uint64(v) if h.historyLargeValues { - key, val, err := c.SeekExact(k) + keyBuf = append(append(keyBuf[:0], k...), v...) + key, val, err := c.SeekExact(keyBuf) if err != nil { return fmt.Errorf("seekExact %s history val [%x]: %w", h.filenameBase, key, err) } @@ -782,12 +789,11 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k return fmt.Errorf("add %s history val [%x]=>[%x]: %w", h.filenameBase, key, val, err) } } else { - val, err := cd.SeekBothRange(k[:lk], v) + val, err := cd.SeekBothRange(k, v) if err != nil { return fmt.Errorf("seekBothRange %s history val [%x]: %w", h.filenameBase, k, err) } if val != nil && binary.BigEndian.Uint64(val) == txNum { - // fmt.Printf("HistCollate [%x]=>[%x]\n", []byte(key), val) val = val[8:] } else { val = nil @@ -797,17 +803,13 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k } } - if prevKey == nil { - prevKey = append(prevKey[:0], k[:lk]...) - } - - if !bytes.Equal(prevKey, k[:lk]) { + bitmap.Add(txNum) + if !bytes.Equal(prevKey, k) { if err := indexAddKV(); err != nil { return err } - prevKey = append(prevKey[:0], k[:lk]...) + prevKey = append(prevKey[:0], k...) } - bitmap.Add(txNum) return nil } @@ -822,6 +824,7 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k } } + fmt.Printf("%s collate %d indexed %d history: %d/%d\n", historyPath, collateCount, indexedCount, histCount, historyComp.Count()) closeComp = false mxCollationSizeHist.SetUint64(uint64(historyComp.Count())) @@ -831,7 +834,6 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k historyPath: historyPath, historyComp: historyComp, historyCount: historyComp.Count(), - //indexBitmaps: indexBitmaps, }, nil }