diff --git a/async.c b/async.c index 9bf74c8..3ea957b 100644 --- a/async.c +++ b/async.c @@ -41,8 +41,9 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) { } redisAsyncContext *redisAsyncConnect(const char *ip, int port) { - redisContext *_c = redisConnectNonBlock(ip,port); - return redisAsyncInitialize(_c); + redisContext *c = redisConnectNonBlock(ip,port); + redisAsyncContext *ac = redisAsyncInitialize(c); + return ac; } int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn) { @@ -50,6 +51,40 @@ int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFun return redisSetReplyObjectFunctions(c,fn); } +int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) { + if (ac->onDisconnect == NULL) { + ac->onDisconnect = fn; + return REDIS_OK; + } + return REDIS_ERR; +} + +/* Tries to do a clean disconnect from Redis, meaning it stops new commands + * from being issued, but tries to flush the output buffer and execute + * callbacks for all remaining replies. */ +void redisAsyncDisconnect(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + c->flags |= REDIS_DISCONNECTING; +} + +/* Helper function to make the disconnect happen and clean up. */ +static void __redisAsyncDisconnect(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + int status; + + /* Signal event lib to stop reading/writing */ + if (ac->evDelRead) ac->evDelRead(ac->data); + if (ac->evDelWrite) ac->evDelWrite(ac->data); + if (ac->evCleanup) ac->evCleanup(ac->data); + + /* Execute callback with proper status */ + status = (c->error == NULL) ? REDIS_OK : REDIS_ERR; + if (ac->onDisconnect) ac->onDisconnect(ac,status); + + /* Cleanup self */ + redisFree(c); +} + /* Helper functions to push/shift callbacks */ static void __redisPushCallback(redisCallbackList *list, redisCallback *cb) { if (list->head == NULL) @@ -69,38 +104,52 @@ static redisCallback *__redisShiftCallback(redisCallbackList *list) { return cb; } -/* This function should be called when the socket is readable. - * It processes all replies that can be read and executes their callbacks. - */ -void redisAsyncHandleRead(redisAsyncContext *ac) { +void redisProcessCallbacks(redisAsyncContext *ac) { redisContext *c = &(ac->c); redisCallback *cb; void *reply = NULL; int status; + while((status = redisGetReply(c,&reply)) == REDIS_OK) { + if (reply == NULL) { + /* When the connection is being disconnected and there are + * no more replies, this is the cue to really disconnect. */ + if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0) { + __redisAsyncDisconnect(ac); + return; + } + + /* When the connection is not being disconnected, simply stop + * trying to get replies and wait for the next loop tick. */ + break; + } + + /* Shift callback and execute it */ + cb = __redisShiftCallback(&ac->replies); + assert(cb != NULL); + if (cb->fn != NULL) { + cb->fn(ac,reply,cb->privdata); + } else { + c->fn->freeObject(reply); + } + } + + /* Disconnect when there was an error reading the reply */ + if (status != REDIS_OK) + __redisAsyncDisconnect(ac); +} + +/* This function should be called when the socket is readable. + * It processes all replies that can be read and executes their callbacks. + */ +void redisAsyncHandleRead(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + if (redisBufferRead(c) == REDIS_ERR) { - // needs error handling - assert(NULL); + __redisAsyncDisconnect(ac); } else { /* Always re-schedule reads */ if (ac->evAddRead) ac->evAddRead(ac->data); - - while((status = redisGetReply(c,&reply)) == REDIS_OK) { - /* Abort when there are no more replies */ - if (reply == NULL) break; - - /* Shift callback and execute it */ - cb = __redisShiftCallback(&ac->replies); - assert(cb != NULL); - if (cb->fn != NULL) { - cb->fn(ac,reply,cb->privdata); - } else { - c->fn->freeObject(reply); - } - } - - // needs error handling - assert(status == REDIS_OK); } } @@ -109,8 +158,7 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { int done = 0; if (redisBufferWrite(c,&done) == REDIS_ERR) { - // needs error handling - assert(NULL); + __redisAsyncDisconnect(ac); } else { /* Continue writing when not done, stop writing otherwise */ if (!done) { @@ -132,6 +180,9 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) { redisContext *c = &(ac->c); redisCallback *cb; + + /* Don't accept new commands when the connection is lazily closed. */ + if (c->flags & REDIS_DISCONNECTING) return REDIS_ERR; c->obuf = sdscatlen(c->obuf,cmd,len); /* Store callback */ diff --git a/async.h b/async.h index e0ac8bf..1678240 100644 --- a/async.h +++ b/async.h @@ -46,6 +46,9 @@ typedef struct redisCallbackList { redisCallback *head, *tail; } redisCallbackList; +/* Disconnect callback prototype */ +typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status); + /* Context for an async connection to Redis */ typedef struct redisAsyncContext { /* Hold the regular context, so it can be realloc'ed. */ @@ -57,15 +60,22 @@ typedef struct redisAsyncContext { void (*evDelRead)(void *privdata); void (*evAddWrite)(void *privdata); void (*evDelWrite)(void *privdata); + void (*evCleanup)(void *privdata); void *data; + /* Called when either the connection is terminated due to an error or per + * user request. The status is set accordingly (REDIS_OK, REDIS_ERR). */ + redisDisconnectCallback *onDisconnect; + /* Reply callbacks */ redisCallbackList replies; } redisAsyncContext; /* Functions that proxy to hiredis */ redisAsyncContext *redisAsyncConnect(const char *ip, int port); -int redisAsyncSetReplyObjectFunctions(redisAsyncContext *c, redisReplyObjectFunctions *fn); +int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn); +int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn); +void redisAsyncDisconnect(redisAsyncContext *ac); /* Handle read/write events */ void redisAsyncHandleRead(redisAsyncContext *ac); @@ -73,8 +83,8 @@ void redisAsyncHandleWrite(redisAsyncContext *ac); /* Command functions for an async context. Write the command to the * output buffer and register the provided callback. */ -int redisvAsyncCommand(redisAsyncContext *c, redisCallbackFn *fn, void *privdata, const char *format, va_list ap); -int redisAsyncCommand(redisAsyncContext *c, redisCallbackFn *fn, void *privdata, const char *format, ...); -int redisAsyncCommandArgv(redisAsyncContext *c, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen); +int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap); +int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...); +int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen); #endif diff --git a/hiredis.h b/hiredis.h index 4e174d7..76b07e0 100644 --- a/hiredis.h +++ b/hiredis.h @@ -43,6 +43,12 @@ * in the flags field is set when the context is connected. */ #define REDIS_CONNECTED 0x2 +/* The async API might try to disconnect cleanly and flush the output + * buffer and read all subsequent replies before disconnecting. + * This flag means no new commands can come in and the connection + * should be terminated once all replies have been read. */ +#define REDIS_DISCONNECTING 0x4 + #define REDIS_ERROR -1 #define REDIS_REPLY_ERROR 0 #define REDIS_REPLY_STRING 1