This commit is contained in:
LingZhaoHui 2022-07-10 01:05:24 +08:00
parent a5551d371a
commit 7258e80989
3 changed files with 40 additions and 14 deletions

View File

@ -13,7 +13,7 @@ int rmLoadRioWithLoading(migrateObj *mobj) {
return C_ERR; return C_ERR;
} }
buf[9] = '\0'; buf[9] = '\0';
serverLog(LL_NOTICE, "version:%s", buf); serverLog(LL_NOTICE, "[rm] version=%s", buf);
if (memcmp(buf, "REDIS", 5) != 0) { if (memcmp(buf, "REDIS", 5) != 0) {
serverLog(LL_WARNING, "[rm] Wrong signature trying to load DB from file"); serverLog(LL_WARNING, "[rm] Wrong signature trying to load DB from file");
errno = EINVAL; errno = EINVAL;
@ -30,19 +30,19 @@ int rmLoadRioWithLoading(migrateObj *mobj) {
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
// syncReadLine(mobj->source_cc->fd, buf, 4, mobj->timeout); // syncReadLine(mobj->source_cc->fd, buf, 4, mobj->timeout);
// buf[4] = '\0'; // buf[4] = '\0';
// serverLog(LL_NOTICE, "buf:%s", buf); // serverLog(LL_NOTICE, "buf:%s", buf);
// type = atoi(buf); // type = atoi(buf);
// serverLog(LL_NOTICE, "type:%d", type); // serverLog(LL_NOTICE, "type:%d", type);
char type_buf[1]; char type_buf[2];
while (1) { while (1) {
sds key; sds key;
robj *val; robj *val;
if (read(mobj->source_cc->fd, &type_buf, 4) == 0) { if (read(mobj->source_cc->fd, &type_buf, 2) == 0) {
serverLog(LL_WARNING, "read type failed"); serverLog(LL_WARNING, "read type failed");
return C_ERR; return C_ERR;
} }
int type = type_buf[0] & 0xff; int type = type_buf[1] & 0xff;
serverLog(LL_NOTICE, "type: %d", type); serverLog(LL_NOTICE, "type: %d", type);
if (type == RDB_OPCODE_EXPIRETIME) { if (type == RDB_OPCODE_EXPIRETIME) {
@ -80,6 +80,7 @@ int rmLoadRioWithLoading(migrateObj *mobj) {
continue; continue;
} else if (type == RDB_OPCODE_AUX) { } else if (type == RDB_OPCODE_AUX) {
robj *auxkey, *auxval; robj *auxkey, *auxval;
serverLog(LL_NOTICE, "type=RDB_OPCODE_AUX");
if ((auxkey = rmLoadStringObject(mobj)) == NULL) { if ((auxkey = rmLoadStringObject(mobj)) == NULL) {
return NULL; return NULL;
} }
@ -146,6 +147,7 @@ void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) {
int isencoded; int isencoded;
unsigned long long len; unsigned long long len;
len = rmLoadLen(mobj, &isencoded); len = rmLoadLen(mobj, &isencoded);
serverLog(LL_NOTICE, "len=%d, isencoded=%d", len, isencoded);
if (len == RDB_LENERR) return NULL; if (len == RDB_LENERR) return NULL;
if (isencoded) { if (isencoded) {
switch (len) { switch (len) {
@ -160,6 +162,34 @@ void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) {
return NULL; return NULL;
} }
} }
if (plain || sds) {
void *buf = plain ? ztrymalloc(len) : sdstrynewlen(SDS_NOINIT, len);
if (!buf) {
serverLog(LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
return NULL;
}
if (lenptr) *lenptr = len;
if (len && read(mobj->source_cc->fd, buf, len) == 0) {
if (plain)
zfree(buf);
else
sdsfree(buf);
return NULL;
}
return buf;
} else {
robj *o = encode ? tryCreateStringObject(SDS_NOINIT, len) :
tryCreateRawStringObject(SDS_NOINIT, len);
if (!o) {
serverLog(LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
return NULL;
}
if (len && read(mobj->source_cc->fd, o->ptr, len)) {
decrRefCount(o);
return NULL;
}
return o;
}
} }
void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenptr) { void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenptr) {
@ -263,7 +293,7 @@ int rmRead(migrateObj *mobj, void *buf, size_t len) {
if (read(mobj->source_cc->fd, &buf, len) == 0) { if (read(mobj->source_cc->fd, &buf, len) == 0) {
return 0; return 0;
} }
buf = (char*)buf + len; buf = (char *)buf + len;
return 1; return 1;
} }

View File

@ -147,9 +147,7 @@ void readFullData() {
goto error; goto error;
} }
if (buf[0] == '-') { if (buf[0] == '-') {
serverLog(LL_WARNING, serverLog(LL_WARNING, "MASTER aborted replication with an error: %s", buf + 1);
"MASTER aborted replication with an error: %s",
buf + 1);
goto error; goto error;
} else if (buf[0] == '\0') { } else if (buf[0] == '\0') {
// mobj->repl_transfer_lastio = server.unixtime; // mobj->repl_transfer_lastio = server.unixtime;
@ -292,11 +290,7 @@ void syncDataWithRedis(int fd, void *user_data, int mask) {
mobj->repl_stat = REPL_STATE_RECEIVE_PSYNC_REPLY; mobj->repl_stat = REPL_STATE_RECEIVE_PSYNC_REPLY;
return; return;
} }
if (mobj->repl_stat != REPL_STATE_RECEIVE_PSYNC_REPLY) {
serverLog(LL_WARNING, "syncDataWithRedis(): state machine error, state should be RECEIVE_PSYNC but is %d",
mobj->repl_stat);
goto error;
}
if (mobj->repl_stat == REPL_STATE_RECEIVE_PSYNC_REPLY) { if (mobj->repl_stat == REPL_STATE_RECEIVE_PSYNC_REPLY) {
int psync_result = receiveDataFromRedis(); int psync_result = receiveDataFromRedis();
if (psync_result == PSYNC_WAIT_REPLY) if (psync_result == PSYNC_WAIT_REPLY)
@ -313,6 +307,7 @@ void syncDataWithRedis(int fd, void *user_data, int mask) {
// 接受全部数据 // 接受全部数据
if (mobj->repl_stat == REPL_STATE_FULL_SYNC) { if (mobj->repl_stat == REPL_STATE_FULL_SYNC) {
serverLog(LL_NOTICE, "begin receive full data"); serverLog(LL_NOTICE, "begin receive full data");
mobj->repl_stat = REPL_STATE_READING_FULL_DATA;
readFullData(); readFullData();
} }
if (mobj->repl_stat == REPL_STATE_CONTINUE_SYNC) { if (mobj->repl_stat == REPL_STATE_CONTINUE_SYNC) {

View File

@ -69,6 +69,7 @@ typedef enum
REPL_STATE_SEND_PSYNC, /* Send PSYNC */ REPL_STATE_SEND_PSYNC, /* Send PSYNC */
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
REPL_STATE_FULL_SYNC, REPL_STATE_FULL_SYNC,
REPL_STATE_READING_FULL_DATA,
REPL_STATE_CONTINUE_SYNC, REPL_STATE_CONTINUE_SYNC,
/* --- End of handshake states --- */ /* --- End of handshake states --- */
REPL_STATE_TRANSFER, /* Receiving .rdb from master */ REPL_STATE_TRANSFER, /* Receiving .rdb from master */