sync #7

Merged
zeekling merged 3 commits from sync into master 2022-06-22 15:53:13 +00:00
17 changed files with 1098 additions and 739 deletions

213
.clang-format Normal file
View File

@ -0,0 +1,213 @@
# 语言: None, Cpp, Java, JavaScript, ObjC, Proto, TableGen, TextProto
Language: Cpp
# BasedOnStyle: LLVM
# 访问说明符(public、private等)的偏移
AccessModifierOffset: -4
# 开括号(开圆括号、开尖括号、开方括号)后的对齐: Align, DontAlign, AlwaysBreak(总是在开括号后换行)
AlignAfterOpenBracket: Align
# 连续赋值时,对齐所有等号
AlignConsecutiveAssignments: false
# 连续声明时,对齐所有声明的变量名
AlignConsecutiveDeclarations: false
# 右对齐逃脱换行(使用反斜杠换行)的反斜杠
AlignEscapedNewlines: Right
# 水平对齐二元和三元表达式的操作数
AlignOperands: true
# 对齐连续的尾随的注释
AlignTrailingComments: true
# 不允许函数声明的所有参数在放在下一行
AllowAllParametersOfDeclarationOnNextLine: false
# 不允许短的块放在同一行
AllowShortBlocksOnASingleLine: true
# 允许短的case标签放在同一行
AllowShortCaseLabelsOnASingleLine: true
# 允许短的函数放在同一行: None, InlineOnly(定义在类中), Empty(空函数), Inline(定义在类中,空函数), All
AllowShortFunctionsOnASingleLine: None
# 允许短的if语句保持在同一行
AllowShortIfStatementsOnASingleLine: true
# 允许短的循环保持在同一行
AllowShortLoopsOnASingleLine: true
# 总是在返回类型后换行: None, All, TopLevel(顶级函数,不包括在类中的函数),
# AllDefinitions(所有的定义,不包括声明), TopLevelDefinitions(所有的顶级函数的定义)
AlwaysBreakAfterReturnType: None
# 总是在多行string字面量前换行
AlwaysBreakBeforeMultilineStrings: false
# 总是在template声明后换行
AlwaysBreakTemplateDeclarations: true
# false表示函数实参要么都在同一行要么都各自一行
BinPackArguments: true
# false表示所有形参要么都在同一行要么都各自一行
BinPackParameters: true
# 大括号换行只有当BreakBeforeBraces设置为Custom时才有效
BraceWrapping:
# class定义后面
AfterClass: false
# 控制语句后面
AfterControlStatement: false
# enum定义后面
AfterEnum: false
# 函数定义后面
AfterFunction: false
# 命名空间定义后面
AfterNamespace: false
# struct定义后面
AfterStruct: false
# union定义后面
AfterUnion: false
# extern之后
AfterExternBlock: false
# catch之前
BeforeCatch: false
# else之前
BeforeElse: false
# 缩进大括号
IndentBraces: false
# 分离空函数
SplitEmptyFunction: false
# 分离空语句
SplitEmptyRecord: false
# 分离空命名空间
SplitEmptyNamespace: false
# 在二元运算符前换行: None(在操作符后换行), NonAssignment(在非赋值的操作符前换行), All(在操作符前换行)
BreakBeforeBinaryOperators: NonAssignment
# 在大括号前换行: Attach(始终将大括号附加到周围的上下文), Linux(除函数、命名空间和类定义与Attach类似),
# Mozilla(除枚举、函数、记录定义与Attach类似), Stroustrup(除函数定义、catch、else与Attach类似),
# Allman(总是在大括号前换行), GNU(总是在大括号前换行,并对于控制语句的大括号增加额外的缩进), WebKit(在函数前换行), Custom
# 注:这里认为语句块也属于函数
BreakBeforeBraces: Custom
# 在三元运算符前换行
BreakBeforeTernaryOperators: false
# 在构造函数的初始化列表的冒号后换行
BreakConstructorInitializers: AfterColon
#BreakInheritanceList: AfterColon
BreakStringLiterals: false
# 每行字符的限制0表示没有限制
ColumnLimit: 0
CompactNamespaces: true
# 构造函数的初始化列表要么都在同一行,要么都各自一行
ConstructorInitializerAllOnOneLineOrOnePerLine: false
# 构造函数的初始化列表的缩进宽度
ConstructorInitializerIndentWidth: 4
# 延续的行的缩进宽度
ContinuationIndentWidth: 4
# 去除C++11的列表初始化的大括号{后和}前的空格
Cpp11BracedListStyle: true
# 继承最常用的指针和引用的对齐方式
DerivePointerAlignment: false
# 固定命名空间注释
FixNamespaceComments: true
# 缩进case标签
IndentCaseLabels: false
IndentPPDirectives: None
# 缩进宽度
IndentWidth: 4
# 函数返回类型换行时,缩进函数声明或函数定义的函数名
IndentWrappedFunctionNames: false
# 保留在块开始处的空行
KeepEmptyLinesAtTheStartOfBlocks: false
# 连续空行的最大数量
MaxEmptyLinesToKeep: 1
# 命名空间的缩进: None, Inner(缩进嵌套的命名空间中的内容), All
NamespaceIndentation: None
# 指针和引用的对齐: Left, Right, Middle
PointerAlignment: Right
# 允许重新排版注释
ReflowComments: true
# 允许排序#include
SortIncludes: false
# 允许排序 using 声明
SortUsingDeclarations: false
# 在C风格类型转换后添加空格
SpaceAfterCStyleCast: false
# 在Template 关键字后面添加空格
SpaceAfterTemplateKeyword: true
# 在赋值运算符之前添加空格
SpaceBeforeAssignmentOperators: true
# SpaceBeforeCpp11BracedList: true
# SpaceBeforeCtorInitializerColon: true
# SpaceBeforeInheritanceColon: true
# 开圆括号之前添加一个空格: Never, ControlStatements, Always
SpaceBeforeParens: ControlStatements
# SpaceBeforeRangeBasedForLoopColon: true
# 在空的圆括号中添加空格
SpaceInEmptyParentheses: false
# 在尾随的评论前添加的空格数(只适用于//)
SpacesBeforeTrailingComments: 1
# 在尖括号的<后和>前添加空格
SpacesInAngles: false
# 在C风格类型转换的括号中添加空格
SpacesInCStyleCastParentheses: false
# 在容器(ObjC和JavaScript的数组和字典等)字面量中添加空格
SpacesInContainerLiterals: true
# 在圆括号的(后和)前添加空格
SpacesInParentheses: false
# 在方括号的[后和]前添加空格lamda表达式和未指明大小的数组的声明不受影响
SpacesInSquareBrackets: false
# 标准: Cpp03, Cpp11, Auto
Standard: Cpp11
# tab宽度
TabWidth: 4
# 使用tab字符: Never, ForIndentation, ForContinuationAndIndentation, Always
UseTab: Never

View File

