Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix IO Stats computation #119

Merged
merged 4 commits into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions contentcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/binary"
"io"
"reflect"
"sync/atomic"

"github.com/golang/snappy"
)
Expand Down Expand Up @@ -48,6 +49,9 @@ type chunkedContentCoder struct {
chunkMeta []MetaData

compressed []byte // temp buf for snappy compression

// atomic access to this variable
bytesWritten uint64
}

// MetaData represents the data information inside a
Expand Down Expand Up @@ -105,6 +109,14 @@ func (c *chunkedContentCoder) Close() error {
return c.flushContents()
}

// incrementBytesWritten atomically adds val to the coder's running count
// of compressed bytes produced (accumulated in flushContents).
func (c *chunkedContentCoder) incrementBytesWritten(val uint64) {
	atomic.AddUint64(&c.bytesWritten, val)
}

// getBytesWritten atomically reads the total number of compressed bytes
// this coder has written so far.
func (c *chunkedContentCoder) getBytesWritten() uint64 {
	return atomic.LoadUint64(&c.bytesWritten)
}

func (c *chunkedContentCoder) flushContents() error {
// flush the contents, with meta information at first
buf := make([]byte, binary.MaxVarintLen64)
Expand All @@ -127,6 +139,7 @@ func (c *chunkedContentCoder) flushContents() error {
c.final = append(c.final, c.chunkMetaBuf.Bytes()...)
// write the compressed data to the final data
c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes())
c.incrementBytesWritten(uint64(len(c.compressed)))
c.final = append(c.final, c.compressed...)

c.chunkLens[c.currChunk] = uint64(len(c.compressed) + len(metaData))
Expand Down
6 changes: 5 additions & 1 deletion docvalues.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,18 @@ func (di *docValueReader) BytesRead() uint64 {
return atomic.LoadUint64(&di.bytesRead)
}

func (di *docValueReader) SetBytesRead(val uint64) {
// ResetBytesRead atomically overwrites the reader's bytes-read counter
// with val, discarding any previously accumulated count.
func (di *docValueReader) ResetBytesRead(val uint64) {
	atomic.StoreUint64(&di.bytesRead, val)
}

// incrementBytesRead atomically adds val to the reader's running
// bytes-read counter.
func (di *docValueReader) incrementBytesRead(val uint64) {
	atomic.AddUint64(&di.bytesRead, val)
}

// BytesWritten always returns 0: docValueReader is a read-side component
// and tracks only bytesRead; this method exists to satisfy the shared
// stats interface.
func (di *docValueReader) BytesWritten() uint64 {
	return 0
}

func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error {
// advance to the chunk where the docValues
// reside for the given docNum
Expand Down
13 changes: 13 additions & 0 deletions intcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"encoding/binary"
"io"
"sync/atomic"
)

// We can safely use 0 to represent termNotEncoded since 0
Expand All @@ -34,6 +35,9 @@ type chunkedIntCoder struct {
currChunk uint64

buf []byte

// atomic access to this variable
bytesWritten uint64
}

// newChunkedIntCoder returns a new chunk int coder which packs data into
Expand Down Expand Up @@ -73,6 +77,14 @@ func (c *chunkedIntCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) {
}
}

// incrementBytesWritten atomically adds val to the coder's running count
// of encoded bytes written (accumulated in Add).
func (c *chunkedIntCoder) incrementBytesWritten(val uint64) {
	atomic.AddUint64(&c.bytesWritten, val)
}

// getBytesWritten atomically reads the total number of encoded bytes
// this coder has written so far.
func (c *chunkedIntCoder) getBytesWritten() uint64 {
	return atomic.LoadUint64(&c.bytesWritten)
}

// Add encodes the provided integers into the correct chunk for the provided
// doc num. You MUST call Add() with increasing docNums.
func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error {
Expand All @@ -94,6 +106,7 @@ func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error {
if err != nil {
return err
}
c.incrementBytesWritten(uint64(wb))
}

return nil
Expand Down
30 changes: 27 additions & 3 deletions new.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"math"
"sort"
"sync"
"sync/atomic"

"github.com/RoaringBitmap/roaring"
index "github.com/blevesearch/bleve_index_api"
Expand Down Expand Up @@ -80,6 +81,7 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document,
if err == nil && s.reset() == nil {
s.lastNumDocs = len(results)
s.lastOutSize = len(br.Bytes())
sb.setBytesWritten(s.getBytesWritten())
interimPool.Put(s)
}

Expand Down Expand Up @@ -141,6 +143,9 @@ type interim struct {

lastNumDocs int
lastOutSize int

// atomic access to this variable
bytesWritten uint64
}

func (s *interim) reset() (err error) {
Expand Down Expand Up @@ -484,6 +489,14 @@ func (s *interim) processDocument(docNum uint64,
}
}

// getBytesWritten atomically reads the total number of bytes this
// interim segment build has written so far.
func (s *interim) getBytesWritten() uint64 {
	return atomic.LoadUint64(&s.bytesWritten)
}

// incrementBytesWritten atomically adds val to the interim build's
// running bytes-written counter.
func (s *interim) incrementBytesWritten(val uint64) {
	atomic.AddUint64(&s.bytesWritten, val)
}

func (s *interim) writeStoredFields() (
storedIndexOffset uint64, err error) {
varBuf := make([]byte, binary.MaxVarintLen64)
Expand Down Expand Up @@ -559,7 +572,7 @@ func (s *interim) writeStoredFields() (
metaBytes := s.metaBuf.Bytes()

compressed = snappy.Encode(compressed[:cap(compressed)], data)

s.incrementBytesWritten(uint64(len(compressed)))
docStoredOffsets[docNum] = uint64(s.w.Count())

_, err := writeUvarints(s.w,
Expand Down Expand Up @@ -597,6 +610,10 @@ func (s *interim) writeStoredFields() (
return storedIndexOffset, nil
}

// setBytesWritten atomically overwrites the bytes-written counter with
// val (unlike incrementBytesWritten, which accumulates).
func (s *interim) setBytesWritten(val uint64) {
	atomic.StoreUint64(&s.bytesWritten, val)
}

func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err error) {
dictOffsets = make([]uint64, len(s.FieldsInv))

Expand Down Expand Up @@ -682,7 +699,7 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err
if err != nil {
return 0, nil, err
}

prevBytesWritten := locEncoder.getBytesWritten()
for _, loc := range locs[locOffset : locOffset+freqNorm.numLocs] {
err = locEncoder.Add(docNum,
uint64(loc.fieldID), loc.pos, loc.start, loc.end,
Expand All @@ -696,7 +713,9 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err
return 0, nil, err
}
}

if locEncoder.getBytesWritten()-prevBytesWritten > 0 {
s.incrementBytesWritten(locEncoder.getBytesWritten() - prevBytesWritten)
}
locOffset += freqNorm.numLocs
}

Expand Down Expand Up @@ -750,6 +769,8 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err
return 0, nil, err
}

s.incrementBytesWritten(uint64(len(vellumData)))

// reset vellum for reuse
s.builderBuf.Reset()

Expand All @@ -764,6 +785,7 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err
if err != nil {
return 0, nil, err
}

fdvEncoder := newChunkedContentCoder(chunkSize, uint64(len(s.results)-1), s.w, false)
if s.IncludeDocValues[fieldID] {
for docNum, docTerms := range docTermMap {
Expand All @@ -779,6 +801,8 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err
return 0, nil, err
}

s.setBytesWritten(s.getBytesWritten())

fdvOffsetsStart[fieldID] = uint64(s.w.Count())

_, err = fdvEncoder.Write()
Expand Down
16 changes: 12 additions & 4 deletions posting.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (p *PostingsList) Count() uint64 {
// The purpose of this implementation is to get
// the bytes read from the postings lists stored
// on disk, while querying
func (p *PostingsList) SetBytesRead(val uint64) {
// ResetBytesRead atomically overwrites the postings list's bytes-read
// counter with val, discarding any previously accumulated count.
func (p *PostingsList) ResetBytesRead(val uint64) {
	atomic.StoreUint64(&p.bytesRead, val)
}

Expand All @@ -266,6 +266,10 @@ func (p *PostingsList) incrementBytesRead(val uint64) {
atomic.AddUint64(&p.bytesRead, val)
}

// BytesWritten always returns 0: PostingsList is a query-time, read-side
// structure and tracks only bytesRead; this method exists to satisfy the
// shared stats interface.
func (p *PostingsList) BytesWritten() uint64 {
	return 0
}

func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
rv.postingsOffset = postingsOffset

Expand Down Expand Up @@ -363,7 +367,7 @@ func (i *PostingsIterator) Size() int {
// the bytes read from the disk which includes
// the freqNorm and location specific information
// of a hit
func (i *PostingsIterator) SetBytesRead(val uint64) {
// ResetBytesRead atomically overwrites the iterator's bytes-read counter
// with val; loadChunk uses this to mirror the downstream reader's
// cumulative count rather than double-counting it.
func (i *PostingsIterator) ResetBytesRead(val uint64) {
	atomic.StoreUint64(&i.bytesRead, val)
}

Expand All @@ -375,6 +379,10 @@ func (i *PostingsIterator) incrementBytesRead(val uint64) {
atomic.AddUint64(&i.bytesRead, val)
}

// BytesWritten always returns 0: PostingsIterator is a query-time,
// read-side structure and tracks only bytesRead; this method exists to
// satisfy the shared stats interface.
func (i *PostingsIterator) BytesWritten() uint64 {
	return 0
}

func (i *PostingsIterator) loadChunk(chunk int) error {
if i.includeFreqNorm {
err := i.freqNormReader.loadChunk(chunk)
Expand All @@ -386,15 +394,15 @@ func (i *PostingsIterator) loadChunk(chunk int) error {
// the postingsIterator is tracking only the chunk loaded
// and the cumulation is tracked correctly in the downstream
// intDecoder
i.SetBytesRead(i.freqNormReader.getBytesRead())
i.ResetBytesRead(i.freqNormReader.getBytesRead())
}

if i.includeLocs {
err := i.locReader.loadChunk(chunk)
if err != nil {
return err
}
i.SetBytesRead(i.locReader.getBytesRead())
i.ResetBytesRead(i.locReader.getBytesRead())
}

i.currChunk = uint32(chunk)
Expand Down
23 changes: 21 additions & 2 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ type SegmentBase struct {
size uint64

// atomic access to this variable
bytesRead uint64
bytesRead uint64
bytesWritten uint64

m sync.Mutex
fieldFSTs map[uint16]*vellum.FST
Expand Down Expand Up @@ -226,7 +227,7 @@ func (s *Segment) loadConfig() error {
// interface, as the intention is to retrieve the bytes
// read from the on-disk segment as part of the current
// query.
func (s *Segment) SetBytesRead(val uint64) {
// ResetBytesRead atomically overwrites the on-disk segment's bytes-read
// counter (stored on the embedded SegmentBase) with val.
func (s *Segment) ResetBytesRead(val uint64) {
	atomic.StoreUint64(&s.SegmentBase.bytesRead, val)
}

Expand All @@ -235,10 +236,28 @@ func (s *Segment) BytesRead() uint64 {
atomic.LoadUint64(&s.SegmentBase.bytesRead)
}

// BytesWritten always returns 0 for an on-disk Segment.
// NOTE(review): this intentionally shadows SegmentBase.BytesWritten,
// which returns the real counter — presumably write stats are only
// meaningful while building/merging, not for an opened segment; confirm
// against the interface's consumers.
func (s *Segment) BytesWritten() uint64 {
	return 0
}

// incrementBytesRead atomically adds val to the segment's bytes-read
// counter (the bytesRead field promoted from the embedded SegmentBase).
func (s *Segment) incrementBytesRead(val uint64) {
	atomic.AddUint64(&s.bytesRead, val)
}

// BytesWritten atomically reads the number of bytes written while
// building this in-memory segment.
func (s *SegmentBase) BytesWritten() uint64 {
	return atomic.LoadUint64(&s.bytesWritten)
}

// setBytesWritten atomically stores val as the segment's bytes-written
// counter. Previously this used atomic.AddUint64, which accumulated
// instead of overwriting — contradicting the "set" naming and the
// matching (*interim).setBytesWritten, which stores. Store semantics
// are identical for the visible caller in newWithChunkMode, which calls
// this exactly once on a freshly built SegmentBase whose counter is 0.
func (s *SegmentBase) setBytesWritten(val uint64) {
	atomic.StoreUint64(&s.bytesWritten, val)
}

// BytesRead always returns 0 for an in-memory SegmentBase; read stats
// are reported by the on-disk Segment wrapper instead.
func (s *SegmentBase) BytesRead() uint64 {
	return 0
}

// ResetBytesRead is a no-op on SegmentBase, consistent with BytesRead
// returning 0 here; the on-disk Segment provides the real implementation.
func (s *SegmentBase) ResetBytesRead(val uint64) {}

// incrementBytesRead atomically adds val to the bytes-read counter.
func (s *SegmentBase) incrementBytesRead(val uint64) {
	atomic.AddUint64(&s.bytesRead, val)
}
Expand Down