Writing a Custom Redis Command In C - Part 2
update: By relying on dictFind
, this implementation makes an assumption that you'll be storing strings in your set. This'll crash if the set is encoded as an intSet
. Open up t_set.c
and look at setTypeIsMember
. You should either call this directly, or use it for inspiration.
In part 1 we set up our xfind
method and hooked it into Redis. Now it's time to write the actual implementation. As a reminder, our goal is to take a sorted set, subtract a set from it, and providing paging (offset/count).
The lookupKeyReadOrReply
function we used returned a redis object (robj
), which we briefly described. robj
has one other member which is of interest to us: encoding
. You see, most data structures in Redis have multiple possible implementation. A sorted set, for example, can be either be a skiplist or a ziplist. Based on the data being stored (the type of values or the number of values), Redis will pick one implementation instead of another. It'll also take care of converting one implementation into another as needed.
Unfortunately, this makes our life more difficult. If you browse though Redis' codebase, you'll find many lines that go something like if (r->encoding == XYZ) { ... } else { ... }
. In many cases, wrappers are available to abstract this implementation detail - however they aren't always publicly available. While the accessibility is something we could easily fix, we'll take a different path. Instead of coding against sorted sets and sets as generic data structures, we'll code against specific implementations. There are already some commands in Redis which do this. For example, if you sort
with a sorted set, Redis will convert it to a skiplist, which is exactly what we'll do:
zobj* zobj = lookupKeyReadOrReply(c, c->argv[1], shared.czero);
if (zobj == NULL || checkType(c, zobj, REDIS_ZSET)) { return; }
zsetConvert(zobj, REDIS_ENCODING_SKIPLIST);
zset *zset = zobj->ptr;
Similarly, sets (which is what we'll diff with) use a special encoding when only storing integers. Since our set will contain strings, we can safely assume that our set is encoded as a hashtable, rather than a intset.
sobj = lookupKeyReadOrReply(c, c->argv[2], shared.czero);
if (sobj == NULL || checkType(c, sobj, REDIS_SET)) { return; }
dict *diff = (dict*)sobj->ptr;
A nice thing about the skiplist implementation we are programming against is the ability to easily walk forwards or backwards through the items. For now, we'll only concern ourselves with going backwards, but it would be trivial to add support for an order (asc/desc) parameter. Our code will first get the tail of the skiplist, and then move backwards:
zskiplist *zsl = zset->zsl;
zskiplistNode *ln = zsl->tail;
while(ln != NULL) {
//todo
ln = ln->backward;
}
The full loop, including applying the diff, looks like:
long found = 0, added = 0;
while(ln != NULL) {
robj *item = ln->obj;
if (dictFind(diff, item) == NULL && found++ >= offset) {
addReplyBulk(c, item);
if (++added == count) { break; }
}
ln = ln->backward;
}
This can can broken down in a few steps. First, we get the value of the current element. If this item is not in our dictionary, and we are beyond our offset, we can add the item to our reply. If we've added all that was expected, we can break.
Our response needs to be prefixed with the number of values. We can't know that value upfront (could be less than count
). Redis provides a function to allocate space for the length and fill it in after the fact:
void *replylen = addDeferredMultiBulkLength(c);
while(ln != NULL) {
//...
}
setDeferredMultiBulkLength(c, replylen, added);
And that, dear reader, is a simple (and very tailored) implementation of xdiff. There's all types of ways to improve this. We could add an order
parameter or implement something like sort's
GET
feature.
One last improvement we'll make is to skip directly to offset
, since we know we won't find an item that fits in our page until then. Instead of pointing to the set's tail, we can simply use the zslGetElementByRank
function. Our complete code looks like:
#include "redis.h"
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank);
void xdiffCommand(redisClient *c) {
long offset, count, added = 0;
robj *zobj, *sobj;
zset *zset;
dict *diff;
void *replylen;
if ((getLongFromObjectOrReply(c, c->argv[3], &offset, NULL) != REDIS_OK)) { return; }
if ((getLongFromObjectOrReply(c, c->argv[4], &count, NULL) != REDIS_OK)) { return; }
zobj = lookupKeyReadOrReply(c, c->argv[1], shared.czero);
if (zobj == NULL || checkType(c, zobj, REDIS_ZSET)) { return; }
sobj = lookupKeyReadOrReply(c, c->argv[2], shared.czero);
if (sobj == NULL || checkType(c, sobj, REDIS_SET)) { return; }
zsetConvert(zobj, REDIS_ENCODING_SKIPLIST);
zset = zobj->ptr;
diff = (dict*)sobj->ptr;
long zsetlen = dictSize(zset->dict);
zskiplistNode *ln = zslGetElementByRank(zset->zsl, zsetlen - offset);
replylen = addDeferredMultiBulkLength(c);
while(ln != NULL) {
robj *item = ln->obj;
if (dictFind(diff, item) == NULL) {
addReplyBulk(c, item);
if (++added == count) { break; }
}
ln = ln->backward;
}
setDeferredMultiBulkLength(c, replylen, added);
}
Maybe this implementation will prove too specific, but hopefully it'll provide some insight into how to build your own.