diff --git a/hiredis.c b/hiredis.c index b914d21..5cfd815 100644 --- a/hiredis.c +++ b/hiredis.c @@ -41,6 +41,7 @@ typedef struct redisReader { struct redisReplyObjectFunctions *fn; + sds error; /* holds optional error */ void *reply; /* holds temporary reply */ sds buf; /* read buffer */ @@ -52,16 +53,14 @@ typedef struct redisReader { } redisReader; static redisReply *createReplyObject(int type, sds reply); -static void *createErrorObject(const char *str, size_t len); static void *createStringObject(redisReadTask *task, char *str, size_t len); static void *createArrayObject(redisReadTask *task, int elements); static void *createIntegerObject(redisReadTask *task, long long value); static void *createNilObject(redisReadTask *task); -static void redisSetReplyReaderError(redisReader *r, void *obj); +static void redisSetReplyReaderError(redisReader *r, sds err); /* Default set of functions to build the reply. */ static redisReplyFunctions defaultFunctions = { - createErrorObject, createStringObject, createArrayObject, createIntegerObject, @@ -106,24 +105,6 @@ void freeReplyObject(void *reply) { free(r); } -/* Helper function that allows printf-like creation of error objects. */ -static void *formatError(redisReplyFunctions *fn, const char *fmt, ...) { - va_list ap; - sds err; - void *obj; - va_start(ap,fmt); - err = sdscatvprintf(sdsempty(),fmt,ap); - va_end(ap); - obj = fn->createError(err,sdslen(err)); - sdsfree(err); - return obj; -} - -static void *createErrorObject(const char *str, size_t len) { - redisReply *r = createReplyObject(REDIS_ERROR,sdsnewlen(str,len)); - return r; -} - static void *createStringObject(redisReadTask *task, char *str, size_t len) { redisReply *r = createReplyObject(task->type,sdsnewlen(str,len)); assert(task->type == REDIS_REPLY_ERROR || @@ -308,7 +289,6 @@ static int processItem(redisReader *r) { redisReadTask *cur = &(r->rlist[r->rpos]); char *p; sds byte; - void *err; /* check if we need to read type */ if (cur->type < 0) { @@ -331,9 +311,8 @@ static int processItem(redisReader *r) { break; default: byte = sdscatrepr(sdsempty(),p,1); - err = formatError(r->fn, - "protocol error, got %s as reply type byte", byte); - redisSetReplyReaderError(r,err); + redisSetReplyReaderError(r,sdscatprintf(sdsempty(), + "protocol error, got %s as reply type byte", byte)); sdsfree(byte); return -1; } @@ -354,14 +333,15 @@ static int processItem(redisReader *r) { case REDIS_REPLY_ARRAY: return processMultiBulkItem(r); default: - err = formatError(r->fn,"unknown item type '%d'", cur->type); - redisSetReplyReaderError(r,err); + redisSetReplyReaderError(r,sdscatprintf(sdsempty(), + "unknown item type '%d'", cur->type)); return -1; } } void *redisReplyReaderCreate(redisReplyFunctions *fn) { redisReader *r = calloc(sizeof(redisReader),1); + r->error = NULL; r->fn = fn == NULL ? &defaultFunctions : fn; r->buf = sdsempty(); r->rlist = malloc(sizeof(redisReadTask)*1); @@ -379,6 +359,8 @@ void *redisReplyReaderGetObject(void *reader) { void redisReplyReaderFree(void *reader) { redisReader *r = reader; + if (r->error != NULL) + sdsfree(r->error); if (r->reply != NULL) r->fn->freeObject(r->reply); if (r->buf != NULL) @@ -388,7 +370,7 @@ void redisReplyReaderFree(void *reader) { free(r); } -static void redisSetReplyReaderError(redisReader *r, void *obj) { +static void redisSetReplyReaderError(redisReader *r, sds err) { if (r->reply != NULL) r->fn->freeObject(r->reply); @@ -399,7 +381,12 @@ static void redisSetReplyReaderError(redisReader *r, void *obj) { r->pos = 0; } r->rlen = r->rpos = 0; - r->reply = obj; + r->error = err; +} + +char *redisReplyReaderGetError(void *reader) { + redisReader *r = reader; + return r->error; } void redisReplyReaderFeed(void *reader, char *buf, int len) { @@ -410,12 +397,13 @@ void redisReplyReaderFeed(void *reader, char *buf, int len) { r->buf = sdscatlen(r->buf,buf,len); } -void *redisReplyReaderGetReply(void *reader) { +int redisReplyReaderGetReply(void *reader, void **reply) { redisReader *r = reader; + if (reply != NULL) *reply = NULL; /* When the buffer is empty, there will never be a reply. */ if (sdslen(r->buf) == 0) - return NULL; + return REDIS_OK; /* Create first item to process when the item list is empty. */ if (r->rlen == 0) { @@ -446,7 +434,7 @@ void *redisReplyReaderGetReply(void *reader) { /* Emit a reply when there is one. */ if (r->rpos == r->rlen) { - void *reply = r->reply; + void *aux = r->reply; r->reply = NULL; /* Destroy the buffer when it is empty and is quite large. */ @@ -458,10 +446,15 @@ void *redisReplyReaderGetReply(void *reader) { /* Set list of items to read to be empty. */ r->rlen = r->rpos = 0; - return reply; - } else { - return NULL; + + /* Check if there actually *is* a reply. */ + if (r->error != NULL) { + return REDIS_ERR; + } else { + if (reply != NULL) *reply = aux; + } } + return REDIS_OK; } /* Helper function for redisCommand(). It's used to append the next argument @@ -571,11 +564,11 @@ static int redisContextConnect(redisContext *c, const char *ip, int port) { } if (c->fd == ANET_ERR) { - c->error = c->fn->createError(err,strlen(err)); + c->error = sdsnew(err); return REDIS_ERR; } if (anetTcpNoDelay(err,c->fd) == ANET_ERR) { - c->error = c->fn->createError(err,strlen(err)); + c->error = sdsnew(err); return REDIS_ERR; } return REDIS_OK; @@ -623,12 +616,12 @@ int redisBufferRead(redisContext *c) { /* Try again later */ } else { /* Set error in context */ - c->error = formatError(c->fn, + c->error = sdscatprintf(sdsempty(), "Error reading from socket: %s", strerror(errno)); return REDIS_ERR; } } else if (nread == 0) { - c->error = formatError(c->fn, + c->error = sdscatprintf(sdsempty(), "Server closed the connection"); return REDIS_ERR; } else { @@ -644,21 +637,34 @@ static void redisPopCallback(redisContext *c) { c->cpos--; } -void *redisGetReply(redisContext *c) { +int redisGetReply(redisContext *c, void **reply) { redisPopCallback(c); - return redisReplyReaderGetReply(c->reader); + if (redisReplyReaderGetReply(c->reader,reply) == REDIS_ERR) { + /* Copy the (protocol) error from the reader to the context. */ + c->error = sdsnew(((redisReader*)c->reader)->error); + return REDIS_ERR; + } + return REDIS_OK; } int redisProcessCallbacks(redisContext *c) { - void *reply; + void *reply = NULL; redisCallback cb; - while ((reply = redisReplyReaderGetReply(c->reader)) != NULL) { + do { cb = c->callbacks[0]; - if (cb.fn != NULL) - cb.fn(c,reply,cb.privdata); - redisPopCallback(c); - } + if (redisGetReply(c,&reply) == REDIS_ERR) + return REDIS_ERR; + + /* Fire callback when there is a reply. */ + if (reply != NULL) { + if (cb.fn != NULL) { + cb.fn(c,reply,cb.privdata); + } else { + c->fn->freeObject(reply); + } + } + } while (reply != NULL); return REDIS_OK; } @@ -671,7 +677,7 @@ int redisBufferWrite(redisContext *c, int *done) { /* Try again later */ } else { /* Set error in context */ - c->error = formatError(c->fn, + c->error = sdscatprintf(sdsempty(), "Error writing to socket: %s", strerror(errno)); return REDIS_ERR; } @@ -687,73 +693,102 @@ int redisBufferWrite(redisContext *c, int *done) { return REDIS_OK; } -static void* redisCommandWrite(redisContext *c, redisCallback *cb, char *str, size_t len) { - void *reply = NULL; +static int redisCommandWriteBlock(redisContext *c, void **reply, char *str, size_t len) { int wdone = 0; + void *aux = NULL; + assert(c->flags & HIREDIS_BLOCK); c->obuf = sdscatlen(c->obuf,str,len); - /* Read reply immediately when the context is blocking. */ - if (c->flags & HIREDIS_BLOCK) { - do { /* Write until done. */ - if (redisBufferWrite(c,&wdone) == REDIS_ERR) - return c->error; - } while (!wdone); + /* Write until done. */ + do { + if (redisBufferWrite(c,&wdone) == REDIS_ERR) + return REDIS_ERR; + } while (!wdone); - do { /* Read until there is a reply. */ - if (redisBufferRead(c) == REDIS_ERR) - return c->error; - reply = redisGetReply(c); - } while (reply == NULL); - } else { - /* Make room for the callback. */ - assert(c->cpos <= c->clen); - if (c->cpos == c->clen) { - c->clen++; - c->callbacks = realloc(c->callbacks,c->clen*sizeof(redisCallback)); - } + /* Read until there is a reply. */ + do { + if (redisBufferRead(c) == REDIS_ERR) + return REDIS_ERR; + if (redisGetReply(c,&aux) == REDIS_ERR) + return REDIS_ERR; + } while (aux == NULL); - if (cb != NULL) { - c->callbacks[c->cpos] = *cb; - } else { - memset(&c->callbacks[c->cpos],0,sizeof(redisCallback)); - } - c->cpos++; - } - return reply; + /* Set reply object. */ + if (reply != NULL) + *reply = aux; + + return REDIS_OK; } -/* Write a formatted command to the output buffer, and, if the context is a - * blocking connection, read the reply and return it. When this function - * is called from a non-blocking context, it will always return NULL. */ +static int redisCommandWriteNonBlock(redisContext *c, redisCallback *cb, char *str, size_t len) { + assert(!(c->flags & HIREDIS_BLOCK)); + c->obuf = sdscatlen(c->obuf,str,len); + + /* Make sure there is space for the callback. */ + assert(c->cpos <= c->clen); + if (c->cpos == c->clen) { + c->clen++; + c->callbacks = realloc(c->callbacks,c->clen*sizeof(redisCallback)); + } + + if (cb != NULL) { + c->callbacks[c->cpos] = *cb; + } else { + memset(&c->callbacks[c->cpos],0,sizeof(redisCallback)); + } + c->cpos++; + + return REDIS_OK; +} + +/* Write a formatted command to the output buffer. If the given context is + * blocking, immediately read the reply into the "reply" pointer. When the + * context is non-blocking, the "reply" pointer will not be used and a + * NULL callback will be appended to the list of callbacks. + * + * Returns the reply when a reply was succesfully retrieved. Returns NULL + * otherwise. When NULL is returned in a blocking context, provided that + * the reply build functions did not return NULL when building the reply, + * the error field in the context will be set. */ void *redisCommand(redisContext *c, const char *format, ...) { va_list ap; sds cmd; - void *reply; + void *reply = NULL; va_start(ap,format); cmd = redisFormatCommand(format,ap); va_end(ap); - reply = redisCommandWrite(c,NULL,cmd,sdslen(cmd)); + if (c->flags & HIREDIS_BLOCK) { + if (redisCommandWriteBlock(c,&reply,cmd,sdslen(cmd)) == REDIS_OK) { + return reply; + } + } else { + redisCommandWriteNonBlock(c,NULL,cmd,sdslen(cmd)); + } sdsfree(cmd); - return reply; + return NULL; } -/* Write a formatted command to the output buffer and register the provided - * callback function and argument. When this function is called in a - * non-blocking context, it is a no-op. Always returns NULL. */ +/* Write a formatted command to the output buffer. Registers the provided + * callback function and argument in the callback list. + * + * Always returns NULL. In a non-blocking context this will never fail because + * this function does not do any I/O. In a blocking context this function will + * have no effect (a callback in a blocking context makes no sense). */ void *redisCommandWithCallback(redisContext *c, redisCallbackFn *fn, void *privdata, const char *format, ...) { va_list ap; - redisCallback cb = { fn, privdata }; sds cmd; + int status; + redisCallback cb = { fn, privdata }; - /* Skip if the context is blocking. */ + /* This function may only be used in a non-blocking context. */ if (c->flags & HIREDIS_BLOCK) return NULL; va_start(ap,format); cmd = redisFormatCommand(format,ap); va_end(ap); - redisCommandWrite(c,&cb,cmd,sdslen(cmd)); + status = redisCommandWriteNonBlock(c,&cb,cmd,sdslen(cmd)); sdsfree(cmd); return NULL; } diff --git a/hiredis.h b/hiredis.h index e57c3ab..e5be725 100644 --- a/hiredis.h +++ b/hiredis.h @@ -63,7 +63,6 @@ typedef struct redisReadTask { } redisReadTask; typedef struct redisReplyObjectFunctions { - void *(*createError)(const char*, size_t); void *(*createString)(redisReadTask*, char*, size_t); void *(*createArray)(redisReadTask*, int); void *(*createInteger)(redisReadTask*, long long); @@ -85,7 +84,7 @@ typedef struct redisCallback { typedef struct redisContext { int fd; int flags; - void *error; /* Error object is set when in erronous state */ + sds error; /* Error object is set when in erronous state */ sds obuf; /* Write buffer */ redisReplyFunctions *fn; void *reader; @@ -97,15 +96,16 @@ typedef struct redisContext { void freeReplyObject(void *reply); void *redisReplyReaderCreate(redisReplyFunctions *fn); void *redisReplyReaderGetObject(void *reader); +char *redisReplyReaderGetError(void *reader); void redisReplyReaderFree(void *ptr); void redisReplyReaderFeed(void *reader, char *buf, int len); -void *redisReplyReaderGetReply(void *reader); +int redisReplyReaderGetReply(void *reader, void **reply); redisContext *redisConnect(const char *ip, int port, redisReplyFunctions *fn); redisContext *redisConnectNonBlock(const char *ip, int port, redisReplyFunctions *fn); int redisBufferRead(redisContext *c); int redisBufferWrite(redisContext *c, int *done); -void *redisGetReply(redisContext *c); +int redisGetReply(redisContext *c, void **reply); int redisProcessCallbacks(redisContext *c); void *redisCommand(redisContext *c, const char *format, ...); diff --git a/test.c b/test.c index 13de115..fed149e 100644 --- a/test.c +++ b/test.c @@ -25,18 +25,17 @@ static void __connect(redisContext **c) { } int main(void) { - int i, tests = 0, fails = 0; + int i, ret, tests = 0, fails = 0; long long t1, t2; redisContext *c; redisReply *reply; void *reader; + char *err; __connect(&c); test("Returns I/O error when the connection is lost: "); - reply = redisCommand(c,"QUIT"); - test_cond(reply->type == REDIS_ERROR && - strcmp(reply->reply,"Server closed the connection") == 0); - freeReplyObject(reply); + test_cond(redisCommand(c,"QUIT") == NULL && + strcmp(c->error,"Server closed the connection") == 0); __connect(&c); /* reconnect */ test("Is able to deliver commands: "); @@ -126,10 +125,10 @@ int main(void) { test("Error handling in reply parser: "); reader = redisReplyReaderCreate(NULL); redisReplyReaderFeed(reader,(char*)"@foo\r\n",6); - reply = redisReplyReaderGetReply(reader); - test_cond(reply->type == REDIS_ERROR && - strcasecmp(reply->reply,"protocol error, got \"@\" as reply type byte") == 0); - freeReplyObject(reply); + ret = redisReplyReaderGetReply(reader,(void*)&reply); + err = redisReplyReaderGetError(reader); + test_cond(ret == REDIS_ERR && + strcasecmp(err,"protocol error, got \"@\" as reply type byte") == 0); redisReplyReaderFree(reader); /* when the reply already contains multiple items, they must be free'd @@ -139,10 +138,10 @@ int main(void) { redisReplyReaderFeed(reader,(char*)"*2\r\n",4); redisReplyReaderFeed(reader,(char*)"$5\r\nhello\r\n",11); redisReplyReaderFeed(reader,(char*)"@foo\r\n",6); - reply = redisReplyReaderGetReply(reader); - test_cond(reply->type == REDIS_ERROR && - strcasecmp(reply->reply,"protocol error, got \"@\" as reply type byte") == 0); - freeReplyObject(reply); + ret = redisReplyReaderGetReply(reader,(void*)&reply); + err = redisReplyReaderGetError(reader); + test_cond(ret == REDIS_ERR && + strcasecmp(err,"protocol error, got \"@\" as reply type byte") == 0); redisReplyReaderFree(reader); test("Throughput:\n");