Skip to content

Commit

Permalink
Better combines stats in ingesters (#9474)
Browse files Browse the repository at this point in the history
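* Reduces double counting of streams when possible
* Proportionally adds stats based on the chunk's overlap with the requested time range
* Reuses the shared time-overlap factor helper instead of duplicating the logic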
  • Loading branch information
owen-d committed May 18, 2023
1 parent 358b896 commit 64c6f6b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 26 deletions.
11 changes: 8 additions & 3 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func (i *instance) GetStats(ctx context.Context, req *logproto.IndexStatsRequest
// Consider streams which overlap our time range
if shouldConsiderStream(s, from, through) {
s.chunkMtx.RLock()
res.Streams++
var hasChunkOverlap bool
for _, chk := range s.chunks {
// Consider chunks which overlap our time range
// and haven't been flushed.
Expand All @@ -584,12 +584,17 @@ func (i *instance) GetStats(ctx context.Context, req *logproto.IndexStatsRequest
chkFrom, chkThrough := chk.chunk.Bounds()

if !chk.flushed.Equal(zeroValueTime) && from.Before(chkThrough) && through.After(chkFrom) {
hasChunkOverlap = true
res.Chunks++
res.Entries += uint64(chk.chunk.Size())
res.Bytes += uint64(chk.chunk.UncompressedSize())
factor := util.GetFactorOfTime(from.UnixNano(), through.UnixNano(), chkFrom.UnixNano(), chkThrough.UnixNano())
res.Entries += uint64(factor * float64(chk.chunk.Size()))
res.Bytes += uint64(factor * float64(chk.chunk.UncompressedSize()))
}

}
if hasChunkOverlap {
res.Streams++
}
s.chunkMtx.RUnlock()
}
return nil
Expand Down
27 changes: 4 additions & 23 deletions pkg/storage/stores/tsdb/index/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (

"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/encoding"
"github.com/grafana/loki/pkg/util/math"
)

// Meta holds information about a chunk of data.
Expand Down Expand Up @@ -245,27 +245,8 @@ func (cs *ChunkStats) addRaw(chunks int, kb, entries uint32) {
}

func (cs *ChunkStats) AddChunk(chk *ChunkMeta, from, through int64) {
// Assuming entries and bytes are evenly distributed in the chunk,
// we take a proportional share of the entries and bytes
// if (chk.MinTime < from) and/or (chk.MaxTime > through).
//
// MinTime  From              Through  MaxTime
// ┌────────┬─────────────────┬────────┐
// │        *      Chunk      *        │
// └────────┴─────────────────┴────────┘
// ▲   A    |        C        |   B    ▲
// └───────────────────────────────────┘
// T = MaxTime - MinTime
//
// We want to get the percentage of time that fits into C
// to use it as a factor to get the amount of bytes and entries
// factor = C = (T - (A + B)) / T = (chunkTime - (leadingTime + trailingTime)) / chunkTime
chunkTime := chk.MaxTime - chk.MinTime
leadingTime := math.Max64(0, from-chk.MinTime)
trailingTime := math.Max64(0, chk.MaxTime-through)
factor := float32(chunkTime-(leadingTime+trailingTime)) / float32(chunkTime)

kb := uint32(float32(chk.KB) * factor)
entries := uint32(float32(chk.Entries) * factor)
factor := util.GetFactorOfTime(from, through, chk.MinTime, chk.MaxTime)
kb := uint32(float64(chk.KB) * factor)
entries := uint32(float64(chk.Entries) * factor)
cs.addRaw(1, kb, entries)
}

0 comments on commit 64c6f6b

Please sign in to comment.