Merge remote-tracking branch 'hyjin/master'

This commit is contained in:
michael-grunder 2018-05-09 16:37:00 -07:00
commit fbc4971d7d
2 changed files with 33 additions and 6 deletions

36
async.c
View File

@ -365,6 +365,7 @@ void redisAsyncDisconnect(redisAsyncContext *ac) {
static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) { static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
redisContext *c = &(ac->c); redisContext *c = &(ac->c);
dict *callbacks; dict *callbacks;
redisCallback *cb;
dictEntry *de; dictEntry *de;
int pvariant; int pvariant;
char *stype; char *stype;
@ -388,16 +389,28 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len); sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
de = dictFind(callbacks,sname); de = dictFind(callbacks,sname);
if (de != NULL) { if (de != NULL) {
memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb)); cb = dictGetEntryVal(de);
/* If this is an subscribe reply decrease pending counter. */
if (strcasecmp(stype+pvariant,"subscribe") == 0) {
cb->pending_subs -= 1;
}
memcpy(dstcb,cb,sizeof(*dstcb));
/* If this is an unsubscribe message, remove it. */ /* If this is an unsubscribe message, remove it. */
if (strcasecmp(stype+pvariant,"unsubscribe") == 0) { if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
if (cb->pending_subs == 0)
dictDelete(callbacks,sname); dictDelete(callbacks,sname);
/* If this was the last unsubscribe message, revert to /* If this was the last unsubscribe message, revert to
* non-subscribe mode. */ * non-subscribe mode. */
assert(reply->element[2]->type == REDIS_REPLY_INTEGER); assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
if (reply->element[2]->integer == 0)
/* Unset subscribed flag only when no pipelined pending subscribe. */
if (reply->element[2]->integer == 0
&& dictSize(ac->sub.channels) == 0
&& dictSize(ac->sub.patterns) == 0)
c->flags &= ~REDIS_SUBSCRIBED; c->flags &= ~REDIS_SUBSCRIBED;
} }
} }
@ -411,7 +424,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
void redisProcessCallbacks(redisAsyncContext *ac) { void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c); redisContext *c = &(ac->c);
redisCallback cb = {NULL, NULL, NULL}; redisCallback cb = {NULL, NULL, 0, NULL};
void *reply = NULL; void *reply = NULL;
int status; int status;
@ -584,6 +597,9 @@ static const char *nextArgument(const char *start, const char **str, size_t *len
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) { static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
redisContext *c = &(ac->c); redisContext *c = &(ac->c);
redisCallback cb; redisCallback cb;
struct dict *cbdict;
dictEntry *de;
redisCallback *existcb;
int pvariant, hasnext; int pvariant, hasnext;
const char *cstr, *astr; const char *cstr, *astr;
size_t clen, alen; size_t clen, alen;
@ -597,6 +613,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
/* Setup callback */ /* Setup callback */
cb.fn = fn; cb.fn = fn;
cb.privdata = privdata; cb.privdata = privdata;
cb.pending_subs = 1;
/* Find out which command will be appended. */ /* Find out which command will be appended. */
p = nextArgument(cmd,&cstr,&clen); p = nextArgument(cmd,&cstr,&clen);
@ -613,9 +630,18 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
while ((p = nextArgument(p,&astr,&alen)) != NULL) { while ((p = nextArgument(p,&astr,&alen)) != NULL) {
sname = sdsnewlen(astr,alen); sname = sdsnewlen(astr,alen);
if (pvariant) if (pvariant)
ret = dictReplace(ac->sub.patterns,sname,&cb); cbdict = ac->sub.patterns;
else else
ret = dictReplace(ac->sub.channels,sname,&cb); cbdict = ac->sub.channels;
de = dictFind(cbdict,sname);
if (de != NULL) {
existcb = dictGetEntryVal(de);
cb.pending_subs = existcb->pending_subs + 1;
}
ret = dictReplace(cbdict,sname,&cb);
if (ret == 0) sdsfree(sname); if (ret == 0) sdsfree(sname);
} }

View File

@ -45,6 +45,7 @@ typedef void (redisCallbackFn)(struct redisAsyncContext*, void*, void*);
typedef struct redisCallback { typedef struct redisCallback {
struct redisCallback *next; /* simple singly linked list */ struct redisCallback *next; /* simple singly linked list */
redisCallbackFn *fn; redisCallbackFn *fn;
int pending_subs;
void *privdata; void *privdata;
} redisCallback; } redisCallback;