This commit is contained in:
LingZhaoHui 2022-06-22 23:48:41 +08:00
parent 947938c994
commit f2982447b5
4 changed files with 458 additions and 162 deletions

213
.clang-format Normal file
View File

@ -0,0 +1,213 @@
# 语言: None, Cpp, Java, JavaScript, ObjC, Proto, TableGen, TextProto
Language: Cpp
# BasedOnStyle: LLVM
# 访问说明符(public、private等)的偏移
AccessModifierOffset: -4
# 开括号(开圆括号、开尖括号、开方括号)后的对齐: Align, DontAlign, AlwaysBreak(总是在开括号后换行)
AlignAfterOpenBracket: Align
# 连续赋值时,对齐所有等号
AlignConsecutiveAssignments: false
# 连续声明时,对齐所有声明的变量名
AlignConsecutiveDeclarations: false
# 右对齐逃脱换行(使用反斜杠换行)的反斜杠
AlignEscapedNewlines: Right
# 水平对齐二元和三元表达式的操作数
AlignOperands: true
# 对齐连续的尾随的注释
AlignTrailingComments: true
# 不允许函数声明的所有参数在放在下一行
AllowAllParametersOfDeclarationOnNextLine: false
# 不允许短的块放在同一行
AllowShortBlocksOnASingleLine: true
# 允许短的case标签放在同一行
AllowShortCaseLabelsOnASingleLine: true
# 允许短的函数放在同一行: None, InlineOnly(定义在类中), Empty(空函数), Inline(定义在类中,空函数), All
AllowShortFunctionsOnASingleLine: None
# 允许短的if语句保持在同一行
AllowShortIfStatementsOnASingleLine: true
# 允许短的循环保持在同一行
AllowShortLoopsOnASingleLine: true
# 总是在返回类型后换行: None, All, TopLevel(顶级函数,不包括在类中的函数),
# AllDefinitions(所有的定义,不包括声明), TopLevelDefinitions(所有的顶级函数的定义)
AlwaysBreakAfterReturnType: None
# 总是在多行string字面量前换行
AlwaysBreakBeforeMultilineStrings: false
# 总是在template声明后换行
AlwaysBreakTemplateDeclarations: true
# false表示函数实参要么都在同一行要么都各自一行
BinPackArguments: true
# false表示所有形参要么都在同一行要么都各自一行
BinPackParameters: true
# 大括号换行只有当BreakBeforeBraces设置为Custom时才有效
BraceWrapping:
# class定义后面
AfterClass: false
# 控制语句后面
AfterControlStatement: false
# enum定义后面
AfterEnum: false
# 函数定义后面
AfterFunction: false
# 命名空间定义后面
AfterNamespace: false
# struct定义后面
AfterStruct: false
# union定义后面
AfterUnion: false
# extern之后
AfterExternBlock: false
# catch之前
BeforeCatch: false
# else之前
BeforeElse: false
# 缩进大括号
IndentBraces: false
# 分离空函数
SplitEmptyFunction: false
# 分离空语句
SplitEmptyRecord: false
# 分离空命名空间
SplitEmptyNamespace: false
# 在二元运算符前换行: None(在操作符后换行), NonAssignment(在非赋值的操作符前换行), All(在操作符前换行)
BreakBeforeBinaryOperators: NonAssignment
# 在大括号前换行: Attach(始终将大括号附加到周围的上下文), Linux(除函数、命名空间和类定义与Attach类似),
# Mozilla(除枚举、函数、记录定义与Attach类似), Stroustrup(除函数定义、catch、else与Attach类似),
# Allman(总是在大括号前换行), GNU(总是在大括号前换行,并对于控制语句的大括号增加额外的缩进), WebKit(在函数前换行), Custom
# 注:这里认为语句块也属于函数
BreakBeforeBraces: Custom
# 在三元运算符前换行
BreakBeforeTernaryOperators: false
# 在构造函数的初始化列表的冒号后换行
BreakConstructorInitializers: AfterColon
#BreakInheritanceList: AfterColon
BreakStringLiterals: false
# 每行字符的限制0表示没有限制
ColumnLimit: 0
CompactNamespaces: true
# 构造函数的初始化列表要么都在同一行,要么都各自一行
ConstructorInitializerAllOnOneLineOrOnePerLine: false
# 构造函数的初始化列表的缩进宽度
ConstructorInitializerIndentWidth: 4
# 延续的行的缩进宽度
ContinuationIndentWidth: 4
# 去除C++11的列表初始化的大括号{后和}前的空格
Cpp11BracedListStyle: true
# 继承最常用的指针和引用的对齐方式
DerivePointerAlignment: false
# 固定命名空间注释
FixNamespaceComments: true
# 缩进case标签
IndentCaseLabels: false
IndentPPDirectives: None
# 缩进宽度
IndentWidth: 4
# 函数返回类型换行时,缩进函数声明或函数定义的函数名
IndentWrappedFunctionNames: false
# 保留在块开始处的空行
KeepEmptyLinesAtTheStartOfBlocks: false
# 连续空行的最大数量
MaxEmptyLinesToKeep: 1
# 命名空间的缩进: None, Inner(缩进嵌套的命名空间中的内容), All
NamespaceIndentation: None
# 指针和引用的对齐: Left, Right, Middle
PointerAlignment: Right
# 允许重新排版注释
ReflowComments: true
# 允许排序#include
SortIncludes: false
# 允许排序 using 声明
SortUsingDeclarations: false
# 在C风格类型转换后添加空格
SpaceAfterCStyleCast: false
# 在Template 关键字后面添加空格
SpaceAfterTemplateKeyword: true
# 在赋值运算符之前添加空格
SpaceBeforeAssignmentOperators: true
# SpaceBeforeCpp11BracedList: true
# SpaceBeforeCtorInitializerColon: true
# SpaceBeforeInheritanceColon: true
# 开圆括号之前添加一个空格: Never, ControlStatements, Always
SpaceBeforeParens: ControlStatements
# SpaceBeforeRangeBasedForLoopColon: true
# 在空的圆括号中添加空格
SpaceInEmptyParentheses: false
# 在尾随的评论前添加的空格数(只适用于//)
SpacesBeforeTrailingComments: 1
# 在尖括号的<后和>前添加空格
SpacesInAngles: false
# 在C风格类型转换的括号中添加空格
SpacesInCStyleCastParentheses: false
# 在容器(ObjC和JavaScript的数组和字典等)字面量中添加空格
SpacesInContainerLiterals: true
# 在圆括号的(后和)前添加空格
SpacesInParentheses: false
# 在方括号的[后和]前添加空格lamda表达式和未指明大小的数组的声明不受影响
SpacesInSquareBrackets: false
# 标准: Cpp03, Cpp11, Auto
Standard: Cpp11
# tab宽度
TabWidth: 4
# 使用tab字符: Never, ForIndentation, ForContinuationAndIndentation, Always
UseTab: Never

