Fire onConnect callback on the first write event
This commit is contained in:
parent
822efe2ac3
commit
12725f88ed
27
async.c
27
async.c
@ -38,16 +38,27 @@ void __redisAppendCommand(redisContext *c, char *cmd, size_t len);
|
|||||||
|
|
||||||
static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
|
static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
|
||||||
redisAsyncContext *ac = realloc(c,sizeof(redisAsyncContext));
|
redisAsyncContext *ac = realloc(c,sizeof(redisAsyncContext));
|
||||||
|
c = &(ac->c);
|
||||||
|
|
||||||
|
/* The regular connect functions will always set the flag REDIS_CONNECTED.
|
||||||
|
* For the async API, we want to wait until the first write event is
|
||||||
|
* received up before setting this flag, so reset it here. */
|
||||||
|
c->flags &= ~REDIS_CONNECTED;
|
||||||
|
|
||||||
ac->err = 0;
|
ac->err = 0;
|
||||||
ac->errstr = NULL;
|
ac->errstr = NULL;
|
||||||
ac->data = NULL;
|
ac->data = NULL;
|
||||||
ac->_adapter_data = NULL;
|
ac->_adapter_data = NULL;
|
||||||
|
|
||||||
ac->evAddRead = NULL;
|
ac->evAddRead = NULL;
|
||||||
ac->evDelRead = NULL;
|
ac->evDelRead = NULL;
|
||||||
ac->evAddWrite = NULL;
|
ac->evAddWrite = NULL;
|
||||||
ac->evDelWrite = NULL;
|
ac->evDelWrite = NULL;
|
||||||
ac->evCleanup = NULL;
|
ac->evCleanup = NULL;
|
||||||
|
|
||||||
|
ac->onConnect = NULL;
|
||||||
ac->onDisconnect = NULL;
|
ac->onDisconnect = NULL;
|
||||||
|
|
||||||
ac->replies.head = NULL;
|
ac->replies.head = NULL;
|
||||||
ac->replies.tail = NULL;
|
ac->replies.tail = NULL;
|
||||||
return ac;
|
return ac;
|
||||||
@ -80,6 +91,14 @@ int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFun
|
|||||||
return redisSetReplyObjectFunctions(c,fn);
|
return redisSetReplyObjectFunctions(c,fn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
|
||||||
|
if (ac->onConnect == NULL) {
|
||||||
|
ac->onConnect = fn;
|
||||||
|
return REDIS_OK;
|
||||||
|
}
|
||||||
|
return REDIS_ERR;
|
||||||
|
}
|
||||||
|
|
||||||
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
|
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
|
||||||
if (ac->onDisconnect == NULL) {
|
if (ac->onDisconnect == NULL) {
|
||||||
ac->onDisconnect = fn;
|
ac->onDisconnect = fn;
|
||||||
@ -235,8 +254,14 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) {
|
|||||||
if (ac->evDelWrite) ac->evDelWrite(ac->_adapter_data);
|
if (ac->evDelWrite) ac->evDelWrite(ac->_adapter_data);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Always schedule reads when something was written */
|
/* Always schedule reads after writes */
|
||||||
if (ac->evAddRead) ac->evAddRead(ac->_adapter_data);
|
if (ac->evAddRead) ac->evAddRead(ac->_adapter_data);
|
||||||
|
|
||||||
|
/* Fire onConnect when this is the first write event. */
|
||||||
|
if (!(c->flags & REDIS_CONNECTED)) {
|
||||||
|
c->flags |= REDIS_CONNECTED;
|
||||||
|
if (ac->onConnect) ac->onConnect(ac);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
7
async.h
7
async.h
@ -50,8 +50,9 @@ typedef struct redisCallbackList {
|
|||||||
redisCallback *head, *tail;
|
redisCallback *head, *tail;
|
||||||
} redisCallbackList;
|
} redisCallbackList;
|
||||||
|
|
||||||
/* Disconnect callback prototype */
|
/* Connection callback prototypes */
|
||||||
typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status);
|
typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status);
|
||||||
|
typedef void (redisConnectCallback)(const struct redisAsyncContext*);
|
||||||
|
|
||||||
/* Context for an async connection to Redis */
|
/* Context for an async connection to Redis */
|
||||||
typedef struct redisAsyncContext {
|
typedef struct redisAsyncContext {
|
||||||
@ -80,6 +81,9 @@ typedef struct redisAsyncContext {
|
|||||||
* user request. The status is set accordingly (REDIS_OK, REDIS_ERR). */
|
* user request. The status is set accordingly (REDIS_OK, REDIS_ERR). */
|
||||||
redisDisconnectCallback *onDisconnect;
|
redisDisconnectCallback *onDisconnect;
|
||||||
|
|
||||||
|
/* Called when the first write event was received. */
|
||||||
|
redisConnectCallback *onConnect;
|
||||||
|
|
||||||
/* Reply callbacks */
|
/* Reply callbacks */
|
||||||
redisCallbackList replies;
|
redisCallbackList replies;
|
||||||
} redisAsyncContext;
|
} redisAsyncContext;
|
||||||
@ -87,6 +91,7 @@ typedef struct redisAsyncContext {
|
|||||||
/* Functions that proxy to hiredis */
|
/* Functions that proxy to hiredis */
|
||||||
redisAsyncContext *redisAsyncConnect(const char *ip, int port);
|
redisAsyncContext *redisAsyncConnect(const char *ip, int port);
|
||||||
int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn);
|
int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn);
|
||||||
|
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn);
|
||||||
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);
|
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);
|
||||||
void redisAsyncDisconnect(redisAsyncContext *ac);
|
void redisAsyncDisconnect(redisAsyncContext *ac);
|
||||||
|
|
||||||
|
@ -18,10 +18,16 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) {
|
|||||||
redisAsyncDisconnect(c);
|
redisAsyncDisconnect(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void connectCallback(const redisAsyncContext *c) {
|
||||||
|
((void)c);
|
||||||
|
printf("connected...\n");
|
||||||
|
}
|
||||||
|
|
||||||
void disconnectCallback(const redisAsyncContext *c, int status) {
|
void disconnectCallback(const redisAsyncContext *c, int status) {
|
||||||
if (status != REDIS_OK) {
|
if (status != REDIS_OK) {
|
||||||
printf("Error: %s\n", c->errstr);
|
printf("Error: %s\n", c->errstr);
|
||||||
}
|
}
|
||||||
|
printf("disconnected...\n");
|
||||||
aeStop(loop);
|
aeStop(loop);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -37,6 +43,7 @@ int main (int argc, char **argv) {
|
|||||||
|
|
||||||
loop = aeCreateEventLoop();
|
loop = aeCreateEventLoop();
|
||||||
redisAeAttach(loop, c);
|
redisAeAttach(loop, c);
|
||||||
|
redisAsyncSetConnectCallback(c,connectCallback);
|
||||||
redisAsyncSetDisconnectCallback(c,disconnectCallback);
|
redisAsyncSetDisconnectCallback(c,disconnectCallback);
|
||||||
redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
|
redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
|
||||||
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
|
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
|
||||||
|
@ -15,10 +15,16 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) {
|
|||||||
redisAsyncDisconnect(c);
|
redisAsyncDisconnect(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void connectCallback(const redisAsyncContext *c) {
|
||||||
|
((void)c);
|
||||||
|
printf("connected...\n");
|
||||||
|
}
|
||||||
|
|
||||||
void disconnectCallback(const redisAsyncContext *c, int status) {
|
void disconnectCallback(const redisAsyncContext *c, int status) {
|
||||||
if (status != REDIS_OK) {
|
if (status != REDIS_OK) {
|
||||||
printf("Error: %s\n", c->errstr);
|
printf("Error: %s\n", c->errstr);
|
||||||
}
|
}
|
||||||
|
printf("disconnected...\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
int main (int argc, char **argv) {
|
int main (int argc, char **argv) {
|
||||||
@ -32,6 +38,7 @@ int main (int argc, char **argv) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
redisLibevAttach(EV_DEFAULT_ c);
|
redisLibevAttach(EV_DEFAULT_ c);
|
||||||
|
redisAsyncSetConnectCallback(c,connectCallback);
|
||||||
redisAsyncSetDisconnectCallback(c,disconnectCallback);
|
redisAsyncSetDisconnectCallback(c,disconnectCallback);
|
||||||
redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
|
redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
|
||||||
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
|
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
|
||||||
|
@ -15,10 +15,16 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) {
|
|||||||
redisAsyncDisconnect(c);
|
redisAsyncDisconnect(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void connectCallback(const redisAsyncContext *c) {
|
||||||
|
((void)c);
|
||||||
|
printf("connected...\n");
|
||||||
|
}
|
||||||
|
|
||||||
void disconnectCallback(const redisAsyncContext *c, int status) {
|
void disconnectCallback(const redisAsyncContext *c, int status) {
|
||||||
if (status != REDIS_OK) {
|
if (status != REDIS_OK) {
|
||||||
printf("Error: %s\n", c->errstr);
|
printf("Error: %s\n", c->errstr);
|
||||||
}
|
}
|
||||||
|
printf("disconnected...\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
int main (int argc, char **argv) {
|
int main (int argc, char **argv) {
|
||||||
@ -33,6 +39,7 @@ int main (int argc, char **argv) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
redisLibeventAttach(c,base);
|
redisLibeventAttach(c,base);
|
||||||
|
redisAsyncSetConnectCallback(c,connectCallback);
|
||||||
redisAsyncSetDisconnectCallback(c,disconnectCallback);
|
redisAsyncSetDisconnectCallback(c,disconnectCallback);
|
||||||
redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
|
redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
|
||||||
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
|
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
|
||||||
|
Loading…
Reference in New Issue
Block a user