Skip to content

Commit

Permalink
add StreamGroupInfo EntriesRead and Lag (#2510)
Browse files Browse the repository at this point in the history
Fixes #2467
  • Loading branch information
tvdias authored Aug 15, 2023
1 parent 2e6b2a8 commit 4cf2013
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 3 deletions.
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Current package versions:

- Fix [#2507](https://github.com/StackExchange/StackExchange.Redis/issues/2507): Pub/sub with multi-item payloads should be usable ([#2508 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2508))
- Add: connection-id tracking (internal only, no public API) ([#2508 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2508))
- Fix [#2467](https://github.com/StackExchange/StackExchange.Redis/issues/2467): Add StreamGroupInfo EntriesRead and Lag ([#2510 by tvdias](https://github.com/StackExchange/StackExchange.Redis/pull/2510))

## 2.6.122

Expand Down
14 changes: 13 additions & 1 deletion src/StackExchange.Redis/APITypes/StreamGroupInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
/// </summary>
public readonly struct StreamGroupInfo
{
internal StreamGroupInfo(string name, int consumerCount, int pendingMessageCount, string? lastDeliveredId)
internal StreamGroupInfo(string name, int consumerCount, int pendingMessageCount, string? lastDeliveredId, long? entriesRead, long? lag)
{
Name = name;
ConsumerCount = consumerCount;
PendingMessageCount = pendingMessageCount;
LastDeliveredId = lastDeliveredId;
EntriesRead = entriesRead;
Lag = lag;
}

/// <summary>
Expand All @@ -33,4 +35,14 @@ internal StreamGroupInfo(string name, int consumerCount, int pendingMessageCount
/// The Id of the last message delivered to the group.
/// </summary>
public string? LastDeliveredId { get; }

/// <summary>
/// Total number of entries the group had read.
/// </summary>
public long? EntriesRead { get; }

/// <summary>
/// The number of entries in the range between the group's read entries and the stream's entries.
/// </summary>
public long? Lag { get; }
}
2 changes: 2 additions & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,8 @@ StackExchange.Redis.StreamEntry.this[StackExchange.Redis.RedisValue fieldName].g
StackExchange.Redis.StreamEntry.Values.get -> StackExchange.Redis.NameValueEntry[]!
StackExchange.Redis.StreamGroupInfo
StackExchange.Redis.StreamGroupInfo.ConsumerCount.get -> int
StackExchange.Redis.StreamGroupInfo.EntriesRead.get -> long?
StackExchange.Redis.StreamGroupInfo.Lag.get -> long?
StackExchange.Redis.StreamGroupInfo.LastDeliveredId.get -> string?
StackExchange.Redis.StreamGroupInfo.Name.get -> string!
StackExchange.Redis.StreamGroupInfo.PendingMessageCount.get -> int
Expand Down
15 changes: 14 additions & 1 deletion src/StackExchange.Redis/ResultProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2049,6 +2049,8 @@ internal static readonly CommandBytes
Pending = "pending",
Idle = "idle",
LastDeliveredId = "last-delivered-id",
EntriesRead = "entries-read",
Lag = "lag",
IP = "ip",
Port = "port";

Expand Down Expand Up @@ -2107,6 +2109,10 @@ protected override StreamGroupInfo ParseItem(in RawResult result)
// 6) (integer)2
// 7) last-delivered-id
// 8) "1588152489012-0"
// 9) "entries-read"
// 10) (integer)2
// 11) "lag"
// 12) (integer)0
// 2) 1) name
// 2) "some-other-group"
// 3) consumers
Expand All @@ -2115,17 +2121,24 @@ protected override StreamGroupInfo ParseItem(in RawResult result)
// 6) (integer)0
// 7) last-delivered-id
// 8) "1588152498034-0"
// 9) "entries-read"
// 10) (integer)1
// 11) "lag"
// 12) (integer)1

var arr = result.GetItems();
string? name = default, lastDeliveredId = default;
int consumerCount = default, pendingMessageCount = default;
long entriesRead = default, lag = default;

KeyValuePairParser.TryRead(arr, KeyValuePairParser.Name, ref name);
KeyValuePairParser.TryRead(arr, KeyValuePairParser.Consumers, ref consumerCount);
KeyValuePairParser.TryRead(arr, KeyValuePairParser.Pending, ref pendingMessageCount);
KeyValuePairParser.TryRead(arr, KeyValuePairParser.LastDeliveredId, ref lastDeliveredId);
KeyValuePairParser.TryRead(arr, KeyValuePairParser.EntriesRead, ref entriesRead);
KeyValuePairParser.TryRead(arr, KeyValuePairParser.Lag, ref lag);

return new StreamGroupInfo(name!, consumerCount, pendingMessageCount, lastDeliveredId);
return new StreamGroupInfo(name!, consumerCount, pendingMessageCount, lastDeliveredId, entriesRead, lag);
}
}

Expand Down
12 changes: 11 additions & 1 deletion tests/StackExchange.Redis.Tests/StreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1242,24 +1242,34 @@ public void StreamGroupInfoGet()
db.StreamCreateConsumerGroup(key, group1, StreamPosition.Beginning);
db.StreamCreateConsumerGroup(key, group2, StreamPosition.Beginning);

var groupInfoList = db.StreamGroupInfo(key);
Assert.Equal(0, groupInfoList[0].EntriesRead);
Assert.Equal(4, groupInfoList[0].Lag);
Assert.Equal(0, groupInfoList[0].EntriesRead);
Assert.Equal(4, groupInfoList[1].Lag);

// Read a single message into the first consumer.
db.StreamReadGroup(key, group1, consumer1, count: 1);

// Read the remaining messages into the second consumer.
db.StreamReadGroup(key, group2, consumer2);

var groupInfoList = db.StreamGroupInfo(key);
groupInfoList = db.StreamGroupInfo(key);

Assert.NotNull(groupInfoList);
Assert.Equal(2, groupInfoList.Length);

Assert.Equal(group1, groupInfoList[0].Name);
Assert.Equal(1, groupInfoList[0].PendingMessageCount);
Assert.True(IsMessageId(groupInfoList[0].LastDeliveredId)); // can't test actual - will vary
Assert.Equal(1, groupInfoList[0].EntriesRead);
Assert.Equal(3, groupInfoList[0].Lag);

Assert.Equal(group2, groupInfoList[1].Name);
Assert.Equal(4, groupInfoList[1].PendingMessageCount);
Assert.True(IsMessageId(groupInfoList[1].LastDeliveredId)); // can't test actual - will vary
Assert.Equal(4, groupInfoList[1].EntriesRead);
Assert.Equal(0, groupInfoList[1].Lag);
}

static bool IsMessageId(string? value)
Expand Down

0 comments on commit 4cf2013

Please sign in to comment.