Expose API for streaming bytes to a reply

This commit is contained in:
Pieter Noordhuis 2010-09-20 09:50:19 +02:00
parent 4ec97f5907
commit e621f31306
2 changed files with 94 additions and 53 deletions

144
hiredis.c
View File

@ -38,14 +38,12 @@
#include "sds.h"
typedef struct redisReader {
char *buf; /* read buffer */
int len; /* buffer length */
int avail; /* available bytes for consumption */
int pos; /* buffer cursor */
sds buf; /* read buffer */
unsigned int pos; /* buffer cursor */
redisReply **rlist; /* list of items to process */
int rlen; /* list length */
int rpos; /* list cursor */
unsigned int rlen; /* list length */
unsigned int rpos; /* list cursor */
} redisReader;
static redisReply *redisReadReply(int fd);
@ -105,9 +103,9 @@ static redisReply *redisIOError(void) {
return createReplyObject(REDIS_REPLY_ERROR,sdsnew("I/O error"));
}
static char *readBytes(redisReader *r, int bytes) {
static char *readBytes(redisReader *r, unsigned int bytes) {
char *p;
if (r->len-r->pos >= bytes) {
if (sdslen(r->buf)-r->pos >= bytes) {
p = r->buf+r->pos;
r->pos += bytes;
return p;
@ -274,58 +272,98 @@ static int processItem(redisReader *r) {
#define READ_BUFFER_SIZE 2048
static redisReply *redisReadReply(int fd) {
void *reader = redisCreateReplyReader();
redisReply *reply;
redisReader r;
int bytes;
char buf[1024];
int nread;
/* setup read buffer */
r.buf = malloc(READ_BUFFER_SIZE+1);
r.len = READ_BUFFER_SIZE;
r.avail = 0;
r.pos = 0;
/* setup list of items to process */
r.rlist = malloc(sizeof(redisReply*));
r.rlist[0] = createReplyObject(-1,NULL);
r.rlen = 1;
r.rpos = 0;
while (r.rpos < r.rlen) {
/* discard the buffer upto pos */
if (r.pos > 0) {
memmove(r.buf,r.buf+r.pos,r.len-r.pos);
r.avail -= r.pos;
r.pos = 0;
do {
if ((nread = read(fd,buf,sizeof(buf))) <= 0) {
reply = redisIOError();
break;
} else {
reply = redisFeedReplyReader(reader,buf,nread);
}
} while (reply == NULL);
/* make sure there is room for at least BUFFER_SIZE */
if (r.len-r.avail < READ_BUFFER_SIZE) {
r.buf = realloc(r.buf,r.avail+READ_BUFFER_SIZE+1);
r.len = r.avail+READ_BUFFER_SIZE;
}
/* read from socket into buffer */
if ((bytes = read(fd,r.buf+r.avail,READ_BUFFER_SIZE)) <= 0) {
/* rlist[0] is the "root" reply object */
freeReplyObject(r.rlist[0]);
free(r.buf);
free(r.rlist);
return redisIOError();
}
r.avail += bytes;
r.buf[r.avail] = '\0';
/* process items in reply */
while (r.rpos < r.rlen)
if (processItem(&r) < 0)
break;
}
reply = r.rlist[0];
free(r.buf);
free(r.rlist);
redisFreeReplyReader(reader);
return reply;
}
void *redisCreateReplyReader() {
redisReader *r = calloc(sizeof(redisReader),1);
r->buf = sdsempty();
return r;
}
void redisFreeReplyReader(void *reader) {
redisReader *r = reader;
if (r->buf != NULL) {
sdsfree(r->buf);
}
if (r->rlen > 0) {
freeReplyObject(r->rlist[0]);
free(r->rlist);
}
free(r);
}
void *redisFeedReplyReader(void *reader, char *buf, int len) {
redisReader *r = reader;
/* Check if we are able to do *something*. */
if (sdslen(r->buf) == 0 && (buf == NULL || len <= 0))
return NULL;
/* Copy the provided buffer. */
if (buf != NULL && len >= 1)
r->buf = sdscatlen(r->buf,buf,len);
/* Create first item to process when the item list is empty. */
if (r->rlen == 0) {
r->rlist = malloc(sizeof(redisReply*));
r->rlist[0] = createReplyObject(-1,NULL);
r->rlen = 1;
r->rpos = 0;
}
/* Process items in reply. */
while (r->rpos < r->rlen)
if (processItem(r) < 0)
break;
/* Discard the consumed part of the buffer. */
if (r->pos > 0) {
if (r->pos == sdslen(r->buf)) {
/* sdsrange has a quirck on this edge case. */
sdsfree(r->buf);
r->buf = sdsempty();
} else {
r->buf = sdsrange(r->buf,r->pos,sdslen(r->buf));
}
r->pos = 0;
}
/* Emit a reply when there is one. */
if (r->rpos == r->rlen) {
redisReply *reply = r->rlist[0];
/* Destroy the buffer when it is empty and is quite large. */
if (sdslen(r->buf) == 0 && sdsavail(r->buf) > 16*1024) {
sdsfree(r->buf);
r->buf = sdsempty();
r->pos = 0;
}
/* Free list of items to process. */
free(r->rlist);
r->rlen = r->rpos = 0;
return reply;
} else {
return NULL;
}
}
/* Helper function for redisCommand(). It's used to append the next argument
* to the argument vector. */
static void addArgument(sds a, char ***argv, int *argc) {

View File

@ -51,5 +51,8 @@ typedef struct redisReply {
redisReply *redisConnect(int *fd, const char *ip, int port);
void freeReplyObject(redisReply *r);
redisReply *redisCommand(int fd, const char *format, ...);
void *redisCreateReplyReader();
void redisFreeReplyReader(void *ptr);
void *redisFeedReplyReader(void *reader, char *buf, int len);
#endif