Test callback sequence in non-blocking context
This commit is contained in:
parent
e332a32b35
commit
f9596db90b
25
hiredis.c
25
hiredis.c
@ -676,15 +676,7 @@ int redisBufferRead(redisContext *c) {
|
||||
return REDIS_OK;
|
||||
}
|
||||
|
||||
static void redisPopCallback(redisContext *c) {
|
||||
if (c->cpos > 1) {
|
||||
memmove(&c->callbacks[0],&c->callbacks[1],(c->cpos-1)*sizeof(redisCallback));
|
||||
}
|
||||
c->cpos--;
|
||||
}
|
||||
|
||||
int redisGetReply(redisContext *c, void **reply) {
|
||||
redisPopCallback(c);
|
||||
if (redisReplyReaderGetReply(c->reader,reply) == REDIS_ERR) {
|
||||
/* Copy the (protocol) error from the reader to the context. */
|
||||
c->error = sdsnew(((redisReader*)c->reader)->error);
|
||||
@ -693,24 +685,35 @@ int redisGetReply(redisContext *c, void **reply) {
|
||||
return REDIS_OK;
|
||||
}
|
||||
|
||||
static void redisPopCallback(redisContext *c) {
|
||||
assert(c->cpos > 0);
|
||||
if (c->cpos > 1)
|
||||
memmove(&c->callbacks[0],&c->callbacks[1],(c->cpos-1)*sizeof(redisCallback));
|
||||
c->cpos--;
|
||||
}
|
||||
|
||||
int redisProcessCallbacks(redisContext *c) {
|
||||
void *reply = NULL;
|
||||
redisCallback cb;
|
||||
|
||||
do {
|
||||
/* Continue while there are callbacks */
|
||||
while(c->cpos > 0) {
|
||||
cb = c->callbacks[0];
|
||||
if (redisGetReply(c,&reply) == REDIS_ERR)
|
||||
return REDIS_ERR;
|
||||
|
||||
/* Fire callback when there is a reply. */
|
||||
if (reply != NULL) {
|
||||
redisPopCallback(c);
|
||||
if (cb.fn != NULL) {
|
||||
cb.fn(c,reply,cb.privdata);
|
||||
} else {
|
||||
c->fn->freeObject(reply);
|
||||
}
|
||||
} else {
|
||||
/* Stop trying */
|
||||
break;
|
||||
}
|
||||
} while (reply != NULL);
|
||||
}
|
||||
return REDIS_OK;
|
||||
}
|
||||
|
||||
|
32
test.c
32
test.c
@ -202,6 +202,15 @@ static void __test_callback(redisContext *c, const void *privdata) {
|
||||
__test_callback_flags |= (long)privdata;
|
||||
}
|
||||
|
||||
static long __test_reply_callback_flags = 0;
|
||||
static void __test_reply_callback(redisContext *c, redisReply *reply, const void *privdata) {
|
||||
((void)c);
|
||||
/* Shift to detect execution order */
|
||||
__test_reply_callback_flags <<= 8;
|
||||
__test_reply_callback_flags |= (long)privdata;
|
||||
freeReplyObject(reply);
|
||||
}
|
||||
|
||||
static void test_nonblocking_connection() {
|
||||
redisContext *c;
|
||||
int wdone = 0;
|
||||
@ -249,6 +258,29 @@ static void test_nonblocking_connection() {
|
||||
test_cond(redisBufferWrite(c,NULL) == REDIS_ERR &&
|
||||
strncmp(c->error,"write:",6) == 0);
|
||||
redisFree(c);
|
||||
|
||||
wdone = __test_reply_callback_flags = 0;
|
||||
test("Process callbacks in the right sequence: ");
|
||||
c = redisConnectNonBlock("127.0.0.1", 6379, NULL);
|
||||
redisCommandWithCallback(c,__test_reply_callback,(const void*)1,"PING");
|
||||
redisCommandWithCallback(c,__test_reply_callback,(const void*)2,"PING");
|
||||
redisCommandWithCallback(c,__test_reply_callback,(const void*)3,"PING");
|
||||
|
||||
/* Write output buffer */
|
||||
while(!wdone) {
|
||||
usleep(500);
|
||||
redisBufferWrite(c,&wdone);
|
||||
}
|
||||
|
||||
/* Read until at least one callback is executed (the 3 replies will
|
||||
* arrive in a single packet, causing all callbacks to be executed in
|
||||
* a single pass). */
|
||||
while(__test_reply_callback_flags == 0) {
|
||||
assert(redisBufferRead(c) == REDIS_OK);
|
||||
redisProcessCallbacks(c);
|
||||
}
|
||||
test_cond(__test_reply_callback_flags == 0x010203);
|
||||
redisFree(c);
|
||||
}
|
||||
|
||||
int main(void) {
|
||||
|
Loading…
Reference in New Issue
Block a user