Skip to content

Commit

Permalink
code cleanup
Browse files (browse the repository at this point in the history)
  • Loading branch information
Thejas-bhat committed Jun 30, 2022
1 parent 54590cc commit f0970db
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 38 deletions.
79 changes: 42 additions & 37 deletions index/scorch/snapshot_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/v2/document"
index "github.com/blevesearch/bleve_index_api"
segmentl "github.com/blevesearch/scorch_segment_api/v2"
segment "github.com/blevesearch/scorch_segment_api/v2"
"github.com/blevesearch/vellum"
lev "github.com/blevesearch/vellum/levenshtein"
bolt "go.etcd.io/bbolt"
Expand All @@ -38,13 +38,13 @@ import (
var lb1, lb2 *lev.LevenshteinAutomatonBuilder

type asynchSegmentResult struct {
dict segmentl.TermDictionary
dictItr segmentl.DictionaryIterator
dict segment.TermDictionary
dictItr segment.DictionaryIterator

index int
docs *roaring.Bitmap

postings segmentl.PostingsList
postings segment.PostingsList

err error
}
Expand All @@ -60,6 +60,8 @@ var reflectStaticSizeIndexSnapshot int
// in the kvConfig.
var DefaultFieldTFRCacheThreshold uint64 = 10

// bytesOffDiskStats is a package-local named type whose underlying type is
// segment.BytesOffDiskStats; it is used as the target of type assertions on
// segments, postings lists and iterators when reading their BytesRead()
// counters.
// NOTE(review): presumably this exists so the interface can still be named in
// scopes where a local variable called `segment` (e.g. a range loop variable)
// shadows the imported `segment` package — confirm.
type bytesOffDiskStats segment.BytesOffDiskStats

func init() {
var is interface{} = IndexSnapshot{}
reflectStaticSizeIndexSnapshot = int(reflect.TypeOf(is).Size())
Expand Down Expand Up @@ -140,21 +142,21 @@ func (i *IndexSnapshot) updateSize() {
}

func (i *IndexSnapshot) newIndexSnapshotFieldDict(field string,
makeItr func(i segmentl.TermDictionary) segmentl.DictionaryIterator,
makeItr func(i segment.TermDictionary) segment.DictionaryIterator,
randomLookup bool) (*IndexSnapshotFieldDict, error) {

results := make(chan *asynchSegmentResult)
for index, segment := range i.segment {
go func(index int, segment *SegmentSnapshot) {
var prevBytesRead uint64
if seg, ok := segment.segment.(segmentl.BytesOffDiskStats); ok {
if seg, ok := segment.segment.(bytesOffDiskStats); ok {
prevBytesRead = seg.BytesRead()
}
dict, err := segment.segment.Dictionary(field)
if err != nil {
results <- &asynchSegmentResult{err: err}
} else {
if seg, ok := segment.segment.(segmentl.BytesOffDiskStats); ok {
if seg, ok := segment.segment.(bytesOffDiskStats); ok {
atomic.AddUint64(&i.parent.stats.TotBytesReadQueryTime,
seg.BytesRead()-prevBytesRead)
}
Expand Down Expand Up @@ -209,7 +211,7 @@ func (i *IndexSnapshot) newIndexSnapshotFieldDict(field string,
}

func (i *IndexSnapshot) FieldDict(field string) (index.FieldDict, error) {
return i.newIndexSnapshotFieldDict(field, func(i segmentl.TermDictionary) segmentl.DictionaryIterator {
return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator {
return i.AutomatonIterator(nil, nil, nil)
}, false)
}
Expand Down Expand Up @@ -237,7 +239,7 @@ func calculateExclusiveEndFromInclusiveEnd(inclusiveEnd []byte) []byte {

func (i *IndexSnapshot) FieldDictRange(field string, startTerm []byte,
endTerm []byte) (index.FieldDict, error) {
return i.newIndexSnapshotFieldDict(field, func(i segmentl.TermDictionary) segmentl.DictionaryIterator {
return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator {
endTermExclusive := calculateExclusiveEndFromInclusiveEnd(endTerm)
return i.AutomatonIterator(nil, startTerm, endTermExclusive)
}, false)
Expand All @@ -264,7 +266,7 @@ func calculateExclusiveEndFromPrefix(in []byte) []byte {
func (i *IndexSnapshot) FieldDictPrefix(field string,
termPrefix []byte) (index.FieldDict, error) {
termPrefixEnd := calculateExclusiveEndFromPrefix(termPrefix)
return i.newIndexSnapshotFieldDict(field, func(i segmentl.TermDictionary) segmentl.DictionaryIterator {
return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator {
return i.AutomatonIterator(nil, termPrefix, termPrefixEnd)
}, false)
}
Expand All @@ -279,7 +281,7 @@ func (i *IndexSnapshot) FieldDictRegexp(field string,
return nil, err
}

return i.newIndexSnapshotFieldDict(field, func(i segmentl.TermDictionary) segmentl.DictionaryIterator {
return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator {
return i.AutomatonIterator(a, prefixBeg, prefixEnd)
}, false)
}
Expand Down Expand Up @@ -307,7 +309,7 @@ func (i *IndexSnapshot) FieldDictFuzzy(field string,
prefixEnd = calculateExclusiveEndFromPrefix(prefixBeg)
}

return i.newIndexSnapshotFieldDict(field, func(i segmentl.TermDictionary) segmentl.DictionaryIterator {
return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator {
return i.AutomatonIterator(a, prefixBeg, prefixEnd)
}, false)
}
Expand Down Expand Up @@ -433,7 +435,7 @@ func (i *IndexSnapshot) Document(id string) (rv index.Document, err error) {

rvd := document.NewDocument(id)
var prevBytesRead uint64
if seg, ok := i.segment[segmentIndex].segment.(segmentl.BytesOffDiskStats); ok {
if seg, ok := i.segment[segmentIndex].segment.(segment.BytesOffDiskStats); ok {
prevBytesRead = seg.BytesRead()
}
err = i.segment[segmentIndex].VisitDocument(localDocNum, func(name string, typ byte, val []byte, pos []uint64) bool {
Expand Down Expand Up @@ -465,9 +467,8 @@ func (i *IndexSnapshot) Document(id string) (rv index.Document, err error) {
if err != nil {
return nil, err
}
if seg, ok := i.segment[segmentIndex].segment.(segmentl.BytesOffDiskStats); ok {
if seg, ok := i.segment[segmentIndex].segment.(segment.BytesOffDiskStats); ok {
delta := seg.BytesRead() - prevBytesRead
// log.Printf("stored field section %v\n", delta)
atomic.AddUint64(&i.parent.stats.TotBytesReadQueryTime, delta)
}
return rvd, nil
Expand Down Expand Up @@ -529,10 +530,10 @@ func (is *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
rv.field = field
rv.snapshot = is
if rv.postings == nil {
rv.postings = make([]segmentl.PostingsList, len(is.segment))
rv.postings = make([]segment.PostingsList, len(is.segment))
}
if rv.iterators == nil {
rv.iterators = make([]segmentl.PostingsIterator, len(is.segment))
rv.iterators = make([]segment.PostingsIterator, len(is.segment))
}
rv.segmentOffset = 0
rv.includeFreq = includeFreq
Expand All @@ -541,17 +542,17 @@ func (is *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
rv.currPosting = nil
rv.currID = rv.currID[:0]
if rv.dicts == nil {
rv.dicts = make([]segmentl.TermDictionary, len(is.segment))
rv.dicts = make([]segment.TermDictionary, len(is.segment))
for i, segment := range is.segment {
var prevBytesRead uint64
if segP, ok := segment.segment.(segmentl.BytesOffDiskStats); ok {
if segP, ok := segment.segment.(bytesOffDiskStats); ok {
prevBytesRead = segP.BytesRead()
}
dict, err := segment.segment.Dictionary(field)
if err != nil {
return nil, err
}
if segP, ok := segment.segment.(segmentl.BytesOffDiskStats); ok {
if segP, ok := segment.segment.(bytesOffDiskStats); ok {
atomic.AddUint64(&is.parent.stats.TotBytesReadQueryTime, segP.BytesRead()-prevBytesRead)
}
rv.dicts[i] = dict
Expand All @@ -560,8 +561,8 @@ func (is *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,

for i, segment := range is.segment {
var prevBytesReadPL uint64
if _, ok := segment.segment.(segmentl.BytesOffDiskStats); ok {
if postings, ok := rv.postings[i].(segmentl.BytesOffDiskStats); ok {
if _, ok := segment.segment.(bytesOffDiskStats); ok {
if postings, ok := rv.postings[i].(bytesOffDiskStats); ok {
prevBytesReadPL = postings.BytesRead()
}
}
Expand All @@ -571,20 +572,24 @@ func (is *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
}
rv.postings[i] = pl
var prevBytesReadItr uint64
if _, ok := segment.segment.(segmentl.BytesOffDiskStats); ok {
if itr, ok := rv.iterators[i].(segmentl.BytesOffDiskStats); ok {
if _, ok := segment.segment.(bytesOffDiskStats); ok {
if itr, ok := rv.iterators[i].(bytesOffDiskStats); ok {
prevBytesReadItr = itr.BytesRead()
}
}
rv.iterators[i] = pl.Iterator(includeFreq, includeNorm, includeTermVectors, rv.iterators[i])

if _, ok := segment.segment.(segmentl.BytesOffDiskStats); ok {
if postings, ok := pl.(segmentl.BytesOffDiskStats); ok && prevBytesReadPL < postings.BytesRead() {
atomic.AddUint64(&is.parent.stats.TotBytesReadQueryTime, postings.BytesRead()-prevBytesReadPL)
if _, ok := segment.segment.(bytesOffDiskStats); ok {
if postings, ok := pl.(bytesOffDiskStats); ok &&
prevBytesReadPL < postings.BytesRead() {
atomic.AddUint64(&is.parent.stats.TotBytesReadQueryTime,
postings.BytesRead()-prevBytesReadPL)
}

if itr, ok := rv.iterators[i].(segmentl.BytesOffDiskStats); ok && prevBytesReadItr < itr.BytesRead() {
atomic.AddUint64(&is.parent.stats.TotBytesReadQueryTime, itr.BytesRead()-prevBytesReadItr)
if itr, ok := rv.iterators[i].(bytesOffDiskStats); ok &&
prevBytesReadItr < itr.BytesRead() {
atomic.AddUint64(&is.parent.stats.TotBytesReadQueryTime,
itr.BytesRead()-prevBytesReadItr)
}
}
}
Expand Down Expand Up @@ -668,13 +673,13 @@ func docInternalToNumber(in index.IndexInternalID) (uint64, error) {

func (i *IndexSnapshot) documentVisitFieldTermsOnSegment(
segmentIndex int, localDocNum uint64, fields []string, cFields []string,
visitor index.DocValueVisitor, dvs segmentl.DocVisitState) (
cFieldsOut []string, dvsOut segmentl.DocVisitState, err error) {
visitor index.DocValueVisitor, dvs segment.DocVisitState) (
cFieldsOut []string, dvsOut segment.DocVisitState, err error) {
ss := i.segment[segmentIndex]

var vFields []string // fields that are visitable via the segment

ssv, ssvOk := ss.segment.(segmentl.DocValueVisitable)
ssv, ssvOk := ss.segment.(segment.DocValueVisitable)
if ssvOk && ssv != nil {
vFields, err = ssv.VisitableDocValueFields()
if err != nil {
Expand Down Expand Up @@ -706,14 +711,14 @@ func (i *IndexSnapshot) documentVisitFieldTermsOnSegment(

if ssvOk && ssv != nil && len(vFields) > 0 {
var prevBytesRead uint64
if ssvp, ok := ssv.(segmentl.BytesOffDiskStats); ok {
if ssvp, ok := ssv.(segment.BytesOffDiskStats); ok {
prevBytesRead = ssvp.BytesRead()
}
dvs, err = ssv.VisitDocValues(localDocNum, fields, visitor, dvs)
if err != nil {
return nil, nil, err
}
if ssvp, ok := ssv.(segmentl.BytesOffDiskStats); ok {
if ssvp, ok := ssv.(segment.BytesOffDiskStats); ok {
atomic.AddUint64(&i.parent.stats.TotBytesReadQueryTime, ssvp.BytesRead()-prevBytesRead)
}
}
Expand All @@ -740,7 +745,7 @@ func (i *IndexSnapshot) DocValueReader(fields []string) (
type DocValueReader struct {
i *IndexSnapshot
fields []string
dvs segmentl.DocVisitState
dvs segment.DocVisitState

currSegmentIndex int
currCachedFields []string
Expand Down Expand Up @@ -796,7 +801,7 @@ func (i *IndexSnapshot) DumpFields() chan interface{} {
func (i *IndexSnapshot) diskSegmentsPaths() map[string]struct{} {
rv := make(map[string]struct{}, len(i.segment))
for _, segmentSnapshot := range i.segment {
if seg, ok := segmentSnapshot.segment.(segmentl.PersistedSegment); ok {
if seg, ok := segmentSnapshot.segment.(segment.PersistedSegment); ok {
rv[seg.Path()] = struct{}{}
}
}
Expand All @@ -808,7 +813,7 @@ func (i *IndexSnapshot) diskSegmentsPaths() map[string]struct{} {
func (i *IndexSnapshot) reClaimableDocsRatio() float64 {
var totalCount, liveCount uint64
for _, segmentSnapshot := range i.segment {
if _, ok := segmentSnapshot.segment.(segmentl.PersistedSegment); ok {
if _, ok := segmentSnapshot.segment.(segment.PersistedSegment); ok {
totalCount += uint64(segmentSnapshot.FullSize())
liveCount += uint64(segmentSnapshot.Count())
}
Expand Down
2 changes: 1 addition & 1 deletion index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func TestBytesRead(t *testing.T) {
stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_query_time"].(uint64)
if bytesRead-prevBytesRead != 16556 {
t.Fatalf("expected bytes read for fuzzy query is 16176, got %v\n",
t.Fatalf("expected bytes read for fuzzy query is 16556, got %v\n",
bytesRead-prevBytesRead)
}
prevBytesRead = bytesRead
Expand Down

0 comments on commit f0970db

Please sign in to comment.