@ -37,9 +37,12 @@ alloc.xo: fmacros.h alloc.h
dict.xo: fmacros.h alloc.h dict.h
ae.xo: redismodule.h
hiredis.xo:fmacros.h hiredis.h net.h sds.h alloc.h async.h
syncio.xo:redis-migrate.c
log.xo: log.h
rdbLoad.xo: rdbLoad.h
redis-migrate.xo: redismodule.h
XO_LIBS=redis-migrate.xo hiredis.xo sds.xo ssl.xo read.xo alloc.xo dict.xo async.xo net.xo ae.xo
XO_LIBS=redis-migrate.xo hiredis.xo sds.xo ssl.xo read.xo alloc.xo dict.xo async.xo net.xo ae.xo syncio.xo log.xo rdbLoad.xo
redis-migrate.so: $(XO_LIBS)
$(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc

509
src/ae.c
View File

@ -1,502 +1,25 @@
#include <errno.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include "ae.h"
#include <string.h>
#include <poll.h>
int aeWait(int fd, int mask, long long milliseconds) {
struct pollfd pfd;
int retmask = 0, retval;
void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
((void)el); ((void)fd); ((void)mask);
memset(&pfd, 0, sizeof(pfd));
pfd.fd = fd;
if (mask & AE_READABLE) pfd.events |= POLLIN;
if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
redisAeEvents *e = (redisAeEvents*)privdata;
redisAsyncHandleRead(e->context);
}
void aeStop(aeEventLoop *eventLoop) {
eventLoop->stop = 1;
}
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
monotime (*getMonotonicUs)(void) = NULL;
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
monotime now = getMonotonicUs();
while(te) {
long long id;
/* Remove events scheduled for deletion. */
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
/* If a reference exists for this timer event,
* don't free it. This is currently incremented
* for recursive timerProc calls */
if (te->refcount) {
te = next;
continue;
}
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc) {
te->finalizerProc(eventLoop, te->clientData);
now = getMonotonicUs();
}
hi_free(te);
te = next;
continue;
}
/* Make sure we don't process time events created by time events in
* this iteration. Note that this check is currently useless: we always
* add new timers on the head, however if we change the implementation
* detail, this check may be useful again: we keep it here for future
* defense. */
if (te->id > maxId) {
te = te->next;
continue;
}
if (te->when <= now) {
int retval;
id = te->id;
te->refcount++;
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
now = getMonotonicUs();
if (retval != AE_NOMORE) {
te->when = now + retval * 1000;
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
} else if (retval == -1 && errno != EINTR) {
//panic("aeApiPoll: epoll_wait, %s", strerror(errno));
}
return numevents;
}
static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
aeTimeEvent *te = eventLoop->timeEventHead;
if (te == NULL) return -1;
aeTimeEvent *earliest = NULL;
while (te) {
if (!earliest || te->when < earliest->when)
earliest = te;
te = te->next;
}
monotime now = getMonotonicUs();
return (now >= earliest->when) ? 0 : earliest->when - now;
}
/* Process every pending time event, then every pending file event
* (that may be registered by time event callbacks just processed).
* Without special flags the function sleeps until some file event
* fires, or when the next time event occurs (if any).
*
* If flags is 0, the function does nothing and returns.
* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
* if flags has AE_FILE_EVENTS set, file events are processed.
* if flags has AE_TIME_EVENTS set, time events are processed.
* if flags has AE_DONT_WAIT set, the function returns ASAP once all
* the events that can be handled without a wait are processed.
* if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
* if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
*
* The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want to call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
struct timeval tv, *tvp;
int64_t usUntilTimer = -1;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
tvp = &tv;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
if (eventLoop->flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
}
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
int fd = eventLoop->fired[j].fd;
aeFileEvent *fe = &eventLoop->events[fd];
int mask = eventLoop->fired[j].mask;
int fired = 0; /* Number of events fired for current fd. */
/* Normally we execute the readable event first, and the writable
* event later. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if AE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsyncing a file to disk,
* before replying to a client. */
int invert = fe->mask & AE_BARRIER;
/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid.
*
* Fire the readable event if the call sequence is not
* inverted. */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
((void)el); ((void)fd); ((void)mask);
redisAeEvents *e = (redisAeEvents*)privdata;
redisAsyncHandleWrite(e->context);
}
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
void redisAeAddRead(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
if (!e->reading) {
e->reading = 1;
aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
}
}
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
int mask = eventLoop->events[fd].mask & (~delmask);
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (mask != AE_NONE) {
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
if ((retval = poll(&pfd, 1, milliseconds))== 1) {
if (pfd.revents & POLLIN) retmask |= AE_READABLE;
if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
return retmask;
} else {
/* Note, Kernel < 2.6.9 requires a non null event pointer even for
* EPOLL_CTL_DEL. */
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
return retval;
}
}
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
if (fd >= eventLoop->setsize) return;
aeFileEvent *fe = &eventLoop->events[fd];
if (fe->mask == AE_NONE) return;
/* We want to always remove AE_BARRIER if set when AE_WRITABLE
* is removed. */
if (mask & AE_WRITABLE) mask |= AE_BARRIER;
aeApiDelEvent(eventLoop, fd, mask);
fe->mask = fe->mask & (~mask);
if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
/* Update the max fd */
int j;
for (j = eventLoop->maxfd-1; j >= 0; j--)
if (eventLoop->events[j].mask != AE_NONE) break;
eventLoop->maxfd = j;
}
}
void redisAeDelRead(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
if (e->reading) {
e->reading = 0;
aeDeleteFileEvent(loop,e->fd,AE_READABLE);
}
}
void redisAeAddWrite(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
if (!e->writing) {
e->writing = 1;
aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
}
}
void redisAeDelWrite(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
if (e->writing) {
e->writing = 0;
aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
}
}
void redisAeCleanup(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
redisAeDelRead(privdata);
redisAeDelWrite(privdata);
hi_free(e);
}
/* Enable the FD_CLOEXEC on the given fd to avoid fd leaks.
* This function should be invoked for fd's on specific places
* where fork + execve system calls are called. */
int anetCloexec(int fd) {
int r;
int flags;
do {
r = fcntl(fd, F_GETFD);
} while (r == -1 && errno == EINTR);
if (r == -1 || (r & FD_CLOEXEC))
return r;
flags = r | FD_CLOEXEC;
do {
r = fcntl(fd, F_SETFD, flags);
} while (r == -1 && errno == EINTR);
return r;
}
int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = hi_malloc(sizeof(aeApiState));
if (!state) return -1;
state->events = hi_malloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
hi_free(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
hi_free(state->events);
hi_free(state);
return -1;
}
anetCloexec(state->epfd);
eventLoop->apidata = state;
return 0;
}
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = hi_malloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = hi_malloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = hi_malloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
hi_free(eventLoop->events);
hi_free(eventLoop->fired);
hi_free(eventLoop);
}
return NULL;
}
int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisAeEvents *e;
/* Nothing should be attached when something is already attached */
if (ac->ev.data != NULL)
return REDIS_ERR;
/* Create container for context and r/w events */
e = (redisAeEvents *) hi_malloc(sizeof(*e));
if (e == NULL)
return REDIS_ERR;
e->context = ac;
e->loop = loop;
e->fd = c->fd;
e->reading = e->writing = 0;
/* Register functions to start/stop listening for events */
ac->ev.addRead = redisAeAddRead;
ac->ev.delRead = redisAeDelRead;
ac->ev.addWrite = redisAeAddWrite;
ac->ev.delWrite = redisAeDelWrite;
ac->ev.cleanup = redisAeCleanup;
ac->ev.data = e;
return REDIS_OK;
}

