From c665fe9a4aa79829c3e8bf5cbca26e7897e7f50c Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 10 Jul 2022 16:49:32 +0800 Subject: [PATCH] add sync --- src/rdbLoad.c | 82 +++++++++++++++++++++++++++------------------------ src/rdbLoad.h | 4 +-- 2 files changed, 45 insertions(+), 41 deletions(-) diff --git a/src/rdbLoad.c b/src/rdbLoad.c index f99c081..097a32c 100644 --- a/src/rdbLoad.c +++ b/src/rdbLoad.c @@ -1,8 +1,12 @@ #include "rdbLoad.h" #include "sds.h" +#include "sdscompat.h" #include #include +#include +#include +#include int rmLoadRioWithLoading(migrateObj *mobj) { char buf[1024]; @@ -34,15 +38,21 @@ int rmLoadRioWithLoading(migrateObj *mobj) { // serverLog(LL_NOTICE, "buf:%s", buf); // type = atoi(buf); // serverLog(LL_NOTICE, "type:%d", type); - char type_buf[2]; + char type_buf[1]; + if (read(mobj->source_cc->fd, &type_buf, 1) == 0) + { + serverLog(LL_WARNING, "read mark failed"); + return C_ERR; + } + while (1) { sds key; robj *val; - if (read(mobj->source_cc->fd, &type_buf, 2) == 0) { + if (read(mobj->source_cc->fd, &type_buf, 1) == 0) { serverLog(LL_WARNING, "read type failed"); return C_ERR; } - int type = type_buf[1] & 0xff; + int type = type_buf[0] & 0xff; serverLog(LL_NOTICE, "type: %d", type); if (type == RDB_OPCODE_EXPIRETIME) { @@ -68,6 +78,7 @@ int rmLoadRioWithLoading(migrateObj *mobj) { lru_idle = qword; continue; } else if (type == RDB_OPCODE_EOF) { + serverLog(LL_NOTICE, "rdb file parse end.."); break; } else if (type == RDB_OPCODE_SELECTDB) { if ((dbid = rmLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; @@ -76,41 +87,34 @@ int rmLoadRioWithLoading(migrateObj *mobj) { uint64_t db_size, expires_size; if ((db_size = rmLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; if ((expires_size = rmLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR; - continue; } else if (type == RDB_OPCODE_AUX) { - robj *auxkey, *auxval; - serverLog(LL_NOTICE, "type=RDB_OPCODE_AUX"); + sds *auxkey, *auxval; if ((auxkey = rmLoadStringObject(mobj)) == NULL) { + serverLog(LL_WARNING, "auxkey is null"); return NULL; } if ((auxval = rmLoadStringObject(mobj)) == NULL) { - decrRefCount(auxkey); + serverLog(LL_WARNING, "auxval is null"); return NULL; } - if (((char *)auxkey->ptr)[0] == '%') { - serverLog(LL_NOTICE, "RDB '%s': %s", (char *)auxkey->ptr, (char *)auxval->ptr); - } else if (!strcasecmp(auxkey->ptr, "repl-stream-db")) { - serverLog(LL_NOTICE, "repl-stream-db: %s", auxval->ptr); - } else if (!strcasecmp(auxkey->ptr, "repl-id")) { - memcmp(mobj->psync_replid, auxval->ptr, CONFIG_RUN_ID_SIZE + 1); - serverLog(LL_NOTICE, "repl-id: %s", auxval->ptr); - } else if (!strcasecmp(auxkey->ptr, "repl-offset")) { - serverLog(LL_NOTICE, "repl-offset: %s", auxval->ptr); - } else if (!strcasecmp(auxkey->ptr, "lua") || !strcasecmp(auxkey->ptr, "aof-base") - || !strcasecmp(auxkey->ptr, "redis-bits") || !strcasecmp(auxkey->ptr, "aof-preamble")) { - // do nothing - } else if (!strcasecmp(auxkey->ptr, "redis-ver")) { - serverLog(LL_NOTICE, "redis-ver:%s", auxval->ptr); - } else if (!strcasecmp(auxkey->ptr, "ctime")) { - time_t age = time(NULL) - strtol(auxval->ptr, NULL, 10); - if (age < 0) age = 0; - serverLog(LL_NOTICE, "RDB age %ld seconds", (unsigned long)age); - } else if (!strcasecmp(auxkey->ptr, "used-mem")) { - long long usedmem = strtoll(auxval->ptr, NULL, 10); + if (auxkey[0] == '%') { + serverLog(LL_NOTICE, "RDB '%s': %s", auxkey, auxval); + } else if (!strcasecmp(auxkey, "repl-id")) { + memcmp(mobj->psync_replid, auxval, CONFIG_RUN_ID_SIZE + 1); + serverLog(LL_NOTICE, "repl-id: %s", auxval); + } else if (!strcasecmp(auxkey, "repl-offset")) { + serverLog(LL_NOTICE, "repl-offset: %s", auxval); + } else if (!strcasecmp(auxkey, "lua") || !strcasecmp(auxkey, "aof-base") + || !strcasecmp(auxkey, "redis-bits") || !strcasecmp(auxkey, "aof-preamble") + || !strcasecmp(auxkey, "repl-stream-db") || !strcasecmp(auxkey, "redis-ver") + || !strcasecmp(auxkey, "ctime")) { + serverLog(LL_NOTICE, "RDB '%s': %s", auxkey, auxval); + } else if (!strcasecmp(auxkey, "used-mem")) { + long long usedmem = strtoll(auxval, NULL, 10); serverLog(LL_NOTICE, "RDB memory usage when created %.2f Mb", (double)usedmem / (1024 * 1024)); } else { - serverLog(LL_DEBUG, "Unrecognized RDB AUX field: '%s'", (char *)auxkey->ptr); + serverLog(LL_DEBUG, "Unrecognized RDB AUX field: '%s'", auxkey); } continue; @@ -136,11 +140,11 @@ robj *rmLoadObject(int rdbtype, migrateObj *mobj, sds key, int *error) { return o; } -robj *rmLoadStringObject(migrateObj *mobj) { +sds rmLoadStringObject(migrateObj *mobj) { return rmGenericLoadStringObject(mobj, RDB_LOAD_NONE, NULL); } -void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) { +sds rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) { int encode = flags & RDB_LOAD_ENC; int plain = flags & RDB_LOAD_PLAIN; int sds = flags & RDB_LOAD_SDS; @@ -163,7 +167,7 @@ void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) { } } if (plain || sds) { - void *buf = plain ? ztrymalloc(len) : sdstrynewlen(SDS_NOINIT, len); + void *buf = sds_malloc(len); if (!buf) { serverLog(LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len); return NULL; @@ -176,19 +180,18 @@ void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) { sdsfree(buf); return NULL; } - return buf; + return hi_sdsnewlen(buf, len); } else { - robj *o = encode ? tryCreateStringObject(SDS_NOINIT, len) : - tryCreateRawStringObject(SDS_NOINIT, len); - if (!o) { + char buf[len]; + if (!buf) { serverLog(LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len); return NULL; } - if (len && read(mobj->source_cc->fd, o->ptr, len)) { - decrRefCount(o); + if (len && read(mobj->source_cc->fd, buf, len) == 0) { return NULL; } - return o; + buf[len] = '\0'; + return hi_sdsnewlen(buf, len); } } @@ -215,6 +218,7 @@ void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenp // rdbReportCorruptRDB("Unknown RDB integer encoding type %d", enctype); return NULL; /* Never reached. */ } + serverLog(LL_NOTICE, "plain=%d, encode=%d", plain, encode); if (plain || sds) { char buf[LONG_STR_SIZE], *p; int len = ll2string(buf, sizeof(buf), val); @@ -225,7 +229,7 @@ void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenp } else if (encode) { return createStringObjectFromLongLongForValue(val); } else { - return createObject(OBJ_STRING, sdsfromlonglong(val)); + return sdsfromlonglong(val); } } diff --git a/src/rdbLoad.h b/src/rdbLoad.h index 159fccc..589b935 100644 --- a/src/rdbLoad.h +++ b/src/rdbLoad.h @@ -61,9 +61,9 @@ void rm_memrev64(void *p); uint64_t rmLoadLen(migrateObj *mobj, int *isencoded); -robj *rmLoadStringObject(migrateObj *mobj); +sds rmLoadStringObject(migrateObj *mobj); -void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr); +sds rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr); void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenptr);