Rewrite reply parsing to use a read buffer

This commit is contained in:
Pieter Noordhuis 2010-09-19 18:47:05 +02:00
parent 66036d113e
commit 457cdbf7c5
3 changed files with 223 additions and 83 deletions

288
hiredis.c
View File

@ -37,6 +37,17 @@
#include "anet.h"
#include "sds.h"
typedef struct redisReader {
char *buf; /* read buffer */
int len; /* buffer length */
int avail; /* available bytes for consumption */
int pos; /* buffer cursor */
redisReply **rlist; /* list of items to process */
int rlen; /* list length */
int rpos; /* list cursor */
} redisReader;
static redisReply *redisReadReply(int fd);
static redisReply *createReplyObject(int type, sds reply);
@ -93,109 +104,220 @@ static redisReply *redisIOError(void) {
return createReplyObject(REDIS_REPLY_ERROR,sdsnew("I/O error"));
}
/* In a real high performance C client this should be bufferized */
static sds redisReadLine(int fd) {
sds line = sdsempty();
static char *readBytes(redisReader *r, int bytes) {
char *p;
if (r->len-r->pos >= bytes) {
p = r->buf+r->pos;
r->pos += bytes;
return p;
}
return NULL;
}
while(1) {
char c;
ssize_t ret;
static char *readLine(redisReader *r, int *_len) {
char *p, *s = strstr(r->buf+r->pos,"\r\n");
int len;
if (s != NULL) {
p = r->buf+r->pos;
len = s-(r->buf+r->pos);
r->pos += len+2; /* skip \r\n */
if (_len) *_len = len;
return p;
}
return NULL;
}
ret = read(fd,&c,1);
if (ret == -1) {
sdsfree(line);
return NULL;
} else if ((ret == 0) || (c == '\n')) {
break;
static int processLineItem(redisReader *r) {
redisReply *cur = r->rlist[r->rpos];
char *p;
int len;
if ((p = readLine(r,&len)) != NULL) {
if (cur->type == REDIS_REPLY_INTEGER) {
cur->integer = strtoll(p,NULL,10);
} else {
line = sdscatlen(line,&c,1);
cur->reply = sdsnewlen(p,len);
}
/* for API compat, set STATUS to STRING */
if (cur->type == REDIS_REPLY_STATUS)
cur->type = REDIS_REPLY_STRING;
r->rpos++;
return 0;
}
return -1;
}
static int processBulkItem(redisReader *r) {
redisReply *cur = r->rlist[r->rpos];
char *p;
int len;
if (cur->reply == NULL) {
if ((p = readLine(r,NULL)) != NULL) {
len = atoi(p);
if (len == -1) {
/* nil means this item is done */
cur->type = REDIS_REPLY_NIL;
cur->reply = sdsempty();
r->rpos++;
return 0;
} else {
cur->reply = sdsnewlen(NULL,len);
}
} else {
return -1;
}
}
return sdstrim(line,"\r\n");
}
static redisReply *redisReadSingleLineReply(int fd, int type) {
sds buf = redisReadLine(fd);
if (buf == NULL) return redisIOError();
return createReplyObject(type,buf);
}
static redisReply *redisReadIntegerReply(int fd) {
sds buf = redisReadLine(fd);
redisReply *r = malloc(sizeof(*r));
if (r == NULL) redisOOM();
if (buf == NULL) {
free(r);
return redisIOError();
len = sdslen(cur->reply);
/* add two bytes for crlf */
if ((p = readBytes(r,len+2)) != NULL) {
memcpy(cur->reply,p,len);
r->rpos++;
return 0;
}
r->type = REDIS_REPLY_INTEGER;
r->integer = strtoll(buf,NULL,10);
sdsfree(buf);
return r;
return -1;
}
static redisReply *redisReadBulkReply(int fd) {
sds replylen = redisReadLine(fd);
sds buf;
char crlf[2];
int bulklen;
static int processMultiBulkItem(redisReader *r) {
redisReply *cur = r->rlist[r->rpos];
char *p;
int elements, j;
if (replylen == NULL) return redisIOError();
bulklen = atoi(replylen);
sdsfree(replylen);
if (bulklen == -1)
return createReplyObject(REDIS_REPLY_NIL,sdsempty());
if ((p = readLine(r,NULL)) != NULL) {
elements = atoi(p);
if (elements == -1) {
/* empty */
cur->type = REDIS_REPLY_NIL;
cur->reply = sdsempty();
r->rpos++;
return 0;
}
} else {
return -1;
}
buf = sdsnewlen(NULL,bulklen);
anetRead(fd,buf,bulklen);
anetRead(fd,crlf,2);
return createReplyObject(REDIS_REPLY_STRING,buf);
cur->elements = elements;
r->rlen += elements;
r->rpos++;
/* create placeholder items */
if ((cur->element = malloc(sizeof(redisReply*)*elements)) == NULL)
redisOOM();
if ((r->rlist = realloc(r->rlist,sizeof(redisReply*)*r->rlen)) == NULL)
redisOOM();
/* move existing items backwards */
memmove(&(r->rlist[r->rpos+elements]),
&(r->rlist[r->rpos]),
(r->rlen-(r->rpos+elements))*sizeof(redisReply*));
/* populate item list */
for (j = 0; j < elements; j++) {
cur->element[j] = createReplyObject(-1,NULL);
r->rlist[r->rpos+j] = cur->element[j];
}
return 0;
}
static redisReply *redisReadMultiBulkReply(int fd) {
sds replylen = redisReadLine(fd);
long elements, j;
redisReply *r;
static int processItem(redisReader *r) {
redisReply *cur = r->rlist[r->rpos];
char *p;
if (replylen == NULL) return redisIOError();
elements = strtol(replylen,NULL,10);
sdsfree(replylen);
/* check if we need to read type */
if (cur->type < 0) {
if ((p = readBytes(r,1)) != NULL) {
switch (p[0]) {
case '-':
cur->type = REDIS_REPLY_ERROR;
break;
case '+':
cur->type = REDIS_REPLY_STATUS;
break;
case ':':
cur->type = REDIS_REPLY_INTEGER;
break;
case '$':
cur->type = REDIS_REPLY_STRING;
break;
case '*':
cur->type = REDIS_REPLY_ARRAY;
break;
default:
printf("protocol error, got '%c' as reply type byte\n", p[0]);
exit(1);
}
} else {
/* could not consume 1 byte */
return -1;
}
}
if (elements == -1)
return createReplyObject(REDIS_REPLY_NIL,sdsempty());
if ((r = malloc(sizeof(*r))) == NULL) redisOOM();
r->type = REDIS_REPLY_ARRAY;
r->elements = elements;
if ((r->element = malloc(sizeof(*r)*elements)) == NULL) redisOOM();
for (j = 0; j < elements; j++)
r->element[j] = redisReadReply(fd);
return r;
}
static redisReply *redisReadReply(int fd) {
char type;
if (anetRead(fd,&type,1) <= 0) return redisIOError();
switch(type) {
case '-':
return redisReadSingleLineReply(fd,REDIS_REPLY_ERROR);
case '+':
return redisReadSingleLineReply(fd,REDIS_REPLY_STRING);
case ':':
return redisReadIntegerReply(fd);
case '$':
return redisReadBulkReply(fd);
case '*':
return redisReadMultiBulkReply(fd);
/* process typed item */
switch(cur->type) {
case REDIS_REPLY_ERROR:
case REDIS_REPLY_STATUS:
case REDIS_REPLY_INTEGER:
return processLineItem(r);
case REDIS_REPLY_STRING:
return processBulkItem(r);
case REDIS_REPLY_ARRAY:
return processMultiBulkItem(r);
default:
printf("protocol error, got '%c' as reply type byte\n", type);
printf("unknown item type: %d\n", cur->type);
exit(1);
}
}
#define READ_BUFFER_SIZE 2048
static redisReply *redisReadReply(int fd) {
redisReader r;
int bytes;
/* setup read buffer */
r.buf = malloc(READ_BUFFER_SIZE+1);
r.len = READ_BUFFER_SIZE;
r.avail = 0;
r.pos = 0;
/* setup list of items to process */
r.rlist = malloc(sizeof(redisReply*));
r.rlist[0] = createReplyObject(-1,NULL);
r.rlen = 1;
r.rpos = 0;
while (r.rpos < r.rlen) {
/* discard the buffer upto pos */
if (r.pos > 0) {
memmove(r.buf,r.buf+r.pos,r.len-r.pos);
r.avail -= r.pos;
r.pos = 0;
}
/* make sure there is room for at least BUFFER_SIZE */
if (r.len-r.avail < READ_BUFFER_SIZE) {
r.buf = realloc(r.buf,r.avail+READ_BUFFER_SIZE+1);
r.len = r.avail+READ_BUFFER_SIZE;
}
/* read from socket into buffer */
if ((bytes = read(fd,r.buf+r.avail,READ_BUFFER_SIZE)) <= 0)
return redisIOError();
r.avail += bytes;
r.buf[r.avail] = '\0';
/* process items in reply */
while (r.rpos < r.rlen)
if (processItem(&r) < 0)
break;
}
free(r.buf);
free(r.rlist);
return r.rlist[0];
}
/* Helper function for redisCommand(). It's used to append the next argument
* to the argument vector. */
static void addArgument(sds a, char ***argv, int *argc) {

View File

@ -35,6 +35,7 @@
#define REDIS_REPLY_ARRAY 2
#define REDIS_REPLY_INTEGER 3
#define REDIS_REPLY_NIL 4
#define REDIS_REPLY_STATUS 5
#include "sds.h"

17
test.c
View File

@ -99,6 +99,23 @@ int main(void) {
!memcmp(reply->element[1]->reply,"foo",3))
freeReplyObject(reply);
/* test 9 (m/e with multi bulk reply *before* other reply).
* specifically test ordering of reply items to parse. */
printf("#10 can handle nested multi bulk replies: ");
freeReplyObject(redisCommand(fd,"MULTI"));
freeReplyObject(redisCommand(fd,"LRANGE mylist 0 -1"));
freeReplyObject(redisCommand(fd,"PING"));
reply = (redisCommand(fd,"EXEC"));
test_cond(reply->type == REDIS_REPLY_ARRAY &&
reply->elements == 2 &&
reply->element[0]->type == REDIS_REPLY_ARRAY &&
reply->element[0]->elements == 2 &&
!memcmp(reply->element[0]->element[0]->reply,"bar",3) &&
!memcmp(reply->element[0]->element[1]->reply,"foo",3) &&
reply->element[1]->type == REDIS_REPLY_STRING &&
strcasecmp(reply->element[1]->reply,"pong") == 0);
freeReplyObject(reply);
if (fails == 0) {
printf("ALL TESTS PASSED\n");
} else {