mirror of https://mirror.osredm.com/root/redis.git
Support HSET+expire in one command, at infra level (#13230)
Unify infra of `HSETF`, `HEXPIRE`, `HSET` and provide API for RDB load as well. Whereas setting plain fields is rather straightforward, setting expiration time to fields might be time-consuming and complex since each update of expiration time, not only updates `ebuckets` of corresponding hash, but also might update `ebuckets` of global HFE DS. It is required to opt sequence of field updates with expirartion for a given hash, such that only once done, the global HFE DS will get updated. To do so, follow the scheme: 1. Call `hashTypeSetExInit()` to initialize the HashTypeSetEx struct. 2. Call `hashTypeSetEx()` one time or more, for each field/expiration update. 3. Call `hashTypeSetExDone()` for notification and update of global HFE. If expiration is not required, then avoid this API and use instead hashTypeSet().
This commit is contained in:
parent
c18ff05665
commit
c33c91dbce
|
@ -1170,6 +1170,7 @@ static void ebValidateRax(rax *rax, EbucketsType *type) {
|
|||
raxStart(&raxIter, rax);
|
||||
raxSeek(&raxIter, "^", NULL, 0);
|
||||
while (raxNext(&raxIter)) {
|
||||
int expectFirstItemBucket = 1;
|
||||
FirstSegHdr *firstSegHdr = raxIter.data;
|
||||
eItem iter;
|
||||
ExpireMeta *mIter, *mHead;
|
||||
|
@ -1181,7 +1182,6 @@ static void ebValidateRax(rax *rax, EbucketsType *type) {
|
|||
void *segHdr = firstSegHdr;
|
||||
|
||||
mIter = type->getExpireMeta(iter);
|
||||
assert(mIter->firstItemBucket == 1);
|
||||
while (1) {
|
||||
uint64_t curBktKey, prevBktKey;
|
||||
for (int i = 0; i < mHead->numItems ; ++i) {
|
||||
|
@ -1191,6 +1191,8 @@ static void ebValidateRax(rax *rax, EbucketsType *type) {
|
|||
|
||||
if (i == 0) {
|
||||
assert(mIter->numItems > 0 && mIter->numItems <= EB_SEG_MAX_ITEMS);
|
||||
assert(mIter->firstItemBucket == expectFirstItemBucket);
|
||||
expectFirstItemBucket = 0;
|
||||
prevBktKey = curBktKey;
|
||||
} else {
|
||||
assert( (extendedSeg && prevBktKey == curBktKey) ||
|
||||
|
|
|
@ -251,7 +251,10 @@ typedef struct ExpireInfo {
|
|||
void *ctx; /* [INPUT ] context to pass to onExpireItem */
|
||||
uint64_t now; /* [INPUT ] Current time in msec. */
|
||||
uint64_t nextExpireTime; /* [OUTPUT] Next expiration time. Return 0, if none left. */
|
||||
uint64_t itemsExpired; /* [OUTPUT] Returns the number of expired items. */
|
||||
|
||||
/* TODO: Distinct between expired & updated */
|
||||
uint64_t itemsExpired; /* [OUTPUT] Returns the number of expired or updated items. */
|
||||
|
||||
} ExpireInfo;
|
||||
|
||||
/* ebuckets API */
|
||||
|
|
719
src/t_hash.c
719
src/t_hash.c
|
@ -17,7 +17,7 @@
|
|||
/* hash field expiration (HFE) funcs */
|
||||
static ExpireAction onFieldExpire(eItem item, void *ctx);
|
||||
static ExpireMeta* hfieldGetExpireMeta(const eItem field);
|
||||
static ExpireMeta *hashGetExpireMeta(const eItem item);
|
||||
static ExpireMeta *hashGetExpireMeta(const eItem hash);
|
||||
static void hexpireGenericCommand(client *c, const char *cmd, long long basetime, int unit);
|
||||
static ExpireAction hashTypeActiveExpire(eItem hashObj, void *ctx);
|
||||
static void hfieldPersist(redisDb *db, robj *hashObj, hfield field);
|
||||
|
@ -143,29 +143,112 @@ typedef enum SetPersistRes {
|
|||
HFE_PERSIST_OK = 1
|
||||
} SetPersistRes;
|
||||
|
||||
/* Used by hashTypeSetExpire() */
|
||||
typedef enum SetExpireTimeRes {
|
||||
HFE_SET_NO_FIELD = -2, /* No such hash-field */
|
||||
HFE_SET_NO_CONDITION_MET = 0, /* Specified NX | XX | GT | LT condition not met */
|
||||
HFE_SET_OK = 1, /* Expiration time set/updated as expected */
|
||||
HFE_SET_DELETED = 2 /* Field deleted because the specified time is in the past */
|
||||
} SetExpireTimeRes;
|
||||
|
||||
/* Used by httlGenericCommand() */
|
||||
typedef enum GetExpireTimeRes {
|
||||
HFE_GET_NO_FIELD = -2, /* No such hash-field */
|
||||
HFE_GET_NO_TTL = -1, /* No TTL attached to the field */
|
||||
} GetExpireTimeRes;
|
||||
|
||||
#define HFE_NX (1<<0)
|
||||
#define HFE_XX (1<<1)
|
||||
#define HFE_GT (1<<2)
|
||||
#define HFE_LT (1<<3)
|
||||
|
||||
static inline int isDictWithMetaHFE(dict *d) {
|
||||
return d->type == &mstrHashDictTypeWithHFE;
|
||||
}
|
||||
|
||||
/*-----------------------------------------------------------------------------
|
||||
* setex* - Set field OR field's expiration
|
||||
*
|
||||
* Whereas setting plain fields is rather straightforward, setting expiration
|
||||
* time to fields might be time-consuming and complex since each update of
|
||||
* expiration time, not only updates `ebuckets` of corresponding hash, but also
|
||||
* might update `ebuckets` of global HFE DS. It is required to opt sequence of
|
||||
* field updates with expirartion for a given hash, such that only once done,
|
||||
* the global HFE DS will get updated.
|
||||
*
|
||||
* To do so, follow the scheme:
|
||||
* 1. Call hashTypeSetExInit() to initialize the HashTypeSetEx struct.
|
||||
* 2. Call hashTypeSetEx() one time or more, for each field/expiration update.
|
||||
* 3. Call hashTypeSetExDone() for notification and update of global HFE.
|
||||
*
|
||||
* If expiration is not required, then avoid this API and use instead hashTypeSet()
|
||||
*----------------------------------------------------------------------------*/
|
||||
|
||||
/* Returned value of hashTypeSetEx() */
|
||||
typedef enum SetExRes {
|
||||
/* Common res from hashTypeSetEx() */
|
||||
HSETEX_OK = 1, /* Expiration time set/updated as expected */
|
||||
|
||||
/* If provided HashTypeSetEx struct to hashTypeSetEx() */
|
||||
HSETEX_NO_FIELD = -2, /* No such hash-field */
|
||||
HSETEX_NO_CONDITION_MET = 0, /* Specified NX | XX | GT | LT condition not met */
|
||||
HSETEX_DELETED = 2, /* Field deleted because the specified time is in the past */
|
||||
|
||||
/* If not provided HashTypeSetEx struct to hashTypeSetEx() (plain HSET) */
|
||||
HSET_UPDATE = 4, /* Update of the field without expiration time */
|
||||
|
||||
} SetExRes;
|
||||
|
||||
/* Used by httlGenericCommand() */
|
||||
typedef enum GetExpireTimeRes {
|
||||
HFE_GET_NO_FIELD = -2, /* No such hash-field */
|
||||
HFE_GET_NO_TTL = -1, /* No TTL attached to the field */
|
||||
} GetExpireTimeRes;
|
||||
|
||||
/* on fail return HSETEX_NO_CONDITION_MET */
|
||||
typedef enum FieldSetCond {
|
||||
FIELD_CREATE_OR_OVRWRT = 0,
|
||||
FIELD_DONT_CREATE = 1,
|
||||
FIELD_DONT_CREATE2 = 2, /* on fail return HSETEX_NO_FIELD */
|
||||
FIELD_DONT_OVRWRT = 3
|
||||
} FieldSetCond;
|
||||
|
||||
typedef enum FieldGet { /* TBD */
|
||||
FIELD_GET_NONE = 0,
|
||||
FIELD_GET_NEW = 1,
|
||||
FIELD_GET_OLD = 2
|
||||
} FieldGet;
|
||||
|
||||
typedef enum ExpireSetCond {
|
||||
HFE_NX = 1<<0,
|
||||
HFE_XX = 1<<1,
|
||||
HFE_GT = 1<<2,
|
||||
HFE_LT = 1<<3
|
||||
} ExpireSetCond;
|
||||
|
||||
typedef struct HashTypeSet {
|
||||
sds value;
|
||||
int flags;
|
||||
} HashTypeSet;
|
||||
|
||||
/* Used by hashTypeSetEx() for setting fields or their expiry */
|
||||
typedef struct HashTypeSetEx {
|
||||
|
||||
/*** config ***/
|
||||
FieldSetCond fieldSetCond; /* [DCF | DOF] */
|
||||
ExpireSetCond expireSetCond; /* [XX | NX | GT | LT] */
|
||||
FieldGet fieldGet; /* [GETNEW | GETOLD] TODO */
|
||||
|
||||
/*** metadata ***/
|
||||
dictExpireMetadata *dictExpireMeta; /* keep ref to dict's metadata */
|
||||
uint64_t minExpire; /* if uninit EB_EXPIRE_TIME_INVALID */
|
||||
redisDb *db;
|
||||
robj *key, *hashObj;
|
||||
uint64_t minExpireFields; /* Trace updated fields and their previous/new
|
||||
* minimum expiration time. If minimum recorded
|
||||
* is above minExpire of the hash, then we don't
|
||||
* have to update global HFE DS */
|
||||
int fieldDeleted; /* Number of fields deleted */
|
||||
int fieldUpdated; /* Number of fields updated */
|
||||
|
||||
/* Optionally provide client for notification */
|
||||
client *c;
|
||||
const char *cmd;
|
||||
} HashTypeSetEx;
|
||||
|
||||
static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeSet *s,
|
||||
uint64_t expireAt, HashTypeSetEx *ex);
|
||||
|
||||
int hashTypeSetExInit(robj *key, robj *o, client *c, redisDb *db, const char *cmd,
|
||||
FieldSetCond fieldSetCond, FieldGet fieldGet,
|
||||
ExpireSetCond expireSetCond, HashTypeSetEx *ex);
|
||||
|
||||
SetExRes hashTypeSetEx(redisDb *db, robj *o, sds field, HashTypeSet *setKeyVal,
|
||||
uint64_t expireAt, HashTypeSetEx *exInfo);
|
||||
|
||||
void hashTypeSetExDone(HashTypeSetEx *e);
|
||||
|
||||
/*-----------------------------------------------------------------------------
|
||||
* Accessor functions for dictType of hash
|
||||
*----------------------------------------------------------------------------*/
|
||||
|
@ -186,15 +269,20 @@ static uint64_t dictMstrHash(const void *key) {
|
|||
}
|
||||
|
||||
static void dictHfieldDestructor(dict *d, void *field) {
|
||||
|
||||
/* If attached TTL to the field, then remove it from hash's private ebuckets. */
|
||||
if (hfieldGetExpireTime(field) != EB_EXPIRE_TIME_INVALID) {
|
||||
dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *) dictMetadata(d);
|
||||
ebRemove(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, field);
|
||||
|
||||
// TODO: Check if the field is the minimum in the hash and update the global HFE DS
|
||||
}
|
||||
|
||||
hfieldFree(field);
|
||||
|
||||
/* Don't have to update global HFE DS. It's unnecessary. Implementing this
|
||||
* would introduce significant complexity and overhead for an operation that
|
||||
* isn't critical. In the worst case scenario, the hash will be efficiently
|
||||
* updated later by an active-expire operation, or it will be removed by the
|
||||
* hash's dbGenericDelete() function. */
|
||||
}
|
||||
|
||||
static size_t hashDictWithExpireMetadataBytes(dict *d) {
|
||||
|
@ -382,82 +470,332 @@ int hashTypeExists(robj *o, sds field) {
|
|||
#define HASH_SET_TAKE_VALUE (1<<1)
|
||||
#define HASH_SET_COPY 0
|
||||
int hashTypeSet(redisDb *db, robj *o, sds field, sds value, int flags) {
|
||||
int update = 0;
|
||||
HashTypeSet set = {value, flags};
|
||||
return (hashTypeSetEx(db, o, field, &set, 0, NULL) == HSET_UPDATE) ? 1 : 0;
|
||||
}
|
||||
|
||||
SetExRes hashTypeSetExpiry(HashTypeSetEx *ex, sds field, uint64_t expireAt, dictEntry **de) {
|
||||
dict *ht = ex->hashObj->ptr;
|
||||
dictEntry *newEntry = NULL, *existingEntry = NULL;
|
||||
|
||||
/* New field with expiration metadata */
|
||||
hfield hfNew = hfieldNew(field, sdslen(field), 1 /*withExpireMeta*/);
|
||||
|
||||
if ((ex->fieldSetCond == FIELD_DONT_CREATE) || (ex->fieldSetCond == FIELD_DONT_CREATE2)) {
|
||||
if ((existingEntry = dictFind(ht, field)) == NULL) {
|
||||
hfieldFree(hfNew);
|
||||
return (ex->fieldSetCond == FIELD_DONT_CREATE) ?
|
||||
HSETEX_NO_CONDITION_MET : HSETEX_NO_FIELD;
|
||||
}
|
||||
} else {
|
||||
dictUseStoredKeyApi(ht, 1);
|
||||
newEntry = dictAddRaw(ht, hfNew, &existingEntry);
|
||||
dictUseStoredKeyApi(ht, 0);
|
||||
}
|
||||
|
||||
if (newEntry) {
|
||||
*de = newEntry;
|
||||
|
||||
if (ex->expireSetCond & (HFE_XX | HFE_LT | HFE_GT)) {
|
||||
dictDelete(ht, field);
|
||||
return HSETEX_NO_CONDITION_MET;
|
||||
}
|
||||
} else { /* field exist */
|
||||
*de = existingEntry;
|
||||
|
||||
if (ex->fieldSetCond == FIELD_DONT_OVRWRT) {
|
||||
hfieldFree(hfNew);
|
||||
return HSETEX_NO_CONDITION_MET;
|
||||
}
|
||||
|
||||
hfield hfOld = dictGetKey(existingEntry);
|
||||
|
||||
/* If field doesn't have expiry metadata attached */
|
||||
if (!hfieldIsExpireAttached(hfOld)) {
|
||||
|
||||
if (ex->expireSetCond & (HFE_XX | HFE_LT | HFE_GT)) {
|
||||
hfieldFree(hfNew);
|
||||
return HSETEX_NO_CONDITION_MET;
|
||||
}
|
||||
|
||||
/* Delete old field. Below goanna dictSetKey(..,hfNew) */
|
||||
hfieldFree(hfOld);
|
||||
|
||||
} else { /* field has ExpireMeta struct attached */
|
||||
|
||||
/* No need for hfNew (Just modify expire-time of existing field) */
|
||||
hfieldFree(hfNew);
|
||||
|
||||
uint64_t prevExpire = hfieldGetExpireTime(hfOld);
|
||||
|
||||
/* If field has valid expiration time, then check GT|LT|NX */
|
||||
if (prevExpire != EB_EXPIRE_TIME_INVALID) {
|
||||
if (((ex->expireSetCond == HFE_GT) && (prevExpire >= expireAt)) ||
|
||||
((ex->expireSetCond == HFE_LT) && (prevExpire <= expireAt)) ||
|
||||
(ex->expireSetCond == HFE_NX) )
|
||||
return HSETEX_NO_CONDITION_MET;
|
||||
|
||||
/* remove old expiry time from hash's private ebuckets */
|
||||
ebRemove(&ex->dictExpireMeta->hfe, &hashFieldExpireBucketsType, hfOld);
|
||||
|
||||
/* Track of minimum expiration time (only later update global HFE DS) */
|
||||
if (ex->minExpireFields > prevExpire)
|
||||
ex->minExpireFields = prevExpire;
|
||||
|
||||
} else {
|
||||
/* field has invalid expiry. No need to ebRemove() */
|
||||
|
||||
/* Check XX|LT|GT */
|
||||
if (ex->expireSetCond & (HFE_XX | HFE_LT | HFE_GT))
|
||||
return HSETEX_NO_CONDITION_MET;
|
||||
}
|
||||
|
||||
/* Reuse hfOld as hfNew and rewrite its expiry with ebAdd() */
|
||||
hfNew = hfOld;
|
||||
}
|
||||
|
||||
dictSetKey(ht, existingEntry, hfNew);
|
||||
}
|
||||
|
||||
/* if expiration time is in the past */
|
||||
if (unlikely(checkAlreadyExpired(expireAt))) {
|
||||
hashTypeDelete(ex->hashObj, field);
|
||||
ex->fieldDeleted++;
|
||||
return HSETEX_DELETED;
|
||||
}
|
||||
|
||||
if (ex->minExpireFields > expireAt)
|
||||
ex->minExpireFields = expireAt;
|
||||
|
||||
ebAdd(&ex->dictExpireMeta->hfe, &hashFieldExpireBucketsType, hfNew, expireAt);
|
||||
ex->fieldUpdated++;
|
||||
return HSETEX_OK;
|
||||
}
|
||||
|
||||
/*
|
||||
* Set fields OR field's expiration (See also `setex*` comment above)
|
||||
*
|
||||
* Take care to call first hashTypeSetExInit() and then call this function.
|
||||
* Finally, call hashTypeSetExDone() to notify and update global HFE DS.
|
||||
*/
|
||||
SetExRes hashTypeSetEx(redisDb *db, robj *o, sds field, HashTypeSet *setKeyVal,
|
||||
uint64_t expireAt, HashTypeSetEx *exInfo)
|
||||
{
|
||||
SetExRes res = HSETEX_OK;
|
||||
int isSetKeyValue = (setKeyVal) ? 1 : 0;
|
||||
int isSetExpire = (exInfo) ? 1 : 0;
|
||||
int flags = (setKeyVal) ? setKeyVal->flags : 0;
|
||||
|
||||
/* Check if the field is too long for listpack, and convert before adding the item.
|
||||
* This is needed for HINCRBY* case since in other commands this is handled early by
|
||||
* hashTypeTryConversion, so this check will be a NOP. */
|
||||
if (o->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
if (sdslen(field) > server.hash_max_listpack_value || sdslen(value) > server.hash_max_listpack_value)
|
||||
if (isSetKeyValue && o->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
if (sdslen(field) > server.hash_max_listpack_value ||
|
||||
sdslen(setKeyVal->value) > server.hash_max_listpack_value)
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT);
|
||||
}
|
||||
|
||||
if (o->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
unsigned char *zl, *fptr, *vptr;
|
||||
|
||||
zl = o->ptr;
|
||||
fptr = lpFirst(zl);
|
||||
if (fptr != NULL) {
|
||||
fptr = lpFind(zl, fptr, (unsigned char*)field, sdslen(field), 1);
|
||||
if (fptr != NULL) {
|
||||
/* Grab pointer to the value (fptr points to the field) */
|
||||
vptr = lpNext(zl, fptr);
|
||||
serverAssert(vptr != NULL);
|
||||
update = 1;
|
||||
|
||||
/* Replace value */
|
||||
zl = lpReplace(zl, &vptr, (unsigned char*)value, sdslen(value));
|
||||
}
|
||||
}
|
||||
|
||||
if (!update) {
|
||||
/* Push new field/value pair onto the tail of the listpack */
|
||||
zl = lpAppend(zl, (unsigned char*)field, sdslen(field));
|
||||
zl = lpAppend(zl, (unsigned char*)value, sdslen(value));
|
||||
}
|
||||
o->ptr = zl;
|
||||
|
||||
/* Check if the listpack needs to be converted to a hash table */
|
||||
if (hashTypeLength(o, 0) > server.hash_max_listpack_entries)
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT);
|
||||
} else if (o->encoding == OBJ_ENCODING_HT) {
|
||||
dict *ht = o->ptr;
|
||||
dictEntry *de, *existingEntry;
|
||||
sds storedValue;
|
||||
if (flags & HASH_SET_TAKE_VALUE) {
|
||||
storedValue = value;
|
||||
value = NULL;
|
||||
} else {
|
||||
storedValue = sdsdup(value);
|
||||
}
|
||||
/* Cannot leverage HASH_SET_TAKE_FIELD since hfield is not of type sds */
|
||||
hfield newField = hfieldNew(field, sdslen(field), 0);
|
||||
|
||||
/* stored key is different than lookup key */
|
||||
dictUseStoredKeyApi(ht, 1);
|
||||
de = dictAddRaw(ht, newField, &existingEntry);
|
||||
dictUseStoredKeyApi(ht, 0);
|
||||
|
||||
if (de) {
|
||||
dictSetVal(ht, de, storedValue);
|
||||
} else {
|
||||
/* If attached TTL to the old field, then remove it from hash's private ebuckets */
|
||||
hfield oldField = dictGetKey(existingEntry);
|
||||
hfieldPersist(db, o, oldField);
|
||||
sdsfree(dictGetVal(existingEntry));
|
||||
dictSetVal(ht, existingEntry, storedValue);
|
||||
update = 1;
|
||||
hfieldFree(newField);
|
||||
}
|
||||
} else {
|
||||
res = hashTypeSetExListpack(db, o, field, setKeyVal, expireAt, exInfo);
|
||||
goto SetExDone;
|
||||
} else if (o->encoding != OBJ_ENCODING_HT) {
|
||||
serverPanic("Unknown hash encoding");
|
||||
}
|
||||
|
||||
/*** now deal with HT ***/
|
||||
hfield newField;
|
||||
dict *ht = o->ptr;
|
||||
dictEntry *de;
|
||||
|
||||
/* If needed to set the field along with expiry */
|
||||
if (isSetExpire) {
|
||||
res = hashTypeSetExpiry(exInfo, field, expireAt, &de);
|
||||
if (res != HSETEX_OK) goto SetExDone;
|
||||
} else {
|
||||
dictEntry *existing;
|
||||
/* Cannot leverage HASH_SET_TAKE_FIELD since hfield is not of type sds */
|
||||
newField = hfieldNew(field, sdslen(field), 0);
|
||||
|
||||
/* stored key is different than lookup key */
|
||||
dictUseStoredKeyApi(ht, 1);
|
||||
de = dictAddRaw(ht, newField, &existing);
|
||||
dictUseStoredKeyApi(ht, 0);
|
||||
|
||||
/* If field already exists, then update "field". "Value" will be set afterward */
|
||||
if (de == NULL) {
|
||||
/* If attached TTL to the old field, then remove it from hash's private ebuckets */
|
||||
hfield oldField = dictGetKey(existing);
|
||||
hfieldPersist(db, o, oldField);
|
||||
|
||||
hfieldFree(oldField);
|
||||
sdsfree(dictGetVal(existing));
|
||||
dictSetKey(ht, existing, newField);
|
||||
res = HSET_UPDATE;
|
||||
de = existing;
|
||||
}
|
||||
}
|
||||
|
||||
/* If need to set value */
|
||||
if (isSetKeyValue) {
|
||||
if (flags & HASH_SET_TAKE_VALUE) {
|
||||
dictSetVal(ht, de, setKeyVal->value);
|
||||
flags &= ~HASH_SET_TAKE_VALUE;
|
||||
} else {
|
||||
dictSetVal(ht, de, sdsdup(setKeyVal->value));
|
||||
}
|
||||
}
|
||||
|
||||
SetExDone:
|
||||
/* Free SDS strings we did not referenced elsewhere if the flags
|
||||
* want this function to be responsible. */
|
||||
if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field);
|
||||
if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value);
|
||||
return update;
|
||||
if (flags & HASH_SET_TAKE_VALUE && setKeyVal->value) sdsfree(setKeyVal->value);
|
||||
return res;
|
||||
}
|
||||
|
||||
/*
|
||||
* Init HashTypeSetEx struct before calling hashTypeSetEx()
|
||||
*
|
||||
* Don't have to provide client and "cmd". If provided, then notification once
|
||||
* done by function hashTypeSetExDone().
|
||||
*/
|
||||
int hashTypeSetExInit(robj *key, robj *o, client *c, redisDb *db, const char *cmd, FieldSetCond fieldSetCond,
|
||||
FieldGet fieldGet, ExpireSetCond expireSetCond,
|
||||
HashTypeSetEx *ex)
|
||||
{
|
||||
dict *ht = o->ptr;
|
||||
|
||||
ex->fieldSetCond = fieldSetCond;
|
||||
ex->fieldGet = fieldGet; /* TODO */
|
||||
ex->expireSetCond = expireSetCond;
|
||||
ex->dictExpireMeta = NULL;
|
||||
ex->minExpire = EB_EXPIRE_TIME_INVALID;
|
||||
ex->c = c;
|
||||
ex->cmd = cmd;
|
||||
ex->db = db;
|
||||
ex->key = key;
|
||||
ex->hashObj = o;
|
||||
ex->fieldDeleted = 0;
|
||||
ex->fieldUpdated = 0;
|
||||
ex->minExpireFields = EB_EXPIRE_TIME_INVALID;
|
||||
|
||||
/* Take care dict has HFE metadata */
|
||||
if (!isDictWithMetaHFE(ht)) {
|
||||
/* Realloc (only header of dict) with metadata for hash-field expiration */
|
||||
dictTypeAddMeta(&ht, &mstrHashDictTypeWithHFE);
|
||||
ex->dictExpireMeta = (dictExpireMetadata *) dictMetadata(ht);
|
||||
ex->hashObj->ptr = ht;
|
||||
|
||||
/* Find the key in the keyspace. Need to keep reference to the key for
|
||||
* notifications or even removal of the hash */
|
||||
dictEntry *de = dbFind(db, key->ptr);
|
||||
serverAssert(de != NULL);
|
||||
|
||||
/* Fillup dict HFE metadata */
|
||||
ex->dictExpireMeta->key = dictGetKey(de); /* reference key in keyspace */
|
||||
ex->dictExpireMeta->hfe = ebCreate(); /* Allocate HFE DS */
|
||||
ex->dictExpireMeta->expireMeta.trash = 1; /* mark as trash (as long it wasn't ebAdd()) */
|
||||
} else {
|
||||
ex->dictExpireMeta = (dictExpireMetadata *) dictMetadata(ht);
|
||||
ExpireMeta *expireMeta = &ex->dictExpireMeta->expireMeta;
|
||||
|
||||
/* Keep aside min HFE before update. Verify it is not trash */
|
||||
if (expireMeta->trash == 0)
|
||||
ex->minExpire = ebGetMetaExpTime(&ex->dictExpireMeta->expireMeta);
|
||||
}
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/*
|
||||
* After calling hashTypeSetEx() for setting fields or their expiry, call this
|
||||
* function to notify and update global HFE DS.
|
||||
*/
|
||||
void hashTypeSetExDone(HashTypeSetEx *ex) {
|
||||
/* Notify keyspace event, update dirty count and update global HFE DS */
|
||||
if (ex->fieldDeleted + ex->fieldUpdated > 0) {
|
||||
|
||||
if (ex->c) {
|
||||
server.dirty += ex->fieldDeleted + ex->fieldUpdated;
|
||||
signalModifiedKey(ex->c, ex->db, ex->key);
|
||||
notifyKeyspaceEvent(NOTIFY_HASH, ex->cmd, ex->key, ex->db->id);
|
||||
}
|
||||
if (ex->fieldDeleted && hashTypeLength(ex->hashObj, 0) == 0) {
|
||||
dbDelete(ex->db,ex->key);
|
||||
if (ex->c) notifyKeyspaceEvent(NOTIFY_GENERIC,"del",ex->key, ex->db->id);
|
||||
} else {
|
||||
|
||||
/* If minimum HFE of the hash is smaller than expiration time of the
|
||||
* specified fields in the command as well as it is smaller or equal
|
||||
* than expiration time provided in the command, then the minimum
|
||||
* HFE of the hash won't change following this command. */
|
||||
if (ex->minExpire < ex->minExpireFields)
|
||||
return;
|
||||
|
||||
/* retrieve new expired time. It might have changed. */
|
||||
uint64_t newMinExpire = ebGetNextTimeToExpire(ex->dictExpireMeta->hfe,
|
||||
&hashFieldExpireBucketsType);
|
||||
|
||||
/* Calculate the diff between old minExpire and newMinExpire. If it is
|
||||
* only few seconds, then don't have to update global HFE DS. At the worst
|
||||
* case fields of hash will be active-expired up to few seconds later.
|
||||
*
|
||||
* In any case, active-expire operation will know to update global
|
||||
* HFE DS more efficiently than here for a single item.
|
||||
*/
|
||||
uint64_t diff = (ex->minExpire > newMinExpire) ?
|
||||
(ex->minExpire - newMinExpire) : (newMinExpire - ex->minExpire);
|
||||
if (diff < HASH_NEW_EXPIRE_DIFF_THRESHOLD) return;
|
||||
|
||||
if (ex->minExpire != EB_EXPIRE_TIME_INVALID)
|
||||
ebRemove(&ex->db->hexpires, &hashExpireBucketsType, ex->hashObj);
|
||||
if (newMinExpire != EB_EXPIRE_TIME_INVALID)
|
||||
ebAdd(&ex->db->hexpires, &hashExpireBucketsType, ex->hashObj, newMinExpire);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Check if the field is too long for listpack, and convert before adding the item.
|
||||
* This is needed for HINCRBY* case since in other commands this is handled early by
|
||||
* hashTypeTryConversion, so this check will be a NOP. */
|
||||
static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeSet *s,
|
||||
uint64_t expireAt, HashTypeSetEx *ex)
|
||||
{
|
||||
UNUSED(db);
|
||||
UNUSED(expireAt);
|
||||
UNUSED(ex);
|
||||
int res = HSETEX_OK;
|
||||
unsigned char *zl, *fptr, *vptr;
|
||||
|
||||
/* TODO support expiration time for listpack */
|
||||
|
||||
|
||||
zl = o->ptr;
|
||||
fptr = lpFirst(zl);
|
||||
if (fptr != NULL) {
|
||||
fptr = lpFind(zl, fptr, (unsigned char*)field, sdslen(field), 1);
|
||||
if (fptr != NULL) {
|
||||
/* Grab pointer to the value (fptr points to the field) */
|
||||
vptr = lpNext(zl, fptr);
|
||||
serverAssert(vptr != NULL);
|
||||
res = HSET_UPDATE;
|
||||
|
||||
/* Replace value */
|
||||
zl = lpReplace(zl, &vptr, (unsigned char*)s->value, sdslen(s->value));
|
||||
}
|
||||
}
|
||||
|
||||
if (res != HSET_UPDATE) {
|
||||
/* Push new field/value pair onto the tail of the listpack */
|
||||
zl = lpAppend(zl, (unsigned char*)field, sdslen(field));
|
||||
zl = lpAppend(zl, (unsigned char*)s->value, sdslen(s->value));
|
||||
}
|
||||
o->ptr = zl;
|
||||
|
||||
/* Check if the listpack needs to be converted to a hash table */
|
||||
if (hashTypeLength(o, 0) > server.hash_max_listpack_entries)
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
/* Delete an element from a hash.
|
||||
|
@ -493,7 +831,7 @@ int hashTypeDelete(robj *o, sds field) {
|
|||
|
||||
/* Return the number of elements in a hash.
|
||||
*
|
||||
* Note: Might be pricy in case there are many HFEs
|
||||
* Note, subtractExpiredFields=1 might be pricy in case there are many HFEs
|
||||
*/
|
||||
unsigned long hashTypeLength(const robj *o, int subtractExpiredFields) {
|
||||
unsigned long length = ULONG_MAX;
|
||||
|
@ -1659,46 +1997,26 @@ static uint64_t hfieldGetExpireTime(hfield field) {
|
|||
|
||||
/* Remove TTL from the field. Assumed ExpireMeta is attached and has valid value */
|
||||
static void hfieldPersist(redisDb *db, robj *hashObj, hfield field) {
|
||||
UNUSED(db);
|
||||
uint64_t fieldExpireTime = hfieldGetExpireTime(field);
|
||||
if (fieldExpireTime == EB_EXPIRE_TIME_INVALID)
|
||||
return;
|
||||
|
||||
serverAssert(isDictWithMetaHFE(hashObj->ptr));
|
||||
|
||||
/* if field is set with expire, then dict must has HFE metadata attached */
|
||||
dict *d = hashObj->ptr;
|
||||
dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *)dictMetadata(d);
|
||||
|
||||
/* If field has valid expiry then dict should have valid metadata as well */
|
||||
/* If field has valid expiry then dict must have valid metadata as well */
|
||||
serverAssert(dictExpireMeta->expireMeta.trash == 0);
|
||||
|
||||
uint64_t minExpire = ebGetMetaExpTime(&dictExpireMeta->expireMeta);
|
||||
|
||||
/* Remove field from private HFE DS */
|
||||
ebRemove(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, field);
|
||||
|
||||
/* If the removed field was not the minimal to expire, then no need to update
|
||||
* the hash at global HFE DS. Take into account precision loss in case
|
||||
* EB_BUCKET_KEY_PRECISION>0 by assisting EB_BUCKET_KEY() */
|
||||
if (EB_BUCKET_KEY(minExpire) != EB_BUCKET_KEY(fieldExpireTime)) return;
|
||||
|
||||
uint64_t newMinExpire = ebGetNextTimeToExpire(dictExpireMeta->hfe, &hashFieldExpireBucketsType);
|
||||
|
||||
/* Calculate the diff between minExpire and newMinExpire. If it is
|
||||
* only few seconds, then don't have to update global HFE DS. At the worst
|
||||
* case fields of hash will be active-expired up to few seconds later.
|
||||
*
|
||||
* In any case, active-expire operation will know to update global
|
||||
* HFE DS more efficiently than here for a single item.
|
||||
*/
|
||||
uint64_t diff = (minExpire > newMinExpire) ?
|
||||
(minExpire - newMinExpire) : (newMinExpire - minExpire);
|
||||
if (diff < HASH_NEW_EXPIRE_DIFF_THRESHOLD) return;
|
||||
|
||||
ebRemove(&db->hexpires, &hashExpireBucketsType, hashObj);
|
||||
|
||||
/* If it was not last field to expire */
|
||||
if (newMinExpire != EB_EXPIRE_TIME_INVALID)
|
||||
ebAdd(&db->hexpires, &hashExpireBucketsType, hashObj, newMinExpire);
|
||||
/* Don't have to update global HFE DS. It's unnecessary. Implementing this
|
||||
* would introduce significant complexity and overhead for an operation that
|
||||
* isn't critical. In the worst case scenario, the hash will be efficiently
|
||||
* updated later by an active-expire operation, or it will be removed by the
|
||||
* hash's dbGenericDelete() function. */
|
||||
}
|
||||
|
||||
int hfieldIsExpired(hfield field) {
|
||||
|
@ -1723,73 +2041,13 @@ static ExpireAction onFieldExpire(eItem item, void *ctx) {
|
|||
|
||||
/* Retrieve the ExpireMeta associated with the hash.
|
||||
* The caller is responsible for ensuring that it is indeed attached. */
|
||||
static ExpireMeta *hashGetExpireMeta(const eItem item) {
|
||||
robj *hashObj = (robj *)item;
|
||||
static ExpireMeta *hashGetExpireMeta(const eItem hash) {
|
||||
robj *hashObj = (robj *)hash;
|
||||
dict *d = hashObj->ptr;
|
||||
dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *) dictMetadata(d);
|
||||
return &dictExpireMeta->expireMeta;
|
||||
}
|
||||
|
||||
/* Set time-expiration to hash-field */
|
||||
SetExpireTimeRes hashTypeSetExpire(ebuckets *eb,
|
||||
robj *hashObj,
|
||||
sds field,
|
||||
uint64_t expireAt,
|
||||
int flag,
|
||||
uint64_t *minPrevExp)
|
||||
{
|
||||
dict *d = hashObj->ptr;
|
||||
uint64_t prevExpire = EB_EXPIRE_TIME_MAX;
|
||||
|
||||
/* First retrieve the field to check if it exists */
|
||||
dictEntry *de = dictFind(d, field);
|
||||
if (de == NULL) return HFE_SET_NO_FIELD;
|
||||
|
||||
hfield hf = dictGetKey(de);
|
||||
|
||||
/* If field doesn't have expiry metadata attached */
|
||||
if (!hfieldIsExpireAttached(hf)) {
|
||||
if (flag & (HFE_XX | HFE_LT | HFE_GT))
|
||||
return HFE_SET_NO_CONDITION_MET;
|
||||
|
||||
/* allocate new field with expire metadata */
|
||||
hfield hfNew = hfieldNew(hf, hfieldlen(hf), 1 /*withExpireMeta*/);
|
||||
/* Replace the old field with the new one with metadata */
|
||||
dictSetKey(d, de, hfNew);
|
||||
hfieldFree(hf);
|
||||
hf = hfNew;
|
||||
} else {
|
||||
/* read previous expire time */
|
||||
prevExpire = hfieldGetExpireTime(hf);
|
||||
|
||||
if (prevExpire != EB_EXPIRE_TIME_INVALID) {
|
||||
if (((flag == HFE_GT) && (prevExpire >= expireAt)) ||
|
||||
((flag == HFE_LT) && (prevExpire <= expireAt)) ||
|
||||
(flag == HFE_NX) )
|
||||
return HFE_SET_NO_CONDITION_MET;
|
||||
|
||||
ebRemove(eb, &hashFieldExpireBucketsType, hf);
|
||||
|
||||
if (*minPrevExp > prevExpire)
|
||||
*minPrevExp = prevExpire;
|
||||
} else {
|
||||
if (flag & (HFE_XX | HFE_LT | HFE_GT))
|
||||
return HFE_SET_NO_CONDITION_MET;
|
||||
}
|
||||
}
|
||||
|
||||
/* if expiration time is in the past */
|
||||
if (checkAlreadyExpired(expireAt)) {
|
||||
hashTypeDelete(hashObj, field);
|
||||
return HFE_SET_DELETED;
|
||||
}
|
||||
ebAdd(eb, &hashFieldExpireBucketsType, hf, expireAt);
|
||||
|
||||
// TODO: propagate, rewrite command if needed. See expireGenericCommand() as reference
|
||||
|
||||
return HFE_SET_OK;
|
||||
}
|
||||
|
||||
static void httlGenericCommand(client *c, const char *cmd, long long basetime, int unit) {
|
||||
UNUSED(cmd);
|
||||
robj *hashObj;
|
||||
|
@ -1834,7 +2092,7 @@ static void httlGenericCommand(client *c, const char *cmd, long long basetime, i
|
|||
continue;
|
||||
}
|
||||
|
||||
if ( (long long) expire <= commandTimeSnapshot()) {
|
||||
if ( (long long) expire < commandTimeSnapshot()) {
|
||||
addReplyLongLong(c, HFE_GET_NO_FIELD);
|
||||
continue;
|
||||
}
|
||||
|
@ -1858,7 +2116,7 @@ static void httlGenericCommand(client *c, const char *cmd, long long basetime, i
|
|||
static void hexpireGenericCommand(client *c, const char *cmd, long long basetime, int unit) {
|
||||
long numFields = 0, numFieldsAt = 3;
|
||||
long long expire; /* unix time in msec */
|
||||
int flag = 0;
|
||||
int expireSetCond = 0;
|
||||
robj *hashObj, *keyArg = c->argv[1], *expireArg = c->argv[2];
|
||||
|
||||
/* Read the hash object */
|
||||
|
@ -1871,8 +2129,6 @@ static void hexpireGenericCommand(client *c, const char *cmd, long long basetime
|
|||
return;
|
||||
}
|
||||
|
||||
dict *d = hashObj->ptr;
|
||||
|
||||
/* Read the expiry time from command */
|
||||
if (getLongLongFromObjectOrReply(c, expireArg, &expire, NULL) != C_OK)
|
||||
return;
|
||||
|
@ -1902,16 +2158,16 @@ static void hexpireGenericCommand(client *c, const char *cmd, long long basetime
|
|||
}
|
||||
expire += basetime;
|
||||
|
||||
/* Read optional flag [NX|XX|GT|LT] */
|
||||
/* Read optional expireSetCond [NX|XX|GT|LT] */
|
||||
char *optArg = c->argv[3]->ptr;
|
||||
if (!strcasecmp(optArg, "nx")) {
|
||||
flag = HFE_NX; ++numFieldsAt;
|
||||
expireSetCond = HFE_NX; ++numFieldsAt;
|
||||
} else if (!strcasecmp(optArg, "xx")) {
|
||||
flag = HFE_XX; ++numFieldsAt;
|
||||
expireSetCond = HFE_XX; ++numFieldsAt;
|
||||
} else if (!strcasecmp(optArg, "gt")) {
|
||||
flag = HFE_GT; ++numFieldsAt;
|
||||
expireSetCond = HFE_GT; ++numFieldsAt;
|
||||
} else if (!strcasecmp(optArg, "lt")) {
|
||||
flag = HFE_LT; ++numFieldsAt;
|
||||
expireSetCond = HFE_LT; ++numFieldsAt;
|
||||
}
|
||||
|
||||
/* Read number of fields */
|
||||
|
@ -1925,96 +2181,21 @@ static void hexpireGenericCommand(client *c, const char *cmd, long long basetime
|
|||
return;
|
||||
}
|
||||
|
||||
dictExpireMetadata *dictExpireMeta;
|
||||
uint64_t minExpire = EB_EXPIRE_TIME_INVALID;
|
||||
HashTypeSetEx exCtx;
|
||||
hashTypeSetExInit(keyArg, hashObj, c, c->db, cmd,
|
||||
FIELD_DONT_CREATE2,
|
||||
FIELD_GET_NONE,
|
||||
expireSetCond,
|
||||
&exCtx);
|
||||
|
||||
/* If dict doesn't have metadata attached */
|
||||
if (!isDictWithMetaHFE(d)) {
|
||||
/* Realloc (only header of dict) with metadata for hash-field expiration */
|
||||
dictTypeAddMeta(&d, &mstrHashDictTypeWithHFE);
|
||||
dictExpireMeta = (dictExpireMetadata *) dictMetadata(d);
|
||||
hashObj->ptr = d;
|
||||
|
||||
/* Find the key in the keyspace. Need to keep reference to the key for
|
||||
* notifications or even removal of the hash */
|
||||
dictEntry *de = dbFind(c->db, keyArg->ptr);
|
||||
serverAssert(de != NULL);
|
||||
sds key = dictGetKey(de);
|
||||
|
||||
/* Fillup dict HFE metadata */
|
||||
dictExpireMeta->key = key; /* reference key in keyspace */
|
||||
dictExpireMeta->hfe = ebCreate(); /* Allocate HFE DS */
|
||||
dictExpireMeta->expireMeta.trash = 1; /* mark as trash (as long it wasn't ebAdd()) */
|
||||
} else {
|
||||
dictExpireMeta = (dictExpireMetadata *) dictMetadata(d);
|
||||
ExpireMeta *expireMeta = &dictExpireMeta->expireMeta;
|
||||
|
||||
/* Keep aside next hash-field expiry before updating HFE DS. Verify it is not trash */
|
||||
if (expireMeta->trash == 0)
|
||||
minExpire = ebGetMetaExpTime(&dictExpireMeta->expireMeta);
|
||||
}
|
||||
|
||||
/* Figure out from provided set of fields in command, which one has the minimum
|
||||
* expiration time, before the modification (Will be used for optimization below) */
|
||||
uint64_t minExpireFields = EB_EXPIRE_TIME_INVALID;
|
||||
|
||||
/* For each field in command, update dict HFE DS */
|
||||
int fieldUpdated=0, fieldDeleted=0;
|
||||
addReplyArrayLen(c, numFields);
|
||||
for (int i = 0 ; i < numFields ; i++) {
|
||||
sds field = c->argv[numFieldsAt+i+1]->ptr;
|
||||
|
||||
SetExpireTimeRes res = hashTypeSetExpire(&dictExpireMeta->hfe,
|
||||
hashObj,
|
||||
field,
|
||||
expire,
|
||||
flag,
|
||||
&minExpireFields);
|
||||
dictEntry *de;
|
||||
SetExRes res = hashTypeSetExpiry(&exCtx, field, expire, &de);
|
||||
addReplyLongLong(c,res);
|
||||
if (res == HFE_SET_DELETED)
|
||||
++fieldDeleted;
|
||||
else if (res == HFE_SET_OK)
|
||||
++fieldUpdated;
|
||||
}
|
||||
|
||||
/* Notify keyspace event, update dirty count and update global HFE DS */
|
||||
if (fieldDeleted + fieldUpdated > 0) {
|
||||
server.dirty += fieldDeleted + fieldUpdated;
|
||||
signalModifiedKey(c,c->db,keyArg);
|
||||
notifyKeyspaceEvent(NOTIFY_HASH,cmd,keyArg,c->db->id);
|
||||
if (fieldDeleted && hashTypeLength(hashObj, 0) == 0) {
|
||||
dbDelete(c->db,keyArg);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",keyArg, c->db->id);
|
||||
} else {
|
||||
|
||||
/* If minimum HFE of the hash is smaller than expiration time of the
|
||||
* specified fields in the command as well as it is smaller or equal
|
||||
* than expiration time provided in the command, then the minimum
|
||||
* HFE of the hash won't change following this command. */
|
||||
if ((minExpire < minExpireFields) && ((long long)minExpire <= expire) )
|
||||
return;
|
||||
|
||||
/* retrieve new expired time. It might have changed. */
|
||||
uint64_t newMinExpire = ebGetNextTimeToExpire(dictExpireMeta->hfe,
|
||||
&hashFieldExpireBucketsType);
|
||||
|
||||
/* Calculate the diff between old minExpire and newMinExpire. If it is
|
||||
* only few seconds, then don't have to update global HFE DS. At the worst
|
||||
* case fields of hash will be active-expired up to few seconds later.
|
||||
*
|
||||
* In any case, active-expire operation will know to update global
|
||||
* HFE DS more efficiently than here for a single item.
|
||||
*/
|
||||
uint64_t diff = (minExpire > newMinExpire) ?
|
||||
(minExpire - newMinExpire) : (newMinExpire - minExpire);
|
||||
if (diff < HASH_NEW_EXPIRE_DIFF_THRESHOLD) return;
|
||||
|
||||
if (minExpire != EB_EXPIRE_TIME_INVALID)
|
||||
ebRemove(&c->db->hexpires, &hashExpireBucketsType, hashObj);
|
||||
if (newMinExpire != EB_EXPIRE_TIME_INVALID)
|
||||
ebAdd(&c->db->hexpires, &hashExpireBucketsType, hashObj, newMinExpire);
|
||||
}
|
||||
}
|
||||
hashTypeSetExDone(&exCtx);
|
||||
}
|
||||
|
||||
/* HPEXPIRE key milliseconds [ NX | XX | GT | LT] numfields <field [field ...]> */
|
||||
|
@ -2103,7 +2284,7 @@ void hpersistCommand(client *c) {
|
|||
}
|
||||
|
||||
/* Already expired. Pretend there is no such field */
|
||||
if ( (long long) expire <= commandTimeSnapshot()) {
|
||||
if ( (long long) expire < commandTimeSnapshot()) {
|
||||
addReplyLongLong(c, HFE_PERSIST_NO_FIELD);
|
||||
continue;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue