From 12725f88ed2f77ee7a003458906c34dc20511e0e Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Tue, 7 Dec 2010 10:22:30 +0100 Subject: [PATCH] Fire onConnect callback on the first write event --- async.c | 27 ++++++++++++++++++++++++++- async.h | 7 ++++++- example-ae.c | 7 +++++++ example-libev.c | 7 +++++++ example-libevent.c | 7 +++++++ 5 files changed, 53 insertions(+), 2 deletions(-) diff --git a/async.c b/async.c index 1dcb04e..a425c42 100644 --- a/async.c +++ b/async.c @@ -38,16 +38,27 @@ void __redisAppendCommand(redisContext *c, char *cmd, size_t len); static redisAsyncContext *redisAsyncInitialize(redisContext *c) { redisAsyncContext *ac = realloc(c,sizeof(redisAsyncContext)); + c = &(ac->c); + + /* The regular connect functions will always set the flag REDIS_CONNECTED. + * For the async API, we want to wait until the first write event is + * received up before setting this flag, so reset it here. */ + c->flags &= ~REDIS_CONNECTED; + ac->err = 0; ac->errstr = NULL; ac->data = NULL; ac->_adapter_data = NULL; + ac->evAddRead = NULL; ac->evDelRead = NULL; ac->evAddWrite = NULL; ac->evDelWrite = NULL; ac->evCleanup = NULL; + + ac->onConnect = NULL; ac->onDisconnect = NULL; + ac->replies.head = NULL; ac->replies.tail = NULL; return ac; @@ -80,6 +91,14 @@ int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFun return redisSetReplyObjectFunctions(c,fn); } +int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) { + if (ac->onConnect == NULL) { + ac->onConnect = fn; + return REDIS_OK; + } + return REDIS_ERR; +} + int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) { if (ac->onDisconnect == NULL) { ac->onDisconnect = fn; @@ -235,8 +254,14 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { if (ac->evDelWrite) ac->evDelWrite(ac->_adapter_data); } - /* Always schedule reads when something was written */ + /* Always schedule reads after writes */ if (ac->evAddRead) ac->evAddRead(ac->_adapter_data); + + /* Fire onConnect when this is the first write event. */ + if (!(c->flags & REDIS_CONNECTED)) { + c->flags |= REDIS_CONNECTED; + if (ac->onConnect) ac->onConnect(ac); + } } } diff --git a/async.h b/async.h index 19c522b..2ef0e21 100644 --- a/async.h +++ b/async.h @@ -50,8 +50,9 @@ typedef struct redisCallbackList { redisCallback *head, *tail; } redisCallbackList; -/* Disconnect callback prototype */ +/* Connection callback prototypes */ typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status); +typedef void (redisConnectCallback)(const struct redisAsyncContext*); /* Context for an async connection to Redis */ typedef struct redisAsyncContext { @@ -80,6 +81,9 @@ typedef struct redisAsyncContext { * user request. The status is set accordingly (REDIS_OK, REDIS_ERR). */ redisDisconnectCallback *onDisconnect; + /* Called when the first write event was received. */ + redisConnectCallback *onConnect; + /* Reply callbacks */ redisCallbackList replies; } redisAsyncContext; @@ -87,6 +91,7 @@ typedef struct redisAsyncContext { /* Functions that proxy to hiredis */ redisAsyncContext *redisAsyncConnect(const char *ip, int port); int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn); +int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn); int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn); void redisAsyncDisconnect(redisAsyncContext *ac); diff --git a/example-ae.c b/example-ae.c index fc471fb..28c34dc 100644 --- a/example-ae.c +++ b/example-ae.c @@ -18,10 +18,16 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) { redisAsyncDisconnect(c); } +void connectCallback(const redisAsyncContext *c) { + ((void)c); + printf("connected...\n"); +} + void disconnectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { printf("Error: %s\n", c->errstr); } + printf("disconnected...\n"); aeStop(loop); } @@ -37,6 +43,7 @@ int main (int argc, char **argv) { loop = aeCreateEventLoop(); redisAeAttach(loop, c); + redisAsyncSetConnectCallback(c,connectCallback); redisAsyncSetDisconnectCallback(c,disconnectCallback); redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1])); redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); diff --git a/example-libev.c b/example-libev.c index ecf8c02..8efa1e3 100644 --- a/example-libev.c +++ b/example-libev.c @@ -15,10 +15,16 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) { redisAsyncDisconnect(c); } +void connectCallback(const redisAsyncContext *c) { + ((void)c); + printf("connected...\n"); +} + void disconnectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { printf("Error: %s\n", c->errstr); } + printf("disconnected...\n"); } int main (int argc, char **argv) { @@ -32,6 +38,7 @@ int main (int argc, char **argv) { } redisLibevAttach(EV_DEFAULT_ c); + redisAsyncSetConnectCallback(c,connectCallback); redisAsyncSetDisconnectCallback(c,disconnectCallback); redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1])); redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); diff --git a/example-libevent.c b/example-libevent.c index c257bb6..f6f8c83 100644 --- a/example-libevent.c +++ b/example-libevent.c @@ -15,10 +15,16 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) { redisAsyncDisconnect(c); } +void connectCallback(const redisAsyncContext *c) { + ((void)c); + printf("connected...\n"); +} + void disconnectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { printf("Error: %s\n", c->errstr); } + printf("disconnected...\n"); } int main (int argc, char **argv) { @@ -33,6 +39,7 @@ int main (int argc, char **argv) { } redisLibeventAttach(c,base); + redisAsyncSetConnectCallback(c,connectCallback); redisAsyncSetDisconnectCallback(c,disconnectCallback); redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1])); redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");