Counting pending subscribe. Fix #396
This commit is contained in:
parent
5f98e1d35d
commit
073dc84399
32
async.c
32
async.c
@ -364,6 +364,7 @@ void redisAsyncDisconnect(redisAsyncContext *ac) {
|
|||||||
static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
|
static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
|
||||||
redisContext *c = &(ac->c);
|
redisContext *c = &(ac->c);
|
||||||
dict *callbacks;
|
dict *callbacks;
|
||||||
|
redisCallback *cb;
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
int pvariant;
|
int pvariant;
|
||||||
char *stype;
|
char *stype;
|
||||||
@ -387,16 +388,26 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
|
|||||||
sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
|
sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
|
||||||
de = dictFind(callbacks,sname);
|
de = dictFind(callbacks,sname);
|
||||||
if (de != NULL) {
|
if (de != NULL) {
|
||||||
|
cb = dictGetEntryVal(de);
|
||||||
|
|
||||||
|
/* If this is an subscribe reply decrease pending counter. */
|
||||||
|
if (strcasecmp(stype+pvariant,"subscribe") == 0) {
|
||||||
|
cb->pending_subs -= 1;
|
||||||
|
}
|
||||||
|
|
||||||
memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb));
|
memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb));
|
||||||
|
|
||||||
/* If this is an unsubscribe message, remove it. */
|
/* If this is an unsubscribe message, remove it. */
|
||||||
if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
|
if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
|
||||||
|
if (cb->pending_subs == 0)
|
||||||
dictDelete(callbacks,sname);
|
dictDelete(callbacks,sname);
|
||||||
|
|
||||||
/* If this was the last unsubscribe message, revert to
|
/* If this was the last unsubscribe message, revert to
|
||||||
* non-subscribe mode. */
|
* non-subscribe mode. */
|
||||||
assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
|
assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
|
||||||
if (reply->element[2]->integer == 0)
|
|
||||||
|
/* Unset subscribed flag only when no pipelined pending subscribe. */
|
||||||
|
if (reply->element[2]->integer == 0 && dictSize(callbacks) == 0)
|
||||||
c->flags &= ~REDIS_SUBSCRIBED;
|
c->flags &= ~REDIS_SUBSCRIBED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -410,7 +421,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
|
|||||||
|
|
||||||
void redisProcessCallbacks(redisAsyncContext *ac) {
|
void redisProcessCallbacks(redisAsyncContext *ac) {
|
||||||
redisContext *c = &(ac->c);
|
redisContext *c = &(ac->c);
|
||||||
redisCallback cb = {NULL, NULL, NULL};
|
redisCallback cb = {NULL, NULL, 0, NULL};
|
||||||
void *reply = NULL;
|
void *reply = NULL;
|
||||||
int status;
|
int status;
|
||||||
|
|
||||||
@ -583,6 +594,9 @@ static const char *nextArgument(const char *start, const char **str, size_t *len
|
|||||||
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
|
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
|
||||||
redisContext *c = &(ac->c);
|
redisContext *c = &(ac->c);
|
||||||
redisCallback cb;
|
redisCallback cb;
|
||||||
|
struct dict *cbdict;
|
||||||
|
dictEntry *de;
|
||||||
|
redisCallback *existcb;
|
||||||
int pvariant, hasnext;
|
int pvariant, hasnext;
|
||||||
const char *cstr, *astr;
|
const char *cstr, *astr;
|
||||||
size_t clen, alen;
|
size_t clen, alen;
|
||||||
@ -596,6 +610,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
|
|||||||
/* Setup callback */
|
/* Setup callback */
|
||||||
cb.fn = fn;
|
cb.fn = fn;
|
||||||
cb.privdata = privdata;
|
cb.privdata = privdata;
|
||||||
|
cb.pending_subs = 1;
|
||||||
|
|
||||||
/* Find out which command will be appended. */
|
/* Find out which command will be appended. */
|
||||||
p = nextArgument(cmd,&cstr,&clen);
|
p = nextArgument(cmd,&cstr,&clen);
|
||||||
@ -612,9 +627,18 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
|
|||||||
while ((p = nextArgument(p,&astr,&alen)) != NULL) {
|
while ((p = nextArgument(p,&astr,&alen)) != NULL) {
|
||||||
sname = sdsnewlen(astr,alen);
|
sname = sdsnewlen(astr,alen);
|
||||||
if (pvariant)
|
if (pvariant)
|
||||||
ret = dictReplace(ac->sub.patterns,sname,&cb);
|
cbdict = ac->sub.patterns;
|
||||||
else
|
else
|
||||||
ret = dictReplace(ac->sub.channels,sname,&cb);
|
cbdict = ac->sub.channels;
|
||||||
|
|
||||||
|
de = dictFind(cbdict,sname);
|
||||||
|
|
||||||
|
if (de != NULL) {
|
||||||
|
existcb = dictGetEntryVal(de);
|
||||||
|
cb.pending_subs = existcb->pending_subs + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = dictReplace(cbdict,sname,&cb);
|
||||||
|
|
||||||
if (ret == 0) sdsfree(sname);
|
if (ret == 0) sdsfree(sname);
|
||||||
}
|
}
|
||||||
|
1
async.h
1
async.h
@ -45,6 +45,7 @@ typedef void (redisCallbackFn)(struct redisAsyncContext*, void*, void*);
|
|||||||
typedef struct redisCallback {
|
typedef struct redisCallback {
|
||||||
struct redisCallback *next; /* simple singly linked list */
|
struct redisCallback *next; /* simple singly linked list */
|
||||||
redisCallbackFn *fn;
|
redisCallbackFn *fn;
|
||||||
|
int pending_subs;
|
||||||
void *privdata;
|
void *privdata;
|
||||||
} redisCallback;
|
} redisCallback;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user