diff --git a/src/rdbLoad.c b/src/rdbLoad.c index 10e1ac5..7eea071 100644 --- a/src/rdbLoad.c +++ b/src/rdbLoad.c @@ -4,7 +4,7 @@ #include #include -int rdbLoadRioWithLoading(migrateObj *mobj) { +int rmLoadRioWithLoading(migrateObj *mobj) { char buf[1024]; int error; if (syncReadLine(mobj->source_cc->fd, buf, 9, mobj->timeout) == -1) { @@ -13,6 +13,7 @@ int rdbLoadRioWithLoading(migrateObj *mobj) { return C_ERR; } buf[9] = '\0'; + serverLog(LL_NOTICE, "version:%s", buf); if (memcmp(buf, "REDIS", 5) != 0) { serverLog(LL_WARNING, "[rm] Wrong signature trying to load DB from file"); errno = EINVAL; @@ -34,6 +35,7 @@ int rdbLoadRioWithLoading(migrateObj *mobj) { serverLog(LL_WARNING, "read type failed"); return C_ERR; } + serverLog(LL_NOTICE, "type: %d", type); if (type == RDB_OPCODE_EXPIRETIME) { expiretime = rmLoadTime(mobj); @@ -109,9 +111,22 @@ int rdbLoadRioWithLoading(migrateObj *mobj) { if ((key = rmGenericLoadStringObject(mobj, RDB_LOAD_SDS, NULL)) == NULL) { return NULL; } + val = rmLoadObject(type, mobj, key, &error); + + if (val == NULL) { + } } } +robj *rmLoadObject(int rdbtype, migrateObj *mobj, sds key, int *error) { + int dbid = 0; + robj *o = NULL, *ele, *dec; + uint64_t len; + unsigned int i; + + return o; +} + robj *rmLoadStringObject(migrateObj *mobj) { return rmGenericLoadStringObject(mobj, RDB_LOAD_NONE, NULL); } @@ -229,13 +244,21 @@ uint64_t rmLoadLen(migrateObj *mobj, int *isencoded) { int rmLoadType(migrateObj *mobj) { unsigned char type; - if (read(mobj->source_cc->fd, &type, 1) == 0) { + if (rmRead(mobj, &type, 1) == -1) { return -1; } return type; } +int rmRead(migrateObj *mobj, void *buf, size_t len) { + if (read(mobj->source_cc->fd, &buf, len) == 0) { + return -1; + } + // buf = (char*)buf + len; + return 1; +} + int rmLoadLenByRef(migrateObj *mobj, int *isencoded, uint64_t *lenptr) { unsigned char buf[2]; int type; diff --git a/src/rdbLoad.h b/src/rdbLoad.h index c61f279..159fccc 100644 --- a/src/rdbLoad.h +++ b/src/rdbLoad.h @@ -45,6 +45,8 @@ extern const char *SDS_NOINIT; #define LLONG_MAX __LONG_LONG_MAX__ +int rmRead(migrateObj *mobj, void *buf, size_t len); + int rmLoadRioWithLoading(migrateObj *mobj); int rmLoadType(migrateObj *mobj); @@ -67,4 +69,6 @@ void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenp void *rmLoadLzfStringObject(migrateObj *mobj, int flags, size_t *lenptr); +robj *rmLoadObject(int rdbtype, migrateObj *mobj, sds key, int *error); + #endif diff --git a/src/redis-migrate.c b/src/redis-migrate.c index bf8316b..960e835 100644 --- a/src/redis-migrate.c +++ b/src/redis-migrate.c @@ -176,7 +176,7 @@ void readFullData() { (long long)mobj->repl_transfer_size); } } - int flag = rdbLoadRioWithLoading(mobj); + int flag = rmLoadRioWithLoading(mobj); return; error: @@ -297,16 +297,28 @@ void syncDataWithRedis(int fd, void *user_data, int mask) { mobj->repl_stat); goto error; } - int psync_result = receiveDataFromRedis(); - if (psync_result == PSYNC_WAIT_REPLY) - return; - if (psync_result == PSYNC_TRY_LATER) - goto error; - if (psync_result == PSYNC_CONTINUE) { + if (mobj->repl_stat == REPL_STATE_RECEIVE_PSYNC_REPLY) { + int psync_result = receiveDataFromRedis(); + if (psync_result == PSYNC_WAIT_REPLY) + return; + if (psync_result == PSYNC_TRY_LATER) + goto error; + if (psync_result == PSYNC_CONTINUE) { + mobj->repl_stat = REPL_STATE_CONTINUE_SYNC; + } else { + mobj->repl_stat = REPL_STATE_FULL_SYNC; + } } + // 接受全部数据 - serverLog(LL_NOTICE, "begin receive full data"); - readFullData(); + if (mobj->repl_stat == REPL_STATE_FULL_SYNC) { + serverLog(LL_NOTICE, "begin receive full data"); + readFullData(); + } + if (mobj->repl_stat == REPL_STATE_CONTINUE_SYNC) { + serverLog(LL_NOTICE, "begin receive continue data"); + } + return; error: if (err != NULL) { diff --git a/src/redis-migrate.h b/src/redis-migrate.h index 43958d2..832bd02 100644 --- a/src/redis-migrate.h +++ b/src/redis-migrate.h @@ -68,6 +68,8 @@ typedef enum REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_SEND_PSYNC, /* Send PSYNC */ REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ + REPL_STATE_FULL_SYNC, + REPL_STATE_CONTINUE_SYNC, /* --- End of handshake states --- */ REPL_STATE_TRANSFER, /* Receiving .rdb from master */ REPL_STATE_CONNECTED, /* Connected to master */