full data sync #8
243
src/rdbLoad.c
243
src/rdbLoad.c
@ -1,19 +1,25 @@
|
|||||||
#include "rdbLoad.h"
|
#include "rdbLoad.h"
|
||||||
|
#include "sds.h"
|
||||||
|
#include "sdscompat.h"
|
||||||
|
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
#include <sys/time.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
int rdbLoadRioWithLoading(migrateObj *mobj) {
|
int rmLoadRioWithLoading(migrateObj *mobj) {
|
||||||
char buf[1024];
|
char buf[1024];
|
||||||
int error;
|
int error;
|
||||||
if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1) {
|
if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1) {
|
||||||
serverLog(LL_WARNING, "read version failed:%s,port:%d ", mobj->host,
|
serverLog(LL_WARNING, "[rm]read version failed:%s,port:%d ", mobj->host,
|
||||||
mobj->port);
|
mobj->port);
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
buf[9] = '\0';
|
buf[9] = '\0';
|
||||||
|
serverLog(LL_NOTICE, "[rm] version=%s", buf);
|
||||||
if (memcmp(buf, "REDIS", 5) != 0) {
|
if (memcmp(buf, "REDIS", 5) != 0) {
|
||||||
serverLog(LL_WARNING, "Wrong signature trying to load DB from file");
|
serverLog(LL_WARNING, "[rm] Wrong signature trying to load DB from file");
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
@ -25,25 +31,39 @@ int rdbLoadRioWithLoading(migrateObj *mobj) {
|
|||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
serverLog(LL_NOTICE, "buf=%s", buf);
|
|
||||||
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
|
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
|
||||||
|
|
||||||
|
// syncReadLine(mobj->source_cc->fd, buf, 4, mobj->timeout);
|
||||||
|
// buf[4] = '\0';
|
||||||
|
// serverLog(LL_NOTICE, "buf:%s", buf);
|
||||||
|
// type = atoi(buf);
|
||||||
|
// serverLog(LL_NOTICE, "type:%d", type);
|
||||||
|
char type_buf[1];
|
||||||
|
if (read(mobj->source_cc->fd, &type_buf, 1) == 0)
|
||||||
|
{
|
||||||
|
serverLog(LL_WARNING, "read mark failed");
|
||||||
|
return C_ERR;
|
||||||
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
sds key;
|
sds key;
|
||||||
robj *val;
|
robj *val;
|
||||||
if ((type = rm_rdbLoadType(mobj)) == -1) {
|
if (read(mobj->source_cc->fd, &type_buf, 1) == 0) {
|
||||||
serverLog(LL_WARNING, "read type failed");
|
serverLog(LL_WARNING, "read type failed");
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
|
int type = type_buf[0] & 0xff;
|
||||||
|
serverLog(LL_NOTICE, "type: %d", type);
|
||||||
|
|
||||||
if (type == RDB_OPCODE_EXPIRETIME) {
|
if (type == RDB_OPCODE_EXPIRETIME) {
|
||||||
expiretime = rm_rdbLoadTime(mobj);
|
expiretime = rmLoadTime(mobj);
|
||||||
if (expiretime == -1) {
|
if (expiretime == -1) {
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
expiretime *= 1000;
|
expiretime *= 1000;
|
||||||
continue;
|
continue;
|
||||||
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
|
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
|
||||||
expiretime = rm_rdbLoadMillisecondTime(mobj, rdbver);
|
expiretime = rmLoadMillisecondTime(mobj, rdbver);
|
||||||
continue;
|
continue;
|
||||||
} else if (type == RDB_OPCODE_FREQ) {
|
} else if (type == RDB_OPCODE_FREQ) {
|
||||||
uint8_t byte;
|
uint8_t byte;
|
||||||
@ -53,56 +73,235 @@ int rdbLoadRioWithLoading(migrateObj *mobj) {
|
|||||||
continue;
|
continue;
|
||||||
} else if (type == RDB_OPCODE_IDLE) {
|
} else if (type == RDB_OPCODE_IDLE) {
|
||||||
uint64_t qword;
|
uint64_t qword;
|
||||||
if ((qword = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR)
|
if ((qword = rmLoadLen(mobj, NULL)) == RDB_LENERR)
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
lru_idle = qword;
|
lru_idle = qword;
|
||||||
continue;
|
continue;
|
||||||
} else if (type == RDB_OPCODE_EOF) {
|
} else if (type == RDB_OPCODE_EOF) {
|
||||||
|
serverLog(LL_NOTICE, "rdb file parse end..");
|
||||||
break;
|
break;
|
||||||
} else if (type == RDB_OPCODE_SELECTDB) {
|
} else if (type == RDB_OPCODE_SELECTDB) {
|
||||||
if ((dbid = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
|
if ((dbid = rmLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
|
||||||
continue;
|
continue;
|
||||||
} else if (type == RDB_OPCODE_RESIZEDB) {
|
} else if (type == RDB_OPCODE_RESIZEDB) {
|
||||||
uint64_t db_size, expires_size;
|
uint64_t db_size, expires_size;
|
||||||
if ((db_size = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
|
if ((db_size = rmLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
|
||||||
if ((expires_size = rm_rdbLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
|
if ((expires_size = rmLoadLen(mobj, NULL)) == RDB_LENERR) return C_ERR;
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
} else if (type == RDB_OPCODE_AUX) {
|
} else if (type == RDB_OPCODE_AUX) {
|
||||||
robj *auxkey, *auxval;
|
sds *auxkey, *auxval;
|
||||||
|
if ((auxkey = rmLoadStringObject(mobj)) == NULL) {
|
||||||
|
serverLog(LL_WARNING, "auxkey is null");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if ((auxval = rmLoadStringObject(mobj)) == NULL) {
|
||||||
|
serverLog(LL_WARNING, "auxval is null");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if (auxkey[0] == '%') {
|
||||||
|
serverLog(LL_NOTICE, "RDB '%s': %s", auxkey, auxval);
|
||||||
|
} else if (!strcasecmp(auxkey, "repl-id")) {
|
||||||
|
memcmp(mobj->psync_replid, auxval, CONFIG_RUN_ID_SIZE + 1);
|
||||||
|
serverLog(LL_NOTICE, "repl-id: %s", auxval);
|
||||||
|
} else if (!strcasecmp(auxkey, "repl-offset")) {
|
||||||
|
serverLog(LL_NOTICE, "repl-offset: %s", auxval);
|
||||||
|
} else if (!strcasecmp(auxkey, "lua") || !strcasecmp(auxkey, "aof-base")
|
||||||
|
|| !strcasecmp(auxkey, "redis-bits") || !strcasecmp(auxkey, "aof-preamble")
|
||||||
|
|| !strcasecmp(auxkey, "repl-stream-db") || !strcasecmp(auxkey, "redis-ver")
|
||||||
|
|| !strcasecmp(auxkey, "ctime")) {
|
||||||
|
serverLog(LL_NOTICE, "RDB '%s': %s", auxkey, auxval);
|
||||||
|
} else if (!strcasecmp(auxkey, "used-mem")) {
|
||||||
|
long long usedmem = strtoll(auxval, NULL, 10);
|
||||||
|
serverLog(LL_NOTICE, "RDB memory usage when created %.2f Mb", (double)usedmem / (1024 * 1024));
|
||||||
|
} else {
|
||||||
|
serverLog(LL_DEBUG, "Unrecognized RDB AUX field: '%s'", auxkey);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
|
||||||
} else if (type == RDB_OPCODE_FUNCTION || type == RDB_OPCODE_FUNCTION2) {
|
} else if (type == RDB_OPCODE_FUNCTION || type == RDB_OPCODE_FUNCTION2) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if ((key = rmGenericLoadStringObject(mobj, RDB_LOAD_SDS, NULL)) == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
val = rmLoadObject(type, mobj, key, &error);
|
||||||
|
|
||||||
|
if (val == NULL) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void *rm_rdbGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) {
|
robj *rmLoadObject(int rdbtype, migrateObj *mobj, sds key, int *error) {
|
||||||
|
int dbid = 0;
|
||||||
|
robj *o = NULL, *ele, *dec;
|
||||||
|
uint64_t len;
|
||||||
|
unsigned int i;
|
||||||
|
|
||||||
|
return o;
|
||||||
|
}
|
||||||
|
|
||||||
|
sds rmLoadStringObject(migrateObj *mobj) {
|
||||||
|
return rmGenericLoadStringObject(mobj, RDB_LOAD_NONE, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
sds rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) {
|
||||||
int encode = flags & RDB_LOAD_ENC;
|
int encode = flags & RDB_LOAD_ENC;
|
||||||
int plain = flags & RDB_LOAD_PLAIN;
|
int plain = flags & RDB_LOAD_PLAIN;
|
||||||
int sds = flags & RDB_LOAD_SDS;
|
int sds = flags & RDB_LOAD_SDS;
|
||||||
int isencoded;
|
int isencoded;
|
||||||
unsigned long long len;
|
unsigned long long len;
|
||||||
|
len = rmLoadLen(mobj, &isencoded);
|
||||||
|
serverLog(LL_NOTICE, "len=%d, isencoded=%d", len, isencoded);
|
||||||
|
if (len == RDB_LENERR) return NULL;
|
||||||
|
if (isencoded) {
|
||||||
|
switch (len) {
|
||||||
|
case RDB_ENC_INT8:
|
||||||
|
case RDB_ENC_INT16:
|
||||||
|
case RDB_ENC_INT32:
|
||||||
|
return rmLoadIntegerObject(mobj, len, flags, lenptr);
|
||||||
|
case RDB_ENC_LZF:
|
||||||
|
return rmLoadLzfStringObject(mobj, flags, lenptr);
|
||||||
|
default:
|
||||||
|
serverLog(LL_WARNING, "Unknown RDB string encoding type %llu", len);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (plain || sds) {
|
||||||
|
void *buf = sds_malloc(len);
|
||||||
|
if (!buf) {
|
||||||
|
serverLog(LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if (lenptr) *lenptr = len;
|
||||||
|
if (len && read(mobj->source_cc->fd, buf, len) == 0) {
|
||||||
|
if (plain)
|
||||||
|
zfree(buf);
|
||||||
|
else
|
||||||
|
sdsfree(buf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
return hi_sdsnewlen(buf, len);
|
||||||
|
} else {
|
||||||
|
char buf[len];
|
||||||
|
if (!buf) {
|
||||||
|
serverLog(LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if (len && read(mobj->source_cc->fd, buf, len) == 0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
buf[len] = '\0';
|
||||||
|
return hi_sdsnewlen(buf, len);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t rm_rdbLoadLen(migrateObj *mobj, int *isencoded) {
|
void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenptr) {
|
||||||
|
int plain = flags & RDB_LOAD_PLAIN;
|
||||||
|
int sds = flags & RDB_LOAD_SDS;
|
||||||
|
int encode = flags & RDB_LOAD_ENC;
|
||||||
|
unsigned char enc[4];
|
||||||
|
long long val;
|
||||||
|
if (enctype == RDB_ENC_INT8) {
|
||||||
|
if (read(mobj->source_cc->fd, enc, 1) == 0) return NULL;
|
||||||
|
val = (signed char)enc[0];
|
||||||
|
} else if (enctype == RDB_ENC_INT16) {
|
||||||
|
uint16_t v;
|
||||||
|
if (read(mobj->source_cc->fd, enc, 2) == 0) return NULL;
|
||||||
|
v = ((uint32_t)enc[0]) | ((uint32_t)enc[1] << 8);
|
||||||
|
val = (int16_t)v;
|
||||||
|
} else if (enctype == RDB_ENC_INT32) {
|
||||||
|
uint32_t v;
|
||||||
|
if (read(mobj->source_cc->fd, enc, 4) == 0) return NULL;
|
||||||
|
v = ((uint32_t)enc[0]) | ((uint32_t)enc[1] << 8) | ((uint32_t)enc[2] << 16) | ((uint32_t)enc[3] << 24);
|
||||||
|
val = (int32_t)v;
|
||||||
|
} else {
|
||||||
|
// rdbReportCorruptRDB("Unknown RDB integer encoding type %d", enctype);
|
||||||
|
return NULL; /* Never reached. */
|
||||||
|
}
|
||||||
|
serverLog(LL_NOTICE, "plain=%d, encode=%d", plain, encode);
|
||||||
|
if (plain || sds) {
|
||||||
|
char buf[LONG_STR_SIZE], *p;
|
||||||
|
int len = ll2string(buf, sizeof(buf), val);
|
||||||
|
if (lenptr) *lenptr = len;
|
||||||
|
p = plain ? zmalloc(len) : sdsnewlen(SDS_NOINIT, len);
|
||||||
|
memcpy(p, buf, len);
|
||||||
|
return p;
|
||||||
|
} else if (encode) {
|
||||||
|
return createStringObjectFromLongLongForValue(val);
|
||||||
|
} else {
|
||||||
|
return sdsfromlonglong(val);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void *rmLoadLzfStringObject(migrateObj *mobj, int flags, size_t *lenptr) {
|
||||||
|
int plain = flags & RDB_LOAD_PLAIN;
|
||||||
|
int sds = flags & RDB_LOAD_SDS;
|
||||||
|
uint64_t len, clen;
|
||||||
|
unsigned char *c = NULL;
|
||||||
|
char *val = NULL;
|
||||||
|
if ((clen = rmLoadLen(mobj, NULL)) == RDB_LENERR) return NULL;
|
||||||
|
if ((len = rmLoadLen(mobj, NULL)) == RDB_LENERR) return NULL;
|
||||||
|
if ((c = hi_malloc(clen)) == NULL) {
|
||||||
|
serverLog(LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)clen);
|
||||||
|
goto err;
|
||||||
|
}
|
||||||
|
if (plain) {
|
||||||
|
val = hi_malloc(len);
|
||||||
|
} else {
|
||||||
|
val = hi_sdsnewlen(SDS_NOINIT, len);
|
||||||
|
}
|
||||||
|
if (!val) {
|
||||||
|
serverLog(LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)len);
|
||||||
|
goto err;
|
||||||
|
}
|
||||||
|
if (lenptr) *lenptr = len;
|
||||||
|
if (read(mobj->source_cc->fd, c, len) == 0) return NULL;
|
||||||
|
if (lzf_decompress(c, clen, val, len) != len) {
|
||||||
|
serverLog(LL_WARNING, "Invalid LZF compressed string");
|
||||||
|
goto err;
|
||||||
|
}
|
||||||
|
zfree(c);
|
||||||
|
|
||||||
|
if (plain || sds) {
|
||||||
|
return val;
|
||||||
|
} else {
|
||||||
|
return createObject(OBJ_STRING, val);
|
||||||
|
}
|
||||||
|
err:
|
||||||
|
zfree(c);
|
||||||
|
if (plain)
|
||||||
|
zfree(val);
|
||||||
|
else
|
||||||
|
sdsfree(val);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t rmLoadLen(migrateObj *mobj, int *isencoded) {
|
||||||
uint64_t len;
|
uint64_t len;
|
||||||
|
|
||||||
if (rdbLoadLenByRef(mobj, isencoded, &len) == -1)
|
if (rmLoadLenByRef(mobj, isencoded, &len) == -1)
|
||||||
return RDB_LENERR;
|
return RDB_LENERR;
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
int rm_rdbLoadType(migrateObj *mobj) {
|
int rmLoadType(migrateObj *mobj) {
|
||||||
unsigned char type;
|
unsigned char type;
|
||||||
if (read(mobj->source_cc->fd, &type, 1) == 0) {
|
if (rmRead(mobj, &type, 1) == 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return type;
|
return type;
|
||||||
}
|
}
|
||||||
|
|
||||||
int rm_rdbLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) {
|
int rmRead(migrateObj *mobj, void *buf, size_t len) {
|
||||||
|
if (read(mobj->source_cc->fd, &buf, len) == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
buf = (char *)buf + len;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int rmLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) {
|
||||||
unsigned char buf[2];
|
unsigned char buf[2];
|
||||||
int type;
|
int type;
|
||||||
if (isencoded)
|
if (isencoded)
|
||||||
@ -138,7 +337,7 @@ int rm_rdbLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
time_t rm_rdbLoadTime(migrateObj *mobj) {
|
time_t rmLoadTime(migrateObj *mobj) {
|
||||||
int32_t t32;
|
int32_t t32;
|
||||||
if (read(mobj->source_cc->fd, &t32, 4) == 0) {
|
if (read(mobj->source_cc->fd, &t32, 4) == 0) {
|
||||||
return -1;
|
return -1;
|
||||||
@ -147,7 +346,7 @@ time_t rm_rdbLoadTime(migrateObj *mobj) {
|
|||||||
return (time_t)t32;
|
return (time_t)t32;
|
||||||
}
|
}
|
||||||
|
|
||||||
long long rm_rdbLoadMillisecondTime(migrateObj *mobj, int rdbver) {
|
long long rmLoadMillisecondTime(migrateObj *mobj, int rdbver) {
|
||||||
int64_t t64;
|
int64_t t64;
|
||||||
if (read(mobj->source_cc->fd, &t64, 8) == 0) {
|
if (read(mobj->source_cc->fd, &t64, 8) == 0) {
|
||||||
return LLONG_MAX;
|
return LLONG_MAX;
|
||||||
|
@ -27,22 +27,48 @@
|
|||||||
#define RDB_LOAD_PLAIN (1 << 1)
|
#define RDB_LOAD_PLAIN (1 << 1)
|
||||||
#define RDB_LOAD_SDS (1 << 2)
|
#define RDB_LOAD_SDS (1 << 2)
|
||||||
|
|
||||||
|
#define RDB_ENC_INT8 0 /* 8 bit signed integer */
|
||||||
|
#define RDB_ENC_INT16 1 /* 16 bit signed integer */
|
||||||
|
#define RDB_ENC_INT32 2 /* 32 bit signed integer */
|
||||||
|
#define RDB_ENC_LZF 3 /* string compressed with FASTLZ */
|
||||||
|
|
||||||
|
#define LONG_STR_SIZE 21
|
||||||
|
extern const char *SDS_NOINIT;
|
||||||
|
#define OBJ_STRING 0 /* String object. */
|
||||||
|
#define OBJ_LIST 1 /* List object. */
|
||||||
|
#define OBJ_SET 2 /* Set object. */
|
||||||
|
#define OBJ_ZSET 3 /* Sorted set object. */
|
||||||
|
#define OBJ_HASH 4 /* Hash object. */
|
||||||
|
|
||||||
|
#define htonu64(v) intrev64(v)
|
||||||
|
#define ntohu64(v) intrev64(v)
|
||||||
|
|
||||||
#define LLONG_MAX __LONG_LONG_MAX__
|
#define LLONG_MAX __LONG_LONG_MAX__
|
||||||
|
|
||||||
int rm_rdbLoadRioWithLoading(migrateObj *mobj);
|
int rmRead(migrateObj *mobj, void *buf, size_t len);
|
||||||
|
|
||||||
int rm_rdbLoadType(migrateObj *mobj);
|
int rmLoadRioWithLoading(migrateObj *mobj);
|
||||||
|
|
||||||
time_t rm_rdbLoadTime(migrateObj *mobj);
|
int rmLoadType(migrateObj *mobj);
|
||||||
|
|
||||||
long long rm_rdbLoadMillisecondTime(migrateObj *mobj, int rdbver);
|
time_t rmLoadTime(migrateObj *mobj);
|
||||||
|
|
||||||
int rm_rdbLoadLenByRef(migrateObj *mobi, int *isencoded, uint64_t *lenptr);
|
long long rmLoadMillisecondTime(migrateObj *mobj, int rdbver);
|
||||||
|
|
||||||
|
int rmLoadLenByRef(migrateObj *mobi, int *isencoded, uint64_t *lenptr);
|
||||||
|
|
||||||
void rm_memrev64(void *p);
|
void rm_memrev64(void *p);
|
||||||
|
|
||||||
uint64_t rm_rdbLoadLen(migrateObj *mobj, int *isencoded);
|
uint64_t rmLoadLen(migrateObj *mobj, int *isencoded);
|
||||||
|
|
||||||
void *rm_rdbGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr);
|
sds rmLoadStringObject(migrateObj *mobj);
|
||||||
|
|
||||||
|
sds rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr);
|
||||||
|
|
||||||
|
void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenptr);
|
||||||
|
|
||||||
|
void *rmLoadLzfStringObject(migrateObj *mobj, int flags, size_t *lenptr);
|
||||||
|
|
||||||
|
robj *rmLoadObject(int rdbtype, migrateObj *mobj, sds key, int *error);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -147,9 +147,7 @@ void readFullData() {
|
|||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
if (buf[0] == '-') {
|
if (buf[0] == '-') {
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING, "MASTER aborted replication with an error: %s", buf + 1);
|
||||||
"MASTER aborted replication with an error: %s",
|
|
||||||
buf + 1);
|
|
||||||
goto error;
|
goto error;
|
||||||
} else if (buf[0] == '\0') {
|
} else if (buf[0] == '\0') {
|
||||||
// mobj->repl_transfer_lastio = server.unixtime;
|
// mobj->repl_transfer_lastio = server.unixtime;
|
||||||
@ -176,7 +174,7 @@ void readFullData() {
|
|||||||
(long long)mobj->repl_transfer_size);
|
(long long)mobj->repl_transfer_size);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int flag = rdbLoadRioWithLoading(mobj);
|
int flag = rmLoadRioWithLoading(mobj);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
error:
|
error:
|
||||||
@ -292,21 +290,30 @@ void syncDataWithRedis(int fd, void *user_data, int mask) {
|
|||||||
mobj->repl_stat = REPL_STATE_RECEIVE_PSYNC_REPLY;
|
mobj->repl_stat = REPL_STATE_RECEIVE_PSYNC_REPLY;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (mobj->repl_stat != REPL_STATE_RECEIVE_PSYNC_REPLY) {
|
|
||||||
serverLog(LL_WARNING, "syncDataWithRedis(): state machine error, state should be RECEIVE_PSYNC but is %d",
|
if (mobj->repl_stat == REPL_STATE_RECEIVE_PSYNC_REPLY) {
|
||||||
mobj->repl_stat);
|
int psync_result = receiveDataFromRedis();
|
||||||
goto error;
|
if (psync_result == PSYNC_WAIT_REPLY)
|
||||||
}
|
return;
|
||||||
int psync_result = receiveDataFromRedis();
|
if (psync_result == PSYNC_TRY_LATER)
|
||||||
if (psync_result == PSYNC_WAIT_REPLY)
|
goto error;
|
||||||
return;
|
if (psync_result == PSYNC_CONTINUE) {
|
||||||
if (psync_result == PSYNC_TRY_LATER)
|
mobj->repl_stat = REPL_STATE_CONTINUE_SYNC;
|
||||||
goto error;
|
} else {
|
||||||
if (psync_result == PSYNC_CONTINUE) {
|
mobj->repl_stat = REPL_STATE_FULL_SYNC;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 接受全部数据
|
// 接受全部数据
|
||||||
serverLog(LL_NOTICE, "begin receive full data");
|
if (mobj->repl_stat == REPL_STATE_FULL_SYNC) {
|
||||||
readFullData();
|
serverLog(LL_NOTICE, "begin receive full data");
|
||||||
|
mobj->repl_stat = REPL_STATE_READING_FULL_DATA;
|
||||||
|
readFullData();
|
||||||
|
}
|
||||||
|
if (mobj->repl_stat == REPL_STATE_CONTINUE_SYNC) {
|
||||||
|
serverLog(LL_NOTICE, "begin receive continue data");
|
||||||
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
error:
|
error:
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
|
@ -68,6 +68,9 @@ typedef enum
|
|||||||
REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */
|
REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */
|
||||||
REPL_STATE_SEND_PSYNC, /* Send PSYNC */
|
REPL_STATE_SEND_PSYNC, /* Send PSYNC */
|
||||||
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
|
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
|
||||||
|
REPL_STATE_FULL_SYNC,
|
||||||
|
REPL_STATE_READING_FULL_DATA,
|
||||||
|
REPL_STATE_CONTINUE_SYNC,
|
||||||
/* --- End of handshake states --- */
|
/* --- End of handshake states --- */
|
||||||
REPL_STATE_TRANSFER, /* Receiving .rdb from master */
|
REPL_STATE_TRANSFER, /* Receiving .rdb from master */
|
||||||
REPL_STATE_CONNECTED, /* Connected to master */
|
REPL_STATE_CONNECTED, /* Connected to master */
|
||||||
|
Loading…
Reference in New Issue
Block a user