Remove possiblity of multiple close on same fd

With all the async connects and disconnects and error handling
going on in hiredis, we need to centralize how we close our fd
and set it so it doesn't get re-closed.  Prior to this commit,
sometimes we'd close(fd), run an async error handler, then
call close(fd) again.

To stop multiple closes, we now set fd to -1 after we free it,
but that requires not passing fd as an independent argument to
functions.

This commit moves all fd usage to c->fd.  Since the context
has a fd field and all functions receive the context, it makes
more sense to use the fd inside of c instead of passing along fd
as an independent argument.

Also, by only using c->fd, we can set c->fd after we close it to
signify we shouldn't re-close the same fd again.

This does change one semi-public interface function redisCheckSocketError()
to only take (context) instead of (context, fd).  A search on github
returned zero occasions of people using redisCheckSocketError()
outside of net.{c,h} in hiredis itself.

Commit inspired by the bug report at:
https://groups.google.com/forum/#!topic/redis-db/mQm46XkIPOY

Thanks go out to Thijs for trying high-frequency reconnects on
a host that isn't there.

Closes #230
This commit is contained in:
Matt Stancliff 2014-04-07 11:57:26 -04:00
parent 065e90557c
commit 7f0c7a29fd
3 changed files with 45 additions and 39 deletions

View File

