Use a set of function pointers for building the reply

Allows libraries to wrap the reply parsing code and build the reply in a
streaming fashion. Reply objects can immediately be initialized to the
required type without having to convert an intermediary format.
This commit is contained in:
Pieter Noordhuis 2010-09-20 22:04:35 +02:00
parent 0a6e3b38e7
commit b1bedf5c6d
3 changed files with 51 additions and 18 deletions

View File

@ -39,6 +39,7 @@
#include "sds.h" #include "sds.h"
typedef struct redisReader { typedef struct redisReader {
struct redisReplyObjectFunctions *fn;
void *reply; /* holds temporary reply */ void *reply; /* holds temporary reply */
sds buf; /* read buffer */ sds buf; /* read buffer */
@ -58,6 +59,15 @@ static void *createIntegerObject(redisReadTask *task, long long value);
static void *createNilObject(redisReadTask *task); static void *createNilObject(redisReadTask *task);
static void redisSetReplyReaderError(redisReader *r, void *obj); static void redisSetReplyReaderError(redisReader *r, void *obj);
/* Default set of functions to build the reply. */
static redisReplyFunctions defaultFunctions = {
createStringObject,
createArrayObject,
createIntegerObject,
createNilObject,
freeReplyObject
};
/* We simply abort on out of memory */ /* We simply abort on out of memory */
static void redisOOM(void) { static void redisOOM(void) {
fprintf(stderr,"Out of memory in hiredis.c"); fprintf(stderr,"Out of memory in hiredis.c");
@ -89,7 +99,8 @@ static redisReply *createReplyObject(int type, sds reply) {
} }
/* Free a reply object */ /* Free a reply object */
void freeReplyObject(redisReply *r) { void freeReplyObject(void *reply) {
redisReply *r = reply;
size_t j; size_t j;
switch(r->type) { switch(r->type) {
@ -116,7 +127,12 @@ static void *createErrorObject(redisReader *context, const char *fmt, ...) {
va_start(ap,fmt); va_start(ap,fmt);
err = sdscatvprintf(sdsempty(),fmt,ap); err = sdscatvprintf(sdsempty(),fmt,ap);
va_end(ap); va_end(ap);
obj = createStringObject(&t,err,sdslen(err));
/* Use the context of the reader if it is provided. */
if (context)
obj = context->fn->createString(&t,err,sdslen(err));
else
obj = createStringObject(&t,err,sdslen(err));
sdsfree(err); sdsfree(err);
return obj; return obj;
} }
@ -205,9 +221,9 @@ static int processLineItem(redisReader *r) {
if ((p = readLine(r,&len)) != NULL) { if ((p = readLine(r,&len)) != NULL) {
if (cur->type == REDIS_REPLY_INTEGER) { if (cur->type == REDIS_REPLY_INTEGER) {
obj = createIntegerObject(cur,strtoll(p,NULL,10)); obj = r->fn->createInteger(cur,strtoll(p,NULL,10));
} else { } else {
obj = createStringObject(cur,p,len); obj = r->fn->createString(cur,p,len);
} }
/* If there is no root yet, register this object as root. */ /* If there is no root yet, register this object as root. */
@ -235,12 +251,12 @@ static int processBulkItem(redisReader *r) {
if (len < 0) { if (len < 0) {
/* The nil object can always be created. */ /* The nil object can always be created. */
obj = createNilObject(cur); obj = r->fn->createNil(cur);
} else { } else {
/* Only continue when the buffer contains the entire bulk item. */ /* Only continue when the buffer contains the entire bulk item. */
bytelen += len+2; /* include \r\n */ bytelen += len+2; /* include \r\n */
if (r->pos+bytelen <= sdslen(r->buf)) { if (r->pos+bytelen <= sdslen(r->buf)) {
obj = createStringObject(cur,s+2,len); obj = r->fn->createString(cur,s+2,len);
} }
} }
@ -265,9 +281,9 @@ static int processMultiBulkItem(redisReader *r) {
if ((p = readLine(r,NULL)) != NULL) { if ((p = readLine(r,NULL)) != NULL) {
elements = strtol(p,NULL,10); elements = strtol(p,NULL,10);
if (elements == -1) { if (elements == -1) {
obj = createNilObject(cur); obj = r->fn->createNil(cur);
} else { } else {
obj = createArrayObject(cur,elements); obj = r->fn->createArray(cur,elements);
/* Modify read list when there are more than 0 elements. */ /* Modify read list when there are more than 0 elements. */
if (elements > 0) { if (elements > 0) {
@ -358,7 +374,7 @@ static int processItem(redisReader *r) {
#define READ_BUFFER_SIZE 2048 #define READ_BUFFER_SIZE 2048
static redisReply *redisReadReply(int fd) { static redisReply *redisReadReply(int fd) {
void *reader = redisCreateReplyReader(); void *reader = redisCreateReplyReader(&defaultFunctions);
redisReply *reply; redisReply *reply;
char buf[1024]; char buf[1024];
int nread; int nread;
@ -376,17 +392,27 @@ static redisReply *redisReadReply(int fd) {
return reply; return reply;
} }
void *redisCreateReplyReader() { void *redisCreateReplyReader(redisReplyFunctions *fn) {
redisReader *r = calloc(sizeof(redisReader),1); redisReader *r = calloc(sizeof(redisReader),1);
r->fn = fn == NULL ? &defaultFunctions : fn;
r->buf = sdsempty(); r->buf = sdsempty();
r->rlist = malloc(sizeof(redisReadTask)*1); r->rlist = malloc(sizeof(redisReadTask)*1);
return r; return r;
} }
/* External libraries wrapping hiredis might need access to the temporary
* variable while the reply is built up. When the reader contains an
* object in between receiving some bytes to parse, this object might
* otherwise be free'd by garbage collection. */
void *redisGetReplyObjectFromReplyReader(void *reader) {
redisReader *r = reader;
return r->reply;
}
void redisFreeReplyReader(void *reader) { void redisFreeReplyReader(void *reader) {
redisReader *r = reader; redisReader *r = reader;
if (r->reply != NULL) if (r->reply != NULL)
freeReplyObject(r->reply); r->fn->freeObject(r->reply);
if (r->buf != NULL) if (r->buf != NULL)
sdsfree(r->buf); sdsfree(r->buf);
if (r->rlist != NULL) if (r->rlist != NULL)
@ -403,7 +429,7 @@ int redisIsReplyReaderEmpty(void *reader) {
static void redisSetReplyReaderError(redisReader *r, void *obj) { static void redisSetReplyReaderError(redisReader *r, void *obj) {
if (r->reply != NULL) if (r->reply != NULL)
freeReplyObject(r->reply); r->fn->freeObject(r->reply);
/* Clear remaining buffer when we see a protocol error. */ /* Clear remaining buffer when we see a protocol error. */
if (r->buf != NULL) { if (r->buf != NULL) {
@ -456,7 +482,6 @@ void *redisFeedReplyReader(void *reader, char *buf, int len) {
/* Emit a reply when there is one. */ /* Emit a reply when there is one. */
if (r->rpos == r->rlen) { if (r->rpos == r->rlen) {
void *reply = r->reply; void *reply = r->reply;
assert(reply != NULL);
r->reply = NULL; r->reply = NULL;
/* Destroy the buffer when it is empty and is quite large. */ /* Destroy the buffer when it is empty and is quite large. */

View File

@ -49,17 +49,25 @@ typedef struct redisReply {
struct redisReply **element; /* elements vector for REDIS_REPLY_ARRAY */ struct redisReply **element; /* elements vector for REDIS_REPLY_ARRAY */
} redisReply; } redisReply;
typedef struct redisReadTask { typedef struct redisReadTask {
int type; int type;
void *parent; /* optional pointer to parent object */ void *parent; /* optional pointer to parent object */
int idx; /* index in parent (array) object */ int idx; /* index in parent (array) object */
} redisReadTask; } redisReadTask;
typedef struct redisReplyObjectFunctions {
void *(*createString)(redisReadTask*, char*, size_t);
void *(*createArray)(redisReadTask*, int);
void *(*createInteger)(redisReadTask*, long long);
void *(*createNil)(redisReadTask*);
void (*freeObject)(void*);
} redisReplyFunctions;
redisReply *redisConnect(int *fd, const char *ip, int port); redisReply *redisConnect(int *fd, const char *ip, int port);
void freeReplyObject(redisReply *r); void freeReplyObject(void *reply);
redisReply *redisCommand(int fd, const char *format, ...); redisReply *redisCommand(int fd, const char *format, ...);
void *redisCreateReplyReader(); void *redisCreateReplyReader(redisReplyFunctions *fn);
void *redisGetReplyObjectFromReplyReader(void *reader);
void redisFreeReplyReader(void *ptr); void redisFreeReplyReader(void *ptr);
int redisIsReplyReaderEmpty(void *ptr); int redisIsReplyReaderEmpty(void *ptr);
void *redisFeedReplyReader(void *reader, char *buf, int len); void *redisFeedReplyReader(void *reader, char *buf, int len);

4
test.c
View File

@ -124,7 +124,7 @@ int main(void) {
freeReplyObject(reply); freeReplyObject(reply);
test("Error handling in reply parser: "); test("Error handling in reply parser: ");
reader = redisCreateReplyReader(); reader = redisCreateReplyReader(NULL);
reply = redisFeedReplyReader(reader,(char*)"@foo\r\n",6); reply = redisFeedReplyReader(reader,(char*)"@foo\r\n",6);
test_cond(reply->type == REDIS_PROTOCOL_ERROR && test_cond(reply->type == REDIS_PROTOCOL_ERROR &&
strcasecmp(reply->reply,"protocol error, got \"@\" as reply type byte") == 0); strcasecmp(reply->reply,"protocol error, got \"@\" as reply type byte") == 0);
@ -134,7 +134,7 @@ int main(void) {
/* when the reply already contains multiple items, they must be free'd /* when the reply already contains multiple items, they must be free'd
* on an error. valgrind will bark when this doesn't happen. */ * on an error. valgrind will bark when this doesn't happen. */
test("Memory cleanup in reply parser: "); test("Memory cleanup in reply parser: ");
reader = redisCreateReplyReader(); reader = redisCreateReplyReader(NULL);
redisFeedReplyReader(reader,(char*)"*2\r\n",4); redisFeedReplyReader(reader,(char*)"*2\r\n",4);
redisFeedReplyReader(reader,(char*)"$5\r\nhello\r\n",11); redisFeedReplyReader(reader,(char*)"$5\r\nhello\r\n",11);
reply = redisFeedReplyReader(reader,(char*)"@foo\r\n",6); reply = redisFeedReplyReader(reader,(char*)"@foo\r\n",6);