From 0e1de78fca849c135fd00cd85b5b87920e46e50d Mon Sep 17 00:00:00 2001 From: guybe7 Date: Mon, 6 May 2024 16:55:42 +0800 Subject: [PATCH] XREADGROUP from PEL should not affect server.dirty (#13251) Because it does not cause any propagation (arguably it should, see the comment in the tcl file) The motivation for this fix is that in 6.2 if dirty changed without propagation inside MULTI/EXEC it would cause propagation of EXEC only, which would result in the replica sending errors to its master --- src/stream.h | 2 +- src/t_stream.c | 28 ++++++----- tests/unit/type/stream-cgroups.tcl | 75 ++++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 12 deletions(-) diff --git a/src/stream.h b/src/stream.h index bfc165440..146be3b12 100644 --- a/src/stream.h +++ b/src/stream.h @@ -116,7 +116,7 @@ struct client; stream *streamNew(void); void freeStream(stream *s); unsigned long streamLength(const robj *subject); -size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi); +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, unsigned long *propCount); void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); diff --git a/src/t_stream.c b/src/t_stream.c index 2529cab08..478d75c5c 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1657,7 +1657,7 @@ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds #define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array boundaries, just the entries. */ #define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */ -size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) { +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, unsigned long *propCount) { void *arraylen_ptr = NULL; size_t arraylen = 0; streamIterator si; @@ -1666,6 +1666,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end int propagate_last_id = 0; int noack = flags & STREAM_RWR_NOACK; + if (propCount) *propCount = 0; + /* If the client is asking for some history, we serve it using a * different function, so that we return entries *solely* from its * own PEL. This ensures each consumer will always and only see @@ -1764,6 +1766,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end robj *idarg = createObjectFromStreamID(&id); streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack); decrRefCount(idarg); + if (propCount) (*propCount)++; } } @@ -1771,8 +1774,10 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end if (count && count == arraylen) break; } - if (spi && propagate_last_id) + if (spi && propagate_last_id) { streamPropagateGroupID(c,spi->keyname,group,spi->groupname); + if (propCount) (*propCount)++; + } streamIteratorStop(&si); if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen); @@ -1808,7 +1813,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start streamID thisid; streamDecodeID(ri.key,&thisid); if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL) == 0) + STREAM_RWR_RAWENTRIES,NULL,NULL) == 0) { /* Note that we may have a not acknowledged entry in the PEL * about a message that's no longer here because was removed @@ -2124,7 +2129,7 @@ void xrangeGenericCommand(client *c, int rev) { addReplyNullArray(c); } else { if (count == -1) count = 0; - streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL); + streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL,NULL); } } @@ -2386,12 +2391,13 @@ void xreadCommand(client *c) { addReplyBulk(c,c->argv[streams_arg+i]); int flags = 0; + unsigned long propCount = 0; if (noack) flags |= STREAM_RWR_NOACK; if (serve_history) flags |= STREAM_RWR_HISTORY; streamReplyWithRange(c,s,&start,NULL,count,0, groups ? groups[i] : NULL, - consumer, flags, &spi); - if (groups) server.dirty++; + consumer, flags, &spi, &propCount); + if (propCount) server.dirty++; } } @@ -3298,7 +3304,7 @@ void xclaimCommand(client *c) { if (justid) { addReplyStreamID(c,&id); } else { - serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1); + serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,NULL) == 1); } arraylen++; @@ -3473,7 +3479,7 @@ void xautoclaimCommand(client *c) { if (justid) { addReplyStreamID(c,&id); } else { - serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1); + serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,NULL) == 1); } arraylen++; count--; @@ -3697,18 +3703,18 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) { end.ms = end.seq = UINT64_MAX; addReplyBulkCString(c,"first-entry"); emitted = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL); + STREAM_RWR_RAWENTRIES,NULL,NULL); if (!emitted) addReplyNull(c); addReplyBulkCString(c,"last-entry"); emitted = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL); + STREAM_RWR_RAWENTRIES,NULL,NULL); if (!emitted) addReplyNull(c); } else { /* XINFO STREAM FULL [COUNT ] */ /* Stream entries */ addReplyBulkCString(c,"entries"); - streamReplyWithRange(c,s,NULL,NULL,count,0,NULL,NULL,0,NULL); + streamReplyWithRange(c,s,NULL,NULL,count,0,NULL,NULL,0,NULL,NULL); /* Consumer groups */ addReplyBulkCString(c,"groups"); diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 9b457dc67..2462a25ba 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -1009,6 +1009,68 @@ start_server { assert_error "*NOGROUP*" {r XGROUP CREATECONSUMER mystream mygroup consumer} } + test {XREADGROUP of multiple entries changes dirty by one} { + r DEL x + r XADD x 1-0 data a + r XADD x 2-0 data b + r XADD x 3-0 data c + r XADD x 4-0 data d + r XGROUP CREATE x g1 0 + r XGROUP CREATECONSUMER x g1 Alice + + set dirty [s rdb_changes_since_last_save] + set res [r XREADGROUP GROUP g1 Alice COUNT 2 STREAMS x ">"] + assert_equal $res {{x {{1-0 {data a}} {2-0 {data b}}}}} + set dirty2 [s rdb_changes_since_last_save] + assert {$dirty2 == $dirty + 1} + + set dirty [s rdb_changes_since_last_save] + set res [r XREADGROUP GROUP g1 Alice NOACK COUNT 2 STREAMS x ">"] + assert_equal $res {{x {{3-0 {data c}} {4-0 {data d}}}}} + set dirty2 [s rdb_changes_since_last_save] + assert {$dirty2 == $dirty + 1} + } + + test {XREADGROUP from PEL does not change dirty} { + # Techinally speaking, XREADGROUP from PEL should cause propagation + # because it change the delivery count/time + # It was decided that this metadata changes are too insiginificant + # to justify propagation + # This test covers that. + r DEL x + r XADD x 1-0 data a + r XADD x 2-0 data b + r XADD x 3-0 data c + r XADD x 4-0 data d + r XGROUP CREATE x g1 0 + r XGROUP CREATECONSUMER x g1 Alice + + set res [r XREADGROUP GROUP g1 Alice COUNT 2 STREAMS x ">"] + assert_equal $res {{x {{1-0 {data a}} {2-0 {data b}}}}} + + set dirty [s rdb_changes_since_last_save] + set res [r XREADGROUP GROUP g1 Alice COUNT 2 STREAMS x 0] + assert_equal $res {{x {{1-0 {data a}} {2-0 {data b}}}}} + set dirty2 [s rdb_changes_since_last_save] + assert {$dirty2 == $dirty} + + set dirty [s rdb_changes_since_last_save] + set res [r XREADGROUP GROUP g1 Alice COUNT 2 STREAMS x 9000] + assert_equal $res {{x {}}} + set dirty2 [s rdb_changes_since_last_save] + assert {$dirty2 == $dirty} + + # The current behavior is that we create the consumer (causes dirty++) even + # if we onlyneed to read from PEL. + # It feels like we shouldn't create the consumer in that case, but I added + # this test just for coverage of current behavior + set dirty [s rdb_changes_since_last_save] + set res [r XREADGROUP GROUP g1 noconsumer COUNT 2 STREAMS x 0] + assert_equal $res {{x {}}} + set dirty2 [s rdb_changes_since_last_save] + assert {$dirty2 == $dirty + 1} + } + start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-preamble no appendfsync always}} { test {XREADGROUP with NOACK creates consumer} { r del mystream @@ -1329,6 +1391,19 @@ start_server { assert_equal [dict get $group entries-read] 3 assert_equal [dict get $group lag] 0 } + + test {XREADGROUP from PEL inside MULTI} { + # This scenario used to cause propagation of EXEC without MULTI in 6.2 + $replica config set propagation-error-behavior panic + $master del mystream + $master xadd mystream 1-0 a b c d e f + $master xgroup create mystream mygroup 0 + $master xreadgroup group mygroup ryan count 1 streams mystream > + $master multi + $master xreadgroup group mygroup ryan count 1 streams mystream 0 + set reply [$master exec] + assert_equal $reply {{{mystream {{1-0 {a b c d e f}}}}}} + } } start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-preamble no}} {