This commit is contained in:
LingZhaoHui 2022-07-09 15:44:53 +08:00
parent f8c1d0f251
commit fb5d3a725e
4 changed files with 52 additions and 11 deletions

View File

@ -4,7 +4,7 @@
#include <errno.h> #include <errno.h>
#include <unistd.h> #include <unistd.h>
int rdbLoadRioWithLoading(migrateObj *mobj) { int rmLoadRioWithLoading(migrateObj *mobj) {
char buf[1024]; char buf[1024];
int error; int error;
if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1) { if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1) {
@ -13,6 +13,7 @@ int rdbLoadRioWithLoading(migrateObj *mobj) {
return C_ERR; return C_ERR;
} }
buf[9] = '\0'; buf[9] = '\0';
serverLog(LL_NOTICE, "version:%s", buf);
if (memcmp(buf, "REDIS", 5) != 0) { if (memcmp(buf, "REDIS", 5) != 0) {
serverLog(LL_WARNING, "[rm] Wrong signature trying to load DB from file"); serverLog(LL_WARNING, "[rm] Wrong signature trying to load DB from file");
errno = EINVAL; errno = EINVAL;
@ -34,6 +35,7 @@ int rdbLoadRioWithLoading(migrateObj *mobj) {
serverLog(LL_WARNING, "read type failed"); serverLog(LL_WARNING, "read type failed");
return C_ERR; return C_ERR;
} }
serverLog(LL_NOTICE, "type: %d", type);
if (type == RDB_OPCODE_EXPIRETIME) { if (type == RDB_OPCODE_EXPIRETIME) {
expiretime = rmLoadTime(mobj); expiretime = rmLoadTime(mobj);
@ -109,9 +111,22 @@ int rdbLoadRioWithLoading(migrateObj *mobj) {
if ((key = rmGenericLoadStringObject(mobj, RDB_LOAD_SDS, NULL)) == NULL) { if ((key = rmGenericLoadStringObject(mobj, RDB_LOAD_SDS, NULL)) == NULL) {
return NULL; return NULL;
} }
val = rmLoadObject(type, mobj, key, &error);
if (val == NULL) {
}
} }
} }
robj *rmLoadObject(int rdbtype, migrateObj *mobj, sds key, int *error) {
int dbid = 0;
robj *o = NULL, *ele, *dec;
uint64_t len;
unsigned int i;
return o;
}
robj *rmLoadStringObject(migrateObj *mobj) { robj *rmLoadStringObject(migrateObj *mobj) {
return rmGenericLoadStringObject(mobj, RDB_LOAD_NONE, NULL); return rmGenericLoadStringObject(mobj, RDB_LOAD_NONE, NULL);
} }
@ -229,13 +244,21 @@ uint64_t rmLoadLen(migrateObj *mobj, int *isencoded) {
int rmLoadType(migrateObj *mobj) { int rmLoadType(migrateObj *mobj) {
unsigned char type; unsigned char type;
if (read(mobj->source_cc->fd, &type, 1) == 0) { if (rmRead(mobj, &type, 1) == -1) {
return -1; return -1;
} }
return type; return type;
} }
int rmRead(migrateObj *mobj, void *buf, size_t len) {
if (read(mobj->source_cc->fd, &buf, len) == 0) {
return -1;
}
// buf = (char*)buf + len;
return 1;
}
int rmLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) { int rmLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) {
unsigned char buf[2]; unsigned char buf[2];
int type; int type;

View File

@ -45,6 +45,8 @@ extern const char *SDS_NOINIT;
#define LLONG_MAX __LONG_LONG_MAX__ #define LLONG_MAX __LONG_LONG_MAX__
int rmRead(migrateObj *mobj, void *buf, size_t len);
int rmLoadRioWithLoading(migrateObj *mobj); int rmLoadRioWithLoading(migrateObj *mobj);
int rmLoadType(migrateObj *mobj); int rmLoadType(migrateObj *mobj);
@ -67,4 +69,6 @@ void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenp
void *rmLoadLzfStringObject(migrateObj *mobj, int flags, size_t *lenptr); void *rmLoadLzfStringObject(migrateObj *mobj, int flags, size_t *lenptr);
robj *rmLoadObject(int rdbtype, migrateObj *mobj, sds key, int *error);
#endif #endif

View File

@ -176,7 +176,7 @@ void readFullData() {
(long long)mobj->repl_transfer_size); (long long)mobj->repl_transfer_size);
} }
} }
int flag = rdbLoadRioWithLoading(mobj); int flag = rmLoadRioWithLoading(mobj);
return; return;
error: error:
@ -297,16 +297,28 @@ void syncDataWithRedis(int fd, void *user_data, int mask) {
mobj->repl_stat); mobj->repl_stat);
goto error; goto error;
} }
int psync_result = receiveDataFromRedis(); if (mobj->repl_stat == REPL_STATE_RECEIVE_PSYNC_REPLY) {
if (psync_result == PSYNC_WAIT_REPLY) int psync_result = receiveDataFromRedis();
return; if (psync_result == PSYNC_WAIT_REPLY)
if (psync_result == PSYNC_TRY_LATER) return;
goto error; if (psync_result == PSYNC_TRY_LATER)
if (psync_result == PSYNC_CONTINUE) { goto error;
if (psync_result == PSYNC_CONTINUE) {
mobj->repl_stat = REPL_STATE_CONTINUE_SYNC;
} else {
mobj->repl_stat = REPL_STATE_FULL_SYNC;
}
} }
// 接受全部数据 // 接受全部数据
serverLog(LL_NOTICE, "begin receive full data"); if (mobj->repl_stat == REPL_STATE_FULL_SYNC) {
readFullData(); serverLog(LL_NOTICE, "begin receive full data");
readFullData();
}
if (mobj->repl_stat == REPL_STATE_CONTINUE_SYNC) {
serverLog(LL_NOTICE, "begin receive continue data");
}
return; return;
error: error:
if (err != NULL) { if (err != NULL) {

View File

@ -68,6 +68,8 @@ typedef enum
REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_SEND_PSYNC, /* Send PSYNC */ REPL_STATE_SEND_PSYNC, /* Send PSYNC */
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
REPL_STATE_FULL_SYNC,
REPL_STATE_CONTINUE_SYNC,
/* --- End of handshake states --- */ /* --- End of handshake states --- */
REPL_STATE_TRANSFER, /* Receiving .rdb from master */ REPL_STATE_TRANSFER, /* Receiving .rdb from master */
REPL_STATE_CONNECTED, /* Connected to master */ REPL_STATE_CONNECTED, /* Connected to master */