Add logic to handle RESP3 push messages (#819)

Fixes #815
This commit is contained in:
Michael Grunder 2020-05-21 11:12:18 -07:00 committed by GitHub
parent c8999c6602
commit 83bba659b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 27 additions and 4 deletions

View File

@ -374,7 +374,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
/* Custom reply functions are not supported for pub/sub. This will fail /* Custom reply functions are not supported for pub/sub. This will fail
* very hard when they are used... */ * very hard when they are used... */
if (reply->type == REDIS_REPLY_ARRAY) { if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) {
assert(reply->elements >= 2); assert(reply->elements >= 2);
assert(reply->element[0]->type == REDIS_REPLY_STRING); assert(reply->element[0]->type == REDIS_REPLY_STRING);
stype = reply->element[0]->str; stype = reply->element[0]->str;

View File

@ -97,6 +97,7 @@ void freeReplyObject(void *reply) {
case REDIS_REPLY_ARRAY: case REDIS_REPLY_ARRAY:
case REDIS_REPLY_MAP: case REDIS_REPLY_MAP:
case REDIS_REPLY_SET: case REDIS_REPLY_SET:
case REDIS_REPLY_PUSH:
if (r->element != NULL) { if (r->element != NULL) {
for (j = 0; j < r->elements; j++) for (j = 0; j < r->elements; j++)
freeReplyObject(r->element[j]); freeReplyObject(r->element[j]);
@ -155,7 +156,8 @@ static void *createStringObject(const redisReadTask *task, char *str, size_t len
parent = task->parent->obj; parent = task->parent->obj;
assert(parent->type == REDIS_REPLY_ARRAY || assert(parent->type == REDIS_REPLY_ARRAY ||
parent->type == REDIS_REPLY_MAP || parent->type == REDIS_REPLY_MAP ||
parent->type == REDIS_REPLY_SET); parent->type == REDIS_REPLY_SET ||
parent->type == REDIS_REPLY_PUSH);
parent->element[task->idx] = r; parent->element[task->idx] = r;
} }
return r; return r;
@ -201,7 +203,8 @@ static void *createIntegerObject(const redisReadTask *task, long long value) {
parent = task->parent->obj; parent = task->parent->obj;
assert(parent->type == REDIS_REPLY_ARRAY || assert(parent->type == REDIS_REPLY_ARRAY ||
parent->type == REDIS_REPLY_MAP || parent->type == REDIS_REPLY_MAP ||
parent->type == REDIS_REPLY_SET); parent->type == REDIS_REPLY_SET ||
parent->type == REDIS_REPLY_PUSH);
parent->element[task->idx] = r; parent->element[task->idx] = r;
} }
return r; return r;

7
read.c
View File

@ -250,7 +250,8 @@ static void moveToNextTask(redisReader *r) {
prv = r->task[r->ridx-1]; prv = r->task[r->ridx-1];
assert(prv->type == REDIS_REPLY_ARRAY || assert(prv->type == REDIS_REPLY_ARRAY ||
prv->type == REDIS_REPLY_MAP || prv->type == REDIS_REPLY_MAP ||
prv->type == REDIS_REPLY_SET); prv->type == REDIS_REPLY_SET ||
prv->type == REDIS_REPLY_PUSH);
if (cur->idx == prv->elements-1) { if (cur->idx == prv->elements-1) {
r->ridx--; r->ridx--;
} else { } else {
@ -562,6 +563,9 @@ static int processItem(redisReader *r) {
case '=': case '=':
cur->type = REDIS_REPLY_VERB; cur->type = REDIS_REPLY_VERB;
break; break;
case '>':
cur->type = REDIS_REPLY_PUSH;
break;
default: default:
__redisReaderSetErrorProtocolByte(r,*p); __redisReaderSetErrorProtocolByte(r,*p);
return REDIS_ERR; return REDIS_ERR;
@ -587,6 +591,7 @@ static int processItem(redisReader *r) {
case REDIS_REPLY_ARRAY: case REDIS_REPLY_ARRAY:
case REDIS_REPLY_MAP: case REDIS_REPLY_MAP:
case REDIS_REPLY_SET: case REDIS_REPLY_SET:
case REDIS_REPLY_PUSH:
return processAggregateItem(r); return processAggregateItem(r);
default: default:
assert(NULL); assert(NULL);

15
test.c
View File

@ -488,6 +488,21 @@ static void test_reply_reader(void) {
!memcmp(((redisReply*)reply)->str,"LOLWUT", 6)); !memcmp(((redisReply*)reply)->str,"LOLWUT", 6));
freeReplyObject(reply); freeReplyObject(reply);
redisReaderFree(reader); redisReaderFree(reader);
/* RESP3 push messages (Github issue #815) */
test("Can parse RESP3 push messages: ");
reader = redisReaderCreate();
redisReaderFeed(reader,(char*)">2\r\n$6\r\nLOLWUT\r\n:42\r\n",21);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_PUSH &&
((redisReply*)reply)->elements == 2 &&
((redisReply*)reply)->element[0]->type == REDIS_REPLY_STRING &&
!memcmp(((redisReply*)reply)->element[0]->str,"LOLWUT",6) &&
((redisReply*)reply)->element[1]->type == REDIS_REPLY_INTEGER &&
((redisReply*)reply)->element[1]->integer == 42);
freeReplyObject(reply);
redisReaderFree(reader);
} }
static void test_free_null(void) { static void test_free_null(void) {