This commit is contained in:
LingZhaoHui 2022-06-20 22:48:05 +08:00
parent 1c2425d7ea
commit 947938c994
16 changed files with 818 additions and 750 deletions

View File

@ -37,9 +37,12 @@ alloc.xo: fmacros.h alloc.h
dict.xo: fmacros.h alloc.h dict.h
ae.xo: redismodule.h
hiredis.xo:fmacros.h hiredis.h net.h sds.h alloc.h async.h
syncio.xo:redis-migrate.c
log.xo: log.h
rdbLoad.xo: rdbLoad.h
redis-migrate.xo: redismodule.h
XO_LIBS=redis-migrate.xo hiredis.xo sds.xo ssl.xo read.xo alloc.xo dict.xo async.xo net.xo ae.xo
XO_LIBS=redis-migrate.xo hiredis.xo sds.xo ssl.xo read.xo alloc.xo dict.xo async.xo net.xo ae.xo syncio.xo log.xo rdbLoad.xo
redis-migrate.so: $(XO_LIBS)
$(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc

509
src/ae.c
View File

@ -1,502 +1,25 @@
#include <errno.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include "ae.h"
#include <string.h>
#include <poll.h>
int aeWait(int fd, int mask, long long milliseconds) {
struct pollfd pfd;
int retmask = 0, retval;
void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
((void)el); ((void)fd); ((void)mask);
memset(&pfd, 0, sizeof(pfd));
pfd.fd = fd;
if (mask & AE_READABLE) pfd.events |= POLLIN;
if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
redisAeEvents *e = (redisAeEvents*)privdata;
redisAsyncHandleRead(e->context);
}
void aeStop(aeEventLoop *eventLoop) {
eventLoop->stop = 1;
}
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
monotime (*getMonotonicUs)(void) = NULL;
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
monotime now = getMonotonicUs();
while(te) {
long long id;
/* Remove events scheduled for deletion. */
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
/* If a reference exists for this timer event,
* don't free it. This is currently incremented
* for recursive timerProc calls */
if (te->refcount) {
te = next;
continue;
}
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc) {
te->finalizerProc(eventLoop, te->clientData);
now = getMonotonicUs();
}
hi_free(te);
te = next;
continue;
}
/* Make sure we don't process time events created by time events in
* this iteration. Note that this check is currently useless: we always
* add new timers on the head, however if we change the implementation
* detail, this check may be useful again: we keep it here for future
* defense. */
if (te->id > maxId) {
te = te->next;
continue;
}
if (te->when <= now) {
int retval;
id = te->id;
te->refcount++;
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
now = getMonotonicUs();
if (retval != AE_NOMORE) {
te->when = now + retval * 1000;
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
} else if (retval == -1 && errno != EINTR) {
//panic("aeApiPoll: epoll_wait, %s", strerror(errno));
}
return numevents;
}
static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
aeTimeEvent *te = eventLoop->timeEventHead;
if (te == NULL) return -1;
aeTimeEvent *earliest = NULL;
while (te) {
if (!earliest || te->when < earliest->when)
earliest = te;
te = te->next;
}
monotime now = getMonotonicUs();
return (now >= earliest->when) ? 0 : earliest->when - now;
}
/* Process every pending time event, then every pending file event
* (that may be registered by time event callbacks just processed).
* Without special flags the function sleeps until some file event
* fires, or when the next time event occurs (if any).
*
* If flags is 0, the function does nothing and returns.
* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
* if flags has AE_FILE_EVENTS set, file events are processed.
* if flags has AE_TIME_EVENTS set, time events are processed.
* if flags has AE_DONT_WAIT set, the function returns ASAP once all
* the events that can be handled without a wait are processed.
* if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
* if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
*
* The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want to call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
struct timeval tv, *tvp;
int64_t usUntilTimer = -1;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
tvp = &tv;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
if (eventLoop->flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
}
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
int fd = eventLoop->fired[j].fd;
aeFileEvent *fe = &eventLoop->events[fd];
int mask = eventLoop->fired[j].mask;
int fired = 0; /* Number of events fired for current fd. */
/* Normally we execute the readable event first, and the writable
* event later. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if AE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsyncing a file to disk,
* before replying to a client. */
int invert = fe->mask & AE_BARRIER;
/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid.
*
* Fire the readable event if the call sequence is not
* inverted. */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
((void)el); ((void)fd); ((void)mask);
redisAeEvents *e = (redisAeEvents*)privdata;
redisAsyncHandleWrite(e->context);
}
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
void redisAeAddRead(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
if (!e->reading) {
e->reading = 1;
aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
}
}
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
int mask = eventLoop->events[fd].mask & (~delmask);
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (mask != AE_NONE) {
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
if ((retval = poll(&pfd, 1, milliseconds))== 1) {
if (pfd.revents & POLLIN) retmask |= AE_READABLE;
if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
return retmask;
} else {
/* Note, Kernel < 2.6.9 requires a non null event pointer even for
* EPOLL_CTL_DEL. */
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
return retval;
}
}
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
if (fd >= eventLoop->setsize) return;
aeFileEvent *fe = &eventLoop->events[fd];
if (fe->mask == AE_NONE) return;
/* We want to always remove AE_BARRIER if set when AE_WRITABLE
* is removed. */
if (mask & AE_WRITABLE) mask |= AE_BARRIER;
aeApiDelEvent(eventLoop, fd, mask);
fe->mask = fe->mask & (~mask);
if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
/* Update the max fd */
int j;
for (j = eventLoop->maxfd-1; j >= 0; j--)
if (eventLoop->events[j].mask != AE_NONE) break;
eventLoop->maxfd = j;
}
}
void redisAeDelRead(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
if (e->reading) {
e->reading = 0;
aeDeleteFileEvent(loop,e->fd,AE_READABLE);
}
}
void redisAeAddWrite(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
if (!e->writing) {
e->writing = 1;
aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
}
}
void redisAeDelWrite(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
if (e->writing) {
e->writing = 0;
aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
}
}
void redisAeCleanup(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
redisAeDelRead(privdata);
redisAeDelWrite(privdata);
hi_free(e);
}
/* Enable the FD_CLOEXEC on the given fd to avoid fd leaks.
* This function should be invoked for fd's on specific places
* where fork + execve system calls are called. */
int anetCloexec(int fd) {
int r;
int flags;
do {
r = fcntl(fd, F_GETFD);
} while (r == -1 && errno == EINTR);
if (r == -1 || (r & FD_CLOEXEC))
return r;
flags = r | FD_CLOEXEC;
do {
r = fcntl(fd, F_SETFD, flags);
} while (r == -1 && errno == EINTR);
return r;
}
int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = hi_malloc(sizeof(aeApiState));
if (!state) return -1;
state->events = hi_malloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
hi_free(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
hi_free(state->events);
hi_free(state);
return -1;
}
anetCloexec(state->epfd);
eventLoop->apidata = state;
return 0;
}
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = hi_malloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = hi_malloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = hi_malloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
hi_free(eventLoop->events);
hi_free(eventLoop->fired);
hi_free(eventLoop);
}
return NULL;
}
int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisAeEvents *e;
/* Nothing should be attached when something is already attached */
if (ac->ev.data != NULL)
return REDIS_ERR;
/* Create container for context and r/w events */
e = (redisAeEvents *) hi_malloc(sizeof(*e));
if (e == NULL)
return REDIS_ERR;
e->context = ac;
e->loop = loop;
e->fd = c->fd;
e->reading = e->writing = 0;
/* Register functions to start/stop listening for events */
ac->ev.addRead = redisAeAddRead;
ac->ev.delRead = redisAeDelRead;
ac->ev.addWrite = redisAeAddWrite;
ac->ev.delWrite = redisAeDelWrite;
ac->ev.cleanup = redisAeCleanup;
ac->ev.data = e;
return REDIS_OK;
}

