Revert "Add function to retrieve formatted reply"
This reverts commit 77540aa316
. The change
in buffer strategy is too large to put in a minor release. It is put in
a separate branch in the meantime, so it can be refined and released
together with a minor version bump.
This commit is contained in:
parent
f9a3229873
commit
375ba48eea
50
hiredis.c
50
hiredis.c
@ -358,12 +358,7 @@ static int processLineItem(redisReader *r) {
|
|||||||
char *p;
|
char *p;
|
||||||
int len;
|
int len;
|
||||||
|
|
||||||
cur->poff = (r->pos-r->roff)-1;
|
|
||||||
cur->coff = cur->poff+1;
|
|
||||||
if ((p = readLine(r,&len)) != NULL) {
|
if ((p = readLine(r,&len)) != NULL) {
|
||||||
cur->plen = 1+len+2; /* include \r\n */
|
|
||||||
cur->clen = len;
|
|
||||||
|
|
||||||
if (cur->type == REDIS_REPLY_INTEGER) {
|
if (cur->type == REDIS_REPLY_INTEGER) {
|
||||||
if (r->fn && r->fn->createInteger)
|
if (r->fn && r->fn->createInteger)
|
||||||
obj = r->fn->createInteger(cur,readLongLong(p));
|
obj = r->fn->createInteger(cur,readLongLong(p));
|
||||||
@ -402,13 +397,10 @@ static int processBulkItem(redisReader *r) {
|
|||||||
p = r->buf+r->pos;
|
p = r->buf+r->pos;
|
||||||
s = seekNewline(p,r->len-r->pos);
|
s = seekNewline(p,r->len-r->pos);
|
||||||
if (s != NULL) {
|
if (s != NULL) {
|
||||||
|
p = r->buf+r->pos;
|
||||||
bytelen = s-(r->buf+r->pos)+2; /* include \r\n */
|
bytelen = s-(r->buf+r->pos)+2; /* include \r\n */
|
||||||
cur->poff = (r->pos-r->roff)-1;
|
|
||||||
cur->plen = bytelen+1;
|
|
||||||
cur->coff = cur->poff+1+bytelen;
|
|
||||||
cur->clen = 0;
|
|
||||||
|
|
||||||
len = readLongLong(p);
|
len = readLongLong(p);
|
||||||
|
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
/* The nil object can always be created. */
|
/* The nil object can always be created. */
|
||||||
if (r->fn && r->fn->createNil)
|
if (r->fn && r->fn->createNil)
|
||||||
@ -420,8 +412,6 @@ static int processBulkItem(redisReader *r) {
|
|||||||
/* Only continue when the buffer contains the entire bulk item. */
|
/* Only continue when the buffer contains the entire bulk item. */
|
||||||
bytelen += len+2; /* include \r\n */
|
bytelen += len+2; /* include \r\n */
|
||||||
if (r->pos+bytelen <= r->len) {
|
if (r->pos+bytelen <= r->len) {
|
||||||
cur->plen += len+2;
|
|
||||||
cur->clen = len;
|
|
||||||
if (r->fn && r->fn->createString)
|
if (r->fn && r->fn->createString)
|
||||||
obj = r->fn->createString(cur,s+2,len);
|
obj = r->fn->createString(cur,s+2,len);
|
||||||
else
|
else
|
||||||
@ -463,12 +453,7 @@ static int processMultiBulkItem(redisReader *r) {
|
|||||||
return REDIS_ERR;
|
return REDIS_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
cur->poff = (r->pos-r->roff)-1;
|
|
||||||
cur->coff = 0;
|
|
||||||
if ((p = readLine(r,NULL)) != NULL) {
|
if ((p = readLine(r,NULL)) != NULL) {
|
||||||
cur->plen = (r->pos-r->roff)-cur->poff; /* includes \r\n */
|
|
||||||
cur->clen = 0;
|
|
||||||
|
|
||||||
elements = readLongLong(p);
|
elements = readLongLong(p);
|
||||||
root = (r->ridx == 0);
|
root = (r->ridx == 0);
|
||||||
|
|
||||||
@ -605,7 +590,7 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) {
|
|||||||
|
|
||||||
/* Copy the provided buffer. */
|
/* Copy the provided buffer. */
|
||||||
if (buf != NULL && len >= 1) {
|
if (buf != NULL && len >= 1) {
|
||||||
/* Destroy buffer when it is empty and is quite large. */
|
/* Destroy internal buffer when it is empty and is quite large. */
|
||||||
if (r->len == 0 && sdsavail(r->buf) > 16*1024) {
|
if (r->len == 0 && sdsavail(r->buf) > 16*1024) {
|
||||||
sdsfree(r->buf);
|
sdsfree(r->buf);
|
||||||
r->buf = sdsempty();
|
r->buf = sdsempty();
|
||||||
@ -615,15 +600,6 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) {
|
|||||||
assert(r->buf != NULL);
|
assert(r->buf != NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Discard consumed part of the buffer when the offset for the reply
|
|
||||||
* that is currently being read is high enough. */
|
|
||||||
if (r->roff >= 1024) {
|
|
||||||
r->buf = sdsrange(r->buf,r->roff,-1);
|
|
||||||
r->pos -= r->roff;
|
|
||||||
r->roff = 0;
|
|
||||||
r->len = sdslen(r->buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
newbuf = sdscatlen(r->buf,buf,len);
|
newbuf = sdscatlen(r->buf,buf,len);
|
||||||
if (newbuf == NULL) {
|
if (newbuf == NULL) {
|
||||||
__redisReaderSetErrorOOM(r);
|
__redisReaderSetErrorOOM(r);
|
||||||
@ -659,7 +635,6 @@ int redisReaderGetReply(redisReader *r, void **reply) {
|
|||||||
r->rstack[0].parent = NULL;
|
r->rstack[0].parent = NULL;
|
||||||
r->rstack[0].privdata = r->privdata;
|
r->rstack[0].privdata = r->privdata;
|
||||||
r->ridx = 0;
|
r->ridx = 0;
|
||||||
r->roff = r->pos; /* Start offset in buffer. */
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Process items in reply. */
|
/* Process items in reply. */
|
||||||
@ -671,6 +646,14 @@ int redisReaderGetReply(redisReader *r, void **reply) {
|
|||||||
if (r->err)
|
if (r->err)
|
||||||
return REDIS_ERR;
|
return REDIS_ERR;
|
||||||
|
|
||||||
|
/* Discard part of the buffer when we've consumed at least 1k, to avoid
|
||||||
|
* doing unnecessary calls to memmove() in sds.c. */
|
||||||
|
if (r->pos >= 1024) {
|
||||||
|
r->buf = sdsrange(r->buf,r->pos,-1);
|
||||||
|
r->pos = 0;
|
||||||
|
r->len = sdslen(r->buf);
|
||||||
|
}
|
||||||
|
|
||||||
/* Emit a reply when there is one. */
|
/* Emit a reply when there is one. */
|
||||||
if (r->ridx == -1) {
|
if (r->ridx == -1) {
|
||||||
if (reply != NULL)
|
if (reply != NULL)
|
||||||
@ -680,17 +663,6 @@ int redisReaderGetReply(redisReader *r, void **reply) {
|
|||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
const char *redisReaderGetRaw(redisReader *r, size_t *len) {
|
|
||||||
/* ridx == -1: No or a full reply has been read. */
|
|
||||||
/* pos > roff: Buffer position is larger than start offset, meaning
|
|
||||||
* the buffer has not yet been truncated. */
|
|
||||||
if (r->ridx == -1 && r->pos > r->roff) {
|
|
||||||
if (len) *len = (r->pos-r->roff);
|
|
||||||
return r->buf+r->roff;
|
|
||||||
}
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Calculate the number of bytes needed to represent an integer as string. */
|
/* Calculate the number of bytes needed to represent an integer as string. */
|
||||||
static int intlen(int i) {
|
static int intlen(int i) {
|
||||||
int len = 0;
|
int len = 0;
|
||||||
|
@ -98,11 +98,6 @@ typedef struct redisReply {
|
|||||||
} redisReply;
|
} redisReply;
|
||||||
|
|
||||||
typedef struct redisReadTask {
|
typedef struct redisReadTask {
|
||||||
size_t poff; /* Protocol offset */
|
|
||||||
size_t plen; /* Protocol length */
|
|
||||||
size_t coff; /* Content offset */
|
|
||||||
size_t clen; /* Content length */
|
|
||||||
|
|
||||||
int type;
|
int type;
|
||||||
int elements; /* number of elements in multibulk container */
|
int elements; /* number of elements in multibulk container */
|
||||||
int idx; /* index in parent (array) object */
|
int idx; /* index in parent (array) object */
|
||||||
@ -127,7 +122,6 @@ typedef struct redisReader {
|
|||||||
char *buf; /* Read buffer */
|
char *buf; /* Read buffer */
|
||||||
size_t pos; /* Buffer cursor */
|
size_t pos; /* Buffer cursor */
|
||||||
size_t len; /* Buffer length */
|
size_t len; /* Buffer length */
|
||||||
size_t roff; /* Reply offset */
|
|
||||||
|
|
||||||
redisReadTask rstack[3];
|
redisReadTask rstack[3];
|
||||||
int ridx; /* Index of current read task */
|
int ridx; /* Index of current read task */
|
||||||
@ -142,7 +136,6 @@ redisReader *redisReaderCreate(void);
|
|||||||
void redisReaderFree(redisReader *r);
|
void redisReaderFree(redisReader *r);
|
||||||
int redisReaderFeed(redisReader *r, const char *buf, size_t len);
|
int redisReaderFeed(redisReader *r, const char *buf, size_t len);
|
||||||
int redisReaderGetReply(redisReader *r, void **reply);
|
int redisReaderGetReply(redisReader *r, void **reply);
|
||||||
const char *redisReaderGetRaw(redisReader *r, size_t *len);
|
|
||||||
|
|
||||||
/* Backwards compatibility, can be removed on big version bump. */
|
/* Backwards compatibility, can be removed on big version bump. */
|
||||||
#define redisReplyReaderCreate redisReaderCreate
|
#define redisReplyReaderCreate redisReaderCreate
|
||||||
|
93
test.c
93
test.c
@ -262,98 +262,6 @@ static void test_reply_reader(void) {
|
|||||||
redisReaderFree(reader);
|
redisReaderFree(reader);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *test_create_string(const redisReadTask *task, char *str, size_t len) {
|
|
||||||
redisReader *r = (redisReader*)task->privdata;
|
|
||||||
const char *roff = r->buf+r->roff;
|
|
||||||
((void)str); ((void)len);
|
|
||||||
|
|
||||||
assert(task->plen > 0);
|
|
||||||
assert(task->clen > 0);
|
|
||||||
switch(task->type) {
|
|
||||||
case REDIS_REPLY_STATUS:
|
|
||||||
assert(strncmp("+status\r\n", roff+task->poff, task->plen) == 0);
|
|
||||||
assert(strncmp("status", roff+task->coff, task->clen) == 0);
|
|
||||||
break;
|
|
||||||
case REDIS_REPLY_ERROR:
|
|
||||||
assert(strncmp("-error\r\n", roff+task->poff, task->plen) == 0);
|
|
||||||
assert(strncmp("error", roff+task->coff, task->clen) == 0);
|
|
||||||
break;
|
|
||||||
case REDIS_REPLY_STRING: /* bulk */
|
|
||||||
assert(strncmp("$4\r\nbulk\r\n", roff+task->poff, task->plen) == 0);
|
|
||||||
assert(strncmp("bulk", roff+task->coff, task->clen) == 0);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
assert(NULL);
|
|
||||||
}
|
|
||||||
return (void*)1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void *test_create_array(const redisReadTask *task, int len) {
|
|
||||||
redisReader *r = (redisReader*)task->privdata;
|
|
||||||
const char *roff = r->buf+r->roff;
|
|
||||||
((void)len);
|
|
||||||
|
|
||||||
assert(task->plen > 0);
|
|
||||||
assert(task->clen == 0);
|
|
||||||
assert(strncmp("*5\r\n", roff+task->poff, task->plen) == 0);
|
|
||||||
return (void*)1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void *test_create_integer(const redisReadTask *task, long long value) {
|
|
||||||
redisReader *r = (redisReader*)task->privdata;
|
|
||||||
const char *roff = r->buf+r->roff;
|
|
||||||
((void)value);
|
|
||||||
|
|
||||||
assert(task->plen > 0);
|
|
||||||
assert(task->clen > 0);
|
|
||||||
assert(strncmp(":1234\r\n", roff+task->poff, task->plen) == 0);
|
|
||||||
assert(strncmp("1234", roff+task->coff, task->clen) == 0);
|
|
||||||
return (void*)1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void *test_create_nil(const redisReadTask *task) {
|
|
||||||
redisReader *r = (redisReader*)task->privdata;
|
|
||||||
const char *roff = r->buf+r->roff;
|
|
||||||
|
|
||||||
assert(task->plen > 0);
|
|
||||||
assert(task->clen == 0);
|
|
||||||
assert(strncmp("$-1\r\n", roff+task->poff, task->plen) == 0);
|
|
||||||
return (void*)1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static redisReplyObjectFunctions test_reader_fn = {
|
|
||||||
test_create_string,
|
|
||||||
test_create_array,
|
|
||||||
test_create_integer,
|
|
||||||
test_create_nil,
|
|
||||||
NULL
|
|
||||||
};
|
|
||||||
|
|
||||||
static void test_reader_functions(void) {
|
|
||||||
redisReader *reader;
|
|
||||||
const char *input;
|
|
||||||
int ret;
|
|
||||||
void *obj;
|
|
||||||
|
|
||||||
input =
|
|
||||||
"*5\r\n"
|
|
||||||
"$-1\r\n"
|
|
||||||
":1234\r\n"
|
|
||||||
"+status\r\n"
|
|
||||||
"-error\r\n"
|
|
||||||
"$4\r\nbulk\r\n";
|
|
||||||
|
|
||||||
test("Custom object functions in reply reader: ");
|
|
||||||
reader = redisReaderCreate();
|
|
||||||
reader->fn = &test_reader_fn;
|
|
||||||
reader->privdata = reader;
|
|
||||||
|
|
||||||
redisReaderFeed(reader,input,strlen(input));
|
|
||||||
ret = redisReaderGetReply(reader,&obj);
|
|
||||||
test_cond(ret == REDIS_OK && obj == (void*)1);
|
|
||||||
redisReaderFree(reader);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void test_blocking_connection_errors(void) {
|
static void test_blocking_connection_errors(void) {
|
||||||
redisContext *c;
|
redisContext *c;
|
||||||
|
|
||||||
@ -706,7 +614,6 @@ int main(int argc, char **argv) {
|
|||||||
|
|
||||||
test_format_commands();
|
test_format_commands();
|
||||||
test_reply_reader();
|
test_reply_reader();
|
||||||
test_reader_functions();
|
|
||||||
test_blocking_connection_errors();
|
test_blocking_connection_errors();
|
||||||
|
|
||||||
printf("\nTesting against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
|
printf("\nTesting against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
|
||||||
|
Loading…
Reference in New Issue
Block a user