Introduce read tasks to allow hooking other code into reply parsing

This commit is contained in:
Pieter Noordhuis 2010-09-20 18:02:28 +02:00
parent b1fa529cf9
commit db5244045c
2 changed files with 155 additions and 83 deletions

231
hiredis.c
View File

@ -32,16 +32,19 @@
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h> #include <unistd.h>
#include <stdarg.h> #include <stdarg.h>
#include <assert.h>
#include "hiredis.h" #include "hiredis.h"
#include "anet.h" #include "anet.h"
#include "sds.h" #include "sds.h"
typedef struct redisReader { typedef struct redisReader {
void *reply; /* holds temporary reply */
sds buf; /* read buffer */ sds buf; /* read buffer */
unsigned int pos; /* buffer cursor */ unsigned int pos; /* buffer cursor */
redisReply **rlist; /* list of items to process */ redisReadTask *rlist; /* list of items to process */
unsigned int rlen; /* list length */ unsigned int rlen; /* list length */
unsigned int rpos; /* list cursor */ unsigned int rpos; /* list cursor */
} redisReader; } redisReader;
@ -90,7 +93,7 @@ void freeReplyObject(redisReply *r) {
break; /* Nothing to free */ break; /* Nothing to free */
case REDIS_REPLY_ARRAY: case REDIS_REPLY_ARRAY:
for (j = 0; j < r->elements; j++) for (j = 0; j < r->elements; j++)
freeReplyObject(r->element[j]); if (r->element[j]) freeReplyObject(r->element[j]);
free(r->element); free(r->element);
break; break;
default: default:
@ -116,6 +119,58 @@ static redisReply *redisIOError(void) {
return createErrorObject("I/O error"); return createErrorObject("I/O error");
} }
static void *createStringObject(redisReadTask *task, char *str, size_t len) {
redisReply *r = createReplyObject(task->type,sdsnewlen(str,len));
assert(task->type == REDIS_REPLY_ERROR ||
task->type == REDIS_REPLY_STATUS ||
task->type == REDIS_REPLY_STRING);
/* for API compat, set STATUS to STRING */
if (task->type == REDIS_REPLY_STATUS)
r->type = REDIS_REPLY_STRING;
if (task->parent) {
redisReply *parent = task->parent;
assert(parent->type == REDIS_REPLY_ARRAY);
parent->element[task->idx] = r;
}
return r;
}
static void *createArrayObject(redisReadTask *task, int elements) {
redisReply *r = createReplyObject(REDIS_REPLY_ARRAY,NULL);
r->elements = elements;
if ((r->element = calloc(sizeof(redisReply*),elements)) == NULL)
redisOOM();
if (task->parent) {
redisReply *parent = task->parent;
assert(parent->type == REDIS_REPLY_ARRAY);
parent->element[task->idx] = r;
}
return r;
}
static void *createIntegerObject(redisReadTask *task, long long value) {
redisReply *r = createReplyObject(REDIS_REPLY_INTEGER,NULL);
r->integer = value;
if (task->parent) {
redisReply *parent = task->parent;
assert(parent->type == REDIS_REPLY_ARRAY);
parent->element[task->idx] = r;
}
return r;
}
static void *createNilObject(redisReadTask *task) {
redisReply *r = createReplyObject(REDIS_REPLY_NIL,NULL);
if (task->parent) {
redisReply *parent = task->parent;
assert(parent->type == REDIS_REPLY_ARRAY);
parent->element[task->idx] = r;
}
return r;
}
static char *readBytes(redisReader *r, unsigned int bytes) { static char *readBytes(redisReader *r, unsigned int bytes) {
char *p; char *p;
if (sdslen(r->buf)-r->pos >= bytes) { if (sdslen(r->buf)-r->pos >= bytes) {
@ -140,21 +195,21 @@ static char *readLine(redisReader *r, int *_len) {
} }
static int processLineItem(redisReader *r) { static int processLineItem(redisReader *r) {
redisReply *cur = r->rlist[r->rpos]; redisReadTask *cur = &(r->rlist[r->rpos]);
void *obj;
char *p; char *p;
int len; int len;
if ((p = readLine(r,&len)) != NULL) { if ((p = readLine(r,&len)) != NULL) {
if (cur->type == REDIS_REPLY_INTEGER) { if (cur->type == REDIS_REPLY_INTEGER) {
cur->integer = strtoll(p,NULL,10); obj = createIntegerObject(cur,strtoll(p,NULL,10));
} else { } else {
cur->reply = sdsnewlen(p,len); obj = createStringObject(cur,p,len);
} }
/* for API compat, set STATUS to STRING */ /* If there is no root yet, register this object as root. */
if (cur->type == REDIS_REPLY_STATUS) if (r->reply == NULL)
cur->type = REDIS_REPLY_STRING; r->reply = obj;
r->rpos++; r->rpos++;
return 0; return 0;
} }
@ -162,80 +217,90 @@ static int processLineItem(redisReader *r) {
} }
static int processBulkItem(redisReader *r) { static int processBulkItem(redisReader *r) {
redisReply *cur = r->rlist[r->rpos]; redisReadTask *cur = &(r->rlist[r->rpos]);
char *p; void *obj = NULL;
int len; char *p, *s;
long len;
unsigned long bytelen;
if (cur->reply == NULL) { p = r->buf+r->pos;
if ((p = readLine(r,NULL)) != NULL) { s = strstr(p,"\r\n");
len = atoi(p); if (s != NULL) {
if (len == -1) { p = r->buf+r->pos;
/* nil means this item is done */ bytelen = s-(r->buf+r->pos)+2; /* include \r\n */
cur->type = REDIS_REPLY_NIL; len = strtol(p,NULL,10);
cur->reply = sdsempty();
r->rpos++; if (len < 0) {
return 0; /* The nil object can always be created. */
} else { obj = createNilObject(cur);
cur->reply = sdsnewlen(NULL,len);
}
} else { } else {
return -1; /* Only continue when the buffer contains the entire bulk item. */
bytelen += len+2; /* include \r\n */
if (r->pos+bytelen <= sdslen(r->buf)) {
obj = createStringObject(cur,s+2,len);
}
} }
}
len = sdslen(cur->reply); /* Proceed when obj was created. */
/* add two bytes for crlf */ if (obj != NULL) {
if ((p = readBytes(r,len+2)) != NULL) { r->pos += bytelen;
memcpy(cur->reply,p,len); if (r->reply == NULL)
r->rpos++; r->reply = obj;
return 0; r->rpos++;
return 0;
}
} }
return -1; return -1;
} }
static int processMultiBulkItem(redisReader *r) { static int processMultiBulkItem(redisReader *r) {
redisReply *cur = r->rlist[r->rpos]; redisReadTask *cur = &(r->rlist[r->rpos]);
void *obj;
char *p; char *p;
int elements, j; long elements, j;
if ((p = readLine(r,NULL)) != NULL) { if ((p = readLine(r,NULL)) != NULL) {
elements = atoi(p); elements = strtol(p,NULL,10);
if (elements == -1) { if (elements == -1) {
/* empty */ obj = createNilObject(cur);
cur->type = REDIS_REPLY_NIL; } else {
cur->reply = sdsempty(); obj = createArrayObject(cur,elements);
/* Modify read list when there are more than 0 elements. */
if (elements > 0) {
/* Append elements to the read list. */
r->rlen += elements;
if ((r->rlist = realloc(r->rlist,sizeof(redisReadTask)*r->rlen)) == NULL)
redisOOM();
/* Move existing items backwards. */
memmove(&(r->rlist[r->rpos+1+elements]),
&(r->rlist[r->rpos+1]),
(r->rlen-(r->rpos+1+elements))*sizeof(redisReadTask));
/* Populate new read items. */
redisReadTask *t;
for (j = 0; j < elements; j++) {
t = &(r->rlist[r->rpos+1+j]);
t->type = -1;
t->parent = obj;
t->idx = j;
}
}
}
if (obj != NULL) {
if (r->reply == NULL)
r->reply = obj;
r->rpos++; r->rpos++;
return 0; return 0;
} }
} else {
return -1;
} }
return -1;
cur->elements = elements;
r->rlen += elements;
r->rpos++;
/* create placeholder items */
if ((cur->element = malloc(sizeof(redisReply*)*elements)) == NULL)
redisOOM();
if ((r->rlist = realloc(r->rlist,sizeof(redisReply*)*r->rlen)) == NULL)
redisOOM();
/* move existing items backwards */
memmove(&(r->rlist[r->rpos+elements]),
&(r->rlist[r->rpos]),
(r->rlen-(r->rpos+elements))*sizeof(redisReply*));
/* populate item list */
for (j = 0; j < elements; j++) {
cur->element[j] = createReplyObject(-1,NULL);
r->rlist[r->rpos+j] = cur->element[j];
}
return 0;
} }
static int processItem(redisReader *r) { static int processItem(redisReader *r) {
redisReply *cur = r->rlist[r->rpos]; redisReadTask *cur = &(r->rlist[r->rpos]);
char *p; char *p;
sds byte; sds byte;
@ -311,42 +376,40 @@ static redisReply *redisReadReply(int fd) {
void *redisCreateReplyReader() { void *redisCreateReplyReader() {
redisReader *r = calloc(sizeof(redisReader),1); redisReader *r = calloc(sizeof(redisReader),1);
r->buf = sdsempty(); r->buf = sdsempty();
r->rlist = malloc(sizeof(redisReadTask)*1);
return r; return r;
} }
void redisFreeReplyReader(void *reader) { void redisFreeReplyReader(void *reader) {
redisReader *r = reader; redisReader *r = reader;
if (r->buf != NULL) { if (r->reply != NULL)
freeReplyObject(r->reply);
if (r->buf != NULL)
sdsfree(r->buf); sdsfree(r->buf);
} if (r->rlist != NULL)
if (r->rlen > 0) {
freeReplyObject(r->rlist[0]);
free(r->rlist); free(r->rlist);
}
free(r); free(r);
} }
int redisIsReplyReaderEmpty(void *reader) { int redisIsReplyReaderEmpty(void *reader) {
redisReader *r = reader; redisReader *r = reader;
if (r->buf != NULL && sdslen(r->buf) > 0) if ((r->buf != NULL && sdslen(r->buf) > 0) ||
return 0; (r->rpos < r->rlen)) return 0;
if (r->rlist != NULL && r->rpos < r->rlen)
return 0;
return 1; return 1;
} }
static void redisSetReplyReaderError(redisReader *r, redisReply *error) { static void redisSetReplyReaderError(redisReader *r, redisReply *error) {
if (r->reply != NULL)
freeReplyObject(r->reply);
/* Clear remaining buffer when we see a protocol error. */ /* Clear remaining buffer when we see a protocol error. */
if (r->buf != NULL) { if (r->buf != NULL) {
sdsfree(r->buf); sdsfree(r->buf);
r->buf = sdsempty(); r->buf = sdsempty();
r->pos = 0; r->pos = 0;
} }
/* Clear currently allocated objects. */ r->rlen = r->rpos = 0;
if (r->rlist[0] != NULL) r->reply = error;
freeReplyObject(r->rlist[0]);
r->rlen = r->rpos = 1;
r->rlist[0] = error;
} }
void *redisFeedReplyReader(void *reader, char *buf, int len) { void *redisFeedReplyReader(void *reader, char *buf, int len) {
@ -362,8 +425,10 @@ void *redisFeedReplyReader(void *reader, char *buf, int len) {
/* Create first item to process when the item list is empty. */ /* Create first item to process when the item list is empty. */
if (r->rlen == 0) { if (r->rlen == 0) {
r->rlist = malloc(sizeof(redisReply*)); r->rlist = realloc(r->rlist,sizeof(redisReadTask)*1);
r->rlist[0] = createReplyObject(-1,NULL); r->rlist[0].type = -1;
r->rlist[0].parent = NULL;
r->rlist[0].idx = -1;
r->rlen = 1; r->rlen = 1;
r->rpos = 0; r->rpos = 0;
} }
@ -387,7 +452,9 @@ void *redisFeedReplyReader(void *reader, char *buf, int len) {
/* Emit a reply when there is one. */ /* Emit a reply when there is one. */
if (r->rpos == r->rlen) { if (r->rpos == r->rlen) {
redisReply *reply = r->rlist[0]; void *reply = r->reply;
assert(reply != NULL);
r->reply = NULL;
/* Destroy the buffer when it is empty and is quite large. */ /* Destroy the buffer when it is empty and is quite large. */
if (sdslen(r->buf) == 0 && sdsavail(r->buf) > 16*1024) { if (sdslen(r->buf) == 0 && sdsavail(r->buf) > 16*1024) {
@ -396,9 +463,7 @@ void *redisFeedReplyReader(void *reader, char *buf, int len) {
r->pos = 0; r->pos = 0;
} }
/* Free list of items to process. */ /* Set list of items to read to be empty. */
free(r->rlist);
r->rlist = NULL;
r->rlen = r->rpos = 0; r->rlen = r->rpos = 0;
return reply; return reply;
} else { } else {

View File

@ -49,6 +49,13 @@ typedef struct redisReply {
struct redisReply **element; /* elements vector for REDIS_REPLY_ARRAY */ struct redisReply **element; /* elements vector for REDIS_REPLY_ARRAY */
} redisReply; } redisReply;
typedef struct redisReadTask {
int type;
void *parent; /* optional pointer to parent object */
int idx; /* index in parent (array) object */
} redisReadTask;
redisReply *redisConnect(int *fd, const char *ip, int port); redisReply *redisConnect(int *fd, const char *ip, int port);
void freeReplyObject(redisReply *r); void freeReplyObject(redisReply *r);
redisReply *redisCommand(int fd, const char *format, ...); redisReply *redisCommand(int fd, const char *format, ...);