diff --git a/src/t_stream.c b/src/t_stream.c index 39d73d13b..547b76c3f 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1460,6 +1460,17 @@ void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) { /* The lag of a newly-initialized stream is 0. */ lag = 0; valid = 1; + } else if (!s->length) { /* All entries deleted, now empty. */ + lag = 0; + valid = 1; + } else if (streamCompareID(&cg->last_id,&s->first_id) < 0 && + streamCompareID(&s->max_deleted_entry_id,&s->first_id) < 0) + { + /* When both the consumer group's last_id and the maximum tombstone are behind + * the stream's first entry, the consumer group's lag will always be equal to + * the number of remainin entries in the stream. */ + lag = s->length; + valid = 1; } else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&cg->last_id,NULL)) { /* No fragmentation ahead means that the group's logical reads counter * is valid for performing the lag calculation. */ diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 27ad9c338..f88ad61ee 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -1184,15 +1184,25 @@ start_server { r XREADGROUP GROUP g1 alice STREAMS x > ;# Read one entry r XADD x 2-0 data c r XADD x 3-0 data d - r XDEL x 1-0 r XDEL x 2-0 + # Now the latest tombstone(2-0) is before the first entry(3-0), but there is still - # a tombstone(2-0) after the last_id of the consume group. + # a tombstone(2-0) after the last_id(1-0) of the consume group. set reply [r XINFO STREAM x FULL] set group [lindex [dict get $reply groups] 0] assert_equal [dict get $group entries-read] 1 assert_equal [dict get $group lag] {} + r XDEL x 1-0 + # Although there is a tombstone(2-0) after the consumer group's last_id(1-0), all + # entries before the maximal tombstone have been deleted. This means that both the + # last_id and the largest tombstone are behind the first entry. Therefore, tombstones + # no longer affect the lag, which now reflects the remaining entries in the stream. + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 1 + assert_equal [dict get $group lag] 1 + # Now there is a tombstone(2-0) after the last_id of the consume group, so after consuming # entry(3-0), the group's counter will be invalid. r XREADGROUP GROUP g1 alice STREAMS x > @@ -1202,6 +1212,39 @@ start_server { assert_equal [dict get $group lag] 0 } + test {Consumer group lag with XTRIM} { + r DEL x + r XGROUP CREATE x mygroup $ MKSTREAM + r XADD x 1-0 data a + r XADD x 2-0 data b + r XADD x 3-0 data c + r XADD x 4-0 data d + r XADD x 5-0 data e + r XREADGROUP GROUP mygroup alice COUNT 1 STREAMS x > + + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 1 + assert_equal [dict get $group lag] 4 + + # Although XTRIM doesn't update the `max-deleted-entry-id`, it always updates the + # position of the first entry. When trimming causes the first entry to be behind + # the consumer group's last_id, the consumer group's lag will always be equal to + # the number of remainin entries in the stream. + r XTRIM x MAXLEN 1 + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $reply max-deleted-entry-id] "0-0" + assert_equal [dict get $group entries-read] 1 + assert_equal [dict get $group lag] 1 + + # When all the entries were deleted, the lag is always 0. + r XTRIM x MAXLEN 0 + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group lag] 0 + } + test {Loading from legacy (Redis <= v6.2.x, rdb_ver < 10) persistence} { # The payload was DUMPed from a v5 instance after: # XADD x 1-0 data a