mirror of https://mirror.osredm.com/root/redis.git
Fix wrong behavior of XREAD + after last entry of stream have been removed (#13632)
Close #13628 This PR changes behavior of special `+` id of XREAD command. Now it uses `streamLastValidID` to find last entry instead of `last_id` field of stream object. This PR adds test for the issue. **Notes** Initial idea to update `last_id` while executing XDEL seems to be wrong. `last_id` is used to strore last generated id and not id of last entry. --------- Co-authored-by: debing.sun <debing.sun@redis.com> Co-authored-by: guybe7 <guy.benoish@redislabs.com>
This commit is contained in:
parent
985bf68f34
commit
33f03f6fc8
|
@ -2300,14 +2300,13 @@ void xreadCommand(client *c) {
|
||||||
"just return an empty result set.");
|
"just return an empty result set.");
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
if (o) {
|
if (o && ((stream *)o->ptr)->length) {
|
||||||
stream *s = o->ptr;
|
stream *s = o->ptr;
|
||||||
ids[id_idx] = s->last_id;
|
/* We need to get the last valid ID.
|
||||||
if (streamDecrID(&ids[id_idx]) != C_OK) {
|
* It is impossible to use s->last_id because
|
||||||
/* shouldn't happen */
|
* entry with s->last_id may have been removed. */
|
||||||
addReplyError(c,"the stream last element ID is 0-0");
|
streamLastValidID(s, &ids[id_idx]);
|
||||||
goto cleanup;
|
streamDecrID(&ids[id_idx]);
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
ids[id_idx].ms = 0;
|
ids[id_idx].ms = 0;
|
||||||
ids[id_idx].seq = 0;
|
ids[id_idx].seq = 0;
|
||||||
|
|
|
@ -435,6 +435,17 @@ start_server {
|
||||||
|
|
||||||
# verify nil is still received when reading last entry
|
# verify nil is still received when reading last entry
|
||||||
assert_equal [r XREAD STREAMS lestream +] {}
|
assert_equal [r XREAD STREAMS lestream +] {}
|
||||||
|
|
||||||
|
# case when stream created empty
|
||||||
|
|
||||||
|
# make sure the stream is not initialized
|
||||||
|
r DEL lestream
|
||||||
|
|
||||||
|
# create empty stream with XGROUP CREATE
|
||||||
|
r XGROUP CREATE lestream legroup $ MKSTREAM
|
||||||
|
|
||||||
|
# verify nil is received when reading last entry
|
||||||
|
assert_equal [r XREAD STREAMS lestream +] {}
|
||||||
}
|
}
|
||||||
|
|
||||||
test {XREAD last element blocking from empty stream} {
|
test {XREAD last element blocking from empty stream} {
|
||||||
|
@ -510,6 +521,22 @@ start_server {
|
||||||
assert_equal $res {{lestream {{3-0 {k3 v3}}}}}
|
assert_equal $res {{lestream {{3-0 {k3 v3}}}}}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test "XREAD: read last element after XDEL (issue #13628)" {
|
||||||
|
# Should return actual last element after XDEL of current last element
|
||||||
|
|
||||||
|
# Add 2 entries to a stream and delete last one
|
||||||
|
r DEL stream
|
||||||
|
r XADD stream 1-0 f 1
|
||||||
|
r XADD stream 2-0 f 2
|
||||||
|
r XDEL stream 2-0
|
||||||
|
|
||||||
|
# Read last entry
|
||||||
|
set res [r XREAD STREAMS stream +]
|
||||||
|
|
||||||
|
# Verify the last entry was read
|
||||||
|
assert_equal $res {{stream {{1-0 {f 1}}}}}
|
||||||
|
}
|
||||||
|
|
||||||
test "XREAD: XADD + DEL should not awake client" {
|
test "XREAD: XADD + DEL should not awake client" {
|
||||||
set rd [redis_deferring_client]
|
set rd [redis_deferring_client]
|
||||||
r del s1
|
r del s1
|
||||||
|
|
Loading…
Reference in New Issue