Add support to lazily disconnect an asynchronous connection
This commit is contained in:
parent
ae5a13f557
commit
4e3bd7893d
105
async.c
105
async.c
|
@ -41,8 +41,9 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
|
|||
}
|
||||
|
||||
redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
|
||||
redisContext *_c = redisConnectNonBlock(ip,port);
|
||||
return redisAsyncInitialize(_c);
|
||||
redisContext *c = redisConnectNonBlock(ip,port);
|
||||
redisAsyncContext *ac = redisAsyncInitialize(c);
|
||||
return ac;
|
||||
}
|
||||
|
||||
int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn) {
|
||||
|
@ -50,6 +51,40 @@ int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFun
|
|||
return redisSetReplyObjectFunctions(c,fn);
|
||||
}
|
||||
|
||||
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
|
||||
if (ac->onDisconnect == NULL) {
|
||||
ac->onDisconnect = fn;
|
||||
return REDIS_OK;
|
||||
}
|
||||
return REDIS_ERR;
|
||||
}
|
||||
|
||||
/* Tries to do a clean disconnect from Redis, meaning it stops new commands
|
||||
* from being issued, but tries to flush the output buffer and execute
|
||||
* callbacks for all remaining replies. */
|
||||
void redisAsyncDisconnect(redisAsyncContext *ac) {
|
||||
redisContext *c = &(ac->c);
|
||||
c->flags |= REDIS_DISCONNECTING;
|
||||
}
|
||||
|
||||
/* Helper function to make the disconnect happen and clean up. */
|
||||
static void __redisAsyncDisconnect(redisAsyncContext *ac) {
|
||||
redisContext *c = &(ac->c);
|
||||
int status;
|
||||
|
||||
/* Signal event lib to stop reading/writing */
|
||||
if (ac->evDelRead) ac->evDelRead(ac->data);
|
||||
if (ac->evDelWrite) ac->evDelWrite(ac->data);
|
||||
if (ac->evCleanup) ac->evCleanup(ac->data);
|
||||
|
||||
/* Execute callback with proper status */
|
||||
status = (c->error == NULL) ? REDIS_OK : REDIS_ERR;
|
||||
if (ac->onDisconnect) ac->onDisconnect(ac,status);
|
||||
|
||||
/* Cleanup self */
|
||||
redisFree(c);
|
||||
}
|
||||
|
||||
/* Helper functions to push/shift callbacks */
|
||||
static void __redisPushCallback(redisCallbackList *list, redisCallback *cb) {
|
||||
if (list->head == NULL)
|
||||
|
@ -69,38 +104,52 @@ static redisCallback *__redisShiftCallback(redisCallbackList *list) {
|
|||
return cb;
|
||||
}
|
||||
|
||||
/* This function should be called when the socket is readable.
|
||||
* It processes all replies that can be read and executes their callbacks.
|
||||
*/
|
||||
void redisAsyncHandleRead(redisAsyncContext *ac) {
|
||||
void redisProcessCallbacks(redisAsyncContext *ac) {
|
||||
redisContext *c = &(ac->c);
|
||||
redisCallback *cb;
|
||||
void *reply = NULL;
|
||||
int status;
|
||||
|
||||
while((status = redisGetReply(c,&reply)) == REDIS_OK) {
|
||||
if (reply == NULL) {
|
||||
/* When the connection is being disconnected and there are
|
||||
* no more replies, this is the cue to really disconnect. */
|
||||
if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0) {
|
||||
__redisAsyncDisconnect(ac);
|
||||
return;
|
||||
}
|
||||
|
||||
/* When the connection is not being disconnected, simply stop
|
||||
* trying to get replies and wait for the next loop tick. */
|
||||
break;
|
||||
}
|
||||
|
||||
/* Shift callback and execute it */
|
||||
cb = __redisShiftCallback(&ac->replies);
|
||||
assert(cb != NULL);
|
||||
if (cb->fn != NULL) {
|
||||
cb->fn(ac,reply,cb->privdata);
|
||||
} else {
|
||||
c->fn->freeObject(reply);
|
||||
}
|
||||
}
|
||||
|
||||
/* Disconnect when there was an error reading the reply */
|
||||
if (status != REDIS_OK)
|
||||
__redisAsyncDisconnect(ac);
|
||||
}
|
||||
|
||||
/* This function should be called when the socket is readable.
|
||||
* It processes all replies that can be read and executes their callbacks.
|
||||
*/
|
||||
void redisAsyncHandleRead(redisAsyncContext *ac) {
|
||||
redisContext *c = &(ac->c);
|
||||
|
||||
if (redisBufferRead(c) == REDIS_ERR) {
|
||||
// needs error handling
|
||||
assert(NULL);
|
||||
__redisAsyncDisconnect(ac);
|
||||
} else {
|
||||
/* Always re-schedule reads */
|
||||
if (ac->evAddRead) ac->evAddRead(ac->data);
|
||||
|
||||
while((status = redisGetReply(c,&reply)) == REDIS_OK) {
|
||||
/* Abort when there are no more replies */
|
||||
if (reply == NULL) break;
|
||||
|
||||
/* Shift callback and execute it */
|
||||
cb = __redisShiftCallback(&ac->replies);
|
||||
assert(cb != NULL);
|
||||
if (cb->fn != NULL) {
|
||||
cb->fn(ac,reply,cb->privdata);
|
||||
} else {
|
||||
c->fn->freeObject(reply);
|
||||
}
|
||||
}
|
||||
|
||||
// needs error handling
|
||||
assert(status == REDIS_OK);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -109,8 +158,7 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) {
|
|||
int done = 0;
|
||||
|
||||
if (redisBufferWrite(c,&done) == REDIS_ERR) {
|
||||
// needs error handling
|
||||
assert(NULL);
|
||||
__redisAsyncDisconnect(ac);
|
||||
} else {
|
||||
/* Continue writing when not done, stop writing otherwise */
|
||||
if (!done) {
|
||||
|
@ -132,6 +180,9 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) {
|
|||
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) {
|
||||
redisContext *c = &(ac->c);
|
||||
redisCallback *cb;
|
||||
|
||||
/* Don't accept new commands when the connection is lazily closed. */
|
||||
if (c->flags & REDIS_DISCONNECTING) return REDIS_ERR;
|
||||
c->obuf = sdscatlen(c->obuf,cmd,len);
|
||||
|
||||
/* Store callback */
|
||||
|
|
18
async.h
18
async.h
|
@ -46,6 +46,9 @@ typedef struct redisCallbackList {
|
|||
redisCallback *head, *tail;
|
||||
} redisCallbackList;
|
||||
|
||||
/* Disconnect callback prototype */
|
||||
typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status);
|
||||
|
||||
/* Context for an async connection to Redis */
|
||||
typedef struct redisAsyncContext {
|
||||
/* Hold the regular context, so it can be realloc'ed. */
|
||||
|
@ -57,15 +60,22 @@ typedef struct redisAsyncContext {
|
|||
void (*evDelRead)(void *privdata);
|
||||
void (*evAddWrite)(void *privdata);
|
||||
void (*evDelWrite)(void *privdata);
|
||||
void (*evCleanup)(void *privdata);
|
||||
void *data;
|
||||
|
||||
/* Called when either the connection is terminated due to an error or per
|
||||
* user request. The status is set accordingly (REDIS_OK, REDIS_ERR). */
|
||||
redisDisconnectCallback *onDisconnect;
|
||||
|
||||
/* Reply callbacks */
|
||||
redisCallbackList replies;
|
||||
} redisAsyncContext;
|
||||
|
||||
/* Functions that proxy to hiredis */
|
||||
redisAsyncContext *redisAsyncConnect(const char *ip, int port);
|
||||
int redisAsyncSetReplyObjectFunctions(redisAsyncContext *c, redisReplyObjectFunctions *fn);
|
||||
int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn);
|
||||
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);
|
||||
void redisAsyncDisconnect(redisAsyncContext *ac);
|
||||
|
||||
/* Handle read/write events */
|
||||
void redisAsyncHandleRead(redisAsyncContext *ac);
|
||||
|
@ -73,8 +83,8 @@ void redisAsyncHandleWrite(redisAsyncContext *ac);
|
|||
|
||||
/* Command functions for an async context. Write the command to the
|
||||
* output buffer and register the provided callback. */
|
||||
int redisvAsyncCommand(redisAsyncContext *c, redisCallbackFn *fn, void *privdata, const char *format, va_list ap);
|
||||
int redisAsyncCommand(redisAsyncContext *c, redisCallbackFn *fn, void *privdata, const char *format, ...);
|
||||
int redisAsyncCommandArgv(redisAsyncContext *c, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen);
|
||||
int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap);
|
||||
int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...);
|
||||
int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -43,6 +43,12 @@
|
|||
* in the flags field is set when the context is connected. */
|
||||
#define REDIS_CONNECTED 0x2
|
||||
|
||||
/* The async API might try to disconnect cleanly and flush the output
|
||||
* buffer and read all subsequent replies before disconnecting.
|
||||
* This flag means no new commands can come in and the connection
|
||||
* should be terminated once all replies have been read. */
|
||||
#define REDIS_DISCONNECTING 0x4
|
||||
|
||||
#define REDIS_ERROR -1
|
||||
#define REDIS_REPLY_ERROR 0
|
||||
#define REDIS_REPLY_STRING 1
|
||||
|
|
Loading…
Reference in New Issue