From f8c1d0f251cde488b40cdb80fc7467c801e4b53f Mon Sep 17 00:00:00 2001 From: zeekling Date: Sat, 2 Jul 2022 12:15:36 +0800 Subject: [PATCH 1/5] add sync --- src/rdbLoad.c | 170 ++++++++++++++++++++++++++++++++++++++++++++------ src/rdbLoad.h | 36 ++++++++--- 2 files changed, 181 insertions(+), 25 deletions(-) diff --git a/src/rdbLoad.c b/src/rdbLoad.c index e7c863a..10e1ac5 100644 --- a/src/rdbLoad.c +++ b/src/rdbLoad.c @@ -1,4 +1,5 @@ #include "rdbLoad.h" +#include "sds.h" #include #include @@ -7,13 +8,13 @@ int rdbLoadRioWithLoading(migrateObj *mobj) { char buf[1024]; int error; if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1) { - serverLog(LL_WARNING, "read version failed:%s,port:%d ", mobj->host, + serverLog(LL_WARNING, "[rm]read version failed:%s,port:%d ", mobj->host, mobj->port); return C_ERR; } buf[9] = '\0'; if (memcmp(buf, "REDIS", 5) != 0) { - serverLog(LL_WARNING, "Wrong signature trying to load DB from file"); + serverLog(LL_WARNING, "[rm] Wrong signature trying to load DB from file"); errno = EINVAL; return C_ERR; } @@ -25,25 +26,24 @@ int rdbLoadRioWithLoading(migrateObj *mobj) { errno = EINVAL; return C_ERR; } - serverLog(LL_NOTICE, "buf=%s", buf); long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); while (1) { sds key; robj *val; - if ((type = rm_rdbLoadType(mobj)) == -1) { + if ((type = rmLoadType(mobj)) == -1) { serverLog(LL_WARNING, "read type failed"); return C_ERR; } if (type == RDB_OPCODE_EXPIRETIME) { - expiretime = rm_rdbLoadTime(mobj); + expiretime = rmLoadTime(mobj); if (expiretime == -1) { return C_ERR; } expiretime *= 1000; continue; } else if (type == RDB_OPCODE_EXPIRETIME_MS) { - expiretime = rm_rdbLoadMillisecondTime(mobj, rdbver); + expiretime = rmLoadMillisecondTime(mobj, rdbver); continue; } else if (type == RDB_OPCODE_FREQ) { uint8_t byte; @@ -53,47 +53,181 @@ int rdbLoadRioWithLoading(migrateObj *mobj) { continue; } else if (type == RDB_OPCODE_IDLE) { uint64_t qword; - if ((qword = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) + if ((qword = rmLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; lru_idle = qword; continue; } else if (type == RDB_OPCODE_EOF) { break; } else if (type == RDB_OPCODE_SELECTDB) { - if ((dbid = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; + if ((dbid = rmLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; continue; } else if (type == RDB_OPCODE_RESIZEDB) { uint64_t db_size, expires_size; - if ((db_size = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; - if ((expires_size = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; + if ((db_size = rmLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; + if ((expires_size = rmLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; continue; } else if (type == RDB_OPCODE_AUX) { robj *auxkey, *auxval; + if ((auxkey = rmLoadStringObject(mobj)) == NULL) { + return NULL; + } + if ((auxval = rmLoadStringObject(mobj)) == NULL) { + decrRefCount(auxkey); + return NULL; + } + if (((char *)auxkey->ptr)[0] == '%') { + serverLog(LL_NOTICE, "RDB '%s': %s", (char *)auxkey->ptr, (char *)auxval->ptr); + } else if (!strcasecmp(auxkey->ptr, "repl-stream-db")) { + serverLog(LL_NOTICE, "repl-stream-db: %s", auxval->ptr); + } else if (!strcasecmp(auxkey->ptr, "repl-id")) { + memcmp(mobj->psync_replid, auxval->ptr, CONFIG_RUN_ID_SIZE + 1); + serverLog(LL_NOTICE, "repl-id: %s", auxval->ptr); + } else if (!strcasecmp(auxkey->ptr, "repl-offset")) { + serverLog(LL_NOTICE, "repl-offset: %s", auxval->ptr); + } else if (!strcasecmp(auxkey->ptr, "lua") || !strcasecmp(auxkey->ptr, "aof-base") + || !strcasecmp(auxkey->ptr, "redis-bits") || !strcasecmp(auxkey->ptr, "aof-preamble")) { + // do nothing + } else if (!strcasecmp(auxkey->ptr, "redis-ver")) { + serverLog(LL_NOTICE, "redis-ver:%s", auxval->ptr); + } else if (!strcasecmp(auxkey->ptr, "ctime")) { + time_t age = time(NULL) - strtol(auxval->ptr, NULL, 10); + if (age < 0) age = 0; + serverLog(LL_NOTICE, "RDB age %ld seconds", (unsigned long)age); + } else if (!strcasecmp(auxkey->ptr, "used-mem")) { + long long usedmem = strtoll(auxval->ptr, NULL, 10); + serverLog(LL_NOTICE, "RDB memory usage when created %.2f Mb", (double)usedmem / (1024 * 1024)); + } else { + serverLog(LL_DEBUG, "Unrecognized RDB AUX field: '%s'", (char *)auxkey->ptr); + } + continue; } else if (type == RDB_OPCODE_FUNCTION || type == RDB_OPCODE_FUNCTION2) { + continue; + } + if ((key = rmGenericLoadStringObject(mobj, RDB_LOAD_SDS, NULL)) == NULL) { + return NULL; } } } -void *rm_rdbGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) { +robj *rmLoadStringObject(migrateObj *mobj) { + return rmGenericLoadStringObject(mobj, RDB_LOAD_NONE, NULL); +} + +void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) { int encode = flags & RDB_LOAD_ENC; int plain = flags & RDB_LOAD_PLAIN; int sds = flags & RDB_LOAD_SDS; int isencoded; unsigned long long len; - + len = rmLoadLen(mobj, &isencoded); + if (len == RDB_LENERR) return NULL; + if (isencoded) { + switch (len) { + case RDB_ENC_INT8: + case RDB_ENC_INT16: + case RDB_ENC_INT32: + return rmLoadIntegerObject(mobj, len, flags, lenptr); + case RDB_ENC_LZF: + return rmLoadLzfStringObject(mobj, flags, lenptr); + default: + serverLog(LL_WARNING, "Unknown RDB string encoding type %llu", len); + return NULL; + } + } } -uint64_t rm_rdbLoadLen(migrateObj *mobj, int *isencoded) { +void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenptr) { + int plain = flags & RDB_LOAD_PLAIN; + int sds = flags & RDB_LOAD_SDS; + int encode = flags & RDB_LOAD_ENC; + unsigned char enc[4]; + long long val; + if (enctype == RDB_ENC_INT8) { + if (read(mobj->source_cc->fd, enc, 1) == 0) return NULL; + val = (signed char)enc[0]; + } else if (enctype == RDB_ENC_INT16) { + uint16_t v; + if (read(mobj->source_cc->fd, enc, 2) == 0) return NULL; + v = ((uint32_t)enc[0]) | ((uint32_t)enc[1] << 8); + val = (int16_t)v; + } else if (enctype == RDB_ENC_INT32) { + uint32_t v; + if (read(mobj->source_cc->fd, enc, 4) == 0) return NULL; + v = ((uint32_t)enc[0]) | ((uint32_t)enc[1] << 8) | ((uint32_t)enc[2] << 16) | ((uint32_t)enc[3] << 24); + val = (int32_t)v; + } else { + // rdbReportCorruptRDB("Unknown RDB integer encoding type %d", enctype); + return NULL; /* Never reached. */ + } + if (plain || sds) { + char buf[LONG_STR_SIZE], *p; + int len = ll2string(buf, sizeof(buf), val); + if (lenptr) *lenptr = len; + p = plain ? zmalloc(len) : sdsnewlen(SDS_NOINIT, len); + memcpy(p, buf, len); + return p; + } else if (encode) { + return createStringObjectFromLongLongForValue(val); + } else { + return createObject(OBJ_STRING, sdsfromlonglong(val)); + } +} + +void *rmLoadLzfStringObject(migrateObj *mobj, int flags, size_t *lenptr) { + int plain = flags & RDB_LOAD_PLAIN; + int sds = flags & RDB_LOAD_SDS; + uint64_t len, clen; + unsigned char *c = NULL; + char *val = NULL; + if ((clen = rmLoadLen(mobj, NULL)) == RDB_LENERR) return NULL; + if ((len = rmLoadLen(mobj, NULL)) == RDB_LENERR) return NULL; + if ((c = hi_malloc(clen)) == NULL) { + serverLog(LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)clen); + goto err; + } + if (plain) { + val = hi_malloc(len); + } else { + val = hi_sdsnewlen(SDS_NOINIT, len); + } + if (!val) { + serverLog(LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)len); + goto err; + } + if (lenptr) *lenptr = len; + if (read(mobj->source_cc->fd, c, len) == 0) return NULL; + if (lzf_decompress(c, clen, val, len) != len) { + serverLog(LL_WARNING, "Invalid LZF compressed string"); + goto err; + } + zfree(c); + + if (plain || sds) { + return val; + } else { + return createObject(OBJ_STRING, val); + } +err: + zfree(c); + if (plain) + zfree(val); + else + sdsfree(val); + return NULL; +} + +uint64_t rmLoadLen(migrateObj *mobj, int *isencoded) { uint64_t len; - if (rdbLoadLenByRef(mobj, isencoded, &len) == -1) + if (rmLoadLenByRef(mobj, isencoded, &len) == -1) return RDB_LENERR; return len; } -int rm_rdbLoadType(migrateObj *mobj) { +int rmLoadType(migrateObj *mobj) { unsigned char type; if (read(mobj->source_cc->fd, &type, 1) == 0) { return -1; @@ -102,7 +236,7 @@ int rm_rdbLoadType(migrateObj *mobj) { return type; } -int rm_rdbLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) { +int rmLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) { unsigned char buf[2]; int type; if (isencoded) @@ -138,7 +272,7 @@ int rm_rdbLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) { return 0; } -time_t rm_rdbLoadTime(migrateObj *mobj) { +time_t rmLoadTime(migrateObj *mobj) { int32_t t32; if (read(mobj->source_cc->fd, &t32, 4) == 0) { return -1; @@ -147,7 +281,7 @@ time_t rm_rdbLoadTime(migrateObj *mobj) { return (time_t)t32; } -long long rm_rdbLoadMillisecondTime(migrateObj *mobj, int rdbver) { +long long rmLoadMillisecondTime(migrateObj *mobj, int rdbver) { int64_t t64; if (read(mobj->source_cc->fd, &t64, 8) == 0) { return LLONG_MAX; diff --git a/src/rdbLoad.h b/src/rdbLoad.h index bdaccec..c61f279 100644 --- a/src/rdbLoad.h +++ b/src/rdbLoad.h @@ -27,22 +27,44 @@ #define RDB_LOAD_PLAIN (1 << 1) #define RDB_LOAD_SDS (1 << 2) +#define RDB_ENC_INT8 0 /* 8 bit signed integer */ +#define RDB_ENC_INT16 1 /* 16 bit signed integer */ +#define RDB_ENC_INT32 2 /* 32 bit signed integer */ +#define RDB_ENC_LZF 3 /* string compressed with FASTLZ */ + +#define LONG_STR_SIZE 21 +extern const char *SDS_NOINIT; +#define OBJ_STRING 0 /* String object. */ +#define OBJ_LIST 1 /* List object. */ +#define OBJ_SET 2 /* Set object. */ +#define OBJ_ZSET 3 /* Sorted set object. */ +#define OBJ_HASH 4 /* Hash object. */ + +#define htonu64(v) intrev64(v) +#define ntohu64(v) intrev64(v) + #define LLONG_MAX __LONG_LONG_MAX__ -int rm_rdbLoadRioWithLoading(migrateObj *mobj); +int rmLoadRioWithLoading(migrateObj *mobj); -int rm_rdbLoadType(migrateObj *mobj); +int rmLoadType(migrateObj *mobj); -time_t rm_rdbLoadTime(migrateObj *mobj); +time_t rmLoadTime(migrateObj *mobj); -long long rm_rdbLoadMillisecondTime(migrateObj *mobj, int rdbver); +long long rmLoadMillisecondTime(migrateObj *mobj, int rdbver); -int rm_rdbLoadLenByRef(migrateObj *mobi, int *isencoded, uint64_t *lenptr); +int rmLoadLenByRef(migrateObj *mobi, int *isencoded, uint64_t *lenptr); void rm_memrev64(void *p); -uint64_t rm_rdbLoadLen(migrateObj *mobj, int *isencoded); +uint64_t rmLoadLen(migrateObj *mobj, int *isencoded); -void *rm_rdbGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr); +robj *rmLoadStringObject(migrateObj *mobj); + +void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr); + +void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenptr); + +void *rmLoadLzfStringObject(migrateObj *mobj, int flags, size_t *lenptr); #endif -- 2.45.2 From fb5d3a725e0473d3315f128f3d4a4cb0f3e307c3 Mon Sep 17 00:00:00 2001 From: zeekling Date: Sat, 9 Jul 2022 15:44:53 +0800 Subject: [PATCH 2/5] add sync --- src/rdbLoad.c | 27 +++++++++++++++++++++++++-- src/rdbLoad.h | 4 ++++ src/redis-migrate.c | 30 +++++++++++++++++++++--------- src/redis-migrate.h | 2 ++ 4 files changed, 52 insertions(+), 11 deletions(-) diff --git a/src/rdbLoad.c b/src/rdbLoad.c index 10e1ac5..7eea071 100644 --- a/src/rdbLoad.c +++ b/src/rdbLoad.c @@ -4,7 +4,7 @@ #include #include -int rdbLoadRioWithLoading(migrateObj *mobj) { +int rmLoadRioWithLoading(migrateObj *mobj) { char buf[1024]; int error; if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1) { @@ -13,6 +13,7 @@ int rdbLoadRioWithLoading(migrateObj *mobj) { return C_ERR; } buf[9] = '\0'; + serverLog(LL_NOTICE, "version:%s", buf); if (memcmp(buf, "REDIS", 5) != 0) { serverLog(LL_WARNING, "[rm] Wrong signature trying to load DB from file"); errno = EINVAL; @@ -34,6 +35,7 @@ int rdbLoadRioWithLoading(migrateObj *mobj) { serverLog(LL_WARNING, "read type failed"); return C_ERR; } + serverLog(LL_NOTICE, "type: %d", type); if (type == RDB_OPCODE_EXPIRETIME) { expiretime = rmLoadTime(mobj); @@ -109,9 +111,22 @@ int rdbLoadRioWithLoading(migrateObj *mobj) { if ((key = rmGenericLoadStringObject(mobj, RDB_LOAD_SDS, NULL)) == NULL) { return NULL; } + val = rmLoadObject(type, mobj, key, &error); + + if (val == NULL) { + } } } +robj *rmLoadObject(int rdbtype, migrateObj *mobj, sds key, int *error) { + int dbid = 0; + robj *o = NULL, *ele, *dec; + uint64_t len; + unsigned int i; + + return o; +} + robj *rmLoadStringObject(migrateObj *mobj) { return rmGenericLoadStringObject(mobj, RDB_LOAD_NONE, NULL); } @@ -229,13 +244,21 @@ uint64_t rmLoadLen(migrateObj *mobj, int *isencoded) { int rmLoadType(migrateObj *mobj) { unsigned char type; - if (read(mobj->source_cc->fd, &type, 1) == 0) { + if (rmRead(mobj, &type, 1) == -1) { return -1; } return type; } +int rmRead(migrateObj *mobj, void *buf, size_t len) { + if (read(mobj->source_cc->fd, &buf, len) == 0) { + return -1; + } + // buf = (char*)buf + len; + return 1; +} + int rmLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) { unsigned char buf[2]; int type; diff --git a/src/rdbLoad.h b/src/rdbLoad.h index c61f279..159fccc 100644 --- a/src/rdbLoad.h +++ b/src/rdbLoad.h @@ -45,6 +45,8 @@ extern const char *SDS_NOINIT; #define LLONG_MAX __LONG_LONG_MAX__ +int rmRead(migrateObj *mobj, void *buf, size_t len); + int rmLoadRioWithLoading(migrateObj *mobj); int rmLoadType(migrateObj *mobj); @@ -67,4 +69,6 @@ void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenp void *rmLoadLzfStringObject(migrateObj *mobj, int flags, size_t *lenptr); +robj *rmLoadObject(int rdbtype, migrateObj *mobj, sds key, int *error); + #endif diff --git a/src/redis-migrate.c b/src/redis-migrate.c index bf8316b..960e835 100644 --- a/src/redis-migrate.c +++ b/src/redis-migrate.c @@ -176,7 +176,7 @@ void readFullData() { (long long)mobj->repl_transfer_size); } } - int flag = rdbLoadRioWithLoading(mobj); + int flag = rmLoadRioWithLoading(mobj); return; error: @@ -297,16 +297,28 @@ void syncDataWithRedis(int fd, void *user_data, int mask) { mobj->repl_stat); goto error; } - int psync_result = receiveDataFromRedis(); - if (psync_result == PSYNC_WAIT_REPLY) - return; - if (psync_result == PSYNC_TRY_LATER) - goto error; - if (psync_result == PSYNC_CONTINUE) { + if (mobj->repl_stat == REPL_STATE_RECEIVE_PSYNC_REPLY) { + int psync_result = receiveDataFromRedis(); + if (psync_result == PSYNC_WAIT_REPLY) + return; + if (psync_result == PSYNC_TRY_LATER) + goto error; + if (psync_result == PSYNC_CONTINUE) { + mobj->repl_stat = REPL_STATE_CONTINUE_SYNC; + } else { + mobj->repl_stat = REPL_STATE_FULL_SYNC; + } } + // 接受全部数据 - serverLog(LL_NOTICE, "begin receive full data"); - readFullData(); + if (mobj->repl_stat == REPL_STATE_FULL_SYNC) { + serverLog(LL_NOTICE, "begin receive full data"); + readFullData(); + } + if (mobj->repl_stat == REPL_STATE_CONTINUE_SYNC) { + serverLog(LL_NOTICE, "begin receive continue data"); + } + return; error: if (err != NULL) { diff --git a/src/redis-migrate.h b/src/redis-migrate.h index 43958d2..832bd02 100644 --- a/src/redis-migrate.h +++ b/src/redis-migrate.h @@ -68,6 +68,8 @@ typedef enum REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_SEND_PSYNC, /* Send PSYNC */ REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ + REPL_STATE_FULL_SYNC, + REPL_STATE_CONTINUE_SYNC, /* --- End of handshake states --- */ REPL_STATE_TRANSFER, /* Receiving .rdb from master */ REPL_STATE_CONNECTED, /* Connected to master */ -- 2.45.2 From a5551d371aafdc25e2e1fedae4c75af5e9fad35f Mon Sep 17 00:00:00 2001 From: zeekling Date: Sat, 9 Jul 2022 22:48:56 +0800 Subject: [PATCH 3/5] add sync --- src/rdbLoad.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/rdbLoad.c b/src/rdbLoad.c index 7eea071..890c6ce 100644 --- a/src/rdbLoad.c +++ b/src/rdbLoad.c @@ -28,13 +28,21 @@ int rmLoadRioWithLoading(migrateObj *mobj) { return C_ERR; } long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); + + // syncReadLine(mobj->source_cc->fd, buf, 4, mobj->timeout); + // buf[4] = '\0'; + // serverLog(LL_NOTICE, "buf:%s", buf); + // type = atoi(buf); + // serverLog(LL_NOTICE, "type:%d", type); + char type_buf[1]; while (1) { sds key; robj *val; - if ((type = rmLoadType(mobj)) == -1) { + if (read(mobj->source_cc->fd, &type_buf, 4) == 0) { serverLog(LL_WARNING, "read type failed"); return C_ERR; } + int type = type_buf[0] & 0xff; serverLog(LL_NOTICE, "type: %d", type); if (type == RDB_OPCODE_EXPIRETIME) { @@ -244,7 +252,7 @@ uint64_t rmLoadLen(migrateObj *mobj, int *isencoded) { int rmLoadType(migrateObj *mobj) { unsigned char type; - if (rmRead(mobj, &type, 1) == -1) { + if (rmRead(mobj, &type, 1) == 0) { return -1; } @@ -253,9 +261,9 @@ int rmLoadType(migrateObj *mobj) { int rmRead(migrateObj *mobj, void *buf, size_t len) { if (read(mobj->source_cc->fd, &buf, len) == 0) { - return -1; + return 0; } - // buf = (char*)buf + len; + buf = (char*)buf + len; return 1; } -- 2.45.2 From 7258e809894a21b99a95c05c6d4984791c663add Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 10 Jul 2022 01:05:24 +0800 Subject: [PATCH 4/5] add sync --- src/rdbLoad.c | 42 ++++++++++++++++++++++++++++++++++++------ src/redis-migrate.c | 11 +++-------- src/redis-migrate.h | 1 + 3 files changed, 40 insertions(+), 14 deletions(-) diff --git a/src/rdbLoad.c b/src/rdbLoad.c index 890c6ce..f99c081 100644 --- a/src/rdbLoad.c +++ b/src/rdbLoad.c @@ -13,7 +13,7 @@ int rmLoadRioWithLoading(migrateObj *mobj) { return C_ERR; } buf[9] = '\0'; - serverLog(LL_NOTICE, "version:%s", buf); + serverLog(LL_NOTICE, "[rm] version=%s", buf); if (memcmp(buf, "REDIS", 5) != 0) { serverLog(LL_WARNING, "[rm] Wrong signature trying to load DB from file"); errno = EINVAL; @@ -30,19 +30,19 @@ int rmLoadRioWithLoading(migrateObj *mobj) { long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); // syncReadLine(mobj->source_cc->fd, buf, 4, mobj->timeout); - // buf[4] = '\0'; + // buf[4] = '\0'; // serverLog(LL_NOTICE, "buf:%s", buf); // type = atoi(buf); // serverLog(LL_NOTICE, "type:%d", type); - char type_buf[1]; + char type_buf[2]; while (1) { sds key; robj *val; - if (read(mobj->source_cc->fd, &type_buf, 4) == 0) { + if (read(mobj->source_cc->fd, &type_buf, 2) == 0) { serverLog(LL_WARNING, "read type failed"); return C_ERR; } - int type = type_buf[0] & 0xff; + int type = type_buf[1] & 0xff; serverLog(LL_NOTICE, "type: %d", type); if (type == RDB_OPCODE_EXPIRETIME) { @@ -80,6 +80,7 @@ int rmLoadRioWithLoading(migrateObj *mobj) { continue; } else if (type == RDB_OPCODE_AUX) { robj *auxkey, *auxval; + serverLog(LL_NOTICE, "type=RDB_OPCODE_AUX"); if ((auxkey = rmLoadStringObject(mobj)) == NULL) { return NULL; } @@ -146,6 +147,7 @@ void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) { int isencoded; unsigned long long len; len = rmLoadLen(mobj, &isencoded); + serverLog(LL_NOTICE, "len=%d, isencoded=%d", len, isencoded); if (len == RDB_LENERR) return NULL; if (isencoded) { switch (len) { @@ -160,6 +162,34 @@ void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) { return NULL; } } + if (plain || sds) { + void *buf = plain ? ztrymalloc(len) : sdstrynewlen(SDS_NOINIT, len); + if (!buf) { + serverLog(LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len); + return NULL; + } + if (lenptr) *lenptr = len; + if (len && read(mobj->source_cc->fd, buf, len) == 0) { + if (plain) + zfree(buf); + else + sdsfree(buf); + return NULL; + } + return buf; + } else { + robj *o = encode ? tryCreateStringObject(SDS_NOINIT, len) : + tryCreateRawStringObject(SDS_NOINIT, len); + if (!o) { + serverLog(LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len); + return NULL; + } + if (len && read(mobj->source_cc->fd, o->ptr, len)) { + decrRefCount(o); + return NULL; + } + return o; + } } void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenptr) { @@ -263,7 +293,7 @@ int rmRead(migrateObj *mobj, void *buf, size_t len) { if (read(mobj->source_cc->fd, &buf, len) == 0) { return 0; } - buf = (char*)buf + len; + buf = (char *)buf + len; return 1; } diff --git a/src/redis-migrate.c b/src/redis-migrate.c index 960e835..bbfce48 100644 --- a/src/redis-migrate.c +++ b/src/redis-migrate.c @@ -147,9 +147,7 @@ void readFullData() { goto error; } if (buf[0] == '-') { - serverLog(LL_WARNING, - "MASTER aborted replication with an error: %s", - buf + 1); + serverLog(LL_WARNING, "MASTER aborted replication with an error: %s", buf + 1); goto error; } else if (buf[0] == '\0') { // mobj->repl_transfer_lastio = server.unixtime; @@ -292,11 +290,7 @@ void syncDataWithRedis(int fd, void *user_data, int mask) { mobj->repl_stat = REPL_STATE_RECEIVE_PSYNC_REPLY; return; } - if (mobj->repl_stat != REPL_STATE_RECEIVE_PSYNC_REPLY) { - serverLog(LL_WARNING, "syncDataWithRedis(): state machine error, state should be RECEIVE_PSYNC but is %d", - mobj->repl_stat); - goto error; - } + if (mobj->repl_stat == REPL_STATE_RECEIVE_PSYNC_REPLY) { int psync_result = receiveDataFromRedis(); if (psync_result == PSYNC_WAIT_REPLY) @@ -313,6 +307,7 @@ void syncDataWithRedis(int fd, void *user_data, int mask) { // 接受全部数据 if (mobj->repl_stat == REPL_STATE_FULL_SYNC) { serverLog(LL_NOTICE, "begin receive full data"); + mobj->repl_stat = REPL_STATE_READING_FULL_DATA; readFullData(); } if (mobj->repl_stat == REPL_STATE_CONTINUE_SYNC) { diff --git a/src/redis-migrate.h b/src/redis-migrate.h index 832bd02..0bed185 100644 --- a/src/redis-migrate.h +++ b/src/redis-migrate.h @@ -69,6 +69,7 @@ typedef enum REPL_STATE_SEND_PSYNC, /* Send PSYNC */ REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ REPL_STATE_FULL_SYNC, + REPL_STATE_READING_FULL_DATA, REPL_STATE_CONTINUE_SYNC, /* --- End of handshake states --- */ REPL_STATE_TRANSFER, /* Receiving .rdb from master */ -- 2.45.2 From c665fe9a4aa79829c3e8bf5cbca26e7897e7f50c Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 10 Jul 2022 16:49:32 +0800 Subject: [PATCH 5/5] add sync --- src/rdbLoad.c | 82 +++++++++++++++++++++++++++------------------------ src/rdbLoad.h | 4 +-- 2 files changed, 45 insertions(+), 41 deletions(-) diff --git a/src/rdbLoad.c b/src/rdbLoad.c index f99c081..097a32c 100644 --- a/src/rdbLoad.c +++ b/src/rdbLoad.c @@ -1,8 +1,12 @@ #include "rdbLoad.h" #include "sds.h" +#include "sdscompat.h" #include #include +#include +#include +#include int rmLoadRioWithLoading(migrateObj *mobj) { char buf[1024]; @@ -34,15 +38,21 @@ int rmLoadRioWithLoading(migrateObj *mobj) { // serverLog(LL_NOTICE, "buf:%s", buf); // type = atoi(buf); // serverLog(LL_NOTICE, "type:%d", type); - char type_buf[2]; + char type_buf[1]; + if (read(mobj->source_cc->fd, &type_buf, 1) == 0) + { + serverLog(LL_WARNING, "read mark failed"); + return C_ERR; + } + while (1) { sds key; robj *val; - if (read(mobj->source_cc->fd, &type_buf, 2) == 0) { + if (read(mobj->source_cc->fd, &type_buf, 1) == 0) { serverLog(LL_WARNING, "read type failed"); return C_ERR; } - int type = type_buf[1] & 0xff; + int type = type_buf[0] & 0xff; serverLog(LL_NOTICE, "type: %d", type); if (type == RDB_OPCODE_EXPIRETIME) { @@ -68,6 +78,7 @@ int rmLoadRioWithLoading(migrateObj *mobj) { lru_idle = qword; continue; } else if (type == RDB_OPCODE_EOF) { + serverLog(LL_NOTICE, "rdb file parse end.."); break; } else if (type == RDB_OPCODE_SELECTDB) { if ((dbid = rmLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; @@ -76,41 +87,34 @@ int rmLoadRioWithLoading(migrateObj *mobj) { uint64_t db_size, expires_size; if ((db_size = rmLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; if ((expires_size = rmLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; - continue; } else if (type == RDB_OPCODE_AUX) { - robj *auxkey, *auxval; - serverLog(LL_NOTICE, "type=RDB_OPCODE_AUX"); + sds *auxkey, *auxval; if ((auxkey = rmLoadStringObject(mobj)) == NULL) { + serverLog(LL_WARNING, "auxkey is null"); return NULL; } if ((auxval = rmLoadStringObject(mobj)) == NULL) { - decrRefCount(auxkey); + serverLog(LL_WARNING, "auxval is null"); return NULL; } - if (((char *)auxkey->ptr)[0] == '%') { - serverLog(LL_NOTICE, "RDB '%s': %s", (char *)auxkey->ptr, (char *)auxval->ptr); - } else if (!strcasecmp(auxkey->ptr, "repl-stream-db")) { - serverLog(LL_NOTICE, "repl-stream-db: %s", auxval->ptr); - } else if (!strcasecmp(auxkey->ptr, "repl-id")) { - memcmp(mobj->psync_replid, auxval->ptr, CONFIG_RUN_ID_SIZE + 1); - serverLog(LL_NOTICE, "repl-id: %s", auxval->ptr); - } else if (!strcasecmp(auxkey->ptr, "repl-offset")) { - serverLog(LL_NOTICE, "repl-offset: %s", auxval->ptr); - } else if (!strcasecmp(auxkey->ptr, "lua") || !strcasecmp(auxkey->ptr, "aof-base") - || !strcasecmp(auxkey->ptr, "redis-bits") || !strcasecmp(auxkey->ptr, "aof-preamble")) { - // do nothing - } else if (!strcasecmp(auxkey->ptr, "redis-ver")) { - serverLog(LL_NOTICE, "redis-ver:%s", auxval->ptr); - } else if (!strcasecmp(auxkey->ptr, "ctime")) { - time_t age = time(NULL) - strtol(auxval->ptr, NULL, 10); - if (age < 0) age = 0; - serverLog(LL_NOTICE, "RDB age %ld seconds", (unsigned long)age); - } else if (!strcasecmp(auxkey->ptr, "used-mem")) { - long long usedmem = strtoll(auxval->ptr, NULL, 10); + if (auxkey[0] == '%') { + serverLog(LL_NOTICE, "RDB '%s': %s", auxkey, auxval); + } else if (!strcasecmp(auxkey, "repl-id")) { + memcmp(mobj->psync_replid, auxval, CONFIG_RUN_ID_SIZE + 1); + serverLog(LL_NOTICE, "repl-id: %s", auxval); + } else if (!strcasecmp(auxkey, "repl-offset")) { + serverLog(LL_NOTICE, "repl-offset: %s", auxval); + } else if (!strcasecmp(auxkey, "lua") || !strcasecmp(auxkey, "aof-base") + || !strcasecmp(auxkey, "redis-bits") || !strcasecmp(auxkey, "aof-preamble") + || !strcasecmp(auxkey, "repl-stream-db") || !strcasecmp(auxkey, "redis-ver") + || !strcasecmp(auxkey, "ctime")) { + serverLog(LL_NOTICE, "RDB '%s': %s", auxkey, auxval); + } else if (!strcasecmp(auxkey, "used-mem")) { + long long usedmem = strtoll(auxval, NULL, 10); serverLog(LL_NOTICE, "RDB memory usage when created %.2f Mb", (double)usedmem / (1024 * 1024)); } else { - serverLog(LL_DEBUG, "Unrecognized RDB AUX field: '%s'", (char *)auxkey->ptr); + serverLog(LL_DEBUG, "Unrecognized RDB AUX field: '%s'", auxkey); } continue; @@ -136,11 +140,11 @@ robj *rmLoadObject(int rdbtype, migrateObj *mobj, sds key, int *error) { return o; } -robj *rmLoadStringObject(migrateObj *mobj) { +sds rmLoadStringObject(migrateObj *mobj) { return rmGenericLoadStringObject(mobj, RDB_LOAD_NONE, NULL); } -void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) { +sds rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) { int encode = flags & RDB_LOAD_ENC; int plain = flags & RDB_LOAD_PLAIN; int sds = flags & RDB_LOAD_SDS; @@ -163,7 +167,7 @@ void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) { } } if (plain || sds) { - void *buf = plain ? ztrymalloc(len) : sdstrynewlen(SDS_NOINIT, len); + void *buf = sds_malloc(len); if (!buf) { serverLog(LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len); return NULL; @@ -176,19 +180,18 @@ void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) { sdsfree(buf); return NULL; } - return buf; + return hi_sdsnewlen(buf, len); } else { - robj *o = encode ? tryCreateStringObject(SDS_NOINIT, len) : - tryCreateRawStringObject(SDS_NOINIT, len); - if (!o) { + char buf[len]; + if (!buf) { serverLog(LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len); return NULL; } - if (len && read(mobj->source_cc->fd, o->ptr, len)) { - decrRefCount(o); + if (len && read(mobj->source_cc->fd, buf, len) == 0) { return NULL; } - return o; + buf[len] = '\0'; + return hi_sdsnewlen(buf, len); } } @@ -215,6 +218,7 @@ void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenp // rdbReportCorruptRDB("Unknown RDB integer encoding type %d", enctype); return NULL; /* Never reached. */ } + serverLog(LL_NOTICE, "plain=%d, encode=%d", plain, encode); if (plain || sds) { char buf[LONG_STR_SIZE], *p; int len = ll2string(buf, sizeof(buf), val); @@ -225,7 +229,7 @@ void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenp } else if (encode) { return createStringObjectFromLongLongForValue(val); } else { - return createObject(OBJ_STRING, sdsfromlonglong(val)); + return sdsfromlonglong(val); } } diff --git a/src/rdbLoad.h b/src/rdbLoad.h index 159fccc..589b935 100644 --- a/src/rdbLoad.h +++ b/src/rdbLoad.h @@ -61,9 +61,9 @@ void rm_memrev64(void *p); uint64_t rmLoadLen(migrateObj *mobj, int *isencoded); -robj *rmLoadStringObject(migrateObj *mobj); +sds rmLoadStringObject(migrateObj *mobj); -void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr); +sds rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr); void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenptr); -- 2.45.2