sync #7
213
.clang-format
Normal file
213
.clang-format
Normal file
@ -0,0 +1,213 @@
|
||||
# 语言: None, Cpp, Java, JavaScript, ObjC, Proto, TableGen, TextProto
|
||||
Language: Cpp
|
||||
# BasedOnStyle: LLVM
|
||||
|
||||
# 访问说明符(public、private等)的偏移
|
||||
AccessModifierOffset: -4
|
||||
|
||||
# 开括号(开圆括号、开尖括号、开方括号)后的对齐: Align, DontAlign, AlwaysBreak(总是在开括号后换行)
|
||||
AlignAfterOpenBracket: Align
|
||||
|
||||
# 连续赋值时,对齐所有等号
|
||||
AlignConsecutiveAssignments: false
|
||||
|
||||
# 连续声明时,对齐所有声明的变量名
|
||||
AlignConsecutiveDeclarations: false
|
||||
|
||||
# 右对齐逃脱换行(使用反斜杠换行)的反斜杠
|
||||
AlignEscapedNewlines: Right
|
||||
|
||||
# 水平对齐二元和三元表达式的操作数
|
||||
AlignOperands: true
|
||||
|
||||
# 对齐连续的尾随的注释
|
||||
AlignTrailingComments: true
|
||||
|
||||
# 不允许函数声明的所有参数在放在下一行
|
||||
AllowAllParametersOfDeclarationOnNextLine: false
|
||||
|
||||
# 不允许短的块放在同一行
|
||||
AllowShortBlocksOnASingleLine: true
|
||||
|
||||
# 允许短的case标签放在同一行
|
||||
AllowShortCaseLabelsOnASingleLine: true
|
||||
|
||||
# 允许短的函数放在同一行: None, InlineOnly(定义在类中), Empty(空函数), Inline(定义在类中,空函数), All
|
||||
AllowShortFunctionsOnASingleLine: None
|
||||
|
||||
# 允许短的if语句保持在同一行
|
||||
AllowShortIfStatementsOnASingleLine: true
|
||||
|
||||
# 允许短的循环保持在同一行
|
||||
AllowShortLoopsOnASingleLine: true
|
||||
|
||||
# 总是在返回类型后换行: None, All, TopLevel(顶级函数,不包括在类中的函数),
|
||||
# AllDefinitions(所有的定义,不包括声明), TopLevelDefinitions(所有的顶级函数的定义)
|
||||
AlwaysBreakAfterReturnType: None
|
||||
|
||||
# 总是在多行string字面量前换行
|
||||
AlwaysBreakBeforeMultilineStrings: false
|
||||
|
||||
# 总是在template声明后换行
|
||||
AlwaysBreakTemplateDeclarations: true
|
||||
|
||||
# false表示函数实参要么都在同一行,要么都各自一行
|
||||
BinPackArguments: true
|
||||
|
||||
# false表示所有形参要么都在同一行,要么都各自一行
|
||||
BinPackParameters: true
|
||||
|
||||
# 大括号换行,只有当BreakBeforeBraces设置为Custom时才有效
|
||||
BraceWrapping:
|
||||
# class定义后面
|
||||
AfterClass: false
|
||||
# 控制语句后面
|
||||
AfterControlStatement: false
|
||||
# enum定义后面
|
||||
AfterEnum: false
|
||||
# 函数定义后面
|
||||
AfterFunction: false
|
||||
# 命名空间定义后面
|
||||
AfterNamespace: false
|
||||
# struct定义后面
|
||||
AfterStruct: false
|
||||
# union定义后面
|
||||
AfterUnion: false
|
||||
# extern之后
|
||||
AfterExternBlock: false
|
||||
# catch之前
|
||||
BeforeCatch: false
|
||||
# else之前
|
||||
BeforeElse: false
|
||||
# 缩进大括号
|
||||
IndentBraces: false
|
||||
# 分离空函数
|
||||
SplitEmptyFunction: false
|
||||
# 分离空语句
|
||||
SplitEmptyRecord: false
|
||||
# 分离空命名空间
|
||||
SplitEmptyNamespace: false
|
||||
|
||||
# 在二元运算符前换行: None(在操作符后换行), NonAssignment(在非赋值的操作符前换行), All(在操作符前换行)
|
||||
BreakBeforeBinaryOperators: NonAssignment
|
||||
|
||||
# 在大括号前换行: Attach(始终将大括号附加到周围的上下文), Linux(除函数、命名空间和类定义,与Attach类似),
|
||||
# Mozilla(除枚举、函数、记录定义,与Attach类似), Stroustrup(除函数定义、catch、else,与Attach类似),
|
||||
# Allman(总是在大括号前换行), GNU(总是在大括号前换行,并对于控制语句的大括号增加额外的缩进), WebKit(在函数前换行), Custom
|
||||
# 注:这里认为语句块也属于函数
|
||||
BreakBeforeBraces: Custom
|
||||
|
||||
# 在三元运算符前换行
|
||||
BreakBeforeTernaryOperators: false
|
||||
|
||||
# 在构造函数的初始化列表的冒号后换行
|
||||
BreakConstructorInitializers: AfterColon
|
||||
|
||||
#BreakInheritanceList: AfterColon
|
||||
|
||||
BreakStringLiterals: false
|
||||
|
||||
# 每行字符的限制,0表示没有限制
|
||||
ColumnLimit: 0
|
||||
|
||||
CompactNamespaces: true
|
||||
|
||||
# 构造函数的初始化列表要么都在同一行,要么都各自一行
|
||||
ConstructorInitializerAllOnOneLineOrOnePerLine: false
|
||||
|
||||
# 构造函数的初始化列表的缩进宽度
|
||||
ConstructorInitializerIndentWidth: 4
|
||||
|
||||
# 延续的行的缩进宽度
|
||||
ContinuationIndentWidth: 4
|
||||
|
||||
# 去除C++11的列表初始化的大括号{后和}前的空格
|
||||
Cpp11BracedListStyle: true
|
||||
|
||||
# 继承最常用的指针和引用的对齐方式
|
||||
DerivePointerAlignment: false
|
||||
|
||||
# 固定命名空间注释
|
||||
FixNamespaceComments: true
|
||||
|
||||
# 缩进case标签
|
||||
IndentCaseLabels: false
|
||||
|
||||
IndentPPDirectives: None
|
||||
|
||||
# 缩进宽度
|
||||
IndentWidth: 4
|
||||
|
||||
# 函数返回类型换行时,缩进函数声明或函数定义的函数名
|
||||
IndentWrappedFunctionNames: false
|
||||
|
||||
# 保留在块开始处的空行
|
||||
KeepEmptyLinesAtTheStartOfBlocks: false
|
||||
|
||||
# 连续空行的最大数量
|
||||
MaxEmptyLinesToKeep: 1
|
||||
|
||||
# 命名空间的缩进: None, Inner(缩进嵌套的命名空间中的内容), All
|
||||
NamespaceIndentation: None
|
||||
|
||||
# 指针和引用的对齐: Left, Right, Middle
|
||||
PointerAlignment: Right
|
||||
|
||||
# 允许重新排版注释
|
||||
ReflowComments: true
|
||||
|
||||
# 允许排序#include
|
||||
SortIncludes: false
|
||||
|
||||
# 允许排序 using 声明
|
||||
SortUsingDeclarations: false
|
||||
|
||||
# 在C风格类型转换后添加空格
|
||||
SpaceAfterCStyleCast: false
|
||||
|
||||
# 在Template 关键字后面添加空格
|
||||
SpaceAfterTemplateKeyword: true
|
||||
|
||||
# 在赋值运算符之前添加空格
|
||||
SpaceBeforeAssignmentOperators: true
|
||||
|
||||
# SpaceBeforeCpp11BracedList: true
|
||||
|
||||
# SpaceBeforeCtorInitializerColon: true
|
||||
|
||||
# SpaceBeforeInheritanceColon: true
|
||||
|
||||
# 开圆括号之前添加一个空格: Never, ControlStatements, Always
|
||||
SpaceBeforeParens: ControlStatements
|
||||
|
||||
# SpaceBeforeRangeBasedForLoopColon: true
|
||||
|
||||
# 在空的圆括号中添加空格
|
||||
SpaceInEmptyParentheses: false
|
||||
|
||||
# 在尾随的评论前添加的空格数(只适用于//)
|
||||
SpacesBeforeTrailingComments: 1
|
||||
|
||||
# 在尖括号的<后和>前添加空格
|
||||
SpacesInAngles: false
|
||||
|
||||
# 在C风格类型转换的括号中添加空格
|
||||
SpacesInCStyleCastParentheses: false
|
||||
|
||||
# 在容器(ObjC和JavaScript的数组和字典等)字面量中添加空格
|
||||
SpacesInContainerLiterals: true
|
||||
|
||||
# 在圆括号的(后和)前添加空格
|
||||
SpacesInParentheses: false
|
||||
|
||||
# 在方括号的[后和]前添加空格,lamda表达式和未指明大小的数组的声明不受影响
|
||||
SpacesInSquareBrackets: false
|
||||
|
||||
# 标准: Cpp03, Cpp11, Auto
|
||||
Standard: Cpp11
|
||||
|
||||
# tab宽度
|
||||
TabWidth: 4
|
||||
|
||||
# 使用tab字符: Never, ForIndentation, ForContinuationAndIndentation, Always
|
||||
UseTab: Never
|
@ -37,9 +37,12 @@ alloc.xo: fmacros.h alloc.h
|
||||
dict.xo: fmacros.h alloc.h dict.h
|
||||
ae.xo: redismodule.h
|
||||
hiredis.xo:fmacros.h hiredis.h net.h sds.h alloc.h async.h
|
||||
syncio.xo:redis-migrate.c
|
||||
log.xo: log.h
|
||||
rdbLoad.xo: rdbLoad.h
|
||||
redis-migrate.xo: redismodule.h
|
||||
|
||||
XO_LIBS=redis-migrate.xo hiredis.xo sds.xo ssl.xo read.xo alloc.xo dict.xo async.xo net.xo ae.xo
|
||||
XO_LIBS=redis-migrate.xo hiredis.xo sds.xo ssl.xo read.xo alloc.xo dict.xo async.xo net.xo ae.xo syncio.xo log.xo rdbLoad.xo
|
||||
|
||||
redis-migrate.so: $(XO_LIBS)
|
||||
$(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc
|
||||
|
509
src/ae.c
509
src/ae.c
@ -1,502 +1,25 @@
|
||||
#include <errno.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <fcntl.h>
|
||||
#include "ae.h"
|
||||
#include <string.h>
|
||||
#include <poll.h>
|
||||
|
||||
int aeWait(int fd, int mask, long long milliseconds) {
|
||||
struct pollfd pfd;
|
||||
int retmask = 0, retval;
|
||||
|
||||
void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
((void)el); ((void)fd); ((void)mask);
|
||||
memset(&pfd, 0, sizeof(pfd));
|
||||
pfd.fd = fd;
|
||||
if (mask & AE_READABLE) pfd.events |= POLLIN;
|
||||
if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
|
||||
|
||||
redisAeEvents *e = (redisAeEvents*)privdata;
|
||||
redisAsyncHandleRead(e->context);
|
||||
}
|
||||
|
||||
void aeStop(aeEventLoop *eventLoop) {
|
||||
eventLoop->stop = 1;
|
||||
}
|
||||
|
||||
void aeMain(aeEventLoop *eventLoop) {
|
||||
eventLoop->stop = 0;
|
||||
while (!eventLoop->stop) {
|
||||
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
|
||||
AE_CALL_BEFORE_SLEEP|
|
||||
AE_CALL_AFTER_SLEEP);
|
||||
}
|
||||
}
|
||||
|
||||
monotime (*getMonotonicUs)(void) = NULL;
|
||||
|
||||
/* Process time events */
|
||||
static int processTimeEvents(aeEventLoop *eventLoop) {
|
||||
int processed = 0;
|
||||
aeTimeEvent *te;
|
||||
long long maxId;
|
||||
|
||||
te = eventLoop->timeEventHead;
|
||||
maxId = eventLoop->timeEventNextId-1;
|
||||
monotime now = getMonotonicUs();
|
||||
while(te) {
|
||||
long long id;
|
||||
|
||||
/* Remove events scheduled for deletion. */
|
||||
if (te->id == AE_DELETED_EVENT_ID) {
|
||||
aeTimeEvent *next = te->next;
|
||||
/* If a reference exists for this timer event,
|
||||
* don't free it. This is currently incremented
|
||||
* for recursive timerProc calls */
|
||||
if (te->refcount) {
|
||||
te = next;
|
||||
continue;
|
||||
}
|
||||
if (te->prev)
|
||||
te->prev->next = te->next;
|
||||
else
|
||||
eventLoop->timeEventHead = te->next;
|
||||
if (te->next)
|
||||
te->next->prev = te->prev;
|
||||
if (te->finalizerProc) {
|
||||
te->finalizerProc(eventLoop, te->clientData);
|
||||
now = getMonotonicUs();
|
||||
}
|
||||
hi_free(te);
|
||||
te = next;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Make sure we don't process time events created by time events in
|
||||
* this iteration. Note that this check is currently useless: we always
|
||||
* add new timers on the head, however if we change the implementation
|
||||
* detail, this check may be useful again: we keep it here for future
|
||||
* defense. */
|
||||
if (te->id > maxId) {
|
||||
te = te->next;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (te->when <= now) {
|
||||
int retval;
|
||||
|
||||
id = te->id;
|
||||
te->refcount++;
|
||||
retval = te->timeProc(eventLoop, id, te->clientData);
|
||||
te->refcount--;
|
||||
processed++;
|
||||
now = getMonotonicUs();
|
||||
if (retval != AE_NOMORE) {
|
||||
te->when = now + retval * 1000;
|
||||
if ((retval = poll(&pfd, 1, milliseconds))== 1) {
|
||||
if (pfd.revents & POLLIN) retmask |= AE_READABLE;
|
||||
if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
|
||||
if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
|
||||
if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
|
||||
return retmask;
|
||||
} else {
|
||||
te->id = AE_DELETED_EVENT_ID;
|
||||
}
|
||||
}
|
||||
te = te->next;
|
||||
}
|
||||
return processed;
|
||||
}
|
||||
|
||||
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
|
||||
aeApiState *state = eventLoop->apidata;
|
||||
int retval, numevents = 0;
|
||||
|
||||
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
|
||||
tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
|
||||
if (retval > 0) {
|
||||
int j;
|
||||
|
||||
numevents = retval;
|
||||
for (j = 0; j < numevents; j++) {
|
||||
int mask = 0;
|
||||
struct epoll_event *e = state->events+j;
|
||||
|
||||
if (e->events & EPOLLIN) mask |= AE_READABLE;
|
||||
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
|
||||
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
|
||||
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
|
||||
eventLoop->fired[j].fd = e->data.fd;
|
||||
eventLoop->fired[j].mask = mask;
|
||||
}
|
||||
} else if (retval == -1 && errno != EINTR) {
|
||||
//panic("aeApiPoll: epoll_wait, %s", strerror(errno));
|
||||
}
|
||||
|
||||
return numevents;
|
||||
}
|
||||
|
||||
static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
|
||||
aeTimeEvent *te = eventLoop->timeEventHead;
|
||||
if (te == NULL) return -1;
|
||||
|
||||
aeTimeEvent *earliest = NULL;
|
||||
while (te) {
|
||||
if (!earliest || te->when < earliest->when)
|
||||
earliest = te;
|
||||
te = te->next;
|
||||
}
|
||||
|
||||
monotime now = getMonotonicUs();
|
||||
return (now >= earliest->when) ? 0 : earliest->when - now;
|
||||
}
|
||||
|
||||
/* Process every pending time event, then every pending file event
|
||||
* (that may be registered by time event callbacks just processed).
|
||||
* Without special flags the function sleeps until some file event
|
||||
* fires, or when the next time event occurs (if any).
|
||||
*
|
||||
* If flags is 0, the function does nothing and returns.
|
||||
* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
|
||||
* if flags has AE_FILE_EVENTS set, file events are processed.
|
||||
* if flags has AE_TIME_EVENTS set, time events are processed.
|
||||
* if flags has AE_DONT_WAIT set, the function returns ASAP once all
|
||||
* the events that can be handled without a wait are processed.
|
||||
* if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
|
||||
* if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
|
||||
*
|
||||
* The function returns the number of events processed. */
|
||||
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
|
||||
{
|
||||
int processed = 0, numevents;
|
||||
|
||||
/* Nothing to do? return ASAP */
|
||||
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
|
||||
|
||||
/* Note that we want to call select() even if there are no
|
||||
* file events to process as long as we want to process time
|
||||
* events, in order to sleep until the next time event is ready
|
||||
* to fire. */
|
||||
if (eventLoop->maxfd != -1 ||
|
||||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
|
||||
int j;
|
||||
struct timeval tv, *tvp;
|
||||
int64_t usUntilTimer = -1;
|
||||
|
||||
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
|
||||
usUntilTimer = usUntilEarliestTimer(eventLoop);
|
||||
|
||||
if (usUntilTimer >= 0) {
|
||||
tv.tv_sec = usUntilTimer / 1000000;
|
||||
tv.tv_usec = usUntilTimer % 1000000;
|
||||
tvp = &tv;
|
||||
} else {
|
||||
/* If we have to check for events but need to return
|
||||
* ASAP because of AE_DONT_WAIT we need to set the timeout
|
||||
* to zero */
|
||||
if (flags & AE_DONT_WAIT) {
|
||||
tv.tv_sec = tv.tv_usec = 0;
|
||||
tvp = &tv;
|
||||
} else {
|
||||
/* Otherwise we can block */
|
||||
tvp = NULL; /* wait forever */
|
||||
return retval;
|
||||
}
|
||||
}
|
||||
|
||||
if (eventLoop->flags & AE_DONT_WAIT) {
|
||||
tv.tv_sec = tv.tv_usec = 0;
|
||||
tvp = &tv;
|
||||
}
|
||||
|
||||
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
|
||||
eventLoop->beforesleep(eventLoop);
|
||||
|
||||
/* Call the multiplexing API, will return only on timeout or when
|
||||
* some event fires. */
|
||||
numevents = aeApiPoll(eventLoop, tvp);
|
||||
|
||||
/* After sleep callback. */
|
||||
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
|
||||
eventLoop->aftersleep(eventLoop);
|
||||
|
||||
for (j = 0; j < numevents; j++) {
|
||||
int fd = eventLoop->fired[j].fd;
|
||||
aeFileEvent *fe = &eventLoop->events[fd];
|
||||
int mask = eventLoop->fired[j].mask;
|
||||
int fired = 0; /* Number of events fired for current fd. */
|
||||
|
||||
/* Normally we execute the readable event first, and the writable
|
||||
* event later. This is useful as sometimes we may be able
|
||||
* to serve the reply of a query immediately after processing the
|
||||
* query.
|
||||
*
|
||||
* However if AE_BARRIER is set in the mask, our application is
|
||||
* asking us to do the reverse: never fire the writable event
|
||||
* after the readable. In such a case, we invert the calls.
|
||||
* This is useful when, for instance, we want to do things
|
||||
* in the beforeSleep() hook, like fsyncing a file to disk,
|
||||
* before replying to a client. */
|
||||
int invert = fe->mask & AE_BARRIER;
|
||||
|
||||
/* Note the "fe->mask & mask & ..." code: maybe an already
|
||||
* processed event removed an element that fired and we still
|
||||
* didn't processed, so we check if the event is still valid.
|
||||
*
|
||||
* Fire the readable event if the call sequence is not
|
||||
* inverted. */
|
||||
if (!invert && fe->mask & mask & AE_READABLE) {
|
||||
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
|
||||
fired++;
|
||||
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
|
||||
}
|
||||
|
||||
/* Fire the writable event. */
|
||||
if (fe->mask & mask & AE_WRITABLE) {
|
||||
if (!fired || fe->wfileProc != fe->rfileProc) {
|
||||
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
|
||||
fired++;
|
||||
}
|
||||
}
|
||||
|
||||
/* If we have to invert the call, fire the readable event now
|
||||
* after the writable one. */
|
||||
if (invert) {
|
||||
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
|
||||
if ((fe->mask & mask & AE_READABLE) &&
|
||||
(!fired || fe->wfileProc != fe->rfileProc))
|
||||
{
|
||||
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
|
||||
fired++;
|
||||
}
|
||||
}
|
||||
|
||||
processed++;
|
||||
}
|
||||
}
|
||||
/* Check time events */
|
||||
if (flags & AE_TIME_EVENTS)
|
||||
processed += processTimeEvents(eventLoop);
|
||||
|
||||
return processed; /* return the number of processed file/time events */
|
||||
}
|
||||
|
||||
|
||||
|
||||
void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
((void)el); ((void)fd); ((void)mask);
|
||||
|
||||
redisAeEvents *e = (redisAeEvents*)privdata;
|
||||
redisAsyncHandleWrite(e->context);
|
||||
}
|
||||
|
||||
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
|
||||
aeApiState *state = eventLoop->apidata;
|
||||
struct epoll_event ee = {0}; /* avoid valgrind warning */
|
||||
/* If the fd was already monitored for some event, we need a MOD
|
||||
* operation. Otherwise we need an ADD operation. */
|
||||
int op = eventLoop->events[fd].mask == AE_NONE ?
|
||||
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
|
||||
|
||||
ee.events = 0;
|
||||
mask |= eventLoop->events[fd].mask; /* Merge old events */
|
||||
if (mask & AE_READABLE) ee.events |= EPOLLIN;
|
||||
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
|
||||
ee.data.fd = fd;
|
||||
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
||||
aeFileProc *proc, void *clientData)
|
||||
{
|
||||
if (fd >= eventLoop->setsize) {
|
||||
errno = ERANGE;
|
||||
return AE_ERR;
|
||||
}
|
||||
aeFileEvent *fe = &eventLoop->events[fd];
|
||||
|
||||
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
|
||||
return AE_ERR;
|
||||
fe->mask |= mask;
|
||||
if (mask & AE_READABLE) fe->rfileProc = proc;
|
||||
if (mask & AE_WRITABLE) fe->wfileProc = proc;
|
||||
fe->clientData = clientData;
|
||||
if (fd > eventLoop->maxfd)
|
||||
eventLoop->maxfd = fd;
|
||||
return AE_OK;
|
||||
}
|
||||
|
||||
void redisAeAddRead(void *privdata) {
|
||||
redisAeEvents *e = (redisAeEvents*)privdata;
|
||||
aeEventLoop *loop = e->loop;
|
||||
if (!e->reading) {
|
||||
e->reading = 1;
|
||||
aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
|
||||
}
|
||||
}
|
||||
|
||||
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
|
||||
aeApiState *state = eventLoop->apidata;
|
||||
struct epoll_event ee = {0}; /* avoid valgrind warning */
|
||||
int mask = eventLoop->events[fd].mask & (~delmask);
|
||||
|
||||
ee.events = 0;
|
||||
if (mask & AE_READABLE) ee.events |= EPOLLIN;
|
||||
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
|
||||
ee.data.fd = fd;
|
||||
if (mask != AE_NONE) {
|
||||
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
|
||||
} else {
|
||||
/* Note, Kernel < 2.6.9 requires a non null event pointer even for
|
||||
* EPOLL_CTL_DEL. */
|
||||
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
|
||||
}
|
||||
}
|
||||
|
||||
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
|
||||
{
|
||||
if (fd >= eventLoop->setsize) return;
|
||||
aeFileEvent *fe = &eventLoop->events[fd];
|
||||
if (fe->mask == AE_NONE) return;
|
||||
|
||||
/* We want to always remove AE_BARRIER if set when AE_WRITABLE
|
||||
* is removed. */
|
||||
if (mask & AE_WRITABLE) mask |= AE_BARRIER;
|
||||
|
||||
aeApiDelEvent(eventLoop, fd, mask);
|
||||
fe->mask = fe->mask & (~mask);
|
||||
if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
|
||||
/* Update the max fd */
|
||||
int j;
|
||||
|
||||
for (j = eventLoop->maxfd-1; j >= 0; j--)
|
||||
if (eventLoop->events[j].mask != AE_NONE) break;
|
||||
eventLoop->maxfd = j;
|
||||
}
|
||||
}
|
||||
|
||||
void redisAeDelRead(void *privdata) {
|
||||
redisAeEvents *e = (redisAeEvents*)privdata;
|
||||
aeEventLoop *loop = e->loop;
|
||||
if (e->reading) {
|
||||
e->reading = 0;
|
||||
aeDeleteFileEvent(loop,e->fd,AE_READABLE);
|
||||
}
|
||||
}
|
||||
|
||||
void redisAeAddWrite(void *privdata) {
|
||||
redisAeEvents *e = (redisAeEvents*)privdata;
|
||||
aeEventLoop *loop = e->loop;
|
||||
if (!e->writing) {
|
||||
e->writing = 1;
|
||||
aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
|
||||
}
|
||||
}
|
||||
|
||||
void redisAeDelWrite(void *privdata) {
|
||||
redisAeEvents *e = (redisAeEvents*)privdata;
|
||||
aeEventLoop *loop = e->loop;
|
||||
if (e->writing) {
|
||||
e->writing = 0;
|
||||
aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
|
||||
}
|
||||
}
|
||||
|
||||
void redisAeCleanup(void *privdata) {
|
||||
redisAeEvents *e = (redisAeEvents*)privdata;
|
||||
redisAeDelRead(privdata);
|
||||
redisAeDelWrite(privdata);
|
||||
hi_free(e);
|
||||
}
|
||||
|
||||
/* Enable the FD_CLOEXEC on the given fd to avoid fd leaks.
|
||||
* This function should be invoked for fd's on specific places
|
||||
* where fork + execve system calls are called. */
|
||||
int anetCloexec(int fd) {
|
||||
int r;
|
||||
int flags;
|
||||
|
||||
do {
|
||||
r = fcntl(fd, F_GETFD);
|
||||
} while (r == -1 && errno == EINTR);
|
||||
|
||||
if (r == -1 || (r & FD_CLOEXEC))
|
||||
return r;
|
||||
|
||||
flags = r | FD_CLOEXEC;
|
||||
|
||||
do {
|
||||
r = fcntl(fd, F_SETFD, flags);
|
||||
} while (r == -1 && errno == EINTR);
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
int aeApiCreate(aeEventLoop *eventLoop) {
|
||||
aeApiState *state = hi_malloc(sizeof(aeApiState));
|
||||
|
||||
if (!state) return -1;
|
||||
state->events = hi_malloc(sizeof(struct epoll_event)*eventLoop->setsize);
|
||||
if (!state->events) {
|
||||
hi_free(state);
|
||||
return -1;
|
||||
}
|
||||
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
|
||||
if (state->epfd == -1) {
|
||||
hi_free(state->events);
|
||||
hi_free(state);
|
||||
return -1;
|
||||
}
|
||||
anetCloexec(state->epfd);
|
||||
eventLoop->apidata = state;
|
||||
return 0;
|
||||
}
|
||||
|
||||
aeEventLoop *aeCreateEventLoop(int setsize) {
|
||||
aeEventLoop *eventLoop;
|
||||
int i;
|
||||
|
||||
if ((eventLoop = hi_malloc(sizeof(*eventLoop))) == NULL) goto err;
|
||||
eventLoop->events = hi_malloc(sizeof(aeFileEvent)*setsize);
|
||||
eventLoop->fired = hi_malloc(sizeof(aeFiredEvent)*setsize);
|
||||
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
|
||||
eventLoop->setsize = setsize;
|
||||
eventLoop->timeEventHead = NULL;
|
||||
eventLoop->timeEventNextId = 0;
|
||||
eventLoop->stop = 0;
|
||||
eventLoop->maxfd = -1;
|
||||
eventLoop->beforesleep = NULL;
|
||||
eventLoop->aftersleep = NULL;
|
||||
eventLoop->flags = 0;
|
||||
if (aeApiCreate(eventLoop) == -1) goto err;
|
||||
/* Events with mask == AE_NONE are not set. So let's initialize the
|
||||
* vector with it. */
|
||||
for (i = 0; i < setsize; i++)
|
||||
eventLoop->events[i].mask = AE_NONE;
|
||||
return eventLoop;
|
||||
|
||||
err:
|
||||
if (eventLoop) {
|
||||
hi_free(eventLoop->events);
|
||||
hi_free(eventLoop->fired);
|
||||
hi_free(eventLoop);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
|
||||
redisContext *c = &(ac->c);
|
||||
redisAeEvents *e;
|
||||
|
||||
/* Nothing should be attached when something is already attached */
|
||||
if (ac->ev.data != NULL)
|
||||
return REDIS_ERR;
|
||||
|
||||
/* Create container for context and r/w events */
|
||||
e = (redisAeEvents *) hi_malloc(sizeof(*e));
|
||||
if (e == NULL)
|
||||
return REDIS_ERR;
|
||||
|
||||
e->context = ac;
|
||||
e->loop = loop;
|
||||
e->fd = c->fd;
|
||||
e->reading = e->writing = 0;
|
||||
|
||||
/* Register functions to start/stop listening for events */
|
||||
ac->ev.addRead = redisAeAddRead;
|
||||
ac->ev.delRead = redisAeDelRead;
|
||||
ac->ev.addWrite = redisAeAddWrite;
|
||||
ac->ev.delWrite = redisAeDelWrite;
|
||||
ac->ev.cleanup = redisAeCleanup;
|
||||
ac->ev.data = e;
|
||||
|
||||
return REDIS_OK;
|
||||
}
|
||||
|
||||
|
||||
|
122
src/ae.h
122
src/ae.h
@ -1,133 +1,13 @@
|
||||
#ifndef __HIREDIS_AE_H__
|
||||
#define __HIREDIS_AE_H__
|
||||
|
||||
|
||||
#include "hiredis.h"
|
||||
#include "async.h"
|
||||
|
||||
|
||||
typedef uint64_t monotime;
|
||||
|
||||
#define AE_OK 0
|
||||
#define AE_ERR -1
|
||||
#define AE_NONE 0 /* No events registered. */
|
||||
#define AE_READABLE 1 /* Fire when descriptor is readable. */
|
||||
#define AE_WRITABLE 2 /* Fire when descriptor is writable. */
|
||||
#define AE_BARRIER 4 /* With WRITABLE, never fire the event if the
|
||||
READABLE event already fired in the same event
|
||||
loop iteration. Useful when you want to persist
|
||||
things to disk before sending replies, and want
|
||||
to do that in a group fashion. */
|
||||
|
||||
/* Types and data structures */
|
||||
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
|
||||
|
||||
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
|
||||
|
||||
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
|
||||
|
||||
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
|
||||
|
||||
/* File event structure */
|
||||
typedef struct aeFileEvent {
|
||||
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
|
||||
aeFileProc *rfileProc;
|
||||
aeFileProc *wfileProc;
|
||||
void *clientData;
|
||||
} aeFileEvent;
|
||||
|
||||
/* Time event structure */
|
||||
typedef struct aeTimeEvent {
|
||||
long long id; /* time event identifier. */
|
||||
monotime when;
|
||||
aeTimeProc *timeProc;
|
||||
aeEventFinalizerProc *finalizerProc;
|
||||
void *clientData;
|
||||
struct aeTimeEvent *prev;
|
||||
struct aeTimeEvent *next;
|
||||
int refcount; /* refcount to prevent timer events from being
|
||||
* freed in recursive time event calls. */
|
||||
} aeTimeEvent;
|
||||
|
||||
/* A fired event */
|
||||
typedef struct aeFiredEvent {
|
||||
int fd;
|
||||
int mask;
|
||||
} aeFiredEvent;
|
||||
|
||||
typedef struct aeEventLoop {
|
||||
int maxfd; /* highest file descriptor currently registered */
|
||||
int setsize; /* max number of file descriptors tracked */
|
||||
long long timeEventNextId;
|
||||
aeFileEvent *events; /* Registered events */
|
||||
aeFiredEvent *fired; /* Fired events */
|
||||
aeTimeEvent *timeEventHead;
|
||||
int stop;
|
||||
void *apidata; /* This is used for polling API specific data */
|
||||
aeBeforeSleepProc *beforesleep;
|
||||
aeBeforeSleepProc *aftersleep;
|
||||
int flags;
|
||||
} aeEventLoop;
|
||||
|
||||
typedef struct redisAeEvents {
|
||||
redisAsyncContext *context;
|
||||
aeEventLoop *loop;
|
||||
int fd;
|
||||
int reading, writing;
|
||||
} redisAeEvents;
|
||||
|
||||
typedef struct aeApiState {
|
||||
int epfd;
|
||||
struct epoll_event *events;
|
||||
} aeApiState;
|
||||
|
||||
#define AE_FILE_EVENTS (1<<0)
|
||||
#define AE_TIME_EVENTS (1<<1)
|
||||
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
|
||||
#define AE_DONT_WAIT (1<<2)
|
||||
#define AE_CALL_BEFORE_SLEEP (1<<3)
|
||||
#define AE_CALL_AFTER_SLEEP (1<<4)
|
||||
|
||||
#define AE_NOMORE -1
|
||||
#define AE_DELETED_EVENT_ID -1
|
||||
|
||||
extern monotime (*getMonotonicUs)(void);
|
||||
|
||||
void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
|
||||
void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
|
||||
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
|
||||
|
||||
void aeStop(aeEventLoop *eventLoop);
|
||||
|
||||
void aeMain(aeEventLoop *eventLoop);
|
||||
|
||||
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
|
||||
|
||||
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
||||
aeFileProc *proc, void *clientData);
|
||||
|
||||
void redisAeAddRead(void *privdata);
|
||||
|
||||
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
|
||||
|
||||
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
|
||||
|
||||
void redisAeDelRead(void *privdata);
|
||||
|
||||
void redisAeAddWrite(void *privdata);
|
||||
|
||||
void redisAeDelWrite(void *privdata);
|
||||
|
||||
void redisAeCleanup(void *privdata);
|
||||
|
||||
int anetCloexec(int fd);
|
||||
|
||||
static int aeApiCreate(aeEventLoop *eventLoop);
|
||||
|
||||
aeEventLoop *aeCreateEventLoop(int setsize);
|
||||
|
||||
int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac);
|
||||
int aeWait(int fd, int mask, long long milliseconds);
|
||||
|
||||
#endif
|
||||
|
21
src/alloc.c
21
src/alloc.c
@ -42,7 +42,8 @@ hiredisAllocFuncs hiredisAllocFns = {
|
||||
};
|
||||
|
||||
/* Override hiredis' allocators with ones supplied by the user */
|
||||
hiredisAllocFuncs hiredisSetAllocators(hiredisAllocFuncs *override) {
|
||||
hiredisAllocFuncs hiredisSetAllocators(hiredisAllocFuncs *override)
|
||||
{
|
||||
hiredisAllocFuncs orig = hiredisAllocFns;
|
||||
|
||||
hiredisAllocFns = *override;
|
||||
@ -51,7 +52,8 @@ hiredisAllocFuncs hiredisSetAllocators(hiredisAllocFuncs *override) {
|
||||
}
|
||||
|
||||
/* Reset allocators to use libc defaults */
|
||||
void hiredisResetAllocators(void) {
|
||||
void hiredisResetAllocators(void)
|
||||
{
|
||||
hiredisAllocFns = (hiredisAllocFuncs){
|
||||
.mallocFn = malloc,
|
||||
.callocFn = calloc,
|
||||
@ -63,11 +65,13 @@ void hiredisResetAllocators(void) {
|
||||
|
||||
#ifdef _WIN32
|
||||
|
||||
void *hi_malloc(size_t size) {
|
||||
void *hi_malloc(size_t size)
|
||||
{
|
||||
return hiredisAllocFns.mallocFn(size);
|
||||
}
|
||||
|
||||
void *hi_calloc(size_t nmemb, size_t size) {
|
||||
void *hi_calloc(size_t nmemb, size_t size)
|
||||
{
|
||||
/* Overflow check as the user can specify any arbitrary allocator */
|
||||
if (SIZE_MAX / size < nmemb)
|
||||
return NULL;
|
||||
@ -75,15 +79,18 @@ void *hi_calloc(size_t nmemb, size_t size) {
|
||||
return hiredisAllocFns.callocFn(nmemb, size);
|
||||
}
|
||||
|
||||
void *hi_realloc(void *ptr, size_t size) {
|
||||
void *hi_realloc(void *ptr, size_t size)
|
||||
{
|
||||
return hiredisAllocFns.reallocFn(ptr, size);
|
||||
}
|
||||
|
||||
char *hi_strdup(const char *str) {
|
||||
char *hi_strdup(const char *str)
|
||||
{
|
||||
return hiredisAllocFns.strdupFn(str);
|
||||
}
|
||||
|
||||
void hi_free(void *ptr) {
|
||||
void hi_free(void *ptr)
|
||||
{
|
||||
hiredisAllocFns.freeFn(ptr);
|
||||
}
|
||||
|
||||
|
@ -812,7 +812,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
|
||||
if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
|
||||
|
||||
/* (P)UNSUBSCRIBE does not have its own response: every channel or
|
||||
* pattern that is unsubscribed will receive a message. This means we
|
||||
* pattern that is unsubscribed will redisReceive a message. This means we
|
||||
* should not append a callback function for this command. */
|
||||
} else if (strncasecmp(cstr,"monitor\r\n",9) == 0) {
|
||||
/* Set monitor flag and push callback */
|
||||
|
@ -33,9 +33,7 @@
|
||||
* POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
#include "fmacros.h"
|
||||
#include "alloc.h"
|
||||
#include <stdlib.h>
|
||||
#include <assert.h>
|
||||
#include <limits.h>
|
||||
#include "dict.h"
|
||||
|
@ -1139,6 +1139,16 @@ int redisAppendCommand(redisContext *c, const char *format, ...) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
int redisSendCommand(redisContext *c, const char *format, ...) {
|
||||
va_list ap;
|
||||
int ret;
|
||||
|
||||
va_start(ap,format);
|
||||
ret = redisvAppendCommand(c,format,ap);
|
||||
va_end(ap);
|
||||
return redisFlush(c);
|
||||
}
|
||||
|
||||
int redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
|
||||
hisds cmd;
|
||||
long long len;
|
||||
|
@ -331,6 +331,7 @@ int redisAppendFormattedCommand(redisContext *c, const char *cmd, size_t len);
|
||||
* to get a pipeline of commands. */
|
||||
int redisvAppendCommand(redisContext *c, const char *format, va_list ap);
|
||||
int redisAppendCommand(redisContext *c, const char *format, ...);
|
||||
int redisSendCommand(redisContext *c, const char *format, ...);
|
||||
int redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen);
|
||||
|
||||
/* Issue a command to Redis. In a blocking context, it is identical to calling
|
||||
|
111
src/log.c
Normal file
111
src/log.c
Normal file
@ -0,0 +1,111 @@
|
||||
#include <bits/types/FILE.h>
|
||||
#include <stdio.h>
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/time.h>
|
||||
#include <time.h>
|
||||
#include "log.h"
|
||||
#include "hiredis.h"
|
||||
|
||||
|
||||
|
||||
static int is_leap_year(time_t year) {
|
||||
if (year % 4) return 0; /* A year not divisible by 4 is not leap. */
|
||||
else if (year % 100) return 1; /* If div by 4 and not 100 is surely leap. */
|
||||
else if (year % 400) return 0; /* If div by 100 *and* not by 400 is not leap. */
|
||||
else return 1; /* If div by 100 and 400 is leap. */
|
||||
}
|
||||
|
||||
void createLogObj(char *logfile) {
|
||||
log = hi_malloc(sizeof(logObj));
|
||||
log->logfile = logfile;
|
||||
log->loglevel = LL_NOTICE;
|
||||
}
|
||||
|
||||
|
||||
void _serverLog(int level, const char *fmt, ...) {
|
||||
va_list ap;
|
||||
char msg[LOG_MAX_LEN];
|
||||
|
||||
va_start(ap, fmt);
|
||||
vsnprintf(msg, sizeof(msg), fmt, ap);
|
||||
va_end(ap);
|
||||
}
|
||||
void serverLogRaw(int level, const char *msg) {
|
||||
const char *c = ".-*#";
|
||||
FILE *fp;
|
||||
char buf[64];
|
||||
int log_to_stdout = log->logfile[0] == '\0';
|
||||
|
||||
if (level < log->loglevel) {
|
||||
return;
|
||||
}
|
||||
|
||||
fp = log_to_stdout ? stdout : fopen(log->logfile, "a");
|
||||
if (!fp) return;
|
||||
|
||||
int off;
|
||||
struct timeval tv;
|
||||
int role_char;
|
||||
pid_t pid = getpid();
|
||||
|
||||
gettimeofday(&tv, NULL);
|
||||
struct tm tm;
|
||||
nolocks_localtime(&tm, tv.tv_sec, timezone, 1);
|
||||
off = strftime(buf, sizeof(buf), "%d %b %Y %H:%M:%S.", &tm);
|
||||
snprintf(buf + off, sizeof(buf) - off, "%03d", (int) tv.tv_usec / 1000);
|
||||
role_char = 'm';
|
||||
fprintf(fp, "%d:%c %s %c %s\n",
|
||||
(int) pid, role_char, buf, c[level], msg);
|
||||
|
||||
fflush(fp);
|
||||
|
||||
if (!log_to_stdout) fclose(fp);
|
||||
}
|
||||
|
||||
void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst) {
|
||||
const time_t secs_min = 60;
|
||||
const time_t secs_hour = 3600;
|
||||
const time_t secs_day = 3600*24;
|
||||
|
||||
t -= tz; /* Adjust for timezone. */
|
||||
t += 3600*dst; /* Adjust for daylight time. */
|
||||
time_t days = t / secs_day; /* Days passed since epoch. */
|
||||
time_t seconds = t % secs_day; /* Remaining seconds. */
|
||||
|
||||
tmp->tm_isdst = dst;
|
||||
tmp->tm_hour = seconds / secs_hour;
|
||||
tmp->tm_min = (seconds % secs_hour) / secs_min;
|
||||
tmp->tm_sec = (seconds % secs_hour) % secs_min;
|
||||
|
||||
/* 1/1/1970 was a Thursday, that is, day 4 from the POV of the tm structure
|
||||
* where sunday = 0, so to calculate the day of the week we have to add 4
|
||||
* and take the modulo by 7. */
|
||||
tmp->tm_wday = (days+4)%7;
|
||||
|
||||
/* Calculate the current year. */
|
||||
tmp->tm_year = 1970;
|
||||
while(1) {
|
||||
/* Leap years have one day more. */
|
||||
time_t days_this_year = 365 + is_leap_year(tmp->tm_year);
|
||||
if (days_this_year > days) break;
|
||||
days -= days_this_year;
|
||||
tmp->tm_year++;
|
||||
}
|
||||
tmp->tm_yday = days; /* Number of day of the current year. */
|
||||
|
||||
/* We need to calculate in which month and day of the month we are. To do
|
||||
* so we need to skip days according to how many days there are in each
|
||||
* month, and adjust for the leap year that has one more day in February. */
|
||||
int mdays[12] = {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31};
|
||||
mdays[1] += is_leap_year(tmp->tm_year);
|
||||
|
||||
tmp->tm_mon = 0;
|
||||
while(days >= mdays[tmp->tm_mon]) {
|
||||
days -= mdays[tmp->tm_mon];
|
||||
tmp->tm_mon++;
|
||||
}
|
||||
|
||||
tmp->tm_mday = days+1; /* Add 1 since our 'days' is zero-based. */
|
||||
tmp->tm_year -= 1900; /* Surprisingly tm_year is year-1900. */
|
||||
}
|
34
src/log.h
Normal file
34
src/log.h
Normal file
@ -0,0 +1,34 @@
|
||||
|
||||
#ifndef REDIS_MIGRATE_LOG_H
|
||||
#define REDIS_MIGRATE_LOG_H
|
||||
|
||||
#define NOTICE "notice"
|
||||
#define WARNING "warning"
|
||||
#define VERBOSE "VERBOSE"
|
||||
#define DEBUG "debug"
|
||||
|
||||
#define LL_DEBUG 0
|
||||
#define LL_VERBOSE 1
|
||||
#define LL_NOTICE 2
|
||||
#define LL_WARNING 3
|
||||
#define LOG_MAX_LEN 1024
|
||||
|
||||
typedef struct logObj {
|
||||
char *logfile;
|
||||
int loglevel;
|
||||
} logObj;
|
||||
|
||||
static logObj *log;
|
||||
|
||||
void createLogObj(char *logfile);
|
||||
|
||||
#define serverLog(level, ...) _serverLog(level, __VA_ARGS__);
|
||||
|
||||
void _serverLog(int level, const char *fmt, ...)
|
||||
__attribute__((format(printf, 2, 3)));
|
||||
|
||||
void serverLogRaw(int level, const char *msg);
|
||||
|
||||
void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst);
|
||||
|
||||
#endif //REDIS_MIGRATE_LOG_H
|
175
src/rdbLoad.c
Normal file
175
src/rdbLoad.c
Normal file
@ -0,0 +1,175 @@
|
||||
#include "rdbLoad.h"
|
||||
|
||||
#include <errno.h>
|
||||
#include <unistd.h>
|
||||
|
||||
int rdbLoadRioWithLoading(migrateObj *mobj) {
|
||||
char buf[1024];
|
||||
int error;
|
||||
if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1) {
|
||||
serverLog(LL_WARNING, "read version failed:%s,port:%d ", mobj->host,
|
||||
mobj->port);
|
||||
return C_ERR;
|
||||
}
|
||||
buf[9] = '\0';
|
||||
if (memcmp(buf, "REDIS", 5) != 0) {
|
||||
serverLog(LL_WARNING, "Wrong signature trying to load DB from file");
|
||||
errno = EINVAL;
|
||||
return C_ERR;
|
||||
}
|
||||
int type, rdbver;
|
||||
uint64_t dbid = 0;
|
||||
rdbver = atoi(buf + 5);
|
||||
if (rdbver < 1 || rdbver > RDB_VERSION) {
|
||||
serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver);
|
||||
errno = EINVAL;
|
||||
return C_ERR;
|
||||
}
|
||||
serverLog(LL_NOTICE, "buf=%s", buf);
|
||||
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
|
||||
while (1) {
|
||||
sds key;
|
||||
robj *val;
|
||||
if ((type = rm_rdbLoadType(mobj)) == -1) {
|
||||
serverLog(LL_WARNING, "read type failed");
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
if (type == RDB_OPCODE_EXPIRETIME) {
|
||||
expiretime = rm_rdbLoadTime(mobj);
|
||||
if (expiretime == -1) {
|
||||
return C_ERR;
|
||||
}
|
||||
expiretime *= 1000;
|
||||
continue;
|
||||
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
|
||||
expiretime = rm_rdbLoadMillisecondTime(mobj, rdbver);
|
||||
continue;
|
||||
} else if (type == RDB_OPCODE_FREQ) {
|
||||
uint8_t byte;
|
||||
if (read(mobj->source_cc->fd, &byte, 1) == 0)
|
||||
return C_ERR;
|
||||
lfu_freq = byte;
|
||||
continue;
|
||||
} else if (type == RDB_OPCODE_IDLE) {
|
||||
uint64_t qword;
|
||||
if ((qword = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR)
|
||||
return C_ERR;
|
||||
lru_idle = qword;
|
||||
continue;
|
||||
} else if (type == RDB_OPCODE_EOF) {
|
||||
break;
|
||||
} else if (type == RDB_OPCODE_SELECTDB) {
|
||||
if ((dbid = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
|
||||
continue;
|
||||
} else if (type == RDB_OPCODE_RESIZEDB) {
|
||||
uint64_t db_size, expires_size;
|
||||
if ((db_size = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
|
||||
if ((expires_size = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
|
||||
|
||||
continue;
|
||||
} else if (type == RDB_OPCODE_AUX) {
|
||||
robj *auxkey, *auxval;
|
||||
|
||||
} else if (type == RDB_OPCODE_FUNCTION || type == RDB_OPCODE_FUNCTION2) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void *rm_rdbGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) {
|
||||
int encode = flags & RDB_LOAD_ENC;
|
||||
int plain = flags & RDB_LOAD_PLAIN;
|
||||
int sds = flags & RDB_LOAD_SDS;
|
||||
int isencoded;
|
||||
unsigned long long len;
|
||||
|
||||
}
|
||||
|
||||
uint64_t rm_rdbLoadLen(migrateObj *mobj, int *isencoded) {
|
||||
uint64_t len;
|
||||
|
||||
if (rdbLoadLenByRef(mobj, isencoded, &len) == -1)
|
||||
return RDB_LENERR;
|
||||
return len;
|
||||
}
|
||||
|
||||
int rm_rdbLoadType(migrateObj *mobj) {
|
||||
unsigned char type;
|
||||
if (read(mobj->source_cc->fd, &type, 1) == 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return type;
|
||||
}
|
||||
|
||||
int rm_rdbLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) {
|
||||
unsigned char buf[2];
|
||||
int type;
|
||||
if (isencoded)
|
||||
*isencoded = 0;
|
||||
if (read(mobj->source_cc->fd, buf, 1) == 0) {
|
||||
return -1;
|
||||
}
|
||||
type = (buf[0] & 0xC0) >> 6;
|
||||
if (type == RDB_ENCVAL) {
|
||||
if (isencoded)
|
||||
*isencoded = 1;
|
||||
*lenptr = buf[0] & 0x3F;
|
||||
} else if (type == RDB_6BITLEN) {
|
||||
/* Read a 6 bit len. */
|
||||
*lenptr = buf[0] & 0x3F;
|
||||
} else if (type == RDB_14BITLEN) {
|
||||
if (read(mobj->source_cc->fd, buf + 1, 1) == 0)
|
||||
return -1;
|
||||
*lenptr = ((buf[0] & 0x3F) << 8) | buf[1];
|
||||
} else if (buf[0] == RDB_32BITLEN) {
|
||||
uint32_t len;
|
||||
if (read(mobj->source_cc->fd, &len, 4) == 0)
|
||||
return -1;
|
||||
*lenptr = ntohl(len);
|
||||
} else if (buf[0] == RDB_64BITLEN) {
|
||||
uint64_t len;
|
||||
if (read(mobj->source_cc->fd, &len, 8) == 0)
|
||||
return -1;
|
||||
*lenptr = ntohu64(len);
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
time_t rm_rdbLoadTime(migrateObj *mobj) {
|
||||
int32_t t32;
|
||||
if (read(mobj->source_cc->fd, &t32, 4) == 0) {
|
||||
return -1;
|
||||
}
|
||||
t32 = (char *)t32 + 4;
|
||||
return (time_t)t32;
|
||||
}
|
||||
|
||||
long long rm_rdbLoadMillisecondTime(migrateObj *mobj, int rdbver) {
|
||||
int64_t t64;
|
||||
if (read(mobj->source_cc->fd, &t64, 8) == 0) {
|
||||
return LLONG_MAX;
|
||||
}
|
||||
if (rdbver >= 9)
|
||||
memrev64(&t64);
|
||||
return (long long)t64;
|
||||
}
|
||||
|
||||
void rm_memrev64(void *p) {
|
||||
unsigned char *x = p, t;
|
||||
|
||||
t = x[0];
|
||||
x[0] = x[7];
|
||||
x[7] = t;
|
||||
t = x[1];
|
||||
x[1] = x[6];
|
||||
x[6] = t;
|
||||
t = x[2];
|
||||
x[2] = x[5];
|
||||
x[5] = t;
|
||||
t = x[3];
|
||||
x[3] = x[4];
|
||||
x[4] = t;
|
||||
}
|
48
src/rdbLoad.h
Normal file
48
src/rdbLoad.h
Normal file
@ -0,0 +1,48 @@
|
||||
#ifndef RDB_LOAD_REDIS_MIGRATE_H
|
||||
#define RDB_LOAD_REDIS_MIGRATE_H
|
||||
#include "redis-migrate.h"
|
||||
|
||||
#define RDB_6BITLEN 0
|
||||
#define RDB_14BITLEN 1
|
||||
#define RDB_32BITLEN 0x80
|
||||
#define RDB_64BITLEN 0x81
|
||||
#define RDB_ENCVAL 3
|
||||
#define RDB_LENERR UINT64_MAX
|
||||
|
||||
#define RDB_VERSION 10
|
||||
#define RDB_OPCODE_FUNCTION2 245 /* function library data */
|
||||
#define RDB_OPCODE_FUNCTION 246 /* old function library data for 7.0 rc1 and rc2 */
|
||||
#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */
|
||||
#define RDB_OPCODE_IDLE 248 /* LRU idle time. */
|
||||
#define RDB_OPCODE_FREQ 249 /* LFU frequency. */
|
||||
#define RDB_OPCODE_AUX 250 /* RDB aux field. */
|
||||
#define RDB_OPCODE_RESIZEDB 251 /* Hash table resize hint. */
|
||||
#define RDB_OPCODE_EXPIRETIME_MS 252 /* Expire time in milliseconds. */
|
||||
#define RDB_OPCODE_EXPIRETIME 253 /* Old expire time in seconds. */
|
||||
#define RDB_OPCODE_SELECTDB 254 /* DB number of the following keys. */
|
||||
#define RDB_OPCODE_EOF 255 /* End of the RDB file. */
|
||||
|
||||
#define RDB_LOAD_NONE 0
|
||||
#define RDB_LOAD_ENC (1 << 0)
|
||||
#define RDB_LOAD_PLAIN (1 << 1)
|
||||
#define RDB_LOAD_SDS (1 << 2)
|
||||
|
||||
#define LLONG_MAX __LONG_LONG_MAX__
|
||||
|
||||
int rm_rdbLoadRioWithLoading(migrateObj *mobj);
|
||||
|
||||
int rm_rdbLoadType(migrateObj *mobj);
|
||||
|
||||
time_t rm_rdbLoadTime(migrateObj *mobj);
|
||||
|
||||
long long rm_rdbLoadMillisecondTime(migrateObj *mobj, int rdbver);
|
||||
|
||||
int rm_rdbLoadLenByRef(migrateObj *mobi, int *isencoded, uint64_t *lenptr);
|
||||
|
||||
void rm_memrev64(void *p);
|
||||
|
||||
uint64_t rm_rdbLoadLen(migrateObj *mobj, int *isencoded);
|
||||
|
||||
void *rm_rdbGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr);
|
||||
|
||||
#endif
|
@ -2,9 +2,9 @@
|
||||
#include <sys/time.h>
|
||||
#include <unistd.h>
|
||||
#include <string.h>
|
||||
#include <pthread.h>
|
||||
#include "redis-migrate.h"
|
||||
#include "hiredis.h"
|
||||
#include "rdbLoad.h"
|
||||
|
||||
static migrateObj *mobj;
|
||||
|
||||
@ -17,6 +17,8 @@ migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_sl
|
||||
m->end_slot = end_slot;
|
||||
m->repl_stat = REPL_STATE_NONE;
|
||||
m->isCache = 0;
|
||||
m->timeout = 10 * 1000;
|
||||
m->repl_transfer_size = -1;
|
||||
return m;
|
||||
}
|
||||
|
||||
@ -26,102 +28,299 @@ void freeMigrateObj(migrateObj *m) {
|
||||
mobj = NULL;
|
||||
}
|
||||
|
||||
int sendSyncCommand(RedisModuleCtx *ctx) {
|
||||
long long ustime(void) {
|
||||
struct timeval tv;
|
||||
long long ust;
|
||||
|
||||
gettimeofday(&tv, NULL);
|
||||
ust = ((long long)tv.tv_sec) * 1000000;
|
||||
ust += tv.tv_usec;
|
||||
return ust;
|
||||
}
|
||||
|
||||
/* Return the UNIX time in milliseconds */
|
||||
mstime_t mstime(void) {
|
||||
return ustime() / 1000;
|
||||
}
|
||||
|
||||
int sendSyncCommand() {
|
||||
if (!mobj->isCache) {
|
||||
mobj->isCache = 1;
|
||||
mobj->psync_replid = "?";
|
||||
memcpy(mobj->psync_offset, "-1", 3);
|
||||
}
|
||||
if (redisAppendCommand(mobj->source_cc, "PSYNC %s %s", mobj->psync_replid, mobj->psync_offset) != REDIS_OK) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING,
|
||||
"append PSYNC %s %s failed ip:%s,port:%d, ",
|
||||
serverLog(LL_WARNING, "append PSYNC %s %s failed ip:%s,port:%d, ",
|
||||
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
|
||||
return 0;
|
||||
}
|
||||
if (redisFlush(mobj->source_cc) != REDIS_OK) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING,
|
||||
"send PSYNC %s %s failed ip:%s,port:%d, ",
|
||||
serverLog(LL_WARNING, "send PSYNC %s %s failed ip:%s,port:%d, ",
|
||||
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
|
||||
return 0;
|
||||
}
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "PSYNC %s %s ",
|
||||
mobj->psync_replid, mobj->psync_offset);
|
||||
serverLog(LL_NOTICE, "PSYNC %s %s ", mobj->psync_replid, mobj->psync_offset);
|
||||
return 1;
|
||||
}
|
||||
|
||||
void *syncWithRedis(void *arg) {
|
||||
RedisModuleCtx *ctx = arg;
|
||||
redisReply *reply;
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin sync data");
|
||||
reply = redisCommand(mobj->source_cc, "PING");
|
||||
if (reply->type == REDIS_REPLY_ERROR) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "send PING failed ip:%s,port:%d, error:%s",
|
||||
mobj->host, mobj->port, reply->str);
|
||||
sds redisReceive() {
|
||||
char buf[256];
|
||||
if (syncReadLine(mobj->source_cc->fd, buf, sizeof(buf), mobj->timeout) == -1) {
|
||||
serverLog(LL_WARNING, "redisReceive failed ip:%s,port:%d ", mobj->host, mobj->port);
|
||||
return NULL;
|
||||
}
|
||||
return hi_sdsnew(buf);
|
||||
}
|
||||
|
||||
int receiveDataFromRedis() {
|
||||
sds reply = redisReceive();
|
||||
if (reply == NULL) {
|
||||
serverLog(LL_WARNING, "Master did not reply to PSYNC");
|
||||
return PSYNC_TRY_LATER;
|
||||
}
|
||||
if (sdslen(reply) == 0) {
|
||||
sdsfree(reply);
|
||||
return PSYNC_WAIT_REPLY;
|
||||
}
|
||||
serverLog(LL_NOTICE, "reply=%s", reply);
|
||||
if (!strncmp(reply, "+FULLRESYNC", 11)) {
|
||||
char *replid = NULL, *offset = NULL;
|
||||
|
||||
/* FULL RESYNC, parse the reply in order to extract the replid
|
||||
* and the replication offset. */
|
||||
replid = strchr(reply, ' ');
|
||||
if (replid) {
|
||||
replid++;
|
||||
offset = strchr(replid, ' ');
|
||||
if (offset)
|
||||
offset++;
|
||||
}
|
||||
if (!replid || !offset || (offset - replid - 1) != CONFIG_RUN_ID_SIZE) {
|
||||
serverLog(LL_WARNING, "Master replied with wrong +FULLRESYNC syntax.");
|
||||
/* This is an unexpected condition, actually the +FULLRESYNC
|
||||
* reply means that the master supports PSYNC, but the reply
|
||||
* format seems wrong. To stay safe we blank the master
|
||||
* replid to make sure next PSYNCs will fail. */
|
||||
memset(mobj->master_replid, 0, CONFIG_RUN_ID_SIZE + 1);
|
||||
} else {
|
||||
memcpy(mobj->master_replid, replid, offset - replid - 1);
|
||||
mobj->master_replid[CONFIG_RUN_ID_SIZE] = '\0';
|
||||
mobj->master_initial_offset = strtoll(offset, NULL, 10);
|
||||
serverLog(LL_NOTICE, "Full sync from master: %s:%lld",
|
||||
mobj->master_replid, mobj->master_initial_offset);
|
||||
}
|
||||
sdsfree(reply);
|
||||
return PSYNC_FULLRESYNC;
|
||||
}
|
||||
if (!strncmp(reply, "+CONTINUE", 9)) {
|
||||
/* Partial resync was accepted. */
|
||||
serverLog(LL_NOTICE, "Successful partial resynchronization with master.");
|
||||
|
||||
/* Check the new replication ID advertised by the master. If it
|
||||
* changed, we need to set the new ID as primary ID, and set
|
||||
* secondary ID as the old master ID up to the current offset, so
|
||||
* that our sub-slaves will be able to PSYNC with us after a
|
||||
* disconnection. */
|
||||
char *start = reply + 10;
|
||||
char *end = reply + 9;
|
||||
while (end[0] != '\r' && end[0] != '\n' && end[0] != '\0')
|
||||
end++;
|
||||
if (end - start == CONFIG_RUN_ID_SIZE) {
|
||||
char new[CONFIG_RUN_ID_SIZE + 1];
|
||||
memcpy(new, start, CONFIG_RUN_ID_SIZE);
|
||||
new[CONFIG_RUN_ID_SIZE] = '\0';
|
||||
|
||||
return PSYNC_CONTINUE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void readFullData() {
|
||||
static char eofmark[CONFIG_RUN_ID_SIZE];
|
||||
static char lastbytes[CONFIG_RUN_ID_SIZE];
|
||||
static int usemark = 0;
|
||||
char buf[PROTO_IOBUF_LEN];
|
||||
if (mobj->repl_transfer_size == -1) {
|
||||
int nread = syncReadLine(mobj->source_cc->fd, buf, PROTO_IOBUF_LEN, mobj->timeout);
|
||||
if (nread == -1) {
|
||||
serverLog(LL_WARNING, "read full data failed");
|
||||
goto error;
|
||||
}
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "PING ");
|
||||
freeReplyObject(reply);
|
||||
//todo auth with master
|
||||
if (buf[0] == '-') {
|
||||
serverLog(LL_WARNING,
|
||||
"MASTER aborted replication with an error: %s",
|
||||
buf + 1);
|
||||
goto error;
|
||||
} else if (buf[0] == '\0') {
|
||||
// mobj->repl_transfer_lastio = server.unixtime;
|
||||
return;
|
||||
} else if (buf[0] != '$') {
|
||||
serverLog(LL_WARNING, "Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
|
||||
goto error;
|
||||
}
|
||||
|
||||
if (strncmp(buf + 1, "EOF:", 4) == 0 && strlen(buf + 5) >= CONFIG_RUN_ID_SIZE) {
|
||||
usemark = 1;
|
||||
usemark = 1;
|
||||
memcpy(eofmark, buf + 5, CONFIG_RUN_ID_SIZE);
|
||||
memset(lastbytes, 0, CONFIG_RUN_ID_SIZE);
|
||||
/* Set any repl_transfer_size to avoid entering this code path
|
||||
* at the next call. */
|
||||
mobj->repl_transfer_size = 0;
|
||||
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF to parser");
|
||||
} else {
|
||||
usemark = 0;
|
||||
mobj->repl_transfer_size = strtol(buf + 1, NULL, 10);
|
||||
serverLog(LL_NOTICE,
|
||||
"MASTER <-> REPLICA sync: receiving %lld bytes from master to parser",
|
||||
(long long)mobj->repl_transfer_size);
|
||||
}
|
||||
}
|
||||
int flag = rdbLoadRioWithLoading(mobj);
|
||||
|
||||
return;
|
||||
error:
|
||||
cancelMigrate();
|
||||
return;
|
||||
}
|
||||
|
||||
void cancelMigrate() {
|
||||
}
|
||||
|
||||
void syncDataWithRedis(int fd, void *user_data, int mask) {
|
||||
REDISMODULE_NOT_USED(fd);
|
||||
REDISMODULE_NOT_USED(mask);
|
||||
REDISMODULE_NOT_USED(user_data);
|
||||
sds err = NULL;
|
||||
if (mobj->repl_stat == REPL_STATE_CONNECTING) {
|
||||
if (redisSendCommand(mobj->source_cc, "PING") != REDIS_OK) {
|
||||
serverLog(LL_WARNING, "send PING failed ip:%s,port:%d",
|
||||
mobj->host, mobj->port);
|
||||
goto error;
|
||||
}
|
||||
mobj->repl_stat = REPL_STATE_RECEIVE_PING_REPLY;
|
||||
return;
|
||||
}
|
||||
if (mobj->repl_stat == REPL_STATE_RECEIVE_PING_REPLY) {
|
||||
err = redisReceive();
|
||||
if (err == NULL)
|
||||
goto no_response_error;
|
||||
if (err[0] != '+' && strncmp(err, "-NOAUTH", 7) != 0 && strncmp(err, "-NOPERM", 7) != 0 && strncmp(err, "-ERR operation not permitted", 28) != 0) {
|
||||
serverLog(LL_WARNING, "Error reply to PING from master: '%s'", err);
|
||||
sdsfree(err);
|
||||
goto error;
|
||||
} else {
|
||||
serverLog(LL_NOTICE, "Master replied to PING, replication can continue...");
|
||||
}
|
||||
sdsfree(err);
|
||||
err = NULL;
|
||||
mobj->repl_stat = REPL_STATE_SEND_HANDSHAKE;
|
||||
return;
|
||||
}
|
||||
if (mobj->repl_stat == REPL_STATE_SEND_HANDSHAKE) {
|
||||
// todo 增加认证
|
||||
mobj->repl_stat = REPL_STATE_RECEIVE_AUTH_REPLY;
|
||||
}
|
||||
if (mobj->repl_stat == REPL_STATE_RECEIVE_AUTH_REPLY) {
|
||||
// todo 接受认证信息
|
||||
sds portstr = sdsfromlonglong(mobj->port);
|
||||
reply = redisCommand(mobj->source_cc, "REPLCONF listening-port %s", portstr);
|
||||
if (reply->type == REDIS_REPLY_ERROR) {
|
||||
if (redisSendCommand(mobj->source_cc, "REPLCONF listening-port %s", portstr) != REDIS_OK) {
|
||||
serverLog(LL_WARNING, "send PING failed ip:%s,port:%d",
|
||||
mobj->host, mobj->port);
|
||||
goto error;
|
||||
}
|
||||
mobj->repl_stat = REPL_STATE_RECEIVE_PORT_REPLY;
|
||||
sdsfree(portstr);
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "REPLCONF listening-port %s failed ip:%s,port:%d, error:%s",
|
||||
portstr, mobj->host, mobj->port, reply->str);
|
||||
return;
|
||||
}
|
||||
if (mobj->repl_stat == REPL_STATE_RECEIVE_PORT_REPLY) {
|
||||
err = redisReceive();
|
||||
if (err == NULL)
|
||||
goto no_response_error;
|
||||
if (err[0] == '-') {
|
||||
serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF listening-port: %s", err);
|
||||
goto error;
|
||||
}
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "REPLCONF listening-port %s ", portstr);
|
||||
sdsfree(portstr);
|
||||
freeReplyObject(reply);
|
||||
reply = redisCommand(mobj->source_cc, "REPLCONF ip-address %s", mobj->host);
|
||||
if (reply->type == REDIS_REPLY_ERROR) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "REPLCONF ip-address %s failed ip:%s,port:%d, error:%s",
|
||||
mobj->host, mobj->host, mobj->port, reply->str);
|
||||
serverLog(LL_NOTICE, "REPLCONF listening-port success");
|
||||
if (redisSendCommand(mobj->source_cc, "REPLCONF ip-address %s", mobj->host) != REDIS_OK) {
|
||||
serverLog(LL_WARNING, "REPLCONF ip-address %s failed", mobj->host);
|
||||
goto error;
|
||||
}
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "REPLCONF ip-address %s ", mobj->host);
|
||||
freeReplyObject(reply);
|
||||
reply = redisCommand(mobj->source_cc, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2");
|
||||
if (reply->type == REDIS_REPLY_ERROR) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING,
|
||||
"REPLCONF capa eof capa psync2 failed ip:%s,port:%d, error:%s",
|
||||
mobj->host, mobj->port, reply->str);
|
||||
sdsfree(err);
|
||||
err = NULL;
|
||||
mobj->repl_stat = REPL_STATE_RECEIVE_IP_REPLY;
|
||||
return;
|
||||
}
|
||||
if (mobj->repl_stat == REPL_STATE_RECEIVE_IP_REPLY) {
|
||||
err = redisReceive();
|
||||
if (err == NULL)
|
||||
goto no_response_error;
|
||||
if (err[0] == '-') {
|
||||
serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF ip-address: %s", err);
|
||||
goto error;
|
||||
}
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "REPLCONF %s %s %s %s",
|
||||
"capa", "eof", "capa", "psync2");
|
||||
freeReplyObject(reply);
|
||||
if (!sendSyncCommand(ctx)) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING,
|
||||
"send PSYNC %s %s failed ip:%s,port:%d, ",
|
||||
serverLog(LL_NOTICE, "REPLCONF REPLCONF ip-address success");
|
||||
if (redisSendCommand(mobj->source_cc, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2") != REDIS_OK) {
|
||||
serverLog(LL_WARNING, "send REPLCONF capa eof capa psync2 failed");
|
||||
goto error;
|
||||
}
|
||||
sdsfree(err);
|
||||
err = NULL;
|
||||
mobj->repl_stat = REPL_STATE_RECEIVE_CAPA_REPLY;
|
||||
return;
|
||||
}
|
||||
if (mobj->repl_stat == REPL_STATE_RECEIVE_CAPA_REPLY) {
|
||||
err = redisReceive();
|
||||
if (err == NULL)
|
||||
goto no_response_error;
|
||||
if (err[0] == '-') {
|
||||
serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF capa: %s", err);
|
||||
goto error;
|
||||
}
|
||||
serverLog(LL_NOTICE, "REPLCONF capa eof capa psync2 success");
|
||||
sdsfree(err);
|
||||
err = NULL;
|
||||
mobj->repl_stat = REPL_STATE_SEND_PSYNC;
|
||||
return;
|
||||
}
|
||||
if (mobj->repl_stat == REPL_STATE_SEND_PSYNC) {
|
||||
if (!sendSyncCommand()) {
|
||||
serverLog(LL_WARNING, "send PSYNC %s %s failed ip:%s,port:%d, ",
|
||||
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
|
||||
goto error;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
mobj->repl_stat = REPL_STATE_RECEIVE_PSYNC_REPLY;
|
||||
return;
|
||||
}
|
||||
if (mobj->repl_stat != REPL_STATE_RECEIVE_PSYNC_REPLY) {
|
||||
serverLog(LL_WARNING, "syncDataWithRedis(): state machine error, state should be RECEIVE_PSYNC but is %d",
|
||||
mobj->repl_stat);
|
||||
goto error;
|
||||
}
|
||||
int psync_result = receiveDataFromRedis();
|
||||
if (psync_result == PSYNC_WAIT_REPLY)
|
||||
return;
|
||||
if (psync_result == PSYNC_TRY_LATER)
|
||||
goto error;
|
||||
if (psync_result == PSYNC_CONTINUE) {
|
||||
}
|
||||
// 接受全部数据
|
||||
serverLog(LL_NOTICE, "begin receive full data");
|
||||
readFullData();
|
||||
return;
|
||||
error:
|
||||
freeReplyObject(reply);
|
||||
freeMigrateObj(mobj);
|
||||
return NULL;
|
||||
if (err != NULL) {
|
||||
sdsfree(err);
|
||||
}
|
||||
|
||||
int connectRedis(RedisModuleCtx *ctx) {
|
||||
pthread_t pthread;
|
||||
int flag = pthread_create(&pthread, NULL, syncWithRedis, ctx);
|
||||
if (flag != 0) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Can't start thread");
|
||||
freeMigrateObj(mobj);
|
||||
return RedisModule_ReplyWithError(ctx, "Can't start thread");
|
||||
}
|
||||
pthread_join(pthread, NULL);
|
||||
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
no_response_error: /* Handle receiveSynchronousResponse() error when master has no reply */
|
||||
serverLog(LL_WARNING, "Master did not respond to command during SYNC handshake");
|
||||
}
|
||||
|
||||
/**
|
||||
* migrate data to current instance.
|
||||
* migrate host port begin-slot end-slot
|
||||
* @param ctx
|
||||
* @param ctxz
|
||||
* @param argv
|
||||
* @param argc
|
||||
* @return
|
||||
@ -130,14 +329,18 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc != 5) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
if (RedisModule_IsKeysPositionRequest(ctx)) {
|
||||
RedisModule_Log(ctx, VERBOSE, "get keys from module");
|
||||
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
}
|
||||
robj *host = (robj *)argv[1];
|
||||
robj *port = (robj *)argv[2];
|
||||
robj *begin_slot = (robj *)argv[3];
|
||||
robj *end_slot = (robj *)argv[4];
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *) host->ptr,
|
||||
RedisModule_Log(ctx, NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *)host->ptr,
|
||||
(char *)port->ptr, (char *)begin_slot->ptr, (char *)end_slot->ptr);
|
||||
if (mobj != NULL) {
|
||||
return RedisModule_ReplyWithError(ctx, "-ERR is migrating, please waiting");
|
||||
return RedisModule_ReplyWithError(ctx, "migrating, please waiting");
|
||||
}
|
||||
mobj = createMigrateObject(host, atoi(port->ptr), atoi(begin_slot->ptr), atoi(end_slot->ptr));
|
||||
struct timeval timeout = {1, 500000}; // 1.5s
|
||||
@ -146,19 +349,17 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
options.connect_timeout = &timeout;
|
||||
mobj->source_cc = redisConnectWithOptions(&options);
|
||||
if (mobj->source_cc == NULL || mobj->source_cc->err) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%d, error:%s",
|
||||
RedisModule_Log(ctx, WARNING, "Could not connect to Redis at ip:%s,port:%d, error:%s",
|
||||
mobj->host, mobj->port, mobj->source_cc->errstr);
|
||||
freeMigrateObj(mobj);
|
||||
return RedisModule_ReplyWithError(ctx, "Can't connect source redis");
|
||||
}
|
||||
return connectRedis(ctx);
|
||||
|
||||
mobj->repl_stat = REPL_STATE_CONNECTING;
|
||||
RedisModule_EventLoopAdd(mobj->source_cc->fd, AE_WRITABLE, syncDataWithRedis, ctx);
|
||||
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
}
|
||||
|
||||
/* This function must be present on each Redis module. It is used in order to
|
||||
* register the commands into the Redis server. */
|
||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
|
||||
@ -166,13 +367,23 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
if (flag == REDISMODULE_ERR) {
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
RedisModule_SetClusterFlags(ctx, REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION);
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin init commands of %s", MODULE_NAME);
|
||||
flag = RedisModule_CreateCommand(ctx, "rm.migrate", rm_migrateCommand, "write deny-oom admin", 1, 1, 0);
|
||||
|
||||
RedisModule_Log(ctx, NOTICE, "begin init commands of %s", MODULE_NAME);
|
||||
flag = RedisModule_CreateCommand(ctx, "rm.migrate", rm_migrateCommand, "write deny-oom admin getkeys-api", 0, 0, 0);
|
||||
if (flag == REDISMODULE_ERR) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "init rm.migrate failed");
|
||||
RedisModule_Log(ctx, WARNING, "init rm.migrate failed");
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "init %s success", MODULE_NAME);
|
||||
RedisModuleCallReply *reply = RedisModule_Call(ctx, "config", "cc", "get", "logfile");
|
||||
long long items = RedisModule_CallReplyLength(reply);
|
||||
if (items != 2) {
|
||||
RedisModule_Log(ctx, WARNING, "logfile is empty");
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
RedisModuleCallReply *item1 = RedisModule_CallReplyArrayElement(reply, 1);
|
||||
robj *logfile = (robj *)RedisModule_CreateStringFromCallReply(item1);
|
||||
RedisModule_Log(ctx, NOTICE, "logfile is %s", (char *)logfile->ptr);
|
||||
createLogObj((char *)logfile->ptr);
|
||||
RedisModule_Log(ctx, NOTICE, "init %s success", MODULE_NAME);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
@ -6,27 +6,37 @@
|
||||
#include "ae.h"
|
||||
#include "sds.h"
|
||||
#include "sdscompat.h"
|
||||
#include "log.h"
|
||||
|
||||
#define MODULE_NAME "redis-migrate"
|
||||
#define REDIS_MIGRATE_VERSION 1
|
||||
#define LRU_BITS 24
|
||||
#define CONFIG_RUN_ID_SIZE 40
|
||||
#define RDB_EOF_MARK_SIZE 40
|
||||
#define PSYNC_WRITE_ERROR 0
|
||||
#define PSYNC_WAIT_REPLY 1
|
||||
#define PSYNC_CONTINUE 2
|
||||
#define PSYNC_FULLRESYNC 3
|
||||
#define PSYNC_NOT_SUPPORTED 4
|
||||
#define PSYNC_TRY_LATER 5
|
||||
#define C_ERR -1
|
||||
#define C_OK 1
|
||||
#define PROTO_IOBUF_LEN (1024 * 16)
|
||||
|
||||
/* Anti-warning macro... */
|
||||
#define UNUSED(V) ((void)V)
|
||||
|
||||
typedef struct redisObject {
|
||||
typedef struct redisObject
|
||||
{
|
||||
unsigned type : 4;
|
||||
unsigned encoding : 4;
|
||||
unsigned lru: LRU_BITS; /* LRU time (relative to global lru_clock) or
|
||||
* LFU data (least significant 8 bits frequency
|
||||
* and most significant 16 bits access time). */
|
||||
unsigned lru : LRU_BITS;
|
||||
int refcount;
|
||||
void *ptr;
|
||||
} robj;
|
||||
|
||||
typedef struct migrateObject {
|
||||
typedef struct migrateObject
|
||||
{
|
||||
char *address;
|
||||
int repl_stat;
|
||||
redisContext *source_cc;
|
||||
@ -35,11 +45,17 @@ typedef struct migrateObject {
|
||||
int begin_slot;
|
||||
int end_slot;
|
||||
char *psync_replid;
|
||||
char master_replid[CONFIG_RUN_ID_SIZE + 1];
|
||||
int timeout;
|
||||
int isCache;
|
||||
char psync_offset[32];
|
||||
int repl_transfer_size;
|
||||
long long master_initial_offset;
|
||||
time_t repl_transfer_lastio;
|
||||
} migrateObj;
|
||||
|
||||
typedef enum {
|
||||
typedef enum
|
||||
{
|
||||
REPL_STATE_NONE = 0, /* No active replication */
|
||||
REPL_STATE_CONNECT, /* Must connect to master */
|
||||
REPL_STATE_CONNECTING, /* Connecting to master */
|
||||
@ -55,21 +71,33 @@ typedef enum {
|
||||
/* --- End of handshake states --- */
|
||||
REPL_STATE_TRANSFER, /* Receiving .rdb from master */
|
||||
REPL_STATE_CONNECTED, /* Connected to master */
|
||||
STATE_CONNECT_ERROR,
|
||||
STATE_DISCONNECT
|
||||
} repl_state;
|
||||
|
||||
long long ustime(void);
|
||||
|
||||
mstime_t mstime(void);
|
||||
|
||||
migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot);
|
||||
|
||||
void freeMigrateObj(migrateObj *m);
|
||||
|
||||
int sendSyncCommand(RedisModuleCtx *ctx);
|
||||
int sendSyncCommand();
|
||||
|
||||
void *syncWithRedis(void *arg);
|
||||
int receiveDataFromRedis();
|
||||
|
||||
int connectRedis(RedisModuleCtx *ctx) ;
|
||||
ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout);
|
||||
|
||||
ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout);
|
||||
|
||||
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
|
||||
|
||||
sds redisReceive();
|
||||
|
||||
void readFullData();
|
||||
|
||||
void cancelMigrate();
|
||||
|
||||
void syncDataWithRedis(int fd, void *user_data, int mask);
|
||||
|
||||
int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
|
||||
|
||||
|
@ -220,7 +220,7 @@ This flag should not be used directly by the module.
|
||||
|
||||
/* Logging level strings */
|
||||
#define REDISMODULE_LOGLEVEL_DEBUG "debug"
|
||||
#define REDISMODULE_LOGLEVEL_VERBOSE "verbose"
|
||||
#define REDISMODULE_LOGLEVEL_VERBOSE "VERBOSE"
|
||||
#define REDISMODULE_LOGLEVEL_NOTICE "notice"
|
||||
#define REDISMODULE_LOGLEVEL_WARNING "warning"
|
||||
|
||||
|
117
src/syncio.c
Normal file
117
src/syncio.c
Normal file
@ -0,0 +1,117 @@
|
||||
|
||||
#include "redis-migrate.h"
|
||||
#include "ae.h"
|
||||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
|
||||
/* ----------------- Blocking sockets I/O with timeouts --------------------- */
|
||||
|
||||
#define SYNCIO__RESOLUTION 10
|
||||
|
||||
ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout)
|
||||
{
|
||||
ssize_t nwritten, ret = size;
|
||||
long long start = mstime();
|
||||
long long remaining = timeout;
|
||||
|
||||
while (1)
|
||||
{
|
||||
long long wait = (remaining > SYNCIO__RESOLUTION) ? remaining : SYNCIO__RESOLUTION;
|
||||
long long elapsed;
|
||||
|
||||
nwritten = write(fd, ptr, size);
|
||||
if (nwritten == -1)
|
||||
{
|
||||
if (errno != EAGAIN)
|
||||
return -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
ptr += nwritten;
|
||||
size -= nwritten;
|
||||
}
|
||||
if (size == 0)
|
||||
return ret;
|
||||
|
||||
/* Wait */
|
||||
aeWait(fd, AE_WRITABLE, wait);
|
||||
elapsed = mstime() - start;
|
||||
if (elapsed >= timeout)
|
||||
{
|
||||
errno = ETIMEDOUT;
|
||||
return -1;
|
||||
}
|
||||
remaining = timeout - elapsed;
|
||||
}
|
||||
}
|
||||
|
||||
ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout)
|
||||
{
|
||||
ssize_t nread, totread = 0;
|
||||
long long start = mstime();
|
||||
long long remaining = timeout;
|
||||
|
||||
if (size == 0)
|
||||
return 0;
|
||||
while (1)
|
||||
{
|
||||
long long wait = (remaining > SYNCIO__RESOLUTION) ? remaining : SYNCIO__RESOLUTION;
|
||||
long long elapsed;
|
||||
|
||||
nread = read(fd, ptr, size);
|
||||
if (nread == 0)
|
||||
return -1;
|
||||
if (nread == -1)
|
||||
{
|
||||
if (errno != EAGAIN)
|
||||
return -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
ptr += nread;
|
||||
size -= nread;
|
||||
totread += nread;
|
||||
}
|
||||
if (size == 0)
|
||||
return totread;
|
||||
|
||||
/* Wait */
|
||||
aeWait(fd, AE_READABLE, wait);
|
||||
elapsed = mstime() - start;
|
||||
if (elapsed >= timeout)
|
||||
{
|
||||
errno = ETIMEDOUT;
|
||||
return -1;
|
||||
}
|
||||
remaining = timeout - elapsed;
|
||||
}
|
||||
}
|
||||
|
||||
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout)
|
||||
{
|
||||
ssize_t nread = 0;
|
||||
|
||||
size--;
|
||||
while (size)
|
||||
{
|
||||
char c;
|
||||
|
||||
if (syncRead(fd, &c, 1, timeout) == -1)
|
||||
return -1;
|
||||
if (c == '\n')
|
||||
{
|
||||
*ptr = '\0';
|
||||
if (nread && *(ptr - 1) == '\r')
|
||||
*(ptr - 1) = '\0';
|
||||
return nread;
|
||||
}
|
||||
else
|
||||
{
|
||||
*ptr++ = c;
|
||||
*ptr = '\0';
|
||||
nread++;
|
||||
}
|
||||
size--;
|
||||
}
|
||||
return nread;
|
||||
}
|
Loading…
Reference in New Issue
Block a user