mirror of https://mirror.osredm.com/root/redis.git
Add new hexpired notification for HFE (#13329)
When the hash field expired, we will send a new `hexpired` notification. It mainly includes the following three cases: 1. When field expired by active expiration. 2. When field expired by lazy expiration. 3. When the user uses the `h(p)expire(at)` command, the user will also get a `hexpired` notification if the field expires during the command. ## Improvement 1. Now if more than one field expires in the hmget command, we will only send a `hexpired` notification. 2. When a field with TTL is deleted by commands like hdel without updating the global DS, active expire will not send a notification. --------- Co-authored-by: Ozan Tezcan <ozantezcan@gmail.com> Co-authored-by: Moti Cohen <moti.cohen@redis.com>
This commit is contained in:
parent
f01fdc3960
commit
ed10f737b8
|
@ -3195,6 +3195,8 @@ typedef struct dictExpireMetadata {
|
|||
#define HFE_LAZY_EXPIRE (0) /* Delete expired field, and if last field also the hash */
|
||||
#define HFE_LAZY_AVOID_FIELD_DEL (1<<0) /* Avoid deleting expired field */
|
||||
#define HFE_LAZY_AVOID_HASH_DEL (1<<1) /* Avoid deleting hash if the field is the last one */
|
||||
#define HFE_LAZY_NO_NOTIFICATION (1<<2) /* Do not send notification, used when multiple fields
|
||||
* may expire and only one notification is desired. */
|
||||
|
||||
void hashTypeConvert(robj *o, int enc, ebuckets *hexpires);
|
||||
void hashTypeTryConversion(redisDb *db, robj *subject, robj **argv, int start, int end);
|
||||
|
|
75
src/t_hash.c
75
src/t_hash.c
|
@ -750,16 +750,19 @@ GetFieldRes hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vs
|
|||
propagateHashFieldDeletion(db, key, field, sdslen(field));
|
||||
|
||||
/* If the field is the last one in the hash, then the hash will be deleted */
|
||||
res = GETF_EXPIRED;
|
||||
robj *keyObj = createStringObject(key, sdslen(key));
|
||||
if (!(hfeFlags & HFE_LAZY_NO_NOTIFICATION))
|
||||
notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", keyObj, db->id);
|
||||
if ((hashTypeLength(o, 0) == 0) && (!(hfeFlags & HFE_LAZY_AVOID_HASH_DEL))) {
|
||||
robj *keyObj = createStringObject(key, sdslen(key));
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyObj, db->id);
|
||||
if (!(hfeFlags & HFE_LAZY_NO_NOTIFICATION))
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyObj, db->id);
|
||||
dbDelete(db,keyObj);
|
||||
signalModifiedKey(NULL, db, keyObj);
|
||||
decrRefCount(keyObj);
|
||||
return GETF_EXPIRED_HASH;
|
||||
res = GETF_EXPIRED_HASH;
|
||||
}
|
||||
|
||||
return GETF_EXPIRED;
|
||||
signalModifiedKey(NULL, db, keyObj);
|
||||
decrRefCount(keyObj);
|
||||
return res;
|
||||
}
|
||||
|
||||
/* Like hashTypeGetValue() but returns a Redis object, which is useful for
|
||||
|
@ -1155,10 +1158,12 @@ void hashTypeSetExDone(HashTypeSetEx *ex) {
|
|||
if (ex->fieldDeleted && hashTypeLength(ex->hashObj, 0) == 0) {
|
||||
dbDelete(ex->db,ex->key);
|
||||
signalModifiedKey(ex->c, ex->db, ex->key);
|
||||
notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", ex->key, ex->db->id);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",ex->key, ex->db->id);
|
||||
} else {
|
||||
signalModifiedKey(ex->c, ex->db, ex->key);
|
||||
notifyKeyspaceEvent(NOTIFY_HASH, "hexpire", ex->key, ex->db->id);
|
||||
notifyKeyspaceEvent(NOTIFY_HASH, ex->fieldDeleted ? "hexpired" : "hexpire",
|
||||
ex->key, ex->db->id);
|
||||
|
||||
/* If minimum HFE of the hash is smaller than expiration time of the
|
||||
* specified fields in the command as well as it is smaller or equal
|
||||
|
@ -1819,16 +1824,23 @@ static ExpireAction hashTypeActiveExpire(eItem _hashObj, void *ctx) {
|
|||
/* Update quota left */
|
||||
activeExpireCtx->fieldsToExpireQuota -= info.itemsExpired;
|
||||
|
||||
/* In some cases, a field might have been deleted without updating the global DS.
|
||||
* As a result, active-expire might not expire any fields, in such cases,
|
||||
* we don't need to send notifications or perform other operations for this key. */
|
||||
if (info.itemsExpired) {
|
||||
robj *key = createStringObject(keystr, sdslen(keystr));
|
||||
notifyKeyspaceEvent(NOTIFY_HASH,"hexpired",key,activeExpireCtx->db->id);
|
||||
if (hashTypeLength(hashObj, 0) == 0) {
|
||||
dbDelete(activeExpireCtx->db, key);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,activeExpireCtx->db->id);
|
||||
}
|
||||
server.dirty++;
|
||||
signalModifiedKey(NULL, activeExpireCtx->db, key);
|
||||
decrRefCount(key);
|
||||
}
|
||||
|
||||
/* If hash has no more fields to expire, remove it from HFE DB */
|
||||
if (info.nextExpireTime == EB_EXPIRE_TIME_INVALID) {
|
||||
if (hashTypeLength(hashObj, 0) == 0) {
|
||||
robj *key = createStringObject(keystr, sdslen(keystr));
|
||||
dbDelete(activeExpireCtx->db, key);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key, activeExpireCtx->db->id);
|
||||
server.dirty++;
|
||||
signalModifiedKey(NULL, &server.db[0], key);
|
||||
decrRefCount(key);
|
||||
}
|
||||
return ACT_REMOVE_EXP_ITEM;
|
||||
} else {
|
||||
/* Hash has more fields to expire. Update next expiration time of the hash
|
||||
|
@ -2164,7 +2176,7 @@ void hincrbyfloatCommand(client *c) {
|
|||
decrRefCount(newobj);
|
||||
}
|
||||
|
||||
static GetFieldRes addHashFieldToReply(client *c, robj *o, sds field) {
|
||||
static GetFieldRes addHashFieldToReply(client *c, robj *o, sds field, int hfeFlags) {
|
||||
if (o == NULL) {
|
||||
addReplyNull(c);
|
||||
return GETF_NOT_FOUND;
|
||||
|
@ -2174,8 +2186,7 @@ static GetFieldRes addHashFieldToReply(client *c, robj *o, sds field) {
|
|||
unsigned int vlen = UINT_MAX;
|
||||
long long vll = LLONG_MAX;
|
||||
|
||||
GetFieldRes res = hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll,
|
||||
HFE_LAZY_EXPIRE);
|
||||
GetFieldRes res = hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll, hfeFlags);
|
||||
if (res == GETF_OK) {
|
||||
if (vstr) {
|
||||
addReplyBulkCBuffer(c, vstr, vlen);
|
||||
|
@ -2194,13 +2205,14 @@ void hgetCommand(client *c) {
|
|||
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL ||
|
||||
checkType(c,o,OBJ_HASH)) return;
|
||||
|
||||
addHashFieldToReply(c, o, c->argv[2]->ptr);
|
||||
addHashFieldToReply(c, o, c->argv[2]->ptr, HFE_LAZY_EXPIRE);
|
||||
}
|
||||
|
||||
void hmgetCommand(client *c) {
|
||||
GetFieldRes res = GETF_OK;
|
||||
robj *o;
|
||||
int i;
|
||||
int expired = 0, deleted = 0;
|
||||
|
||||
/* Don't abort when the key cannot be found. Non-existing keys are empty
|
||||
* hashes, where HMGET should respond with a series of null bulks. */
|
||||
|
@ -2209,17 +2221,22 @@ void hmgetCommand(client *c) {
|
|||
|
||||
addReplyArrayLen(c, c->argc-2);
|
||||
for (i = 2; i < c->argc ; i++) {
|
||||
|
||||
res = addHashFieldToReply(c, o, c->argv[i]->ptr);
|
||||
|
||||
/* If hash got lazy expired since all fields are expired (o is invalid),
|
||||
* then fill the rest with trivial nulls and return */
|
||||
if (res == GETF_EXPIRED_HASH) {
|
||||
while (++i < c->argc)
|
||||
addReplyNull(c);
|
||||
return;
|
||||
if (!deleted) {
|
||||
res = addHashFieldToReply(c, o, c->argv[i]->ptr, HFE_LAZY_NO_NOTIFICATION);
|
||||
expired += (res == GETF_EXPIRED);
|
||||
deleted += (res == GETF_EXPIRED_HASH);
|
||||
} else {
|
||||
/* If hash got lazy expired since all fields are expired (o is invalid),
|
||||
* then fill the rest with trivial nulls and return. */
|
||||
addReplyNull(c);
|
||||
}
|
||||
}
|
||||
|
||||
if (expired) {
|
||||
notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id);
|
||||
if (deleted)
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
|
||||
}
|
||||
}
|
||||
|
||||
void hdelCommand(client *c) {
|
||||
|
|
|
@ -356,16 +356,17 @@ start_server {tags {"pubsub network"}} {
|
|||
foreach {type max_lp_entries} {listpackex 512 hashtable 0} {
|
||||
test "Keyspace notifications: hash events test ($type)" {
|
||||
r config set hash-max-listpack-entries $max_lp_entries
|
||||
r config set notify-keyspace-events Kh
|
||||
r config set notify-keyspace-events Khg
|
||||
r del myhash
|
||||
set rd1 [redis_deferring_client]
|
||||
assert_equal {1} [psubscribe $rd1 *]
|
||||
r hmset myhash yes 1 no 0
|
||||
r hmset myhash yes 1 no 0 f1 1 f2 2 f3_hdel 3
|
||||
r hincrby myhash yes 10
|
||||
r hexpire myhash 999999 FIELDS 1 yes
|
||||
r hexpireat myhash [expr {[clock seconds] + 999999}] NX FIELDS 1 no
|
||||
r hpexpire myhash 999999 FIELDS 1 yes
|
||||
r hpersist myhash FIELDS 1 yes
|
||||
r hpexpire myhash 0 FIELDS 1 yes
|
||||
assert_encoding $type myhash
|
||||
assert_equal "pmessage * __keyspace@${db}__:myhash hset" [$rd1 read]
|
||||
assert_equal "pmessage * __keyspace@${db}__:myhash hincrby" [$rd1 read]
|
||||
|
@ -373,8 +374,38 @@ start_server {tags {"pubsub network"}} {
|
|||
assert_equal "pmessage * __keyspace@${db}__:myhash hexpire" [$rd1 read]
|
||||
assert_equal "pmessage * __keyspace@${db}__:myhash hexpire" [$rd1 read]
|
||||
assert_equal "pmessage * __keyspace@${db}__:myhash hpersist" [$rd1 read]
|
||||
assert_equal "pmessage * __keyspace@${db}__:myhash hexpired" [$rd1 read]
|
||||
|
||||
# Test that we will get `hexpired` notification when
|
||||
# a hash field is removed by active expire.
|
||||
r hpexpire myhash 10 FIELDS 1 no
|
||||
after 100 ;# Wait for active expire
|
||||
assert_equal "pmessage * __keyspace@${db}__:myhash hexpire" [$rd1 read]
|
||||
assert_equal "pmessage * __keyspace@${db}__:myhash hexpired" [$rd1 read]
|
||||
|
||||
# Test that when a field with TTL is deleted by commands like hdel without
|
||||
# updating the global DS, active expire will not send a notification.
|
||||
r hpexpire myhash 100 FIELDS 1 f3_hdel
|
||||
r hdel myhash f3_hdel
|
||||
after 200 ;# Wait for active expire
|
||||
assert_equal "pmessage * __keyspace@${db}__:myhash hexpire" [$rd1 read]
|
||||
assert_equal "pmessage * __keyspace@${db}__:myhash hdel" [$rd1 read]
|
||||
|
||||
# Test that we will get `hexpired` notification when
|
||||
# a hash field is removed by lazy expire.
|
||||
r debug set-active-expire 0
|
||||
r hpexpire myhash 10 FIELDS 2 f1 f2
|
||||
after 20
|
||||
r hmget myhash f1 f2 ;# Trigger lazy expire
|
||||
assert_equal "pmessage * __keyspace@${db}__:myhash hexpire" [$rd1 read]
|
||||
# We should get only one `hexpired` notification even two fields was expired.
|
||||
assert_equal "pmessage * __keyspace@${db}__:myhash hexpired" [$rd1 read]
|
||||
# We should get a `del` notification after all fields were expired.
|
||||
assert_equal "pmessage * __keyspace@${db}__:myhash del" [$rd1 read]
|
||||
r debug set-active-expire 1
|
||||
|
||||
$rd1 close
|
||||
}
|
||||
} {0} {needs:debug}
|
||||
} ;# foreach
|
||||
|
||||
test "Keyspace notifications: stream events test" {
|
||||
|
|
Loading…
Reference in New Issue