diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go index 3d95106c9..13c0f2639 100644 --- a/index/scorch/snapshot_index.go +++ b/index/scorch/snapshot_index.go @@ -28,7 +28,7 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/blevesearch/bleve/v2/document" index "github.com/blevesearch/bleve_index_api" - segmentl "github.com/blevesearch/scorch_segment_api/v2" + segment "github.com/blevesearch/scorch_segment_api/v2" "github.com/blevesearch/vellum" lev "github.com/blevesearch/vellum/levenshtein" bolt "go.etcd.io/bbolt" @@ -38,13 +38,13 @@ import ( var lb1, lb2 *lev.LevenshteinAutomatonBuilder type asynchSegmentResult struct { - dict segmentl.TermDictionary - dictItr segmentl.DictionaryIterator + dict segment.TermDictionary + dictItr segment.DictionaryIterator index int docs *roaring.Bitmap - postings segmentl.PostingsList + postings segment.PostingsList err error } @@ -60,6 +60,8 @@ var reflectStaticSizeIndexSnapshot int // in the kvConfig. var DefaultFieldTFRCacheThreshold uint64 = 10 +type bytesOffDiskStats segment.BytesOffDiskStats + func init() { var is interface{} = IndexSnapshot{} reflectStaticSizeIndexSnapshot = int(reflect.TypeOf(is).Size()) @@ -140,21 +142,21 @@ func (i *IndexSnapshot) updateSize() { } func (i *IndexSnapshot) newIndexSnapshotFieldDict(field string, - makeItr func(i segmentl.TermDictionary) segmentl.DictionaryIterator, + makeItr func(i segment.TermDictionary) segment.DictionaryIterator, randomLookup bool) (*IndexSnapshotFieldDict, error) { results := make(chan *asynchSegmentResult) for index, segment := range i.segment { go func(index int, segment *SegmentSnapshot) { var prevBytesRead uint64 - if seg, ok := segment.segment.(segmentl.BytesOffDiskStats); ok { + if seg, ok := segment.segment.(bytesOffDiskStats); ok { prevBytesRead = seg.BytesRead() } dict, err := segment.segment.Dictionary(field) if err != nil { results <- &asynchSegmentResult{err: err} } else { - if seg, ok := segment.segment.(segmentl.BytesOffDiskStats); ok { + if seg, ok := segment.segment.(bytesOffDiskStats); ok { atomic.AddUint64(&i.parent.stats.TotBytesReadQueryTime, seg.BytesRead()-prevBytesRead) } @@ -209,7 +211,7 @@ func (i *IndexSnapshot) newIndexSnapshotFieldDict(field string, } func (i *IndexSnapshot) FieldDict(field string) (index.FieldDict, error) { - return i.newIndexSnapshotFieldDict(field, func(i segmentl.TermDictionary) segmentl.DictionaryIterator { + return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator { return i.AutomatonIterator(nil, nil, nil) }, false) } @@ -237,7 +239,7 @@ func calculateExclusiveEndFromInclusiveEnd(inclusiveEnd []byte) []byte { func (i *IndexSnapshot) FieldDictRange(field string, startTerm []byte, endTerm []byte) (index.FieldDict, error) { - return i.newIndexSnapshotFieldDict(field, func(i segmentl.TermDictionary) segmentl.DictionaryIterator { + return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator { endTermExclusive := calculateExclusiveEndFromInclusiveEnd(endTerm) return i.AutomatonIterator(nil, startTerm, endTermExclusive) }, false) @@ -264,7 +266,7 @@ func calculateExclusiveEndFromPrefix(in []byte) []byte { func (i *IndexSnapshot) FieldDictPrefix(field string, termPrefix []byte) (index.FieldDict, error) { termPrefixEnd := calculateExclusiveEndFromPrefix(termPrefix) - return i.newIndexSnapshotFieldDict(field, func(i segmentl.TermDictionary) segmentl.DictionaryIterator { + return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator { return i.AutomatonIterator(nil, termPrefix, termPrefixEnd) }, false) } @@ -279,7 +281,7 @@ func (i *IndexSnapshot) FieldDictRegexp(field string, return nil, err } - return i.newIndexSnapshotFieldDict(field, func(i segmentl.TermDictionary) segmentl.DictionaryIterator { + return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator { return i.AutomatonIterator(a, prefixBeg, prefixEnd) }, false) } @@ -307,7 +309,7 @@ func (i *IndexSnapshot) FieldDictFuzzy(field string, prefixEnd = calculateExclusiveEndFromPrefix(prefixBeg) } - return i.newIndexSnapshotFieldDict(field, func(i segmentl.TermDictionary) segmentl.DictionaryIterator { + return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator { return i.AutomatonIterator(a, prefixBeg, prefixEnd) }, false) } @@ -433,7 +435,7 @@ func (i *IndexSnapshot) Document(id string) (rv index.Document, err error) { rvd := document.NewDocument(id) var prevBytesRead uint64 - if seg, ok := i.segment[segmentIndex].segment.(segmentl.BytesOffDiskStats); ok { + if seg, ok := i.segment[segmentIndex].segment.(segment.BytesOffDiskStats); ok { prevBytesRead = seg.BytesRead() } err = i.segment[segmentIndex].VisitDocument(localDocNum, func(name string, typ byte, val []byte, pos []uint64) bool { @@ -465,9 +467,8 @@ func (i *IndexSnapshot) Document(id string) (rv index.Document, err error) { if err != nil { return nil, err } - if seg, ok := i.segment[segmentIndex].segment.(segmentl.BytesOffDiskStats); ok { + if seg, ok := i.segment[segmentIndex].segment.(segment.BytesOffDiskStats); ok { delta := seg.BytesRead() - prevBytesRead - // log.Printf("stored field section %v\n", delta) atomic.AddUint64(&i.parent.stats.TotBytesReadQueryTime, delta) } return rvd, nil @@ -529,10 +530,10 @@ func (is *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq, rv.field = field rv.snapshot = is if rv.postings == nil { - rv.postings = make([]segmentl.PostingsList, len(is.segment)) + rv.postings = make([]segment.PostingsList, len(is.segment)) } if rv.iterators == nil { - rv.iterators = make([]segmentl.PostingsIterator, len(is.segment)) + rv.iterators = make([]segment.PostingsIterator, len(is.segment)) } rv.segmentOffset = 0 rv.includeFreq = includeFreq @@ -541,17 +542,17 @@ func (is *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq, rv.currPosting = nil rv.currID = rv.currID[:0] if rv.dicts == nil { - rv.dicts = make([]segmentl.TermDictionary, len(is.segment)) + rv.dicts = make([]segment.TermDictionary, len(is.segment)) for i, segment := range is.segment { var prevBytesRead uint64 - if segP, ok := segment.segment.(segmentl.BytesOffDiskStats); ok { + if segP, ok := segment.segment.(bytesOffDiskStats); ok { prevBytesRead = segP.BytesRead() } dict, err := segment.segment.Dictionary(field) if err != nil { return nil, err } - if segP, ok := segment.segment.(segmentl.BytesOffDiskStats); ok { + if segP, ok := segment.segment.(bytesOffDiskStats); ok { atomic.AddUint64(&is.parent.stats.TotBytesReadQueryTime, segP.BytesRead()-prevBytesRead) } rv.dicts[i] = dict @@ -560,8 +561,8 @@ func (is *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq, for i, segment := range is.segment { var prevBytesReadPL uint64 - if _, ok := segment.segment.(segmentl.BytesOffDiskStats); ok { - if postings, ok := rv.postings[i].(segmentl.BytesOffDiskStats); ok { + if _, ok := segment.segment.(bytesOffDiskStats); ok { + if postings, ok := rv.postings[i].(bytesOffDiskStats); ok { prevBytesReadPL = postings.BytesRead() } } @@ -571,20 +572,24 @@ func (is *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq, } rv.postings[i] = pl var prevBytesReadItr uint64 - if _, ok := segment.segment.(segmentl.BytesOffDiskStats); ok { - if itr, ok := rv.iterators[i].(segmentl.BytesOffDiskStats); ok { + if _, ok := segment.segment.(bytesOffDiskStats); ok { + if itr, ok := rv.iterators[i].(bytesOffDiskStats); ok { prevBytesReadItr = itr.BytesRead() } } rv.iterators[i] = pl.Iterator(includeFreq, includeNorm, includeTermVectors, rv.iterators[i]) - if _, ok := segment.segment.(segmentl.BytesOffDiskStats); ok { - if postings, ok := pl.(segmentl.BytesOffDiskStats); ok && prevBytesReadPL < postings.BytesRead() { - atomic.AddUint64(&is.parent.stats.TotBytesReadQueryTime, postings.BytesRead()-prevBytesReadPL) + if _, ok := segment.segment.(bytesOffDiskStats); ok { + if postings, ok := pl.(bytesOffDiskStats); ok && + prevBytesReadPL < postings.BytesRead() { + atomic.AddUint64(&is.parent.stats.TotBytesReadQueryTime, + postings.BytesRead()-prevBytesReadPL) } - if itr, ok := rv.iterators[i].(segmentl.BytesOffDiskStats); ok && prevBytesReadItr < itr.BytesRead() { - atomic.AddUint64(&is.parent.stats.TotBytesReadQueryTime, itr.BytesRead()-prevBytesReadItr) + if itr, ok := rv.iterators[i].(bytesOffDiskStats); ok && + prevBytesReadItr < itr.BytesRead() { + atomic.AddUint64(&is.parent.stats.TotBytesReadQueryTime, + itr.BytesRead()-prevBytesReadItr) } } } @@ -668,13 +673,13 @@ func docInternalToNumber(in index.IndexInternalID) (uint64, error) { func (i *IndexSnapshot) documentVisitFieldTermsOnSegment( segmentIndex int, localDocNum uint64, fields []string, cFields []string, - visitor index.DocValueVisitor, dvs segmentl.DocVisitState) ( - cFieldsOut []string, dvsOut segmentl.DocVisitState, err error) { + visitor index.DocValueVisitor, dvs segment.DocVisitState) ( + cFieldsOut []string, dvsOut segment.DocVisitState, err error) { ss := i.segment[segmentIndex] var vFields []string // fields that are visitable via the segment - ssv, ssvOk := ss.segment.(segmentl.DocValueVisitable) + ssv, ssvOk := ss.segment.(segment.DocValueVisitable) if ssvOk && ssv != nil { vFields, err = ssv.VisitableDocValueFields() if err != nil { @@ -706,14 +711,14 @@ func (i *IndexSnapshot) documentVisitFieldTermsOnSegment( if ssvOk && ssv != nil && len(vFields) > 0 { var prevBytesRead uint64 - if ssvp, ok := ssv.(segmentl.BytesOffDiskStats); ok { + if ssvp, ok := ssv.(segment.BytesOffDiskStats); ok { prevBytesRead = ssvp.BytesRead() } dvs, err = ssv.VisitDocValues(localDocNum, fields, visitor, dvs) if err != nil { return nil, nil, err } - if ssvp, ok := ssv.(segmentl.BytesOffDiskStats); ok { + if ssvp, ok := ssv.(segment.BytesOffDiskStats); ok { atomic.AddUint64(&i.parent.stats.TotBytesReadQueryTime, ssvp.BytesRead()-prevBytesRead) } } @@ -740,7 +745,7 @@ func (i *IndexSnapshot) DocValueReader(fields []string) ( type DocValueReader struct { i *IndexSnapshot fields []string - dvs segmentl.DocVisitState + dvs segment.DocVisitState currSegmentIndex int currCachedFields []string @@ -796,7 +801,7 @@ func (i *IndexSnapshot) DumpFields() chan interface{} { func (i *IndexSnapshot) diskSegmentsPaths() map[string]struct{} { rv := make(map[string]struct{}, len(i.segment)) for _, segmentSnapshot := range i.segment { - if seg, ok := segmentSnapshot.segment.(segmentl.PersistedSegment); ok { + if seg, ok := segmentSnapshot.segment.(segment.PersistedSegment); ok { rv[seg.Path()] = struct{}{} } } @@ -808,7 +813,7 @@ func (i *IndexSnapshot) diskSegmentsPaths() map[string]struct{} { func (i *IndexSnapshot) reClaimableDocsRatio() float64 { var totalCount, liveCount uint64 for _, segmentSnapshot := range i.segment { - if _, ok := segmentSnapshot.segment.(segmentl.PersistedSegment); ok { + if _, ok := segmentSnapshot.segment.(segment.PersistedSegment); ok { totalCount += uint64(segmentSnapshot.FullSize()) liveCount += uint64(segmentSnapshot.Count()) } diff --git a/index_test.go b/index_test.go index 6933ec188..67f86bdec 100644 --- a/index_test.go +++ b/index_test.go @@ -322,7 +322,7 @@ func TestBytesRead(t *testing.T) { stats, _ = idx.StatsMap()["index"].(map[string]interface{}) bytesRead, _ = stats["num_bytes_read_query_time"].(uint64) if bytesRead-prevBytesRead != 16556 { - t.Fatalf("expected bytes read for fuzzy query is 16176, got %v\n", + t.Fatalf("expected bytes read for fuzzy query is 16556, got %v\n", bytesRead-prevBytesRead) } prevBytesRead = bytesRead