122
src/ae.h
View File

@ -1,133 +1,13 @@
#ifndef __HIREDIS_AE_H__
#define __HIREDIS_AE_H__
#include "hiredis.h"
#include "async.h"
typedef uint64_t monotime;
#define AE_OK 0
#define AE_ERR -1
#define AE_NONE 0 /* No events registered. */
#define AE_READABLE 1 /* Fire when descriptor is readable. */
#define AE_WRITABLE 2 /* Fire when descriptor is writable. */
#define AE_BARRIER 4 /* With WRITABLE, never fire the event if the
READABLE event already fired in the same event
loop iteration. Useful when you want to persist
things to disk before sending replies, and want
to do that in a group fashion. */
/* Types and data structures */
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
monotime when;
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
int refcount; /* refcount to prevent timer events from being
* freed in recursive time event calls. */
} aeTimeEvent;
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
int flags;
} aeEventLoop;
typedef struct redisAeEvents {
redisAsyncContext *context;
aeEventLoop *loop;
int fd;
int reading, writing;
} redisAeEvents;
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
#define AE_FILE_EVENTS (1<<0)
#define AE_TIME_EVENTS (1<<1)
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
#define AE_DONT_WAIT (1<<2)
#define AE_CALL_BEFORE_SLEEP (1<<3)
#define AE_CALL_AFTER_SLEEP (1<<4)
#define AE_NOMORE -1
#define AE_DELETED_EVENT_ID -1
extern monotime (*getMonotonicUs)(void);
void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask);
void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask);
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
void aeStop(aeEventLoop *eventLoop);
void aeMain(aeEventLoop *eventLoop);
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
void redisAeAddRead(void *privdata);
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
void redisAeDelRead(void *privdata);
void redisAeAddWrite(void *privdata);
void redisAeDelWrite(void *privdata);
void redisAeCleanup(void *privdata);
int anetCloexec(int fd);
static int aeApiCreate(aeEventLoop *eventLoop);
aeEventLoop *aeCreateEventLoop(int setsize);
int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac);
int aeWait(int fd, int mask, long long milliseconds);
#endif

View File

