full data sync #8

Merged
zeekling merged 5 commits from sync into master 2022-07-10 08:50:20 +00:00
2 changed files with 181 additions and 25 deletions
Showing only changes of commit f8c1d0f251 - Show all commits

View File

@ -1,4 +1,5 @@
#include "rdbLoad.h" #include "rdbLoad.h"
#include "sds.h"
#include <errno.h> #include <errno.h>
#include <unistd.h> #include <unistd.h>
@ -7,13 +8,13 @@ int rdbLoadRioWithLoading(migrateObj *mobj) {
char buf[1024]; char buf[1024];
int error; int error;
if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1) { if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1) {
serverLog(LL_WARNING, "read version failed:%s,port:%d ", mobj->host, serverLog(LL_WARNING, "[rm]read version failed:%s,port:%d ", mobj->host,
mobj->port); mobj->port);
return C_ERR; return C_ERR;
} }
buf[9] = '\0'; buf[9] = '\0';
if (memcmp(buf, "REDIS", 5) != 0) { if (memcmp(buf, "REDIS", 5) != 0) {
serverLog(LL_WARNING, "Wrong signature trying to load DB from file"); serverLog(LL_WARNING, "[rm] Wrong signature trying to load DB from file");
errno = EINVAL; errno = EINVAL;
return C_ERR; return C_ERR;
} }
@ -25,25 +26,24 @@ int rdbLoadRioWithLoading(migrateObj *mobj) {
errno = EINVAL; errno = EINVAL;
return C_ERR; return C_ERR;
} }
serverLog(LL_NOTICE, "buf=%s", buf);
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
while (1) { while (1) {
sds key; sds key;
robj *val; robj *val;
if ((type = rm_rdbLoadType(mobj)) == -1) { if ((type = rmLoadType(mobj)) == -1) {
serverLog(LL_WARNING, "read type failed"); serverLog(LL_WARNING, "read type failed");
return C_ERR; return C_ERR;
} }
if (type == RDB_OPCODE_EXPIRETIME) { if (type == RDB_OPCODE_EXPIRETIME) {
expiretime = rm_rdbLoadTime(mobj); expiretime = rmLoadTime(mobj);
if (expiretime == -1) { if (expiretime == -1) {
return C_ERR; return C_ERR;
} }
expiretime *= 1000; expiretime *= 1000;
continue; continue;
} else if (type == RDB_OPCODE_EXPIRETIME_MS) { } else if (type == RDB_OPCODE_EXPIRETIME_MS) {
expiretime = rm_rdbLoadMillisecondTime(mobj, rdbver); expiretime = rmLoadMillisecondTime(mobj, rdbver);
continue; continue;
} else if (type == RDB_OPCODE_FREQ) { } else if (type == RDB_OPCODE_FREQ) {
uint8_t byte; uint8_t byte;
@ -53,47 +53,181 @@ int rdbLoadRioWithLoading(migrateObj *mobj) {
continue; continue;
} else if (type == RDB_OPCODE_IDLE) { } else if (type == RDB_OPCODE_IDLE) {
uint64_t qword; uint64_t qword;
if ((qword = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) if ((qword = rmLoadLen(mobj, NULL)) == RDB_LENERR)
return C_ERR; return C_ERR;
lru_idle = qword; lru_idle = qword;
continue; continue;
} else if (type == RDB_OPCODE_EOF) { } else if (type == RDB_OPCODE_EOF) {
break; break;
} else if (type == RDB_OPCODE_SELECTDB) { } else if (type == RDB_OPCODE_SELECTDB) {
if ((dbid = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; if ((dbid = rmLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
continue; continue;
} else if (type == RDB_OPCODE_RESIZEDB) { } else if (type == RDB_OPCODE_RESIZEDB) {
uint64_t db_size, expires_size; uint64_t db_size, expires_size;
if ((db_size = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; if ((db_size = rmLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
if ((expires_size = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; if ((expires_size = rmLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
continue; continue;
} else if (type == RDB_OPCODE_AUX) { } else if (type == RDB_OPCODE_AUX) {
robj *auxkey, *auxval; robj *auxkey, *auxval;
if ((auxkey = rmLoadStringObject(mobj)) == NULL) {
return NULL;
}
if ((auxval = rmLoadStringObject(mobj)) == NULL) {
decrRefCount(auxkey);
return NULL;
}
if (((char *)auxkey->ptr)[0] == '%') {
serverLog(LL_NOTICE, "RDB '%s': %s", (char *)auxkey->ptr, (char *)auxval->ptr);
} else if (!strcasecmp(auxkey->ptr, "repl-stream-db")) {
serverLog(LL_NOTICE, "repl-stream-db: %s", auxval->ptr);
} else if (!strcasecmp(auxkey->ptr, "repl-id")) {
memcmp(mobj->psync_replid, auxval->ptr, CONFIG_RUN_ID_SIZE + 1);
serverLog(LL_NOTICE, "repl-id: %s", auxval->ptr);
} else if (!strcasecmp(auxkey->ptr, "repl-offset")) {
serverLog(LL_NOTICE, "repl-offset: %s", auxval->ptr);
} else if (!strcasecmp(auxkey->ptr, "lua") || !strcasecmp(auxkey->ptr, "aof-base")
|| !strcasecmp(auxkey->ptr, "redis-bits") || !strcasecmp(auxkey->ptr, "aof-preamble")) {
// do nothing
} else if (!strcasecmp(auxkey->ptr, "redis-ver")) {
serverLog(LL_NOTICE, "redis-ver:%s", auxval->ptr);
} else if (!strcasecmp(auxkey->ptr, "ctime")) {
time_t age = time(NULL) - strtol(auxval->ptr, NULL, 10);
if (age < 0) age = 0;
serverLog(LL_NOTICE, "RDB age %ld seconds", (unsigned long)age);
} else if (!strcasecmp(auxkey->ptr, "used-mem")) {
long long usedmem = strtoll(auxval->ptr, NULL, 10);
serverLog(LL_NOTICE, "RDB memory usage when created %.2f Mb", (double)usedmem / (1024 * 1024));
} else {
serverLog(LL_DEBUG, "Unrecognized RDB AUX field: '%s'", (char *)auxkey->ptr);
}
continue;
} else if (type == RDB_OPCODE_FUNCTION || type == RDB_OPCODE_FUNCTION2) { } else if (type == RDB_OPCODE_FUNCTION || type == RDB_OPCODE_FUNCTION2) {
continue;
}
if ((key = rmGenericLoadStringObject(mobj, RDB_LOAD_SDS, NULL)) == NULL) {
return NULL;
} }
} }
} }
void *rm_rdbGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) { robj *rmLoadStringObject(migrateObj *mobj) {
return rmGenericLoadStringObject(mobj, RDB_LOAD_NONE, NULL);
}
void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) {
int encode = flags & RDB_LOAD_ENC; int encode = flags & RDB_LOAD_ENC;
int plain = flags & RDB_LOAD_PLAIN; int plain = flags & RDB_LOAD_PLAIN;
int sds = flags & RDB_LOAD_SDS; int sds = flags & RDB_LOAD_SDS;
int isencoded; int isencoded;
unsigned long long len; unsigned long long len;
len = rmLoadLen(mobj, &isencoded);
if (len == RDB_LENERR) return NULL;
if (isencoded) {
switch (len) {
case RDB_ENC_INT8:
case RDB_ENC_INT16:
case RDB_ENC_INT32:
return rmLoadIntegerObject(mobj, len, flags, lenptr);
case RDB_ENC_LZF:
return rmLoadLzfStringObject(mobj, flags, lenptr);
default:
serverLog(LL_WARNING, "Unknown RDB string encoding type %llu", len);
return NULL;
}
}
} }
uint64_t rm_rdbLoadLen(migrateObj *mobj, int *isencoded) { void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenptr) {
int plain = flags & RDB_LOAD_PLAIN;
int sds = flags & RDB_LOAD_SDS;
int encode = flags & RDB_LOAD_ENC;
unsigned char enc[4];
long long val;
if (enctype == RDB_ENC_INT8) {
if (read(mobj->source_cc->fd, enc, 1) == 0) return NULL;
val = (signed char)enc[0];
} else if (enctype == RDB_ENC_INT16) {
uint16_t v;
if (read(mobj->source_cc->fd, enc, 2) == 0) return NULL;
v = ((uint32_t)enc[0]) | ((uint32_t)enc[1] << 8);
val = (int16_t)v;
} else if (enctype == RDB_ENC_INT32) {
uint32_t v;
if (read(mobj->source_cc->fd, enc, 4) == 0) return NULL;
v = ((uint32_t)enc[0]) | ((uint32_t)enc[1] << 8) | ((uint32_t)enc[2] << 16) | ((uint32_t)enc[3] << 24);
val = (int32_t)v;
} else {
// rdbReportCorruptRDB("Unknown RDB integer encoding type %d", enctype);
return NULL; /* Never reached. */
}
if (plain || sds) {
char buf[LONG_STR_SIZE], *p;
int len = ll2string(buf, sizeof(buf), val);
if (lenptr) *lenptr = len;
p = plain ? zmalloc(len) : sdsnewlen(SDS_NOINIT, len);
memcpy(p, buf, len);
return p;
} else if (encode) {
return createStringObjectFromLongLongForValue(val);
} else {
return createObject(OBJ_STRING, sdsfromlonglong(val));
}
}
void *rmLoadLzfStringObject(migrateObj *mobj, int flags, size_t *lenptr) {
int plain = flags & RDB_LOAD_PLAIN;
int sds = flags & RDB_LOAD_SDS;
uint64_t len, clen;
unsigned char *c = NULL;
char *val = NULL;
if ((clen = rmLoadLen(mobj, NULL)) == RDB_LENERR) return NULL;
if ((len = rmLoadLen(mobj, NULL)) == RDB_LENERR) return NULL;
if ((c = hi_malloc(clen)) == NULL) {
serverLog(LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)clen);
goto err;
}
if (plain) {
val = hi_malloc(len);
} else {
val = hi_sdsnewlen(SDS_NOINIT, len);
}
if (!val) {
serverLog(LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)len);
goto err;
}
if (lenptr) *lenptr = len;
if (read(mobj->source_cc->fd, c, len) == 0) return NULL;
if (lzf_decompress(c, clen, val, len) != len) {
serverLog(LL_WARNING, "Invalid LZF compressed string");
goto err;
}
zfree(c);
if (plain || sds) {
return val;
} else {
return createObject(OBJ_STRING, val);
}
err:
zfree(c);
if (plain)
zfree(val);
else
sdsfree(val);
return NULL;
}
uint64_t rmLoadLen(migrateObj *mobj, int *isencoded) {
uint64_t len; uint64_t len;
if (rdbLoadLenByRef(mobj, isencoded, &len) == -1) if (rmLoadLenByRef(mobj, isencoded, &len) == -1)
return RDB_LENERR; return RDB_LENERR;
return len; return len;
} }
int rm_rdbLoadType(migrateObj *mobj) { int rmLoadType(migrateObj *mobj) {
unsigned char type; unsigned char type;
if (read(mobj->source_cc->fd, &type, 1) == 0) { if (read(mobj->source_cc->fd, &type, 1) == 0) {
return -1; return -1;
@ -102,7 +236,7 @@ int rm_rdbLoadType(migrateObj *mobj) {
return type; return type;
} }
int rm_rdbLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) { int rmLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) {
unsigned char buf[2]; unsigned char buf[2];
int type; int type;
if (isencoded) if (isencoded)
@ -138,7 +272,7 @@ int rm_rdbLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) {
return 0; return 0;
} }
time_t rm_rdbLoadTime(migrateObj *mobj) { time_t rmLoadTime(migrateObj *mobj) {
int32_t t32; int32_t t32;
if (read(mobj->source_cc->fd, &t32, 4) == 0) { if (read(mobj->source_cc->fd, &t32, 4) == 0) {
return -1; return -1;
@ -147,7 +281,7 @@ time_t rm_rdbLoadTime(migrateObj *mobj) {
return (time_t)t32; return (time_t)t32;
} }
long long rm_rdbLoadMillisecondTime(migrateObj *mobj, int rdbver) { long long rmLoadMillisecondTime(migrateObj *mobj, int rdbver) {
int64_t t64; int64_t t64;
if (read(mobj->source_cc->fd, &t64, 8) == 0) { if (read(mobj->source_cc->fd, &t64, 8) == 0) {
return LLONG_MAX; return LLONG_MAX;

View File

@ -27,22 +27,44 @@
#define RDB_LOAD_PLAIN (1 << 1) #define RDB_LOAD_PLAIN (1 << 1)
#define RDB_LOAD_SDS (1 << 2) #define RDB_LOAD_SDS (1 << 2)
#define RDB_ENC_INT8 0 /* 8 bit signed integer */
#define RDB_ENC_INT16 1 /* 16 bit signed integer */
#define RDB_ENC_INT32 2 /* 32 bit signed integer */
#define RDB_ENC_LZF 3 /* string compressed with FASTLZ */
#define LONG_STR_SIZE 21
extern const char *SDS_NOINIT;
#define OBJ_STRING 0 /* String object. */
#define OBJ_LIST 1 /* List object. */
#define OBJ_SET 2 /* Set object. */
#define OBJ_ZSET 3 /* Sorted set object. */
#define OBJ_HASH 4 /* Hash object. */
#define htonu64(v) intrev64(v)
#define ntohu64(v) intrev64(v)
#define LLONG_MAX __LONG_LONG_MAX__ #define LLONG_MAX __LONG_LONG_MAX__
int rm_rdbLoadRioWithLoading(migrateObj *mobj); int rmLoadRioWithLoading(migrateObj *mobj);
int rm_rdbLoadType(migrateObj *mobj); int rmLoadType(migrateObj *mobj);
time_t rm_rdbLoadTime(migrateObj *mobj); time_t rmLoadTime(migrateObj *mobj);
long long rm_rdbLoadMillisecondTime(migrateObj *mobj, int rdbver); long long rmLoadMillisecondTime(migrateObj *mobj, int rdbver);
int rm_rdbLoadLenByRef(migrateObj *mobi, int *isencoded, uint64_t *lenptr); int rmLoadLenByRef(migrateObj *mobi, int *isencoded, uint64_t *lenptr);
void rm_memrev64(void *p); void rm_memrev64(void *p);
uint64_t rm_rdbLoadLen(migrateObj *mobj, int *isencoded); uint64_t rmLoadLen(migrateObj *mobj, int *isencoded);
void *rm_rdbGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr); robj *rmLoadStringObject(migrateObj *mobj);
void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr);
void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenptr);
void *rmLoadLzfStringObject(migrateObj *mobj, int flags, size_t *lenptr);
#endif #endif