mirror of https://mirror.osredm.com/root/redis.git
Add tests for blocking XREAD[GROUP] when the stream ran dry (#10035)
The purpose of this commit is to add some tests to cover #5299, which was fixed in #5300 but without tests. This commit should close #5306 and #5299.
This commit is contained in:
parent
8deb9a4f1e
commit
b7f9e9ae39
|
@ -205,11 +205,87 @@ start_server {
|
||||||
$rd close
|
$rd close
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test {Blocking XREADGROUP for stream that ran dry (issue #5299)} {
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
|
||||||
|
# Add a entry then delete it, now stream's last_id is 666.
|
||||||
|
r DEL mystream
|
||||||
|
r XGROUP CREATE mystream mygroup $ MKSTREAM
|
||||||
|
r XADD mystream 666 key value
|
||||||
|
r XDEL mystream 666
|
||||||
|
|
||||||
|
# Pass a special `>` ID but without new entry, released on timeout.
|
||||||
|
$rd XREADGROUP GROUP mygroup myconsumer BLOCK 10 STREAMS mystream >
|
||||||
|
assert_equal [$rd read] {}
|
||||||
|
|
||||||
|
# Throw an error if the ID equal or smaller than the last_id.
|
||||||
|
assert_error ERR*equal*smaller* {r XADD mystream 665 key value}
|
||||||
|
assert_error ERR*equal*smaller* {r XADD mystream 666 key value}
|
||||||
|
|
||||||
|
# Entered blocking state and then release because of the new entry.
|
||||||
|
$rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream >
|
||||||
|
wait_for_blocked_clients_count 1
|
||||||
|
r XADD mystream 667 key value
|
||||||
|
assert_equal [$rd read] {{mystream {{667-0 {key value}}}}}
|
||||||
|
|
||||||
|
$rd close
|
||||||
|
}
|
||||||
|
|
||||||
|
test "Blocking XREADGROUP will ignore BLOCK if ID is not >" {
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
|
||||||
|
# Add a entry then delete it, now stream's last_id is 666.
|
||||||
|
r DEL mystream
|
||||||
|
r XGROUP CREATE mystream mygroup $ MKSTREAM
|
||||||
|
r XADD mystream 666 key value
|
||||||
|
r XDEL mystream 666
|
||||||
|
|
||||||
|
# Return right away instead of blocking, return the stream with an
|
||||||
|
# empty list instead of NIL if the ID specified is not the special `>` ID.
|
||||||
|
foreach id {0 600 666 700} {
|
||||||
|
$rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream $id
|
||||||
|
assert_equal [$rd read] {{mystream {}}}
|
||||||
|
}
|
||||||
|
|
||||||
|
# After adding a new entry, `XREADGROUP BLOCK` still return the stream
|
||||||
|
# with an empty list because the pending list is empty.
|
||||||
|
r XADD mystream 667 key value
|
||||||
|
foreach id {0 600 666 667 700} {
|
||||||
|
$rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream $id
|
||||||
|
assert_equal [$rd read] {{mystream {}}}
|
||||||
|
}
|
||||||
|
|
||||||
|
# After we read it once, the pending list is not empty at this time,
|
||||||
|
# pass any ID smaller than 667 will return one of the pending entry.
|
||||||
|
set res [r XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream >]
|
||||||
|
assert_equal $res {{mystream {{667-0 {key value}}}}}
|
||||||
|
foreach id {0 600 666} {
|
||||||
|
$rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream $id
|
||||||
|
assert_equal [$rd read] {{mystream {{667-0 {key value}}}}}
|
||||||
|
}
|
||||||
|
|
||||||
|
# Pass ID equal or greater than 667 will return the stream with an empty list.
|
||||||
|
foreach id {667 700} {
|
||||||
|
$rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream $id
|
||||||
|
assert_equal [$rd read] {{mystream {}}}
|
||||||
|
}
|
||||||
|
|
||||||
|
# After we ACK the pending entry, return the stream with an empty list.
|
||||||
|
r XACK mystream mygroup 667
|
||||||
|
foreach id {0 600 666 667 700} {
|
||||||
|
$rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream $id
|
||||||
|
assert_equal [$rd read] {{mystream {}}}
|
||||||
|
}
|
||||||
|
|
||||||
|
$rd close
|
||||||
|
}
|
||||||
|
|
||||||
test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} {
|
test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} {
|
||||||
r del mystream
|
r del mystream
|
||||||
r XGROUP CREATE mystream mygroup $ MKSTREAM
|
r XGROUP CREATE mystream mygroup $ MKSTREAM
|
||||||
set rd [redis_deferring_client]
|
set rd [redis_deferring_client]
|
||||||
$rd XREADGROUP GROUP mygroup Alice BLOCK 100 STREAMS mystream ">"
|
$rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">"
|
||||||
|
wait_for_blocked_clients_count 1
|
||||||
r XGROUP DESTROY mystream mygroup
|
r XGROUP DESTROY mystream mygroup
|
||||||
assert_error "*NOGROUP*" {$rd read}
|
assert_error "*NOGROUP*" {$rd read}
|
||||||
$rd close
|
$rd close
|
||||||
|
@ -220,6 +296,7 @@ start_server {
|
||||||
r XGROUP CREATE mystream{t} mygroup $ MKSTREAM
|
r XGROUP CREATE mystream{t} mygroup $ MKSTREAM
|
||||||
set rd [redis_deferring_client]
|
set rd [redis_deferring_client]
|
||||||
$rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream{t} ">"
|
$rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream{t} ">"
|
||||||
|
wait_for_blocked_clients_count 1
|
||||||
r XGROUP CREATE mystream2{t} mygroup $ MKSTREAM
|
r XGROUP CREATE mystream2{t} mygroup $ MKSTREAM
|
||||||
r XADD mystream2{t} 100 f1 v1
|
r XADD mystream2{t} 100 f1 v1
|
||||||
r RENAME mystream2{t} mystream{t}
|
r RENAME mystream2{t} mystream{t}
|
||||||
|
@ -232,6 +309,7 @@ start_server {
|
||||||
r XGROUP CREATE mystream{t} mygroup $ MKSTREAM
|
r XGROUP CREATE mystream{t} mygroup $ MKSTREAM
|
||||||
set rd [redis_deferring_client]
|
set rd [redis_deferring_client]
|
||||||
$rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream{t} ">"
|
$rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream{t} ">"
|
||||||
|
wait_for_blocked_clients_count 1
|
||||||
r XADD mystream2{t} 100 f1 v1
|
r XADD mystream2{t} 100 f1 v1
|
||||||
r RENAME mystream2{t} mystream{t}
|
r RENAME mystream2{t} mystream{t}
|
||||||
assert_error "*NOGROUP*" {$rd read} ;# mystream2{t} didn't have mygroup before RENAME
|
assert_error "*NOGROUP*" {$rd read} ;# mystream2{t} didn't have mygroup before RENAME
|
||||||
|
@ -560,6 +638,7 @@ start_server {
|
||||||
r XREADGROUP GROUP mygroup Alice NOACK STREAMS mystream ">"
|
r XREADGROUP GROUP mygroup Alice NOACK STREAMS mystream ">"
|
||||||
set rd [redis_deferring_client]
|
set rd [redis_deferring_client]
|
||||||
$rd XREADGROUP GROUP mygroup Bob BLOCK 0 NOACK STREAMS mystream ">"
|
$rd XREADGROUP GROUP mygroup Bob BLOCK 0 NOACK STREAMS mystream ">"
|
||||||
|
wait_for_blocked_clients_count 1
|
||||||
r XADD mystream * f2 v2
|
r XADD mystream * f2 v2
|
||||||
set grpinfo [r xinfo groups mystream]
|
set grpinfo [r xinfo groups mystream]
|
||||||
|
|
||||||
|
@ -580,6 +659,7 @@ start_server {
|
||||||
r XREADGROUP GROUP mygroup Alice NOACK STREAMS mystream ">"
|
r XREADGROUP GROUP mygroup Alice NOACK STREAMS mystream ">"
|
||||||
set rd [redis_deferring_client]
|
set rd [redis_deferring_client]
|
||||||
$rd XREADGROUP GROUP mygroup Bob BLOCK 0 NOACK STREAMS mystream ">"
|
$rd XREADGROUP GROUP mygroup Bob BLOCK 0 NOACK STREAMS mystream ">"
|
||||||
|
wait_for_blocked_clients_count 1
|
||||||
r XGROUP CREATECONSUMER mystream mygroup Charlie
|
r XGROUP CREATECONSUMER mystream mygroup Charlie
|
||||||
set grpinfo [lindex [r xinfo groups mystream] 0]
|
set grpinfo [lindex [r xinfo groups mystream] 0]
|
||||||
|
|
||||||
|
|
|
@ -354,6 +354,31 @@ start_server {
|
||||||
$rd close
|
$rd close
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test "Blocking XREAD for stream that ran dry (issue #5299)" {
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
|
||||||
|
# Add a entry then delete it, now stream's last_id is 666.
|
||||||
|
r DEL mystream
|
||||||
|
r XADD mystream 666 key value
|
||||||
|
r XDEL mystream 666
|
||||||
|
|
||||||
|
# Pass a ID smaller than stream's last_id, released on timeout.
|
||||||
|
$rd XREAD BLOCK 10 STREAMS mystream 665
|
||||||
|
assert_equal [$rd read] {}
|
||||||
|
|
||||||
|
# Throw an error if the ID equal or smaller than the last_id.
|
||||||
|
assert_error ERR*equal*smaller* {r XADD mystream 665 key value}
|
||||||
|
assert_error ERR*equal*smaller* {r XADD mystream 666 key value}
|
||||||
|
|
||||||
|
# Entered blocking state and then release because of the new entry.
|
||||||
|
$rd XREAD BLOCK 0 STREAMS mystream 665
|
||||||
|
wait_for_blocked_clients_count 1
|
||||||
|
r XADD mystream 667 key value
|
||||||
|
assert_equal [$rd read] {{mystream {{667-0 {key value}}}}}
|
||||||
|
|
||||||
|
$rd close
|
||||||
|
}
|
||||||
|
|
||||||
test "XREAD: XADD + DEL should not awake client" {
|
test "XREAD: XADD + DEL should not awake client" {
|
||||||
set rd [redis_deferring_client]
|
set rd [redis_deferring_client]
|
||||||
r del s1
|
r del s1
|
||||||
|
|
Loading…
Reference in New Issue