Commit

Includes accounting of bytes read during query time from
the persisted zap files
Thejas-bhat committed Jul 13, 2022
1 parent 16e1263 commit 760343f
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 5 deletions.
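
This commit wires zap's readers into the segment.DiskStatsReporter interface, which is why go.mod below bumps scorch_segment_api to v2.1.1. Judging from the BytesRead/SetBytesRead methods added throughout the diff, the interface presumably looks like the following sketch; the authoritative definition lives in github.com/blevesearch/scorch_segment_api/v2:

```go
// Sketch of the interface implemented in this commit, inferred from
// the methods the diff adds; see scorch_segment_api/v2 for the real
// definition.
package segment

// DiskStatsReporter reports how many bytes were read from the
// persisted (on-disk) form of a segment while serving queries.
type DiskStatsReporter interface {
	// BytesRead returns the cumulative count of bytes read so far.
	BytesRead() uint64

	// SetBytesRead overrides the counter, e.g. to seed or reset it
	// when a reader is reused.
	SetBytesRead(val uint64)
}
```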
26 changes: 24 additions & 2 deletions docvalues.go
@@ -49,6 +49,7 @@ type docValueReader struct {
curChunkHeader []MetaData
curChunkData []byte // compressed data cache
uncompressed []byte // temp buf for snappy decompression
bytesRead uint64
}

func (di *docValueReader) size() int {
@@ -96,6 +97,10 @@ func (s *SegmentBase) loadFieldDocValueReader(field string,
chunkOffsetsLen := binary.BigEndian.Uint64(s.mem[fieldDvLocEnd-16 : fieldDvLocEnd-8])
// acquire position of chunk offsets
chunkOffsetsPosition = (fieldDvLocEnd - 16) - chunkOffsetsLen

// account for these 16 bytes, which correspond to the length
// of the chunk offsets and the position of the offsets
s.bytesRead += uint64(16)
} else {
return nil, fmt.Errorf("loadFieldDocValueReader: fieldDvLoc too small: %d-%d", fieldDvLocEnd, fieldDvLocStart)
}
@@ -116,13 +121,28 @@ func (s *SegmentBase) loadFieldDocValueReader(field string,
fdvIter.chunkOffsets[i] = loc
offset += uint64(read)
}

s.bytesRead += offset
// set the data offset
fdvIter.dvDataLoc = fieldDvLocStart

return fdvIter, nil
}

// Implements the segment.DiskStatsReporter interface.
// The purpose of this implementation is to report the
// bytes read from disk (pertaining to the docValues)
// while querying. loadDvChunk retrieves the next chunk
// of docValues, and the bytes read off disk for that
// chunk are accounted for as well.
func (di *docValueReader) BytesRead() uint64 {
return di.bytesRead
}

func (di *docValueReader) SetBytesRead(val uint64) {
di.bytesRead = val
}

func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error {
// advance to the chunk where the docValues
// reside for the given docNum
@@ -145,7 +165,7 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error
return fmt.Errorf("failed to read the chunk")
}
chunkMetaLoc := destChunkDataLoc + uint64(read)

di.bytesRead += uint64(read)
offset := uint64(0)
if cap(di.curChunkHeader) < int(numDocs) {
di.curChunkHeader = make([]MetaData, int(numDocs))
@@ -161,6 +181,7 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error

compressedDataLoc := chunkMetaLoc + offset
dataLength := curChunkEnd - compressedDataLoc
di.bytesRead += uint64(dataLength + offset)
di.curChunkData = s.mem[compressedDataLoc : compressedDataLoc+dataLength]
di.curChunkNum = chunkNumber
di.uncompressed = di.uncompressed[:0]
@@ -295,6 +316,7 @@ func (s *SegmentBase) VisitDocValues(localDocNum uint64, fields []string,
if err != nil {
return dvs, err
}
s.bytesRead += dvr.BytesRead()
}

_ = dvr.visitDocValues(localDocNum, visitor)
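
Summing up the docvalues accounting above: loadFieldDocValueReader counts the 16 footer bytes plus the chunk-offset varints (offset), and each loadDvChunk call then counts its uvarint length prefix (read), the chunk metadata header (offset), and the compressed payload (dataLength). A back-of-the-envelope model of the per-chunk contribution, with hypothetical names; the real code increments di.bytesRead inline:

```go
// dvChunkBytes models what a single loadDvChunk call adds to the
// docValueReader's bytesRead counter. Illustrative only.
func dvChunkBytes(prefixLen int, metaHeaderLen, compressedLen uint64) uint64 {
	// uvarint length prefix + per-doc MetaData header + snappy payload
	return uint64(prefixLen) + metaHeaderLen + compressedLen
}
```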
2 changes: 1 addition & 1 deletion go.mod
@@ -6,7 +6,7 @@ require (
github.com/RoaringBitmap/roaring v0.9.4
github.com/blevesearch/bleve_index_api v1.0.1
github.com/blevesearch/mmap-go v1.0.4
github.com/blevesearch/scorch_segment_api/v2 v2.1.0
github.com/blevesearch/scorch_segment_api/v2 v2.1.1
github.com/blevesearch/vellum v1.0.8
github.com/golang/snappy v0.0.1
github.com/spf13/cobra v0.0.5
4 changes: 2 additions & 2 deletions go.sum
@@ -8,8 +8,8 @@ github.com/blevesearch/bleve_index_api v1.0.1 h1:nx9++0hnyiGOHJwQQYfsUGzpRdEVE5L
github.com/blevesearch/bleve_index_api v1.0.1/go.mod h1:fiwKS0xLEm+gBRgv5mumf0dhgFr2mDgZah1pqv1c1M4=
github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc=
github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs=
github.com/blevesearch/scorch_segment_api/v2 v2.1.0 h1:NFwteOpZEvJk5Vg0H6gD0hxupsG3JYocE4DBvsA2GZI=
github.com/blevesearch/scorch_segment_api/v2 v2.1.0/go.mod h1:uch7xyyO/Alxkuxa+CGs79vw0QY8BENSBjg6Mw5L5DE=
github.com/blevesearch/scorch_segment_api/v2 v2.1.1 h1:J8UDudUpDJz21d/hCMIshCeRordwnDTftgXcSDMUx40=
github.com/blevesearch/scorch_segment_api/v2 v2.1.1/go.mod h1:uch7xyyO/Alxkuxa+CGs79vw0QY8BENSBjg6Mw5L5DE=
github.com/blevesearch/vellum v1.0.8 h1:iMGh4lfxza4BnWO/UJTMPlI3HsK9YawjPv+TteVa9ck=
github.com/blevesearch/vellum v1.0.8/go.mod h1:+cpRi/tqq49xUYSQN2P7A5zNSNrS+MscLeeaZ3J46UA=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
13 changes: 13 additions & 0 deletions intDecoder.go
@@ -26,6 +26,7 @@ type chunkedIntDecoder struct {
curChunkBytes []byte
data []byte
r *memUvarintReader
numBytesRead uint64
}

// newChunkedIntDecoder expects an optional or reset chunkedIntDecoder for better reuse.
@@ -55,10 +56,20 @@ func newChunkedIntDecoder(buf []byte, offset uint64, rv *chunkedIntDecoder) *chu
rv.chunkOffsets[i], read = binary.Uvarint(buf[offset+n : offset+n+binary.MaxVarintLen64])
n += uint64(read)
}
rv.numBytesRead += n
rv.dataStartOffset = offset + n
return rv
}

// A util function which reports the query-time
// bytes decoded by the intcoder (e.g. the freqNorm
// and location details of a term in a document).
// loadChunk retrieves the next chunk, and the number
// of bytes read in that operation is accounted for as well.
func (d *chunkedIntDecoder) bytesRead() uint64 {
return d.numBytesRead
}

func (d *chunkedIntDecoder) loadChunk(chunk int) error {
if d.startOffset == termNotEncoded {
d.r = newMemUvarintReader([]byte(nil))
@@ -75,6 +86,7 @@ func (d *chunkedIntDecoder) loadChunk(chunk int) error {
start += s
end += e
d.curChunkBytes = d.data[start:end]
d.numBytesRead += uint64(len(d.curChunkBytes))
if d.r == nil {
d.r = newMemUvarintReader(d.curChunkBytes)
} else {
@@ -89,6 +101,7 @@ func (d *chunkedIntDecoder) reset() {
d.dataStartOffset = 0
d.chunkOffsets = d.chunkOffsets[:0]
d.curChunkBytes = d.curChunkBytes[:0]
d.numBytesRead = 0
d.data = d.data[:0]
if d.r != nil {
d.r.Reset([]byte(nil))
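
The decoder's counter is cumulative across its lifetime: newChunkedIntDecoder accounts for the chunk-offset varints it parses up front, every loadChunk adds the length of the chunk it slices out, and reset zeroes the count for reuse. A sketch of that lifecycle, written as if inside the zap package since these identifiers are unexported (buf and termOffset are assumed inputs):

```go
// decoderBytesRead sketches the counter's lifecycle. Illustrative
// only; not compilable outside the zap package.
func decoderBytesRead(buf []byte, termOffset uint64) (uint64, error) {
	dec := newChunkedIntDecoder(buf, termOffset, nil) // counts the offset varints
	if err := dec.loadChunk(0); err != nil {          // adds len of chunk 0's bytes
		return 0, err
	}
	total := dec.bytesRead() // cumulative: varints + chunk 0
	dec.reset()              // numBytesRead back to 0 for reuse
	return total, nil
}
```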
40 changes: 40 additions & 0 deletions posting.go
@@ -108,6 +108,7 @@ type PostingsList struct {
normBits1Hit uint64

chunkSize uint64
bytesRead uint64
}

// represents an immutable, empty postings list
@@ -208,11 +209,13 @@ func (p *PostingsList) iterator(includeFreq, includeNorm, includeLocs bool,
// initialize freq chunk reader
if rv.includeFreqNorm {
rv.freqNormReader = newChunkedIntDecoder(p.sb.mem, p.freqOffset, rv.freqNormReader)
rv.bytesRead += rv.freqNormReader.bytesRead()
}

// initialize the loc chunk reader
if rv.includeLocs {
rv.locReader = newChunkedIntDecoder(p.sb.mem, p.locOffset, rv.locReader)
rv.bytesRead += rv.locReader.bytesRead()
}

rv.all = p.postings.Iterator()
@@ -244,6 +247,18 @@ func (p *PostingsList) Count() uint64 {
return n - e
}

// Implements the segment.DiskStatsReporter interface.
// The purpose of this implementation is to report
// the bytes read from the postings lists stored
// on disk while querying.
func (p *PostingsList) SetBytesRead(val uint64) {
p.bytesRead = val
}

func (p *PostingsList) BytesRead() uint64 {
return p.bytesRead
}

func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
rv.postingsOffset = postingsOffset

@@ -268,6 +283,8 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {

roaringBytes := d.sb.mem[postingsOffset+n : postingsOffset+n+postingsLen]

rv.bytesRead += (n + postingsLen)

if rv.postings == nil {
rv.postings = roaring.NewBitmap()
}
@@ -316,6 +333,8 @@ type PostingsIterator struct {

includeFreqNorm bool
includeLocs bool

bytesRead uint64
}

var emptyPostingsIterator = &PostingsIterator{}
@@ -331,19 +350,40 @@ func (i *PostingsIterator) Size() int {
return sizeInBytes
}

// Implements the segment.DiskStatsReporter interface.
// The purpose of this implementation is to report
// the bytes read from disk, which include the
// freqNorm and location-specific information
// of a hit.
func (i *PostingsIterator) SetBytesRead(val uint64) {
i.bytesRead = val
}

func (i *PostingsIterator) BytesRead() uint64 {
return i.bytesRead
}

func (i *PostingsIterator) loadChunk(chunk int) error {
if i.includeFreqNorm {
err := i.freqNormReader.loadChunk(chunk)
if err != nil {
return err
}

// assign (rather than add) the bytes read at this point,
// since the PostingsIterator tracks only the chunk
// currently loaded, while the cumulative count is
// maintained correctly by the downstream intDecoder
i.bytesRead = i.freqNormReader.bytesRead()

}

if i.includeLocs {
err := i.locReader.loadChunk(chunk)
if err != nil {
return err
}
i.bytesRead = i.locReader.bytesRead()
}

i.currChunk = uint32(chunk)
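
Note that the assignment (rather than +=) in loadChunk above is deliberate: the downstream intDecoder already accumulates across chunk loads, so the iterator mirrors its count instead of summing it a second time. On the consumer side, a caller holding a generic postings iterator could sample the stat roughly like this (hypothetical code; itr is assumed to come from a PostingsList elsewhere):

```go
package main // illustrative placement; real callers sit in the query path

import (
	segment "github.com/blevesearch/scorch_segment_api/v2"
)

// drainAndCount steps through a postings iterator, then samples its
// disk-read counter via the DiskStatsReporter interface if supported.
func drainAndCount(itr segment.PostingsIterator) (int, uint64, error) {
	hits := 0
	for {
		next, err := itr.Next()
		if err != nil {
			return hits, 0, err
		}
		if next == nil { // iterator exhausted
			break
		}
		hits++
	}
	var bytesRead uint64
	if r, ok := itr.(segment.DiskStatsReporter); ok {
		bytesRead = r.BytesRead()
	}
	return hits, bytesRead, nil
}
```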
4 changes: 4 additions & 0 deletions read.go
@@ -22,6 +22,8 @@ func (s *SegmentBase) getDocStoredMetaAndCompressed(docNum uint64) ([]byte, []by
meta := s.mem[storedOffset+n : storedOffset+n+metaLen]
data := s.mem[storedOffset+n+metaLen : storedOffset+n+metaLen+dataLen]

s.bytesRead += (metaLen + dataLen)

return meta, data
}

@@ -39,5 +41,7 @@ func (s *SegmentBase) getDocStoredOffsets(docNum uint64) (
dataLen, read := binary.Uvarint(s.mem[storedOffset+n : storedOffset+n+binary.MaxVarintLen64])
n += uint64(read)

s.bytesRead += n

return indexOffset, storedOffset, n, metaLen, dataLen
}
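
So a single stored-document fetch contributes the uvarint header bytes (n, counted in getDocStoredOffsets) plus the meta and data payloads (counted in getDocStoredMetaAndCompressed). A minimal model of that total (hypothetical helper; the real code increments s.bytesRead inline):

```go
// storedDocBytes models the per-document contribution to bytesRead
// from the two stored-field helpers above. Illustrative only.
func storedDocBytes(headerLen, metaLen, dataLen uint64) uint64 {
	return headerLen + metaLen + dataLen
}
```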
24 changes: 24 additions & 0 deletions segment.go
@@ -101,6 +101,7 @@ type SegmentBase struct {
fieldDvReaders map[uint16]*docValueReader // naive chunk cache per field
fieldDvNames []string // field names cached in fieldDvReaders
size uint64
bytesRead uint64

m sync.Mutex
fieldFSTs map[uint16]*vellum.FST
@@ -210,9 +211,26 @@ func (s *Segment) loadConfig() error {

numDocsOffset := storedIndexOffset - 8
s.numDocs = binary.BigEndian.Uint64(s.mm[numDocsOffset : numDocsOffset+8])

// 8*4 + 4*3 = 44 bytes are accounted for all the offsets
// above being read from the file
s.bytesRead += 44
return nil
}

// Implements the segment.DiskStatsReporter interface.
// Only the persisted Segment type implements the
// interface, as the intention is to retrieve the bytes
// read from the on-disk segment as part of the current
// query.
func (s *Segment) SetBytesRead(val uint64) {
s.SegmentBase.bytesRead = val
}

func (s *Segment) BytesRead() uint64 {
return s.bytesRead + s.SegmentBase.bytesRead
}

func (s *SegmentBase) loadFields() error {
// NOTE for now we assume the fields index immediately precedes
// the footer, and if this changes, need to adjust accordingly (or
@@ -224,6 +242,9 @@ func (s *SegmentBase) loadFields() error {
for s.fieldsIndexOffset+(8*fieldID) < fieldsIndexEnd {
addr := binary.BigEndian.Uint64(s.mem[s.fieldsIndexOffset+(8*fieldID) : s.fieldsIndexOffset+(8*fieldID)+8])

// account for the address of the dictLoc being read from the file
s.bytesRead += 8

dictLoc, read := binary.Uvarint(s.mem[addr:fieldsIndexEnd])
n := uint64(read)
s.dictLocs = append(s.dictLocs, dictLoc)
Expand All @@ -233,6 +254,7 @@ func (s *SegmentBase) loadFields() error {
n += uint64(read)

name := string(s.mem[addr+n : addr+n+nameLen])
s.bytesRead += (n + nameLen)
s.fieldsInv = append(s.fieldsInv, name)
s.fieldsMap[name] = uint16(fieldID + 1)

@@ -267,6 +289,7 @@ func (sb *SegmentBase) dictionary(field string) (rv *Dictionary, err error) {
// read the length of the vellum data
vellumLen, read := binary.Uvarint(sb.mem[dictStart : dictStart+binary.MaxVarintLen64])
fstBytes := sb.mem[dictStart+uint64(read) : dictStart+uint64(read)+vellumLen]
sb.bytesRead += (uint64(read) + vellumLen)
rv.fst, err = vellum.Load(fstBytes)
if err != nil {
sb.m.Unlock()
@@ -556,6 +579,7 @@ func (s *SegmentBase) loadDvReaders() error {
}
read += uint64(n)

s.bytesRead += read
fieldDvReader, err := s.loadFieldDocValueReader(field, fieldLocStart, fieldLocEnd)
if err != nil {
return err
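
At the top level only the persisted Segment exposes the reporter, combining its own counter with the embedded SegmentBase's. Query code that holds a generic segment.Segment can sample it with a type assertion; a hedged sketch, where seg and runQuery stand in for the caller's real objects:

```go
package main // illustrative placement; real callers sit in the scorch query path

import (
	"fmt"

	segment "github.com/blevesearch/scorch_segment_api/v2"
)

// reportQueryBytes runs runQuery against seg and prints how many bytes
// were read from disk in the process, when the segment supports
// disk-stats reporting.
func reportQueryBytes(seg segment.Segment, runQuery func() error) error {
	reporter, ok := seg.(segment.DiskStatsReporter)
	var before uint64
	if ok {
		before = reporter.BytesRead()
	}
	if err := runQuery(); err != nil {
		return err
	}
	if ok {
		fmt.Printf("query read %d bytes from disk\n", reporter.BytesRead()-before)
	}
	return nil
}
```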