View File

@ -1,53 +1,178 @@
#include "rdbLoad.h" #include "rdbLoad.h"
#include <errno.h>
int rdbLoadRioWithLoading(migrateObj *mobj) #include <errno.h>
{ #include <unistd.h>
int rdbLoadRioWithLoading(migrateObj *mobj) {
char buf[1024]; char buf[1024];
int error; int error;
if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1) if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1) {
{ serverLog(LL_WARNING, "read version failed:%s,port:%d ", mobj->host,
serverLog(LL_WARNING, "read version failed:%s,port:%d ", mobj->host, mobj->port); mobj->port);
return C_ERR; return C_ERR;
} }
buf[9] = '\0'; buf[9] = '\0';
if (memcmp(buf, "REDIS", 5) != 0) if (memcmp(buf, "REDIS", 5) != 0) {
{
serverLog(LL_WARNING, "Wrong signature trying to load DB from file"); serverLog(LL_WARNING, "Wrong signature trying to load DB from file");
errno = EINVAL; errno = EINVAL;
return C_ERR; return C_ERR;
} }
int type, rdbver; int type, rdbver;
uint64_t dbid = 0;
rdbver = atoi(buf + 5); rdbver = atoi(buf + 5);
if (rdbver < 1 || rdbver > RDB_VERSION) if (rdbver < 1 || rdbver > RDB_VERSION) {
{
serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver); serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver);
errno = EINVAL; errno = EINVAL;
return C_ERR; return C_ERR;
} }
serverLog(LL_NOTICE, "buf=%s", buf); serverLog(LL_NOTICE, "buf=%s", buf);
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
while (1) while (1) {
{
sds key; sds key;
robj *val; robj *val;
if ((type = rdbLoadType(mobj)) == -1) { if ((type = rm_rdbLoadType(mobj)) == -1) {
serverLog(LL_WARNING, "read type failed"); serverLog(LL_WARNING, "read type failed");
return C_ERR; return C_ERR;
} }
if (type == RDB_OPCODE_EXPIRETIME) { if (type == RDB_OPCODE_EXPIRETIME) {
expiretime = rm_rdbLoadTime(mobj);
if (expiretime == -1) {
return C_ERR;
}
expiretime *= 1000;
continue;
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
expiretime = rm_rdbLoadMillisecondTime(mobj, rdbver);
continue;
} else if (type == RDB_OPCODE_FREQ) {
uint8_t byte;
if (read(mobj->source_cc->fd, &byte, 1) == 0)
return C_ERR;
lfu_freq = byte;
continue;
} else if (type == RDB_OPCODE_IDLE) {
uint64_t qword;
if ((qword = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR)
return C_ERR;
lru_idle = qword;
continue;
} else if (type == RDB_OPCODE_EOF) {
break;
} else if (type == RDB_OPCODE_SELECTDB) {
if ((dbid = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
continue;
} else if (type == RDB_OPCODE_RESIZEDB) {
uint64_t db_size, expires_size;
if ((db_size = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
if ((expires_size = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
continue;
} else if (type == RDB_OPCODE_AUX) {
robj *auxkey, *auxval;
} else if (type == RDB_OPCODE_FUNCTION || type == RDB_OPCODE_FUNCTION2) {
} }
} }
} }
void *rm_rdbGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) {
int rdbLoadType(migrateObj *mobj) { int encode = flags & RDB_LOAD_ENC;
char buf[1]; int plain = flags & RDB_LOAD_PLAIN;
syncReadLine(mobj->source_cc->fd, buf, 1, mobj->timeout); int sds = flags & RDB_LOAD_SDS;
return buf[0]; int isencoded;
unsigned long long len;
} }
time_t rdbLoadTime(migrateObj *mobj) { uint64_t rm_rdbLoadLen(migrateObj *mobj, int *isencoded) {
uint64_t len;
}
if (rdbLoadLenByRef(mobj, isencoded, &len) == -1)
return RDB_LENERR;
return len;
}
int rm_rdbLoadType(migrateObj *mobj) {
unsigned char type;
if (read(mobj->source_cc->fd, &type, 1) == 0) {
return -1;
}
return type;
}
int rm_rdbLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) {
unsigned char buf[2];
int type;
if (isencoded)
*isencoded = 0;
if (read(mobj->source_cc->fd, buf, 1) == 0) {
return -1;
}
type = (buf[0] & 0xC0) >> 6;
if (type == RDB_ENCVAL) {
if (isencoded)
*isencoded = 1;
*lenptr = buf[0] & 0x3F;
} else if (type == RDB_6BITLEN) {
/* Read a 6 bit len. */
*lenptr = buf[0] & 0x3F;
} else if (type == RDB_14BITLEN) {
if (read(mobj->source_cc->fd, buf + 1, 1) == 0)
return -1;
*lenptr = ((buf[0] & 0x3F) << 8) | buf[1];
} else if (buf[0] == RDB_32BITLEN) {
uint32_t len;
if (read(mobj->source_cc->fd, &len, 4) == 0)
return -1;
*lenptr = ntohl(len);
} else if (buf[0] == RDB_64BITLEN) {
uint64_t len;
if (read(mobj->source_cc->fd, &len, 8) == 0)
return -1;
*lenptr = ntohu64(len);
} else {
return -1;
}
return 0;
}
time_t rm_rdbLoadTime(migrateObj *mobj) {
int32_t t32;
if (read(mobj->source_cc->fd, &t32, 4) == 0) {
return -1;
}
t32 = (char *)t32 + 4;
return (time_t)t32;
}
long long rm_rdbLoadMillisecondTime(migrateObj *mobj, int rdbver) {
int64_t t64;
if (read(mobj->source_cc->fd, &t64, 8) == 0) {
return LLONG_MAX;
}
if (rdbver >= 9)
memrev64(&t64);
return (long long)t64;
}
uint64_t rm_rdbLoadLen(migrateObj *mobj, int *isencoded) {
}
void rm_memrev64(void *p) {
unsigned char *x = p, t;
t = x[0];
x[0] = x[7];
x[7] = t;
t = x[1];
x[1] = x[6];
x[6] = t;
t = x[2];
x[2] = x[5];
x[5] = t;
t = x[3];
x[3] = x[4];
x[4] = t;
}

View File

@ -2,24 +2,49 @@
#define RDB_LOAD_REDIS_MIGRATE_H #define RDB_LOAD_REDIS_MIGRATE_H
#include "redis-migrate.h" #include "redis-migrate.h"
#define RDB_6BITLEN 0
#define RDB_14BITLEN 1
#define RDB_32BITLEN 0x80
#define RDB_64BITLEN 0x81
#define RDB_ENCVAL 3
#define RDB_LENERR UINT64_MAX
#define RDB_VERSION 10 #define RDB_VERSION 10
#define RDB_OPCODE_FUNCTION2 245 /* function library data */ #define RDB_OPCODE_FUNCTION2 245 /* function library data */
#define RDB_OPCODE_FUNCTION 246 /* old function library data for 7.0 rc1 and rc2 */ #define RDB_OPCODE_FUNCTION 246 /* old function library data for 7.0 rc1 and rc2 */
#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */ #define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */
#define RDB_OPCODE_IDLE 248 /* LRU idle time. */ #define RDB_OPCODE_IDLE 248 /* LRU idle time. */
#define RDB_OPCODE_FREQ 249 /* LFU frequency. */ #define RDB_OPCODE_FREQ 249 /* LFU frequency. */
#define RDB_OPCODE_AUX 250 /* RDB aux field. */ #define RDB_OPCODE_AUX 250 /* RDB aux field. */
#define RDB_OPCODE_RESIZEDB 251 /* Hash table resize hint. */ #define RDB_OPCODE_RESIZEDB 251 /* Hash table resize hint. */
#define RDB_OPCODE_EXPIRETIME_MS 252 /* Expire time in milliseconds. */ #define RDB_OPCODE_EXPIRETIME_MS 252 /* Expire time in milliseconds. */
#define RDB_OPCODE_EXPIRETIME 253 /* Old expire time in seconds. */ #define RDB_OPCODE_EXPIRETIME 253 /* Old expire time in seconds. */
#define RDB_OPCODE_SELECTDB 254 /* DB number of the following keys. */ #define RDB_OPCODE_SELECTDB 254 /* DB number of the following keys. */
#define RDB_OPCODE_EOF 255 /* End of the RDB file. */ #define RDB_OPCODE_EOF 255 /* End of the RDB file. */
int rdbLoadRioWithLoading(migrateObj *mobj); #define RDB_LOAD_NONE 0
#define RDB_LOAD_ENC (1 << 0)
#define RDB_LOAD_PLAIN (1 << 1)
#define RDB_LOAD_SDS (1 << 2)
int rdbLoadType(migrateObj *mobj); #define LLONG_MAX __LONG_LONG_MAX__
time_t rdbLoadTime(migrateObj *mobj); int rm_rdbLoadRioWithLoading(migrateObj *mobj);
int rm_rdbLoadType(migrateObj *mobj);
time_t rm_rdbLoadTime(migrateObj *mobj);
long long rm_rdbLoadMillisecondTime(migrateObj *mobj, int rdbver);
uint64_t rm_rdbLoadLen(migrateObj *mobi, int *isencoded);
int rm_rdbLoadLenByRef(migrateObj *mobi, int *isencoded, uint64_t *lenptr);
void rm_memrev64(void *p);
uint64_t rm_rdbLoadLen(migrateObj *mobj, int *isencoded);
void *rm_rdbGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr);
#endif #endif

View File

@ -8,8 +8,7 @@
static migrateObj *mobj; static migrateObj *mobj;
migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot) migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot) {
{
migrateObj *m; migrateObj *m;
m = hi_malloc(sizeof(*m)); m = hi_malloc(sizeof(*m));
m->host = host->ptr; m->host = host->ptr;
@ -23,15 +22,13 @@ migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_sl
return m; return m;
} }
void freeMigrateObj(migrateObj *m) void freeMigrateObj(migrateObj *m) {
{
redisFree(m->source_cc); redisFree(m->source_cc);
hi_free(m); hi_free(m);
mobj = NULL; mobj = NULL;
} }
long long ustime(void) long long ustime(void) {
{
struct timeval tv; struct timeval tv;
long long ust; long long ust;
@ -42,27 +39,22 @@ long long ustime(void)
} }
/* Return the UNIX time in milliseconds */ /* Return the UNIX time in milliseconds */
mstime_t mstime(void) mstime_t mstime(void) {
{
return ustime() / 1000; return ustime() / 1000;
} }
int sendSyncCommand() int sendSyncCommand() {
{ if (!mobj->isCache) {
if (!mobj->isCache)
{
mobj->isCache = 1; mobj->isCache = 1;
mobj->psync_replid = "?"; mobj->psync_replid = "?";
memcpy(mobj->psync_offset, "-1", 3); memcpy(mobj->psync_offset, "-1", 3);
} }
if (redisAppendCommand(mobj->source_cc, "PSYNC %s %s", mobj->psync_replid, mobj->psync_offset) != REDIS_OK) if (redisAppendCommand(mobj->source_cc, "PSYNC %s %s", mobj->psync_replid, mobj->psync_offset) != REDIS_OK) {
{
serverLog(LL_WARNING, "append PSYNC %s %s failed ip:%s,port:%d, ", serverLog(LL_WARNING, "append PSYNC %s %s failed ip:%s,port:%d, ",
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port); mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
return 0; return 0;
} }
if (redisFlush(mobj->source_cc) != REDIS_OK) if (redisFlush(mobj->source_cc) != REDIS_OK) {
{
serverLog(LL_WARNING, "send PSYNC %s %s failed ip:%s,port:%d, ", serverLog(LL_WARNING, "send PSYNC %s %s failed ip:%s,port:%d, ",
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port); mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
return 0; return 0;
@ -71,56 +63,46 @@ int sendSyncCommand()
return 1; return 1;
} }
sds redisReceive() sds redisReceive() {
{
char buf[256]; char buf[256];
if (syncReadLine(mobj->source_cc->fd, buf, sizeof(buf), mobj->timeout) == -1) if (syncReadLine(mobj->source_cc->fd, buf, sizeof(buf), mobj->timeout) == -1) {
{
serverLog(LL_WARNING, "redisReceive failed ip:%s,port:%d ", mobj->host, mobj->port); serverLog(LL_WARNING, "redisReceive failed ip:%s,port:%d ", mobj->host, mobj->port);
return NULL; return NULL;
} }
return hi_sdsnew(buf); return hi_sdsnew(buf);
} }
int receiveDataFromRedis() int receiveDataFromRedis() {
{
sds reply = redisReceive(); sds reply = redisReceive();
if (reply == NULL) if (reply == NULL) {
{
serverLog(LL_WARNING, "Master did not reply to PSYNC"); serverLog(LL_WARNING, "Master did not reply to PSYNC");
return PSYNC_TRY_LATER; return PSYNC_TRY_LATER;
} }
if (sdslen(reply) == 0) if (sdslen(reply) == 0) {
{
sdsfree(reply); sdsfree(reply);
return PSYNC_WAIT_REPLY; return PSYNC_WAIT_REPLY;
} }
serverLog(LL_NOTICE, "reply=%s", reply); serverLog(LL_NOTICE, "reply=%s", reply);
if (!strncmp(reply, "+FULLRESYNC", 11)) if (!strncmp(reply, "+FULLRESYNC", 11)) {
{
char *replid = NULL, *offset = NULL; char *replid = NULL, *offset = NULL;
/* FULL RESYNC, parse the reply in order to extract the replid /* FULL RESYNC, parse the reply in order to extract the replid
* and the replication offset. */ * and the replication offset. */
replid = strchr(reply, ' '); replid = strchr(reply, ' ');
if (replid) if (replid) {
{
replid++; replid++;
offset = strchr(replid, ' '); offset = strchr(replid, ' ');
if (offset) if (offset)
offset++; offset++;
} }
if (!replid || !offset || (offset - replid - 1) != CONFIG_RUN_ID_SIZE) if (!replid || !offset || (offset - replid - 1) != CONFIG_RUN_ID_SIZE) {
{
serverLog(LL_WARNING, "Master replied with wrong +FULLRESYNC syntax."); serverLog(LL_WARNING, "Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC /* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply * reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master * format seems wrong. To stay safe we blank the master
* replid to make sure next PSYNCs will fail. */ * replid to make sure next PSYNCs will fail. */
memset(mobj->master_replid, 0, CONFIG_RUN_ID_SIZE + 1); memset(mobj->master_replid, 0, CONFIG_RUN_ID_SIZE + 1);
} } else {
else
{
memcpy(mobj->master_replid, replid, offset - replid - 1); memcpy(mobj->master_replid, replid, offset - replid - 1);
mobj->master_replid[CONFIG_RUN_ID_SIZE] = '\0'; mobj->master_replid[CONFIG_RUN_ID_SIZE] = '\0';
mobj->master_initial_offset = strtoll(offset, NULL, 10); mobj->master_initial_offset = strtoll(offset, NULL, 10);
@ -130,8 +112,7 @@ int receiveDataFromRedis()
sdsfree(reply); sdsfree(reply);
return PSYNC_FULLRESYNC; return PSYNC_FULLRESYNC;
} }
if (!strncmp(reply, "+CONTINUE", 9)) if (!strncmp(reply, "+CONTINUE", 9)) {
{
/* Partial resync was accepted. */ /* Partial resync was accepted. */
serverLog(LL_NOTICE, "Successful partial resynchronization with master."); serverLog(LL_NOTICE, "Successful partial resynchronization with master.");
@ -144,8 +125,7 @@ int receiveDataFromRedis()
char *end = reply + 9; char *end = reply + 9;
while (end[0] != '\r' && end[0] != '\n' && end[0] != '\0') while (end[0] != '\r' && end[0] != '\n' && end[0] != '\0')
end++; end++;
if (end - start == CONFIG_RUN_ID_SIZE) if (end - start == CONFIG_RUN_ID_SIZE) {
{
char new[CONFIG_RUN_ID_SIZE + 1]; char new[CONFIG_RUN_ID_SIZE + 1];
memcpy(new, start, CONFIG_RUN_ID_SIZE); memcpy(new, start, CONFIG_RUN_ID_SIZE);
new[CONFIG_RUN_ID_SIZE] = '\0'; new[CONFIG_RUN_ID_SIZE] = '\0';
@ -155,40 +135,31 @@ int receiveDataFromRedis()
} }
} }
void readFullData() void readFullData() {
{
static char eofmark[CONFIG_RUN_ID_SIZE]; static char eofmark[CONFIG_RUN_ID_SIZE];
static char lastbytes[CONFIG_RUN_ID_SIZE]; static char lastbytes[CONFIG_RUN_ID_SIZE];
static int usemark = 0; static int usemark = 0;
char buf[PROTO_IOBUF_LEN]; char buf[PROTO_IOBUF_LEN];
if (mobj->repl_transfer_size == -1) if (mobj->repl_transfer_size == -1) {
{
int nread = syncReadLine(mobj->source_cc->fd, buf, PROTO_IOBUF_LEN, mobj->timeout); int nread = syncReadLine(mobj->source_cc->fd, buf, PROTO_IOBUF_LEN, mobj->timeout);
if (nread == -1) if (nread == -1) {
{
serverLog(LL_WARNING, "read full data failed"); serverLog(LL_WARNING, "read full data failed");
goto error; goto error;
} }
if (buf[0] == '-') if (buf[0] == '-') {
{
serverLog(LL_WARNING, serverLog(LL_WARNING,
"MASTER aborted replication with an error: %s", "MASTER aborted replication with an error: %s",
buf + 1); buf + 1);
goto error; goto error;
} } else if (buf[0] == '\0') {
else if (buf[0] == '\0')
{
// mobj->repl_transfer_lastio = server.unixtime; // mobj->repl_transfer_lastio = server.unixtime;
return; return;
} } else if (buf[0] != '$') {
else if (buf[0] != '$')
{
serverLog(LL_WARNING, "Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); serverLog(LL_WARNING, "Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
goto error; goto error;
} }
if (strncmp(buf + 1, "EOF:", 4) == 0 && strlen(buf + 5) >= CONFIG_RUN_ID_SIZE) if (strncmp(buf + 1, "EOF:", 4) == 0 && strlen(buf + 5) >= CONFIG_RUN_ID_SIZE) {
{
usemark = 1; usemark = 1;
usemark = 1; usemark = 1;
memcpy(eofmark, buf + 5, CONFIG_RUN_ID_SIZE); memcpy(eofmark, buf + 5, CONFIG_RUN_ID_SIZE);
@ -197,9 +168,7 @@ void readFullData()
* at the next call. */ * at the next call. */
mobj->repl_transfer_size = 0; mobj->repl_transfer_size = 0;
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF to parser"); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF to parser");
} } else {
else
{
usemark = 0; usemark = 0;
mobj->repl_transfer_size = strtol(buf + 1, NULL, 10); mobj->repl_transfer_size = strtol(buf + 1, NULL, 10);
serverLog(LL_NOTICE, serverLog(LL_NOTICE,
@ -209,27 +178,22 @@ void readFullData()
} }
int flag = rdbLoadRioWithLoading(mobj); int flag = rdbLoadRioWithLoading(mobj);
return; return;
error: error:
cancelMigrate(); cancelMigrate();
return; return;
} }
void cancelMigrate() void cancelMigrate() {
{
} }
void syncDataWithRedis(int fd, void *user_data, int mask) void syncDataWithRedis(int fd, void *user_data, int mask) {
{
REDISMODULE_NOT_USED(fd); REDISMODULE_NOT_USED(fd);
REDISMODULE_NOT_USED(mask); REDISMODULE_NOT_USED(mask);
REDISMODULE_NOT_USED(user_data); REDISMODULE_NOT_USED(user_data);
sds err = NULL; sds err = NULL;
if (mobj->repl_stat == REPL_STATE_CONNECTING) if (mobj->repl_stat == REPL_STATE_CONNECTING) {
{ if (redisSendCommand(mobj->source_cc, "PING") != REDIS_OK) {
if (redisSendCommand(mobj->source_cc, "PING") != REDIS_OK)
{
serverLog(LL_WARNING, "send PING failed ip:%s,port:%d", serverLog(LL_WARNING, "send PING failed ip:%s,port:%d",
mobj->host, mobj->port); mobj->host, mobj->port);
goto error; goto error;
@ -237,20 +201,15 @@ void syncDataWithRedis(int fd, void *user_data, int mask)
mobj->repl_stat = REPL_STATE_RECEIVE_PING_REPLY; mobj->repl_stat = REPL_STATE_RECEIVE_PING_REPLY;
return; return;
} }
if (mobj->repl_stat == REPL_STATE_RECEIVE_PING_REPLY) if (mobj->repl_stat == REPL_STATE_RECEIVE_PING_REPLY) {
{
err = redisReceive(); err = redisReceive();
if (err == NULL) if (err == NULL)
goto no_response_error; goto no_response_error;
if (err[0] != '+' && strncmp(err, "-NOAUTH", 7) != 0 && strncmp(err, "-NOPERM", 7) != 0 && if (err[0] != '+' && strncmp(err, "-NOAUTH", 7) != 0 && strncmp(err, "-NOPERM", 7) != 0 && strncmp(err, "-ERR operation not permitted", 28) != 0) {
strncmp(err, "-ERR operation not permitted", 28) != 0)
{
serverLog(LL_WARNING, "Error reply to PING from master: '%s'", err); serverLog(LL_WARNING, "Error reply to PING from master: '%s'", err);
sdsfree(err); sdsfree(err);
goto error; goto error;
} } else {
else
{
serverLog(LL_NOTICE, "Master replied to PING, replication can continue..."); serverLog(LL_NOTICE, "Master replied to PING, replication can continue...");
} }
sdsfree(err); sdsfree(err);
@ -258,17 +217,14 @@ void syncDataWithRedis(int fd, void *user_data, int mask)
mobj->repl_stat = REPL_STATE_SEND_HANDSHAKE; mobj->repl_stat = REPL_STATE_SEND_HANDSHAKE;
return; return;
} }
if (mobj->repl_stat == REPL_STATE_SEND_HANDSHAKE) if (mobj->repl_stat == REPL_STATE_SEND_HANDSHAKE) {
{
// todo 增加认证 // todo 增加认证
mobj->repl_stat = REPL_STATE_RECEIVE_AUTH_REPLY; mobj->repl_stat = REPL_STATE_RECEIVE_AUTH_REPLY;
} }
if (mobj->repl_stat == REPL_STATE_RECEIVE_AUTH_REPLY) if (mobj->repl_stat == REPL_STATE_RECEIVE_AUTH_REPLY) {
{
// todo 接受认证信息 // todo 接受认证信息
sds portstr = sdsfromlonglong(mobj->port); sds portstr = sdsfromlonglong(mobj->port);
if (redisSendCommand(mobj->source_cc, "REPLCONF listening-port %s", portstr) != REDIS_OK) if (redisSendCommand(mobj->source_cc, "REPLCONF listening-port %s", portstr) != REDIS_OK) {
{
serverLog(LL_WARNING, "send PING failed ip:%s,port:%d", serverLog(LL_WARNING, "send PING failed ip:%s,port:%d",
mobj->host, mobj->port); mobj->host, mobj->port);
goto error; goto error;
@ -277,19 +233,16 @@ void syncDataWithRedis(int fd, void *user_data, int mask)
sdsfree(portstr); sdsfree(portstr);
return; return;
} }
if (mobj->repl_stat == REPL_STATE_RECEIVE_PORT_REPLY) if (mobj->repl_stat == REPL_STATE_RECEIVE_PORT_REPLY) {
{
err = redisReceive(); err = redisReceive();
if (err == NULL) if (err == NULL)
goto no_response_error; goto no_response_error;
if (err[0] == '-') if (err[0] == '-') {
{
serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF listening-port: %s", err); serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF listening-port: %s", err);
goto error; goto error;
} }
serverLog(LL_NOTICE, "REPLCONF listening-port success"); serverLog(LL_NOTICE, "REPLCONF listening-port success");
if (redisSendCommand(mobj->source_cc, "REPLCONF ip-address %s", mobj->host) != REDIS_OK) if (redisSendCommand(mobj->source_cc, "REPLCONF ip-address %s", mobj->host) != REDIS_OK) {
{
serverLog(LL_WARNING, "REPLCONF ip-address %s failed", mobj->host); serverLog(LL_WARNING, "REPLCONF ip-address %s failed", mobj->host);
goto error; goto error;
} }
@ -298,19 +251,16 @@ void syncDataWithRedis(int fd, void *user_data, int mask)
mobj->repl_stat = REPL_STATE_RECEIVE_IP_REPLY; mobj->repl_stat = REPL_STATE_RECEIVE_IP_REPLY;
return; return;
} }
if (mobj->repl_stat == REPL_STATE_RECEIVE_IP_REPLY) if (mobj->repl_stat == REPL_STATE_RECEIVE_IP_REPLY) {
{
err = redisReceive(); err = redisReceive();
if (err == NULL) if (err == NULL)
goto no_response_error; goto no_response_error;
if (err[0] == '-') if (err[0] == '-') {
{
serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF ip-address: %s", err); serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF ip-address: %s", err);
goto error; goto error;
} }
serverLog(LL_NOTICE, "REPLCONF REPLCONF ip-address success"); serverLog(LL_NOTICE, "REPLCONF REPLCONF ip-address success");
if (redisSendCommand(mobj->source_cc, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2") != REDIS_OK) if (redisSendCommand(mobj->source_cc, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2") != REDIS_OK) {
{
serverLog(LL_WARNING, "send REPLCONF capa eof capa psync2 failed"); serverLog(LL_WARNING, "send REPLCONF capa eof capa psync2 failed");
goto error; goto error;
} }
@ -319,13 +269,11 @@ void syncDataWithRedis(int fd, void *user_data, int mask)
mobj->repl_stat = REPL_STATE_RECEIVE_CAPA_REPLY; mobj->repl_stat = REPL_STATE_RECEIVE_CAPA_REPLY;
return; return;
} }
if (mobj->repl_stat == REPL_STATE_RECEIVE_CAPA_REPLY) if (mobj->repl_stat == REPL_STATE_RECEIVE_CAPA_REPLY) {
{
err = redisReceive(); err = redisReceive();
if (err == NULL) if (err == NULL)
goto no_response_error; goto no_response_error;
if (err[0] == '-') if (err[0] == '-') {
{
serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF capa: %s", err); serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF capa: %s", err);
goto error; goto error;
} }
@ -335,10 +283,8 @@ void syncDataWithRedis(int fd, void *user_data, int mask)
mobj->repl_stat = REPL_STATE_SEND_PSYNC; mobj->repl_stat = REPL_STATE_SEND_PSYNC;
return; return;
} }
if (mobj->repl_stat == REPL_STATE_SEND_PSYNC) if (mobj->repl_stat == REPL_STATE_SEND_PSYNC) {
{ if (!sendSyncCommand()) {
if (!sendSyncCommand())
{
serverLog(LL_WARNING, "send PSYNC %s %s failed ip:%s,port:%d, ", serverLog(LL_WARNING, "send PSYNC %s %s failed ip:%s,port:%d, ",
mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port); mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port);
goto error; goto error;
@ -346,8 +292,7 @@ void syncDataWithRedis(int fd, void *user_data, int mask)
mobj->repl_stat = REPL_STATE_RECEIVE_PSYNC_REPLY; mobj->repl_stat = REPL_STATE_RECEIVE_PSYNC_REPLY;
return; return;
} }
if (mobj->repl_stat != REPL_STATE_RECEIVE_PSYNC_REPLY) if (mobj->repl_stat != REPL_STATE_RECEIVE_PSYNC_REPLY) {
{
serverLog(LL_WARNING, "syncDataWithRedis(): state machine error, state should be RECEIVE_PSYNC but is %d", serverLog(LL_WARNING, "syncDataWithRedis(): state machine error, state should be RECEIVE_PSYNC but is %d",
mobj->repl_stat); mobj->repl_stat);
goto error; goto error;
@ -357,16 +302,14 @@ void syncDataWithRedis(int fd, void *user_data, int mask)
return; return;
if (psync_result == PSYNC_TRY_LATER) if (psync_result == PSYNC_TRY_LATER)
goto error; goto error;
if (psync_result == PSYNC_CONTINUE) if (psync_result == PSYNC_CONTINUE) {
{
} }
// 接受全部数据 // 接受全部数据
serverLog(LL_NOTICE, "begin receive full data"); serverLog(LL_NOTICE, "begin receive full data");
readFullData(); readFullData();
return; return;
error: error:
if (err != NULL) if (err != NULL) {
{
sdsfree(err); sdsfree(err);
} }
freeMigrateObj(mobj); freeMigrateObj(mobj);
@ -382,14 +325,11 @@ no_response_error: /* Handle receiveSynchronousResponse() error when master has
* @param argc * @param argc
* @return * @return
*/ */
int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
{ if (argc != 5) {
if (argc != 5)
{
return RedisModule_WrongArity(ctx); return RedisModule_WrongArity(ctx);
} }
if (RedisModule_IsKeysPositionRequest(ctx)) if (RedisModule_IsKeysPositionRequest(ctx)) {
{
RedisModule_Log(ctx, VERBOSE, "get keys from module"); RedisModule_Log(ctx, VERBOSE, "get keys from module");
return RedisModule_ReplyWithSimpleString(ctx, "OK"); return RedisModule_ReplyWithSimpleString(ctx, "OK");
} }
@ -399,8 +339,7 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
robj *end_slot = (robj *)argv[4]; robj *end_slot = (robj *)argv[4];
RedisModule_Log(ctx, NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *)host->ptr, RedisModule_Log(ctx, NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *)host->ptr,
(char *)port->ptr, (char *)begin_slot->ptr, (char *)end_slot->ptr); (char *)port->ptr, (char *)begin_slot->ptr, (char *)end_slot->ptr);
if (mobj != NULL) if (mobj != NULL) {
{
return RedisModule_ReplyWithError(ctx, "migrating, please waiting"); return RedisModule_ReplyWithError(ctx, "migrating, please waiting");
} }
mobj = createMigrateObject(host, atoi(port->ptr), atoi(begin_slot->ptr), atoi(end_slot->ptr)); mobj = createMigrateObject(host, atoi(port->ptr), atoi(begin_slot->ptr), atoi(end_slot->ptr));
@ -409,8 +348,7 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
REDIS_OPTIONS_SET_TCP(&options, (const char *)mobj->host, mobj->port); REDIS_OPTIONS_SET_TCP(&options, (const char *)mobj->host, mobj->port);
options.connect_timeout = &timeout; options.connect_timeout = &timeout;
mobj->source_cc = redisConnectWithOptions(&options); mobj->source_cc = redisConnectWithOptions(&options);
if (mobj->source_cc == NULL || mobj->source_cc->err) if (mobj->source_cc == NULL || mobj->source_cc->err) {
{
RedisModule_Log(ctx, WARNING, "Could not connect to Redis at ip:%s,port:%d, error:%s", RedisModule_Log(ctx, WARNING, "Could not connect to Redis at ip:%s,port:%d, error:%s",
mobj->host, mobj->port, mobj->source_cc->errstr); mobj->host, mobj->port, mobj->source_cc->errstr);
freeMigrateObj(mobj); freeMigrateObj(mobj);
@ -421,29 +359,24 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return RedisModule_ReplyWithSimpleString(ctx, "OK"); return RedisModule_ReplyWithSimpleString(ctx, "OK");
} }
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
{
REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc); REDISMODULE_NOT_USED(argc);
int flag = RedisModule_Init(ctx, MODULE_NAME, REDIS_MIGRATE_VERSION, REDISMODULE_APIVER_1); int flag = RedisModule_Init(ctx, MODULE_NAME, REDIS_MIGRATE_VERSION, REDISMODULE_APIVER_1);
if (flag == REDISMODULE_ERR) if (flag == REDISMODULE_ERR) {
{
return REDISMODULE_ERR; return REDISMODULE_ERR;
} }
RedisModule_Log(ctx, NOTICE, "begin init commands of %s", MODULE_NAME); RedisModule_Log(ctx, NOTICE, "begin init commands of %s", MODULE_NAME);
flag = RedisModule_CreateCommand(ctx, "rm.migrate", rm_migrateCommand, "write deny-oom admin getkeys-api", 0, 0, 0); flag = RedisModule_CreateCommand(ctx, "rm.migrate", rm_migrateCommand, "write deny-oom admin getkeys-api", 0, 0, 0);
if (flag == REDISMODULE_ERR) if (flag == REDISMODULE_ERR) {
{
RedisModule_Log(ctx, WARNING, "init rm.migrate failed"); RedisModule_Log(ctx, WARNING, "init rm.migrate failed");
return REDISMODULE_ERR; return REDISMODULE_ERR;
} }
RedisModuleCallReply *reply = RedisModule_Call(ctx, "config", "cc", "get", "logfile"); RedisModuleCallReply *reply = RedisModule_Call(ctx, "config", "cc", "get", "logfile");
long long items = RedisModule_CallReplyLength(reply); long long items = RedisModule_CallReplyLength(reply);
if (items != 2) if (items != 2) {
{
RedisModule_Log(ctx, WARNING, "logfile is empty"); RedisModule_Log(ctx, WARNING, "logfile is empty");
return REDISMODULE_ERR; return REDISMODULE_ERR;
} }