Add function to explicitly free an async context

This commit is contained in:
Pieter Noordhuis 2010-12-28 19:19:25 +01:00
parent 3d76f3fe02
commit e3776bfaa6
4 changed files with 49 additions and 15 deletions

56
async.c
View File

@ -151,6 +151,39 @@ static int __redisShiftCallback(redisCallbackList *list, redisCallback *target)
return REDIS_ERR; return REDIS_ERR;
} }
/* Helper function to free the context. */
static void __redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
/* Clear callback list */
while (__redisShiftCallback(&ac->replies,NULL) == REDIS_OK);
/* Signal event lib to clean up */
if (ac->evCleanup) ac->evCleanup(ac->_adapter_data);
/* Execute callback with proper status */
if (ac->onDisconnect && (c->flags & REDIS_CONNECTED))
ac->onDisconnect(ac, (ac->err == 0) ? REDIS_OK : REDIS_ERR);
/* Cleanup self */
redisFree(c);
}
/* Free's the async context. When REDIS_CONNECTED is set, it could only have
* been called from a command callback so we need to let control return to
* redisProcessCallbacks() before free'ing can continue.
*
* When REDIS_CONNECTED is not set, the first write event has not yet fired and
* we can free immediately. */
void redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
if (c->flags & REDIS_CONNECTED) {
c->flags |= REDIS_FREEING;
} else {
__redisAsyncFree(ac);
}
}
/* Tries to do a clean disconnect from Redis, meaning it stops new commands /* Tries to do a clean disconnect from Redis, meaning it stops new commands
* from being issued, but tries to flush the output buffer and execute * from being issued, but tries to flush the output buffer and execute
* callbacks for all remaining replies. * callbacks for all remaining replies.
@ -167,13 +200,11 @@ void redisAsyncDisconnect(redisAsyncContext *ac) {
static void __redisAsyncDisconnect(redisAsyncContext *ac) { static void __redisAsyncDisconnect(redisAsyncContext *ac) {
redisContext *c = &(ac->c); redisContext *c = &(ac->c);
redisCallback cb; redisCallback cb;
int status;
/* Make sure error is accessible if there is any */ /* Make sure error is accessible if there is any */
__redisAsyncCopyError(ac); __redisAsyncCopyError(ac);
status = (ac->err == 0) ? REDIS_OK : REDIS_ERR;
if (status == REDIS_OK) { if (ac->err == 0) {
/* When the connection is cleanly disconnected, there should not /* When the connection is cleanly disconnected, there should not
* be pending callbacks. */ * be pending callbacks. */
assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR); assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR);
@ -188,14 +219,7 @@ static void __redisAsyncDisconnect(redisAsyncContext *ac) {
} }
} }
/* Signal event lib to clean up */ __redisAsyncFree(ac);
if (ac->evCleanup) ac->evCleanup(ac->_adapter_data);
/* Execute callback with proper status */
if (ac->onDisconnect) ac->onDisconnect(ac,status);
/* Cleanup self */
redisFree(c);
} }
void redisProcessCallbacks(redisAsyncContext *ac) { void redisProcessCallbacks(redisAsyncContext *ac) {
@ -225,6 +249,12 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
} else { } else {
c->fn->freeObject(reply); c->fn->freeObject(reply);
} }
/* Proceed with free'ing when redisAsyncFree() was called. */
if (c->flags & REDIS_FREEING) {
__redisAsyncFree(ac);
return;
}
} }
/* Disconnect when there was an error reading the reply */ /* Disconnect when there was an error reading the reply */
@ -281,8 +311,8 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
redisContext *c = &(ac->c); redisContext *c = &(ac->c);
redisCallback cb; redisCallback cb;
/* Don't accept new commands when the connection is lazily closed. */ /* Don't accept new commands when the connection is about to be closed. */
if (c->flags & REDIS_DISCONNECTING) return REDIS_ERR; if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
__redisAppendCommand(c,cmd,len); __redisAppendCommand(c,cmd,len);
/* Store callback */ /* Store callback */

View File

@ -95,6 +95,7 @@ int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFun
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn); int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn);
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn); int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);
void redisAsyncDisconnect(redisAsyncContext *ac); void redisAsyncDisconnect(redisAsyncContext *ac);
void redisAsyncFree(redisAsyncContext *ac);
/* Handle read/write events */ /* Handle read/write events */
void redisAsyncHandleRead(redisAsyncContext *ac); void redisAsyncHandleRead(redisAsyncContext *ac);

View File

@ -809,8 +809,7 @@ static redisContext *redisContextInit() {
} }
void redisFree(redisContext *c) { void redisFree(redisContext *c) {
/* Disconnect before free'ing if not yet disconnected. */ if (c->fd > 0)
if (c->flags & REDIS_CONNECTED)
close(c->fd); close(c->fd);
if (c->errstr != NULL) if (c->errstr != NULL)
sdsfree(c->errstr); sdsfree(c->errstr);

View File

@ -64,6 +64,10 @@
* should be terminated once all replies have been read. */ * should be terminated once all replies have been read. */
#define REDIS_DISCONNECTING 0x4 #define REDIS_DISCONNECTING 0x4
/* Flag specific to the async API which means that the context should be clean
* up as soon as possible. */
#define REDIS_FREEING 0x8
#define REDIS_REPLY_STRING 1 #define REDIS_REPLY_STRING 1
#define REDIS_REPLY_ARRAY 2 #define REDIS_REPLY_ARRAY 2
#define REDIS_REPLY_INTEGER 3 #define REDIS_REPLY_INTEGER 3