From 073dc84399d29a2648aa20ad687524795d13609e Mon Sep 17 00:00:00 2001 From: Hyungjin Kim Date: Fri, 30 Sep 2016 00:11:27 +0900 Subject: [PATCH 1/3] Counting pending subscribe. Fix #396 --- async.c | 34 +++++++++++++++++++++++++++++----- async.h | 1 + 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/async.c b/async.c index d955203..3654580 100644 --- a/async.c +++ b/async.c @@ -364,6 +364,7 @@ void redisAsyncDisconnect(redisAsyncContext *ac) { static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) { redisContext *c = &(ac->c); dict *callbacks; + redisCallback *cb; dictEntry *de; int pvariant; char *stype; @@ -387,16 +388,26 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len); de = dictFind(callbacks,sname); if (de != NULL) { + cb = dictGetEntryVal(de); + + /* If this is an subscribe reply decrease pending counter. */ + if (strcasecmp(stype+pvariant,"subscribe") == 0) { + cb->pending_subs -= 1; + } + memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb)); /* If this is an unsubscribe message, remove it. */ if (strcasecmp(stype+pvariant,"unsubscribe") == 0) { - dictDelete(callbacks,sname); + if (cb->pending_subs == 0) + dictDelete(callbacks,sname); /* If this was the last unsubscribe message, revert to * non-subscribe mode. */ assert(reply->element[2]->type == REDIS_REPLY_INTEGER); - if (reply->element[2]->integer == 0) + + /* Unset subscribed flag only when no pipelined pending subscribe. */ + if (reply->element[2]->integer == 0 && dictSize(callbacks) == 0) c->flags &= ~REDIS_SUBSCRIBED; } } @@ -410,7 +421,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, void redisProcessCallbacks(redisAsyncContext *ac) { redisContext *c = &(ac->c); - redisCallback cb = {NULL, NULL, NULL}; + redisCallback cb = {NULL, NULL, 0, NULL}; void *reply = NULL; int status; @@ -583,6 +594,9 @@ static const char *nextArgument(const char *start, const char **str, size_t *len static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) { redisContext *c = &(ac->c); redisCallback cb; + struct dict *cbdict; + dictEntry *de; + redisCallback *existcb; int pvariant, hasnext; const char *cstr, *astr; size_t clen, alen; @@ -596,6 +610,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void /* Setup callback */ cb.fn = fn; cb.privdata = privdata; + cb.pending_subs = 1; /* Find out which command will be appended. */ p = nextArgument(cmd,&cstr,&clen); @@ -612,9 +627,18 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void while ((p = nextArgument(p,&astr,&alen)) != NULL) { sname = sdsnewlen(astr,alen); if (pvariant) - ret = dictReplace(ac->sub.patterns,sname,&cb); + cbdict = ac->sub.patterns; else - ret = dictReplace(ac->sub.channels,sname,&cb); + cbdict = ac->sub.channels; + + de = dictFind(cbdict,sname); + + if (de != NULL) { + existcb = dictGetEntryVal(de); + cb.pending_subs = existcb->pending_subs + 1; + } + + ret = dictReplace(cbdict,sname,&cb); if (ret == 0) sdsfree(sname); } diff --git a/async.h b/async.h index 59cbf46..e69d840 100644 --- a/async.h +++ b/async.h @@ -45,6 +45,7 @@ typedef void (redisCallbackFn)(struct redisAsyncContext*, void*, void*); typedef struct redisCallback { struct redisCallback *next; /* simple singly linked list */ redisCallbackFn *fn; + int pending_subs; void *privdata; } redisCallback; From be76c56b519a0390a7d7b0535685cdaa8254388e Mon Sep 17 00:00:00 2001 From: Hyungjin Kim Date: Mon, 30 Jan 2017 20:00:04 +0900 Subject: [PATCH 2/3] Consider sub by pattern when clear subscribed flag --- async.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/async.c b/async.c index 3654580..e3f5ac3 100644 --- a/async.c +++ b/async.c @@ -407,7 +407,9 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, assert(reply->element[2]->type == REDIS_REPLY_INTEGER); /* Unset subscribed flag only when no pipelined pending subscribe. */ - if (reply->element[2]->integer == 0 && dictSize(callbacks) == 0) + if (reply->element[2]->integer == 0 + && dictSize(ac->sub.channels) == 0 + && dictSize(ac->sub.patterns) == 0) c->flags &= ~REDIS_SUBSCRIBED; } } From d4699989ca9d5074d1843cfd0510fb24310583c3 Mon Sep 17 00:00:00 2001 From: Hyungjin Kim Date: Mon, 30 Jan 2017 20:01:01 +0900 Subject: [PATCH 3/3] Use cached local variable instead using accessor --- async.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async.c b/async.c index e3f5ac3..c27e012 100644 --- a/async.c +++ b/async.c @@ -395,7 +395,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, cb->pending_subs -= 1; } - memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb)); + memcpy(dstcb,cb,sizeof(*dstcb)); /* If this is an unsubscribe message, remove it. */ if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {