diff --git a/src/blocked.c b/src/blocked.c index 6ce4b9893..68d3112a9 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -65,7 +65,7 @@ #include "latency.h" #include "monotonic.h" -int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto); +void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey, redisDb *db, int wherefrom, int whereto, int *deleted); int getListPositionFromObjectOrReply(client *c, robj *arg, int *position); /* This structure represents the blocked key information that we store @@ -271,6 +271,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { if (de) { list *clients = dictGetVal(de); int numclients = listLength(clients); + int deleted = 0; while(numclients--) { listNode *clientnode = listFirst(clients); @@ -286,41 +287,27 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { robj *dstkey = receiver->bpop.target; int wherefrom = receiver->bpop.listpos.wherefrom; int whereto = receiver->bpop.listpos.whereto; - robj *value = listTypePop(o, wherefrom); - if (value) { - /* Protect receiver->bpop.target, that will be - * freed by the next unblockClient() - * call. */ - if (dstkey) incrRefCount(dstkey); + /* Protect receiver->bpop.target, that will be + * freed by the next unblockClient() + * call. */ + if (dstkey) incrRefCount(dstkey); - monotime replyTimer; - elapsedStart(&replyTimer); - if (serveClientBlockedOnList(receiver, - rl->key,dstkey,rl->db,value, - wherefrom, whereto) == C_ERR) - { - /* If we failed serving the client we need - * to also undo the POP operation. */ - listTypePush(o,value,wherefrom); - } - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); - unblockClient(receiver); + monotime replyTimer; + elapsedStart(&replyTimer); + serveClientBlockedOnList(receiver, o, + rl->key, dstkey, rl->db, + wherefrom, whereto, + &deleted); + updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); + unblockClient(receiver); - if (dstkey) decrRefCount(dstkey); - decrRefCount(value); - } else { - break; - } + if (dstkey) decrRefCount(dstkey); + + /* The list is empty and has been deleted. */ + if (deleted) break; } } - - if (listTypeLength(o) == 0) { - dbDelete(rl->db,rl->key); - notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id); - } - /* We don't call signalModifiedKey() as it was already called - * when an element was pushed on the list. */ } /* Helper function for handleClientsBlockedOnKeys(). This function is called @@ -627,12 +614,16 @@ void handleClientsBlockedOnKeys(void) { * for all the 'numkeys' keys as in the 'keys' argument. When we block for * stream keys, we also provide an array of streamID structures: clients will * be unblocked only when items with an ID greater or equal to the specified - * one is appended to the stream. */ -void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids) { + * one is appended to the stream. + * + * 'count' for those commands that support the optional count argument. + * Otherwise the value is 0. */ +void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids) { dictEntry *de; list *l; int j; + c->bpop.count = count; c->bpop.timeout = timeout; c->bpop.target = target; diff --git a/src/db.c b/src/db.c index bda03998d..ba2c24348 100644 --- a/src/db.c +++ b/src/db.c @@ -1703,6 +1703,16 @@ int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult * return genericGetKeys(0, 2, 3, 1, argv, argc, result); } +int lmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + UNUSED(cmd); + return genericGetKeys(0, 1, 2, 1, argv, argc, result); +} + +int blmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + UNUSED(cmd); + return genericGetKeys(0, 2, 3, 1, argv, argc, result); +} + /* Helper function to extract keys from the SORT command. * * SORT ... STORE ... diff --git a/src/module.c b/src/module.c index bec844edf..a617d6053 100644 --- a/src/module.c +++ b/src/module.c @@ -5504,7 +5504,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF "Blocking module command called from transaction"); } else { if (keys) { - blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL,NULL); + blockForKeys(c,BLOCKED_MODULE,keys,numkeys,0,timeout,NULL,NULL,NULL); } else { blockClient(c,BLOCKED_MODULE); } diff --git a/src/server.c b/src/server.c index 866bf5775..f753b6658 100644 --- a/src/server.c +++ b/src/server.c @@ -316,6 +316,10 @@ struct redisCommand redisCommandTable[] = { "write fast @list", 0,NULL,1,1,1,0,0,0}, + {"lmpop",lmpopCommand,-4, + "write @list", + 0,lmpopGetKeys,0,0,0,0,0,0}, + {"brpop",brpopCommand,-3, "write no-script @list @blocking", 0,NULL,1,-2,1,0,0,0}, @@ -332,6 +336,10 @@ struct redisCommand redisCommandTable[] = { "write no-script @list @blocking", 0,NULL,1,-2,1,0,0,0}, + {"blmpop",blmpopCommand,-5, + "write @list @blocking", + 0,blmpopGetKeys,0,0,0,0,0,0}, + {"llen",llenCommand,2, "read-only fast @list", 0,NULL,1,1,1,0,0,0}, diff --git a/src/server.h b/src/server.h index c97d9fd84..ce5b0db76 100644 --- a/src/server.h +++ b/src/server.h @@ -793,6 +793,7 @@ typedef struct multiState { * The fields used depend on client->btype. */ typedef struct blockingState { /* Generic fields. */ + long count; /* Elements to pop if count was specified (BLMPOP), 0 otherwise. */ mstime_t timeout; /* Blocking operation timeout. If UNIX current time * is > timeout then the operation timed out. */ @@ -1910,6 +1911,7 @@ void addReplyPushLen(client *c, long length); void addReplyHelp(client *c, const char **help); void addReplySubcommandSyntaxError(client *c); void addReplyLoadedModules(client *c); +void addListRangeReply(client *c, robj *o, long start, long end, int reverse); void copyClientOutputBuffer(client *dst, client *src); size_t sdsZmallocSize(sds s); size_t getStringObjectSdsUsedMemory(robj *o); @@ -1991,9 +1993,10 @@ int listTypeEqual(listTypeEntry *entry, robj *o); void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry); void listTypeConvert(robj *subject, int enc); robj *listTypeDup(robj *o); +int listTypeDelRange(robj *o, long start, long stop); void unblockClientWaitingData(client *c); void popGenericCommand(client *c, int where); -void listElementsRemoved(client *c, robj *key, int where, robj *o, long count); +void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, int *deleted); /* MULTI/EXEC/WATCH... */ void unwatchAllKeys(client *c); @@ -2451,6 +2454,8 @@ int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysRes int xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int lcsGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); +int lmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); +int blmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); unsigned short crc16(const char *buf, int len); @@ -2487,7 +2492,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key, int type); -void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids); +void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids); void updateStatsOnUnblock(client *c, long blocked_us, long reply_us); /* timeout.c -- Blocked clients timeout and connections timeout. */ @@ -2576,6 +2581,7 @@ void rpushxCommand(client *c); void linsertCommand(client *c); void lpopCommand(client *c); void rpopCommand(client *c); +void lmpopCommand(client *c); void llenCommand(client *c); void lindexCommand(client *c); void lrangeCommand(client *c); @@ -2652,6 +2658,7 @@ void execCommand(client *c); void discardCommand(client *c); void blpopCommand(client *c); void brpopCommand(client *c); +void blmpopCommand(client *c); void brpoplpushCommand(client *c); void blmoveCommand(client *c); void appendCommand(client *c); diff --git a/src/t_list.c b/src/t_list.c index 6fbf9ee0d..fb63c1509 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -215,6 +215,15 @@ robj *listTypeDup(robj *o) { return lobj; } +/* Delete a range of elements from the list. */ +int listTypeDelRange(robj *subject, long start, long count) { + if (subject->encoding == OBJ_ENCODING_QUICKLIST) { + return quicklistDelRange(subject->ptr, start, count); + } else { + serverPanic("Unknown list encoding"); + } +} + /*----------------------------------------------------------------------------- * List Commands *----------------------------------------------------------------------------*/ @@ -374,6 +383,35 @@ void lsetCommand(client *c) { } } +/* A helper function like addListRangeReply, more details see below. + * The difference is that here we are returning nested arrays, like: + * 1) keyname + * 2) 1) element1 + * 2) element2 + * + * And also actually pop out from the list by calling listElementsRemoved. + * We maintain the server.dirty and notifications there. + * + * 'deleted' is an optional output argument to get an indication + * if the key got deleted by this function. */ +void listPopRangeAndReplyWithKey(client *c, robj *o, robj *key, int where, long count, int *deleted) { + long llen = listTypeLength(o); + long rangelen = (count > llen) ? llen : count; + long rangestart = (where == LIST_HEAD) ? 0 : -rangelen; + long rangeend = (where == LIST_HEAD) ? rangelen - 1 : -1; + int reverse = (where == LIST_HEAD) ? 0 : 1; + + /* We return key-name just once, and an array of elements */ + addReplyArrayLen(c, 2); + addReplyBulk(c, key); + addListRangeReply(c, o, rangestart, rangeend, reverse); + + /* Pop these elements. */ + listTypeDelRange(o, rangestart, rangelen); + /* Maintain the notifications and dirty. */ + listElementsRemoved(c, key, where, o, rangelen, deleted); +} + /* A helper for replying with a list's range between the inclusive start and end * indexes as multi-bulk, with support for negative indexes. Note that start * must be less than end or an empty array is returned. When the reverse @@ -419,14 +457,21 @@ void addListRangeReply(client *c, robj *o, long start, long end, int reverse) { } } -/* A housekeeping helper for list elements popping tasks. */ -void listElementsRemoved(client *c, robj *key, int where, robj *o, long count) { +/* A housekeeping helper for list elements popping tasks. + * + * 'deleted' is an optional output argument to get an indication + * if the key got deleted by this function. */ +void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, int *deleted) { char *event = (where == LIST_HEAD) ? "lpop" : "rpop"; notifyKeyspaceEvent(NOTIFY_LIST, event, key, c->db->id); if (listTypeLength(o) == 0) { - notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id); + if (deleted) *deleted = 1; + dbDelete(c->db, key); + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id); + } else { + if (deleted) *deleted = 0; } signalModifiedKey(c, c->db, key); server.dirty += count; @@ -466,7 +511,7 @@ void popGenericCommand(client *c, int where) { serverAssert(value != NULL); addReplyBulk(c,value); decrRefCount(value); - listElementsRemoved(c,c->argv[1],where,o,1); + listElementsRemoved(c,c->argv[1],where,o,1,NULL); } else { /* Pop a range of elements. An addition to the original POP command, * which replies with a multi-bulk. */ @@ -477,11 +522,52 @@ void popGenericCommand(client *c, int where) { int reverse = (where == LIST_HEAD) ? 0 : 1; addListRangeReply(c,o,rangestart,rangeend,reverse); - quicklistDelRange(o->ptr,rangestart,rangelen); - listElementsRemoved(c,c->argv[1],where,o,rangelen); + listTypeDelRange(o,rangestart,rangelen); + listElementsRemoved(c,c->argv[1],where,o,rangelen,NULL); } } +/* Like popGenericCommand but work with multiple keys. + * Take multiple keys and return multiple elements from just one key. + * + * 'numkeys' the number of keys. + * 'count' is the number of elements requested to pop. + * + * Always reply with array. */ +void mpopGenericCommand(client *c, robj **keys, int numkeys, int where, long count) { + int j; + robj *o; + robj *key; + + for (j = 0; j < numkeys; j++) { + key = keys[j]; + o = lookupKeyWrite(c->db, key); + + /* Non-existing key, move to next key. */ + if (o == NULL) continue; + + if (checkType(c, o, OBJ_LIST)) return; + + long llen = listTypeLength(o); + /* Empty list, move to next key. */ + if (llen == 0) continue; + + /* Pop a range of elements in a nested arrays way. */ + listPopRangeAndReplyWithKey(c, o, key, where, count, NULL); + + /* Replicate it as [LR]POP COUNT. */ + robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count); + rewriteClientCommandVector(c, 3, + (where == LIST_HEAD) ? shared.lpop : shared.rpop, + key, count_obj); + decrRefCount(count_obj); + return; + } + + /* Look like we are not able to pop up any elements. */ + addReplyNullArray(c); +} + /* LPOP [count] */ void lpopCommand(client *c) { popGenericCommand(c,LIST_HEAD); @@ -829,10 +915,11 @@ void rpoplpushCommand(client *c) { * is to serve a specific client (receiver) that is blocked on 'key' * in the context of the specified 'db', doing the following: * - * 1) Provide the client with the 'value' element. + * 1) Provide the client with the 'value' element or a range of elements. + * We will do the pop in here and caller does not need to bother the return. * 2) If the dstkey is not NULL (we are serving a BLMOVE) also push the * 'value' element on the destination list (the "push" side of the command). - * 3) Propagate the resulting BRPOP, BLPOP and additional xPUSH if any into + * 3) Propagate the resulting BRPOP, BLPOP, BLMPOP and additional xPUSH if any into * the AOF and replication channel. * * The argument 'wherefrom' is LIST_TAIL or LIST_HEAD, and indicates if the @@ -843,25 +930,45 @@ void rpoplpushCommand(client *c) { * 'value' element is to be pushed to the head or tail so that we can * propagate the command properly. * - * The function returns C_OK if we are able to serve the client, otherwise - * C_ERR is returned to signal the caller that the list POP operation - * should be undone as the client was not served: This only happens for - * BLMOVE that fails to push the value to the destination key as it is - * of the wrong type. */ -int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto) + * 'deleted' is an optional output argument to get an indication + * if the key got deleted by this function. */ +void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey, redisDb *db, int wherefrom, int whereto, int *deleted) { robj *argv[5]; + robj *value = NULL; + + if (deleted) *deleted = 0; if (dstkey == NULL) { /* Propagate the [LR]POP operation. */ + struct redisCommand *cmd = (wherefrom == LIST_HEAD) ? + server.lpopCommand : server.rpopCommand; argv[0] = (wherefrom == LIST_HEAD) ? shared.lpop : shared.rpop; argv[1] = key; - propagate((wherefrom == LIST_HEAD) ? - server.lpopCommand : server.rpopCommand, - db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL); + + if (receiver->lastcmd->proc == blmpopCommand) { + /* Propagate the [LR]POP COUNT operation. */ + long count = receiver->bpop.count; + serverAssert(count > 0); + long llen = listTypeLength(o); + serverAssert(llen > 0); + + argv[2] = createStringObjectFromLongLong((count > llen) ? llen : count); + propagate(cmd, db->id, argv, 3, PROPAGATE_AOF|PROPAGATE_REPL); + decrRefCount(argv[2]); + + /* Pop a range of elements in a nested arrays way. */ + listPopRangeAndReplyWithKey(receiver, o, key, wherefrom, count, deleted); + return; + } + + propagate(cmd, db->id, argv, 2, PROPAGATE_AOF|PROPAGATE_REPL); /* BRPOP/BLPOP */ + value = listTypePop(o, wherefrom); + serverAssert(value != NULL); + addReplyArrayLen(receiver,2); addReplyBulk(receiver,key); addReplyBulk(receiver,value); @@ -876,6 +983,9 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb if (!(dstobj && checkType(receiver,dstobj,OBJ_LIST))) { + value = listTypePop(o, wherefrom); + serverAssert(value != NULL); + lmoveHandlePush(receiver,dstkey,dstobj,value,whereto); /* Propagate the LMOVE/RPOPLPUSH operation. */ int isbrpoplpush = (receiver->lastcmd->proc == brpoplpushCommand); @@ -892,49 +1002,82 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb /* Notify event ("lpush" or "rpush" was notified by lmoveHandlePush). */ notifyKeyspaceEvent(NOTIFY_LIST,wherefrom == LIST_TAIL ? "rpop" : "lpop", key,receiver->db->id); - } else { - /* BLMOVE failed because of wrong - * destination type. */ - return C_ERR; } } - return C_OK; + + if (value) decrRefCount(value); + + if (listTypeLength(o) == 0) { + if (deleted) *deleted = 1; + + dbDelete(receiver->db, key); + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, receiver->db->id); + } + /* We don't call signalModifiedKey() as it was already called + * when an element was pushed on the list. */ } -/* Blocking RPOP/LPOP */ -void blockingPopGenericCommand(client *c, int where) { +/* Blocking RPOP/LPOP/LMPOP + * + * 'numkeys' is the number of keys. + * 'timeout_idx' parameter position of block timeout. + * 'where' LIST_HEAD for LEFT, LIST_TAIL for RIGHT. + * 'count' is the number of elements requested to pop, or 0 for plain single pop. + * + * When count is 0, a reply of a single bulk-string will be used. + * When count > 0, an array reply will be used. */ +void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, int timeout_idx, long count) { robj *o; + robj *key; mstime_t timeout; int j; - if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS) + if (getTimeoutFromObjectOrReply(c,c->argv[timeout_idx],&timeout,UNIT_SECONDS) != C_OK) return; - for (j = 1; j < c->argc-1; j++) { - o = lookupKeyWrite(c->db,c->argv[j]); - if (o != NULL) { - if (checkType(c,o,OBJ_LIST)) { - return; - } else { - if (listTypeLength(o) != 0) { - /* Non empty list, this is like a normal [LR]POP. */ - robj *value = listTypePop(o,where); - serverAssert(value != NULL); + /* Traverse all input keys, we take action only based on one key. */ + for (j = 0; j < numkeys; j++) { + key = keys[j]; + o = lookupKeyWrite(c->db, key); - addReplyArrayLen(c,2); - addReplyBulk(c,c->argv[j]); - addReplyBulk(c,value); - decrRefCount(value); - listElementsRemoved(c,c->argv[j],where,o,1); + /* Non-existing key, move to next key. */ + if (o == NULL) continue; - /* Replicate it as an [LR]POP instead of B[LR]POP. */ - rewriteClientCommandVector(c,2, - (where == LIST_HEAD) ? shared.lpop : shared.rpop, - c->argv[j]); - return; - } - } + if (checkType(c, o, OBJ_LIST)) return; + + long llen = listTypeLength(o); + /* Empty list, move to next key. */ + if (llen == 0) continue; + + if (count != 0) { + /* BLMPOP, non empty list, like a normal [LR]POP with count option. + * The difference here we pop a range of elements in a nested arrays way. */ + listPopRangeAndReplyWithKey(c, o, key, where, count, NULL); + + /* Replicate it as [LR]POP COUNT. */ + robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count); + rewriteClientCommandVector(c, 3, + (where == LIST_HEAD) ? shared.lpop : shared.rpop, + key, count_obj); + decrRefCount(count_obj); + return; } + + /* Non empty list, this is like a normal [LR]POP. */ + robj *value = listTypePop(o,where); + serverAssert(value != NULL); + + addReplyArrayLen(c,2); + addReplyBulk(c,key); + addReplyBulk(c,value); + decrRefCount(value); + listElementsRemoved(c,key,where,o,1,NULL); + + /* Replicate it as an [LR]POP instead of B[LR]POP. */ + rewriteClientCommandVector(c,2, + (where == LIST_HEAD) ? shared.lpop : shared.rpop, + key); + return; } /* If we are not allowed to block the client, the only thing @@ -946,17 +1089,17 @@ void blockingPopGenericCommand(client *c, int where) { /* If the keys do not exist we must block */ struct listPos pos = {where}; - blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,&pos,NULL); + blockForKeys(c,BLOCKED_LIST,keys,numkeys,count,timeout,NULL,&pos,NULL); } /* BLPOP [ ...] */ void blpopCommand(client *c) { - blockingPopGenericCommand(c,LIST_HEAD); + blockingPopGenericCommand(c,c->argv+1,c->argc-2,LIST_HEAD,c->argc-1,0); } /* BRPOP [ ...] */ void brpopCommand(client *c) { - blockingPopGenericCommand(c,LIST_TAIL); + blockingPopGenericCommand(c,c->argv+1,c->argc-2,LIST_TAIL,c->argc-1,0); } void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeout) { @@ -971,7 +1114,7 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou } else { /* The list is empty and the client blocks. */ struct listPos pos = {wherefrom, whereto}; - blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,c->argv[2],&pos,NULL); + blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,0,timeout,c->argv[2],&pos,NULL); } } else { /* The list exists and has elements, so @@ -1001,3 +1144,62 @@ void brpoplpushCommand(client *c) { != C_OK) return; blmoveGenericCommand(c, LIST_TAIL, LIST_HEAD, timeout); } + +/* LMPOP/BLMPOP + * + * 'numkeys_idx' parameter position of key number. + * 'is_block' this indicates whether it is a blocking variant. */ +void lmpopGenericCommand(client *c, int numkeys_idx, int is_block) { + long j; + long numkeys = 0; /* Number of keys. */ + int where = 0; /* HEAD for LEFT, TAIL for RIGHT. */ + long count = 1; /* Reply will consist of up to count elements, depending on the list's length. */ + + /* Parse the numkeys. */ + if (getRangeLongFromObjectOrReply(c, c->argv[numkeys_idx], 1, LONG_MAX, + &numkeys, "numkeys should be greater than 0") != C_OK) + return; + + /* Parse the where. where_idx: the index of where in the c->argv. */ + long where_idx = numkeys_idx + numkeys + 1; + if (where_idx >= c->argc) { + addReplyErrorObject(c, shared.syntaxerr); + return; + } + if (getListPositionFromObjectOrReply(c, c->argv[where_idx], &where) != C_OK) + return; + + /* Parse the optional arguments. */ + for (j = where_idx + 1; j < c->argc; j++) { + char *opt = c->argv[j]->ptr; + int moreargs = (c->argc - 1) - j; + + if (!strcasecmp(opt, "COUNT") && moreargs) { + j++; + if (getRangeLongFromObjectOrReply(c, c->argv[j], 1, LONG_MAX, + &count,"count should be greater than 0") != C_OK) + return; + } else { + addReplyErrorObject(c, shared.syntaxerr); + return; + } + } + + if (is_block) { + /* BLOCK. We will handle CLIENT_DENY_BLOCKING flag in blockingPopGenericCommand. */ + blockingPopGenericCommand(c, c->argv+numkeys_idx+1, numkeys, where, 1, count); + } else { + /* NON-BLOCK */ + mpopGenericCommand(c, c->argv+numkeys_idx+1, numkeys, where, count); + } +} + +/* LMPOP numkeys [ ...] LEFT|RIGHT [COUNT count] */ +void lmpopCommand(client *c) { + lmpopGenericCommand(c, 1, 0); +} + +/* BLMPOP timeout numkeys [ ...] LEFT|RIGHT [COUNT count] */ +void blmpopCommand(client *c) { + lmpopGenericCommand(c, 2, 1); +} diff --git a/src/t_stream.c b/src/t_stream.c index d1b144a8f..3b64de91e 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2152,7 +2152,7 @@ void xreadCommand(client *c) { goto cleanup; } blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, - timeout, NULL, NULL, ids); + 0, timeout, NULL, NULL, ids); /* If no COUNT is given and we block, set a relatively small count: * in case the ID provided is too low, we do not want the server to * block just to serve this client a huge stream of messages. */ diff --git a/src/t_zset.c b/src/t_zset.c index c7681adc7..5bbb3e1c3 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -3967,7 +3967,7 @@ void blockingGenericZpopCommand(client *c, int where) { } /* If the keys do not exist we must block */ - blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,timeout,NULL,NULL,NULL); + blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,0,timeout,NULL,NULL,NULL); } // BZPOPMIN key [key ...] timeout diff --git a/tests/integration/aof.tcl b/tests/integration/aof.tcl index c19918b20..aff7baacf 100644 --- a/tests/integration/aof.tcl +++ b/tests/integration/aof.tcl @@ -320,4 +320,52 @@ tags {"aof external:skip"} { } } } + + # Test that LMPOP/BLMPOP work fine with AOF. + create_aof { + append_to_aof [formatCommand lpush mylist a b c] + append_to_aof [formatCommand rpush mylist2 1 2 3] + append_to_aof [formatCommand lpush mylist3 a b c d e] + } + + start_server_aof [list dir $server_path aof-load-truncated no] { + test "AOF+LMPOP/BLMPOP: pop elements from the list" { + set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls] + set client2 [redis [dict get $srv host] [dict get $srv port] 1 $::tls] + wait_done_loading $client + wait_done_loading $client2 + + # Pop all elements from mylist, should be blmpop delete mylist. + $client lmpop 1 mylist left count 1 + $client blmpop 0 1 mylist left count 10 + + # Pop all elements from mylist2, should be lmpop delete mylist2. + $client blmpop 0 2 mylist mylist2 right count 10 + $client lmpop 2 mylist mylist2 right count 2 + + # Blocking path, be blocked and then released. + $client2 blmpop 0 2 mylist mylist2 left count 2 + after 100 + $client lpush mylist2 a b c + + # Pop up the last element in mylist2 + $client blmpop 0 3 mylist mylist2 mylist3 left count 1 + + # Leave two elements in mylist3. + $client blmpop 0 3 mylist mylist2 mylist3 right count 3 + } + } + + start_server_aof [list dir $server_path aof-load-truncated no] { + test "AOF+LMPOP/BLMPOP: after pop elements from the list" { + set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls] + wait_done_loading $client + + # mylist and mylist2 no longer exist. + assert_equal 0 [$client exists mylist mylist2] + + # Length of mylist3 is two. + assert_equal 2 [$client llen mylist3] + } + } } diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 20fd0e49b..735c5eb18 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -6,6 +6,28 @@ start_server { } { source "tests/unit/type/list-common.tcl" + # A helper function for BPOP/BLMPOP with one input key. + proc bpop_command {rd pop key timeout} { + if {$pop == "BLMPOP_LEFT"} { + $rd blmpop $timeout 1 $key left count 1 + } elseif {$pop == "BLMPOP_RIGHT"} { + $rd blmpop $timeout 1 $key right count 1 + } else { + $rd $pop $key $timeout + } + } + + # A helper function for BPOP/BLMPOP with two input keys. + proc bpop_command_two_key {rd pop key key2 timeout} { + if {$pop == "BLMPOP_LEFT"} { + $rd blmpop $timeout 2 $key $key2 left count 1 + } elseif {$pop == "BLMPOP_RIGHT"} { + $rd blmpop $timeout 2 $key $key2 right count 1 + } else { + $rd $pop $key $key2 $timeout + } + } + test {LPOS basic usage} { r DEL mylist r RPUSH mylist a b c 1 2 3 c c @@ -111,10 +133,6 @@ start_server { assert_equal $largevalue(linkedlist) [r rpop mylist2] assert_equal c [r lpop mylist2] } - - test {R/LPOP against empty list} { - r lpop non-existing-list - } {} test {R/LPOP with the optional count argument} { assert_equal 7 [r lpush listcount aa bb cc dd ee ff gg] @@ -147,53 +165,77 @@ start_server { } foreach {type large} [array get largevalue] { - test "BLPOP, BRPOP: single existing list - $type" { + foreach {pop} {BLPOP BLMPOP_LEFT} { + test "$pop: single existing list - $type" { set rd [redis_deferring_client] create_list blist "a b $large c d" - $rd blpop blist 1 + bpop_command $rd $pop blist 1 assert_equal {blist a} [$rd read] - $rd brpop blist 1 + if {$pop == "BLPOP"} { + bpop_command $rd BRPOP blist 1 + } else { + bpop_command $rd BLMPOP_RIGHT blist 1 + } assert_equal {blist d} [$rd read] - $rd blpop blist 1 + bpop_command $rd $pop blist 1 assert_equal {blist b} [$rd read] - $rd brpop blist 1 + if {$pop == "BLPOP"} { + bpop_command $rd BRPOP blist 1 + } else { + bpop_command $rd BLMPOP_RIGHT blist 1 + } assert_equal {blist c} [$rd read] + + assert_equal 1 [r llen blist] } - test "BLPOP, BRPOP: multiple existing lists - $type" { + test "$pop: multiple existing lists - $type" { set rd [redis_deferring_client] create_list blist1{t} "a $large c" create_list blist2{t} "d $large f" - $rd blpop blist1{t} blist2{t} 1 + bpop_command_two_key $rd $pop blist1{t} blist2{t} 1 assert_equal {blist1{t} a} [$rd read] - $rd brpop blist1{t} blist2{t} 1 + if {$pop == "BLPOP"} { + bpop_command_two_key $rd BRPOP blist1{t} blist2{t} 1 + } else { + bpop_command_two_key $rd BLMPOP_RIGHT blist1{t} blist2{t} 1 + } assert_equal {blist1{t} c} [$rd read] assert_equal 1 [r llen blist1{t}] assert_equal 3 [r llen blist2{t}] - $rd blpop blist2{t} blist1{t} 1 + bpop_command_two_key $rd $pop blist2{t} blist1{t} 1 assert_equal {blist2{t} d} [$rd read] - $rd brpop blist2{t} blist1{t} 1 + if {$pop == "BLPOP"} { + bpop_command_two_key $rd BRPOP blist2{t} blist1{t} 1 + } else { + bpop_command_two_key $rd BLMPOP_RIGHT blist2{t} blist1{t} 1 + } assert_equal {blist2{t} f} [$rd read] assert_equal 1 [r llen blist1{t}] assert_equal 1 [r llen blist2{t}] } - test "BLPOP, BRPOP: second list has an entry - $type" { + test "$pop: second list has an entry - $type" { set rd [redis_deferring_client] r del blist1{t} create_list blist2{t} "d $large f" - $rd blpop blist1{t} blist2{t} 1 + bpop_command_two_key $rd $pop blist1{t} blist2{t} 1 assert_equal {blist2{t} d} [$rd read] - $rd brpop blist1{t} blist2{t} 1 + if {$pop == "BLPOP"} { + bpop_command_two_key $rd BRPOP blist1{t} blist2{t} 1 + } else { + bpop_command_two_key $rd BLMPOP_RIGHT blist1{t} blist2{t} 1 + } assert_equal {blist2{t} f} [$rd read] assert_equal 0 [r llen blist1{t}] assert_equal 1 [r llen blist2{t}] } + } test "BRPOPLPUSH - $type" { r del target{t} @@ -239,26 +281,31 @@ start_server { } } - test "BLPOP, LPUSH + DEL should not awake blocked client" { +foreach {pop} {BLPOP BLMPOP_LEFT} { + test "$pop, LPUSH + DEL should not awake blocked client" { set rd [redis_deferring_client] r del list - $rd blpop list 0 + bpop_command $rd $pop list 0 + after 100 ;# Make sure rd is blocked before MULTI + wait_for_blocked_client + r multi r lpush list a r del list r exec r del list r lpush list b - $rd read - } {list b} + assert_equal {list b} [$rd read] + } - test "BLPOP, LPUSH + DEL + SET should not awake blocked client" { + test "$pop, LPUSH + DEL + SET should not awake blocked client" { set rd [redis_deferring_client] r del list - $rd blpop list 0 + bpop_command $rd $pop list 0 after 100 ;# Make sure rd is blocked before MULTI + wait_for_blocked_client r multi r lpush list a @@ -267,8 +314,9 @@ start_server { r exec r del list r lpush list b - $rd read - } {list b} + assert_equal {list b} [$rd read] + } +} test "BLPOP with same key multiple times should work (issue #801)" { set rd [redis_deferring_client] @@ -291,29 +339,36 @@ start_server { assert_equal [$rd read] {list2{t} b} } - test "MULTI/EXEC is isolated from the point of view of BLPOP" { +foreach {pop} {BLPOP BLMPOP_LEFT} { + test "MULTI/EXEC is isolated from the point of view of $pop" { set rd [redis_deferring_client] r del list - $rd blpop list 0 + + bpop_command $rd $pop list 0 + after 100 ;# Make sure rd is blocked before MULTI + wait_for_blocked_client + r multi r lpush list a r lpush list b r lpush list c r exec - $rd read - } {list c} + assert_equal {list c} [$rd read] + } - test "BLPOP with variadic LPUSH" { + test "$pop with variadic LPUSH" { set rd [redis_deferring_client] r del blist if {$::valgrind} {after 100} - $rd blpop blist 0 + bpop_command $rd $pop blist 0 if {$::valgrind} {after 100} + wait_for_blocked_client assert_equal 2 [r lpush blist foo bar] if {$::valgrind} {after 100} assert_equal {blist bar} [$rd read] assert_equal foo [lindex [r lrange blist 0 -1] 0] } +} test "BRPOPLPUSH with zero timeout should block indefinitely" { set rd [redis_deferring_client] @@ -412,6 +467,32 @@ start_server { assert_equal {foo} [r lrange target2{t} 0 -1] } + test "BLMPOP with multiple blocked clients" { + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + set rd3 [redis_deferring_client] + set rd4 [redis_deferring_client] + r del blist{t} blist2{t} + + $rd1 blmpop 0 2 blist{t} blist2{t} left count 1 + $rd2 blmpop 0 2 blist{t} blist2{t} right count 10 + $rd3 blmpop 0 2 blist{t} blist2{t} left count 10 + $rd4 blmpop 0 2 blist{t} blist2{t} right count 1 + wait_for_blocked_clients_count 4 + + r multi + r lpush blist{t} a b c d e + r lpush blist2{t} 1 2 3 4 5 + r exec + + assert_equal {blist{t} e} [$rd1 read] + assert_equal {blist{t} {a b c d}} [$rd2 read] + assert_equal {blist2{t} {5 4 3 2 1}} [$rd3 read] + + r lpush blist2{t} 1 2 3 + assert_equal {blist2{t} 1} [$rd4 read] + } + test "Linked LMOVEs" { set rd1 [redis_deferring_client] set rd2 [redis_deferring_client] @@ -420,6 +501,7 @@ start_server { $rd1 blmove list1{t} list2{t} right left 0 $rd2 blmove list2{t} list3{t} left right 0 + wait_for_blocked_clients_count 2 r rpush list1{t} foo @@ -436,6 +518,7 @@ start_server { $rd1 brpoplpush list1{t} list2{t} 0 $rd2 brpoplpush list2{t} list1{t} 0 + wait_for_blocked_clients_count 2 r rpush list1{t} foo @@ -449,6 +532,7 @@ start_server { r del blist{t} $rd brpoplpush blist{t} blist{t} 0 + wait_for_blocked_client r rpush blist{t} foo @@ -475,6 +559,7 @@ start_server { r del srclist{t} dstlist{t} somekey{t} r set somekey{t} somevalue $blocked_client brpoplpush srclist{t} dstlist{t} 0 + wait_for_blocked_client $watching_client watch dstlist{t} $watching_client read $watching_client multi @@ -492,6 +577,7 @@ start_server { r del srclist{t} dstlist{t} somekey{t} r set somekey{t} somevalue $blocked_client brpoplpush srclist{t} dstlist{t} 0 + wait_for_blocked_client $watching_client watch dstlist{t} $watching_client read $watching_client multi @@ -513,16 +599,19 @@ start_server { $rd read } {} - test "BLPOP when new key is moved into place" { +foreach {pop} {BLPOP BLMPOP_LEFT} { + test "$pop when new key is moved into place" { set rd [redis_deferring_client] + r del foo{t} - $rd blpop foo{t} 5 + bpop_command $rd $pop foo{t} 0 + wait_for_blocked_client r lpush bob{t} abc def hij r rename bob{t} foo{t} $rd read } {foo{t} hij} - test "BLPOP when result key is created by SORT..STORE" { + test "$pop when result key is created by SORT..STORE" { set rd [redis_deferring_client] # zero out list from previous test without explicit delete @@ -530,17 +619,20 @@ start_server { r lpop foo{t} r lpop foo{t} - $rd blpop foo{t} 5 + bpop_command $rd $pop foo{t} 5 + wait_for_blocked_client r lpush notfoo{t} hello hola aguacate konichiwa zanzibar r sort notfoo{t} ALPHA store foo{t} $rd read } {foo{t} aguacate} +} - foreach {pop} {BLPOP BRPOP} { + foreach {pop} {BLPOP BRPOP BLMPOP_LEFT BLMPOP_RIGHT} { test "$pop: with single empty list argument" { set rd [redis_deferring_client] r del blist1 - $rd $pop blist1 1 + bpop_command $rd $pop blist1 1 + wait_for_blocked_client r rpush blist1 foo assert_equal {blist1 foo} [$rd read] assert_equal 0 [r exists blist1] @@ -548,14 +640,14 @@ start_server { test "$pop: with negative timeout" { set rd [redis_deferring_client] - $rd $pop blist1 -1 + bpop_command $rd $pop blist1 -1 assert_error "ERR*is negative*" {$rd read} } test "$pop: with non-integer timeout" { set rd [redis_deferring_client] r del blist1 - $rd $pop blist1 0.1 + bpop_command $rd $pop blist1 0.1 r rpush blist1 foo assert_equal {blist1 foo} [$rd read] assert_equal 0 [r exists blist1] @@ -565,7 +657,8 @@ start_server { # To test this, use a timeout of 0 and wait a second. # The blocking pop should still be waiting for a push. set rd [redis_deferring_client] - $rd $pop blist1 0 + bpop_command $rd $pop blist1 0 + wait_for_blocked_client after 1000 r rpush blist1 foo assert_equal {blist1 foo} [$rd read] @@ -575,6 +668,7 @@ start_server { set rd [redis_deferring_client] r del blist1{t} blist2{t} r set blist2{t} nolist{t} + bpop_command_two_key $rd $pop blist1{t} blist2{t} 1 $rd $pop blist1{t} blist2{t} 1 assert_error "WRONGTYPE*" {$rd read} } @@ -582,7 +676,8 @@ start_server { test "$pop: timeout" { set rd [redis_deferring_client] r del blist1{t} blist2{t} - $rd $pop blist1{t} blist2{t} 1 + bpop_command_two_key $rd $pop blist1{t} blist2{t} 1 + wait_for_blocked_client assert_equal {} [$rd read] } @@ -590,13 +685,15 @@ start_server { set rd [redis_deferring_client] r del blist1{t} blist2{t} - $rd $pop blist1{t} blist2{t} 1 + bpop_command_two_key $rd $pop blist1{t} blist2{t} 1 + wait_for_blocked_client r rpush blist1{t} foo assert_equal {blist1{t} foo} [$rd read] assert_equal 0 [r exists blist1{t}] assert_equal 0 [r exists blist2{t}] - $rd $pop blist1{t} blist2{t} 1 + bpop_command_two_key $rd $pop blist1{t} blist2{t} 1 + wait_for_blocked_client r rpush blist2{t} foo assert_equal {blist2{t} foo} [$rd read] assert_equal 0 [r exists blist1{t}] @@ -604,16 +701,57 @@ start_server { } } - test {BLPOP inside a transaction} { +foreach {pop} {BLPOP BLMPOP_LEFT} { + test "$pop inside a transaction" { r del xlist r lpush xlist foo r lpush xlist bar r multi - r blpop xlist 0 - r blpop xlist 0 - r blpop xlist 0 + + bpop_command r $pop xlist 0 + bpop_command r $pop xlist 0 + bpop_command r $pop xlist 0 r exec } {{xlist bar} {xlist foo} {}} +} + + test {BLMPOP propagate as pop with count command to replica} { + set rd [redis_deferring_client] + set repl [attach_to_replication_stream] + + # BLMPOP without block. + r lpush mylist{t} a b c + r rpush mylist2{t} 1 2 3 + r blmpop 0 1 mylist{t} left count 1 + r blmpop 0 2 mylist{t} mylist2{t} right count 10 + r blmpop 0 2 mylist{t} mylist2{t} right count 10 + + # BLMPOP with block. + $rd blmpop 0 1 mylist{t} left count 1 + wait_for_blocked_client + r lpush mylist{t} a + $rd blmpop 0 2 mylist{t} mylist2{t} left count 5 + wait_for_blocked_client + r lpush mylist{t} a b c + $rd blmpop 0 2 mylist{t} mylist2{t} right count 10 + wait_for_blocked_client + r rpush mylist2{t} a b c + + assert_replication_stream $repl { + {select *} + {lpush mylist{t} a b c} + {rpush mylist2{t} 1 2 3} + {lpop mylist{t} 1} + {rpop mylist{t} 2} + {rpop mylist2{t} 3} + {lpush mylist{t} a} + {lpop mylist{t} 1} + {lpush mylist{t} a b c} + {lpop mylist{t} 3} + {rpush mylist2{t} a b c} + {rpop mylist2{t} 3} + } + } {} {needs:repl} test {LPUSHX, RPUSHX - generic} { r del xlist @@ -860,23 +998,46 @@ start_server { } {} foreach {type large} [array get largevalue] { - test "Basic LPOP/RPOP - $type" { + test "Basic LPOP/RPOP/LMPOP - $type" { create_list mylist "$large 1 2" assert_equal $large [r lpop mylist] assert_equal 2 [r rpop mylist] assert_equal 1 [r lpop mylist] assert_equal 0 [r llen mylist] - # pop on empty list - assert_equal {} [r lpop mylist] - assert_equal {} [r rpop mylist] + create_list mylist "$large 1 2" + assert_equal "mylist $large" [r lmpop 1 mylist left count 1] + assert_equal {mylist {2 1}} [r lmpop 2 mylist mylist right count 2] } } - test {LPOP/RPOP against non list value} { - r set notalist foo - assert_error WRONGTYPE* {r lpop notalist} - assert_error WRONGTYPE* {r rpop notalist} + test {LPOP/RPOP/LMPOP against empty list} { + r del non-existing-list{t} non-existing-list2{t} + + assert_equal {} [r lpop non-existing-list{t}] + assert_equal {} [r rpop non-existing-list2{t}] + + assert_equal {} [r lmpop 1 non-existing-list{t} left count 1] + assert_equal {} [r lmpop 1 non-existing-list{t} left count 10] + assert_equal {} [r lmpop 2 non-existing-list{t} non-existing-list2{t} right count 1] + assert_equal {} [r lmpop 2 non-existing-list{t} non-existing-list2{t} right count 10] + } + + test {LPOP/RPOP/LMPOP NON-BLOCK or BLOCK against non list value} { + r set notalist{t} foo + assert_error WRONGTYPE* {r lpop notalist{t}} + assert_error WRONGTYPE* {r blpop notalist{t} 0} + assert_error WRONGTYPE* {r rpop notalist{t}} + assert_error WRONGTYPE* {r brpop notalist{t} 0} + + r del notalist2{t} + assert_error "WRONGTYPE*" {r lmpop 2 notalist{t} notalist2{t} left count 1} + assert_error "WRONGTYPE*" {r blmpop 0 2 notalist{t} notalist2{t} left count 1} + + r del notalist{t} + r set notalist2{t} nolist + assert_error "WRONGTYPE*" {r lmpop 2 notalist{t} notalist2{t} right count 10} + assert_error "WRONGTYPE*" {r blmpop 0 2 notalist{t} notalist2{t} left count 1} } foreach {type num} {quicklist 250 quicklist 500} { @@ -897,6 +1058,121 @@ start_server { } } + test {LMPOP with illegal argument} { + assert_error "ERR wrong number of arguments*" {r lmpop} + assert_error "ERR wrong number of arguments*" {r lmpop 1} + assert_error "ERR wrong number of arguments*" {r lmpop 1 mylist{t}} + + assert_error "ERR numkeys*" {r lmpop 0 mylist{t} LEFT} + assert_error "ERR numkeys*" {r lmpop a mylist{t} LEFT} + assert_error "ERR numkeys*" {r lmpop -1 mylist{t} RIGHT} + + assert_error "ERR syntax error*" {r lmpop 1 mylist{t} bad_where} + assert_error "ERR syntax error*" {r lmpop 1 mylist{t} LEFT bar_arg} + assert_error "ERR syntax error*" {r lmpop 1 mylist{t} RIGHT LEFT} + assert_error "ERR syntax error*" {r lmpop 1 mylist{t} COUNT} + assert_error "ERR syntax error*" {r lmpop 2 mylist{t} mylist2{t} bad_arg} + + assert_error "ERR count*" {r lmpop 1 mylist{t} LEFT COUNT 0} + assert_error "ERR count*" {r lmpop 1 mylist{t} RIGHT COUNT a} + assert_error "ERR count*" {r lmpop 1 mylist{t} LEFT COUNT -1} + assert_error "ERR count*" {r lmpop 2 mylist{t} mylist2{t} RIGHT COUNT -1} + } + + test {LMPOP single existing list} { + # Same key multiple times. + create_list mylist{t} "a b c d e f" + assert_equal {mylist{t} {a b}} [r lmpop 2 mylist{t} mylist{t} left count 2] + assert_equal {mylist{t} {f e}} [r lmpop 2 mylist{t} mylist{t} right count 2] + assert_equal 2 [r llen mylist{t}] + + # First one exists, second one does not exist. + create_list mylist{t} "a b c d e" + r del mylist2{t} + assert_equal {mylist{t} a} [r lmpop 2 mylist{t} mylist2{t} left count 1] + assert_equal 4 [r llen mylist{t}] + assert_equal {mylist{t} {e d c b}} [r lmpop 2 mylist{t} mylist2{t} right count 10] + assert_equal {} [r lmpop 2 mylist{t} mylist2{t} right count 1] + + # First one does not exist, second one exists. + r del mylist{t} + create_list mylist2{t} "1 2 3 4 5" + assert_equal {mylist2{t} 5} [r lmpop 2 mylist{t} mylist2{t} right count 1] + assert_equal 4 [r llen mylist2{t}] + assert_equal {mylist2{t} {1 2 3 4}} [r lmpop 2 mylist{t} mylist2{t} left count 10] + + assert_equal 0 [r exists mylist{t} mylist2{t}] + } + + test {LMPOP multiple existing lists} { + create_list mylist{t} "a b c d e" + create_list mylist2{t} "1 2 3 4 5" + + # Pop up from the first key. + assert_equal {mylist{t} {a b}} [r lmpop 2 mylist{t} mylist2{t} left count 2] + assert_equal 3 [r llen mylist{t}] + assert_equal {mylist{t} {e d c}} [r lmpop 2 mylist{t} mylist2{t} right count 3] + assert_equal 0 [r exists mylist{t}] + + # Pop up from the second key. + assert_equal {mylist2{t} {1 2 3}} [r lmpop 2 mylist{t} mylist2{t} left count 3] + assert_equal 2 [r llen mylist2{t}] + assert_equal {mylist2{t} {5 4}} [r lmpop 2 mylist{t} mylist2{t} right count 2] + assert_equal 0 [r exists mylist{t}] + + # Pop up all elements. + create_list mylist{t} "a b c" + create_list mylist2{t} "1 2 3" + assert_equal {mylist{t} {a b c}} [r lmpop 2 mylist{t} mylist2{t} left count 10] + assert_equal 0 [r llen mylist{t}] + assert_equal {mylist2{t} {3 2 1}} [r lmpop 2 mylist{t} mylist2{t} right count 10] + assert_equal 0 [r llen mylist2{t}] + assert_equal 0 [r exists mylist{t} mylist2{t}] + } + + test {LMPOP propagate as pop with count command to replica} { + set repl [attach_to_replication_stream] + + # left/right propagate as lpop/rpop with count + r lpush mylist{t} a b c + + # Pop elements from one list. + r lmpop 1 mylist{t} left count 1 + r lmpop 1 mylist{t} right count 1 + + # Now the list have only one element + r lmpop 2 mylist{t} mylist2{t} left count 10 + + # No elements so we don't propagate. + r lmpop 2 mylist{t} mylist2{t} left count 10 + + # Pop elements from the second list. + r rpush mylist2{t} 1 2 3 + r lmpop 2 mylist{t} mylist2{t} left count 2 + r lmpop 2 mylist{t} mylist2{t} right count 1 + + # Pop all elements. + r rpush mylist{t} a b c + r rpush mylist2{t} 1 2 3 + r lmpop 2 mylist{t} mylist2{t} left count 10 + r lmpop 2 mylist{t} mylist2{t} right count 10 + + assert_replication_stream $repl { + {select *} + {lpush mylist{t} a b c} + {lpop mylist{t} 1} + {rpop mylist{t} 1} + {lpop mylist{t} 1} + {rpush mylist2{t} 1 2 3} + {lpop mylist2{t} 2} + {rpop mylist2{t} 1} + {rpush mylist{t} a b c} + {rpush mylist2{t} 1 2 3} + {lpop mylist{t} 3} + {rpop mylist2{t} 3} + } + } {} {needs:repl} + foreach {type large} [array get largevalue] { test "LRANGE basics - $type" { create_list mylist "$large 1 2 3 4 5 6 7 8 9" @@ -1034,6 +1310,7 @@ start_server { r ping } {PONG} +foreach {pop} {BLPOP BLMPOP_RIGHT} { test "client unblock tests" { r del l set rd [redis_deferring_client] @@ -1041,19 +1318,19 @@ start_server { set id [$rd read] # test default args - $rd blpop l 0 + bpop_command $rd $pop l 0 wait_for_blocked_client r client unblock $id assert_equal {} [$rd read] # test with timeout - $rd blpop l 0 + bpop_command $rd $pop l 0 wait_for_blocked_client r client unblock $id TIMEOUT assert_equal {} [$rd read] # test with error - $rd blpop l 0 + bpop_command $rd $pop l 0 wait_for_blocked_client r client unblock $id ERROR catch {[$rd read]} e @@ -1069,11 +1346,12 @@ start_server { assert_equal $e {invalid command name "0"} # finally, see the this client and list are still functional - $rd blpop l 0 + bpop_command $rd $pop l 0 wait_for_blocked_client r lpush l foo assert_equal {l foo} [$rd read] } {} +} test {List ziplist of various encodings} { r del k