@ -42,7 +42,8 @@ hiredisAllocFuncs hiredisAllocFns = {
};
/* Override hiredis' allocators with ones supplied by the user */
hiredisAllocFuncs hiredisSetAllocators(hiredisAllocFuncs *override) {
hiredisAllocFuncs hiredisSetAllocators(hiredisAllocFuncs *override)
{
hiredisAllocFuncs orig = hiredisAllocFns;
hiredisAllocFns = *override;
@ -51,8 +52,9 @@ hiredisAllocFuncs hiredisSetAllocators(hiredisAllocFuncs *override) {
}
/* Reset allocators to use libc defaults */
void hiredisResetAllocators(void) {
hiredisAllocFns = (hiredisAllocFuncs) {
void hiredisResetAllocators(void)
{
hiredisAllocFns = (hiredisAllocFuncs){
.mallocFn = malloc,
.callocFn = calloc,
.reallocFn = realloc,
@ -63,11 +65,13 @@ void hiredisResetAllocators(void) {
#ifdef _WIN32
void *hi_malloc(size_t size) {
void *hi_malloc(size_t size)
{
return hiredisAllocFns.mallocFn(size);
}
void *hi_calloc(size_t nmemb, size_t size) {
void *hi_calloc(size_t nmemb, size_t size)
{
/* Overflow check as the user can specify any arbitrary allocator */
if (SIZE_MAX / size < nmemb)
return NULL;
@ -75,15 +79,18 @@ void *hi_calloc(size_t nmemb, size_t size) {
return hiredisAllocFns.callocFn(nmemb, size);
}
void *hi_realloc(void *ptr, size_t size) {
void *hi_realloc(void *ptr, size_t size)
{
return hiredisAllocFns.reallocFn(ptr, size);
}
char *hi_strdup(const char *str) {
char *hi_strdup(const char *str)
{
return hiredisAllocFns.strdupFn(str);
}
void hi_free(void *ptr) {
void hi_free(void *ptr)
{
hiredisAllocFns.freeFn(ptr);
}

View File

@ -812,7 +812,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
/* (P)UNSUBSCRIBE does not have its own response: every channel or
* pattern that is unsubscribed will receive a message. This means we
* pattern that is unsubscribed will redisReceive a message. This means we
* should not append a callback function for this command. */
} else if (strncasecmp(cstr,"monitor\r\n",9) == 0) {
/* Set monitor flag and push callback */

View File

@ -33,9 +33,7 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "fmacros.h"
#include "alloc.h"
#include <stdlib.h>
#include <assert.h>
#include <limits.h>
#include "dict.h"

View File

@ -1139,6 +1139,16 @@ int redisAppendCommand(redisContext *c, const char *format, ...) {
return ret;
}
int redisSendCommand(redisContext *c, const char *format, ...) {
va_list ap;
int ret;
va_start(ap,format);
ret = redisvAppendCommand(c,format,ap);
va_end(ap);
return redisFlush(c);
}
int redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
hisds cmd;
long long len;

View File

@ -331,6 +331,7 @@ int redisAppendFormattedCommand(redisContext *c, const char *cmd, size_t len);
* to get a pipeline of commands. */
int redisvAppendCommand(redisContext *c, const char *format, va_list ap);
int redisAppendCommand(redisContext *c, const char *format, ...);
int redisSendCommand(redisContext *c, const char *format, ...);
int redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen);
/* Issue a command to Redis. In a blocking context, it is identical to calling

111
src/log.c Normal file
View File

@ -0,0 +1,111 @@
#include <bits/types/FILE.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/time.h>
#include <time.h>
#include "log.h"
#include "hiredis.h"
static int is_leap_year(time_t year) {
if (year % 4) return 0; /* A year not divisible by 4 is not leap. */
else if (year % 100) return 1; /* If div by 4 and not 100 is surely leap. */
else if (year % 400) return 0; /* If div by 100 *and* not by 400 is not leap. */
else return 1; /* If div by 100 and 400 is leap. */
}
void createLogObj(char *logfile) {
log = hi_malloc(sizeof(logObj));
log->logfile = logfile;
log->loglevel = LL_NOTICE;
}
void _serverLog(int level, const char *fmt, ...) {
va_list ap;
char msg[LOG_MAX_LEN];
va_start(ap, fmt);
vsnprintf(msg, sizeof(msg), fmt, ap);
va_end(ap);
}
void serverLogRaw(int level, const char *msg) {
const char *c = ".-*#";
FILE *fp;
char buf[64];
int log_to_stdout = log->logfile[0] == '\0';
if (level < log->loglevel) {
return;
}
fp = log_to_stdout ? stdout : fopen(log->logfile, "a");
if (!fp) return;
int off;
struct timeval tv;
int role_char;
pid_t pid = getpid();
gettimeofday(&tv, NULL);
struct tm tm;
nolocks_localtime(&tm, tv.tv_sec, timezone, 1);
off = strftime(buf, sizeof(buf), "%d %b %Y %H:%M:%S.", &tm);
snprintf(buf + off, sizeof(buf) - off, "%03d", (int) tv.tv_usec / 1000);
role_char = 'm';
fprintf(fp, "%d:%c %s %c %s\n",
(int) pid, role_char, buf, c[level], msg);
fflush(fp);
if (!log_to_stdout) fclose(fp);
}
void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst) {
const time_t secs_min = 60;
const time_t secs_hour = 3600;
const time_t secs_day = 3600*24;
t -= tz; /* Adjust for timezone. */
t += 3600*dst; /* Adjust for daylight time. */
time_t days = t / secs_day; /* Days passed since epoch. */
time_t seconds = t % secs_day; /* Remaining seconds. */
tmp->tm_isdst = dst;
tmp->tm_hour = seconds / secs_hour;
tmp->tm_min = (seconds % secs_hour) / secs_min;
tmp->tm_sec = (seconds % secs_hour) % secs_min;
/* 1/1/1970 was a Thursday, that is, day 4 from the POV of the tm structure
* where sunday = 0, so to calculate the day of the week we have to add 4
* and take the modulo by 7. */
tmp->tm_wday = (days+4)%7;
/* Calculate the current year. */
tmp->tm_year = 1970;
while(1) {
/* Leap years have one day more. */
time_t days_this_year = 365 + is_leap_year(tmp->tm_year);
if (days_this_year > days) break;
days -= days_this_year;
tmp->tm_year++;
}
tmp->tm_yday = days; /* Number of day of the current year. */
/* We need to calculate in which month and day of the month we are. To do
* so we need to skip days according to how many days there are in each
* month, and adjust for the leap year that has one more day in February. */
int mdays[12] = {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31};
mdays[1] += is_leap_year(tmp->tm_year);
tmp->tm_mon = 0;
while(days >= mdays[tmp->tm_mon]) {
days -= mdays[tmp->tm_mon];
tmp->tm_mon++;
}
tmp->tm_mday = days+1; /* Add 1 since our 'days' is zero-based. */
tmp->tm_year -= 1900; /* Surprisingly tm_year is year-1900. */
}

34
src/log.h Normal file
View File

@ -0,0 +1,34 @@
#ifndef REDIS_MIGRATE_LOG_H
#define REDIS_MIGRATE_LOG_H
#define NOTICE "notice"
#define WARNING "warning"
#define VERBOSE "VERBOSE"
#define DEBUG "debug"
#define LL_DEBUG 0
#define LL_VERBOSE 1
#define LL_NOTICE 2
#define LL_WARNING 3
#define LOG_MAX_LEN 1024
typedef struct logObj {
char *logfile;
int loglevel;
} logObj;
static logObj *log;
void createLogObj(char *logfile);
#define serverLog(level, ...) _serverLog(level, __VA_ARGS__);
void _serverLog(int level, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
void serverLogRaw(int level, const char *msg);
void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst);
#endif //REDIS_MIGRATE_LOG_H

175
src/rdbLoad.c Normal file
View File

@ -0,0 +1,175 @@
#include "rdbLoad.h"
#include <errno.h>
#include <unistd.h>
int rdbLoadRioWithLoading(migrateObj *mobj) {
char buf[1024];
int error;
if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1) {
serverLog(LL_WARNING, "read version failed:%s,port:%d ", mobj->host,
mobj->port);
return C_ERR;
}
buf[9] = '\0';
if (memcmp(buf, "REDIS", 5) != 0) {
serverLog(LL_WARNING, "Wrong signature trying to load DB from file");
errno = EINVAL;
return C_ERR;
}
int type, rdbver;
uint64_t dbid = 0;
rdbver = atoi(buf + 5);
if (rdbver < 1 || rdbver > RDB_VERSION) {
serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver);
errno = EINVAL;
return C_ERR;
}
serverLog(LL_NOTICE, "buf=%s", buf);
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
while (1) {
sds key;
robj *val;
if ((type = rm_rdbLoadType(mobj)) == -1) {
serverLog(LL_WARNING, "read type failed");
return C_ERR;
}
if (type == RDB_OPCODE_EXPIRETIME) {
expiretime = rm_rdbLoadTime(mobj);
if (expiretime == -1) {
return C_ERR;
}
expiretime *= 1000;
continue;
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
expiretime = rm_rdbLoadMillisecondTime(mobj, rdbver);
continue;
} else if (type == RDB_OPCODE_FREQ) {
uint8_t byte;
if (read(mobj->source_cc->fd, &byte, 1) == 0)
return C_ERR;
lfu_freq = byte;
continue;
} else if (type == RDB_OPCODE_IDLE) {
uint64_t qword;
if ((qword = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR)
return C_ERR;
lru_idle = qword;
continue;
} else if (type == RDB_OPCODE_EOF) {
break;
} else if (type == RDB_OPCODE_SELECTDB) {
if ((dbid = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
continue;
} else if (type == RDB_OPCODE_RESIZEDB) {
uint64_t db_size, expires_size;
if ((db_size = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
if ((expires_size = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
continue;
} else if (type == RDB_OPCODE_AUX) {
robj *auxkey, *auxval;
} else if (type == RDB_OPCODE_FUNCTION || type == RDB_OPCODE_FUNCTION2) {
}
}
}
void *rm_rdbGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) {
int encode = flags & RDB_LOAD_ENC;
int plain = flags & RDB_LOAD_PLAIN;
int sds = flags & RDB_LOAD_SDS;
int isencoded;
unsigned long long len;
}
uint64_t rm_rdbLoadLen(migrateObj *mobj, int *isencoded) {
uint64_t len;
if (rdbLoadLenByRef(mobj, isencoded, &len) == -1)
return RDB_LENERR;
return len;
}
int rm_rdbLoadType(migrateObj *mobj) {
unsigned char type;
if (read(mobj->source_cc->fd, &type, 1) == 0) {
return -1;
}
return type;
}
int rm_rdbLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) {
unsigned char buf[2];
int type;
if (isencoded)
*isencoded = 0;
if (read(mobj->source_cc->fd, buf, 1) == 0) {
return -1;
}
type = (buf[0] & 0xC0) >> 6;
if (type == RDB_ENCVAL) {
if (isencoded)
*isencoded = 1;
*lenptr = buf[0] & 0x3F;
} else if (type == RDB_6BITLEN) {
/* Read a 6 bit len. */
*lenptr = buf[0] & 0x3F;
} else if (type == RDB_14BITLEN) {
if (read(mobj->source_cc->fd, buf + 1, 1) == 0)
return -1;
*lenptr = ((buf[0] & 0x3F) << 8) | buf[1];
} else if (buf[0] == RDB_32BITLEN) {
uint32_t len;
if (read(mobj->source_cc->fd, &len, 4) == 0)
return -1;
*lenptr = ntohl(len);
} else if (buf[0] == RDB_64BITLEN) {
uint64_t len;
if (read(mobj->source_cc->fd, &len, 8) == 0)
return -1;
*lenptr = ntohu64(len);
} else {
return -1;
}
return 0;
}
time_t rm_rdbLoadTime(migrateObj *mobj) {
int32_t t32;
if (read(mobj->source_cc->fd, &t32, 4) == 0) {
return -1;
}
t32 = (char *)t32 + 4;
return (time_t)t32;
}
long long rm_rdbLoadMillisecondTime(migrateObj *mobj, int rdbver) {
int64_t t64;
if (read(mobj->source_cc->fd, &t64, 8) == 0) {
return LLONG_MAX;
}
if (rdbver >= 9)
memrev64(&t64);
return (long long)t64;
}
void rm_memrev64(void *p) {
unsigned char *x = p, t;
t = x[0];
x[0] = x[7];
x[7] = t;
t = x[1];
x[1] = x[6];
x[6] = t;
t = x[2];
x[2] = x[5];
x[5] = t;
t = x[3];
x[3] = x[4];
x[4] = t;
}

48
src/rdbLoad.h Normal file
View File

@ -0,0 +1,48 @@
#ifndef RDB_LOAD_REDIS_MIGRATE_H
#define RDB_LOAD_REDIS_MIGRATE_H
#include "redis-migrate.h"
#define RDB_6BITLEN 0
#define RDB_14BITLEN 1
#define RDB_32BITLEN 0x80
#define RDB_64BITLEN 0x81
#define RDB_ENCVAL 3
#define RDB_LENERR UINT64_MAX
#define RDB_VERSION 10
#define RDB_OPCODE_FUNCTION2 245 /* function library data */
#define RDB_OPCODE_FUNCTION 246 /* old function library data for 7.0 rc1 and rc2 */
#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */
#define RDB_OPCODE_IDLE 248 /* LRU idle time. */
#define RDB_OPCODE_FREQ 249 /* LFU frequency. */
#define RDB_OPCODE_AUX 250 /* RDB aux field. */
#define RDB_OPCODE_RESIZEDB 251 /* Hash table resize hint. */
#define RDB_OPCODE_EXPIRETIME_MS 252 /* Expire time in milliseconds. */
#define RDB_OPCODE_EXPIRETIME 253 /* Old expire time in seconds. */
#define RDB_OPCODE_SELECTDB 254 /* DB number of the following keys. */
#define RDB_OPCODE_EOF 255 /* End of the RDB file. */
#define RDB_LOAD_NONE 0
#define RDB_LOAD_ENC (1 << 0)
#define RDB_LOAD_PLAIN (1 << 1)
#define RDB_LOAD_SDS (1 << 2)
#define LLONG_MAX __LONG_LONG_MAX__
int rm_rdbLoadRioWithLoading(migrateObj *mobj);
int rm_rdbLoadType(migrateObj *mobj);
time_t rm_rdbLoadTime(migrateObj *mobj);
long long rm_rdbLoadMillisecondTime(migrateObj *mobj, int rdbver);
int rm_rdbLoadLenByRef(migrateObj *mobi, int *isencoded, uint64_t *lenptr);
void rm_memrev64(void *p);
uint64_t rm_rdbLoadLen(migrateObj *mobj, int *isencoded);
void *rm_rdbGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr);
#endif

View File

@ -2,9 +2,9 @@
#include <sys/time.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include "redis-migrate.h"
#include "hiredis.h"
#include "rdbLoad.h"
static migrateObj *mobj;
@ -17,6 +17,8 @@ migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_sl
m->end_slot = end_slot;
m->repl_stat = REPL_STATE_NONE;
m->isCache = 0;
m->timeout = 10 * 1000;
m->repl_transfer_size = -1;
return m;
}
@ -26,102 +28,299 @@ void freeMigrateObj(migrateObj *m) {
mobj = NULL;
}
int sendSyncCommand(RedisModuleCtx *ctx) {
long long ustime(void) {
struct timeval tv;
long long ust;
gettimeofday(&tv, NULL);
ust = ((long long)tv.tv_sec) * 1000000;
ust += tv.tv_usec;
return ust;
}
/* Return the UNIX time in milliseconds */
mstime_t mstime(void) {
return ustime() / 1000;
}
int sendSyncCommand() {
if (!mobj->isCache) {
mobj->isCache = 1;
mobj->psync_replid = "?";
memcpy(mobj->psync_offset, "-1", 3);
}
if (redisAppendCommand(mobj->source_cc, "PSYNC %s %s", mobj->psync_replid, mobj->psync_offset) != REDIS_OK) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING,
"append PSYNC %s %s failed ip:%s,port:%d, ",
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
serverLog(LL_WARNING, "append PSYNC %s %s failed ip:%s,port:%d, ",
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
return 0;
}
if (redisFlush(mobj->source_cc) != REDIS_OK) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING,
"send PSYNC %s %s failed ip:%s,port:%d, ",
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
serverLog(LL_WARNING, "send PSYNC %s %s failed ip:%s,port:%d, ",
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
return 0;
}
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "PSYNC %s %s ",
mobj->psync_replid, mobj->psync_offset);
serverLog(LL_NOTICE, "PSYNC %s %s ", mobj->psync_replid, mobj->psync_offset);
return 1;
}
void *syncWithRedis(void *arg) {
RedisModuleCtx *ctx = arg;
redisReply *reply;
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin sync data");
reply = redisCommand(mobj->source_cc, "PING");
if (reply->type == REDIS_REPLY_ERROR) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "send PING failed ip:%s,port:%d, error:%s",
mobj->host, mobj->port, reply->str);
goto error;
sds redisReceive() {
char buf[256];
if (syncReadLine(mobj->source_cc->fd, buf, sizeof(buf), mobj->timeout) == -1) {
serverLog(LL_WARNING, "redisReceive failed ip:%s,port:%d ", mobj->host, mobj->port);
return NULL;
}
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "PING ");
freeReplyObject(reply);
//todo auth with master
sds portstr = sdsfromlonglong(mobj->port);
reply = redisCommand(mobj->source_cc, "REPLCONF listening-port %s", portstr);
if (reply->type == REDIS_REPLY_ERROR) {
sdsfree(portstr);
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "REPLCONF listening-port %s failed ip:%s,port:%d, error:%s",
portstr, mobj->host, mobj->port, reply->str);
goto error;
}
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "REPLCONF listening-port %s ", portstr);
sdsfree(portstr);
freeReplyObject(reply);
reply = redisCommand(mobj->source_cc, "REPLCONF ip-address %s", mobj->host);
if (reply->type == REDIS_REPLY_ERROR) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "REPLCONF ip-address %s failed ip:%s,port:%d, error:%s",
mobj->host, mobj->host, mobj->port, reply->str);
goto error;
}
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "REPLCONF ip-address %s ", mobj->host);
freeReplyObject(reply);
reply = redisCommand(mobj->source_cc, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2");
if (reply->type == REDIS_REPLY_ERROR) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING,
"REPLCONF capa eof capa psync2 failed ip:%s,port:%d, error:%s",
mobj->host, mobj->port, reply->str);
goto error;
}
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "REPLCONF %s %s %s %s",
"capa", "eof", "capa", "psync2");
freeReplyObject(reply);
if (!sendSyncCommand(ctx)) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING,
"send PSYNC %s %s failed ip:%s,port:%d, ",
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
goto error;
}
return NULL;
error:
freeReplyObject(reply);
freeMigrateObj(mobj);
return NULL;
return hi_sdsnew(buf);
}
int connectRedis(RedisModuleCtx *ctx) {
pthread_t pthread;
int flag = pthread_create(&pthread, NULL, syncWithRedis, ctx);
if (flag != 0) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Can't start thread");
freeMigrateObj(mobj);
return RedisModule_ReplyWithError(ctx, "Can't start thread");
int receiveDataFromRedis() {
sds reply = redisReceive();
if (reply == NULL) {
serverLog(LL_WARNING, "Master did not reply to PSYNC");
return PSYNC_TRY_LATER;
}
pthread_join(pthread, NULL);
return RedisModule_ReplyWithSimpleString(ctx, "OK");
if (sdslen(reply) == 0) {
sdsfree(reply);
return PSYNC_WAIT_REPLY;
}
serverLog(LL_NOTICE, "reply=%s", reply);
if (!strncmp(reply, "+FULLRESYNC", 11)) {
char *replid = NULL, *offset = NULL;
/* FULL RESYNC, parse the reply in order to extract the replid
* and the replication offset. */
replid = strchr(reply, ' ');
if (replid) {
replid++;
offset = strchr(replid, ' ');
if (offset)
offset++;
}
if (!replid || !offset || (offset - replid - 1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING, "Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
* replid to make sure next PSYNCs will fail. */
memset(mobj->master_replid, 0, CONFIG_RUN_ID_SIZE + 1);
} else {
memcpy(mobj->master_replid, replid, offset - replid - 1);
mobj->master_replid[CONFIG_RUN_ID_SIZE] = '\0';
mobj->master_initial_offset = strtoll(offset, NULL, 10);
serverLog(LL_NOTICE, "Full sync from master: %s:%lld",
mobj->master_replid, mobj->master_initial_offset);
}
sdsfree(reply);
return PSYNC_FULLRESYNC;
}
if (!strncmp(reply, "+CONTINUE", 9)) {
/* Partial resync was accepted. */
serverLog(LL_NOTICE, "Successful partial resynchronization with master.");
/* Check the new replication ID advertised by the master. If it
* changed, we need to set the new ID as primary ID, and set
* secondary ID as the old master ID up to the current offset, so
* that our sub-slaves will be able to PSYNC with us after a
* disconnection. */
char *start = reply + 10;
char *end = reply + 9;
while (end[0] != '\r' && end[0] != '\n' && end[0] != '\0')
end++;
if (end - start == CONFIG_RUN_ID_SIZE) {
char new[CONFIG_RUN_ID_SIZE + 1];
memcpy(new, start, CONFIG_RUN_ID_SIZE);
new[CONFIG_RUN_ID_SIZE] = '\0';
return PSYNC_CONTINUE;
}
}
}
void readFullData() {
static char eofmark[CONFIG_RUN_ID_SIZE];
static char lastbytes[CONFIG_RUN_ID_SIZE];
static int usemark = 0;
char buf[PROTO_IOBUF_LEN];
if (mobj->repl_transfer_size == -1) {
int nread = syncReadLine(mobj->source_cc->fd, buf, PROTO_IOBUF_LEN, mobj->timeout);
if (nread == -1) {
serverLog(LL_WARNING, "read full data failed");
goto error;
}
if (buf[0] == '-') {
serverLog(LL_WARNING,
"MASTER aborted replication with an error: %s",
buf + 1);
goto error;
} else if (buf[0] == '\0') {
// mobj->repl_transfer_lastio = server.unixtime;
return;
} else if (buf[0] != '$') {
serverLog(LL_WARNING, "Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
goto error;
}
if (strncmp(buf + 1, "EOF:", 4) == 0 && strlen(buf + 5) >= CONFIG_RUN_ID_SIZE) {
usemark = 1;
usemark = 1;
memcpy(eofmark, buf + 5, CONFIG_RUN_ID_SIZE);
memset(lastbytes, 0, CONFIG_RUN_ID_SIZE);
/* Set any repl_transfer_size to avoid entering this code path
* at the next call. */
mobj->repl_transfer_size = 0;
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF to parser");
} else {
usemark = 0;
mobj->repl_transfer_size = strtol(buf + 1, NULL, 10);
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving %lld bytes from master to parser",
(long long)mobj->repl_transfer_size);
}
}
int flag = rdbLoadRioWithLoading(mobj);
return;
error:
cancelMigrate();
return;
}
void cancelMigrate() {
}
void syncDataWithRedis(int fd, void *user_data, int mask) {
REDISMODULE_NOT_USED(fd);
REDISMODULE_NOT_USED(mask);
REDISMODULE_NOT_USED(user_data);
sds err = NULL;
if (mobj->repl_stat == REPL_STATE_CONNECTING) {
if (redisSendCommand(mobj->source_cc, "PING") != REDIS_OK) {
serverLog(LL_WARNING, "send PING failed ip:%s,port:%d",
mobj->host, mobj->port);
goto error;
}
mobj->repl_stat = REPL_STATE_RECEIVE_PING_REPLY;
return;
}
if (mobj->repl_stat == REPL_STATE_RECEIVE_PING_REPLY) {
err = redisReceive();
if (err == NULL)
goto no_response_error;
if (err[0] != '+' && strncmp(err, "-NOAUTH", 7) != 0 && strncmp(err, "-NOPERM", 7) != 0 && strncmp(err, "-ERR operation not permitted", 28) != 0) {
serverLog(LL_WARNING, "Error reply to PING from master: '%s'", err);
sdsfree(err);
goto error;
} else {
serverLog(LL_NOTICE, "Master replied to PING, replication can continue...");
}
sdsfree(err);
err = NULL;
mobj->repl_stat = REPL_STATE_SEND_HANDSHAKE;
return;
}
if (mobj->repl_stat == REPL_STATE_SEND_HANDSHAKE) {
// todo 增加认证
mobj->repl_stat = REPL_STATE_RECEIVE_AUTH_REPLY;
}
if (mobj->repl_stat == REPL_STATE_RECEIVE_AUTH_REPLY) {
// todo 接受认证信息
sds portstr = sdsfromlonglong(mobj->port);
if (redisSendCommand(mobj->source_cc, "REPLCONF listening-port %s", portstr) != REDIS_OK) {
serverLog(LL_WARNING, "send PING failed ip:%s,port:%d",
mobj->host, mobj->port);
goto error;
}
mobj->repl_stat = REPL_STATE_RECEIVE_PORT_REPLY;
sdsfree(portstr);
return;
}
if (mobj->repl_stat == REPL_STATE_RECEIVE_PORT_REPLY) {
err = redisReceive();
if (err == NULL)
goto no_response_error;
if (err[0] == '-') {
serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF listening-port: %s", err);
goto error;
}
serverLog(LL_NOTICE, "REPLCONF listening-port success");
if (redisSendCommand(mobj->source_cc, "REPLCONF ip-address %s", mobj->host) != REDIS_OK) {
serverLog(LL_WARNING, "REPLCONF ip-address %s failed", mobj->host);
goto error;
}
sdsfree(err);
err = NULL;
mobj->repl_stat = REPL_STATE_RECEIVE_IP_REPLY;
return;
}
if (mobj->repl_stat == REPL_STATE_RECEIVE_IP_REPLY) {
err = redisReceive();
if (err == NULL)
goto no_response_error;
if (err[0] == '-') {
serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF ip-address: %s", err);
goto error;
}
serverLog(LL_NOTICE, "REPLCONF REPLCONF ip-address success");
if (redisSendCommand(mobj->source_cc, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2") != REDIS_OK) {
serverLog(LL_WARNING, "send REPLCONF capa eof capa psync2 failed");
goto error;
}
sdsfree(err);
err = NULL;
mobj->repl_stat = REPL_STATE_RECEIVE_CAPA_REPLY;
return;
}
if (mobj->repl_stat == REPL_STATE_RECEIVE_CAPA_REPLY) {
err = redisReceive();
if (err == NULL)
goto no_response_error;
if (err[0] == '-') {
serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF capa: %s", err);
goto error;
}
serverLog(LL_NOTICE, "REPLCONF capa eof capa psync2 success");
sdsfree(err);
err = NULL;
mobj->repl_stat = REPL_STATE_SEND_PSYNC;
return;
}
if (mobj->repl_stat == REPL_STATE_SEND_PSYNC) {
if (!sendSyncCommand()) {
serverLog(LL_WARNING, "send PSYNC %s %s failed ip:%s,port:%d, ",
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
goto error;
}
mobj->repl_stat = REPL_STATE_RECEIVE_PSYNC_REPLY;
return;
}
if (mobj->repl_stat != REPL_STATE_RECEIVE_PSYNC_REPLY) {
serverLog(LL_WARNING, "syncDataWithRedis(): state machine error, state should be RECEIVE_PSYNC but is %d",
mobj->repl_stat);
goto error;
}
int psync_result = receiveDataFromRedis();
if (psync_result == PSYNC_WAIT_REPLY)
return;
if (psync_result == PSYNC_TRY_LATER)
goto error;
if (psync_result == PSYNC_CONTINUE) {
}
// 接受全部数据
serverLog(LL_NOTICE, "begin receive full data");
readFullData();
return;
error:
if (err != NULL) {
sdsfree(err);
}
freeMigrateObj(mobj);
no_response_error: /* Handle receiveSynchronousResponse() error when master has no reply */
serverLog(LL_WARNING, "Master did not respond to command during SYNC handshake");
}
/**
* migrate data to current instance.
* migrate host port begin-slot end-slot
* @param ctx
* @param ctxz
* @param argv
* @param argc
* @return
@ -130,35 +329,37 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 5) {
return RedisModule_WrongArity(ctx);
}
robj *host = (robj *) argv[1];
robj *port = (robj *) argv[2];
robj *begin_slot = (robj *) argv[3];
robj *end_slot = (robj *) argv[4];
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *) host->ptr,
(char *) port->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr);
if (RedisModule_IsKeysPositionRequest(ctx)) {
RedisModule_Log(ctx, VERBOSE, "get keys from module");
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
robj *host = (robj *)argv[1];
robj *port = (robj *)argv[2];
robj *begin_slot = (robj *)argv[3];
robj *end_slot = (robj *)argv[4];
RedisModule_Log(ctx, NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *)host->ptr,
(char *)port->ptr, (char *)begin_slot->ptr, (char *)end_slot->ptr);
if (mobj != NULL) {
return RedisModule_ReplyWithError(ctx, "-ERR is migrating, please waiting");
return RedisModule_ReplyWithError(ctx, "migrating, please waiting");
}
mobj = createMigrateObject(host, atoi(port->ptr), atoi(begin_slot->ptr), atoi(end_slot->ptr));
struct timeval timeout = {1, 500000}; // 1.5s
redisOptions options = {0};
REDIS_OPTIONS_SET_TCP(&options, (const char *) mobj->host, mobj->port);
REDIS_OPTIONS_SET_TCP(&options, (const char *)mobj->host, mobj->port);
options.connect_timeout = &timeout;
mobj->source_cc = redisConnectWithOptions(&options);
if (mobj->source_cc == NULL || mobj->source_cc->err) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%d, error:%s",
RedisModule_Log(ctx, WARNING, "Could not connect to Redis at ip:%s,port:%d, error:%s",
mobj->host, mobj->port, mobj->source_cc->errstr);
freeMigrateObj(mobj);
return RedisModule_ReplyWithError(ctx, "Can't connect source redis");
}
return connectRedis(ctx);
mobj->repl_stat = REPL_STATE_CONNECTING;
RedisModule_EventLoopAdd(mobj->source_cc->fd, AE_WRITABLE, syncDataWithRedis, ctx);
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
/* This function must be present on each Redis module. It is used in order to
* register the commands into the Redis server. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
@ -166,13 +367,23 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (flag == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
RedisModule_SetClusterFlags(ctx, REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION);
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin init commands of %s", MODULE_NAME);
flag = RedisModule_CreateCommand(ctx, "rm.migrate", rm_migrateCommand, "write deny-oom admin", 1, 1, 0);
RedisModule_Log(ctx, NOTICE, "begin init commands of %s", MODULE_NAME);
flag = RedisModule_CreateCommand(ctx, "rm.migrate", rm_migrateCommand, "write deny-oom admin getkeys-api", 0, 0, 0);
if (flag == REDISMODULE_ERR) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "init rm.migrate failed");
RedisModule_Log(ctx, WARNING, "init rm.migrate failed");
return REDISMODULE_ERR;
}
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "init %s success", MODULE_NAME);
RedisModuleCallReply *reply = RedisModule_Call(ctx, "config", "cc", "get", "logfile");
long long items = RedisModule_CallReplyLength(reply);
if (items != 2) {
RedisModule_Log(ctx, WARNING, "logfile is empty");
return REDISMODULE_ERR;
}
RedisModuleCallReply *item1 = RedisModule_CallReplyArrayElement(reply, 1);
robj *logfile = (robj *)RedisModule_CreateStringFromCallReply(item1);
RedisModule_Log(ctx, NOTICE, "logfile is %s", (char *)logfile->ptr);
createLogObj((char *)logfile->ptr);
RedisModule_Log(ctx, NOTICE, "init %s success", MODULE_NAME);
return REDISMODULE_OK;
}

View File

@ -6,27 +6,37 @@
#include "ae.h"
#include "sds.h"
#include "sdscompat.h"
#include "log.h"
#define MODULE_NAME "redis-migrate"
#define REDIS_MIGRATE_VERSION 1
#define LRU_BITS 24
#define CONFIG_RUN_ID_SIZE 40
#define RDB_EOF_MARK_SIZE 40
#define PSYNC_WRITE_ERROR 0
#define PSYNC_WAIT_REPLY 1
#define PSYNC_CONTINUE 2
#define PSYNC_FULLRESYNC 3
#define PSYNC_NOT_SUPPORTED 4
#define PSYNC_TRY_LATER 5
#define C_ERR -1
#define C_OK 1
#define PROTO_IOBUF_LEN (1024 * 16)
/* Anti-warning macro... */
#define UNUSED(V) ((void) V)
#define UNUSED(V) ((void)V)
typedef struct redisObject {
unsigned type: 4;
unsigned encoding: 4;
unsigned lru: LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
typedef struct redisObject
{
unsigned type : 4;
unsigned encoding : 4;
unsigned lru : LRU_BITS;
int refcount;
void *ptr;
} robj;
typedef struct migrateObject {
typedef struct migrateObject
{
char *address;
int repl_stat;
redisContext *source_cc;
@ -35,14 +45,20 @@ typedef struct migrateObject {
int begin_slot;
int end_slot;
char *psync_replid;
char master_replid[CONFIG_RUN_ID_SIZE + 1];
int timeout;
int isCache;
char psync_offset[32];
int repl_transfer_size;
long long master_initial_offset;
time_t repl_transfer_lastio;
} migrateObj;
typedef enum {
REPL_STATE_NONE = 0, /* No active replication */
REPL_STATE_CONNECT, /* Must connect to master */
REPL_STATE_CONNECTING, /* Connecting to master */
typedef enum
{
REPL_STATE_NONE = 0, /* No active replication */
REPL_STATE_CONNECT, /* Must connect to master */
REPL_STATE_CONNECTING, /* Connecting to master */
/* --- Handshake states, must be ordered --- */
REPL_STATE_RECEIVE_PING_REPLY, /* Wait for PING reply */
REPL_STATE_SEND_HANDSHAKE, /* Send handshake sequence to master */
@ -53,26 +69,38 @@ typedef enum {
REPL_STATE_SEND_PSYNC, /* Send PSYNC */
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
/* --- End of handshake states --- */
REPL_STATE_TRANSFER, /* Receiving .rdb from master */
REPL_STATE_CONNECTED, /* Connected to master */
STATE_CONNECT_ERROR,
STATE_DISCONNECT
REPL_STATE_TRANSFER, /* Receiving .rdb from master */
REPL_STATE_CONNECTED, /* Connected to master */
} repl_state;
long long ustime(void);
mstime_t mstime(void);
migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot);
void freeMigrateObj(migrateObj *m);
int sendSyncCommand(RedisModuleCtx *ctx);
int sendSyncCommand();
void *syncWithRedis(void *arg);
int receiveDataFromRedis();
int connectRedis(RedisModuleCtx *ctx) ;
ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout);
ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout);
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
sds redisReceive();
void readFullData();
void cancelMigrate();
void syncDataWithRedis(int fd, void *user_data, int mask);
int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
#endif //REDIS_MIGRATE_REDIS_MIGRATE_H
#endif // REDIS_MIGRATE_REDIS_MIGRATE_H

View File

@ -220,7 +220,7 @@ This flag should not be used directly by the module.
/* Logging level strings */
#define REDISMODULE_LOGLEVEL_DEBUG "debug"
#define REDISMODULE_LOGLEVEL_VERBOSE "verbose"
#define REDISMODULE_LOGLEVEL_VERBOSE "VERBOSE"
#define REDISMODULE_LOGLEVEL_NOTICE "notice"
#define REDISMODULE_LOGLEVEL_WARNING "warning"

117
src/syncio.c Normal file
View File

@ -0,0 +1,117 @@
#include "redis-migrate.h"
#include "ae.h"
#include <unistd.h>
#include <errno.h>
/* ----------------- Blocking sockets I/O with timeouts --------------------- */
#define SYNCIO__RESOLUTION 10
ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout)
{
ssize_t nwritten, ret = size;
long long start = mstime();
long long remaining = timeout;
while (1)
{
long long wait = (remaining > SYNCIO__RESOLUTION) ? remaining : SYNCIO__RESOLUTION;
long long elapsed;
nwritten = write(fd, ptr, size);
if (nwritten == -1)
{
if (errno != EAGAIN)
return -1;
}
else
{
ptr += nwritten;
size -= nwritten;
}
if (size == 0)
return ret;
/* Wait */
aeWait(fd, AE_WRITABLE, wait);
elapsed = mstime() - start;
if (elapsed >= timeout)
{
errno = ETIMEDOUT;
return -1;
}
remaining = timeout - elapsed;
}
}
ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout)
{
ssize_t nread, totread = 0;
long long start = mstime();
long long remaining = timeout;
if (size == 0)
return 0;
while (1)
{
long long wait = (remaining > SYNCIO__RESOLUTION) ? remaining : SYNCIO__RESOLUTION;
long long elapsed;
nread = read(fd, ptr, size);
if (nread == 0)
return -1;
if (nread == -1)
{
if (errno != EAGAIN)
return -1;
}
else
{
ptr += nread;
size -= nread;
totread += nread;
}
if (size == 0)
return totread;
/* Wait */
aeWait(fd, AE_READABLE, wait);
elapsed = mstime() - start;
if (elapsed >= timeout)
{
errno = ETIMEDOUT;
return -1;
}
remaining = timeout - elapsed;
}
}
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout)
{
ssize_t nread = 0;
size--;
while (size)
{
char c;
if (syncRead(fd, &c, 1, timeout) == -1)
return -1;
if (c == '\n')
{
*ptr = '\0';
if (nread && *(ptr - 1) == '\r')
*(ptr - 1) = '\0';
return nread;
}
else
{
*ptr++ = c;
*ptr = '\0';
nread++;
}
size--;
}
return nread;
}