Delegate pub/sub replies to the right callback

This commit is contained in:
Pieter Noordhuis 2010-12-31 14:18:11 +01:00
parent 3ac8ef927d
commit a0ebc5417f
1 changed files with 48 additions and 2 deletions

50
async.c
View File

@ -273,6 +273,53 @@ void redisAsyncDisconnect(redisAsyncContext *ac) {
__redisAsyncDisconnect(ac);
}
static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
redisContext *c = &(ac->c);
dict *callbacks;
dictEntry *de;
int pvariant;
char *stype;
sds sname;
/* Custom reply functions are not supported for pub/sub. This will fail
* very hard when they are used... */
if (reply->type == REDIS_REPLY_ARRAY) {
assert(reply->elements >= 2);
assert(reply->element[0]->type == REDIS_REPLY_STRING);
stype = reply->element[0]->str;
pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
if (pvariant)
callbacks = ac->sub.patterns;
else
callbacks = ac->sub.channels;
/* Locate the right callback */
assert(reply->element[1]->type == REDIS_REPLY_STRING);
sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
de = dictFind(callbacks,sname);
if (de != NULL) {
memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb));
/* If this is an unsubscribe message, remove it. */
if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
dictDelete(callbacks,sname);
/* If this was the last unsubscribe message, revert to
* non-subscribe mode. */
assert(reply->element[2+pvariant]->type == REDIS_REPLY_INTEGER);
if (reply->element[2+pvariant]->integer == 0)
c->flags &= ~REDIS_SUBSCRIBED;
}
}
sdsfree(sname);
} else {
/* Shift callback for invalid commands. */
__redisShiftCallback(&ac->sub.invalid,dstcb);
}
return REDIS_OK;
}
void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb;
@ -298,8 +345,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
/* No more regular callbacks, the context *must* be subscribed. */
assert(c->flags & REDIS_SUBSCRIBED);
/* TODO: find the right callback for pub/sub message. */
__redisGetSubscribeCallback(ac,reply,&cb);
}
if (cb.fn != NULL) {