From a0ebc5417f8cabaef74c8b7bebb4f308562b4c27 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 31 Dec 2010 14:18:11 +0100 Subject: [PATCH] Delegate pub/sub replies to the right callback --- async.c | 50 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/async.c b/async.c index 3a5fe2d..781b56f 100644 --- a/async.c +++ b/async.c @@ -273,6 +273,53 @@ void redisAsyncDisconnect(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); } +static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) { + redisContext *c = &(ac->c); + dict *callbacks; + dictEntry *de; + int pvariant; + char *stype; + sds sname; + + /* Custom reply functions are not supported for pub/sub. This will fail + * very hard when they are used... */ + if (reply->type == REDIS_REPLY_ARRAY) { + assert(reply->elements >= 2); + assert(reply->element[0]->type == REDIS_REPLY_STRING); + stype = reply->element[0]->str; + pvariant = (tolower(stype[0]) == 'p') ? 1 : 0; + + if (pvariant) + callbacks = ac->sub.patterns; + else + callbacks = ac->sub.channels; + + /* Locate the right callback */ + assert(reply->element[1]->type == REDIS_REPLY_STRING); + sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len); + de = dictFind(callbacks,sname); + if (de != NULL) { + memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb)); + + /* If this is an unsubscribe message, remove it. */ + if (strcasecmp(stype+pvariant,"unsubscribe") == 0) { + dictDelete(callbacks,sname); + + /* If this was the last unsubscribe message, revert to + * non-subscribe mode. */ + assert(reply->element[2+pvariant]->type == REDIS_REPLY_INTEGER); + if (reply->element[2+pvariant]->integer == 0) + c->flags &= ~REDIS_SUBSCRIBED; + } + } + sdsfree(sname); + } else { + /* Shift callback for invalid commands. */ + __redisShiftCallback(&ac->sub.invalid,dstcb); + } + return REDIS_OK; +} + void redisProcessCallbacks(redisAsyncContext *ac) { redisContext *c = &(ac->c); redisCallback cb; @@ -298,8 +345,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) { if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) { /* No more regular callbacks, the context *must* be subscribed. */ assert(c->flags & REDIS_SUBSCRIBED); - - /* TODO: find the right callback for pub/sub message. */ + __redisGetSubscribeCallback(ac,reply,&cb); } if (cb.fn != NULL) {