From aeba51ab0fe785ab251eaf8d7005f203ece0986f Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Thu, 18 May 2023 15:42:29 -0600 Subject: [PATCH] Count unflushed chunks in instance stats (#9479) The instance wasn't counting unflushed chunks in the stats. This change checks that the chunk's `flushed time` is 0 which means it hasn't been flushed yet. --- pkg/ingester/instance.go | 5 +---- pkg/ingester/instance_test.go | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index cae5d02cfabd..702751bdce4a 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -569,9 +569,6 @@ func (i *instance) GetStats(ctx context.Context, req *logproto.IndexStatsRequest from, through := req.From.Time(), req.Through.Time() if err = i.forMatchingStreams(ctx, from, matchers, nil, func(s *stream) error { - // checks for equality against chunk flush fields - var zeroValueTime time.Time - // Consider streams which overlap our time range if shouldConsiderStream(s, from, through) { s.chunkMtx.RLock() @@ -583,7 +580,7 @@ func (i *instance) GetStats(ctx context.Context, req *logproto.IndexStatsRequest // by the TSDB manager+shipper chkFrom, chkThrough := chk.chunk.Bounds() - if !chk.flushed.Equal(zeroValueTime) && from.Before(chkThrough) && through.After(chkFrom) { + if chk.flushed.IsZero() && from.Before(chkThrough) && through.After(chkFrom) { hasChunkOverlap = true res.Chunks++ factor := util.GetFactorOfTime(from.UnixNano(), through.UnixNano(), chkFrom.UnixNano(), chkThrough.UnixNano()) diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index a82a618c8046..327b8eb3d2e8 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -837,6 +837,23 @@ func TestStreamShardingUsage(t *testing.T) { }) } +func TestGetStats(t *testing.T) { + instance := defaultInstance(t) + resp, err := instance.GetStats(context.Background(), &logproto.IndexStatsRequest{ + From: 0, + Through: 11000, + Matchers: `{host="agent"}`, + }) + require.NoError(t, err) + + require.Equal(t, &logproto.IndexStatsResponse{ + Streams: 2, + Chunks: 2, + Bytes: 160, + Entries: 10, + }, resp) +} + func defaultInstance(t *testing.T) *instance { ingesterConfig := defaultIngesterTestConfig(t) defaultLimits := defaultLimitsTestConfig()