
Commit

accounting bytes written stat with all the indexing options
Thejas-bhat committed Jul 20, 2022
1 parent 0080cf1 commit 3cb7cf7
Showing 7 changed files with 115 additions and 20 deletions.
15 changes: 15 additions & 0 deletions contentcoder.go
@@ -19,6 +19,7 @@ import (
"encoding/binary"
"io"
"reflect"
"sync/atomic"

"github.com/golang/snappy"
)
@@ -48,6 +49,9 @@ type chunkedContentCoder struct {
chunkMeta []MetaData

compressed []byte // temp buf for snappy compression

// atomic access to this variable
bytesWritten uint64
}

// MetaData represents the data information inside a
@@ -105,6 +109,16 @@ func (c *chunkedContentCoder) Close() error {
return c.flushContents()
}

func (c *chunkedContentCoder) incrementBytesWritten(val uint64) {
if CollectDiskStats {
atomic.AddUint64(&c.bytesWritten, val)
}
}

func (c *chunkedContentCoder) getBytesWritten() uint64 {
return atomic.LoadUint64(&c.bytesWritten)
}

func (c *chunkedContentCoder) flushContents() error {
// flush the contents, with meta information at first
buf := make([]byte, binary.MaxVarintLen64)
@@ -127,6 +141,7 @@ func (c *chunkedContentCoder) flushContents() error {
c.final = append(c.final, c.chunkMetaBuf.Bytes()...)
// write the compressed data to the final data
c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes())
c.incrementBytesWritten(uint64(len(c.compressed)))
c.final = append(c.final, c.compressed...)

c.chunkLens[c.currChunk] = uint64(len(c.compressed) + len(metaData))
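
(Aside, not part of the commit: a minimal, self-contained Go sketch of the counting pattern this coder and the others in this change follow. Writers bump an atomic counter only while the package-level CollectDiskStats flag, added in new.go below, is enabled; readers always go through an atomic load. The coder type and byte values here are made up for illustration.)

package main

import (
	"fmt"
	"sync/atomic"
)

// CollectDiskStats stands in for the package-level flag added in new.go.
var CollectDiskStats bool

type coder struct {
	// accessed atomically, like chunkedContentCoder.bytesWritten
	bytesWritten uint64
}

func (c *coder) incrementBytesWritten(val uint64) {
	if CollectDiskStats {
		atomic.AddUint64(&c.bytesWritten, val)
	}
}

func (c *coder) getBytesWritten() uint64 {
	return atomic.LoadUint64(&c.bytesWritten)
}

func main() {
	c := &coder{}
	c.incrementBytesWritten(128) // dropped: collection is off by default
	CollectDiskStats = true
	c.incrementBytesWritten(128)     // counted
	fmt.Println(c.getBytesWritten()) // prints 128
}
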
8 changes: 6 additions & 2 deletions docvalues.go
@@ -142,16 +142,20 @@ func (di *docValueReader) BytesRead() uint64 {
return atomic.LoadUint64(&di.bytesRead)
}

func (di *docValueReader) SetBytesRead(val uint64) {
func (di *docValueReader) ResetBytesRead(val uint64) {
atomic.StoreUint64(&di.bytesRead, val)
}

func (di *docValueReader) incrementBytesRead(val uint64) {
if segment.CollectIOStats {
if CollectDiskStats {
atomic.AddUint64(&di.bytesRead, val)
}
}

func (di *docValueReader) BytesWritten() uint64 {
return 0
}

func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error {
// advance to the chunk where the docValues
// reside for the given docNum
6 changes: 2 additions & 4 deletions intDecoder.go
@@ -18,8 +18,6 @@ import (
"encoding/binary"
"fmt"
"sync/atomic"

segment "github.com/blevesearch/scorch_segment_api/v2"
)

type chunkedIntDecoder struct {
@@ -61,7 +59,7 @@ func newChunkedIntDecoder(buf []byte, offset uint64, rv *chunkedIntDecoder) *chu
rv.chunkOffsets[i], read = binary.Uvarint(buf[offset+n : offset+n+binary.MaxVarintLen64])
n += uint64(read)
}
if segment.CollectIOStats {
if CollectDiskStats {
atomic.AddUint64(&rv.bytesRead, n)
}
rv.dataStartOffset = offset + n
@@ -93,7 +91,7 @@ func (d *chunkedIntDecoder) loadChunk(chunk int) error {
start += s
end += e
d.curChunkBytes = d.data[start:end]
if segment.CollectIOStats {
if CollectDiskStats {
atomic.AddUint64(&d.bytesRead, uint64(len(d.curChunkBytes)))
}
if d.r == nil {
15 changes: 15 additions & 0 deletions intcoder.go
@@ -18,6 +18,7 @@ import (
"bytes"
"encoding/binary"
"io"
"sync/atomic"
)

// We can safely use 0 to represent termNotEncoded since 0
@@ -34,6 +35,9 @@ type chunkedIntCoder struct {
currChunk uint64

buf []byte

// atomic access to this variable
bytesWritten uint64
}

// newChunkedIntCoder returns a new chunk int coder which packs data into
@@ -73,6 +77,16 @@ func (c *chunkedIntCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) {
}
}

func (c *chunkedIntCoder) incrementBytesWritten(val uint64) {
if CollectDiskStats {
atomic.AddUint64(&c.bytesWritten, val)
}
}

func (c *chunkedIntCoder) getBytesWritten() uint64 {
return atomic.LoadUint64(&c.bytesWritten)
}

// Add encodes the provided integers into the correct chunk for the provided
// doc num. You MUST call Add() with increasing docNums.
func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error {
@@ -94,6 +108,7 @@ func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error {
if err != nil {
return err
}
c.incrementBytesWritten(uint64(wb))
}

return nil
38 changes: 35 additions & 3 deletions new.go
@@ -20,6 +20,7 @@ import (
"math"
"sort"
"sync"
"sync/atomic"

"github.com/RoaringBitmap/roaring"
index "github.com/blevesearch/bleve_index_api"
@@ -32,6 +33,10 @@ var NewSegmentBufferNumResultsBump int = 100
var NewSegmentBufferNumResultsFactor float64 = 1.0
var NewSegmentBufferAvgBytesPerDocFactor float64 = 1.0

// This flag controls the disk stats collection from the segment files
// during indexing and querying
var CollectDiskStats bool

// ValidateDocFields can be set by applications to perform additional checks
// on fields in a document being added to a new segment, by default it does
// nothing.
@@ -80,6 +85,7 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document,
if err == nil && s.reset() == nil {
s.lastNumDocs = len(results)
s.lastOutSize = len(br.Bytes())
sb.setBytesWritten(s.getBytesWritten())
interimPool.Put(s)
}

@@ -141,6 +147,9 @@ type interim struct {

lastNumDocs int
lastOutSize int

// atomic access to this variable
bytesWritten uint64
}

func (s *interim) reset() (err error) {
@@ -484,6 +493,16 @@ func (s *interim) processDocument(docNum uint64,
}
}

func (s *interim) getBytesWritten() uint64 {
return atomic.LoadUint64(&s.bytesWritten)
}

func (s *interim) incrementBytesWritten(val uint64) {
if CollectDiskStats {
atomic.AddUint64(&s.bytesWritten, val)
}
}

func (s *interim) writeStoredFields() (
storedIndexOffset uint64, err error) {
varBuf := make([]byte, binary.MaxVarintLen64)
@@ -559,7 +578,7 @@ func (s *interim) writeStoredFields() (
metaBytes := s.metaBuf.Bytes()

compressed = snappy.Encode(compressed[:cap(compressed)], data)

s.incrementBytesWritten(uint64(len(compressed)))
docStoredOffsets[docNum] = uint64(s.w.Count())

_, err := writeUvarints(s.w,
@@ -597,6 +616,12 @@ func (s *interim) writeStoredFields() (
return storedIndexOffset, nil
}

func (s *interim) setBytesWritten(val uint64) {
if CollectDiskStats {
atomic.StoreUint64(&s.bytesWritten, val)
}
}

func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err error) {
dictOffsets = make([]uint64, len(s.FieldsInv))

@@ -682,7 +707,7 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err
if err != nil {
return 0, nil, err
}

prevBytesWritten := locEncoder.getBytesWritten()
for _, loc := range locs[locOffset : locOffset+freqNorm.numLocs] {
err = locEncoder.Add(docNum,
uint64(loc.fieldID), loc.pos, loc.start, loc.end,
@@ -696,7 +721,9 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err
return 0, nil, err
}
}

if locEncoder.getBytesWritten()-prevBytesWritten > 0 {
s.incrementBytesWritten(locEncoder.getBytesWritten() - prevBytesWritten)
}
locOffset += freqNorm.numLocs
}

@@ -750,6 +777,8 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err
return 0, nil, err
}

s.incrementBytesWritten(uint64(len(vellumData)))

// reset vellum for reuse
s.builderBuf.Reset()

@@ -764,6 +793,7 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err
if err != nil {
return 0, nil, err
}

fdvEncoder := newChunkedContentCoder(chunkSize, uint64(len(s.results)-1), s.w, false)
if s.IncludeDocValues[fieldID] {
for docNum, docTerms := range docTermMap {
@@ -779,6 +809,8 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err
return 0, nil, err
}

s.setBytesWritten(s.getBytesWritten())

fdvOffsetsStart[fieldID] = uint64(s.w.Count())

_, err = fdvEncoder.Write()
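
(Aside, not part of the commit: a hypothetical, self-contained Go sketch of the delta-accounting step writeDicts performs around the location encoder above, where the interim writer snapshots the child coder's counter, lets it encode one posting's locations, and then folds only the newly written bytes into its own total. The types and byte counts are illustrative stand-ins, not the real chunkedIntCoder or interim.)

package main

import (
	"fmt"
	"sync/atomic"
)

var collectDiskStats = true // stand-in for the package-level CollectDiskStats flag

// intCoder plays the role of locEncoder (a chunkedIntCoder).
type intCoder struct{ bytesWritten uint64 }

func (c *intCoder) add(encodedLen uint64) {
	// pretend encodedLen bytes were flushed for this posting's locations
	if collectDiskStats {
		atomic.AddUint64(&c.bytesWritten, encodedLen)
	}
}

func (c *intCoder) getBytesWritten() uint64 { return atomic.LoadUint64(&c.bytesWritten) }

// interimWriter plays the role of *interim.
type interimWriter struct{ bytesWritten uint64 }

func (s *interimWriter) incrementBytesWritten(val uint64) {
	if collectDiskStats {
		atomic.AddUint64(&s.bytesWritten, val)
	}
}

func main() {
	s, locEncoder := &interimWriter{}, &intCoder{}
	for _, postingLen := range []uint64{12, 0, 7} {
		prevBytesWritten := locEncoder.getBytesWritten() // snapshot before this posting
		locEncoder.add(postingLen)
		if delta := locEncoder.getBytesWritten() - prevBytesWritten; delta > 0 {
			s.incrementBytesWritten(delta) // fold only the newly written bytes into the parent
		}
	}
	fmt.Println(atomic.LoadUint64(&s.bytesWritten)) // prints 19
}
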
20 changes: 14 additions & 6 deletions posting.go
@@ -254,7 +254,7 @@ func (p *PostingsList) Count() uint64 {
// The purpose of this implementation is to get
// the bytes read from the postings lists stored
// on disk, while querying
func (p *PostingsList) SetBytesRead(val uint64) {
func (p *PostingsList) ResetBytesRead(val uint64) {
atomic.StoreUint64(&p.bytesRead, val)
}

@@ -263,11 +263,15 @@ func (p *PostingsList) BytesRead() uint64 {
}

func (p *PostingsList) incrementBytesRead(val uint64) {
if segment.CollectIOStats {
if CollectDiskStats {
atomic.AddUint64(&p.bytesRead, val)
}
}

func (p *PostingsList) BytesWritten() uint64 {
return 0
}

func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
rv.postingsOffset = postingsOffset

@@ -365,7 +369,7 @@ func (i *PostingsIterator) Size() int {
// the bytes read from the disk which includes
// the freqNorm and location specific information
// of a hit
func (i *PostingsIterator) SetBytesRead(val uint64) {
func (i *PostingsIterator) ResetBytesRead(val uint64) {
atomic.StoreUint64(&i.bytesRead, val)
}

@@ -374,11 +378,15 @@ func (i *PostingsIterator) BytesRead() uint64 {
}

func (i *PostingsIterator) incrementBytesRead(val uint64) {
if segment.CollectIOStats {
if CollectDiskStats {
atomic.AddUint64(&i.bytesRead, val)
}
}

func (i *PostingsIterator) BytesWritten() uint64 {
return 0
}

func (i *PostingsIterator) loadChunk(chunk int) error {
if i.includeFreqNorm {
err := i.freqNormReader.loadChunk(chunk)
@@ -390,15 +398,15 @@ func (i *PostingsIterator) loadChunk(chunk int) error {
// the postingsIterator is tracking only the chunk loaded
// and the cumulation is tracked correctly in the downstream
// intDecoder
i.SetBytesRead(i.freqNormReader.getBytesRead())
i.ResetBytesRead(i.freqNormReader.getBytesRead())
}

if i.includeLocs {
err := i.locReader.loadChunk(chunk)
if err != nil {
return err
}
i.SetBytesRead(i.locReader.getBytesRead())
i.ResetBytesRead(i.locReader.getBytesRead())
}

i.currChunk = uint32(chunk)
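
(Aside, not part of the commit: a small self-contained Go sketch of the reset-vs-increment distinction the comment above describes. The chunked decoder keeps the cumulative bytes-read count across chunk loads, so after each load the postings iterator overwrites, rather than adds to, its own counter. Names and sizes are illustrative.)

package main

import (
	"fmt"
	"sync/atomic"
)

// decoder plays the role of chunkedIntDecoder: it accumulates across loads.
type decoder struct{ bytesRead uint64 }

func (d *decoder) loadChunk(size uint64) { atomic.AddUint64(&d.bytesRead, size) }
func (d *decoder) getBytesRead() uint64  { return atomic.LoadUint64(&d.bytesRead) }

// iterator plays the role of PostingsIterator: it mirrors the decoder's total.
type iterator struct {
	freqNormReader decoder
	bytesRead      uint64
}

func (i *iterator) ResetBytesRead(val uint64) { atomic.StoreUint64(&i.bytesRead, val) }

func (i *iterator) loadChunk(size uint64) {
	i.freqNormReader.loadChunk(size)
	// store, don't add: the decoder already holds the cumulative figure,
	// so adding here would double count earlier chunks.
	i.ResetBytesRead(i.freqNormReader.getBytesRead())
}

func main() {
	it := &iterator{}
	it.loadChunk(64)
	it.loadChunk(32)
	fmt.Println(atomic.LoadUint64(&it.bytesRead)) // prints 96, not 160
}
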
33 changes: 28 additions & 5 deletions segment.go
@@ -104,7 +104,8 @@ type SegmentBase struct {
size uint64

// atomic access to this variable
bytesRead uint64
bytesRead uint64
bytesWritten uint64

m sync.Mutex
fieldFSTs map[uint16]*vellum.FST
@@ -226,23 +227,45 @@ func (s *Segment) loadConfig() error {
// interface, as the intention is to retrieve the bytes
// read from the on-disk segment as part of the current
// query.
func (s *Segment) SetBytesRead(val uint64) {
atomic.StoreUint64(&s.SegmentBase.bytesRead, val)
func (s *Segment) ResetBytesRead(val uint64) {
if CollectDiskStats {
atomic.StoreUint64(&s.SegmentBase.bytesRead, val)
}
}

func (s *Segment) BytesRead() uint64 {
return atomic.LoadUint64(&s.bytesRead) +
atomic.LoadUint64(&s.SegmentBase.bytesRead)
}

func (s *Segment) BytesWritten() uint64 {
return 0
}

func (s *Segment) incrementBytesRead(val uint64) {
if segment.CollectIOStats {
if CollectDiskStats {
atomic.AddUint64(&s.bytesRead, val)
}
}

func (s *SegmentBase) BytesWritten() uint64 {
return atomic.LoadUint64(&s.bytesWritten)
}

func (s *SegmentBase) setBytesWritten(val uint64) {
if CollectDiskStats {
atomic.AddUint64(&s.bytesWritten, val)
}
}

func (s *SegmentBase) BytesRead() uint64 {
return 0
}

func (s *SegmentBase) ResetBytesRead(val uint64) {}

func (s *SegmentBase) incrementBytesRead(val uint64) {
if segment.CollectIOStats {
if CollectDiskStats {
atomic.AddUint64(&s.bytesRead, val)
}
}
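
(Aside, not part of the commit: a hypothetical read-side helper, written as if it sat in the same package as segment.go, to show where each counter ends up. The in-memory SegmentBase built at indexing time carries the bytes-written total, while an mmap-backed Segment reports query-time reads as its own counter plus the embedded SegmentBase's.)

func diskStatsSummary(sb *SegmentBase, seg *Segment) (written, read uint64) {
	written = sb.BytesWritten() // filled from the interim writer's total in newWithChunkMode
	read = seg.BytesRead()      // Segment.bytesRead + SegmentBase.bytesRead
	return written, read
}
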
