A new stat for bytes read off the disk #1702

Merged: 17 commits, Jul 13, 2022

Contributor

@Thejas-bhat commented Jun 9, 2022

No description provided.

@Thejas-bhat marked this pull request as ready for review June 30, 2022 13:02
@abhinavdangeti added this to the v2.3.4 milestone Jun 30, 2022
Contributor

@sreekanth-cb left a comment

few comments..

Resolved (outdated) review threads: index/scorch/scorch.go (2), index/scorch/snapshot_index.go (4)

Contributor Author

@Thejas-bhat left a comment

Thanks sreekanth, I've included your suggestions

Contributor

@sreekanth-cb left a comment

minor...

@@ -426,6 +435,16 @@ type segmentMerge struct {
	notifyCh chan *mergeTaskIntroStatus
}

func cumulateBytesRead(sbs []segment.Segment) uint64 {
	rv := uint64(0)
Contributor

var rv uint64?

Contributor Author

done.
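
The helper under review could look roughly like the sketch below once `var rv uint64` is used. This is not the code from the PR: `Segment`, `fakeSegment`, and `plainSegment` are hypothetical stand-ins, and the `DiskStatsReporter` interface is assumed to expose only `BytesRead()`, based on the calls visible in the diff.

```go
package main

import "fmt"

// Segment is a stand-in for bleve's segment.Segment (assumption: reduced
// to an empty interface for this sketch).
type Segment interface{}

// DiskStatsReporter reuses the interface name from the PR; the single
// BytesRead method is an assumption.
type DiskStatsReporter interface {
	BytesRead() uint64
}

// fakeSegment is a hypothetical segment that reports disk reads.
type fakeSegment struct{ read uint64 }

func (f *fakeSegment) BytesRead() uint64 { return f.read }

// plainSegment is a hypothetical segment without disk stats.
type plainSegment struct{}

// cumulateBytesRead sums BytesRead over the segments that report it and
// skips the ones that do not.
func cumulateBytesRead(sbs []Segment) uint64 {
	var rv uint64 // zero value, as suggested in the review
	for _, seg := range sbs {
		if reporter, diskStatsAvailable := seg.(DiskStatsReporter); diskStatsAvailable {
			rv += reporter.BytesRead()
		}
	}
	return rv
}

func main() {
	segs := []Segment{&fakeSegment{read: 10}, plainSegment{}, &fakeSegment{read: 20}}
	fmt.Println(cumulateBytesRead(segs))
}
```

Declaring `var rv uint64` relies on Go's zero value and avoids the explicit conversion in `uint64(0)`; behaviour is the same.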

for itemsDeQueued < numUpdates {
	result := <-resultChan
	resultSize := result.Size()
	atomic.AddUint64(&s.iStats.analysisBytesAdded, uint64(resultSize))
	totalAnalysisSize += resultSize
	analysisResults[itemsDeQueued] = result
	itemsDeQueued++
	result.VisitFields(func(f index.Field) {
		atomic.AddUint64(&s.stats.TotIndexedAnalysisBytes,
Contributor

Better wording for the stat names:

TotBytesIndexedAfterAnalysis?
TotBytesReadDuringQueryTime or TotBytesReadAtQueryTime

so that both stats share the same prefix.

Contributor Author

done.

@@ -146,10 +148,18 @@ func (i *IndexSnapshot) newIndexSnapshotFieldDict(field string,
	results := make(chan *asynchSegmentResult)
	for index, segment := range i.segment {
		go func(index int, segment *SegmentSnapshot) {
			var prevBytesRead uint64
			if seg, ok := segment.segment.(diskStatsReporter); ok {
Contributor

The same interface check is repeated at lines 152 and 159.
Can't we reduce that to one?

Contributor Author

yeah done, my bad.

postings.BytesRead()-prevBytesReadPL)
}

if itr, ok := rv.iterators[i].(diskStatsReporter); ok &&
Contributor

avoid repeated interface checks for the same vars..

Contributor Author

rv.iterators[i] is re-initialised in between, so I can't reuse the same variable here: the bytes read as part of the iterator initialisation wouldn't be reflected in the old variable.
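
The point about re-initialisation can be illustrated with a small sketch. It assumes a `diskStatsReporter` interface with a single `BytesRead()` method; `fakeIterator`, `reinitIterator`, and the `stats` struct are hypothetical, and the counter name is one of those proposed earlier in this review, not necessarily the one that was merged.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// diskStatsReporter reuses the interface name from the diff; the single
// BytesRead method is an assumption.
type diskStatsReporter interface {
	BytesRead() uint64
}

// fakeIterator is a hypothetical stand-in for a postings iterator.
type fakeIterator struct{ bytesRead uint64 }

func (f *fakeIterator) BytesRead() uint64 { return f.bytesRead }

// stats holds a query-time counter (field name assumed for illustration).
type stats struct{ TotBytesReadAtQueryTime uint64 }

// reinitIterator replaces iterators[i] with a fresh iterator whose
// construction read initBytes, then accounts for those bytes.
func reinitIterator(iterators []interface{}, i int, initBytes uint64, s *stats) {
	iterators[i] = &fakeIterator{bytesRead: initBytes}
	// The assertion has to run against the new value: a reporter obtained
	// before the assignment still refers to the old iterator.
	if itr, diskStatsAvailable := iterators[i].(diskStatsReporter); diskStatsAvailable {
		atomic.AddUint64(&s.TotBytesReadAtQueryTime, itr.BytesRead())
	}
}

func main() {
	var s stats
	iterators := []interface{}{&fakeIterator{bytesRead: 5}}
	stale := iterators[0].(diskStatsReporter) // taken before re-initialisation
	reinitIterator(iterators, 0, 7, &s)
	fmt.Println("stale reporter:", stale.BytesRead())
	fmt.Println("accounted:", atomic.LoadUint64(&s.TotBytesReadAtQueryTime))
}
```

The stale reporter keeps returning the old iterator's count, which is why a second type assertion after the assignment is not redundant in this case.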

@@ -387,13 +387,23 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
	analysisResults := make([]index.Document, int(numUpdates))
	var itemsDeQueued uint64
	var totalAnalysisSize int
	analysisBytes := func(tokMap index.TokenFrequencies) (rv uint64) {
Member

Should we add a flag here to calculate this only when the flag is set?

Feel like this is something we don't always necessarily need to do - hope none of this calculation shows up in CPU profiles?

Member

Not sure if this is enough: we also have to account for the copies, i.e. stored values and doc values, as well as term vectors. I don't see you doing this in your zap PR.

Contributor Author

Just to close the thread, these two changes are going to be included in upcoming PRs.
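
As an illustration of the flag idea raised in this thread, here is a minimal sketch. Everything in it is an assumption: `batchOptions.CollectAnalysisStats` is a hypothetical flag, `TokenFrequencies` is a simplified stand-in for `index.TokenFrequencies`, and summing term lengths is only a placeholder for whatever the real `analysisBytes` closure computes.

```go
package main

import "fmt"

// TokenFrequencies is a simplified stand-in for index.TokenFrequencies
// (assumption: term -> frequency).
type TokenFrequencies map[string]int

// batchOptions carries a hypothetical flag that gates the stat calculation.
type batchOptions struct {
	CollectAnalysisStats bool
}

// analysisBytes returns 0 without touching the map when the flag is off,
// so the loop cannot show up in CPU profiles unless stats are requested.
// The per-term cost used here (term length) is a placeholder.
func analysisBytes(opts batchOptions, tokMap TokenFrequencies) (rv uint64) {
	if !opts.CollectAnalysisStats {
		return 0
	}
	for term := range tokMap {
		rv += uint64(len(term))
	}
	return rv
}

func main() {
	toks := TokenFrequencies{"disk": 2, "bytes": 1}
	fmt.Println(analysisBytes(batchOptions{CollectAnalysisStats: false}, toks))
	fmt.Println(analysisBytes(batchOptions{CollectAnalysisStats: true}, toks))
}
```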

@abhinavdangeti self-requested a review July 12, 2022 15:26
@@ -352,6 +353,14 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
		atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
		return err
	}

	switch segI := seg.(type) {
	case segment.DiskStatsReporter:
Member

Per my latest conversation with Jon, we shouldn't be accounting for content written to disk by the merger. More conversation needed here I suppose.

https://docs.google.com/document/d/1pmrasugnCIAXUNgR14h_tix_ZxU3V28unEQYhnz1u34/edit#heading=h.w7mwv55b14ou

Contributor

That is the case here as well: we only account for the BytesRead from disk for the newly merged segment, which would be needed (and accounted for) anyway by the next query on that segment.

Contributor

@sreekanth-cb Jul 13, 2022

We need to carry forward the stats from the older segments that got merged into the newly formed segment so that the accounting is cumulative, hence the need for the SetBytesRead() API.
If we fail to consider the stats from the pre-merge segments, the bytes accounting goes wrong.

I have posted an alternative thought in the zapx PR.
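
A minimal sketch of the carry-over described in this comment, assuming a `DiskStatsReporter` interface with `BytesRead()` and `SetBytesRead(uint64)`. The helper `carryOverBytesRead` and the `fakeSegment` type are hypothetical, and whether the merged segment's own reads should be included in the total was still under discussion in this thread.

```go
package main

import "fmt"

// DiskStatsReporter reuses the interface name from the PR; the method set
// is assumed from the BytesRead/SetBytesRead calls mentioned in review.
type DiskStatsReporter interface {
	BytesRead() uint64
	SetBytesRead(uint64)
}

// fakeSegment is a hypothetical segment with a bytes-read counter.
type fakeSegment struct{ bytesRead uint64 }

func (f *fakeSegment) BytesRead() uint64     { return f.bytesRead }
func (f *fakeSegment) SetBytesRead(v uint64) { f.bytesRead = v }

// carryOverBytesRead makes the merged segment's counter cumulative by
// adding the counters of the segments it replaces to its own.
func carryOverBytesRead(merged DiskStatsReporter, old []DiskStatsReporter) {
	total := merged.BytesRead()
	for _, seg := range old {
		total += seg.BytesRead()
	}
	merged.SetBytesRead(total)
}

func main() {
	merged := &fakeSegment{bytesRead: 4}
	old := []DiskStatsReporter{&fakeSegment{bytesRead: 10}, &fakeSegment{bytesRead: 20}}
	carryOverBytesRead(merged, old)
	fmt.Println(merged.BytesRead())
}
```

Without this step, dropping the pre-merge segments would also drop their counters, and an index-wide total computed from live segments would go down after a merge.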

Contributor

@sreekanth-cb left a comment

.

@@ -424,6 +435,11 @@ func (i *IndexSnapshot) Document(id string) (rv index.Document, err error) {
	segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)

	rvd := document.NewDocument(id)
	var prevBytesRead uint64
	seg, ok := i.segment[segmentIndex].segment.(segment.DiskStatsReporter)
Member

Rename ok to, say, diskStatsAvailable, so that at line 472 below we know why things are ok.
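
The rename matters because the boolean is read again well after the assertion, once the disk work is done. A sketch of that before/after pattern, with a hypothetical `accountRead` helper, a hypothetical `fakeSegment`, and an assumed counter name:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// DiskStatsReporter reuses the interface name from the PR; the single
// BytesRead method is an assumption.
type DiskStatsReporter interface {
	BytesRead() uint64
}

// fakeSegment is a hypothetical segment whose counter grows as it is read.
type fakeSegment struct{ bytesRead uint64 }

func (f *fakeSegment) BytesRead() uint64 { return f.bytesRead }

// stats holds the query-time counter (field name assumed for illustration).
type stats struct{ TotBytesReadAtQueryTime uint64 }

// accountRead runs work against seg and adds only the bytes read during
// work to the counter.
func accountRead(seg interface{}, s *stats, work func()) {
	var prevBytesRead uint64
	reporter, diskStatsAvailable := seg.(DiskStatsReporter)
	if diskStatsAvailable {
		prevBytesRead = reporter.BytesRead()
	}

	work()

	// Far from the assertion, the descriptive name still says why this
	// branch is taken.
	if diskStatsAvailable {
		atomic.AddUint64(&s.TotBytesReadAtQueryTime, reporter.BytesRead()-prevBytesRead)
	}
}

func main() {
	var s stats
	seg := &fakeSegment{bytesRead: 100}
	accountRead(seg, &s, func() { seg.bytesRead += 25 })
	fmt.Println(atomic.LoadUint64(&s.TotBytesReadAtQueryTime))
}
```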

rv.dicts = make([]segment.TermDictionary, len(is.segment))
for i, segment := range is.segment {
	var prevBytesRead uint64
	segP, ok := segment.segment.(diskStatsReporter)
Member

Same here: rename ok to diskStatsAvailable.

@@ -661,10 +709,18 @@ func (i *IndexSnapshot) documentVisitFieldTermsOnSegment(
	}

	if ssvOk && ssv != nil && len(vFields) > 0 {
		var prevBytesRead uint64
		ssvp, ok := ssv.(segment.DiskStatsReporter)
		if ok {
Member

ditto

@@ -76,6 +76,11 @@ func (i *IndexSnapshotTermFieldReader) Next(preAlloced *index.TermFieldDoc) (*in
	}
	// find the next hit
	for i.segmentOffset < len(i.iterators) {
		prevBytesRead := uint64(0)
		itr, ok := i.iterators[i.segmentOffset].(segment.DiskStatsReporter)
Member

ditto

abhinavdangeti previously approved these changes Jul 13, 2022
@abhinavdangeti dismissed their stale review July 13, 2022 16:31

Needs an additional zapx fix.

3 participants