Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Embed key into dict entry #541

Merged
merged 22 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ static void dbAddInternal(serverDb *db, robj *key, robj *val, int update_if_exis
return;
}
serverAssertWithInfo(NULL, key, de != NULL);
kvstoreDictSetKey(db->keys, slot, de, sdsdup(key->ptr));
initObjectLRUOrLFU(val);
kvstoreDictSetVal(db->keys, slot, de, val);
signalKeyAsReady(db, key, val->type);
Expand Down
54 changes: 36 additions & 18 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
typedef struct defragCtx {
void *privdata;
int slot;
void *aux;
} defragCtx;

typedef struct defragPubSubCtx {
Expand Down Expand Up @@ -75,6 +76,36 @@ void *activeDefragAlloc(void *ptr) {
return newptr;
}

/* This method captures the expiry db dict entry which refers to data stored in keys db dict entry. */
void defragEntryStartCbForKeys(void *ctx, void *oldptr) {
defragCtx *defragctx = (defragCtx *)ctx;
serverDb *db = defragctx->privdata;
sds oldsds = (sds)dictGetKey((dictEntry *)oldptr);
int slot = defragctx->slot;
if (kvstoreDictSize(db->expires, slot)) {
dictEntry *expire_de = kvstoreDictFind(db->expires, slot, oldsds);
defragctx->aux = expire_de;
}
}

/* This method updates the key of expiry db dict entry. The key might be no longer valid
* as it could have been cleaned up during the defrag-realloc of the main dictionary. */
void defragEntryFinishCbForKeys(void *ctx, void *newptr) {
defragCtx *defragctx = (defragCtx *)ctx;
dictEntry *expire_de = (dictEntry *)defragctx->aux;
/* Item doesn't have TTL associated to it. */
if (!expire_de) return;
/* No reallocation happened. */
if (!newptr) {
expire_de = NULL;
return;
}
serverDb *db = defragctx->privdata;
sds newsds = (sds)dictGetKey((dictEntry *)newptr);
int slot = defragctx->slot;
kvstoreDictSetKey(db->expires, slot, expire_de, newsds);
}

/*Defrag helper for sds strings
*
* returns NULL in case the allocation wasn't moved.
Expand Down Expand Up @@ -650,25 +681,10 @@ void defragModule(serverDb *db, dictEntry *kde) {
/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. */
void defragKey(defragCtx *ctx, dictEntry *de) {
sds keysds = dictGetKey(de);
robj *newob, *ob;
unsigned char *newzl;
sds newsds;
serverDb *db = ctx->privdata;
int slot = ctx->slot;
/* Try to defrag the key name. */
newsds = activeDefragSds(keysds);
if (newsds) {
kvstoreDictSetKey(db->keys, slot, de, newsds);
if (kvstoreDictSize(db->expires, slot)) {
/* We can't search in db->expires for that key after we've released
* the pointer it holds, since it won't be able to do the string
* compare, but we can find the entry using key hash and pointer. */
uint64_t hash = kvstoreGetHash(db->expires, newsds);
dictEntry *expire_de = kvstoreDictFindEntryByPtrAndHash(db->expires, slot, keysds, hash);
if (expire_de) kvstoreDictSetKey(db->expires, slot, expire_de, newsds);
}
}
robj *newob, *ob;
unsigned char *newzl;

/* Try to defrag robj and / or string value. */
ob = dictGetVal(de);
Expand Down Expand Up @@ -984,7 +1000,9 @@ void activeDefragCycle(void) {
endtime = start + timelimit;
latencyStartMonitor(latency);

dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc};
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc,
.defragEntryStartCb = defragEntryStartCbForKeys,
.defragEntryFinishCb = defragEntryFinishCbForKeys};
do {
/* if we're not continuing a scan from the last call or loop, start a new one */
if (!defrag_stage && !defrag_cursor && (slot < 0)) {
Expand Down
131 changes: 94 additions & 37 deletions src/dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

#include "fmacros.h"

#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
Expand All @@ -48,6 +49,10 @@
#include "serverassert.h"
#include "monotonic.h"

#ifndef static_assert
#define static_assert(expr, lit) extern char __static_assert_failure[(expr) ? 1 : -1]
madolson marked this conversation as resolved.
Show resolved Hide resolved
#endif

/* Using dictSetResizeEnabled() we make possible to disable
* resizing and rehashing of the hash table as needed. This is very important
* for us, as we use copy-on-write and don't want to move too much memory
Expand All @@ -74,11 +79,36 @@ struct dictEntry {
struct dictEntry *next; /* Next entry in the same hash bucket. */
};

typedef struct {
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next; /* Next entry in the same hash bucket. */
uint8_t key_header_size; /* offset into key_buf where the key is located at. */
unsigned char key_buf[]; /* buffer to embed the key. */
hpatro marked this conversation as resolved.
Show resolved Hide resolved
} embeddedDictEntry;

static_assert(sizeof(embeddedDictEntry) == 24, "unexpected total size of embeddedDictEntry");
static_assert(offsetof(embeddedDictEntry, v) == 0, "unexpected field offset");
static_assert(offsetof(embeddedDictEntry, next) == 8, "unexpected field offset");
static_assert(offsetof(embeddedDictEntry, key_header_size) == 16, "unexpected field offset");
static_assert(offsetof(embeddedDictEntry, key_buf) == 17, "unexpected field offset");
hpatro marked this conversation as resolved.
Show resolved Hide resolved

/* The minimum amount of bytes required for embedded dict entry. */
static inline size_t compactSizeEmbeddedDictEntry(void) {
return offsetof(embeddedDictEntry, key_buf);
}
hpatro marked this conversation as resolved.
Show resolved Hide resolved

typedef struct {
void *key;
dictEntry *next;
} dictEntryNoValue;

/* Helper and validation for `embeddedDictEntry` */

/* -------------------------- private prototypes ---------------------------- */

static void _dictExpandIfNeeded(dict *d);
Expand Down Expand Up @@ -124,6 +154,8 @@ uint64_t dictGenCaseHashFunction(const unsigned char *buf, size_t len) {
#define ENTRY_PTR_MASK 7 /* 111 */
#define ENTRY_PTR_NORMAL 0 /* 000 */
#define ENTRY_PTR_NO_VALUE 2 /* 010 */
#define ENTRY_PTR_EMBEDDED 4 /* 100 */
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
/* ENTRY_PTR_IS_KEY xx1 */

/* Returns 1 if the entry pointer is a pointer to a key, rather than to an
* allocated entry. Returns 0 otherwise. */
Expand All @@ -143,12 +175,9 @@ static inline int entryIsNoValue(const dictEntry *de) {
return ((uintptr_t)(void *)de & ENTRY_PTR_MASK) == ENTRY_PTR_NO_VALUE;
}

/* Creates an entry without a value field. */
static inline dictEntry *createEntryNoValue(void *key, dictEntry *next) {
dictEntryNoValue *entry = zmalloc(sizeof(*entry));
entry->key = key;
entry->next = next;
return (dictEntry *)(void *)((uintptr_t)(void *)entry | ENTRY_PTR_NO_VALUE);

static inline int entryIsEmbedded(const dictEntry *de) {
return ((uintptr_t)(void *)de & ENTRY_PTR_MASK) == ENTRY_PTR_EMBEDDED;
}

static inline dictEntry *encodeMaskedPtr(const void *ptr, unsigned int bits) {
Expand All @@ -161,15 +190,40 @@ static inline void *decodeMaskedPtr(const dictEntry *de) {
return (void *)((uintptr_t)(void *)de & ~ENTRY_PTR_MASK);
}

/* Creates an entry without a value field. */
static inline dictEntry *createEntryNoValue(void *key, dictEntry *next) {
dictEntryNoValue *entry = zmalloc(sizeof(*entry));
entry->key = key;
entry->next = next;
return encodeMaskedPtr(entry, ENTRY_PTR_NO_VALUE);
}

static inline dictEntry *createEmbeddedEntry(void *key, dictEntry *next, dictType *dt) {
size_t key_len = dt->embedKey(NULL, 0, key, NULL);
embeddedDictEntry *entry = zmalloc(compactSizeEmbeddedDictEntry() + key_len);
dt->embedKey(entry->key_buf, key_len, key, &entry->key_header_size);
entry->next = next;
return encodeMaskedPtr(entry, ENTRY_PTR_EMBEDDED);
}

static inline void *getEmbeddedKey(const dictEntry *de) {
embeddedDictEntry *entry = (embeddedDictEntry *)decodeMaskedPtr(de);
return &entry->key_buf[entry->key_header_size];
}

/* Decodes the pointer to an entry without value, when you know it is an entry
* without value. Hint: Use entryIsNoValue to check. */
static inline dictEntryNoValue *decodeEntryNoValue(const dictEntry *de) {
return decodeMaskedPtr(de);
}

static inline embeddedDictEntry *decodeEmbeddedEntry(const dictEntry *de) {
return decodeMaskedPtr(de);
}

/* Returns 1 if the entry has a value field and 0 otherwise. */
static inline int entryHasValue(const dictEntry *de) {
return entryIsNormal(de);
return !entryIsNoValue(de);
hpatro marked this conversation as resolved.
Show resolved Hide resolved
}

/* ----------------------------- API implementation ------------------------- */
Expand Down Expand Up @@ -509,6 +563,8 @@ dictEntry *dictInsertAtPosition(dict *d, void *key, void *position) {
/* Allocate an entry without value. */
entry = createEntryNoValue(key, *bucket);
}
} else if (d->type->embedded_entry) {
entry = createEmbeddedEntry(key, *bucket, d->type);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way dictEntry is used has no type safety. Admittedly, this is not a new issue but the addition of embeddedDictEntry is making it worse. The following code path looks problematic to me. can you please double check?

setGenericComamd() -> setKey() -> dbAdd() -> dbAddInsternal() -> kvstoreDictSetKey() -> dictSetKey()

I don't seedictSetKey getting patched to handle this new embedded dict entry.

I would suggest making dictEntry an opaque struct next and force every function go through an inline accessor function/macro. I think this is the only certain way to ensure we don't accidentally use the wrong data type.

Copy link
Member

@madolson madolson Jun 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the dictEntry already opaque?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not in dict.c.

Copy link
Member

@madolson madolson Jun 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I don't agree it should be opaque in dict.c. It seems like a very small number of actual touch points we actually have to make sure are correct. We could even reduce the number. I'm not sure making it opaque will help all that much.

Discussed offline, Ping has a separate proposal for adding guardrails that he will publish.

hpatro marked this conversation as resolved.
Show resolved Hide resolved
} else {
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that in a database
Expand Down Expand Up @@ -656,6 +712,7 @@ void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
if (he == NULL) return;
dictFreeKey(d, he);
dictFreeVal(d, he);
/* Clear the dictEntry */
if (!entryIsKey(he)) zfree(decodeMaskedPtr(he));
}

Expand Down Expand Up @@ -801,7 +858,12 @@ void dictSetKey(dict *d, dictEntry *de, void *key) {

void dictSetVal(dict *d, dictEntry *de, void *val) {
assert(entryHasValue(de));
de->v.val = d->type->valDup ? d->type->valDup(d, val) : val;
void *v = d->type->valDup ? d->type->valDup(d, val) : val;
if (entryIsEmbedded(de)) {
decodeEmbeddedEntry(de)->v.val = v;
} else {
de->v.val = v;
}
}

void dictSetSignedIntegerVal(dictEntry *de, int64_t val) {
Expand Down Expand Up @@ -837,11 +899,15 @@ double dictIncrDoubleVal(dictEntry *de, double val) {
void *dictGetKey(const dictEntry *de) {
if (entryIsKey(de)) return (void *)de;
if (entryIsNoValue(de)) return decodeEntryNoValue(de)->key;
if (entryIsEmbedded(de)) return getEmbeddedKey(de);
return de->key;
}

void *dictGetVal(const dictEntry *de) {
assert(entryHasValue(de));
if (entryIsEmbedded(de)) {
return decodeEmbeddedEntry(de)->v.val;
}
return de->v.val;
}

Expand Down Expand Up @@ -871,6 +937,7 @@ double *dictGetDoubleValPtr(dictEntry *de) {
static dictEntry *dictGetNext(const dictEntry *de) {
if (entryIsKey(de)) return NULL; /* there's no next */
if (entryIsNoValue(de)) return decodeEntryNoValue(de)->next;
if (entryIsEmbedded(de)) return decodeEmbeddedEntry(de)->next;
return de->next;
}

Expand All @@ -879,14 +946,16 @@ static dictEntry *dictGetNext(const dictEntry *de) {
static dictEntry **dictGetNextRef(dictEntry *de) {
if (entryIsKey(de)) return NULL;
if (entryIsNoValue(de)) return &decodeEntryNoValue(de)->next;
if (entryIsEmbedded(de)) return &decodeEmbeddedEntry(de)->next;
return &de->next;
}

static void dictSetNext(dictEntry *de, dictEntry *next) {
assert(!entryIsKey(de));
if (entryIsNoValue(de)) {
dictEntryNoValue *entry = decodeEntryNoValue(de);
entry->next = next;
decodeEntryNoValue(de)->next = next;
} else if (entryIsEmbedded(de)) {
decodeEmbeddedEntry(de)->next = next;
} else {
de->next = next;
}
Expand Down Expand Up @@ -1164,7 +1233,7 @@ unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) {

/* Reallocate the dictEntry, key and value allocations in a bucket using the
* provided allocation functions in order to defrag them. */
static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragfns) {
static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragfns, void *privdata) {
dictDefragAllocFunction *defragalloc = defragfns->defragAlloc;
dictDefragAllocFunction *defragkey = defragfns->defragKey;
dictDefragAllocFunction *defragval = defragfns->defragVal;
Expand All @@ -1182,6 +1251,17 @@ static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragf
entry = newentry;
}
if (newkey) entry->key = newkey;
} else if (entryIsEmbedded(de)) {
defragfns->defragEntryStartCb(privdata, de);
embeddedDictEntry *entry = decodeEmbeddedEntry(de), *newentry;
if ((newentry = defragalloc(entry))) {
newde = encodeMaskedPtr(newentry, ENTRY_PTR_EMBEDDED);
entry = newentry;
defragfns->defragEntryFinishCb(privdata, newde);
} else {
defragfns->defragEntryFinishCb(privdata, NULL);
}
if (newval) entry->v.val = newval;
} else {
assert(entryIsNormal(de));
newde = defragalloc(de);
Expand Down Expand Up @@ -1345,7 +1425,7 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio

/* Emit entries at cursor */
if (defragfns) {
dictDefragBucket(&d->ht_table[htidx0][v & m0], defragfns);
dictDefragBucket(&d->ht_table[htidx0][v & m0], defragfns, privdata);
}
de = d->ht_table[htidx0][v & m0];
while (de) {
Expand Down Expand Up @@ -1378,7 +1458,7 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio

/* Emit entries at cursor */
if (defragfns) {
dictDefragBucket(&d->ht_table[htidx0][v & m0], defragfns);
dictDefragBucket(&d->ht_table[htidx0][v & m0], defragfns, privdata);
}
de = d->ht_table[htidx0][v & m0];
while (de) {
Expand All @@ -1392,7 +1472,7 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio
do {
/* Emit entries at cursor */
if (defragfns) {
dictDefragBucket(&d->ht_table[htidx1][v & m1], defragfns);
dictDefragBucket(&d->ht_table[htidx1][v & m1], defragfns, privdata);
}
de = d->ht_table[htidx1][v & m1];
while (de) {
Expand Down Expand Up @@ -1565,29 +1645,6 @@ uint64_t dictGetHash(dict *d, const void *key) {
return dictHashKey(d, key);
}

/* Finds the dictEntry using pointer and pre-calculated hash.
* oldkey is a dead pointer and should not be accessed.
* the hash value should be provided using dictGetHash.
* no string / key comparison is performed.
* return value is a pointer to the dictEntry if found, or NULL if not found. */
dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash) {
dictEntry *he;
unsigned long idx, table;

if (dictSize(d) == 0) return NULL; /* dict is empty */
for (table = 0; table <= 1; table++) {
idx = hash & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
if (table == 0 && (long)idx < d->rehashidx) continue;
he = d->ht_table[table][idx];
while (he) {
if (oldptr == dictGetKey(he)) return he;
he = dictGetNext(he);
}
if (!dictIsRehashing(d)) return NULL;
}
return NULL;
}

/* Provides the old and new ht size for a given dictionary during rehashing. This method
* should only be invoked during initialization/rehashing. */
void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size) {
Expand Down
Loading