Add reply type for protocol errors, in order to never exit()

This commit is contained in:
Pieter Noordhuis 2010-09-20 14:05:23 +02:00
parent aec1fbd2ad
commit e944ea3662
3 changed files with 62 additions and 7 deletions

View File

@ -48,6 +48,8 @@ typedef struct redisReader {
static redisReply *redisReadReply(int fd);
static redisReply *createReplyObject(int type, sds reply);
static redisReply *createErrorObject(const char *fmt, ...);
static void redisSetReplyReaderError(redisReader *r, redisReply *error);
/* We simply abort on out of memory */
static void redisOOM(void) {
@ -64,7 +66,7 @@ redisReply *redisConnect(int *fd, const char *ip, int port) {
*fd = anetTcpConnect(err,ip,port);
if (*fd == ANET_ERR)
return createReplyObject(REDIS_REPLY_ERROR,sdsnew(err));
return createErrorObject(err);
anetTcpNoDelay(NULL,*fd);
return NULL;
}
@ -99,8 +101,19 @@ void freeReplyObject(redisReply *r) {
free(r);
}
static redisReply *createErrorObject(const char *fmt, ...) {
va_list ap;
sds err;
redisReply *r;
va_start(ap,fmt);
err = sdscatvprintf(sdsempty(),fmt,ap);
va_end(ap);
r = createReplyObject(REDIS_PROTOCOL_ERROR,err);
return r;
}
static redisReply *redisIOError(void) {
return createReplyObject(REDIS_REPLY_ERROR,sdsnew("I/O error"));
return createErrorObject("I/O error");
}
static char *readBytes(redisReader *r, unsigned int bytes) {
@ -224,6 +237,7 @@ static int processMultiBulkItem(redisReader *r) {
static int processItem(redisReader *r) {
redisReply *cur = r->rlist[r->rpos];
char *p;
sds byte;
/* check if we need to read type */
if (cur->type < 0) {
@ -245,8 +259,11 @@ static int processItem(redisReader *r) {
cur->type = REDIS_REPLY_ARRAY;
break;
default:
printf("protocol error, got '%c' as reply type byte\n", p[0]);
exit(1);
byte = sdscatrepr(sdsempty(),p,1);
redisSetReplyReaderError(r,createErrorObject(
"protocol error, got %s as reply type byte", byte));
sdsfree(byte);
return -1;
}
} else {
/* could not consume 1 byte */
@ -265,8 +282,9 @@ static int processItem(redisReader *r) {
case REDIS_REPLY_ARRAY:
return processMultiBulkItem(r);
default:
printf("unknown item type: %d\n", cur->type);
exit(1);
redisSetReplyReaderError(r,createErrorObject(
"unknown item type '%d'", cur->type));
return -1;
}
}
@ -308,6 +326,20 @@ void redisFreeReplyReader(void *reader) {
free(r);
}
static void redisSetReplyReaderError(redisReader *r, redisReply *error) {
/* Clear remaining buffer when we see a protocol error. */
if (r->buf != NULL) {
sdsfree(r->buf);
r->buf = sdsempty();
r->pos = 0;
}
/* Clear currently allocated objects. */
if (r->rlist[0] != NULL)
freeReplyObject(r->rlist[0]);
r->rlen = r->rpos = 1;
r->rlist[0] = error;
}
void *redisFeedReplyReader(void *reader, char *buf, int len) {
redisReader *r = reader;
@ -357,6 +389,7 @@ void *redisFeedReplyReader(void *reader, char *buf, int len) {
/* Free list of items to process. */
free(r->rlist);
r->rlist = NULL;
r->rlen = r->rpos = 0;
return reply;
} else {

View File

@ -36,6 +36,7 @@
#define REDIS_REPLY_INTEGER 3
#define REDIS_REPLY_NIL 4
#define REDIS_REPLY_STATUS 5
#define REDIS_PROTOCOL_ERROR 6
#include "sds.h"

23
test.c
View File

@ -29,11 +29,12 @@ int main(void) {
int i, tests = 0, fails = 0;
long long t1, t2;
redisReply *reply;
void *reader;
__connect(&fd);
test("Returns I/O error when the connection is lost: ");
reply = redisCommand(fd,"QUIT");
test_cond(reply->type == REDIS_REPLY_ERROR &&
test_cond(reply->type == REDIS_PROTOCOL_ERROR &&
strcasecmp(reply->reply,"i/o error") == 0);
freeReplyObject(reply);
__connect(&fd); /* reconnect */
@ -122,6 +123,26 @@ int main(void) {
strcasecmp(reply->element[1]->reply,"pong") == 0);
freeReplyObject(reply);
test("Error handling in reply parser: ");
reader = redisCreateReplyReader();
reply = redisFeedReplyReader(reader,(char*)"@foo\r\n",6);
test_cond(reply->type == REDIS_PROTOCOL_ERROR &&
strcasecmp(reply->reply,"protocol error, got \"@\" as reply type byte") == 0);
freeReplyObject(reply);
redisFreeReplyReader(reader);
/* when the reply already contains multiple items, they must be free'd
* on an error. valgrind will bark when this doesn't happen. */
test("Memory cleanup in reply parser: ");
reader = redisCreateReplyReader();
redisFeedReplyReader(reader,(char*)"*2\r\n",4);
redisFeedReplyReader(reader,(char*)"$5\r\nhello\r\n",11);
reply = redisFeedReplyReader(reader,(char*)"@foo\r\n",6);
test_cond(reply->type == REDIS_PROTOCOL_ERROR &&
strcasecmp(reply->reply,"protocol error, got \"@\" as reply type byte") == 0);
freeReplyObject(reply);
redisFreeReplyReader(reader);
test("Throughput:\n");
for (i = 0; i < 500; i++)
freeReplyObject(redisCommand(fd,"LPUSH mylist foo"));