diff --git a/async.c b/async.c index 781b56f..d89a2a0 100644 --- a/async.c +++ b/async.c @@ -197,19 +197,42 @@ static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) return REDIS_ERR; } +static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) { + redisContext *c = &(ac->c); + if (cb->fn != NULL) { + c->flags |= REDIS_IN_CALLBACK; + cb->fn(ac,reply,cb->privdata); + c->flags &= ~REDIS_IN_CALLBACK; + } +} + /* Helper function to free the context. */ static void __redisAsyncFree(redisAsyncContext *ac) { redisContext *c = &(ac->c); redisCallback cb; + dictIterator *it; + dictEntry *de; /* Execute pending callbacks with NULL reply. */ - while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) { - if (cb.fn != NULL) { - c->flags |= REDIS_IN_CALLBACK; - cb.fn(ac,NULL,cb.privdata); - c->flags &= ~REDIS_IN_CALLBACK; - } - } + while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) + __redisRunCallback(ac,&cb,NULL); + + /* Execute callbacks for invalid commands */ + while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK) + __redisRunCallback(ac,&cb,NULL); + + /* Run subscription callbacks callbacks with NULL reply */ + it = dictGetIterator(ac->sub.channels); + while ((de = dictNext(it)) != NULL) + __redisRunCallback(ac,dictGetEntryVal(de),NULL); + dictReleaseIterator(it); + dictRelease(ac->sub.channels); + + it = dictGetIterator(ac->sub.patterns); + while ((de = dictNext(it)) != NULL) + __redisRunCallback(ac,dictGetEntryVal(de),NULL); + dictReleaseIterator(it); + dictRelease(ac->sub.patterns); /* Signal event lib to clean up */ if (ac->ev.cleanup) ac->ev.cleanup(ac->ev.data); @@ -349,9 +372,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) { } if (cb.fn != NULL) { - c->flags |= REDIS_IN_CALLBACK; - cb.fn(ac,reply,cb.privdata); - c->flags &= ~REDIS_IN_CALLBACK; + __redisRunCallback(ac,&cb,reply); c->fn->freeObject(reply); /* Proceed with free'ing when redisAsyncFree() was called. */