Add callbacks to channel/pattern dictionaries
This commit is contained in:
parent
e6621d05b4
commit
3ac8ef927d
70
async.c
70
async.c
@ -31,6 +31,7 @@
|
|||||||
|
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
#include <ctype.h>
|
||||||
#include "async.h"
|
#include "async.h"
|
||||||
#include "sds.h"
|
#include "sds.h"
|
||||||
#include "util.h"
|
#include "util.h"
|
||||||
@ -366,23 +367,78 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Helper function for the redisAsyncCommand* family of functions.
|
/* Sets a pointer to the first argument and its length starting at p. Returns
|
||||||
*
|
* the number of bytes to skip to get to the following argument. */
|
||||||
* Write a formatted command to the output buffer and register the provided
|
static char *nextArgument(char *start, char **str, size_t *len) {
|
||||||
* callback function with the context.
|
char *p = start;
|
||||||
*/
|
if (p[0] != '$') {
|
||||||
|
p = strchr(p,'$');
|
||||||
|
if (p == NULL) return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
*len = (int)strtol(p+1,NULL,10);
|
||||||
|
p = strchr(p,'\r');
|
||||||
|
assert(p);
|
||||||
|
*str = p+2;
|
||||||
|
return p+2+(*len)+2;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Helper function for the redisAsyncCommand* family of functions. Writes a
|
||||||
|
* formatted command to the output buffer and registers the provided callback
|
||||||
|
* function with the context. */
|
||||||
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) {
|
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) {
|
||||||
redisContext *c = &(ac->c);
|
redisContext *c = &(ac->c);
|
||||||
redisCallback cb;
|
redisCallback cb;
|
||||||
|
int pvariant, hasnext;
|
||||||
|
char *cstr, *astr;
|
||||||
|
size_t clen, alen;
|
||||||
|
char *p;
|
||||||
|
sds sname;
|
||||||
|
|
||||||
/* Don't accept new commands when the connection is about to be closed. */
|
/* Don't accept new commands when the connection is about to be closed. */
|
||||||
if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
|
if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
|
||||||
__redisAppendCommand(c,cmd,len);
|
|
||||||
|
|
||||||
/* Store callback */
|
/* Setup callback */
|
||||||
cb.fn = fn;
|
cb.fn = fn;
|
||||||
cb.privdata = privdata;
|
cb.privdata = privdata;
|
||||||
|
|
||||||
|
/* Find out which command will be appended. */
|
||||||
|
p = nextArgument(cmd,&cstr,&clen);
|
||||||
|
assert(p != NULL);
|
||||||
|
hasnext = (p[0] == '$');
|
||||||
|
pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
|
||||||
|
cstr += pvariant;
|
||||||
|
clen -= pvariant;
|
||||||
|
|
||||||
|
if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
|
||||||
|
c->flags |= REDIS_SUBSCRIBED;
|
||||||
|
|
||||||
|
/* Add every channel/pattern to the list of subscription callbacks. */
|
||||||
|
while ((p = nextArgument(p,&astr,&alen)) != NULL) {
|
||||||
|
sname = sdsnewlen(astr,alen);
|
||||||
|
if (pvariant)
|
||||||
|
dictReplace(ac->sub.patterns,sname,&cb);
|
||||||
|
else
|
||||||
|
dictReplace(ac->sub.channels,sname,&cb);
|
||||||
|
}
|
||||||
|
} else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
|
||||||
|
/* It is only useful to call (P)UNSUBSCRIBE when the context is
|
||||||
|
* subscribed to one or more channels or patterns. */
|
||||||
|
if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
|
||||||
|
|
||||||
|
/* (P)UNSUBSCRIBE does not have its own response: every channel or
|
||||||
|
* pattern that is unsubscribed will receive a message. This means we
|
||||||
|
* should not append a callback function for this command. */
|
||||||
|
} else {
|
||||||
|
if (c->flags & REDIS_SUBSCRIBED)
|
||||||
|
/* This will likely result in an error reply, but it needs to be
|
||||||
|
* received and passed to the callback. */
|
||||||
|
__redisPushCallback(&ac->sub.invalid,&cb);
|
||||||
|
else
|
||||||
__redisPushCallback(&ac->replies,&cb);
|
__redisPushCallback(&ac->replies,&cb);
|
||||||
|
}
|
||||||
|
|
||||||
|
__redisAppendCommand(c,cmd,len);
|
||||||
|
|
||||||
/* Always schedule a write when the write buffer is non-empty */
|
/* Always schedule a write when the write buffer is non-empty */
|
||||||
if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data);
|
if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data);
|
||||||
|
Loading…
Reference in New Issue
Block a user