@ -473,7 +473,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
if (redisCheckSocketError(c,c->fd) == REDIS_ERR) {
if (redisCheckSocketError(c) == REDIS_ERR) {
/* Try again later when connect(2) is still in progress. */
if (errno == EINPROGRESS)
return REDIS_OK;

80
net.c
View File

@ -54,6 +54,13 @@
/* Defined in hiredis.c */
void __redisSetError(redisContext *c, int type, const char *str);
static void redisContextCloseFd(redisContext *c) {
if (c && c->fd > 0) {
close(c->fd);
c->fd = -1;
}
}
static void __redisSetErrorFromErrno(redisContext *c, int type, const char *prefix) {
char buf[128] = { 0 };
size_t len = 0;
@ -64,11 +71,11 @@ static void __redisSetErrorFromErrno(redisContext *c, int type, const char *pref
__redisSetError(c,type,buf);
}
static int redisSetReuseAddr(redisContext *c, int fd) {
static int redisSetReuseAddr(redisContext *c) {
int on = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
if (setsockopt(c->fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
close(fd);
redisContextCloseFd(c);
return REDIS_ERR;
}
return REDIS_OK;
@ -80,23 +87,24 @@ static int redisCreateSocket(redisContext *c, int type) {
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
return REDIS_ERR;
}
c->fd = s;
if (type == AF_INET) {
if (redisSetReuseAddr(c,s) == REDIS_ERR) {
if (redisSetReuseAddr(c) == REDIS_ERR) {
return REDIS_ERR;
}
}
return s;
return REDIS_OK;
}
static int redisSetBlocking(redisContext *c, int fd, int blocking) {
static int redisSetBlocking(redisContext *c, int blocking) {
int flags;
/* Set the socket nonblocking.
* Note that fcntl(2) for F_GETFL and F_SETFL can't be
* interrupted by a signal. */
if ((flags = fcntl(fd, F_GETFL)) == -1) {
if ((flags = fcntl(c->fd, F_GETFL)) == -1) {
__redisSetErrorFromErrno(c,REDIS_ERR_IO,"fcntl(F_GETFL)");
close(fd);
redisContextCloseFd(c);
return REDIS_ERR;
}
@ -105,9 +113,9 @@ static int redisSetBlocking(redisContext *c, int fd, int blocking) {
else
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) == -1) {
if (fcntl(c->fd, F_SETFL, flags) == -1) {
__redisSetErrorFromErrno(c,REDIS_ERR_IO,"fcntl(F_SETFL)");
close(fd);
redisContextCloseFd(c);
return REDIS_ERR;
}
return REDIS_OK;
@ -152,11 +160,11 @@ int redisKeepAlive(redisContext *c, int interval) {
return REDIS_OK;
}
static int redisSetTcpNoDelay(redisContext *c, int fd) {
static int redisSetTcpNoDelay(redisContext *c) {
int yes = 1;
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) == -1) {
if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) == -1) {
__redisSetErrorFromErrno(c,REDIS_ERR_IO,"setsockopt(TCP_NODELAY)");
close(fd);
redisContextCloseFd(c);
return REDIS_ERR;
}
return REDIS_OK;
@ -164,19 +172,19 @@ static int redisSetTcpNoDelay(redisContext *c, int fd) {
#define __MAX_MSEC (((LONG_MAX) - 999) / 1000)
static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *timeout) {
static int redisContextWaitReady(redisContext *c, const struct timeval *timeout) {
struct pollfd wfd[1];
long msec;
msec = -1;
wfd[0].fd = fd;
wfd[0].fd = c->fd;
wfd[0].events = POLLOUT;
/* Only use timeout when not NULL. */
if (timeout != NULL) {
if (timeout->tv_usec > 1000000 || timeout->tv_sec > __MAX_MSEC) {
__redisSetErrorFromErrno(c, REDIS_ERR_IO, NULL);
close(fd);
redisContextCloseFd(c);
return REDIS_ERR;
}
@ -192,40 +200,40 @@ static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *
if ((res = poll(wfd, 1, msec)) == -1) {
__redisSetErrorFromErrno(c, REDIS_ERR_IO, "poll(2)");
close(fd);
redisContextCloseFd(c);
return REDIS_ERR;
} else if (res == 0) {
errno = ETIMEDOUT;
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
close(fd);
redisContextCloseFd(c);
return REDIS_ERR;
}
if (redisCheckSocketError(c, fd) != REDIS_OK)
if (redisCheckSocketError(c) != REDIS_OK)
return REDIS_ERR;
return REDIS_OK;
}
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
close(fd);
redisContextCloseFd(c);
return REDIS_ERR;
}
int redisCheckSocketError(redisContext *c, int fd) {
int redisCheckSocketError(redisContext *c) {
int err = 0;
socklen_t errlen = sizeof(err);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1) {
if (getsockopt(c->fd, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1) {
__redisSetErrorFromErrno(c,REDIS_ERR_IO,"getsockopt(SO_ERROR)");
close(fd);
redisContextCloseFd(c);
return REDIS_ERR;
}
if (err) {
errno = err;
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
close(fd);
redisContextCloseFd(c);
return REDIS_ERR;
}
@ -271,25 +279,25 @@ int redisContextConnectTcp(redisContext *c, const char *addr, int port, const st
if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
continue;
if (redisSetBlocking(c,s,0) != REDIS_OK)
c->fd = s;
if (redisSetBlocking(c,0) != REDIS_OK)
goto error;
if (connect(s,p->ai_addr,p->ai_addrlen) == -1) {
if (errno == EHOSTUNREACH) {
close(s);
redisContextCloseFd(c);
continue;
} else if (errno == EINPROGRESS && !blocking) {
/* This is ok. */
} else {
if (redisContextWaitReady(c,s,timeout) != REDIS_OK)
if (redisContextWaitReady(c,timeout) != REDIS_OK)
goto error;
}
}
if (blocking && redisSetBlocking(c,s,1) != REDIS_OK)
if (blocking && redisSetBlocking(c,1) != REDIS_OK)
goto error;
if (redisSetTcpNoDelay(c,s) != REDIS_OK)
if (redisSetTcpNoDelay(c) != REDIS_OK)
goto error;
c->fd = s;
c->flags |= REDIS_CONNECTED;
rv = REDIS_OK;
goto end;
@ -309,31 +317,29 @@ end:
}
int redisContextConnectUnix(redisContext *c, const char *path, const struct timeval *timeout) {
int s;
int blocking = (c->flags & REDIS_BLOCK);
struct sockaddr_un sa;
if ((s = redisCreateSocket(c,AF_LOCAL)) < 0)
if (redisCreateSocket(c,AF_LOCAL) < 0)
return REDIS_ERR;
if (redisSetBlocking(c,s,0) != REDIS_OK)
if (redisSetBlocking(c,0) != REDIS_OK)
return REDIS_ERR;
sa.sun_family = AF_LOCAL;
strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1);
if (connect(s, (struct sockaddr*)&sa, sizeof(sa)) == -1) {
if (connect(c->fd, (struct sockaddr*)&sa, sizeof(sa)) == -1) {
if (errno == EINPROGRESS && !blocking) {
/* This is ok. */
} else {
if (redisContextWaitReady(c,s,timeout) != REDIS_OK)
if (redisContextWaitReady(c,timeout) != REDIS_OK)
return REDIS_ERR;
}
}
/* Reset socket to be blocking after connect(2). */
if (blocking && redisSetBlocking(c,s,1) != REDIS_OK)
if (blocking && redisSetBlocking(c,1) != REDIS_OK)
return REDIS_ERR;
c->fd = s;
c->flags |= REDIS_CONNECTED;
return REDIS_OK;
}

2
net.h
View File

@ -39,7 +39,7 @@
#define AF_LOCAL AF_UNIX
#endif
int redisCheckSocketError(redisContext *c, int fd);
int redisCheckSocketError(redisContext *c);
int redisContextSetTimeout(redisContext *c, const struct timeval tv);
int redisContextConnectTcp(redisContext *c, const char *addr, int port, const struct timeval *timeout);
int redisContextConnectUnix(redisContext *c, const char *path, const struct timeval *timeout);