122
src/ae.h
View File

@ -1,133 +1,13 @@
#ifndef __HIREDIS_AE_H__
#define __HIREDIS_AE_H__
#include "hiredis.h"
#include "async.h"
typedef uint64_t monotime;
#define AE_OK 0
#define AE_ERR -1
#define AE_NONE 0 /* No events registered. */
#define AE_READABLE 1 /* Fire when descriptor is readable. */
#define AE_WRITABLE 2 /* Fire when descriptor is writable. */
#define AE_BARRIER 4 /* With WRITABLE, never fire the event if the
READABLE event already fired in the same event
loop iteration. Useful when you want to persist
things to disk before sending replies, and want
to do that in a group fashion. */
/* Types and data structures */
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
monotime when;
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
int refcount; /* refcount to prevent timer events from being
* freed in recursive time event calls. */
} aeTimeEvent;
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
int flags;
} aeEventLoop;
typedef struct redisAeEvents {
redisAsyncContext *context;
aeEventLoop *loop;
int fd;
int reading, writing;
} redisAeEvents;
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
#define AE_FILE_EVENTS (1<<0)
#define AE_TIME_EVENTS (1<<1)
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
#define AE_DONT_WAIT (1<<2)
#define AE_CALL_BEFORE_SLEEP (1<<3)
#define AE_CALL_AFTER_SLEEP (1<<4)
#define AE_NOMORE -1
#define AE_DELETED_EVENT_ID -1
extern monotime (*getMonotonicUs)(void);
void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask);
void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask);
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
void aeStop(aeEventLoop *eventLoop);
void aeMain(aeEventLoop *eventLoop);
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
void redisAeAddRead(void *privdata);
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
void redisAeDelRead(void *privdata);
void redisAeAddWrite(void *privdata);
void redisAeDelWrite(void *privdata);
void redisAeCleanup(void *privdata);
int anetCloexec(int fd);
static int aeApiCreate(aeEventLoop *eventLoop);
aeEventLoop *aeCreateEventLoop(int setsize);
int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac);
int aeWait(int fd, int mask, long long milliseconds);
#endif

View File

