diff --git a/hiredis.c b/hiredis.c index 585dfe8..052285b 100644 --- a/hiredis.c +++ b/hiredis.c @@ -676,15 +676,7 @@ int redisBufferRead(redisContext *c) { return REDIS_OK; } -static void redisPopCallback(redisContext *c) { - if (c->cpos > 1) { - memmove(&c->callbacks[0],&c->callbacks[1],(c->cpos-1)*sizeof(redisCallback)); - } - c->cpos--; -} - int redisGetReply(redisContext *c, void **reply) { - redisPopCallback(c); if (redisReplyReaderGetReply(c->reader,reply) == REDIS_ERR) { /* Copy the (protocol) error from the reader to the context. */ c->error = sdsnew(((redisReader*)c->reader)->error); @@ -693,24 +685,35 @@ int redisGetReply(redisContext *c, void **reply) { return REDIS_OK; } +static void redisPopCallback(redisContext *c) { + assert(c->cpos > 0); + if (c->cpos > 1) + memmove(&c->callbacks[0],&c->callbacks[1],(c->cpos-1)*sizeof(redisCallback)); + c->cpos--; +} + int redisProcessCallbacks(redisContext *c) { void *reply = NULL; redisCallback cb; - do { + /* Continue while there are callbacks */ + while(c->cpos > 0) { cb = c->callbacks[0]; if (redisGetReply(c,&reply) == REDIS_ERR) return REDIS_ERR; - /* Fire callback when there is a reply. */ if (reply != NULL) { + redisPopCallback(c); if (cb.fn != NULL) { cb.fn(c,reply,cb.privdata); } else { c->fn->freeObject(reply); } + } else { + /* Stop trying */ + break; } - } while (reply != NULL); + } return REDIS_OK; } diff --git a/test.c b/test.c index 2800616..b1a6ed6 100644 --- a/test.c +++ b/test.c @@ -202,6 +202,15 @@ static void __test_callback(redisContext *c, const void *privdata) { __test_callback_flags |= (long)privdata; } +static long __test_reply_callback_flags = 0; +static void __test_reply_callback(redisContext *c, redisReply *reply, const void *privdata) { + ((void)c); + /* Shift to detect execution order */ + __test_reply_callback_flags <<= 8; + __test_reply_callback_flags |= (long)privdata; + freeReplyObject(reply); +} + static void test_nonblocking_connection() { redisContext *c; int wdone = 0; @@ -249,6 +258,29 @@ static void test_nonblocking_connection() { test_cond(redisBufferWrite(c,NULL) == REDIS_ERR && strncmp(c->error,"write:",6) == 0); redisFree(c); + + wdone = __test_reply_callback_flags = 0; + test("Process callbacks in the right sequence: "); + c = redisConnectNonBlock("127.0.0.1", 6379, NULL); + redisCommandWithCallback(c,__test_reply_callback,(const void*)1,"PING"); + redisCommandWithCallback(c,__test_reply_callback,(const void*)2,"PING"); + redisCommandWithCallback(c,__test_reply_callback,(const void*)3,"PING"); + + /* Write output buffer */ + while(!wdone) { + usleep(500); + redisBufferWrite(c,&wdone); + } + + /* Read until at least one callback is executed (the 3 replies will + * arrive in a single packet, causing all callbacks to be executed in + * a single pass). */ + while(__test_reply_callback_flags == 0) { + assert(redisBufferRead(c) == REDIS_OK); + redisProcessCallbacks(c); + } + test_cond(__test_reply_callback_flags == 0x010203); + redisFree(c); } int main(void) {