diff --git a/src/Makefile b/src/Makefile index f948b2c..ade1381 100644 --- a/src/Makefile +++ b/src/Makefile @@ -37,9 +37,12 @@ alloc.xo: fmacros.h alloc.h dict.xo: fmacros.h alloc.h dict.h ae.xo: redismodule.h hiredis.xo:fmacros.h hiredis.h net.h sds.h alloc.h async.h +syncio.xo:redis-migrate.c +log.xo: log.h +rdbLoad.xo: rdbLoad.h redis-migrate.xo: redismodule.h -XO_LIBS=redis-migrate.xo hiredis.xo sds.xo ssl.xo read.xo alloc.xo dict.xo async.xo net.xo ae.xo +XO_LIBS=redis-migrate.xo hiredis.xo sds.xo ssl.xo read.xo alloc.xo dict.xo async.xo net.xo ae.xo syncio.xo log.xo rdbLoad.xo redis-migrate.so: $(XO_LIBS) $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc diff --git a/src/ae.c b/src/ae.c index 8df57d5..d12189a 100644 --- a/src/ae.c +++ b/src/ae.c @@ -1,502 +1,25 @@ -#include -#include -#include -#include #include "ae.h" +#include +#include +int aeWait(int fd, int mask, long long milliseconds) { + struct pollfd pfd; + int retmask = 0, retval; -void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) { - ((void)el); ((void)fd); ((void)mask); + memset(&pfd, 0, sizeof(pfd)); + pfd.fd = fd; + if (mask & AE_READABLE) pfd.events |= POLLIN; + if (mask & AE_WRITABLE) pfd.events |= POLLOUT; - redisAeEvents *e = (redisAeEvents*)privdata; - redisAsyncHandleRead(e->context); -} - -void aeStop(aeEventLoop *eventLoop) { - eventLoop->stop = 1; -} - -void aeMain(aeEventLoop *eventLoop) { - eventLoop->stop = 0; - while (!eventLoop->stop) { - aeProcessEvents(eventLoop, AE_ALL_EVENTS| - AE_CALL_BEFORE_SLEEP| - AE_CALL_AFTER_SLEEP); - } -} - -monotime (*getMonotonicUs)(void) = NULL; - -/* Process time events */ -static int processTimeEvents(aeEventLoop *eventLoop) { - int processed = 0; - aeTimeEvent *te; - long long maxId; - - te = eventLoop->timeEventHead; - maxId = eventLoop->timeEventNextId-1; - monotime now = getMonotonicUs(); - while(te) { - long long id; - - /* Remove events scheduled for deletion. */ - if (te->id == AE_DELETED_EVENT_ID) { - aeTimeEvent *next = te->next; - /* If a reference exists for this timer event, - * don't free it. This is currently incremented - * for recursive timerProc calls */ - if (te->refcount) { - te = next; - continue; - } - if (te->prev) - te->prev->next = te->next; - else - eventLoop->timeEventHead = te->next; - if (te->next) - te->next->prev = te->prev; - if (te->finalizerProc) { - te->finalizerProc(eventLoop, te->clientData); - now = getMonotonicUs(); - } - hi_free(te); - te = next; - continue; - } - - /* Make sure we don't process time events created by time events in - * this iteration. Note that this check is currently useless: we always - * add new timers on the head, however if we change the implementation - * detail, this check may be useful again: we keep it here for future - * defense. */ - if (te->id > maxId) { - te = te->next; - continue; - } - - if (te->when <= now) { - int retval; - - id = te->id; - te->refcount++; - retval = te->timeProc(eventLoop, id, te->clientData); - te->refcount--; - processed++; - now = getMonotonicUs(); - if (retval != AE_NOMORE) { - te->when = now + retval * 1000; - } else { - te->id = AE_DELETED_EVENT_ID; - } - } - te = te->next; - } - return processed; -} - -static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { - aeApiState *state = eventLoop->apidata; - int retval, numevents = 0; - - retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, - tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1); - if (retval > 0) { - int j; - - numevents = retval; - for (j = 0; j < numevents; j++) { - int mask = 0; - struct epoll_event *e = state->events+j; - - if (e->events & EPOLLIN) mask |= AE_READABLE; - if (e->events & EPOLLOUT) mask |= AE_WRITABLE; - if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE; - if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE; - eventLoop->fired[j].fd = e->data.fd; - eventLoop->fired[j].mask = mask; - } - } else if (retval == -1 && errno != EINTR) { - //panic("aeApiPoll: epoll_wait, %s", strerror(errno)); - } - - return numevents; -} - -static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) { - aeTimeEvent *te = eventLoop->timeEventHead; - if (te == NULL) return -1; - - aeTimeEvent *earliest = NULL; - while (te) { - if (!earliest || te->when < earliest->when) - earliest = te; - te = te->next; - } - - monotime now = getMonotonicUs(); - return (now >= earliest->when) ? 0 : earliest->when - now; -} - -/* Process every pending time event, then every pending file event - * (that may be registered by time event callbacks just processed). - * Without special flags the function sleeps until some file event - * fires, or when the next time event occurs (if any). - * - * If flags is 0, the function does nothing and returns. - * if flags has AE_ALL_EVENTS set, all the kind of events are processed. - * if flags has AE_FILE_EVENTS set, file events are processed. - * if flags has AE_TIME_EVENTS set, time events are processed. - * if flags has AE_DONT_WAIT set, the function returns ASAP once all - * the events that can be handled without a wait are processed. - * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called. - * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called. - * - * The function returns the number of events processed. */ -int aeProcessEvents(aeEventLoop *eventLoop, int flags) -{ - int processed = 0, numevents; - - /* Nothing to do? return ASAP */ - if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; - - /* Note that we want to call select() even if there are no - * file events to process as long as we want to process time - * events, in order to sleep until the next time event is ready - * to fire. */ - if (eventLoop->maxfd != -1 || - ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { - int j; - struct timeval tv, *tvp; - int64_t usUntilTimer = -1; - - if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) - usUntilTimer = usUntilEarliestTimer(eventLoop); - - if (usUntilTimer >= 0) { - tv.tv_sec = usUntilTimer / 1000000; - tv.tv_usec = usUntilTimer % 1000000; - tvp = &tv; - } else { - /* If we have to check for events but need to return - * ASAP because of AE_DONT_WAIT we need to set the timeout - * to zero */ - if (flags & AE_DONT_WAIT) { - tv.tv_sec = tv.tv_usec = 0; - tvp = &tv; - } else { - /* Otherwise we can block */ - tvp = NULL; /* wait forever */ - } - } - - if (eventLoop->flags & AE_DONT_WAIT) { - tv.tv_sec = tv.tv_usec = 0; - tvp = &tv; - } - - if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) - eventLoop->beforesleep(eventLoop); - - /* Call the multiplexing API, will return only on timeout or when - * some event fires. */ - numevents = aeApiPoll(eventLoop, tvp); - - /* After sleep callback. */ - if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) - eventLoop->aftersleep(eventLoop); - - for (j = 0; j < numevents; j++) { - int fd = eventLoop->fired[j].fd; - aeFileEvent *fe = &eventLoop->events[fd]; - int mask = eventLoop->fired[j].mask; - int fired = 0; /* Number of events fired for current fd. */ - - /* Normally we execute the readable event first, and the writable - * event later. This is useful as sometimes we may be able - * to serve the reply of a query immediately after processing the - * query. - * - * However if AE_BARRIER is set in the mask, our application is - * asking us to do the reverse: never fire the writable event - * after the readable. In such a case, we invert the calls. - * This is useful when, for instance, we want to do things - * in the beforeSleep() hook, like fsyncing a file to disk, - * before replying to a client. */ - int invert = fe->mask & AE_BARRIER; - - /* Note the "fe->mask & mask & ..." code: maybe an already - * processed event removed an element that fired and we still - * didn't processed, so we check if the event is still valid. - * - * Fire the readable event if the call sequence is not - * inverted. */ - if (!invert && fe->mask & mask & AE_READABLE) { - fe->rfileProc(eventLoop,fd,fe->clientData,mask); - fired++; - fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ - } - - /* Fire the writable event. */ - if (fe->mask & mask & AE_WRITABLE) { - if (!fired || fe->wfileProc != fe->rfileProc) { - fe->wfileProc(eventLoop,fd,fe->clientData,mask); - fired++; - } - } - - /* If we have to invert the call, fire the readable event now - * after the writable one. */ - if (invert) { - fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ - if ((fe->mask & mask & AE_READABLE) && - (!fired || fe->wfileProc != fe->rfileProc)) - { - fe->rfileProc(eventLoop,fd,fe->clientData,mask); - fired++; - } - } - - processed++; - } - } - /* Check time events */ - if (flags & AE_TIME_EVENTS) - processed += processTimeEvents(eventLoop); - - return processed; /* return the number of processed file/time events */ -} - - - -void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) { - ((void)el); ((void)fd); ((void)mask); - - redisAeEvents *e = (redisAeEvents*)privdata; - redisAsyncHandleWrite(e->context); -} - -int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { - aeApiState *state = eventLoop->apidata; - struct epoll_event ee = {0}; /* avoid valgrind warning */ - /* If the fd was already monitored for some event, we need a MOD - * operation. Otherwise we need an ADD operation. */ - int op = eventLoop->events[fd].mask == AE_NONE ? - EPOLL_CTL_ADD : EPOLL_CTL_MOD; - - ee.events = 0; - mask |= eventLoop->events[fd].mask; /* Merge old events */ - if (mask & AE_READABLE) ee.events |= EPOLLIN; - if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; - ee.data.fd = fd; - if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; - return 0; -} - -int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, - aeFileProc *proc, void *clientData) -{ - if (fd >= eventLoop->setsize) { - errno = ERANGE; - return AE_ERR; - } - aeFileEvent *fe = &eventLoop->events[fd]; - - if (aeApiAddEvent(eventLoop, fd, mask) == -1) - return AE_ERR; - fe->mask |= mask; - if (mask & AE_READABLE) fe->rfileProc = proc; - if (mask & AE_WRITABLE) fe->wfileProc = proc; - fe->clientData = clientData; - if (fd > eventLoop->maxfd) - eventLoop->maxfd = fd; - return AE_OK; -} - -void redisAeAddRead(void *privdata) { - redisAeEvents *e = (redisAeEvents*)privdata; - aeEventLoop *loop = e->loop; - if (!e->reading) { - e->reading = 1; - aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e); - } -} - -void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) { - aeApiState *state = eventLoop->apidata; - struct epoll_event ee = {0}; /* avoid valgrind warning */ - int mask = eventLoop->events[fd].mask & (~delmask); - - ee.events = 0; - if (mask & AE_READABLE) ee.events |= EPOLLIN; - if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; - ee.data.fd = fd; - if (mask != AE_NONE) { - epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee); + if ((retval = poll(&pfd, 1, milliseconds))== 1) { + if (pfd.revents & POLLIN) retmask |= AE_READABLE; + if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE; + if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; + if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE; + return retmask; } else { - /* Note, Kernel < 2.6.9 requires a non null event pointer even for - * EPOLL_CTL_DEL. */ - epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee); + return retval; } } -void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) -{ - if (fd >= eventLoop->setsize) return; - aeFileEvent *fe = &eventLoop->events[fd]; - if (fe->mask == AE_NONE) return; - - /* We want to always remove AE_BARRIER if set when AE_WRITABLE - * is removed. */ - if (mask & AE_WRITABLE) mask |= AE_BARRIER; - - aeApiDelEvent(eventLoop, fd, mask); - fe->mask = fe->mask & (~mask); - if (fd == eventLoop->maxfd && fe->mask == AE_NONE) { - /* Update the max fd */ - int j; - - for (j = eventLoop->maxfd-1; j >= 0; j--) - if (eventLoop->events[j].mask != AE_NONE) break; - eventLoop->maxfd = j; - } -} - -void redisAeDelRead(void *privdata) { - redisAeEvents *e = (redisAeEvents*)privdata; - aeEventLoop *loop = e->loop; - if (e->reading) { - e->reading = 0; - aeDeleteFileEvent(loop,e->fd,AE_READABLE); - } -} - -void redisAeAddWrite(void *privdata) { - redisAeEvents *e = (redisAeEvents*)privdata; - aeEventLoop *loop = e->loop; - if (!e->writing) { - e->writing = 1; - aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e); - } -} - -void redisAeDelWrite(void *privdata) { - redisAeEvents *e = (redisAeEvents*)privdata; - aeEventLoop *loop = e->loop; - if (e->writing) { - e->writing = 0; - aeDeleteFileEvent(loop,e->fd,AE_WRITABLE); - } -} - -void redisAeCleanup(void *privdata) { - redisAeEvents *e = (redisAeEvents*)privdata; - redisAeDelRead(privdata); - redisAeDelWrite(privdata); - hi_free(e); -} - -/* Enable the FD_CLOEXEC on the given fd to avoid fd leaks. - * This function should be invoked for fd's on specific places - * where fork + execve system calls are called. */ -int anetCloexec(int fd) { - int r; - int flags; - - do { - r = fcntl(fd, F_GETFD); - } while (r == -1 && errno == EINTR); - - if (r == -1 || (r & FD_CLOEXEC)) - return r; - - flags = r | FD_CLOEXEC; - - do { - r = fcntl(fd, F_SETFD, flags); - } while (r == -1 && errno == EINTR); - - return r; -} - -int aeApiCreate(aeEventLoop *eventLoop) { - aeApiState *state = hi_malloc(sizeof(aeApiState)); - - if (!state) return -1; - state->events = hi_malloc(sizeof(struct epoll_event)*eventLoop->setsize); - if (!state->events) { - hi_free(state); - return -1; - } - state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ - if (state->epfd == -1) { - hi_free(state->events); - hi_free(state); - return -1; - } - anetCloexec(state->epfd); - eventLoop->apidata = state; - return 0; -} - -aeEventLoop *aeCreateEventLoop(int setsize) { - aeEventLoop *eventLoop; - int i; - - if ((eventLoop = hi_malloc(sizeof(*eventLoop))) == NULL) goto err; - eventLoop->events = hi_malloc(sizeof(aeFileEvent)*setsize); - eventLoop->fired = hi_malloc(sizeof(aeFiredEvent)*setsize); - if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; - eventLoop->setsize = setsize; - eventLoop->timeEventHead = NULL; - eventLoop->timeEventNextId = 0; - eventLoop->stop = 0; - eventLoop->maxfd = -1; - eventLoop->beforesleep = NULL; - eventLoop->aftersleep = NULL; - eventLoop->flags = 0; - if (aeApiCreate(eventLoop) == -1) goto err; - /* Events with mask == AE_NONE are not set. So let's initialize the - * vector with it. */ - for (i = 0; i < setsize; i++) - eventLoop->events[i].mask = AE_NONE; - return eventLoop; - - err: - if (eventLoop) { - hi_free(eventLoop->events); - hi_free(eventLoop->fired); - hi_free(eventLoop); - } - return NULL; -} - -int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) { - redisContext *c = &(ac->c); - redisAeEvents *e; - - /* Nothing should be attached when something is already attached */ - if (ac->ev.data != NULL) - return REDIS_ERR; - - /* Create container for context and r/w events */ - e = (redisAeEvents *) hi_malloc(sizeof(*e)); - if (e == NULL) - return REDIS_ERR; - - e->context = ac; - e->loop = loop; - e->fd = c->fd; - e->reading = e->writing = 0; - - /* Register functions to start/stop listening for events */ - ac->ev.addRead = redisAeAddRead; - ac->ev.delRead = redisAeDelRead; - ac->ev.addWrite = redisAeAddWrite; - ac->ev.delWrite = redisAeDelWrite; - ac->ev.cleanup = redisAeCleanup; - ac->ev.data = e; - - return REDIS_OK; -} - diff --git a/src/ae.h b/src/ae.h index 42c041c..90f460a 100644 --- a/src/ae.h +++ b/src/ae.h @@ -1,133 +1,13 @@ #ifndef __HIREDIS_AE_H__ #define __HIREDIS_AE_H__ - #include "hiredis.h" #include "async.h" - -typedef uint64_t monotime; - -#define AE_OK 0 -#define AE_ERR -1 -#define AE_NONE 0 /* No events registered. */ #define AE_READABLE 1 /* Fire when descriptor is readable. */ #define AE_WRITABLE 2 /* Fire when descriptor is writable. */ -#define AE_BARRIER 4 /* With WRITABLE, never fire the event if the - READABLE event already fired in the same event - loop iteration. Useful when you want to persist - things to disk before sending replies, and want - to do that in a group fashion. */ -/* Types and data structures */ -typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); -typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); - -typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); - -typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop); - -/* File event structure */ -typedef struct aeFileEvent { - int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ - aeFileProc *rfileProc; - aeFileProc *wfileProc; - void *clientData; -} aeFileEvent; - -/* Time event structure */ -typedef struct aeTimeEvent { - long long id; /* time event identifier. */ - monotime when; - aeTimeProc *timeProc; - aeEventFinalizerProc *finalizerProc; - void *clientData; - struct aeTimeEvent *prev; - struct aeTimeEvent *next; - int refcount; /* refcount to prevent timer events from being - * freed in recursive time event calls. */ -} aeTimeEvent; - -/* A fired event */ -typedef struct aeFiredEvent { - int fd; - int mask; -} aeFiredEvent; - -typedef struct aeEventLoop { - int maxfd; /* highest file descriptor currently registered */ - int setsize; /* max number of file descriptors tracked */ - long long timeEventNextId; - aeFileEvent *events; /* Registered events */ - aeFiredEvent *fired; /* Fired events */ - aeTimeEvent *timeEventHead; - int stop; - void *apidata; /* This is used for polling API specific data */ - aeBeforeSleepProc *beforesleep; - aeBeforeSleepProc *aftersleep; - int flags; -} aeEventLoop; - -typedef struct redisAeEvents { - redisAsyncContext *context; - aeEventLoop *loop; - int fd; - int reading, writing; -} redisAeEvents; - -typedef struct aeApiState { - int epfd; - struct epoll_event *events; -} aeApiState; - -#define AE_FILE_EVENTS (1<<0) -#define AE_TIME_EVENTS (1<<1) -#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) -#define AE_DONT_WAIT (1<<2) -#define AE_CALL_BEFORE_SLEEP (1<<3) -#define AE_CALL_AFTER_SLEEP (1<<4) - -#define AE_NOMORE -1 -#define AE_DELETED_EVENT_ID -1 - -extern monotime (*getMonotonicUs)(void); - -void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask); - -void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask); - -int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask); - -void aeStop(aeEventLoop *eventLoop); - -void aeMain(aeEventLoop *eventLoop); - -int aeProcessEvents(aeEventLoop *eventLoop, int flags); - -int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, - aeFileProc *proc, void *clientData); - -void redisAeAddRead(void *privdata); - -void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask); - -void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); - -void redisAeDelRead(void *privdata); - -void redisAeAddWrite(void *privdata); - -void redisAeDelWrite(void *privdata); - -void redisAeCleanup(void *privdata); - -int anetCloexec(int fd); - -static int aeApiCreate(aeEventLoop *eventLoop); - -aeEventLoop *aeCreateEventLoop(int setsize); - -int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac); +int aeWait(int fd, int mask, long long milliseconds); #endif diff --git a/src/alloc.c b/src/alloc.c index 0902286..7c2382f 100644 --- a/src/alloc.c +++ b/src/alloc.c @@ -42,7 +42,8 @@ hiredisAllocFuncs hiredisAllocFns = { }; /* Override hiredis' allocators with ones supplied by the user */ -hiredisAllocFuncs hiredisSetAllocators(hiredisAllocFuncs *override) { +hiredisAllocFuncs hiredisSetAllocators(hiredisAllocFuncs *override) +{ hiredisAllocFuncs orig = hiredisAllocFns; hiredisAllocFns = *override; @@ -51,8 +52,9 @@ hiredisAllocFuncs hiredisSetAllocators(hiredisAllocFuncs *override) { } /* Reset allocators to use libc defaults */ -void hiredisResetAllocators(void) { - hiredisAllocFns = (hiredisAllocFuncs) { +void hiredisResetAllocators(void) +{ + hiredisAllocFns = (hiredisAllocFuncs){ .mallocFn = malloc, .callocFn = calloc, .reallocFn = realloc, @@ -63,11 +65,13 @@ void hiredisResetAllocators(void) { #ifdef _WIN32 -void *hi_malloc(size_t size) { +void *hi_malloc(size_t size) +{ return hiredisAllocFns.mallocFn(size); } -void *hi_calloc(size_t nmemb, size_t size) { +void *hi_calloc(size_t nmemb, size_t size) +{ /* Overflow check as the user can specify any arbitrary allocator */ if (SIZE_MAX / size < nmemb) return NULL; @@ -75,15 +79,18 @@ void *hi_calloc(size_t nmemb, size_t size) { return hiredisAllocFns.callocFn(nmemb, size); } -void *hi_realloc(void *ptr, size_t size) { +void *hi_realloc(void *ptr, size_t size) +{ return hiredisAllocFns.reallocFn(ptr, size); } -char *hi_strdup(const char *str) { +char *hi_strdup(const char *str) +{ return hiredisAllocFns.strdupFn(str); } -void hi_free(void *ptr) { +void hi_free(void *ptr) +{ hiredisAllocFns.freeFn(ptr); } diff --git a/src/async.c b/src/async.c index e13c090..9d1486f 100644 --- a/src/async.c +++ b/src/async.c @@ -812,7 +812,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR; /* (P)UNSUBSCRIBE does not have its own response: every channel or - * pattern that is unsubscribed will receive a message. This means we + * pattern that is unsubscribed will redisReceive a message. This means we * should not append a callback function for this command. */ } else if (strncasecmp(cstr,"monitor\r\n",9) == 0) { /* Set monitor flag and push callback */ diff --git a/src/dict.c b/src/dict.c index ad57181..b810eb3 100644 --- a/src/dict.c +++ b/src/dict.c @@ -33,9 +33,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#include "fmacros.h" #include "alloc.h" -#include #include #include #include "dict.h" diff --git a/src/hiredis.c b/src/hiredis.c index dd21d60..32480df 100644 --- a/src/hiredis.c +++ b/src/hiredis.c @@ -1139,6 +1139,16 @@ int redisAppendCommand(redisContext *c, const char *format, ...) { return ret; } +int redisSendCommand(redisContext *c, const char *format, ...) { + va_list ap; + int ret; + + va_start(ap,format); + ret = redisvAppendCommand(c,format,ap); + va_end(ap); + return redisFlush(c); +} + int redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) { hisds cmd; long long len; diff --git a/src/hiredis.h b/src/hiredis.h index 057fafd..e3377d9 100644 --- a/src/hiredis.h +++ b/src/hiredis.h @@ -331,6 +331,7 @@ int redisAppendFormattedCommand(redisContext *c, const char *cmd, size_t len); * to get a pipeline of commands. */ int redisvAppendCommand(redisContext *c, const char *format, va_list ap); int redisAppendCommand(redisContext *c, const char *format, ...); +int redisSendCommand(redisContext *c, const char *format, ...); int redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen); /* Issue a command to Redis. In a blocking context, it is identical to calling diff --git a/src/log.c b/src/log.c new file mode 100644 index 0000000..f6ec670 --- /dev/null +++ b/src/log.c @@ -0,0 +1,111 @@ +#include +#include +#include +#include +#include +#include +#include "log.h" +#include "hiredis.h" + + + +static int is_leap_year(time_t year) { + if (year % 4) return 0; /* A year not divisible by 4 is not leap. */ + else if (year % 100) return 1; /* If div by 4 and not 100 is surely leap. */ + else if (year % 400) return 0; /* If div by 100 *and* not by 400 is not leap. */ + else return 1; /* If div by 100 and 400 is leap. */ +} + +void createLogObj(char *logfile) { + log = hi_malloc(sizeof(logObj)); + log->logfile = logfile; + log->loglevel = LL_NOTICE; +} + + +void _serverLog(int level, const char *fmt, ...) { + va_list ap; + char msg[LOG_MAX_LEN]; + + va_start(ap, fmt); + vsnprintf(msg, sizeof(msg), fmt, ap); + va_end(ap); +} +void serverLogRaw(int level, const char *msg) { + const char *c = ".-*#"; + FILE *fp; + char buf[64]; + int log_to_stdout = log->logfile[0] == '\0'; + + if (level < log->loglevel) { + return; + } + + fp = log_to_stdout ? stdout : fopen(log->logfile, "a"); + if (!fp) return; + + int off; + struct timeval tv; + int role_char; + pid_t pid = getpid(); + + gettimeofday(&tv, NULL); + struct tm tm; + nolocks_localtime(&tm, tv.tv_sec, timezone, 1); + off = strftime(buf, sizeof(buf), "%d %b %Y %H:%M:%S.", &tm); + snprintf(buf + off, sizeof(buf) - off, "%03d", (int) tv.tv_usec / 1000); + role_char = 'm'; + fprintf(fp, "%d:%c %s %c %s\n", + (int) pid, role_char, buf, c[level], msg); + + fflush(fp); + + if (!log_to_stdout) fclose(fp); +} + +void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst) { + const time_t secs_min = 60; + const time_t secs_hour = 3600; + const time_t secs_day = 3600*24; + + t -= tz; /* Adjust for timezone. */ + t += 3600*dst; /* Adjust for daylight time. */ + time_t days = t / secs_day; /* Days passed since epoch. */ + time_t seconds = t % secs_day; /* Remaining seconds. */ + + tmp->tm_isdst = dst; + tmp->tm_hour = seconds / secs_hour; + tmp->tm_min = (seconds % secs_hour) / secs_min; + tmp->tm_sec = (seconds % secs_hour) % secs_min; + + /* 1/1/1970 was a Thursday, that is, day 4 from the POV of the tm structure + * where sunday = 0, so to calculate the day of the week we have to add 4 + * and take the modulo by 7. */ + tmp->tm_wday = (days+4)%7; + + /* Calculate the current year. */ + tmp->tm_year = 1970; + while(1) { + /* Leap years have one day more. */ + time_t days_this_year = 365 + is_leap_year(tmp->tm_year); + if (days_this_year > days) break; + days -= days_this_year; + tmp->tm_year++; + } + tmp->tm_yday = days; /* Number of day of the current year. */ + + /* We need to calculate in which month and day of the month we are. To do + * so we need to skip days according to how many days there are in each + * month, and adjust for the leap year that has one more day in February. */ + int mdays[12] = {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31}; + mdays[1] += is_leap_year(tmp->tm_year); + + tmp->tm_mon = 0; + while(days >= mdays[tmp->tm_mon]) { + days -= mdays[tmp->tm_mon]; + tmp->tm_mon++; + } + + tmp->tm_mday = days+1; /* Add 1 since our 'days' is zero-based. */ + tmp->tm_year -= 1900; /* Surprisingly tm_year is year-1900. */ +} diff --git a/src/log.h b/src/log.h new file mode 100644 index 0000000..7a783c9 --- /dev/null +++ b/src/log.h @@ -0,0 +1,34 @@ + +#ifndef REDIS_MIGRATE_LOG_H +#define REDIS_MIGRATE_LOG_H + +#define NOTICE "notice" +#define WARNING "warning" +#define VERBOSE "VERBOSE" +#define DEBUG "debug" + +#define LL_DEBUG 0 +#define LL_VERBOSE 1 +#define LL_NOTICE 2 +#define LL_WARNING 3 +#define LOG_MAX_LEN 1024 + +typedef struct logObj { + char *logfile; + int loglevel; +} logObj; + +static logObj *log; + +void createLogObj(char *logfile); + +#define serverLog(level, ...) _serverLog(level, __VA_ARGS__); + +void _serverLog(int level, const char *fmt, ...) +__attribute__((format(printf, 2, 3))); + +void serverLogRaw(int level, const char *msg); + +void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst); + +#endif //REDIS_MIGRATE_LOG_H diff --git a/src/rdbLoad.c b/src/rdbLoad.c new file mode 100644 index 0000000..03fc757 --- /dev/null +++ b/src/rdbLoad.c @@ -0,0 +1,53 @@ +#include "rdbLoad.h" +#include + +int rdbLoadRioWithLoading(migrateObj *mobj) +{ + char buf[1024]; + int error; + if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1) + { + serverLog(LL_WARNING, "read version failed:%s,port:%d ", mobj->host, mobj->port); + return C_ERR; + } + buf[9] = '\0'; + if (memcmp(buf, "REDIS", 5) != 0) + { + serverLog(LL_WARNING, "Wrong signature trying to load DB from file"); + errno = EINVAL; + return C_ERR; + } + int type, rdbver; + rdbver = atoi(buf + 5); + if (rdbver < 1 || rdbver > RDB_VERSION) + { + serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver); + errno = EINVAL; + return C_ERR; + } + serverLog(LL_NOTICE, "buf=%s", buf); + long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); + while (1) + { + sds key; + robj *val; + if ((type = rdbLoadType(mobj)) == -1) { + serverLog(LL_WARNING, "read type failed"); + return C_ERR; + } + if (type == RDB_OPCODE_EXPIRETIME) { + + } + } +} + + +int rdbLoadType(migrateObj *mobj) { + char buf[1]; + syncReadLine(mobj->source_cc->fd, buf, 1, mobj->timeout); + return buf[0]; +} + +time_t rdbLoadTime(migrateObj *mobj) { + +} \ No newline at end of file diff --git a/src/rdbLoad.h b/src/rdbLoad.h new file mode 100644 index 0000000..83bc4ed --- /dev/null +++ b/src/rdbLoad.h @@ -0,0 +1,25 @@ +#ifndef RDB_LOAD_REDIS_MIGRATE_H +#define RDB_LOAD_REDIS_MIGRATE_H +#include "redis-migrate.h" + + +#define RDB_VERSION 10 +#define RDB_OPCODE_FUNCTION2 245 /* function library data */ +#define RDB_OPCODE_FUNCTION 246 /* old function library data for 7.0 rc1 and rc2 */ +#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */ +#define RDB_OPCODE_IDLE 248 /* LRU idle time. */ +#define RDB_OPCODE_FREQ 249 /* LFU frequency. */ +#define RDB_OPCODE_AUX 250 /* RDB aux field. */ +#define RDB_OPCODE_RESIZEDB 251 /* Hash table resize hint. */ +#define RDB_OPCODE_EXPIRETIME_MS 252 /* Expire time in milliseconds. */ +#define RDB_OPCODE_EXPIRETIME 253 /* Old expire time in seconds. */ +#define RDB_OPCODE_SELECTDB 254 /* DB number of the following keys. */ +#define RDB_OPCODE_EOF 255 /* End of the RDB file. */ + +int rdbLoadRioWithLoading(migrateObj *mobj); + +int rdbLoadType(migrateObj *mobj); + +time_t rdbLoadTime(migrateObj *mobj); + +#endif diff --git a/src/redis-migrate.c b/src/redis-migrate.c index 572ccd0..06a3a90 100644 --- a/src/redis-migrate.c +++ b/src/redis-migrate.c @@ -2,13 +2,14 @@ #include #include #include -#include #include "redis-migrate.h" #include "hiredis.h" +#include "rdbLoad.h" static migrateObj *mobj; -migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot) { +migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot) +{ migrateObj *m; m = hi_malloc(sizeof(*m)); m->host = host->ptr; @@ -17,162 +18,439 @@ migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_sl m->end_slot = end_slot; m->repl_stat = REPL_STATE_NONE; m->isCache = 0; + m->timeout = 10 * 1000; + m->repl_transfer_size = -1; return m; } -void freeMigrateObj(migrateObj *m) { +void freeMigrateObj(migrateObj *m) +{ redisFree(m->source_cc); hi_free(m); mobj = NULL; } -int sendSyncCommand(RedisModuleCtx *ctx) { +long long ustime(void) +{ + struct timeval tv; + long long ust; - if (!mobj->isCache) { + gettimeofday(&tv, NULL); + ust = ((long long)tv.tv_sec) * 1000000; + ust += tv.tv_usec; + return ust; +} + +/* Return the UNIX time in milliseconds */ +mstime_t mstime(void) +{ + return ustime() / 1000; +} + +int sendSyncCommand() +{ + if (!mobj->isCache) + { mobj->isCache = 1; mobj->psync_replid = "?"; memcpy(mobj->psync_offset, "-1", 3); } - if (redisAppendCommand(mobj->source_cc, "PSYNC %s %s", mobj->psync_replid, mobj->psync_offset) != REDIS_OK) { - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, - "append PSYNC %s %s failed ip:%s,port:%d, ", - mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port); + if (redisAppendCommand(mobj->source_cc, "PSYNC %s %s", mobj->psync_replid, mobj->psync_offset) != REDIS_OK) + { + serverLog(LL_WARNING, "append PSYNC %s %s failed ip:%s,port:%d, ", + mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port); return 0; } - if (redisFlush(mobj->source_cc) != REDIS_OK) { - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, - "send PSYNC %s %s failed ip:%s,port:%d, ", - mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port); + if (redisFlush(mobj->source_cc) != REDIS_OK) + { + serverLog(LL_WARNING, "send PSYNC %s %s failed ip:%s,port:%d, ", + mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port); return 0; } - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "PSYNC %s %s ", - mobj->psync_replid, mobj->psync_offset); + serverLog(LL_NOTICE, "PSYNC %s %s ", mobj->psync_replid, mobj->psync_offset); return 1; } -void *syncWithRedis(void *arg) { - RedisModuleCtx *ctx = arg; - redisReply *reply; - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin sync data"); - reply = redisCommand(mobj->source_cc, "PING"); - if (reply->type == REDIS_REPLY_ERROR) { - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "send PING failed ip:%s,port:%d, error:%s", - mobj->host, mobj->port, reply->str); - goto error; +sds redisReceive() +{ + char buf[256]; + if (syncReadLine(mobj->source_cc->fd, buf, sizeof(buf), mobj->timeout) == -1) + { + serverLog(LL_WARNING, "redisReceive failed ip:%s,port:%d ", mobj->host, mobj->port); + return NULL; } - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "PING "); - freeReplyObject(reply); - //todo auth with master - sds portstr = sdsfromlonglong(mobj->port); - reply = redisCommand(mobj->source_cc, "REPLCONF listening-port %s", portstr); - if (reply->type == REDIS_REPLY_ERROR) { - sdsfree(portstr); - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "REPLCONF listening-port %s failed ip:%s,port:%d, error:%s", - portstr, mobj->host, mobj->port, reply->str); - goto error; - } - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "REPLCONF listening-port %s ", portstr); - sdsfree(portstr); - freeReplyObject(reply); - reply = redisCommand(mobj->source_cc, "REPLCONF ip-address %s", mobj->host); - if (reply->type == REDIS_REPLY_ERROR) { - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "REPLCONF ip-address %s failed ip:%s,port:%d, error:%s", - mobj->host, mobj->host, mobj->port, reply->str); - goto error; - } - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "REPLCONF ip-address %s ", mobj->host); - freeReplyObject(reply); - reply = redisCommand(mobj->source_cc, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2"); - if (reply->type == REDIS_REPLY_ERROR) { - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, - "REPLCONF capa eof capa psync2 failed ip:%s,port:%d, error:%s", - mobj->host, mobj->port, reply->str); - goto error; - } - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "REPLCONF %s %s %s %s", - "capa", "eof", "capa", "psync2"); - freeReplyObject(reply); - if (!sendSyncCommand(ctx)) { - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, - "send PSYNC %s %s failed ip:%s,port:%d, ", - mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port); - goto error; - } - - return NULL; - error: - freeReplyObject(reply); - freeMigrateObj(mobj); - return NULL; + return hi_sdsnew(buf); } -int connectRedis(RedisModuleCtx *ctx) { - pthread_t pthread; - int flag = pthread_create(&pthread, NULL, syncWithRedis, ctx); - if (flag != 0) { - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Can't start thread"); - freeMigrateObj(mobj); - return RedisModule_ReplyWithError(ctx, "Can't start thread"); +int receiveDataFromRedis() +{ + sds reply = redisReceive(); + if (reply == NULL) + { + serverLog(LL_WARNING, "Master did not reply to PSYNC"); + return PSYNC_TRY_LATER; } - pthread_join(pthread, NULL); - return RedisModule_ReplyWithSimpleString(ctx, "OK"); + if (sdslen(reply) == 0) + { + sdsfree(reply); + return PSYNC_WAIT_REPLY; + } + serverLog(LL_NOTICE, "reply=%s", reply); + if (!strncmp(reply, "+FULLRESYNC", 11)) + { + char *replid = NULL, *offset = NULL; + + /* FULL RESYNC, parse the reply in order to extract the replid + * and the replication offset. */ + replid = strchr(reply, ' '); + if (replid) + { + replid++; + offset = strchr(replid, ' '); + if (offset) + offset++; + } + if (!replid || !offset || (offset - replid - 1) != CONFIG_RUN_ID_SIZE) + { + serverLog(LL_WARNING, "Master replied with wrong +FULLRESYNC syntax."); + /* This is an unexpected condition, actually the +FULLRESYNC + * reply means that the master supports PSYNC, but the reply + * format seems wrong. To stay safe we blank the master + * replid to make sure next PSYNCs will fail. */ + memset(mobj->master_replid, 0, CONFIG_RUN_ID_SIZE + 1); + } + else + { + memcpy(mobj->master_replid, replid, offset - replid - 1); + mobj->master_replid[CONFIG_RUN_ID_SIZE] = '\0'; + mobj->master_initial_offset = strtoll(offset, NULL, 10); + serverLog(LL_NOTICE, "Full sync from master: %s:%lld", + mobj->master_replid, mobj->master_initial_offset); + } + sdsfree(reply); + return PSYNC_FULLRESYNC; + } + if (!strncmp(reply, "+CONTINUE", 9)) + { + /* Partial resync was accepted. */ + serverLog(LL_NOTICE, "Successful partial resynchronization with master."); + + /* Check the new replication ID advertised by the master. If it + * changed, we need to set the new ID as primary ID, and set + * secondary ID as the old master ID up to the current offset, so + * that our sub-slaves will be able to PSYNC with us after a + * disconnection. */ + char *start = reply + 10; + char *end = reply + 9; + while (end[0] != '\r' && end[0] != '\n' && end[0] != '\0') + end++; + if (end - start == CONFIG_RUN_ID_SIZE) + { + char new[CONFIG_RUN_ID_SIZE + 1]; + memcpy(new, start, CONFIG_RUN_ID_SIZE); + new[CONFIG_RUN_ID_SIZE] = '\0'; + + return PSYNC_CONTINUE; + } + } +} + +void readFullData() +{ + static char eofmark[CONFIG_RUN_ID_SIZE]; + static char lastbytes[CONFIG_RUN_ID_SIZE]; + static int usemark = 0; + char buf[PROTO_IOBUF_LEN]; + if (mobj->repl_transfer_size == -1) + { + int nread = syncReadLine(mobj->source_cc->fd, buf, PROTO_IOBUF_LEN, mobj->timeout); + if (nread == -1) + { + serverLog(LL_WARNING, "read full data failed"); + goto error; + } + if (buf[0] == '-') + { + serverLog(LL_WARNING, + "MASTER aborted replication with an error: %s", + buf + 1); + goto error; + } + else if (buf[0] == '\0') + { + // mobj->repl_transfer_lastio = server.unixtime; + return; + } + else if (buf[0] != '$') + { + serverLog(LL_WARNING, "Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); + goto error; + } + + if (strncmp(buf + 1, "EOF:", 4) == 0 && strlen(buf + 5) >= CONFIG_RUN_ID_SIZE) + { + usemark = 1; + usemark = 1; + memcpy(eofmark, buf + 5, CONFIG_RUN_ID_SIZE); + memset(lastbytes, 0, CONFIG_RUN_ID_SIZE); + /* Set any repl_transfer_size to avoid entering this code path + * at the next call. */ + mobj->repl_transfer_size = 0; + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF to parser"); + } + else + { + usemark = 0; + mobj->repl_transfer_size = strtol(buf + 1, NULL, 10); + serverLog(LL_NOTICE, + "MASTER <-> REPLICA sync: receiving %lld bytes from master to parser", + (long long)mobj->repl_transfer_size); + } + } + int flag = rdbLoadRioWithLoading(mobj); + + + return; +error: + cancelMigrate(); + return; +} + +void cancelMigrate() +{ +} + +void syncDataWithRedis(int fd, void *user_data, int mask) +{ + REDISMODULE_NOT_USED(fd); + REDISMODULE_NOT_USED(mask); + REDISMODULE_NOT_USED(user_data); + sds err = NULL; + if (mobj->repl_stat == REPL_STATE_CONNECTING) + { + if (redisSendCommand(mobj->source_cc, "PING") != REDIS_OK) + { + serverLog(LL_WARNING, "send PING failed ip:%s,port:%d", + mobj->host, mobj->port); + goto error; + } + mobj->repl_stat = REPL_STATE_RECEIVE_PING_REPLY; + return; + } + if (mobj->repl_stat == REPL_STATE_RECEIVE_PING_REPLY) + { + err = redisReceive(); + if (err == NULL) + goto no_response_error; + if (err[0] != '+' && strncmp(err, "-NOAUTH", 7) != 0 && strncmp(err, "-NOPERM", 7) != 0 && + strncmp(err, "-ERR operation not permitted", 28) != 0) + { + serverLog(LL_WARNING, "Error reply to PING from master: '%s'", err); + sdsfree(err); + goto error; + } + else + { + serverLog(LL_NOTICE, "Master replied to PING, replication can continue..."); + } + sdsfree(err); + err = NULL; + mobj->repl_stat = REPL_STATE_SEND_HANDSHAKE; + return; + } + if (mobj->repl_stat == REPL_STATE_SEND_HANDSHAKE) + { + // todo 增加认证 + mobj->repl_stat = REPL_STATE_RECEIVE_AUTH_REPLY; + } + if (mobj->repl_stat == REPL_STATE_RECEIVE_AUTH_REPLY) + { + // todo 接受认证信息 + sds portstr = sdsfromlonglong(mobj->port); + if (redisSendCommand(mobj->source_cc, "REPLCONF listening-port %s", portstr) != REDIS_OK) + { + serverLog(LL_WARNING, "send PING failed ip:%s,port:%d", + mobj->host, mobj->port); + goto error; + } + mobj->repl_stat = REPL_STATE_RECEIVE_PORT_REPLY; + sdsfree(portstr); + return; + } + if (mobj->repl_stat == REPL_STATE_RECEIVE_PORT_REPLY) + { + err = redisReceive(); + if (err == NULL) + goto no_response_error; + if (err[0] == '-') + { + serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF listening-port: %s", err); + goto error; + } + serverLog(LL_NOTICE, "REPLCONF listening-port success"); + if (redisSendCommand(mobj->source_cc, "REPLCONF ip-address %s", mobj->host) != REDIS_OK) + { + serverLog(LL_WARNING, "REPLCONF ip-address %s failed", mobj->host); + goto error; + } + sdsfree(err); + err = NULL; + mobj->repl_stat = REPL_STATE_RECEIVE_IP_REPLY; + return; + } + if (mobj->repl_stat == REPL_STATE_RECEIVE_IP_REPLY) + { + err = redisReceive(); + if (err == NULL) + goto no_response_error; + if (err[0] == '-') + { + serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF ip-address: %s", err); + goto error; + } + serverLog(LL_NOTICE, "REPLCONF REPLCONF ip-address success"); + if (redisSendCommand(mobj->source_cc, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2") != REDIS_OK) + { + serverLog(LL_WARNING, "send REPLCONF capa eof capa psync2 failed"); + goto error; + } + sdsfree(err); + err = NULL; + mobj->repl_stat = REPL_STATE_RECEIVE_CAPA_REPLY; + return; + } + if (mobj->repl_stat == REPL_STATE_RECEIVE_CAPA_REPLY) + { + err = redisReceive(); + if (err == NULL) + goto no_response_error; + if (err[0] == '-') + { + serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF capa: %s", err); + goto error; + } + serverLog(LL_NOTICE, "REPLCONF capa eof capa psync2 success"); + sdsfree(err); + err = NULL; + mobj->repl_stat = REPL_STATE_SEND_PSYNC; + return; + } + if (mobj->repl_stat == REPL_STATE_SEND_PSYNC) + { + if (!sendSyncCommand()) + { + serverLog(LL_WARNING, "send PSYNC %s %s failed ip:%s,port:%d, ", + mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port); + goto error; + } + mobj->repl_stat = REPL_STATE_RECEIVE_PSYNC_REPLY; + return; + } + if (mobj->repl_stat != REPL_STATE_RECEIVE_PSYNC_REPLY) + { + serverLog(LL_WARNING, "syncDataWithRedis(): state machine error, state should be RECEIVE_PSYNC but is %d", + mobj->repl_stat); + goto error; + } + int psync_result = receiveDataFromRedis(); + if (psync_result == PSYNC_WAIT_REPLY) + return; + if (psync_result == PSYNC_TRY_LATER) + goto error; + if (psync_result == PSYNC_CONTINUE) + { + } + // 接受全部数据 + serverLog(LL_NOTICE, "begin receive full data"); + readFullData(); + return; +error: + if (err != NULL) + { + sdsfree(err); + } + freeMigrateObj(mobj); +no_response_error: /* Handle receiveSynchronousResponse() error when master has no reply */ + serverLog(LL_WARNING, "Master did not respond to command during SYNC handshake"); } /** * migrate data to current instance. * migrate host port begin-slot end-slot - * @param ctx + * @param ctxz * @param argv * @param argc * @return */ -int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - if (argc != 5) { +int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 5) + { return RedisModule_WrongArity(ctx); } - robj *host = (robj *) argv[1]; - robj *port = (robj *) argv[2]; - robj *begin_slot = (robj *) argv[3]; - robj *end_slot = (robj *) argv[4]; - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *) host->ptr, - (char *) port->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr); - if (mobj != NULL) { - return RedisModule_ReplyWithError(ctx, "-ERR is migrating, please waiting"); + if (RedisModule_IsKeysPositionRequest(ctx)) + { + RedisModule_Log(ctx, VERBOSE, "get keys from module"); + return RedisModule_ReplyWithSimpleString(ctx, "OK"); + } + robj *host = (robj *)argv[1]; + robj *port = (robj *)argv[2]; + robj *begin_slot = (robj *)argv[3]; + robj *end_slot = (robj *)argv[4]; + RedisModule_Log(ctx, NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *)host->ptr, + (char *)port->ptr, (char *)begin_slot->ptr, (char *)end_slot->ptr); + if (mobj != NULL) + { + return RedisModule_ReplyWithError(ctx, "migrating, please waiting"); } mobj = createMigrateObject(host, atoi(port->ptr), atoi(begin_slot->ptr), atoi(end_slot->ptr)); struct timeval timeout = {1, 500000}; // 1.5s redisOptions options = {0}; - REDIS_OPTIONS_SET_TCP(&options, (const char *) mobj->host, mobj->port); + REDIS_OPTIONS_SET_TCP(&options, (const char *)mobj->host, mobj->port); options.connect_timeout = &timeout; mobj->source_cc = redisConnectWithOptions(&options); - if (mobj->source_cc == NULL || mobj->source_cc->err) { - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%d, error:%s", + if (mobj->source_cc == NULL || mobj->source_cc->err) + { + RedisModule_Log(ctx, WARNING, "Could not connect to Redis at ip:%s,port:%d, error:%s", mobj->host, mobj->port, mobj->source_cc->errstr); freeMigrateObj(mobj); return RedisModule_ReplyWithError(ctx, "Can't connect source redis"); } - return connectRedis(ctx); - + mobj->repl_stat = REPL_STATE_CONNECTING; + RedisModule_EventLoopAdd(mobj->source_cc->fd, AE_WRITABLE, syncDataWithRedis, ctx); + return RedisModule_ReplyWithSimpleString(ctx, "OK"); } -/* This function must be present on each Redis module. It is used in order to - * register the commands into the Redis server. */ -int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); int flag = RedisModule_Init(ctx, MODULE_NAME, REDIS_MIGRATE_VERSION, REDISMODULE_APIVER_1); - if (flag == REDISMODULE_ERR) { + if (flag == REDISMODULE_ERR) + { return REDISMODULE_ERR; } - RedisModule_SetClusterFlags(ctx, REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION); - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin init commands of %s", MODULE_NAME); - flag = RedisModule_CreateCommand(ctx, "rm.migrate", rm_migrateCommand, "write deny-oom admin", 1, 1, 0); - if (flag == REDISMODULE_ERR) { - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "init rm.migrate failed"); + + RedisModule_Log(ctx, NOTICE, "begin init commands of %s", MODULE_NAME); + flag = RedisModule_CreateCommand(ctx, "rm.migrate", rm_migrateCommand, "write deny-oom admin getkeys-api", 0, 0, 0); + if (flag == REDISMODULE_ERR) + { + RedisModule_Log(ctx, WARNING, "init rm.migrate failed"); return REDISMODULE_ERR; } - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "init %s success", MODULE_NAME); + RedisModuleCallReply *reply = RedisModule_Call(ctx, "config", "cc", "get", "logfile"); + long long items = RedisModule_CallReplyLength(reply); + if (items != 2) + { + RedisModule_Log(ctx, WARNING, "logfile is empty"); + return REDISMODULE_ERR; + } + RedisModuleCallReply *item1 = RedisModule_CallReplyArrayElement(reply, 1); + robj *logfile = (robj *)RedisModule_CreateStringFromCallReply(item1); + RedisModule_Log(ctx, NOTICE, "logfile is %s", (char *)logfile->ptr); + createLogObj((char *)logfile->ptr); + RedisModule_Log(ctx, NOTICE, "init %s success", MODULE_NAME); return REDISMODULE_OK; } diff --git a/src/redis-migrate.h b/src/redis-migrate.h index ede2ef3..43958d2 100644 --- a/src/redis-migrate.h +++ b/src/redis-migrate.h @@ -6,27 +6,37 @@ #include "ae.h" #include "sds.h" #include "sdscompat.h" +#include "log.h" #define MODULE_NAME "redis-migrate" #define REDIS_MIGRATE_VERSION 1 #define LRU_BITS 24 +#define CONFIG_RUN_ID_SIZE 40 +#define RDB_EOF_MARK_SIZE 40 +#define PSYNC_WRITE_ERROR 0 +#define PSYNC_WAIT_REPLY 1 +#define PSYNC_CONTINUE 2 +#define PSYNC_FULLRESYNC 3 +#define PSYNC_NOT_SUPPORTED 4 +#define PSYNC_TRY_LATER 5 #define C_ERR -1 #define C_OK 1 +#define PROTO_IOBUF_LEN (1024 * 16) /* Anti-warning macro... */ -#define UNUSED(V) ((void) V) +#define UNUSED(V) ((void)V) -typedef struct redisObject { - unsigned type: 4; - unsigned encoding: 4; - unsigned lru: LRU_BITS; /* LRU time (relative to global lru_clock) or - * LFU data (least significant 8 bits frequency - * and most significant 16 bits access time). */ +typedef struct redisObject +{ + unsigned type : 4; + unsigned encoding : 4; + unsigned lru : LRU_BITS; int refcount; void *ptr; } robj; -typedef struct migrateObject { +typedef struct migrateObject +{ char *address; int repl_stat; redisContext *source_cc; @@ -35,14 +45,20 @@ typedef struct migrateObject { int begin_slot; int end_slot; char *psync_replid; + char master_replid[CONFIG_RUN_ID_SIZE + 1]; + int timeout; int isCache; char psync_offset[32]; + int repl_transfer_size; + long long master_initial_offset; + time_t repl_transfer_lastio; } migrateObj; -typedef enum { - REPL_STATE_NONE = 0, /* No active replication */ - REPL_STATE_CONNECT, /* Must connect to master */ - REPL_STATE_CONNECTING, /* Connecting to master */ +typedef enum +{ + REPL_STATE_NONE = 0, /* No active replication */ + REPL_STATE_CONNECT, /* Must connect to master */ + REPL_STATE_CONNECTING, /* Connecting to master */ /* --- Handshake states, must be ordered --- */ REPL_STATE_RECEIVE_PING_REPLY, /* Wait for PING reply */ REPL_STATE_SEND_HANDSHAKE, /* Send handshake sequence to master */ @@ -53,26 +69,38 @@ typedef enum { REPL_STATE_SEND_PSYNC, /* Send PSYNC */ REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ /* --- End of handshake states --- */ - REPL_STATE_TRANSFER, /* Receiving .rdb from master */ - REPL_STATE_CONNECTED, /* Connected to master */ - STATE_CONNECT_ERROR, - STATE_DISCONNECT + REPL_STATE_TRANSFER, /* Receiving .rdb from master */ + REPL_STATE_CONNECTED, /* Connected to master */ } repl_state; long long ustime(void); +mstime_t mstime(void); + migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot); void freeMigrateObj(migrateObj *m); -int sendSyncCommand(RedisModuleCtx *ctx); +int sendSyncCommand(); -void *syncWithRedis(void *arg); +int receiveDataFromRedis(); -int connectRedis(RedisModuleCtx *ctx) ; +ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout); + +ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout); + +ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout); + +sds redisReceive(); + +void readFullData(); + +void cancelMigrate(); + +void syncDataWithRedis(int fd, void *user_data, int mask); int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); -#endif //REDIS_MIGRATE_REDIS_MIGRATE_H +#endif // REDIS_MIGRATE_REDIS_MIGRATE_H diff --git a/src/redismodule.h b/src/redismodule.h index cd389dd..98edf24 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -220,7 +220,7 @@ This flag should not be used directly by the module. /* Logging level strings */ #define REDISMODULE_LOGLEVEL_DEBUG "debug" -#define REDISMODULE_LOGLEVEL_VERBOSE "verbose" +#define REDISMODULE_LOGLEVEL_VERBOSE "VERBOSE" #define REDISMODULE_LOGLEVEL_NOTICE "notice" #define REDISMODULE_LOGLEVEL_WARNING "warning" diff --git a/src/syncio.c b/src/syncio.c new file mode 100644 index 0000000..129432d --- /dev/null +++ b/src/syncio.c @@ -0,0 +1,117 @@ + +#include "redis-migrate.h" +#include "ae.h" +#include +#include + +/* ----------------- Blocking sockets I/O with timeouts --------------------- */ + +#define SYNCIO__RESOLUTION 10 + +ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout) +{ + ssize_t nwritten, ret = size; + long long start = mstime(); + long long remaining = timeout; + + while (1) + { + long long wait = (remaining > SYNCIO__RESOLUTION) ? remaining : SYNCIO__RESOLUTION; + long long elapsed; + + nwritten = write(fd, ptr, size); + if (nwritten == -1) + { + if (errno != EAGAIN) + return -1; + } + else + { + ptr += nwritten; + size -= nwritten; + } + if (size == 0) + return ret; + + /* Wait */ + aeWait(fd, AE_WRITABLE, wait); + elapsed = mstime() - start; + if (elapsed >= timeout) + { + errno = ETIMEDOUT; + return -1; + } + remaining = timeout - elapsed; + } +} + +ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout) +{ + ssize_t nread, totread = 0; + long long start = mstime(); + long long remaining = timeout; + + if (size == 0) + return 0; + while (1) + { + long long wait = (remaining > SYNCIO__RESOLUTION) ? remaining : SYNCIO__RESOLUTION; + long long elapsed; + + nread = read(fd, ptr, size); + if (nread == 0) + return -1; + if (nread == -1) + { + if (errno != EAGAIN) + return -1; + } + else + { + ptr += nread; + size -= nread; + totread += nread; + } + if (size == 0) + return totread; + + /* Wait */ + aeWait(fd, AE_READABLE, wait); + elapsed = mstime() - start; + if (elapsed >= timeout) + { + errno = ETIMEDOUT; + return -1; + } + remaining = timeout - elapsed; + } +} + +ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout) +{ + ssize_t nread = 0; + + size--; + while (size) + { + char c; + + if (syncRead(fd, &c, 1, timeout) == -1) + return -1; + if (c == '\n') + { + *ptr = '\0'; + if (nread && *(ptr - 1) == '\r') + *(ptr - 1) = '\0'; + return nread; + } + else + { + *ptr++ = c; + *ptr = '\0'; + nread++; + } + size--; + } + return nread; +}