diff --git a/src/blocked.c b/src/blocked.c index 4211072ee..4898cdcbf 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -285,8 +285,8 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { } robj *dstkey = receiver->bpop.target; - int wherefrom = receiver->bpop.listpos.wherefrom; - int whereto = receiver->bpop.listpos.whereto; + int wherefrom = receiver->bpop.blockpos.wherefrom; + int whereto = receiver->bpop.blockpos.whereto; /* Protect receiver->bpop.target, that will be * freed by the next unblockClient() @@ -320,9 +320,9 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { if (de) { list *clients = dictGetVal(de); int numclients = listLength(clients); - unsigned long zcard = zsetLength(o); + int deleted = 0; - while(numclients-- && zcard) { + while (numclients--) { listNode *clientnode = listFirst(clients); client *receiver = clientnode->value; @@ -333,23 +333,38 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { continue; } - int where = (receiver->lastcmd && - receiver->lastcmd->proc == bzpopminCommand) - ? ZSET_MIN : ZSET_MAX; + long llen = zsetLength(o); + long count = receiver->bpop.count; + int where = receiver->bpop.blockpos.wherefrom; + int use_nested_array = (receiver->lastcmd && + receiver->lastcmd->proc == bzmpopCommand) + ? 1 : 0; + int reply_nil_when_empty = use_nested_array; + monotime replyTimer; elapsedStart(&replyTimer); - genericZpopCommand(receiver,&rl->key,1,where,1,NULL); + genericZpopCommand(receiver, &rl->key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, &deleted); updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); unblockClient(receiver); - zcard--; /* Replicate the command. */ - robj *argv[2]; + int argc = 2; + robj *argv[3]; argv[0] = where == ZSET_MIN ? shared.zpopmin : shared.zpopmax; argv[1] = rl->key; incrRefCount(rl->key); - propagate(receiver->db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL); + if (count != 0) { + /* Replicate it as command with COUNT. */ + robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count); + argv[2] = count_obj; + argc++; + } + propagate(receiver->db->id, argv, argc, PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[1]); + if (count != 0) decrRefCount(argv[2]); + + /* The zset is empty and has been deleted. */ + if (deleted) break; } } } @@ -613,7 +628,7 @@ void handleClientsBlockedOnKeys(void) { * * 'count' for those commands that support the optional count argument. * Otherwise the value is 0. */ -void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids) { +void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids) { dictEntry *de; list *l; int j; @@ -622,7 +637,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, ms c->bpop.timeout = timeout; c->bpop.target = target; - if (listpos != NULL) c->bpop.listpos = *listpos; + if (blockpos != NULL) c->bpop.blockpos = *blockpos; if (target != NULL) incrRefCount(target); diff --git a/src/db.c b/src/db.c index 43f51ffdf..20a210ba7 100644 --- a/src/db.c +++ b/src/db.c @@ -1711,6 +1711,16 @@ int blmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult return genericGetKeys(0, 2, 3, 1, argv, argc, result); } +int zmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + UNUSED(cmd); + return genericGetKeys(0, 1, 2, 1, argv, argc, result); +} + +int bzmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + UNUSED(cmd); + return genericGetKeys(0, 2, 3, 1, argv, argc, result); +} + /* Helper function to extract keys from the SORT command. * * SORT ... STORE ... diff --git a/src/server.c b/src/server.c index 72000cc9c..cf0322c18 100644 --- a/src/server.c +++ b/src/server.c @@ -792,6 +792,13 @@ struct redisCommand redisCommandTable[] = { KSPEC_BS_INDEX,.bs.index={1}, KSPEC_FK_RANGE,.fk.range={0,1,0}}}}, + {"zmpop", zmpopCommand,-4, + "write @sortedset", + {{"write", + KSPEC_BS_INDEX,.bs.index={1}, + KSPEC_FK_KEYNUM,.fk.keynum={0,1,1}}}, + zmpopGetKeys}, + {"bzpopmin",bzpopminCommand,-3, "write no-script fast @sortedset @blocking", {{"write", @@ -804,6 +811,13 @@ struct redisCommand redisCommandTable[] = { KSPEC_BS_INDEX,.bs.index={1}, KSPEC_FK_RANGE,.fk.range={-2,1,0}}}}, + {"bzmpop",bzmpopCommand,-5, + "write @sortedset @blocking", + {{"write", + KSPEC_BS_INDEX,.bs.index={2}, + KSPEC_FK_KEYNUM,.fk.keynum={0,1,1}}}, + blmpopGetKeys}, + {"zrandmember",zrandmemberCommand,-2, "read-only random @sortedset", {{"read", diff --git a/src/server.h b/src/server.h index 3e337929f..73dd2a2ce 100644 --- a/src/server.h +++ b/src/server.h @@ -812,12 +812,12 @@ typedef struct blockingState { * operation such as BLPOP or XREAD. Or NULL. */ robj *target; /* The key that should receive the element, * for BLMOVE. */ - struct listPos { + struct blockPos { int wherefrom; /* Where to pop from */ int whereto; /* Where to push to */ - } listpos; /* The positions in the src/dst lists + } blockpos; /* The positions in the src/dst lists/zsets * where we want to pop/push an element - * for BLPOP, BRPOP and BLMOVE. */ + * for BLPOP, BRPOP, BLMOVE and BZMPOP. */ /* BLOCK_STREAM */ size_t xread_count; /* XREAD COUNT option. */ @@ -2343,7 +2343,7 @@ int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, dou long zsetRank(robj *zobj, sds ele, int reverse); int zsetDel(robj *zobj, sds ele); robj *zsetDup(robj *o); -void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg); +void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, long count, int use_nested_array, int reply_nil_when_empty, int *deleted); sds lpGetObject(unsigned char *sptr); int zslValueGteMin(double value, zrangespec *spec); int zslValueLteMax(double value, zrangespec *spec); @@ -2557,6 +2557,8 @@ int memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult int lcsGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int lmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int blmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); +int zmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); +int bzmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); unsigned short crc16(const char *buf, int len); @@ -2593,7 +2595,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key, int type); -void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids); +void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids); void updateStatsOnUnblock(client *c, long blocked_us, long reply_us); /* timeout.c -- Blocked clients timeout and connections timeout. */ @@ -2751,8 +2753,10 @@ void zremrangebyscoreCommand(client *c); void zremrangebylexCommand(client *c); void zpopminCommand(client *c); void zpopmaxCommand(client *c); +void zmpopCommand(client *c); void bzpopminCommand(client *c); void bzpopmaxCommand(client *c); +void bzmpopCommand(client *c); void zrandmemberCommand(client *c); void multiCommand(client *c); void execCommand(client *c); diff --git a/src/t_list.c b/src/t_list.c index 4e15a819d..c1c4248d7 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -1104,7 +1104,7 @@ void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, i } /* If the keys do not exist we must block */ - struct listPos pos = {where}; + struct blockPos pos = {where}; blockForKeys(c,BLOCKED_LIST,keys,numkeys,count,timeout,NULL,&pos,NULL); } @@ -1129,7 +1129,7 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou addReplyNull(c); } else { /* The list is empty and the client blocks. */ - struct listPos pos = {wherefrom, whereto}; + struct blockPos pos = {wherefrom, whereto}; blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,0,timeout,c->argv[2],&pos,NULL); } } else { diff --git a/src/t_zset.c b/src/t_zset.c index 44d6ff12e..25c48c2be 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -3753,31 +3753,35 @@ void zscanCommand(client *c) { } /* This command implements the generic zpop operation, used by: - * ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX. This function is also used - * inside blocked.c in the unblocking stage of BZPOPMIN and BZPOPMAX. + * ZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX and ZMPOP. This function is also used + * inside blocked.c in the unblocking stage of BZPOPMIN, BZPOPMAX and BZMPOP. * * If 'emitkey' is true also the key name is emitted, useful for the blocking * behavior of BZPOP[MIN|MAX], since we can block into multiple keys. + * Or in ZMPOP/BZMPOP, because we also can take multiple keys. * - * The synchronous version instead does not need to emit the key, but may - * use the 'count' argument to return multiple items if available. */ -void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg) { + * 'count' is the number of elements requested to pop, or 0 for plain single pop. + * + * 'use_nested_array' when false it generates a flat array (with or without key name). + * When true, it generates a nested 2 level array of field + score pairs, or 3 level when emitkey is set. + * + * 'reply_nil_when_empty' when true we reply a NIL if we are not able to pop up any elements. + * Like in ZMPOP/BZMPOP we reply with a structured nested array containing key name + * and member + score pairs. In these commands, we reply with null when we have no result. + * Otherwise in ZPOPMIN/ZPOPMAX we reply an empty array by default. + * + * 'deleted' is an optional output argument to get an indication + * if the key got deleted by this function. + * */ +void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, + long count, int use_nested_array, int reply_nil_when_empty, int *deleted) { int idx; robj *key = NULL; robj *zobj = NULL; sds ele; double score; - long count = 1; - /* If a count argument as passed, parse it or return an error. */ - if (countarg) { - if (getLongFromObjectOrReply(c,countarg,&count,NULL) != C_OK) - return; - if (count <= 0) { - addReply(c,shared.emptyarray); - return; - } - } + if (deleted) *deleted = 0; /* Check type and break on the first error, otherwise identify candidate. */ idx = 0; @@ -3791,20 +3795,38 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey /* No candidate for zpopping, return empty. */ if (!zobj) { - addReply(c,shared.emptyarray); + if (reply_nil_when_empty) { + addReplyNullArray(c); + } else { + addReply(c,shared.emptyarray); + } return; } - void *arraylen_ptr = addReplyDeferredLen(c); long result_count = 0; - /* We emit the key only for the blocking variant. */ - if (emitkey) addReplyBulk(c,key); + /* When count is 0, we need to correct it to 1 for plain single pop. */ + if (count == 0) count = 1; - /* Respond with a single (flat) array in RESP2 or if countarg is not - * provided (returning a single element). In RESP3, when countarg is - * provided, use nested array. */ - int use_nested_array = c->resp > 2 && countarg != NULL; + long llen = zsetLength(zobj); + long rangelen = (count > llen) ? llen : count; + + if (!use_nested_array && !emitkey) { + /* ZPOPMIN/ZPOPMAX with or without COUNT option in RESP2. */ + addReplyArrayLen(c, rangelen * 2); + } else if (use_nested_array && !emitkey) { + /* ZPOPMIN/ZPOPMAX with COUNT option in RESP3. */ + addReplyArrayLen(c, rangelen); + } else if (!use_nested_array && emitkey) { + /* BZPOPMIN/BZPOPMAX in RESP2 and RESP3. */ + addReplyArrayLen(c, rangelen * 2 + 1); + addReplyBulk(c, key); + } else if (use_nested_array && emitkey) { + /* ZMPOP/BZMPOP in RESP2 and RESP3. */ + addReplyArrayLen(c, 2); + addReplyBulk(c, key); + addReplyArrayLen(c, rangelen); + } /* Remove the element. */ do { @@ -3861,64 +3883,114 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey addReplyDouble(c,score); sdsfree(ele); ++result_count; + } while(--rangelen); - /* Remove the key, if indeed needed. */ - if (zsetLength(zobj) == 0) { - dbDelete(c->db,key); - notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); - break; - } - } while(--count); + /* Remove the key, if indeed needed. */ + if (zsetLength(zobj) == 0) { + if (deleted) *deleted = 1; - if (!use_nested_array) { - result_count *= 2; + dbDelete(c->db,key); + notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); } - setDeferredArrayLen(c,arraylen_ptr,result_count + (emitkey != 0)); + + if (c->cmd->proc == zmpopCommand) { + /* Always replicate it as ZPOP[MIN|MAX] with COUNT option instead of ZMPOP. */ + robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count); + rewriteClientCommandVector(c, 3, + (where == ZSET_MAX) ? shared.zpopmax : shared.zpopmin, + key, count_obj); + decrRefCount(count_obj); + } +} + +/* ZPOPMIN/ZPOPMAX key [] */ +void zpopMinMaxCommand(client *c, int where) { + if (c->argc > 3) { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + + long count = 0; /* 0 for plain single pop. */ + if (c->argc == 3) { + if (getLongFromObjectOrReply(c, c->argv[2], &count, NULL) != C_OK) + return; + + if (count <= 0) { + addReply(c, shared.emptyarray); + return; + } + } + + /* Respond with a single (flat) array in RESP2 or if count is 0 + * (returning a single element). In RESP3, when count > 0 use nested array. */ + int use_nested_array = (c->resp > 2 && count != 0); + + genericZpopCommand(c, &c->argv[1], 1, where, 0, count, use_nested_array, 0, NULL); } /* ZPOPMIN key [] */ void zpopminCommand(client *c) { - if (c->argc > 3) { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - genericZpopCommand(c,&c->argv[1],1,ZSET_MIN,0, - c->argc == 3 ? c->argv[2] : NULL); + zpopMinMaxCommand(c, ZSET_MIN); } /* ZMAXPOP key [] */ void zpopmaxCommand(client *c) { - if (c->argc > 3) { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - genericZpopCommand(c,&c->argv[1],1,ZSET_MAX,0, - c->argc == 3 ? c->argv[2] : NULL); + zpopMinMaxCommand(c, ZSET_MAX); } -/* BZPOPMIN / BZPOPMAX actual implementation. */ -void blockingGenericZpopCommand(client *c, int where) { +/* BZPOPMIN, BZPOPMAX, BZMPOP actual implementation. + * + * 'numkeys' is the number of keys. + * + * 'timeout_idx' parameter position of block timeout. + * + * 'where' ZSET_MIN or ZSET_MAX. + * + * 'count' is the number of elements requested to pop, or 0 for plain single pop. + * + * 'use_nested_array' when false it generates a flat array (with or without key name). + * When true, it generates a nested 3 level array of keyname, field + score pairs. + * */ +void blockingGenericZpopCommand(client *c, robj **keys, int numkeys, int where, + int timeout_idx, long count, int use_nested_array, int reply_nil_when_empty) { robj *o; + robj *key; mstime_t timeout; int j; - if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS) + if (getTimeoutFromObjectOrReply(c,c->argv[timeout_idx],&timeout,UNIT_SECONDS) != C_OK) return; - for (j = 1; j < c->argc-1; j++) { - o = lookupKeyWrite(c->db,c->argv[j]); + for (j = 0; j < numkeys; j++) { + key = keys[j]; + o = lookupKeyWrite(c->db,key); + /* Non-existing key, move to next key. */ + if (o == NULL) continue; + if (checkType(c,o,OBJ_ZSET)) return; - if (o != NULL) { - if (zsetLength(o) != 0) { - /* Non empty zset, this is like a normal ZPOP[MIN|MAX]. */ - genericZpopCommand(c,&c->argv[j],1,where,1,NULL); - /* Replicate it as an ZPOP[MIN|MAX] instead of BZPOP[MIN|MAX]. */ - rewriteClientCommandVector(c,2, - where == ZSET_MAX ? shared.zpopmax : shared.zpopmin, - c->argv[j]); - return; - } + + long llen = zsetLength(o); + /* Empty zset, move to next key. */ + if (llen == 0) continue; + + /* Non empty zset, this is like a normal ZPOP[MIN|MAX]. */ + genericZpopCommand(c, &key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, NULL); + + if (count == 0) { + /* Replicate it as ZPOP[MIN|MAX] instead of BZPOP[MIN|MAX]. */ + rewriteClientCommandVector(c,2, + (where == ZSET_MAX) ? shared.zpopmax : shared.zpopmin, + key); + } else { + /* Replicate it as ZPOP[MIN|MAX] with COUNT option. */ + robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count); + rewriteClientCommandVector(c, 3, + (where == ZSET_MAX) ? shared.zpopmax : shared.zpopmin, + key, count_obj); + decrRefCount(count_obj); } + + return; } /* If we are not allowed to block the client and the zset is empty the only thing @@ -3929,17 +4001,18 @@ void blockingGenericZpopCommand(client *c, int where) { } /* If the keys do not exist we must block */ - blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,0,timeout,NULL,NULL,NULL); + struct blockPos pos = {where}; + blockForKeys(c,BLOCKED_ZSET,c->argv+1,c->argc-2,count,timeout,NULL,&pos,NULL); } // BZPOPMIN key [key ...] timeout void bzpopminCommand(client *c) { - blockingGenericZpopCommand(c,ZSET_MIN); + blockingGenericZpopCommand(c, c->argv+1, c->argc-2, ZSET_MIN, c->argc-1, 0, 0, 0); } // BZPOPMAX key [key ...] timeout void bzpopmaxCommand(client *c) { - blockingGenericZpopCommand(c,ZSET_MAX); + blockingGenericZpopCommand(c, c->argv+1, c->argc-2, ZSET_MAX, c->argc-1, 0, 0, 0); } static void zarndmemberReplyWithListpack(client *c, unsigned int count, listpackEntry *keys, listpackEntry *vals) { @@ -4189,3 +4262,68 @@ void zrandmemberCommand(client *c) { zsetTypeRandomElement(zset, zsetLength(zset), &ele,NULL); zsetReplyFromListpackEntry(c,&ele); } + +/* ZMPOP/BZMPOP + * + * 'numkeys_idx' parameter position of key number. + * 'is_block' this indicates whether it is a blocking variant. */ +void zmpopGenericCommand(client *c, int numkeys_idx, int is_block) { + long j; + long numkeys = 0; /* Number of keys. */ + int where = 0; /* ZSET_MIN or ZSET_MAX. */ + long count = 1; /* Reply will consist of up to count elements, depending on the zset's length. */ + + /* Parse the numkeys. */ + if (getRangeLongFromObjectOrReply(c, c->argv[numkeys_idx], 1, LONG_MAX, + &numkeys, "numkeys should be greater than 0") != C_OK) + return; + + /* Parse the where. where_idx: the index of where in the c->argv. */ + long where_idx = numkeys_idx + numkeys + 1; + if (where_idx >= c->argc) { + addReplyErrorObject(c, shared.syntaxerr); + return; + } + if (!strcasecmp(c->argv[where_idx]->ptr, "MIN")) { + where = ZSET_MIN; + } else if (!strcasecmp(c->argv[where_idx]->ptr, "MAX")) { + where = ZSET_MAX; + } else { + addReplyErrorObject(c, shared.syntaxerr); + return; + } + + /* Parse the optional arguments. */ + for (j = where_idx + 1; j < c->argc; j++) { + char *opt = c->argv[j]->ptr; + int moreargs = (c->argc - 1) - j; + + if (!strcasecmp(opt, "COUNT") && moreargs) { + j++; + if (getRangeLongFromObjectOrReply(c, c->argv[j], 1, LONG_MAX, + &count,"count should be greater than 0") != C_OK) + return; + } else { + addReplyErrorObject(c, shared.syntaxerr); + return; + } + } + + if (is_block) { + /* BLOCK. We will handle CLIENT_DENY_BLOCKING flag in blockingGenericZpopCommand. */ + blockingGenericZpopCommand(c, c->argv+numkeys_idx+1, numkeys, where, 1, count, 1, 1); + } else { + /* NON-BLOCK */ + genericZpopCommand(c, c->argv+numkeys_idx+1, numkeys, where, 1, count, 1, 1, NULL); + } +} + +/* ZMPOP numkeys [ ...] MIN|MAX [COUNT count] */ +void zmpopCommand(client *c) { + zmpopGenericCommand(c, 1, 0); +} + +/* BZMPOP timeout numkeys [ ...] MIN|MAX [COUNT count] */ +void bzmpopCommand(client *c) { + zmpopGenericCommand(c, 2, 1); +} diff --git a/tests/integration/aof.tcl b/tests/integration/aof.tcl index aff7baacf..a803eab51 100644 --- a/tests/integration/aof.tcl +++ b/tests/integration/aof.tcl @@ -333,7 +333,6 @@ tags {"aof external:skip"} { set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls] set client2 [redis [dict get $srv host] [dict get $srv port] 1 $::tls] wait_done_loading $client - wait_done_loading $client2 # Pop all elements from mylist, should be blmpop delete mylist. $client lmpop 1 mylist left count 1 @@ -368,4 +367,51 @@ tags {"aof external:skip"} { assert_equal 2 [$client llen mylist3] } } + + # Test that ZMPOP/BZMPOP work fine with AOF. + create_aof { + append_to_aof [formatCommand zadd myzset 1 one 2 two 3 three] + append_to_aof [formatCommand zadd myzset2 4 four 5 five 6 six] + append_to_aof [formatCommand zadd myzset3 1 one 2 two 3 three 4 four 5 five] + } + + start_server_aof [list dir $server_path aof-load-truncated no] { + test "AOF+ZMPOP/BZMPOP: pop elements from the zset" { + set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls] + set client2 [redis [dict get $srv host] [dict get $srv port] 1 $::tls] + wait_done_loading $client + + # Pop all elements from myzset, should be bzmpop delete myzset. + $client zmpop 1 myzset min count 1 + $client bzmpop 0 1 myzset min count 10 + + # Pop all elements from myzset2, should be zmpop delete myzset2. + $client bzmpop 0 2 myzset myzset2 max count 10 + $client zmpop 2 myzset myzset2 max count 2 + + # Blocking path, be blocked and then released. + $client2 bzmpop 0 2 myzset myzset2 min count 2 + after 100 + $client zadd myzset2 1 one 2 two 3 three + + # Pop up the last element in myzset2 + $client bzmpop 0 3 myzset myzset2 myzset3 min count 1 + + # Leave two elements in myzset3. + $client bzmpop 0 3 myzset myzset2 myzset3 max count 3 + } + } + + start_server_aof [list dir $server_path aof-load-truncated no] { + test "AOF+ZMPOP/BZMPOP: after pop elements from the zset" { + set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls] + wait_done_loading $client + + # myzset and myzset2 no longer exist. + assert_equal 0 [$client exists myzset myzset2] + + # Length of myzset3 is two. + assert_equal 2 [$client zcard myzset3] + } + } } diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 735c5eb18..41b70ccb9 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -6,7 +6,7 @@ start_server { } { source "tests/unit/type/list-common.tcl" - # A helper function for BPOP/BLMPOP with one input key. + # A helper function to execute either B*POP or BLMPOP* with one input key. proc bpop_command {rd pop key timeout} { if {$pop == "BLMPOP_LEFT"} { $rd blmpop $timeout 1 $key left count 1 @@ -17,7 +17,7 @@ start_server { } } - # A helper function for BPOP/BLMPOP with two input keys. + # A helper function to execute either B*POP or BLMPOP* with two input keys. proc bpop_command_two_key {rd pop key key2 timeout} { if {$pop == "BLMPOP_LEFT"} { $rd blmpop $timeout 2 $key $key2 left count 1 @@ -719,14 +719,14 @@ foreach {pop} {BLPOP BLMPOP_LEFT} { set rd [redis_deferring_client] set repl [attach_to_replication_stream] - # BLMPOP without block. + # BLMPOP without being blocked. r lpush mylist{t} a b c r rpush mylist2{t} 1 2 3 r blmpop 0 1 mylist{t} left count 1 r blmpop 0 2 mylist{t} mylist2{t} right count 10 r blmpop 0 2 mylist{t} mylist2{t} right count 10 - # BLMPOP with block. + # BLMPOP that gets blocked. $rd blmpop 0 1 mylist{t} left count 1 wait_for_blocked_client r lpush mylist{t} a @@ -737,6 +737,10 @@ foreach {pop} {BLPOP BLMPOP_LEFT} { wait_for_blocked_client r rpush mylist2{t} a b c + # Released on timeout. + assert_equal {} [r blmpop 0.01 1 mylist{t} left count 10] + r set foo{t} bar ;# something else to propagate after, so we can make sure the above pop didn't. + assert_replication_stream $repl { {select *} {lpush mylist{t} a b c} @@ -750,6 +754,7 @@ foreach {pop} {BLPOP BLMPOP_LEFT} { {lpop mylist{t} 3} {rpush mylist2{t} a b c} {rpop mylist2{t} 3} + {set foo{t} bar} } } {} {needs:repl} diff --git a/tests/unit/type/zset.tcl b/tests/unit/type/zset.tcl index 2ccb19175..852a15464 100644 --- a/tests/unit/type/zset.tcl +++ b/tests/unit/type/zset.tcl @@ -6,6 +6,96 @@ start_server {tags {"zset"}} { } } + # A helper function to verify either ZPOP* or ZMPOP* response. + proc verify_pop_response {pop res zpop_expected_response zmpop_expected_response} { + if {[string match "*ZM*" $pop]} { + assert_equal $res $zmpop_expected_response + } else { + assert_equal $res $zpop_expected_response + } + } + + # A helper function to verify either ZPOP* or ZMPOP* response when given one input key. + proc verify_zpop_response {rd pop key count zpop_expected_response zmpop_expected_response} { + if {[string match "ZM*" $pop]} { + lassign [split $pop "_"] pop where + + if {$count == 0} { + set res [$rd $pop 1 $key $where] + } else { + set res [$rd $pop 1 $key $where COUNT $count] + } + } else { + if {$count == 0} { + set res [$rd $pop $key] + } else { + set res [$rd $pop $key $count] + } + } + verify_pop_response $pop $res $zpop_expected_response $zmpop_expected_response + } + + # A helper function to verify either BZPOP* or BZMPOP* response when given one input key. + proc verify_bzpop_response {rd pop key timeout count bzpop_expected_response bzmpop_expected_response} { + if {[string match "BZM*" $pop]} { + lassign [split $pop "_"] pop where + + if {$count == 0} { + $rd $pop $timeout 1 $key $where + } else { + $rd $pop $timeout 1 $key $where COUNT $count + } + } else { + $rd $pop $key $timeout + } + verify_pop_response $pop [$rd read] $bzpop_expected_response $bzmpop_expected_response + } + + # A helper function to verify either ZPOP* or ZMPOP* response when given two input keys. + proc verify_bzpop_two_key_response {rd pop key key2 timeout count bzpop_expected_response bzmpop_expected_response} { + if {[string match "BZM*" $pop]} { + lassign [split $pop "_"] pop where + + if {$count == 0} { + $rd $pop $timeout 2 $key $key2 $where + } else { + $rd $pop $timeout 2 $key $key2 $where COUNT $count + } + } else { + $rd $pop $key $key2 $timeout + } + verify_pop_response $pop [$rd read] $bzpop_expected_response $bzmpop_expected_response + } + + # A helper function to execute either BZPOP* or BZMPOP* with one input key. + proc bzpop_command {rd pop key timeout} { + if {[string match "BZM*" $pop]} { + lassign [split $pop "_"] pop where + $rd $pop $timeout 1 $key $where COUNT 1 + } else { + $rd $pop $key $timeout + } + } + + # A helper function to verify nil response in readraw base on RESP version. + proc verify_nil_response {resp nil_response} { + if {$resp == 2} { + assert_equal $nil_response {*-1} + } elseif {$resp == 3} { + assert_equal $nil_response {_} + } + } + + # A helper function to verify zset score response in readraw base on RESP version. + proc verify_score_response {rd resp score} { + if {$resp == 2} { + assert_equal [$rd read] {$1} + assert_equal [$rd read] $score + } elseif {$resp == 3} { + assert_equal [$rd read] ",$score" + } + } + proc basics {encoding} { set original_max_entries [lindex [r config get zset-max-ziplist-entries] 1] set original_max_value [lindex [r config get zset-max-ziplist-value] 1] @@ -509,10 +599,10 @@ start_server {tags {"zset"}} { assert_equal {} [r zrevrangebylex zset \[elez \[elex] assert_equal {} [r zrevrangebylex zset (hill (omega] } - + test "ZLEXCOUNT advanced - $encoding" { create_default_lex_zset - + assert_equal 9 [r zlexcount zset - +] assert_equal 0 [r zlexcount zset + -] assert_equal 0 [r zlexcount zset + \[c] @@ -913,110 +1003,103 @@ start_server {tags {"zset"}} { } } - test "Basic ZPOP with a single key - $encoding" { + foreach {popmin popmax} {ZPOPMIN ZPOPMAX ZMPOP_MIN ZMPOP_MAX} { + test "Basic $popmin/$popmax with a single key - $encoding" { r del zset - assert_equal {} [r zpopmin zset] + verify_zpop_response r $popmin zset 0 {} {} + create_zset zset {-1 a 1 b 2 c 3 d 4 e} - assert_equal {a -1} [r zpopmin zset] - assert_equal {b 1} [r zpopmin zset] - assert_equal {e 4} [r zpopmax zset] - assert_equal {d 3} [r zpopmax zset] - assert_equal {c 2} [r zpopmin zset] + verify_zpop_response r $popmin zset 0 {a -1} {zset {{a -1}}} + verify_zpop_response r $popmin zset 0 {b 1} {zset {{b 1}}} + verify_zpop_response r $popmax zset 0 {e 4} {zset {{e 4}}} + verify_zpop_response r $popmax zset 0 {d 3} {zset {{d 3}}} + verify_zpop_response r $popmin zset 0 {c 2} {zset {{c 2}}} assert_equal 0 [r exists zset] - r set foo bar - assert_error "*WRONGTYPE*" {r zpopmin foo} } - test "ZPOP with count - $encoding" { + test "$popmin/$popmax with count - $encoding" { r del z1 - r del z2 - r del z3 - r del foo - r set foo bar - assert_equal {} [r zpopmin z1 2] - assert_error "*WRONGTYPE*" {r zpopmin foo 2} + verify_zpop_response r $popmin z1 2 {} {} + create_zset z1 {0 a 1 b 2 c 3 d} - assert_equal {a 0 b 1} [r zpopmin z1 2] - assert_equal {d 3 c 2} [r zpopmax z1 2] + verify_zpop_response r $popmin z1 2 {a 0 b 1} {z1 {{a 0} {b 1}}} + verify_zpop_response r $popmax z1 2 {d 3 c 2} {z1 {{d 3} {c 2}}} } + } - test "BZPOP with a single existing sorted set - $encoding" { + foreach {popmin popmax} {BZPOPMIN BZPOPMAX BZMPOP_MIN BZMPOP_MAX} { + test "$popmin/$popmax with a single existing sorted set - $encoding" { set rd [redis_deferring_client] - create_zset zset {0 a 1 b 2 c} + create_zset zset {0 a 1 b 2 c 3 d} - $rd bzpopmin zset 5 - assert_equal {zset a 0} [$rd read] - $rd bzpopmin zset 5 - assert_equal {zset b 1} [$rd read] - $rd bzpopmax zset 5 - assert_equal {zset c 2} [$rd read] + verify_bzpop_response $rd $popmin zset 5 0 {zset a 0} {zset {{a 0}}} + verify_bzpop_response $rd $popmax zset 5 0 {zset d 3} {zset {{d 3}}} + verify_bzpop_response $rd $popmin zset 5 0 {zset b 1} {zset {{b 1}}} + verify_bzpop_response $rd $popmax zset 5 0 {zset c 2} {zset {{c 2}}} assert_equal 0 [r exists zset] } - test "BZPOP with multiple existing sorted sets - $encoding" { + test "$popmin/$popmax with multiple existing sorted sets - $encoding" { set rd [redis_deferring_client] create_zset z1{t} {0 a 1 b 2 c} create_zset z2{t} {3 d 4 e 5 f} - $rd bzpopmin z1{t} z2{t} 5 - assert_equal {z1{t} a 0} [$rd read] - $rd bzpopmax z1{t} z2{t} 5 - assert_equal {z1{t} c 2} [$rd read] + verify_bzpop_two_key_response $rd $popmin z1{t} z2{t} 5 0 {z1{t} a 0} {z1{t} {{a 0}}} + verify_bzpop_two_key_response $rd $popmax z1{t} z2{t} 5 0 {z1{t} c 2} {z1{t} {{c 2}}} assert_equal 1 [r zcard z1{t}] assert_equal 3 [r zcard z2{t}] - $rd bzpopmax z2{t} z1{t} 5 - assert_equal {z2{t} f 5} [$rd read] - $rd bzpopmin z2{t} z1{t} 5 - assert_equal {z2{t} d 3} [$rd read] + verify_bzpop_two_key_response $rd $popmax z2{t} z1{t} 5 0 {z2{t} f 5} {z2{t} {{f 5}}} + verify_bzpop_two_key_response $rd $popmin z2{t} z1{t} 5 0 {z2{t} d 3} {z2{t} {{d 3}}} assert_equal 1 [r zcard z1{t}] assert_equal 1 [r zcard z2{t}] } - test "BZPOP second sorted set has members - $encoding" { + test "$popmin/$popmax second sorted set has members - $encoding" { set rd [redis_deferring_client] r del z1{t} create_zset z2{t} {3 d 4 e 5 f} - $rd bzpopmax z1{t} z2{t} 5 - assert_equal {z2{t} f 5} [$rd read] - $rd bzpopmin z2{t} z1{t} 5 - assert_equal {z2{t} d 3} [$rd read] + + verify_bzpop_two_key_response $rd $popmax z1{t} z2{t} 5 0 {z2{t} f 5} {z2{t} {{f 5}}} + verify_bzpop_two_key_response $rd $popmin z1{t} z2{t} 5 0 {z2{t} d 3} {z2{t} {{d 3}}} assert_equal 0 [r zcard z1{t}] assert_equal 1 [r zcard z2{t}] } + } - test "Basic ZPOP - $encoding RESP3" { + foreach {popmin popmax} {ZPOPMIN ZPOPMAX ZMPOP_MIN ZMPOP_MAX} { + test "Basic $popmin/$popmax - $encoding RESP3" { r hello 3 - r del z1 create_zset z1 {0 a 1 b 2 c 3 d} - assert_equal {a 0.0} [r zpopmin z1] - assert_equal {d 3.0} [r zpopmax z1] + verify_zpop_response r $popmin z1 0 {a 0.0} {z1 {{a 0.0}}} + verify_zpop_response r $popmax z1 0 {d 3.0} {z1 {{d 3.0}}} r hello 2 } - test "ZPOP with count - $encoding RESP3" { + test "$popmin/$popmax with count - $encoding RESP3" { r hello 3 - r del z1 create_zset z1 {0 a 1 b 2 c 3 d} - assert_equal {{a 0.0} {b 1.0}} [r zpopmin z1 2] - assert_equal {{d 3.0} {c 2.0}} [r zpopmax z1 2] + verify_zpop_response r $popmin z1 2 {{a 0.0} {b 1.0}} {z1 {{a 0.0} {b 1.0}}} + verify_zpop_response r $popmax z1 2 {{d 3.0} {c 2.0}} {z1 {{d 3.0} {c 2.0}}} r hello 2 } + } - test "BZPOP - $encoding RESP3" { + foreach {popmin popmax} {BZPOPMIN BZPOPMAX BZMPOP_MIN BZMPOP_MAX} { + test "$popmin/$popmax - $encoding RESP3" { r hello 3 set rd [redis_deferring_client] - create_zset zset {0 a 1 b 2 c} + create_zset zset {0 a 1 b 2 c 3 d} + + verify_bzpop_response $rd $popmin zset 5 0 {zset a 0} {zset {{a 0}}} + verify_bzpop_response $rd $popmax zset 5 0 {zset d 3} {zset {{d 3}}} + verify_bzpop_response $rd $popmin zset 5 0 {zset b 1} {zset {{b 1}}} + verify_bzpop_response $rd $popmax zset 5 0 {zset c 2} {zset {{c 2}}} - $rd bzpopmin zset 5 - assert_equal {zset a 0} [$rd read] - $rd bzpopmin zset 5 - assert_equal {zset b 1} [$rd read] - $rd bzpopmax zset 5 - assert_equal {zset c 2} [$rd read] assert_equal 0 [r exists zset] r hello 2 } + } r config set zset-max-ziplist-entries $original_max_entries r config set zset-max-ziplist-value $original_max_value @@ -1025,6 +1108,248 @@ start_server {tags {"zset"}} { basics listpack basics skiplist + test "ZPOP/ZMPOP against wrong type" { + r set foo{t} bar + assert_error "*WRONGTYPE*" {r zpopmin foo{t}} + assert_error "*WRONGTYPE*" {r zpopmax foo{t}} + assert_error "*WRONGTYPE*" {r zpopmin foo{t} 2} + + assert_error "*WRONGTYPE*" {r zmpop 1 foo{t} min} + assert_error "*WRONGTYPE*" {r zmpop 1 foo{t} max} + assert_error "*WRONGTYPE*" {r zmpop 1 foo{t} max count 200} + + r del foo{t} + r set foo2{t} bar + assert_error "*WRONGTYPE*" {r zmpop 2 foo{t} foo2{t} min} + assert_error "*WRONGTYPE*" {r zmpop 2 foo2{t} foo1{t} max count 1} + } + + test "ZMPOP with illegal argument" { + assert_error "ERR wrong number of arguments*" {r zmpop} + assert_error "ERR wrong number of arguments*" {r zmpop 1} + assert_error "ERR wrong number of arguments*" {r zmpop 1 myzset{t}} + + assert_error "ERR numkeys*" {r zmpop 0 myzset{t} MIN} + assert_error "ERR numkeys*" {r zmpop a myzset{t} MIN} + assert_error "ERR numkeys*" {r zmpop -1 myzset{t} MAX} + + assert_error "ERR syntax error*" {r zmpop 1 myzset{t} bad_where} + assert_error "ERR syntax error*" {r zmpop 1 myzset{t} MIN bar_arg} + assert_error "ERR syntax error*" {r zmpop 1 myzset{t} MAX MIN} + assert_error "ERR syntax error*" {r zmpop 1 myzset{t} COUNT} + assert_error "ERR syntax error*" {r zmpop 2 myzset{t} myzset2{t} bad_arg} + + assert_error "ERR count*" {r zmpop 1 myzset{t} MIN COUNT 0} + assert_error "ERR count*" {r zmpop 1 myzset{t} MAX COUNT a} + assert_error "ERR count*" {r zmpop 1 myzset{t} MIN COUNT -1} + assert_error "ERR count*" {r zmpop 2 myzset{t} myzset2{t} MAX COUNT -1} + } + + test "ZMPOP propagate as pop with count command to replica" { + set repl [attach_to_replication_stream] + + # ZMPOP min/max propagate as ZPOPMIN/ZPOPMAX with count + r zadd myzset{t} 1 one 2 two 3 three + + # Pop elements from one zset. + r zmpop 1 myzset{t} min + r zmpop 1 myzset{t} max count 1 + + # Now the zset have only one element + r zmpop 2 myzset{t} myzset2{t} min count 10 + + # No elements so we don't propagate. + r zmpop 2 myzset{t} myzset2{t} max count 10 + + # Pop elements from the second zset. + r zadd myzset2{t} 1 one 2 two 3 three + r zmpop 2 myzset{t} myzset2{t} min count 2 + r zmpop 2 myzset{t} myzset2{t} max count 1 + + # Pop all elements. + r zadd myzset{t} 1 one 2 two 3 three + r zadd myzset2{t} 4 four 5 five 6 six + r zmpop 2 myzset{t} myzset2{t} min count 10 + r zmpop 2 myzset{t} myzset2{t} max count 10 + + assert_replication_stream $repl { + {select *} + {zadd myzset{t} 1 one 2 two 3 three} + {zpopmin myzset{t} 1} + {zpopmax myzset{t} 1} + {zpopmin myzset{t} 1} + {zadd myzset2{t} 1 one 2 two 3 three} + {zpopmin myzset2{t} 2} + {zpopmax myzset2{t} 1} + {zadd myzset{t} 1 one 2 two 3 three} + {zadd myzset2{t} 4 four 5 five 6 six} + {zpopmin myzset{t} 3} + {zpopmax myzset2{t} 3} + } + } {} {needs:repl} + + foreach resp {3 2} { + test "ZPOPMIN/ZPOPMAX readraw in RESP$resp" { + r del zset{t} + create_zset zset2{t} {1 a 2 b 3 c 4 d 5 e} + + r hello $resp + r readraw 1 + + # ZPOP against non existing key. + assert_equal {*0} [r zpopmin zset{t}] + assert_equal {*0} [r zpopmin zset{t} 1] + + # ZPOP without COUNT option. + assert_equal {*2} [r zpopmin zset2{t}] + assert_equal [r read] {$1} + assert_equal [r read] {a} + verify_score_response r $resp 1 + + # ZPOP with COUNT option. + if {$resp == 2} { + assert_equal {*2} [r zpopmax zset2{t} 1] + assert_equal [r read] {$1} + assert_equal [r read] {e} + } elseif {$resp == 3} { + assert_equal {*1} [r zpopmax zset2{t} 1] + assert_equal [r read] {*2} + assert_equal [r read] {$1} + assert_equal [r read] {e} + } + verify_score_response r $resp 5 + + r readraw 0 + } + + test "BZPOPMIN/BZPOPMAX readraw in RESP$resp" { + r del zset{t} + create_zset zset2{t} {1 a 2 b 3 c 4 d 5 e} + + set rd [redis_deferring_client] + $rd hello $resp + $rd read + $rd readraw 1 + + # BZPOP released on timeout. + $rd bzpopmin zset{t} 0.01 + verify_nil_response $resp [$rd read] + $rd bzpopmax zset{t} 0.01 + verify_nil_response $resp [$rd read] + + # BZPOP non-blocking path. + $rd bzpopmin zset1{t} zset2{t} 0.1 + assert_equal [$rd read] {*3} + assert_equal [$rd read] {$8} + assert_equal [$rd read] {zset2{t}} + assert_equal [$rd read] {$1} + assert_equal [$rd read] {a} + verify_score_response $rd $resp 1 + + # BZPOP blocking path. + $rd bzpopmin zset{t} 5 + wait_for_blocked_client + r zadd zset{t} 1 a + assert_equal [$rd read] {*3} + assert_equal [$rd read] {$7} + assert_equal [$rd read] {zset{t}} + assert_equal [$rd read] {$1} + assert_equal [$rd read] {a} + verify_score_response $rd $resp 1 + + $rd readraw 0 + } + + test "ZMPOP readraw in RESP$resp" { + r del zset{t} zset2{t} + create_zset zset3{t} {1 a} + create_zset zset4{t} {1 a 2 b 3 c 4 d 5 e} + + r hello $resp + r readraw 1 + + # ZMPOP against non existing key. + verify_nil_response $resp [r zmpop 1 zset{t} min] + verify_nil_response $resp [r zmpop 1 zset{t} max count 1] + verify_nil_response $resp [r zmpop 2 zset{t} zset2{t} min] + verify_nil_response $resp [r zmpop 2 zset{t} zset2{t} max count 1] + + # ZMPOP with one input key. + assert_equal {*2} [r zmpop 1 zset3{t} max] + assert_equal [r read] {$8} + assert_equal [r read] {zset3{t}} + assert_equal [r read] {*1} + assert_equal [r read] {*2} + assert_equal [r read] {$1} + assert_equal [r read] {a} + verify_score_response r $resp 1 + + # ZMPOP with COUNT option. + assert_equal {*2} [r zmpop 2 zset3{t} zset4{t} min count 2] + assert_equal [r read] {$8} + assert_equal [r read] {zset4{t}} + assert_equal [r read] {*2} + assert_equal [r read] {*2} + assert_equal [r read] {$1} + assert_equal [r read] {a} + verify_score_response r $resp 1 + assert_equal [r read] {*2} + assert_equal [r read] {$1} + assert_equal [r read] {b} + verify_score_response r $resp 2 + + r readraw 0 + } + + test "BZMPOP readraw in RESP$resp" { + r del zset{t} zset2{t} + create_zset zset3{t} {1 a 2 b 3 c 4 d 5 e} + + set rd [redis_deferring_client] + $rd hello $resp + $rd read + $rd readraw 1 + + # BZMPOP released on timeout. + $rd bzmpop 0.01 1 zset{t} min + verify_nil_response $resp [$rd read] + $rd bzmpop 0.01 2 zset{t} zset2{t} max + verify_nil_response $resp [$rd read] + + # BZMPOP non-blocking path. + $rd bzmpop 0.1 2 zset3{t} zset4{t} min + + assert_equal [$rd read] {*2} + assert_equal [$rd read] {$8} + assert_equal [$rd read] {zset3{t}} + assert_equal [$rd read] {*1} + assert_equal [$rd read] {*2} + assert_equal [$rd read] {$1} + assert_equal [$rd read] {a} + verify_score_response $rd $resp 1 + + # BZMPOP blocking path with COUNT option. + $rd bzmpop 5 2 zset{t} zset2{t} max count 2 + wait_for_blocked_client + r zadd zset2{t} 1 a 2 b 3 c + + assert_equal [$rd read] {*2} + assert_equal [$rd read] {$8} + assert_equal [$rd read] {zset2{t}} + assert_equal [$rd read] {*2} + assert_equal [$rd read] {*2} + assert_equal [$rd read] {$1} + assert_equal [$rd read] {c} + verify_score_response $rd $resp 3 + assert_equal [$rd read] {*2} + assert_equal [$rd read] {$1} + assert_equal [$rd read] {b} + verify_score_response $rd $resp 2 + + $rd readraw 0 + } + } + test {ZINTERSTORE regression with two sets, intset+hashtable} { r del seta{t} setb{t} setc{t} r sadd set1{t} a @@ -1073,25 +1398,25 @@ start_server {tags {"zset"}} { assert_error "*ERR*syntax*" {r zinterstore foo{t} 2 zsetd{t} zsetf{t} withscores} assert_error "*ERR*syntax*" {r zdiffstore foo{t} 2 zsetd{t} zsetf{t} withscores} } - + test {ZMSCORE retrieve} { r del zmscoretest r zadd zmscoretest 10 x r zadd zmscoretest 20 y - + r zmscore zmscoretest x y } {10 20} test {ZMSCORE retrieve from empty set} { r del zmscoretest - + r zmscore zmscoretest x y } {{} {}} - + test {ZMSCORE retrieve with missing member} { r del zmscoretest r zadd zmscoretest 10 x - + r zmscore zmscoretest x y } {10 {}} @@ -1099,7 +1424,7 @@ start_server {tags {"zset"}} { r del zmscoretest r zadd zmscoretest 10 x r zadd zmscoretest 20 y - + r zmscore zmscoretest x } {10} @@ -1107,7 +1432,7 @@ start_server {tags {"zset"}} { r del zmscoretest r zadd zmscoretest 10 x r zadd zmscoretest 20 y - + catch {r zmscore zmscoretest} e assert_match {*ERR*wrong*number*arg*} $e } @@ -1459,27 +1784,31 @@ start_server {tags {"zset"}} { assert_equal {} $err } - test "BZPOPMIN, ZADD + DEL should not awake blocked client" { + foreach {pop} {BZPOPMIN BZMPOP_MIN} { + test "$pop, ZADD + DEL should not awake blocked client" { set rd [redis_deferring_client] r del zset - $rd bzpopmin zset 0 + bzpop_command $rd $pop zset 0 + wait_for_blocked_client + r multi r zadd zset 0 foo r del zset r exec r del zset r zadd zset 1 bar - $rd read - } {zset bar 1} - test "BZPOPMIN, ZADD + DEL + SET should not awake blocked client" { + verify_pop_response $pop [$rd read] {zset bar 1} {zset {{bar 1}}} + } + + test "$pop, ZADD + DEL + SET should not awake blocked client" { set rd [redis_deferring_client] - r del list - r del zset - $rd bzpopmin zset 0 + bzpop_command $rd $pop zset 0 + wait_for_blocked_client + r multi r zadd zset 0 foo r del zset @@ -1487,8 +1816,10 @@ start_server {tags {"zset"}} { r exec r del zset r zadd zset 1 bar - $rd read - } {zset bar 1} + + verify_pop_response $pop [$rd read] {zset bar 1} {zset {{bar 1}}} + } + } test "BZPOPMIN with same key multiple times should work" { set rd [redis_deferring_client] @@ -1496,9 +1827,11 @@ start_server {tags {"zset"}} { # Data arriving after the BZPOPMIN. $rd bzpopmin z1{t} z2{t} z2{t} z1{t} 0 + wait_for_blocked_client r zadd z1{t} 0 a assert_equal [$rd read] {z1{t} a 0} $rd bzpopmin z1{t} z2{t} z2{t} z1{t} 0 + wait_for_blocked_client r zadd z2{t} 1 b assert_equal [$rd read] {z2{t} b 1} @@ -1511,38 +1844,47 @@ start_server {tags {"zset"}} { assert_equal [$rd read] {z2{t} b 1} } - test "MULTI/EXEC is isolated from the point of view of BZPOPMIN" { + foreach {pop} {BZPOPMIN BZMPOP_MIN} { + test "MULTI/EXEC is isolated from the point of view of $pop" { set rd [redis_deferring_client] r del zset - $rd bzpopmin zset 0 + + bzpop_command $rd $pop zset 0 + wait_for_blocked_client + r multi r zadd zset 0 a r zadd zset 1 b r zadd zset 2 c r exec - $rd read - } {zset a 0} - test "BZPOPMIN with variadic ZADD" { + verify_pop_response $pop [$rd read] {zset a 0} {zset {{a 0}}} + } + + test "$pop with variadic ZADD" { set rd [redis_deferring_client] r del zset if {$::valgrind} {after 100} - $rd bzpopmin zset 0 + bzpop_command $rd $pop zset 0 + wait_for_blocked_client if {$::valgrind} {after 100} assert_equal 2 [r zadd zset -1 foo 1 bar] if {$::valgrind} {after 100} - assert_equal {zset foo -1} [$rd read] + verify_pop_response $pop [$rd read] {zset foo -1} {zset {{foo -1}}} assert_equal {bar} [r zrange zset 0 -1] } - test "BZPOPMIN with zero timeout should block indefinitely" { + test "$pop with zero timeout should block indefinitely" { set rd [redis_deferring_client] r del zset - $rd bzpopmin zset 0 + bzpop_command $rd $pop zset 0 + wait_for_blocked_client after 1000 r zadd zset 0 foo - assert_equal {zset foo 0} [$rd read] + verify_pop_response $pop [$rd read] {zset foo 0} {zset {{foo 0}}} } + } + r config set zset-max-ziplist-entries $original_max_entries r config set zset-max-ziplist-value $original_max_value } @@ -1552,6 +1894,113 @@ start_server {tags {"zset"}} { stressers skiplist } + test "BZPOP/BZMPOP against wrong type" { + r set foo{t} bar + assert_error "*WRONGTYPE*" {r bzpopmin foo{t} 1} + assert_error "*WRONGTYPE*" {r bzpopmax foo{t} 1} + + assert_error "*WRONGTYPE*" {r bzmpop 1 1 foo{t} min} + assert_error "*WRONGTYPE*" {r bzmpop 1 1 foo{t} max} + assert_error "*WRONGTYPE*" {r bzmpop 1 1 foo{t} min count 10} + + r del foo{t} + r set foo2{t} bar + assert_error "*WRONGTYPE*" {r bzmpop 1 2 foo{t} foo2{t} min} + assert_error "*WRONGTYPE*" {r bzmpop 1 2 foo2{t} foo{t} max count 1} + } + + test "BZMPOP with illegal argument" { + assert_error "ERR wrong number of arguments*" {r bzmpop} + assert_error "ERR wrong number of arguments*" {r bzmpop 0 1} + assert_error "ERR wrong number of arguments*" {r bzmpop 0 1 myzset{t}} + + assert_error "ERR numkeys*" {r bzmpop 1 0 myzset{t} MIN} + assert_error "ERR numkeys*" {r bzmpop 1 a myzset{t} MIN} + assert_error "ERR numkeys*" {r bzmpop 1 -1 myzset{t} MAX} + + assert_error "ERR syntax error*" {r bzmpop 1 1 myzset{t} bad_where} + assert_error "ERR syntax error*" {r bzmpop 1 1 myzset{t} MIN bar_arg} + assert_error "ERR syntax error*" {r bzmpop 1 1 myzset{t} MAX MIN} + assert_error "ERR syntax error*" {r bzmpop 1 1 myzset{t} COUNT} + assert_error "ERR syntax error*" {r bzmpop 1 2 myzset{t} myzset2{t} bad_arg} + + assert_error "ERR count*" {r bzmpop 1 1 myzset{t} MIN COUNT 0} + assert_error "ERR count*" {r bzmpop 1 1 myzset{t} MAX COUNT a} + assert_error "ERR count*" {r bzmpop 1 1 myzset{t} MIN COUNT -1} + assert_error "ERR count*" {r bzmpop 1 2 myzset{t} myzset2{t} MAX COUNT -1} + } + + test "BZMPOP with multiple blocked clients" { + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + set rd3 [redis_deferring_client] + set rd4 [redis_deferring_client] + r del myzset{t} myzset2{t} + + $rd1 bzmpop 0 2 myzset{t} myzset2{t} min count 1 + $rd2 bzmpop 0 2 myzset{t} myzset2{t} max count 10 + $rd3 bzmpop 0 2 myzset{t} myzset2{t} min count 10 + $rd4 bzmpop 0 2 myzset{t} myzset2{t} max count 1 + wait_for_blocked_clients_count 4 + + r multi + r zadd myzset{t} 1 a 2 b 3 c 4 d 5 e + r zadd myzset2{t} 1 a 2 b 3 c 4 d 5 e + r exec + + assert_equal {myzset{t} {{a 1}}} [$rd1 read] + assert_equal {myzset{t} {{e 5} {d 4} {c 3} {b 2}}} [$rd2 read] + assert_equal {myzset2{t} {{a 1} {b 2} {c 3} {d 4} {e 5}}} [$rd3 read] + + r zadd myzset2{t} 1 a 2 b 3 c + assert_equal {myzset2{t} {{c 3}}} [$rd4 read] + + r del myzset{t} myzset2{t} + } + + test "BZMPOP propagate as pop with count command to replica" { + set rd [redis_deferring_client] + set repl [attach_to_replication_stream] + + # BZMPOP without being blocked. + r zadd myzset{t} 1 one 2 two 3 three + r zadd myzset2{t} 4 four 5 five 6 six + r bzmpop 0 1 myzset{t} min + r bzmpop 0 2 myzset{t} myzset2{t} max count 10 + r bzmpop 0 2 myzset{t} myzset2{t} max count 10 + + # BZMPOP that gets blocked. + $rd bzmpop 0 1 myzset{t} min count 1 + wait_for_blocked_client + r zadd myzset{t} 1 one + $rd bzmpop 0 2 myzset{t} myzset2{t} min count 5 + wait_for_blocked_client + r zadd myzset{t} 1 one 2 two 3 three + $rd bzmpop 0 2 myzset{t} myzset2{t} max count 10 + wait_for_blocked_client + r zadd myzset2{t} 4 four 5 five 6 six + + # Released on timeout. + assert_equal {} [r bzmpop 0.01 1 myzset{t} max count 10] + r set foo{t} bar ;# something else to propagate after, so we can make sure the above pop didn't. + + assert_replication_stream $repl { + {select *} + {zadd myzset{t} 1 one 2 two 3 three} + {zadd myzset2{t} 4 four 5 five 6 six} + {zpopmin myzset{t} 1} + {zpopmax myzset{t} 2} + {zpopmax myzset2{t} 3} + {zadd myzset{t} 1 one} + {zpopmin myzset{t} 1} + {zadd myzset{t} 1 one 2 two 3 three} + {zpopmin myzset{t} 3} + {zadd myzset2{t} 4 four 5 five 6 six} + {zpopmax myzset2{t} 3} + {set foo{t} bar} + } + } {} {needs:repl} + test {ZSET skiplist order consistency when elements are moved} { set original_max [lindex [r config get zset-max-ziplist-entries] 1] r config set zset-max-ziplist-entries 0