read/write timeouts
This commit is contained in:
parent
e4a78006e7
commit
35a0a1f369
@ -36,48 +36,81 @@
|
|||||||
|
|
||||||
typedef struct redisLibeventEvents {
|
typedef struct redisLibeventEvents {
|
||||||
redisAsyncContext *context;
|
redisAsyncContext *context;
|
||||||
struct event *rev, *wev;
|
struct event *ev, *tmr;
|
||||||
|
struct event_base *base;
|
||||||
|
struct timeval tv;
|
||||||
|
short flags;
|
||||||
} redisLibeventEvents;
|
} redisLibeventEvents;
|
||||||
|
|
||||||
static void redisLibeventReadEvent(int fd, short event, void *arg) {
|
static void redisLibeventHandler(int fd, short event, void *arg) {
|
||||||
((void)fd); ((void)event);
|
((void)fd);
|
||||||
redisLibeventEvents *e = (redisLibeventEvents*)arg;
|
redisLibeventEvents *e = (redisLibeventEvents*)arg;
|
||||||
redisAsyncHandleRead(e->context);
|
if (event & EV_TIMEOUT) {
|
||||||
|
redisAsyncHandleTimeout(e->context);
|
||||||
|
}
|
||||||
|
if (e->context && (event & EV_READ)) {
|
||||||
|
redisAsyncHandleRead(e->context);
|
||||||
|
}
|
||||||
|
if (e->context && (event & EV_WRITE)) {
|
||||||
|
redisAsyncHandleWrite(e->context);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void redisLibeventWriteEvent(int fd, short event, void *arg) {
|
static void redisLibeventUpdate(void *privdata, short flag, int isRemove) {
|
||||||
((void)fd); ((void)event);
|
redisLibeventEvents *e = (redisLibeventEvents *)privdata;
|
||||||
redisLibeventEvents *e = (redisLibeventEvents*)arg;
|
const struct timeval *tv = e->tv.tv_sec || e->tv.tv_usec ? &e->tv : NULL;
|
||||||
redisAsyncHandleWrite(e->context);
|
|
||||||
|
if (isRemove) {
|
||||||
|
if ((e->flags & flag) == 0) {
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
e->flags &= ~flag;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (e->flags & flag) {
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
e->flags |= flag;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
event_del(e->ev);
|
||||||
|
event_assign(e->ev, e->base, e->context->c.fd, e->flags,
|
||||||
|
redisLibeventHandler, privdata);
|
||||||
|
event_add(e->ev, tv);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void redisLibeventAddRead(void *privdata) {
|
static void redisLibeventAddRead(void *privdata) {
|
||||||
redisLibeventEvents *e = (redisLibeventEvents*)privdata;
|
redisLibeventUpdate(privdata, EV_READ, 0);
|
||||||
event_add(e->rev,NULL);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void redisLibeventDelRead(void *privdata) {
|
static void redisLibeventDelRead(void *privdata) {
|
||||||
redisLibeventEvents *e = (redisLibeventEvents*)privdata;
|
redisLibeventUpdate(privdata, EV_READ, 1);
|
||||||
event_del(e->rev);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void redisLibeventAddWrite(void *privdata) {
|
static void redisLibeventAddWrite(void *privdata) {
|
||||||
redisLibeventEvents *e = (redisLibeventEvents*)privdata;
|
redisLibeventUpdate(privdata, EV_WRITE, 0);
|
||||||
event_add(e->wev,NULL);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void redisLibeventDelWrite(void *privdata) {
|
static void redisLibeventDelWrite(void *privdata) {
|
||||||
redisLibeventEvents *e = (redisLibeventEvents*)privdata;
|
redisLibeventUpdate(privdata, EV_WRITE, 1);
|
||||||
event_del(e->wev);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void redisLibeventCleanup(void *privdata) {
|
static void redisLibeventCleanup(void *privdata) {
|
||||||
redisLibeventEvents *e = (redisLibeventEvents*)privdata;
|
redisLibeventEvents *e = (redisLibeventEvents*)privdata;
|
||||||
event_free(e->rev);
|
event_free(e->ev);
|
||||||
event_free(e->wev);
|
|
||||||
free(e);
|
free(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void redisLibeventSetTimeout(void *privdata, struct timeval tv) {
|
||||||
|
redisLibeventEvents *e = (redisLibeventEvents *)privdata;
|
||||||
|
short flags = e->flags;
|
||||||
|
e->flags = 0;
|
||||||
|
e->tv = tv;
|
||||||
|
event_del(e->ev);
|
||||||
|
redisLibeventUpdate(e, flags, 0);
|
||||||
|
}
|
||||||
|
|
||||||
static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) {
|
static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) {
|
||||||
redisContext *c = &(ac->c);
|
redisContext *c = &(ac->c);
|
||||||
redisLibeventEvents *e;
|
redisLibeventEvents *e;
|
||||||
@ -96,13 +129,15 @@ static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) {
|
|||||||
ac->ev.addWrite = redisLibeventAddWrite;
|
ac->ev.addWrite = redisLibeventAddWrite;
|
||||||
ac->ev.delWrite = redisLibeventDelWrite;
|
ac->ev.delWrite = redisLibeventDelWrite;
|
||||||
ac->ev.cleanup = redisLibeventCleanup;
|
ac->ev.cleanup = redisLibeventCleanup;
|
||||||
|
ac->ev.scheduleTimer = redisLibeventSetTimeout;
|
||||||
ac->ev.data = e;
|
ac->ev.data = e;
|
||||||
|
|
||||||
/* Initialize and install read/write events */
|
/* Initialize and install read/write events */
|
||||||
e->rev = event_new(base, c->fd, EV_READ, redisLibeventReadEvent, e);
|
e->ev = event_new(base, c->fd, EV_READ | EV_WRITE, redisLibeventHandler, e);
|
||||||
e->wev = event_new(base, c->fd, EV_WRITE, redisLibeventWriteEvent, e);
|
e->flags = 0;
|
||||||
event_add(e->rev, NULL);
|
e->base = base;
|
||||||
event_add(e->wev, NULL);
|
e->tv.tv_sec = 0;
|
||||||
|
e->tv.tv_usec = 0;
|
||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
62
async.c
62
async.c
@ -42,15 +42,19 @@
|
|||||||
#include "sds.h"
|
#include "sds.h"
|
||||||
#include "sslio.h"
|
#include "sslio.h"
|
||||||
|
|
||||||
#define _EL_ADD_READ(ctx) do { \
|
#define _EL_ADD_READ(ctx) \
|
||||||
|
do { \
|
||||||
|
refreshTimeout(ctx); \
|
||||||
if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
|
if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
|
||||||
} while(0)
|
} while (0)
|
||||||
#define _EL_DEL_READ(ctx) do { \
|
#define _EL_DEL_READ(ctx) do { \
|
||||||
if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \
|
if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \
|
||||||
} while(0)
|
} while(0)
|
||||||
#define _EL_ADD_WRITE(ctx) do { \
|
#define _EL_ADD_WRITE(ctx) \
|
||||||
|
do { \
|
||||||
|
refreshTimeout(ctx); \
|
||||||
if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
|
if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
|
||||||
} while(0)
|
} while (0)
|
||||||
#define _EL_DEL_WRITE(ctx) do { \
|
#define _EL_DEL_WRITE(ctx) do { \
|
||||||
if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \
|
if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \
|
||||||
} while(0)
|
} while(0)
|
||||||
@ -58,6 +62,19 @@
|
|||||||
if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \
|
if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \
|
||||||
} while(0);
|
} while(0);
|
||||||
|
|
||||||
|
static void refreshTimeout(redisAsyncContext *ctx) {
|
||||||
|
if (ctx->c.timeout && ctx->ev.scheduleTimer &&
|
||||||
|
(ctx->c.timeout->tv_sec || ctx->c.timeout->tv_usec)) {
|
||||||
|
ctx->ev.scheduleTimer(ctx->ev.data, *ctx->c.timeout);
|
||||||
|
// } else {
|
||||||
|
// printf("Not scheduling timer.. (tmo=%p)\n", ctx->c.timeout);
|
||||||
|
// if (ctx->c.timeout){
|
||||||
|
// printf("tv_sec: %u. tv_usec: %u\n", ctx->c.timeout->tv_sec,
|
||||||
|
// ctx->c.timeout->tv_usec);
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Forward declaration of function in hiredis.c */
|
/* Forward declaration of function in hiredis.c */
|
||||||
int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
|
int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
|
||||||
|
|
||||||
@ -656,6 +673,30 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void __redisSetError(redisContext *c, int type, const char *str);
|
||||||
|
|
||||||
|
void redisAsyncHandleTimeout(redisAsyncContext *ac) {
|
||||||
|
redisContext *c = &(ac->c);
|
||||||
|
redisCallback cb;
|
||||||
|
|
||||||
|
if ((c->flags & REDIS_CONNECTED) && ac->replies.head == NULL) {
|
||||||
|
/* Nothing to do - just an idle timeout */
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!c->err) {
|
||||||
|
__redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) {
|
||||||
|
ac->onConnect(ac, REDIS_ERR);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) {
|
||||||
|
__redisRunCallback(ac, &cb, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Sets a pointer to the first argument and its length starting at p. Returns
|
/* Sets a pointer to the first argument and its length starting at p. Returns
|
||||||
* the number of bytes to skip to get to the following argument. */
|
* the number of bytes to skip to get to the following argument. */
|
||||||
static const char *nextArgument(const char *start, const char **str, size_t *len) {
|
static const char *nextArgument(const char *start, const char **str, size_t *len) {
|
||||||
@ -795,3 +836,16 @@ int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
|
|||||||
int status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
|
int status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {
|
||||||
|
if (!ac->c.timeout) {
|
||||||
|
ac->c.timeout = calloc(1, sizeof(tv));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tv.tv_sec == ac->c.timeout->tv_sec &&
|
||||||
|
tv.tv_usec == ac->c.timeout->tv_usec) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
*ac->c.timeout = tv;
|
||||||
|
}
|
5
async.h
5
async.h
@ -57,6 +57,7 @@ typedef struct redisCallbackList {
|
|||||||
/* Connection callback prototypes */
|
/* Connection callback prototypes */
|
||||||
typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status);
|
typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status);
|
||||||
typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status);
|
typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status);
|
||||||
|
typedef void(redisTimerCallback)(void *timer, void *privdata);
|
||||||
|
|
||||||
/* Context for an async connection to Redis */
|
/* Context for an async connection to Redis */
|
||||||
typedef struct redisAsyncContext {
|
typedef struct redisAsyncContext {
|
||||||
@ -81,6 +82,7 @@ typedef struct redisAsyncContext {
|
|||||||
void (*addWrite)(void *privdata);
|
void (*addWrite)(void *privdata);
|
||||||
void (*delWrite)(void *privdata);
|
void (*delWrite)(void *privdata);
|
||||||
void (*cleanup)(void *privdata);
|
void (*cleanup)(void *privdata);
|
||||||
|
void (*scheduleTimer)(void *privdata, struct timeval tv);
|
||||||
} ev;
|
} ev;
|
||||||
|
|
||||||
/* Called when either the connection is terminated due to an error or per
|
/* Called when either the connection is terminated due to an error or per
|
||||||
@ -113,12 +115,15 @@ redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
|
|||||||
redisAsyncContext *redisAsyncConnectUnix(const char *path);
|
redisAsyncContext *redisAsyncConnectUnix(const char *path);
|
||||||
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn);
|
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn);
|
||||||
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);
|
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);
|
||||||
|
|
||||||
|
void redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv);
|
||||||
void redisAsyncDisconnect(redisAsyncContext *ac);
|
void redisAsyncDisconnect(redisAsyncContext *ac);
|
||||||
void redisAsyncFree(redisAsyncContext *ac);
|
void redisAsyncFree(redisAsyncContext *ac);
|
||||||
|
|
||||||
/* Handle read/write events */
|
/* Handle read/write events */
|
||||||
void redisAsyncHandleRead(redisAsyncContext *ac);
|
void redisAsyncHandleRead(redisAsyncContext *ac);
|
||||||
void redisAsyncHandleWrite(redisAsyncContext *ac);
|
void redisAsyncHandleWrite(redisAsyncContext *ac);
|
||||||
|
void redisAsyncHandleTimeout(redisAsyncContext *ac);
|
||||||
|
|
||||||
/* Command functions for an async context. Write the command to the
|
/* Command functions for an async context. Write the command to the
|
||||||
* output buffer and register the provided callback. */
|
* output buffer and register the provided callback. */
|
||||||
|
1
read.h
1
read.h
@ -45,6 +45,7 @@
|
|||||||
#define REDIS_ERR_EOF 3 /* End of file */
|
#define REDIS_ERR_EOF 3 /* End of file */
|
||||||
#define REDIS_ERR_PROTOCOL 4 /* Protocol error */
|
#define REDIS_ERR_PROTOCOL 4 /* Protocol error */
|
||||||
#define REDIS_ERR_OOM 5 /* Out of memory */
|
#define REDIS_ERR_OOM 5 /* Out of memory */
|
||||||
|
#define REDIS_ERR_TIMEOUT 6 /* Timed out */
|
||||||
#define REDIS_ERR_OTHER 2 /* Everything else... */
|
#define REDIS_ERR_OTHER 2 /* Everything else... */
|
||||||
|
|
||||||
#define REDIS_REPLY_STRING 1
|
#define REDIS_REPLY_STRING 1
|
||||||
|
Loading…
Reference in New Issue
Block a user