@ -42,7 +42,8 @@ hiredisAllocFuncs hiredisAllocFns = {
};
/* Override hiredis' allocators with ones supplied by the user */
hiredisAllocFuncs hiredisSetAllocators(hiredisAllocFuncs *override) {
hiredisAllocFuncs hiredisSetAllocators(hiredisAllocFuncs *override)
{
hiredisAllocFuncs orig = hiredisAllocFns;
hiredisAllocFns = *override;
@ -51,8 +52,9 @@ hiredisAllocFuncs hiredisSetAllocators(hiredisAllocFuncs *override) {
}
/* Reset allocators to use libc defaults */
void hiredisResetAllocators(void) {
hiredisAllocFns = (hiredisAllocFuncs) {
void hiredisResetAllocators(void)
{
hiredisAllocFns = (hiredisAllocFuncs){
.mallocFn = malloc,
.callocFn = calloc,
.reallocFn = realloc,
@ -63,11 +65,13 @@ void hiredisResetAllocators(void) {
#ifdef _WIN32
void *hi_malloc(size_t size) {
void *hi_malloc(size_t size)
{
return hiredisAllocFns.mallocFn(size);
}
void *hi_calloc(size_t nmemb, size_t size) {
void *hi_calloc(size_t nmemb, size_t size)
{
/* Overflow check as the user can specify any arbitrary allocator */
if (SIZE_MAX / size < nmemb)
return NULL;
@ -75,15 +79,18 @@ void *hi_calloc(size_t nmemb, size_t size) {
return hiredisAllocFns.callocFn(nmemb, size);
}
void *hi_realloc(void *ptr, size_t size) {
void *hi_realloc(void *ptr, size_t size)
{
return hiredisAllocFns.reallocFn(ptr, size);
}
char *hi_strdup(const char *str) {
char *hi_strdup(const char *str)
{
return hiredisAllocFns.strdupFn(str);
}
void hi_free(void *ptr) {
void hi_free(void *ptr)
{
hiredisAllocFns.freeFn(ptr);
}

View File

@ -812,7 +812,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
/* (P)UNSUBSCRIBE does not have its own response: every channel or
* pattern that is unsubscribed will receive a message. This means we
* pattern that is unsubscribed will redisReceive a message. This means we
* should not append a callback function for this command. */
} else if (strncasecmp(cstr,"monitor\r\n",9) == 0) {
/* Set monitor flag and push callback */

View File

@ -33,9 +33,7 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "fmacros.h"
#include "alloc.h"
#include <stdlib.h>
#include <assert.h>
#include <limits.h>
#include "dict.h"

View File

@ -1139,6 +1139,16 @@ int redisAppendCommand(redisContext *c, const char *format, ...) {
return ret;
}
int redisSendCommand(redisContext *c, const char *format, ...) {
va_list ap;
int ret;
va_start(ap,format);
ret = redisvAppendCommand(c,format,ap);
va_end(ap);
return redisFlush(c);
}
int redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
hisds cmd;
long long len;

View File

@ -331,6 +331,7 @@ int redisAppendFormattedCommand(redisContext *c, const char *cmd, size_t len);
* to get a pipeline of commands. */
int redisvAppendCommand(redisContext *c, const char *format, va_list ap);
int redisAppendCommand(redisContext *c, const char *format, ...);
int redisSendCommand(redisContext *c, const char *format, ...);
int redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen);
/* Issue a command to Redis. In a blocking context, it is identical to calling

111
src/log.c Normal file
View File

@ -0,0 +1,111 @@
#include <bits/types/FILE.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/time.h>
#include <time.h>
#include "log.h"
#include "hiredis.h"
static int is_leap_year(time_t year) {
if (year % 4) return 0; /* A year not divisible by 4 is not leap. */
else if (year % 100) return 1; /* If div by 4 and not 100 is surely leap. */
else if (year % 400) return 0; /* If div by 100 *and* not by 400 is not leap. */
else return 1; /* If div by 100 and 400 is leap. */
}
void createLogObj(char *logfile) {
log = hi_malloc(sizeof(logObj));
log->logfile = logfile;
log->loglevel = LL_NOTICE;
}
void _serverLog(int level, const char *fmt, ...) {
va_list ap;
char msg[LOG_MAX_LEN];
va_start(ap, fmt);
vsnprintf(msg, sizeof(msg), fmt, ap);
va_end(ap);
}
void serverLogRaw(int level, const char *msg) {
const char *c = ".-*#";
FILE *fp;
char buf[64];
int log_to_stdout = log->logfile[0] == '\0';
if (level < log->loglevel) {
return;
}
fp = log_to_stdout ? stdout : fopen(log->logfile, "a");
if (!fp) return;
int off;
struct timeval tv;
int role_char;
pid_t pid = getpid();
gettimeofday(&tv, NULL);
struct tm tm;
nolocks_localtime(&tm, tv.tv_sec, timezone, 1);
off = strftime(buf, sizeof(buf), "%d %b %Y %H:%M:%S.", &tm);
snprintf(buf + off, sizeof(buf) - off, "%03d", (int) tv.tv_usec / 1000);
role_char = 'm';
fprintf(fp, "%d:%c %s %c %s\n",
(int) pid, role_char, buf, c[level], msg);
fflush(fp);
if (!log_to_stdout) fclose(fp);
}
void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst) {
const time_t secs_min = 60;
const time_t secs_hour = 3600;
const time_t secs_day = 3600*24;
t -= tz; /* Adjust for timezone. */
t += 3600*dst; /* Adjust for daylight time. */
time_t days = t / secs_day; /* Days passed since epoch. */
time_t seconds = t % secs_day; /* Remaining seconds. */
tmp->tm_isdst = dst;
tmp->tm_hour = seconds / secs_hour;
tmp->tm_min = (seconds % secs_hour) / secs_min;
tmp->tm_sec = (seconds % secs_hour) % secs_min;
/* 1/1/1970 was a Thursday, that is, day 4 from the POV of the tm structure
* where sunday = 0, so to calculate the day of the week we have to add 4
* and take the modulo by 7. */
tmp->tm_wday = (days+4)%7;
/* Calculate the current year. */
tmp->tm_year = 1970;
while(1) {
/* Leap years have one day more. */
time_t days_this_year = 365 + is_leap_year(tmp->tm_year);
if (days_this_year > days) break;
days -= days_this_year;
tmp->tm_year++;
}
tmp->tm_yday = days; /* Number of day of the current year. */
/* We need to calculate in which month and day of the month we are. To do
* so we need to skip days according to how many days there are in each
* month, and adjust for the leap year that has one more day in February. */
int mdays[12] = {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31};
mdays[1] += is_leap_year(tmp->tm_year);
tmp->tm_mon = 0;
while(days >= mdays[tmp->tm_mon]) {
days -= mdays[tmp->tm_mon];
tmp->tm_mon++;
}
tmp->tm_mday = days+1; /* Add 1 since our 'days' is zero-based. */
tmp->tm_year -= 1900; /* Surprisingly tm_year is year-1900. */
}

34
src/log.h Normal file
View File

@ -0,0 +1,34 @@
#ifndef REDIS_MIGRATE_LOG_H
#define REDIS_MIGRATE_LOG_H
#define NOTICE "notice"
#define WARNING "warning"
#define VERBOSE "VERBOSE"
#define DEBUG "debug"
#define LL_DEBUG 0
#define LL_VERBOSE 1
#define LL_NOTICE 2
#define LL_WARNING 3
#define LOG_MAX_LEN 1024
typedef struct logObj {
char *logfile;
int loglevel;
} logObj;
static logObj *log;
void createLogObj(char *logfile);
#define serverLog(level, ...) _serverLog(level, __VA_ARGS__);
void _serverLog(int level, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
void serverLogRaw(int level, const char *msg);
void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst);
#endif //REDIS_MIGRATE_LOG_H

53
src/rdbLoad.c Normal file
View File

@ -0,0 +1,53 @@
#include "rdbLoad.h"
#include <errno.h>
int rdbLoadRioWithLoading(migrateObj *mobj)
{
char buf[1024];
int error;
if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1)
{
serverLog(LL_WARNING, "read version failed:%s,port:%d ", mobj->host, mobj->port);
return C_ERR;
}
buf[9] = '\0';
if (memcmp(buf, "REDIS", 5) != 0)
{
serverLog(LL_WARNING, "Wrong signature trying to load DB from file");
errno = EINVAL;
return C_ERR;
}
int type, rdbver;
rdbver = atoi(buf + 5);
if (rdbver < 1 || rdbver > RDB_VERSION)
{
serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver);
errno = EINVAL;
return C_ERR;
}
serverLog(LL_NOTICE, "buf=%s", buf);
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
while (1)
{
sds key;
robj *val;
if ((type = rdbLoadType(mobj)) == -1) {
serverLog(LL_WARNING, "read type failed");
return C_ERR;
}
if (type == RDB_OPCODE_EXPIRETIME) {
}
}
}
int rdbLoadType(migrateObj *mobj) {
char buf[1];
syncReadLine(mobj->source_cc->fd, buf, 1, mobj->timeout);
return buf[0];
}
time_t rdbLoadTime(migrateObj *mobj) {
}

25
src/rdbLoad.h Normal file
View File

@ -0,0 +1,25 @@
#ifndef RDB_LOAD_REDIS_MIGRATE_H
#define RDB_LOAD_REDIS_MIGRATE_H
#include "redis-migrate.h"
#define RDB_VERSION 10
#define RDB_OPCODE_FUNCTION2 245 /* function library data */
#define RDB_OPCODE_FUNCTION 246 /* old function library data for 7.0 rc1 and rc2 */
#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */
#define RDB_OPCODE_IDLE 248 /* LRU idle time. */
#define RDB_OPCODE_FREQ 249 /* LFU frequency. */
#define RDB_OPCODE_AUX 250 /* RDB aux field. */
#define RDB_OPCODE_RESIZEDB 251 /* Hash table resize hint. */
#define RDB_OPCODE_EXPIRETIME_MS 252 /* Expire time in milliseconds. */
#define RDB_OPCODE_EXPIRETIME 253 /* Old expire time in seconds. */
#define RDB_OPCODE_SELECTDB 254 /* DB number of the following keys. */
#define RDB_OPCODE_EOF 255 /* End of the RDB file. */
int rdbLoadRioWithLoading(migrateObj *mobj);
int rdbLoadType(migrateObj *mobj);
time_t rdbLoadTime(migrateObj *mobj);
#endif

View File

@ -2,13 +2,14 @@
#include <sys/time.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include "redis-migrate.h"
#include "hiredis.h"
#include "rdbLoad.h"
static migrateObj *mobj;
migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot) {
migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot)
{
migrateObj *m;
m = hi_malloc(sizeof(*m));
m->host = host->ptr;
@ -17,162 +18,439 @@ migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_sl
m->end_slot = end_slot;
m->repl_stat = REPL_STATE_NONE;
m->isCache = 0;
m->timeout = 10 * 1000;
m->repl_transfer_size = -1;
return m;
}
void freeMigrateObj(migrateObj *m) {
void freeMigrateObj(migrateObj *m)
{
redisFree(m->source_cc);
hi_free(m);
mobj = NULL;
}
int sendSyncCommand(RedisModuleCtx *ctx) {
long long ustime(void)
{
struct timeval tv;
long long ust;
if (!mobj->isCache) {
gettimeofday(&tv, NULL);
ust = ((long long)tv.tv_sec) * 1000000;
ust += tv.tv_usec;
return ust;
}
/* Return the UNIX time in milliseconds */
mstime_t mstime(void)
{
return ustime() / 1000;
}
int sendSyncCommand()
{
if (!mobj->isCache)
{
mobj->isCache = 1;
mobj->psync_replid = "?";
memcpy(mobj->psync_offset, "-1", 3);
}
if (redisAppendCommand(mobj->source_cc, "PSYNC %s %s", mobj->psync_replid, mobj->psync_offset) != REDIS_OK) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING,
"append PSYNC %s %s failed ip:%s,port:%d, ",
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
if (redisAppendCommand(mobj->source_cc, "PSYNC %s %s", mobj->psync_replid, mobj->psync_offset) != REDIS_OK)
{
serverLog(LL_WARNING, "append PSYNC %s %s failed ip:%s,port:%d, ",
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
return 0;
}
if (redisFlush(mobj->source_cc) != REDIS_OK) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING,
"send PSYNC %s %s failed ip:%s,port:%d, ",
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
if (redisFlush(mobj->source_cc) != REDIS_OK)
{
serverLog(LL_WARNING, "send PSYNC %s %s failed ip:%s,port:%d, ",
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
return 0;
}
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "PSYNC %s %s ",
mobj->psync_replid, mobj->psync_offset);
serverLog(LL_NOTICE, "PSYNC %s %s ", mobj->psync_replid, mobj->psync_offset);
return 1;
}
void *syncWithRedis(void *arg) {
RedisModuleCtx *ctx = arg;
redisReply *reply;
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin sync data");
reply = redisCommand(mobj->source_cc, "PING");
if (reply->type == REDIS_REPLY_ERROR) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "send PING failed ip:%s,port:%d, error:%s",
mobj->host, mobj->port, reply->str);
goto error;
sds redisReceive()
{
char buf[256];
if (syncReadLine(mobj->source_cc->fd, buf, sizeof(buf), mobj->timeout) == -1)
{
serverLog(LL_WARNING, "redisReceive failed ip:%s,port:%d ", mobj->host, mobj->port);
return NULL;
}
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "PING ");
freeReplyObject(reply);
//todo auth with master
sds portstr = sdsfromlonglong(mobj->port);
reply = redisCommand(mobj->source_cc, "REPLCONF listening-port %s", portstr);
if (reply->type == REDIS_REPLY_ERROR) {
sdsfree(portstr);
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "REPLCONF listening-port %s failed ip:%s,port:%d, error:%s",
portstr, mobj->host, mobj->port, reply->str);
goto error;
}
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "REPLCONF listening-port %s ", portstr);
sdsfree(portstr);
freeReplyObject(reply);
reply = redisCommand(mobj->source_cc, "REPLCONF ip-address %s", mobj->host);
if (reply->type == REDIS_REPLY_ERROR) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "REPLCONF ip-address %s failed ip:%s,port:%d, error:%s",
mobj->host, mobj->host, mobj->port, reply->str);
goto error;
}
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "REPLCONF ip-address %s ", mobj->host);
freeReplyObject(reply);
reply = redisCommand(mobj->source_cc, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2");
if (reply->type == REDIS_REPLY_ERROR) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING,
"REPLCONF capa eof capa psync2 failed ip:%s,port:%d, error:%s",
mobj->host, mobj->port, reply->str);
goto error;
}
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "REPLCONF %s %s %s %s",
"capa", "eof", "capa", "psync2");
freeReplyObject(reply);
if (!sendSyncCommand(ctx)) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING,
"send PSYNC %s %s failed ip:%s,port:%d, ",
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
goto error;
}
return NULL;
error:
freeReplyObject(reply);
freeMigrateObj(mobj);
return NULL;
return hi_sdsnew(buf);
}
int connectRedis(RedisModuleCtx *ctx) {
pthread_t pthread;
int flag = pthread_create(&pthread, NULL, syncWithRedis, ctx);
if (flag != 0) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Can't start thread");
freeMigrateObj(mobj);
return RedisModule_ReplyWithError(ctx, "Can't start thread");
int receiveDataFromRedis()
{
sds reply = redisReceive();
if (reply == NULL)
{
serverLog(LL_WARNING, "Master did not reply to PSYNC");
return PSYNC_TRY_LATER;
}
pthread_join(pthread, NULL);
return RedisModule_ReplyWithSimpleString(ctx, "OK");
if (sdslen(reply) == 0)
{
sdsfree(reply);
return PSYNC_WAIT_REPLY;
}
serverLog(LL_NOTICE, "reply=%s", reply);
if (!strncmp(reply, "+FULLRESYNC", 11))
{
char *replid = NULL, *offset = NULL;
/* FULL RESYNC, parse the reply in order to extract the replid
* and the replication offset. */
replid = strchr(reply, ' ');
if (replid)
{
replid++;
offset = strchr(replid, ' ');
if (offset)
offset++;
}
if (!replid || !offset || (offset - replid - 1) != CONFIG_RUN_ID_SIZE)
{
serverLog(LL_WARNING, "Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
* replid to make sure next PSYNCs will fail. */
memset(mobj->master_replid, 0, CONFIG_RUN_ID_SIZE + 1);
}
else
{
memcpy(mobj->master_replid, replid, offset - replid - 1);
mobj->master_replid[CONFIG_RUN_ID_SIZE] = '\0';
mobj->master_initial_offset = strtoll(offset, NULL, 10);
serverLog(LL_NOTICE, "Full sync from master: %s:%lld",
mobj->master_replid, mobj->master_initial_offset);
}
sdsfree(reply);
return PSYNC_FULLRESYNC;
}
if (!strncmp(reply, "+CONTINUE", 9))
{
/* Partial resync was accepted. */
serverLog(LL_NOTICE, "Successful partial resynchronization with master.");
/* Check the new replication ID advertised by the master. If it
* changed, we need to set the new ID as primary ID, and set
* secondary ID as the old master ID up to the current offset, so
* that our sub-slaves will be able to PSYNC with us after a
* disconnection. */
char *start = reply + 10;
char *end = reply + 9;
while (end[0] != '\r' && end[0] != '\n' && end[0] != '\0')
end++;
if (end - start == CONFIG_RUN_ID_SIZE)
{
char new[CONFIG_RUN_ID_SIZE + 1];
memcpy(new, start, CONFIG_RUN_ID_SIZE);
new[CONFIG_RUN_ID_SIZE] = '\0';
return PSYNC_CONTINUE;
}
}
}
void readFullData()
{
static char eofmark[CONFIG_RUN_ID_SIZE];
static char lastbytes[CONFIG_RUN_ID_SIZE];
static int usemark = 0;
char buf[PROTO_IOBUF_LEN];
if (mobj->repl_transfer_size == -1)
{
int nread = syncReadLine(mobj->source_cc->fd, buf, PROTO_IOBUF_LEN, mobj->timeout);
if (nread == -1)
{
serverLog(LL_WARNING, "read full data failed");
goto error;
}
if (buf[0] == '-')
{
serverLog(LL_WARNING,
"MASTER aborted replication with an error: %s",
buf + 1);
goto error;
}
else if (buf[0] == '\0')
{
// mobj->repl_transfer_lastio = server.unixtime;
return;
}
else if (buf[0] != '$')
{
serverLog(LL_WARNING, "Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
goto error;
}
if (strncmp(buf + 1, "EOF:", 4) == 0 && strlen(buf + 5) >= CONFIG_RUN_ID_SIZE)
{
usemark = 1;
usemark = 1;
memcpy(eofmark, buf + 5, CONFIG_RUN_ID_SIZE);
memset(lastbytes, 0, CONFIG_RUN_ID_SIZE);
/* Set any repl_transfer_size to avoid entering this code path
* at the next call. */
mobj->repl_transfer_size = 0;
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF to parser");
}
else
{
usemark = 0;
mobj->repl_transfer_size = strtol(buf + 1, NULL, 10);
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving %lld bytes from master to parser",
(long long)mobj->repl_transfer_size);
}
}
int flag = rdbLoadRioWithLoading(mobj);
return;
error:
cancelMigrate();
return;
}
void cancelMigrate()
{
}
void syncDataWithRedis(int fd, void *user_data, int mask)
{
REDISMODULE_NOT_USED(fd);
REDISMODULE_NOT_USED(mask);
REDISMODULE_NOT_USED(user_data);
sds err = NULL;
if (mobj->repl_stat == REPL_STATE_CONNECTING)
{
if (redisSendCommand(mobj->source_cc, "PING") != REDIS_OK)
{
serverLog(LL_WARNING, "send PING failed ip:%s,port:%d",
mobj->host, mobj->port);
goto error;
}
mobj->repl_stat = REPL_STATE_RECEIVE_PING_REPLY;
return;
}
if (mobj->repl_stat == REPL_STATE_RECEIVE_PING_REPLY)
{
err = redisReceive();
if (err == NULL)
goto no_response_error;
if (err[0] != '+' && strncmp(err, "-NOAUTH", 7) != 0 && strncmp(err, "-NOPERM", 7) != 0 &&
strncmp(err, "-ERR operation not permitted", 28) != 0)
{
serverLog(LL_WARNING, "Error reply to PING from master: '%s'", err);
sdsfree(err);
goto error;
}
else
{
serverLog(LL_NOTICE, "Master replied to PING, replication can continue...");
}
sdsfree(err);
err = NULL;
mobj->repl_stat = REPL_STATE_SEND_HANDSHAKE;
return;
}
if (mobj->repl_stat == REPL_STATE_SEND_HANDSHAKE)
{
// todo 增加认证
mobj->repl_stat = REPL_STATE_RECEIVE_AUTH_REPLY;
}
if (mobj->repl_stat == REPL_STATE_RECEIVE_AUTH_REPLY)
{
// todo 接受认证信息
sds portstr = sdsfromlonglong(mobj->port);
if (redisSendCommand(mobj->source_cc, "REPLCONF listening-port %s", portstr) != REDIS_OK)
{
serverLog(LL_WARNING, "send PING failed ip:%s,port:%d",
mobj->host, mobj->port);
goto error;
}
mobj->repl_stat = REPL_STATE_RECEIVE_PORT_REPLY;
sdsfree(portstr);
return;
}
if (mobj->repl_stat == REPL_STATE_RECEIVE_PORT_REPLY)
{
err = redisReceive();
if (err == NULL)
goto no_response_error;
if (err[0] == '-')
{
serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF listening-port: %s", err);
goto error;
}
serverLog(LL_NOTICE, "REPLCONF listening-port success");
if (redisSendCommand(mobj->source_cc, "REPLCONF ip-address %s", mobj->host) != REDIS_OK)
{
serverLog(LL_WARNING, "REPLCONF ip-address %s failed", mobj->host);
goto error;
}
sdsfree(err);
err = NULL;
mobj->repl_stat = REPL_STATE_RECEIVE_IP_REPLY;
return;
}
if (mobj->repl_stat == REPL_STATE_RECEIVE_IP_REPLY)
{
err = redisReceive();
if (err == NULL)
goto no_response_error;
if (err[0] == '-')
{
serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF ip-address: %s", err);
goto error;
}
serverLog(LL_NOTICE, "REPLCONF REPLCONF ip-address success");
if (redisSendCommand(mobj->source_cc, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2") != REDIS_OK)
{
serverLog(LL_WARNING, "send REPLCONF capa eof capa psync2 failed");
goto error;
}
sdsfree(err);
err = NULL;
mobj->repl_stat = REPL_STATE_RECEIVE_CAPA_REPLY;
return;
}
if (mobj->repl_stat == REPL_STATE_RECEIVE_CAPA_REPLY)
{
err = redisReceive();
if (err == NULL)
goto no_response_error;
if (err[0] == '-')
{
serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF capa: %s", err);
goto error;
}
serverLog(LL_NOTICE, "REPLCONF capa eof capa psync2 success");
sdsfree(err);
err = NULL;
mobj->repl_stat = REPL_STATE_SEND_PSYNC;
return;
}
if (mobj->repl_stat == REPL_STATE_SEND_PSYNC)
{
if (!sendSyncCommand())
{
serverLog(LL_WARNING, "send PSYNC %s %s failed ip:%s,port:%d, ",
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
goto error;
}
mobj->repl_stat = REPL_STATE_RECEIVE_PSYNC_REPLY;
return;
}
if (mobj->repl_stat != REPL_STATE_RECEIVE_PSYNC_REPLY)
{
serverLog(LL_WARNING, "syncDataWithRedis(): state machine error, state should be RECEIVE_PSYNC but is %d",
mobj->repl_stat);
goto error;
}
int psync_result = receiveDataFromRedis();
if (psync_result == PSYNC_WAIT_REPLY)
return;
if (psync_result == PSYNC_TRY_LATER)
goto error;
if (psync_result == PSYNC_CONTINUE)
{
}
// 接受全部数据
serverLog(LL_NOTICE, "begin receive full data");
readFullData();
return;
error:
if (err != NULL)
{
sdsfree(err);
}
freeMigrateObj(mobj);
no_response_error: /* Handle receiveSynchronousResponse() error when master has no reply */
serverLog(LL_WARNING, "Master did not respond to command during SYNC handshake");
}
/**
* migrate data to current instance.
* migrate host port begin-slot end-slot
* @param ctx
* @param ctxz
* @param argv
* @param argc
* @return
*/
int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 5) {
int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
if (argc != 5)
{
return RedisModule_WrongArity(ctx);
}
robj *host = (robj *) argv[1];
robj *port = (robj *) argv[2];
robj *begin_slot = (robj *) argv[3];
robj *end_slot = (robj *) argv[4];
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *) host->ptr,
(char *) port->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr);
if (mobj != NULL) {
return RedisModule_ReplyWithError(ctx, "-ERR is migrating, please waiting");
if (RedisModule_IsKeysPositionRequest(ctx))
{
RedisModule_Log(ctx, VERBOSE, "get keys from module");
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
robj *host = (robj *)argv[1];
robj *port = (robj *)argv[2];
robj *begin_slot = (robj *)argv[3];
robj *end_slot = (robj *)argv[4];
RedisModule_Log(ctx, NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *)host->ptr,
(char *)port->ptr, (char *)begin_slot->ptr, (char *)end_slot->ptr);
if (mobj != NULL)
{
return RedisModule_ReplyWithError(ctx, "migrating, please waiting");
}
mobj = createMigrateObject(host, atoi(port->ptr), atoi(begin_slot->ptr), atoi(end_slot->ptr));
struct timeval timeout = {1, 500000}; // 1.5s
redisOptions options = {0};
REDIS_OPTIONS_SET_TCP(&options, (const char *) mobj->host, mobj->port);
REDIS_OPTIONS_SET_TCP(&options, (const char *)mobj->host, mobj->port);
options.connect_timeout = &timeout;
mobj->source_cc = redisConnectWithOptions(&options);
if (mobj->source_cc == NULL || mobj->source_cc->err) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%d, error:%s",
if (mobj->source_cc == NULL || mobj->source_cc->err)
{
RedisModule_Log(ctx, WARNING, "Could not connect to Redis at ip:%s,port:%d, error:%s",
mobj->host, mobj->port, mobj->source_cc->errstr);
freeMigrateObj(mobj);
return RedisModule_ReplyWithError(ctx, "Can't connect source redis");
}
return connectRedis(ctx);
mobj->repl_stat = REPL_STATE_CONNECTING;
RedisModule_EventLoopAdd(mobj->source_cc->fd, AE_WRITABLE, syncDataWithRedis, ctx);
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
/* This function must be present on each Redis module. It is used in order to
* register the commands into the Redis server. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
int flag = RedisModule_Init(ctx, MODULE_NAME, REDIS_MIGRATE_VERSION, REDISMODULE_APIVER_1);
if (flag == REDISMODULE_ERR) {
if (flag == REDISMODULE_ERR)
{
return REDISMODULE_ERR;
}
RedisModule_SetClusterFlags(ctx, REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION);
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin init commands of %s", MODULE_NAME);
flag = RedisModule_CreateCommand(ctx, "rm.migrate", rm_migrateCommand, "write deny-oom admin", 1, 1, 0);
if (flag == REDISMODULE_ERR) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "init rm.migrate failed");
RedisModule_Log(ctx, NOTICE, "begin init commands of %s", MODULE_NAME);
flag = RedisModule_CreateCommand(ctx, "rm.migrate", rm_migrateCommand, "write deny-oom admin getkeys-api", 0, 0, 0);
if (flag == REDISMODULE_ERR)
{
RedisModule_Log(ctx, WARNING, "init rm.migrate failed");
return REDISMODULE_ERR;
}
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "init %s success", MODULE_NAME);
RedisModuleCallReply *reply = RedisModule_Call(ctx, "config", "cc", "get", "logfile");
long long items = RedisModule_CallReplyLength(reply);
if (items != 2)
{
RedisModule_Log(ctx, WARNING, "logfile is empty");
return REDISMODULE_ERR;
}
RedisModuleCallReply *item1 = RedisModule_CallReplyArrayElement(reply, 1);
robj *logfile = (robj *)RedisModule_CreateStringFromCallReply(item1);
RedisModule_Log(ctx, NOTICE, "logfile is %s", (char *)logfile->ptr);
createLogObj((char *)logfile->ptr);
RedisModule_Log(ctx, NOTICE, "init %s success", MODULE_NAME);
return REDISMODULE_OK;
}

View File

@ -6,27 +6,37 @@
#include "ae.h"
#include "sds.h"
#include "sdscompat.h"
#include "log.h"
#define MODULE_NAME "redis-migrate"
#define REDIS_MIGRATE_VERSION 1
#define LRU_BITS 24
#define CONFIG_RUN_ID_SIZE 40
#define RDB_EOF_MARK_SIZE 40
#define PSYNC_WRITE_ERROR 0
#define PSYNC_WAIT_REPLY 1
#define PSYNC_CONTINUE 2
#define PSYNC_FULLRESYNC 3
#define PSYNC_NOT_SUPPORTED 4
#define PSYNC_TRY_LATER 5
#define C_ERR -1
#define C_OK 1
#define PROTO_IOBUF_LEN (1024 * 16)
/* Anti-warning macro... */
#define UNUSED(V) ((void) V)
#define UNUSED(V) ((void)V)
typedef struct redisObject {
unsigned type: 4;
unsigned encoding: 4;
unsigned lru: LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
typedef struct redisObject
{
unsigned type : 4;
unsigned encoding : 4;
unsigned lru : LRU_BITS;
int refcount;
void *ptr;
} robj;
typedef struct migrateObject {
typedef struct migrateObject
{
char *address;
int repl_stat;
redisContext *source_cc;
@ -35,14 +45,20 @@ typedef struct migrateObject {
int begin_slot;
int end_slot;
char *psync_replid;
char master_replid[CONFIG_RUN_ID_SIZE + 1];
int timeout;
int isCache;
char psync_offset[32];
int repl_transfer_size;
long long master_initial_offset;
time_t repl_transfer_lastio;
} migrateObj;
typedef enum {
REPL_STATE_NONE = 0, /* No active replication */
REPL_STATE_CONNECT, /* Must connect to master */
REPL_STATE_CONNECTING, /* Connecting to master */
typedef enum
{
REPL_STATE_NONE = 0, /* No active replication */
REPL_STATE_CONNECT, /* Must connect to master */
REPL_STATE_CONNECTING, /* Connecting to master */
/* --- Handshake states, must be ordered --- */
REPL_STATE_RECEIVE_PING_REPLY, /* Wait for PING reply */
REPL_STATE_SEND_HANDSHAKE, /* Send handshake sequence to master */
@ -53,26 +69,38 @@ typedef enum {
REPL_STATE_SEND_PSYNC, /* Send PSYNC */
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
/* --- End of handshake states --- */
REPL_STATE_TRANSFER, /* Receiving .rdb from master */
REPL_STATE_CONNECTED, /* Connected to master */
STATE_CONNECT_ERROR,
STATE_DISCONNECT
REPL_STATE_TRANSFER, /* Receiving .rdb from master */
REPL_STATE_CONNECTED, /* Connected to master */
} repl_state;
long long ustime(void);
mstime_t mstime(void);
migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot);
void freeMigrateObj(migrateObj *m);
int sendSyncCommand(RedisModuleCtx *ctx);
int sendSyncCommand();
void *syncWithRedis(void *arg);
int receiveDataFromRedis();
int connectRedis(RedisModuleCtx *ctx) ;
ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout);
ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout);
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
sds redisReceive();
void readFullData();
void cancelMigrate();
void syncDataWithRedis(int fd, void *user_data, int mask);
int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
#endif //REDIS_MIGRATE_REDIS_MIGRATE_H
#endif // REDIS_MIGRATE_REDIS_MIGRATE_H

View File

@ -220,7 +220,7 @@ This flag should not be used directly by the module.
/* Logging level strings */
#define REDISMODULE_LOGLEVEL_DEBUG "debug"
#define REDISMODULE_LOGLEVEL_VERBOSE "verbose"
#define REDISMODULE_LOGLEVEL_VERBOSE "VERBOSE"
#define REDISMODULE_LOGLEVEL_NOTICE "notice"
#define REDISMODULE_LOGLEVEL_WARNING "warning"

117
src/syncio.c Normal file
View File

@ -0,0 +1,117 @@
#include "redis-migrate.h"
#include "ae.h"
#include <unistd.h>
#include <errno.h>
/* ----------------- Blocking sockets I/O with timeouts --------------------- */
#define SYNCIO__RESOLUTION 10
ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout)
{
ssize_t nwritten, ret = size;
long long start = mstime();
long long remaining = timeout;
while (1)
{
long long wait = (remaining > SYNCIO__RESOLUTION) ? remaining : SYNCIO__RESOLUTION;
long long elapsed;
nwritten = write(fd, ptr, size);
if (nwritten == -1)
{
if (errno != EAGAIN)
return -1;
}
else
{
ptr += nwritten;
size -= nwritten;
}
if (size == 0)
return ret;
/* Wait */
aeWait(fd, AE_WRITABLE, wait);
elapsed = mstime() - start;
if (elapsed >= timeout)
{
errno = ETIMEDOUT;
return -1;
}
remaining = timeout - elapsed;
}
}
ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout)
{
ssize_t nread, totread = 0;
long long start = mstime();
long long remaining = timeout;
if (size == 0)
return 0;
while (1)
{
long long wait = (remaining > SYNCIO__RESOLUTION) ? remaining : SYNCIO__RESOLUTION;
long long elapsed;
nread = read(fd, ptr, size);
if (nread == 0)
return -1;
if (nread == -1)
{
if (errno != EAGAIN)
return -1;
}
else
{
ptr += nread;
size -= nread;
totread += nread;
}
if (size == 0)
return totread;
/* Wait */
aeWait(fd, AE_READABLE, wait);
elapsed = mstime() - start;
if (elapsed >= timeout)
{
errno = ETIMEDOUT;
return -1;
}
remaining = timeout - elapsed;
}
}
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout)
{
ssize_t nread = 0;
size--;
while (size)
{
char c;
if (syncRead(fd, &c, 1, timeout) == -1)
return -1;
if (c == '\n')
{
*ptr = '\0';
if (nread && *(ptr - 1) == '\r')
*(ptr - 1) = '\0';
return nread;
}
else
{
*ptr++ = c;
*ptr = '\0';
nread++;
}
size--;
}
return nread;
}