Support callback functions in a non-blocking context

This commit is contained in:
Pieter Noordhuis 2010-09-25 12:06:00 +02:00
parent de9c172b50
commit bb668e1b94
2 changed files with 83 additions and 10 deletions

View File

@ -583,10 +583,13 @@ static int redisContextConnect(redisContext *c, const char *ip, int port) {
static redisContext *redisContextInit(redisReplyFunctions *fn) { static redisContext *redisContextInit(redisReplyFunctions *fn) {
redisContext *c = malloc(sizeof(*c)); redisContext *c = malloc(sizeof(*c));
c->fn = fn == NULL ? &defaultFunctions : fn;
c->obuf = sdsempty();
c->error = NULL; c->error = NULL;
c->reader = redisReplyReaderCreate(fn); c->obuf = sdsempty();
c->fn = fn == NULL ? &defaultFunctions : fn;
c->reader = redisReplyReaderCreate(c->fn);
c->callbacks = calloc(sizeof(redisCallback),1);
c->clen = 1;
c->cpos = 0;
return c; return c;
} }
@ -634,10 +637,31 @@ int redisBufferRead(redisContext *c) {
return HIREDIS_OK; return HIREDIS_OK;
} }
static void redisPopCallback(redisContext *c) {
if (c->cpos > 1) {
memmove(&c->callbacks[0],&c->callbacks[1],(c->cpos-1)*sizeof(redisCallback));
}
c->cpos--;
}
void *redisGetReply(redisContext *c) { void *redisGetReply(redisContext *c) {
redisPopCallback(c);
return redisReplyReaderGetReply(c->reader); return redisReplyReaderGetReply(c->reader);
} }
int redisProcessCallbacks(redisContext *c) {
void *reply;
redisCallback cb;
while ((reply = redisReplyReaderGetReply(c->reader)) != NULL) {
cb = c->callbacks[0];
if (cb.fn != NULL)
cb.fn(c,reply,cb.privdata);
redisPopCallback(c);
}
return HIREDIS_OK;
}
/* Use this function to try and write the entire output buffer to the /* Use this function to try and write the entire output buffer to the
* descriptor. Returns 1 when the entire buffer was written, 0 otherwise. */ * descriptor. Returns 1 when the entire buffer was written, 0 otherwise. */
int redisBufferWrite(redisContext *c, int *done) { int redisBufferWrite(redisContext *c, int *done) {
@ -663,12 +687,12 @@ int redisBufferWrite(redisContext *c, int *done) {
return HIREDIS_OK; return HIREDIS_OK;
} }
static void* redisCommandWrite(redisContext *c, char *str, size_t len) { static void* redisCommandWrite(redisContext *c, redisCallback *cb, char *str, size_t len) {
void *reply = NULL; void *reply = NULL;
int wdone = 0; int wdone = 0;
c->obuf = sdscatlen(c->obuf,str,len); c->obuf = sdscatlen(c->obuf,str,len);
/* Only take action when this is a blocking context. */ /* Read reply immediately when the context is blocking. */
if (c->flags & HIREDIS_BLOCK) { if (c->flags & HIREDIS_BLOCK) {
do { /* Write until done. */ do { /* Write until done. */
if (redisBufferWrite(c,&wdone) == HIREDIS_ERR) if (redisBufferWrite(c,&wdone) == HIREDIS_ERR)
@ -680,6 +704,20 @@ static void* redisCommandWrite(redisContext *c, char *str, size_t len) {
return c->error; return c->error;
reply = redisGetReply(c); reply = redisGetReply(c);
} while (reply == NULL); } while (reply == NULL);
} else {
/* Make room for the callback. */
assert(c->cpos <= c->clen);
if (c->cpos == c->clen) {
c->clen++;
c->callbacks = realloc(c->callbacks,c->clen*sizeof(redisCallback));
}
if (cb != NULL) {
c->callbacks[c->cpos] = *cb;
} else {
memset(&c->callbacks[c->cpos],0,sizeof(redisCallback));
}
c->cpos++;
} }
return reply; return reply;
} }
@ -695,7 +733,27 @@ void *redisCommand(redisContext *c, const char *format, ...) {
cmd = redisFormatCommand(format,ap); cmd = redisFormatCommand(format,ap);
va_end(ap); va_end(ap);
reply = redisCommandWrite(c,cmd,sdslen(cmd)); reply = redisCommandWrite(c,NULL,cmd,sdslen(cmd));
sdsfree(cmd); sdsfree(cmd);
return reply; return reply;
} }
/* Write a formatted command to the output buffer and register the provided
* callback function and argument. When this function is called in a
* non-blocking context, it is a no-op. Always returns NULL. */
void *redisCommandWithCallback(redisContext *c, redisCallbackFn *fn, void *privdata, const char *format, ...) {
va_list ap;
redisCallback cb = { fn, privdata };
sds cmd;
/* Skip if the context is blocking. */
if (c->flags & HIREDIS_BLOCK) return NULL;
va_start(ap,format);
cmd = redisFormatCommand(format,ap);
va_end(ap);
redisCommandWrite(c,&cb,cmd,sdslen(cmd));
sdsfree(cmd);
return NULL;
}

View File

@ -71,14 +71,27 @@ typedef struct redisReplyObjectFunctions {
void (*freeObject)(void*); void (*freeObject)(void*);
} redisReplyFunctions; } redisReplyFunctions;
/* Callback prototype */
struct redisContext; /* needs forward declaration of redisContext */
typedef void redisCallbackFn(struct redisContext*, redisReply*, void*);
/* Callback container */
typedef struct redisCallback {
redisCallbackFn *fn;
void *privdata;
} redisCallback;
/* Context for a connection to Redis */ /* Context for a connection to Redis */
typedef struct redisContext { typedef struct redisContext {
int fd; int fd;
int flags; int flags;
char *error; /* error object is set when in erronous state */ void *error; /* Error object is set when in erronous state */
void *reader; /* reply reader */ sds obuf; /* Write buffer */
sds obuf; /* output buffer */ redisReplyFunctions *fn;
redisReplyFunctions *fn; /* functions for reply buildup */ void *reader;
redisCallback *callbacks;
int cpos;
int clen;
} redisContext; } redisContext;
void freeReplyObject(void *reply); void freeReplyObject(void *reply);
@ -93,7 +106,9 @@ redisContext *redisConnectNonBlock(const char *ip, int port, redisReplyFunctions
int redisBufferRead(redisContext *c); int redisBufferRead(redisContext *c);
int redisBufferWrite(redisContext *c, int *done); int redisBufferWrite(redisContext *c, int *done);
void *redisGetReply(redisContext *c); void *redisGetReply(redisContext *c);
int redisProcessCallbacks(redisContext *c);
void *redisCommand(redisContext *c, const char *format, ...); void *redisCommand(redisContext *c, const char *format, ...);
void *redisCommandWithCallback(redisContext *c, redisCallbackFn *fn, void *privdata, const char *format, ...);
#endif #endif