Implement a reconnect method for the client context

Originally implemented by @abedra as part of #306.

In case a write or read times out, we force an error state, because we
can't guarantuee that the next read will get the right data.
Instead we need to reconnect to have a clean-state connection, which is
now easily possible with this method.
This commit is contained in:
Jan-Erik Rediger 2015-04-16 19:28:12 +02:00
parent b872919463
commit d9e0b0f6ab
4 changed files with 170 additions and 5 deletions

View File

@ -598,6 +598,10 @@ static redisContext *redisContextInit(void) {
c->errstr[0] = '\0';
c->obuf = sdsempty();
c->reader = redisReaderCreate();
c->tcp.host = NULL;
c->tcp.source_addr = NULL;
c->unix.path = NULL;
c->timeout = NULL;
if (c->obuf == NULL || c->reader == NULL) {
redisFree(c);
@ -616,6 +620,14 @@ void redisFree(redisContext *c) {
sdsfree(c->obuf);
if (c->reader != NULL)
redisReaderFree(c->reader);
if (c->tcp.host)
free(c->tcp.host);
if (c->tcp.source_addr)
free(c->tcp.source_addr);
if (c->unix.path)
free(c->unix.path);
if (c->timeout)
free(c->timeout);
free(c);
}
@ -626,6 +638,34 @@ int redisFreeKeepFd(redisContext *c) {
return fd;
}
int redisReconnect(redisContext *c) {
c->err = 0;
memset(c->errstr, '\0', strlen(c->errstr));
if (c->fd > 0) {
close(c->fd);
}
sdsfree(c->obuf);
redisReaderFree(c->reader);
c->obuf = sdsempty();
c->reader = redisReaderCreate();
if (c->connection_type == REDIS_CONN_TCP) {
return redisContextConnectBindTcp(c, c->tcp.host, c->tcp.port,
c->timeout, c->tcp.source_addr);
} else if (c->connection_type == REDIS_CONN_UNIX) {
return redisContextConnectUnix(c, c->unix.path, c->timeout);
} else {
/* Something bad happened here and shouldn't have. There isn't
enough information in the context to reconnect. */
__redisSetError(c,REDIS_ERR_OTHER,"Not enough information to reconnect");
}
return REDIS_ERR;
}
/* Connect to a Redis instance. On error the field error in the returned
* context will be set to the return value of the error function.
* When no set of reply functions is given, the default set will be used. */

View File

@ -128,6 +128,11 @@ int redisFormatSdsCommandArgv(sds *target, int argc, const char ** argv, const s
void redisFreeCommand(char *cmd);
void redisFreeSdsCommand(sds cmd);
enum redisConnectionType {
REDIS_CONN_TCP,
REDIS_CONN_UNIX,
};
/* Context for a connection to Redis */
typedef struct redisContext {
int err; /* Error flags, 0 when there is no error */
@ -136,6 +141,20 @@ typedef struct redisContext {
int flags;
char *obuf; /* Write buffer */
redisReader *reader; /* Protocol reader */
enum redisConnectionType connection_type;
struct timeval *timeout;
struct {
char *host;
char *source_addr;
int port;
} tcp;
struct {
char *path;
} unix;
} redisContext;
redisContext *redisConnect(const char *ip, int port);
@ -149,6 +168,7 @@ redisContext *redisConnectUnix(const char *path);
redisContext *redisConnectUnixWithTimeout(const char *path, const struct timeval tv);
redisContext *redisConnectUnixNonBlock(const char *path);
redisContext *redisConnectFd(int fd);
int redisReconnect(redisContext *c);
int redisSetTimeout(redisContext *c, const struct timeval tv);
int redisEnableKeepAlive(redisContext *c);
void redisFree(redisContext *c);

66
net.c
View File

@ -47,6 +47,7 @@
#include <stdio.h>
#include <poll.h>
#include <limits.h>
#include <stdlib.h>
#include "net.h"
#include "sds.h"
@ -263,6 +264,44 @@ static int _redisContextConnectTcp(redisContext *c, const char *addr, int port,
int reuseaddr = (c->flags & REDIS_REUSEADDR);
int reuses = 0;
c->connection_type = REDIS_CONN_TCP;
c->tcp.port = port;
/* We need to take possession of the passed parameters
* to make them reusable for a reconnect.
* We also carefully check we don't free data we already own,
* as in the case of the reconnect method.
*
* This is a bit ugly, but atleast it works and doesn't leak memory.
**/
if (c->tcp.host != addr) {
if (c->tcp.host)
free(c->tcp.host);
c->tcp.host = strdup(addr);
}
if (timeout) {
if (c->timeout != timeout) {
if (c->timeout == NULL)
c->timeout = malloc(sizeof(struct timeval));
memcpy(c->timeout, timeout, sizeof(struct timeval));
}
} else {
if (c->timeout)
free(c->timeout);
c->timeout = NULL;
}
if (source_addr == NULL) {
free(c->tcp.source_addr);
c->tcp.source_addr = NULL;
} else if (c->tcp.source_addr != source_addr) {
free(c->tcp.source_addr);
c->tcp.source_addr = strdup(source_addr);
}
snprintf(_port, 6, "%d", port);
memset(&hints,0,sizeof(hints));
hints.ai_family = AF_INET;
@ -273,7 +312,7 @@ static int _redisContextConnectTcp(redisContext *c, const char *addr, int port,
* as this would add latency to every connect. Otherwise a more sensible
* route could be: Use IPv6 if both addresses are available and there is IPv6
* connectivity. */
if ((rv = getaddrinfo(addr,_port,&hints,&servinfo)) != 0) {
if ((rv = getaddrinfo(c->tcp.host,_port,&hints,&servinfo)) != 0) {
hints.ai_family = AF_INET6;
if ((rv = getaddrinfo(addr,_port,&hints,&servinfo)) != 0) {
__redisSetError(c,REDIS_ERR_OTHER,gai_strerror(rv));
@ -288,10 +327,10 @@ addrretry:
c->fd = s;
if (redisSetBlocking(c,0) != REDIS_OK)
goto error;
if (source_addr) {
if (c->tcp.source_addr) {
int bound = 0;
/* Using getaddrinfo saves us from self-determining IPv4 vs IPv6 */
if ((rv = getaddrinfo(source_addr, NULL, &hints, &bservinfo)) != 0) {
if ((rv = getaddrinfo(c->tcp.source_addr, NULL, &hints, &bservinfo)) != 0) {
char buf[128];
snprintf(buf,sizeof(buf),"Can't get addr: %s",gai_strerror(rv));
__redisSetError(c,REDIS_ERR_OTHER,buf);
@ -333,7 +372,7 @@ addrretry:
goto addrretry;
}
} else {
if (redisContextWaitReady(c,timeout) != REDIS_OK)
if (redisContextWaitReady(c,c->timeout) != REDIS_OK)
goto error;
}
}
@ -380,13 +419,30 @@ int redisContextConnectUnix(redisContext *c, const char *path, const struct time
if (redisSetBlocking(c,0) != REDIS_OK)
return REDIS_ERR;
c->connection_type = REDIS_CONN_UNIX;
if (c->unix.path != path)
c->unix.path = strdup(path);
if (timeout) {
if (c->timeout != timeout) {
if (c->timeout == NULL)
c->timeout = malloc(sizeof(struct timeval));
memcpy(c->timeout, timeout, sizeof(struct timeval));
}
} else {
if (c->timeout)
free(c->timeout);
c->timeout = NULL;
}
sa.sun_family = AF_LOCAL;
strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1);
if (connect(c->fd, (struct sockaddr*)&sa, sizeof(sa)) == -1) {
if (errno == EINPROGRESS && !blocking) {
/* This is ok. */
} else {
if (redisContextWaitReady(c,timeout) != REDIS_OK)
if (redisContextWaitReady(c,c->timeout) != REDIS_OK)
return REDIS_ERR;
}
}

49
test.c
View File

@ -11,6 +11,7 @@
#include <limits.h>
#include "hiredis.h"
#include "net.h"
enum connection_type {
CONN_TCP,
@ -443,6 +444,52 @@ static void test_blocking_connection(struct config config) {
disconnect(c, 0);
}
static void test_blocking_connection_timeouts(struct config config) {
redisContext *c;
redisReply *reply;
ssize_t s;
const char *cmd = "DEBUG SLEEP 3\r\n";
struct timeval tv;
c = connect(config);
test("Successfully completes a command when the timeout is not exceeded: ");
reply = redisCommand(c,"SET foo fast");
freeReplyObject(reply);
tv.tv_sec = 0;
tv.tv_usec = 10000;
redisSetTimeout(c, tv);
reply = redisCommand(c, "GET foo");
test_cond(reply != NULL && reply->type == REDIS_REPLY_STRING && memcmp(reply->str, "fast", 4) == 0);
freeReplyObject(reply);
disconnect(c, 0);
c = connect(config);
test("Does not return a reply when the command times out: ");
s = write(c->fd, cmd, strlen(cmd));
tv.tv_sec = 0;
tv.tv_usec = 10000;
redisSetTimeout(c, tv);
reply = redisCommand(c, "GET foo");
test_cond(s > 0 && reply == NULL && c->err == REDIS_ERR_IO && strcmp(c->errstr, "Resource temporarily unavailable") == 0);
freeReplyObject(reply);
test("Reconnect properly reconnects after a timeout: ");
redisReconnect(c);
reply = redisCommand(c, "PING");
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
freeReplyObject(reply);
test("Reconnect properly uses owned parameters: ");
config.tcp.host = "foo";
config.unix.path = "foo";
redisReconnect(c);
reply = redisCommand(c, "PING");
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
freeReplyObject(reply);
disconnect(c, 0);
}
static void test_blocking_io_errors(struct config config) {
redisContext *c;
redisReply *reply;
@ -729,6 +776,7 @@ int main(int argc, char **argv) {
printf("\nTesting against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
cfg.type = CONN_TCP;
test_blocking_connection(cfg);
test_blocking_connection_timeouts(cfg);
test_blocking_io_errors(cfg);
test_invalid_timeout_errors(cfg);
test_append_formatted_commands(cfg);
@ -737,6 +785,7 @@ int main(int argc, char **argv) {
printf("\nTesting against Unix socket connection (%s):\n", cfg.unix.path);
cfg.type = CONN_UNIX;
test_blocking_connection(cfg);
test_blocking_connection_timeouts(cfg);
test_blocking_io_errors(cfg);
if (throughput) test_throughput(cfg);