diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..e295476 --- /dev/null +++ b/.clang-format @@ -0,0 +1,213 @@ +# 语言: None, Cpp, Java, JavaScript, ObjC, Proto, TableGen, TextProto +Language: Cpp +# BasedOnStyle: LLVM + +# 访问说明符(public、private等)的偏移 +AccessModifierOffset: -4 + +# 开括号(开圆括号、开尖括号、开方括号)后的对齐: Align, DontAlign, AlwaysBreak(总是在开括号后换行) +AlignAfterOpenBracket: Align + +# 连续赋值时,对齐所有等号 +AlignConsecutiveAssignments: false + +# 连续声明时,对齐所有声明的变量名 +AlignConsecutiveDeclarations: false + +# 右对齐逃脱换行(使用反斜杠换行)的反斜杠 +AlignEscapedNewlines: Right + +# 水平对齐二元和三元表达式的操作数 +AlignOperands: true + +# 对齐连续的尾随的注释 +AlignTrailingComments: true + +# 不允许函数声明的所有参数在放在下一行 +AllowAllParametersOfDeclarationOnNextLine: false + +# 不允许短的块放在同一行 +AllowShortBlocksOnASingleLine: true + +# 允许短的case标签放在同一行 +AllowShortCaseLabelsOnASingleLine: true + +# 允许短的函数放在同一行: None, InlineOnly(定义在类中), Empty(空函数), Inline(定义在类中,空函数), All +AllowShortFunctionsOnASingleLine: None + +# 允许短的if语句保持在同一行 +AllowShortIfStatementsOnASingleLine: true + +# 允许短的循环保持在同一行 +AllowShortLoopsOnASingleLine: true + +# 总是在返回类型后换行: None, All, TopLevel(顶级函数,不包括在类中的函数), +# AllDefinitions(所有的定义,不包括声明), TopLevelDefinitions(所有的顶级函数的定义) +AlwaysBreakAfterReturnType: None + +# 总是在多行string字面量前换行 +AlwaysBreakBeforeMultilineStrings: false + +# 总是在template声明后换行 +AlwaysBreakTemplateDeclarations: true + +# false表示函数实参要么都在同一行,要么都各自一行 +BinPackArguments: true + +# false表示所有形参要么都在同一行,要么都各自一行 +BinPackParameters: true + +# 大括号换行,只有当BreakBeforeBraces设置为Custom时才有效 +BraceWrapping: + # class定义后面 + AfterClass: false + # 控制语句后面 + AfterControlStatement: false + # enum定义后面 + AfterEnum: false + # 函数定义后面 + AfterFunction: false + # 命名空间定义后面 + AfterNamespace: false + # struct定义后面 + AfterStruct: false + # union定义后面 + AfterUnion: false + # extern之后 + AfterExternBlock: false + # catch之前 + BeforeCatch: false + # else之前 + BeforeElse: false + # 缩进大括号 + IndentBraces: false + # 分离空函数 + SplitEmptyFunction: false + # 分离空语句 + SplitEmptyRecord: false + # 分离空命名空间 + SplitEmptyNamespace: false + +# 在二元运算符前换行: None(在操作符后换行), NonAssignment(在非赋值的操作符前换行), All(在操作符前换行) +BreakBeforeBinaryOperators: NonAssignment + +# 在大括号前换行: Attach(始终将大括号附加到周围的上下文), Linux(除函数、命名空间和类定义,与Attach类似), +# Mozilla(除枚举、函数、记录定义,与Attach类似), Stroustrup(除函数定义、catch、else,与Attach类似), +# Allman(总是在大括号前换行), GNU(总是在大括号前换行,并对于控制语句的大括号增加额外的缩进), WebKit(在函数前换行), Custom +# 注:这里认为语句块也属于函数 +BreakBeforeBraces: Custom + +# 在三元运算符前换行 +BreakBeforeTernaryOperators: false + +# 在构造函数的初始化列表的冒号后换行 +BreakConstructorInitializers: AfterColon + +#BreakInheritanceList: AfterColon + +BreakStringLiterals: false + +# 每行字符的限制,0表示没有限制 +ColumnLimit: 0 + +CompactNamespaces: true + +# 构造函数的初始化列表要么都在同一行,要么都各自一行 +ConstructorInitializerAllOnOneLineOrOnePerLine: false + +# 构造函数的初始化列表的缩进宽度 +ConstructorInitializerIndentWidth: 4 + +# 延续的行的缩进宽度 +ContinuationIndentWidth: 4 + +# 去除C++11的列表初始化的大括号{后和}前的空格 +Cpp11BracedListStyle: true + +# 继承最常用的指针和引用的对齐方式 +DerivePointerAlignment: false + +# 固定命名空间注释 +FixNamespaceComments: true + +# 缩进case标签 +IndentCaseLabels: false + +IndentPPDirectives: None + +# 缩进宽度 +IndentWidth: 4 + +# 函数返回类型换行时,缩进函数声明或函数定义的函数名 +IndentWrappedFunctionNames: false + +# 保留在块开始处的空行 +KeepEmptyLinesAtTheStartOfBlocks: false + +# 连续空行的最大数量 +MaxEmptyLinesToKeep: 1 + +# 命名空间的缩进: None, Inner(缩进嵌套的命名空间中的内容), All +NamespaceIndentation: None + +# 指针和引用的对齐: Left, Right, Middle +PointerAlignment: Right + +# 允许重新排版注释 +ReflowComments: true + +# 允许排序#include +SortIncludes: false + +# 允许排序 using 声明 +SortUsingDeclarations: false + +# 在C风格类型转换后添加空格 +SpaceAfterCStyleCast: false + +# 在Template 关键字后面添加空格 +SpaceAfterTemplateKeyword: true + +# 在赋值运算符之前添加空格 +SpaceBeforeAssignmentOperators: true + +# SpaceBeforeCpp11BracedList: true + +# SpaceBeforeCtorInitializerColon: true + +# SpaceBeforeInheritanceColon: true + +# 开圆括号之前添加一个空格: Never, ControlStatements, Always +SpaceBeforeParens: ControlStatements + +# SpaceBeforeRangeBasedForLoopColon: true + +# 在空的圆括号中添加空格 +SpaceInEmptyParentheses: false + +# 在尾随的评论前添加的空格数(只适用于//) +SpacesBeforeTrailingComments: 1 + +# 在尖括号的<后和>前添加空格 +SpacesInAngles: false + +# 在C风格类型转换的括号中添加空格 +SpacesInCStyleCastParentheses: false + +# 在容器(ObjC和JavaScript的数组和字典等)字面量中添加空格 +SpacesInContainerLiterals: true + +# 在圆括号的(后和)前添加空格 +SpacesInParentheses: false + +# 在方括号的[后和]前添加空格,lamda表达式和未指明大小的数组的声明不受影响 +SpacesInSquareBrackets: false + +# 标准: Cpp03, Cpp11, Auto +Standard: Cpp11 + +# tab宽度 +TabWidth: 4 + +# 使用tab字符: Never, ForIndentation, ForContinuationAndIndentation, Always +UseTab: Never \ No newline at end of file diff --git a/src/rdbLoad.c b/src/rdbLoad.c index 03fc757..e35dd77 100644 --- a/src/rdbLoad.c +++ b/src/rdbLoad.c @@ -1,53 +1,178 @@ #include "rdbLoad.h" -#include -int rdbLoadRioWithLoading(migrateObj *mobj) -{ +#include +#include + +int rdbLoadRioWithLoading(migrateObj *mobj) { char buf[1024]; int error; - if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1) - { - serverLog(LL_WARNING, "read version failed:%s,port:%d ", mobj->host, mobj->port); + if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1) { + serverLog(LL_WARNING, "read version failed:%s,port:%d ", mobj->host, + mobj->port); return C_ERR; } buf[9] = '\0'; - if (memcmp(buf, "REDIS", 5) != 0) - { + if (memcmp(buf, "REDIS", 5) != 0) { serverLog(LL_WARNING, "Wrong signature trying to load DB from file"); errno = EINVAL; return C_ERR; } int type, rdbver; + uint64_t dbid = 0; rdbver = atoi(buf + 5); - if (rdbver < 1 || rdbver > RDB_VERSION) - { + if (rdbver < 1 || rdbver > RDB_VERSION) { serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver); errno = EINVAL; return C_ERR; } serverLog(LL_NOTICE, "buf=%s", buf); long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); - while (1) - { + while (1) { sds key; robj *val; - if ((type = rdbLoadType(mobj)) == -1) { + if ((type = rm_rdbLoadType(mobj)) == -1) { serverLog(LL_WARNING, "read type failed"); return C_ERR; } + if (type == RDB_OPCODE_EXPIRETIME) { - + expiretime = rm_rdbLoadTime(mobj); + if (expiretime == -1) { + return C_ERR; + } + expiretime *= 1000; + continue; + } else if (type == RDB_OPCODE_EXPIRETIME_MS) { + expiretime = rm_rdbLoadMillisecondTime(mobj, rdbver); + continue; + } else if (type == RDB_OPCODE_FREQ) { + uint8_t byte; + if (read(mobj->source_cc->fd, &byte, 1) == 0) + return C_ERR; + lfu_freq = byte; + continue; + } else if (type == RDB_OPCODE_IDLE) { + uint64_t qword; + if ((qword = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) + return C_ERR; + lru_idle = qword; + continue; + } else if (type == RDB_OPCODE_EOF) { + break; + } else if (type == RDB_OPCODE_SELECTDB) { + if ((dbid = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; + continue; + } else if (type == RDB_OPCODE_RESIZEDB) { + uint64_t db_size, expires_size; + if ((db_size = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; + if ((expires_size = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; + + continue; + } else if (type == RDB_OPCODE_AUX) { + robj *auxkey, *auxval; + + } else if (type == RDB_OPCODE_FUNCTION || type == RDB_OPCODE_FUNCTION2) { } } } - -int rdbLoadType(migrateObj *mobj) { - char buf[1]; - syncReadLine(mobj->source_cc->fd, buf, 1, mobj->timeout); - return buf[0]; +void *rm_rdbGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) { + int encode = flags & RDB_LOAD_ENC; + int plain = flags & RDB_LOAD_PLAIN; + int sds = flags & RDB_LOAD_SDS; + int isencoded; + unsigned long long len; + } -time_t rdbLoadTime(migrateObj *mobj) { - -} \ No newline at end of file +uint64_t rm_rdbLoadLen(migrateObj *mobj, int *isencoded) { + uint64_t len; + + if (rdbLoadLenByRef(mobj, isencoded, &len) == -1) + return RDB_LENERR; + return len; +} + +int rm_rdbLoadType(migrateObj *mobj) { + unsigned char type; + if (read(mobj->source_cc->fd, &type, 1) == 0) { + return -1; + } + + return type; +} + +int rm_rdbLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) { + unsigned char buf[2]; + int type; + if (isencoded) + *isencoded = 0; + if (read(mobj->source_cc->fd, buf, 1) == 0) { + return -1; + } + type = (buf[0] & 0xC0) >> 6; + if (type == RDB_ENCVAL) { + if (isencoded) + *isencoded = 1; + *lenptr = buf[0] & 0x3F; + } else if (type == RDB_6BITLEN) { + /* Read a 6 bit len. */ + *lenptr = buf[0] & 0x3F; + } else if (type == RDB_14BITLEN) { + if (read(mobj->source_cc->fd, buf + 1, 1) == 0) + return -1; + *lenptr = ((buf[0] & 0x3F) << 8) | buf[1]; + } else if (buf[0] == RDB_32BITLEN) { + uint32_t len; + if (read(mobj->source_cc->fd, &len, 4) == 0) + return -1; + *lenptr = ntohl(len); + } else if (buf[0] == RDB_64BITLEN) { + uint64_t len; + if (read(mobj->source_cc->fd, &len, 8) == 0) + return -1; + *lenptr = ntohu64(len); + } else { + return -1; + } + return 0; +} + +time_t rm_rdbLoadTime(migrateObj *mobj) { + int32_t t32; + if (read(mobj->source_cc->fd, &t32, 4) == 0) { + return -1; + } + t32 = (char *)t32 + 4; + return (time_t)t32; +} + +long long rm_rdbLoadMillisecondTime(migrateObj *mobj, int rdbver) { + int64_t t64; + if (read(mobj->source_cc->fd, &t64, 8) == 0) { + return LLONG_MAX; + } + if (rdbver >= 9) + memrev64(&t64); + return (long long)t64; +} + +uint64_t rm_rdbLoadLen(migrateObj *mobj, int *isencoded) { +} + +void rm_memrev64(void *p) { + unsigned char *x = p, t; + + t = x[0]; + x[0] = x[7]; + x[7] = t; + t = x[1]; + x[1] = x[6]; + x[6] = t; + t = x[2]; + x[2] = x[5]; + x[5] = t; + t = x[3]; + x[3] = x[4]; + x[4] = t; +} diff --git a/src/rdbLoad.h b/src/rdbLoad.h index 83bc4ed..a9e0615 100644 --- a/src/rdbLoad.h +++ b/src/rdbLoad.h @@ -2,24 +2,49 @@ #define RDB_LOAD_REDIS_MIGRATE_H #include "redis-migrate.h" +#define RDB_6BITLEN 0 +#define RDB_14BITLEN 1 +#define RDB_32BITLEN 0x80 +#define RDB_64BITLEN 0x81 +#define RDB_ENCVAL 3 +#define RDB_LENERR UINT64_MAX #define RDB_VERSION 10 -#define RDB_OPCODE_FUNCTION2 245 /* function library data */ -#define RDB_OPCODE_FUNCTION 246 /* old function library data for 7.0 rc1 and rc2 */ -#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */ -#define RDB_OPCODE_IDLE 248 /* LRU idle time. */ -#define RDB_OPCODE_FREQ 249 /* LFU frequency. */ -#define RDB_OPCODE_AUX 250 /* RDB aux field. */ -#define RDB_OPCODE_RESIZEDB 251 /* Hash table resize hint. */ -#define RDB_OPCODE_EXPIRETIME_MS 252 /* Expire time in milliseconds. */ -#define RDB_OPCODE_EXPIRETIME 253 /* Old expire time in seconds. */ -#define RDB_OPCODE_SELECTDB 254 /* DB number of the following keys. */ -#define RDB_OPCODE_EOF 255 /* End of the RDB file. */ +#define RDB_OPCODE_FUNCTION2 245 /* function library data */ +#define RDB_OPCODE_FUNCTION 246 /* old function library data for 7.0 rc1 and rc2 */ +#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */ +#define RDB_OPCODE_IDLE 248 /* LRU idle time. */ +#define RDB_OPCODE_FREQ 249 /* LFU frequency. */ +#define RDB_OPCODE_AUX 250 /* RDB aux field. */ +#define RDB_OPCODE_RESIZEDB 251 /* Hash table resize hint. */ +#define RDB_OPCODE_EXPIRETIME_MS 252 /* Expire time in milliseconds. */ +#define RDB_OPCODE_EXPIRETIME 253 /* Old expire time in seconds. */ +#define RDB_OPCODE_SELECTDB 254 /* DB number of the following keys. */ +#define RDB_OPCODE_EOF 255 /* End of the RDB file. */ -int rdbLoadRioWithLoading(migrateObj *mobj); +#define RDB_LOAD_NONE 0 +#define RDB_LOAD_ENC (1 << 0) +#define RDB_LOAD_PLAIN (1 << 1) +#define RDB_LOAD_SDS (1 << 2) -int rdbLoadType(migrateObj *mobj); +#define LLONG_MAX __LONG_LONG_MAX__ -time_t rdbLoadTime(migrateObj *mobj); +int rm_rdbLoadRioWithLoading(migrateObj *mobj); + +int rm_rdbLoadType(migrateObj *mobj); + +time_t rm_rdbLoadTime(migrateObj *mobj); + +long long rm_rdbLoadMillisecondTime(migrateObj *mobj, int rdbver); + +uint64_t rm_rdbLoadLen(migrateObj *mobi, int *isencoded); + +int rm_rdbLoadLenByRef(migrateObj *mobi, int *isencoded, uint64_t *lenptr); + +void rm_memrev64(void *p); + +uint64_t rm_rdbLoadLen(migrateObj *mobj, int *isencoded); + +void *rm_rdbGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr); #endif diff --git a/src/redis-migrate.c b/src/redis-migrate.c index 06a3a90..bf8316b 100644 --- a/src/redis-migrate.c +++ b/src/redis-migrate.c @@ -8,8 +8,7 @@ static migrateObj *mobj; -migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot) -{ +migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot) { migrateObj *m; m = hi_malloc(sizeof(*m)); m->host = host->ptr; @@ -23,15 +22,13 @@ migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_sl return m; } -void freeMigrateObj(migrateObj *m) -{ +void freeMigrateObj(migrateObj *m) { redisFree(m->source_cc); hi_free(m); mobj = NULL; } -long long ustime(void) -{ +long long ustime(void) { struct timeval tv; long long ust; @@ -42,27 +39,22 @@ long long ustime(void) } /* Return the UNIX time in milliseconds */ -mstime_t mstime(void) -{ +mstime_t mstime(void) { return ustime() / 1000; } -int sendSyncCommand() -{ - if (!mobj->isCache) - { +int sendSyncCommand() { + if (!mobj->isCache) { mobj->isCache = 1; mobj->psync_replid = "?"; memcpy(mobj->psync_offset, "-1", 3); } - if (redisAppendCommand(mobj->source_cc, "PSYNC %s %s", mobj->psync_replid, mobj->psync_offset) != REDIS_OK) - { + if (redisAppendCommand(mobj->source_cc, "PSYNC %s %s", mobj->psync_replid, mobj->psync_offset) != REDIS_OK) { serverLog(LL_WARNING, "append PSYNC %s %s failed ip:%s,port:%d, ", mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port); return 0; } - if (redisFlush(mobj->source_cc) != REDIS_OK) - { + if (redisFlush(mobj->source_cc) != REDIS_OK) { serverLog(LL_WARNING, "send PSYNC %s %s failed ip:%s,port:%d, ", mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port); return 0; @@ -71,56 +63,46 @@ int sendSyncCommand() return 1; } -sds redisReceive() -{ +sds redisReceive() { char buf[256]; - if (syncReadLine(mobj->source_cc->fd, buf, sizeof(buf), mobj->timeout) == -1) - { + if (syncReadLine(mobj->source_cc->fd, buf, sizeof(buf), mobj->timeout) == -1) { serverLog(LL_WARNING, "redisReceive failed ip:%s,port:%d ", mobj->host, mobj->port); return NULL; } return hi_sdsnew(buf); } -int receiveDataFromRedis() -{ +int receiveDataFromRedis() { sds reply = redisReceive(); - if (reply == NULL) - { + if (reply == NULL) { serverLog(LL_WARNING, "Master did not reply to PSYNC"); return PSYNC_TRY_LATER; } - if (sdslen(reply) == 0) - { + if (sdslen(reply) == 0) { sdsfree(reply); return PSYNC_WAIT_REPLY; } serverLog(LL_NOTICE, "reply=%s", reply); - if (!strncmp(reply, "+FULLRESYNC", 11)) - { + if (!strncmp(reply, "+FULLRESYNC", 11)) { char *replid = NULL, *offset = NULL; /* FULL RESYNC, parse the reply in order to extract the replid * and the replication offset. */ replid = strchr(reply, ' '); - if (replid) - { + if (replid) { replid++; offset = strchr(replid, ' '); if (offset) offset++; } - if (!replid || !offset || (offset - replid - 1) != CONFIG_RUN_ID_SIZE) - { + if (!replid || !offset || (offset - replid - 1) != CONFIG_RUN_ID_SIZE) { serverLog(LL_WARNING, "Master replied with wrong +FULLRESYNC syntax."); /* This is an unexpected condition, actually the +FULLRESYNC * reply means that the master supports PSYNC, but the reply * format seems wrong. To stay safe we blank the master * replid to make sure next PSYNCs will fail. */ memset(mobj->master_replid, 0, CONFIG_RUN_ID_SIZE + 1); - } - else - { + } else { memcpy(mobj->master_replid, replid, offset - replid - 1); mobj->master_replid[CONFIG_RUN_ID_SIZE] = '\0'; mobj->master_initial_offset = strtoll(offset, NULL, 10); @@ -130,8 +112,7 @@ int receiveDataFromRedis() sdsfree(reply); return PSYNC_FULLRESYNC; } - if (!strncmp(reply, "+CONTINUE", 9)) - { + if (!strncmp(reply, "+CONTINUE", 9)) { /* Partial resync was accepted. */ serverLog(LL_NOTICE, "Successful partial resynchronization with master."); @@ -144,8 +125,7 @@ int receiveDataFromRedis() char *end = reply + 9; while (end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++; - if (end - start == CONFIG_RUN_ID_SIZE) - { + if (end - start == CONFIG_RUN_ID_SIZE) { char new[CONFIG_RUN_ID_SIZE + 1]; memcpy(new, start, CONFIG_RUN_ID_SIZE); new[CONFIG_RUN_ID_SIZE] = '\0'; @@ -155,40 +135,31 @@ int receiveDataFromRedis() } } -void readFullData() -{ +void readFullData() { static char eofmark[CONFIG_RUN_ID_SIZE]; static char lastbytes[CONFIG_RUN_ID_SIZE]; static int usemark = 0; char buf[PROTO_IOBUF_LEN]; - if (mobj->repl_transfer_size == -1) - { + if (mobj->repl_transfer_size == -1) { int nread = syncReadLine(mobj->source_cc->fd, buf, PROTO_IOBUF_LEN, mobj->timeout); - if (nread == -1) - { + if (nread == -1) { serverLog(LL_WARNING, "read full data failed"); goto error; } - if (buf[0] == '-') - { + if (buf[0] == '-') { serverLog(LL_WARNING, "MASTER aborted replication with an error: %s", buf + 1); goto error; - } - else if (buf[0] == '\0') - { + } else if (buf[0] == '\0') { // mobj->repl_transfer_lastio = server.unixtime; return; - } - else if (buf[0] != '$') - { + } else if (buf[0] != '$') { serverLog(LL_WARNING, "Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); goto error; } - if (strncmp(buf + 1, "EOF:", 4) == 0 && strlen(buf + 5) >= CONFIG_RUN_ID_SIZE) - { + if (strncmp(buf + 1, "EOF:", 4) == 0 && strlen(buf + 5) >= CONFIG_RUN_ID_SIZE) { usemark = 1; usemark = 1; memcpy(eofmark, buf + 5, CONFIG_RUN_ID_SIZE); @@ -197,9 +168,7 @@ void readFullData() * at the next call. */ mobj->repl_transfer_size = 0; serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF to parser"); - } - else - { + } else { usemark = 0; mobj->repl_transfer_size = strtol(buf + 1, NULL, 10); serverLog(LL_NOTICE, @@ -209,27 +178,22 @@ void readFullData() } int flag = rdbLoadRioWithLoading(mobj); - return; error: cancelMigrate(); return; } -void cancelMigrate() -{ +void cancelMigrate() { } -void syncDataWithRedis(int fd, void *user_data, int mask) -{ +void syncDataWithRedis(int fd, void *user_data, int mask) { REDISMODULE_NOT_USED(fd); REDISMODULE_NOT_USED(mask); REDISMODULE_NOT_USED(user_data); sds err = NULL; - if (mobj->repl_stat == REPL_STATE_CONNECTING) - { - if (redisSendCommand(mobj->source_cc, "PING") != REDIS_OK) - { + if (mobj->repl_stat == REPL_STATE_CONNECTING) { + if (redisSendCommand(mobj->source_cc, "PING") != REDIS_OK) { serverLog(LL_WARNING, "send PING failed ip:%s,port:%d", mobj->host, mobj->port); goto error; @@ -237,20 +201,15 @@ void syncDataWithRedis(int fd, void *user_data, int mask) mobj->repl_stat = REPL_STATE_RECEIVE_PING_REPLY; return; } - if (mobj->repl_stat == REPL_STATE_RECEIVE_PING_REPLY) - { + if (mobj->repl_stat == REPL_STATE_RECEIVE_PING_REPLY) { err = redisReceive(); if (err == NULL) goto no_response_error; - if (err[0] != '+' && strncmp(err, "-NOAUTH", 7) != 0 && strncmp(err, "-NOPERM", 7) != 0 && - strncmp(err, "-ERR operation not permitted", 28) != 0) - { + if (err[0] != '+' && strncmp(err, "-NOAUTH", 7) != 0 && strncmp(err, "-NOPERM", 7) != 0 && strncmp(err, "-ERR operation not permitted", 28) != 0) { serverLog(LL_WARNING, "Error reply to PING from master: '%s'", err); sdsfree(err); goto error; - } - else - { + } else { serverLog(LL_NOTICE, "Master replied to PING, replication can continue..."); } sdsfree(err); @@ -258,17 +217,14 @@ void syncDataWithRedis(int fd, void *user_data, int mask) mobj->repl_stat = REPL_STATE_SEND_HANDSHAKE; return; } - if (mobj->repl_stat == REPL_STATE_SEND_HANDSHAKE) - { + if (mobj->repl_stat == REPL_STATE_SEND_HANDSHAKE) { // todo 增加认证 mobj->repl_stat = REPL_STATE_RECEIVE_AUTH_REPLY; } - if (mobj->repl_stat == REPL_STATE_RECEIVE_AUTH_REPLY) - { + if (mobj->repl_stat == REPL_STATE_RECEIVE_AUTH_REPLY) { // todo 接受认证信息 sds portstr = sdsfromlonglong(mobj->port); - if (redisSendCommand(mobj->source_cc, "REPLCONF listening-port %s", portstr) != REDIS_OK) - { + if (redisSendCommand(mobj->source_cc, "REPLCONF listening-port %s", portstr) != REDIS_OK) { serverLog(LL_WARNING, "send PING failed ip:%s,port:%d", mobj->host, mobj->port); goto error; @@ -277,19 +233,16 @@ void syncDataWithRedis(int fd, void *user_data, int mask) sdsfree(portstr); return; } - if (mobj->repl_stat == REPL_STATE_RECEIVE_PORT_REPLY) - { + if (mobj->repl_stat == REPL_STATE_RECEIVE_PORT_REPLY) { err = redisReceive(); if (err == NULL) goto no_response_error; - if (err[0] == '-') - { + if (err[0] == '-') { serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF listening-port: %s", err); goto error; } serverLog(LL_NOTICE, "REPLCONF listening-port success"); - if (redisSendCommand(mobj->source_cc, "REPLCONF ip-address %s", mobj->host) != REDIS_OK) - { + if (redisSendCommand(mobj->source_cc, "REPLCONF ip-address %s", mobj->host) != REDIS_OK) { serverLog(LL_WARNING, "REPLCONF ip-address %s failed", mobj->host); goto error; } @@ -298,19 +251,16 @@ void syncDataWithRedis(int fd, void *user_data, int mask) mobj->repl_stat = REPL_STATE_RECEIVE_IP_REPLY; return; } - if (mobj->repl_stat == REPL_STATE_RECEIVE_IP_REPLY) - { + if (mobj->repl_stat == REPL_STATE_RECEIVE_IP_REPLY) { err = redisReceive(); if (err == NULL) goto no_response_error; - if (err[0] == '-') - { + if (err[0] == '-') { serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF ip-address: %s", err); goto error; } serverLog(LL_NOTICE, "REPLCONF REPLCONF ip-address success"); - if (redisSendCommand(mobj->source_cc, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2") != REDIS_OK) - { + if (redisSendCommand(mobj->source_cc, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2") != REDIS_OK) { serverLog(LL_WARNING, "send REPLCONF capa eof capa psync2 failed"); goto error; } @@ -319,13 +269,11 @@ void syncDataWithRedis(int fd, void *user_data, int mask) mobj->repl_stat = REPL_STATE_RECEIVE_CAPA_REPLY; return; } - if (mobj->repl_stat == REPL_STATE_RECEIVE_CAPA_REPLY) - { + if (mobj->repl_stat == REPL_STATE_RECEIVE_CAPA_REPLY) { err = redisReceive(); if (err == NULL) goto no_response_error; - if (err[0] == '-') - { + if (err[0] == '-') { serverLog(LL_NOTICE, "(Non critical) Master does not understand REPLCONF capa: %s", err); goto error; } @@ -335,10 +283,8 @@ void syncDataWithRedis(int fd, void *user_data, int mask) mobj->repl_stat = REPL_STATE_SEND_PSYNC; return; } - if (mobj->repl_stat == REPL_STATE_SEND_PSYNC) - { - if (!sendSyncCommand()) - { + if (mobj->repl_stat == REPL_STATE_SEND_PSYNC) { + if (!sendSyncCommand()) { serverLog(LL_WARNING, "send PSYNC %s %s failed ip:%s,port:%d, ", mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port); goto error; @@ -346,8 +292,7 @@ void syncDataWithRedis(int fd, void *user_data, int mask) mobj->repl_stat = REPL_STATE_RECEIVE_PSYNC_REPLY; return; } - if (mobj->repl_stat != REPL_STATE_RECEIVE_PSYNC_REPLY) - { + if (mobj->repl_stat != REPL_STATE_RECEIVE_PSYNC_REPLY) { serverLog(LL_WARNING, "syncDataWithRedis(): state machine error, state should be RECEIVE_PSYNC but is %d", mobj->repl_stat); goto error; @@ -357,16 +302,14 @@ void syncDataWithRedis(int fd, void *user_data, int mask) return; if (psync_result == PSYNC_TRY_LATER) goto error; - if (psync_result == PSYNC_CONTINUE) - { + if (psync_result == PSYNC_CONTINUE) { } // 接受全部数据 serverLog(LL_NOTICE, "begin receive full data"); readFullData(); return; error: - if (err != NULL) - { + if (err != NULL) { sdsfree(err); } freeMigrateObj(mobj); @@ -382,14 +325,11 @@ no_response_error: /* Handle receiveSynchronousResponse() error when master has * @param argc * @return */ -int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) -{ - if (argc != 5) - { +int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 5) { return RedisModule_WrongArity(ctx); } - if (RedisModule_IsKeysPositionRequest(ctx)) - { + if (RedisModule_IsKeysPositionRequest(ctx)) { RedisModule_Log(ctx, VERBOSE, "get keys from module"); return RedisModule_ReplyWithSimpleString(ctx, "OK"); } @@ -399,8 +339,7 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) robj *end_slot = (robj *)argv[4]; RedisModule_Log(ctx, NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *)host->ptr, (char *)port->ptr, (char *)begin_slot->ptr, (char *)end_slot->ptr); - if (mobj != NULL) - { + if (mobj != NULL) { return RedisModule_ReplyWithError(ctx, "migrating, please waiting"); } mobj = createMigrateObject(host, atoi(port->ptr), atoi(begin_slot->ptr), atoi(end_slot->ptr)); @@ -409,8 +348,7 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) REDIS_OPTIONS_SET_TCP(&options, (const char *)mobj->host, mobj->port); options.connect_timeout = &timeout; mobj->source_cc = redisConnectWithOptions(&options); - if (mobj->source_cc == NULL || mobj->source_cc->err) - { + if (mobj->source_cc == NULL || mobj->source_cc->err) { RedisModule_Log(ctx, WARNING, "Could not connect to Redis at ip:%s,port:%d, error:%s", mobj->host, mobj->port, mobj->source_cc->errstr); freeMigrateObj(mobj); @@ -421,29 +359,24 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return RedisModule_ReplyWithSimpleString(ctx, "OK"); } -int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) -{ - +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); int flag = RedisModule_Init(ctx, MODULE_NAME, REDIS_MIGRATE_VERSION, REDISMODULE_APIVER_1); - if (flag == REDISMODULE_ERR) - { + if (flag == REDISMODULE_ERR) { return REDISMODULE_ERR; } RedisModule_Log(ctx, NOTICE, "begin init commands of %s", MODULE_NAME); flag = RedisModule_CreateCommand(ctx, "rm.migrate", rm_migrateCommand, "write deny-oom admin getkeys-api", 0, 0, 0); - if (flag == REDISMODULE_ERR) - { + if (flag == REDISMODULE_ERR) { RedisModule_Log(ctx, WARNING, "init rm.migrate failed"); return REDISMODULE_ERR; } RedisModuleCallReply *reply = RedisModule_Call(ctx, "config", "cc", "get", "logfile"); long long items = RedisModule_CallReplyLength(reply); - if (items != 2) - { + if (items != 2) { RedisModule_Log(ctx, WARNING, "logfile is empty"); return REDISMODULE_ERR; }