diff --git a/hiredis.c b/hiredis.c index e08c596..9d5f107 100644 --- a/hiredis.c +++ b/hiredis.c @@ -583,10 +583,13 @@ static int redisContextConnect(redisContext *c, const char *ip, int port) { static redisContext *redisContextInit(redisReplyFunctions *fn) { redisContext *c = malloc(sizeof(*c)); - c->fn = fn == NULL ? &defaultFunctions : fn; - c->obuf = sdsempty(); c->error = NULL; - c->reader = redisReplyReaderCreate(fn); + c->obuf = sdsempty(); + c->fn = fn == NULL ? &defaultFunctions : fn; + c->reader = redisReplyReaderCreate(c->fn); + c->callbacks = calloc(sizeof(redisCallback),1); + c->clen = 1; + c->cpos = 0; return c; } @@ -634,10 +637,31 @@ int redisBufferRead(redisContext *c) { return HIREDIS_OK; } +static void redisPopCallback(redisContext *c) { + if (c->cpos > 1) { + memmove(&c->callbacks[0],&c->callbacks[1],(c->cpos-1)*sizeof(redisCallback)); + } + c->cpos--; +} + void *redisGetReply(redisContext *c) { + redisPopCallback(c); return redisReplyReaderGetReply(c->reader); } +int redisProcessCallbacks(redisContext *c) { + void *reply; + redisCallback cb; + + while ((reply = redisReplyReaderGetReply(c->reader)) != NULL) { + cb = c->callbacks[0]; + if (cb.fn != NULL) + cb.fn(c,reply,cb.privdata); + redisPopCallback(c); + } + return HIREDIS_OK; +} + /* Use this function to try and write the entire output buffer to the * descriptor. Returns 1 when the entire buffer was written, 0 otherwise. */ int redisBufferWrite(redisContext *c, int *done) { @@ -663,12 +687,12 @@ int redisBufferWrite(redisContext *c, int *done) { return HIREDIS_OK; } -static void* redisCommandWrite(redisContext *c, char *str, size_t len) { +static void* redisCommandWrite(redisContext *c, redisCallback *cb, char *str, size_t len) { void *reply = NULL; int wdone = 0; c->obuf = sdscatlen(c->obuf,str,len); - /* Only take action when this is a blocking context. */ + /* Read reply immediately when the context is blocking. */ if (c->flags & HIREDIS_BLOCK) { do { /* Write until done. */ if (redisBufferWrite(c,&wdone) == HIREDIS_ERR) @@ -680,6 +704,20 @@ static void* redisCommandWrite(redisContext *c, char *str, size_t len) { return c->error; reply = redisGetReply(c); } while (reply == NULL); + } else { + /* Make room for the callback. */ + assert(c->cpos <= c->clen); + if (c->cpos == c->clen) { + c->clen++; + c->callbacks = realloc(c->callbacks,c->clen*sizeof(redisCallback)); + } + + if (cb != NULL) { + c->callbacks[c->cpos] = *cb; + } else { + memset(&c->callbacks[c->cpos],0,sizeof(redisCallback)); + } + c->cpos++; } return reply; } @@ -695,7 +733,27 @@ void *redisCommand(redisContext *c, const char *format, ...) { cmd = redisFormatCommand(format,ap); va_end(ap); - reply = redisCommandWrite(c,cmd,sdslen(cmd)); + reply = redisCommandWrite(c,NULL,cmd,sdslen(cmd)); sdsfree(cmd); return reply; } + +/* Write a formatted command to the output buffer and register the provided + * callback function and argument. When this function is called in a + * non-blocking context, it is a no-op. Always returns NULL. */ +void *redisCommandWithCallback(redisContext *c, redisCallbackFn *fn, void *privdata, const char *format, ...) { + va_list ap; + redisCallback cb = { fn, privdata }; + sds cmd; + + /* Skip if the context is blocking. */ + if (c->flags & HIREDIS_BLOCK) return NULL; + + va_start(ap,format); + cmd = redisFormatCommand(format,ap); + va_end(ap); + + redisCommandWrite(c,&cb,cmd,sdslen(cmd)); + sdsfree(cmd); + return NULL; +} diff --git a/hiredis.h b/hiredis.h index 60615b1..85ef87e 100644 --- a/hiredis.h +++ b/hiredis.h @@ -71,14 +71,27 @@ typedef struct redisReplyObjectFunctions { void (*freeObject)(void*); } redisReplyFunctions; +/* Callback prototype */ +struct redisContext; /* needs forward declaration of redisContext */ +typedef void redisCallbackFn(struct redisContext*, redisReply*, void*); + +/* Callback container */ +typedef struct redisCallback { + redisCallbackFn *fn; + void *privdata; +} redisCallback; + /* Context for a connection to Redis */ typedef struct redisContext { int fd; int flags; - char *error; /* error object is set when in erronous state */ - void *reader; /* reply reader */ - sds obuf; /* output buffer */ - redisReplyFunctions *fn; /* functions for reply buildup */ + void *error; /* Error object is set when in erronous state */ + sds obuf; /* Write buffer */ + redisReplyFunctions *fn; + void *reader; + redisCallback *callbacks; + int cpos; + int clen; } redisContext; void freeReplyObject(void *reply); @@ -93,7 +106,9 @@ redisContext *redisConnectNonBlock(const char *ip, int port, redisReplyFunctions int redisBufferRead(redisContext *c); int redisBufferWrite(redisContext *c, int *done); void *redisGetReply(redisContext *c); +int redisProcessCallbacks(redisContext *c); void *redisCommand(redisContext *c, const char *format, ...); +void *redisCommandWithCallback(redisContext *c, redisCallbackFn *fn, void *privdata, const char *format, ...); #endif