diff --git a/src/rdbLoad.c b/src/rdbLoad.c index 890c6ce..f99c081 100644 --- a/src/rdbLoad.c +++ b/src/rdbLoad.c @@ -13,7 +13,7 @@ int rmLoadRioWithLoading(migrateObj *mobj) { return C_ERR; } buf[9] = '\0'; - serverLog(LL_NOTICE, "version:%s", buf); + serverLog(LL_NOTICE, "[rm] version=%s", buf); if (memcmp(buf, "REDIS", 5) != 0) { serverLog(LL_WARNING, "[rm] Wrong signature trying to load DB from file"); errno = EINVAL; @@ -30,19 +30,19 @@ int rmLoadRioWithLoading(migrateObj *mobj) { long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); // syncReadLine(mobj->source_cc->fd, buf, 4, mobj->timeout); - // buf[4] = '\0'; + // buf[4] = '\0'; // serverLog(LL_NOTICE, "buf:%s", buf); // type = atoi(buf); // serverLog(LL_NOTICE, "type:%d", type); - char type_buf[1]; + char type_buf[2]; while (1) { sds key; robj *val; - if (read(mobj->source_cc->fd, &type_buf, 4) == 0) { + if (read(mobj->source_cc->fd, &type_buf, 2) == 0) { serverLog(LL_WARNING, "read type failed"); return C_ERR; } - int type = type_buf[0] & 0xff; + int type = type_buf[1] & 0xff; serverLog(LL_NOTICE, "type: %d", type); if (type == RDB_OPCODE_EXPIRETIME) { @@ -80,6 +80,7 @@ int rmLoadRioWithLoading(migrateObj *mobj) { continue; } else if (type == RDB_OPCODE_AUX) { robj *auxkey, *auxval; + serverLog(LL_NOTICE, "type=RDB_OPCODE_AUX"); if ((auxkey = rmLoadStringObject(mobj)) == NULL) { return NULL; } @@ -146,6 +147,7 @@ void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) { int isencoded; unsigned long long len; len = rmLoadLen(mobj, &isencoded); + serverLog(LL_NOTICE, "len=%d, isencoded=%d", len, isencoded); if (len == RDB_LENERR) return NULL; if (isencoded) { switch (len) { @@ -160,6 +162,34 @@ void *rmGenericLoadStringObject(migrateObj *mobj, int flags, size_t *lenptr) { return NULL; } } + if (plain || sds) { + void *buf = plain ? ztrymalloc(len) : sdstrynewlen(SDS_NOINIT, len); + if (!buf) { + serverLog(LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len); + return NULL; + } + if (lenptr) *lenptr = len; + if (len && read(mobj->source_cc->fd, buf, len) == 0) { + if (plain) + zfree(buf); + else + sdsfree(buf); + return NULL; + } + return buf; + } else { + robj *o = encode ? tryCreateStringObject(SDS_NOINIT, len) : + tryCreateRawStringObject(SDS_NOINIT, len); + if (!o) { + serverLog(LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len); + return NULL; + } + if (len && read(mobj->source_cc->fd, o->ptr, len)) { + decrRefCount(o); + return NULL; + } + return o; + } } void *rmLoadIntegerObject(migrateObj *mobj, int enctype, int flags, size_t *lenptr) { @@ -263,7 +293,7 @@ int rmRead(migrateObj *mobj, void *buf, size_t len) { if (read(mobj->source_cc->fd, &buf, len) == 0) { return 0; } - buf = (char*)buf + len; + buf = (char *)buf + len; return 1; } diff --git a/src/redis-migrate.c b/src/redis-migrate.c index 960e835..bbfce48 100644 --- a/src/redis-migrate.c +++ b/src/redis-migrate.c @@ -147,9 +147,7 @@ void readFullData() { goto error; } if (buf[0] == '-') { - serverLog(LL_WARNING, - "MASTER aborted replication with an error: %s", - buf + 1); + serverLog(LL_WARNING, "MASTER aborted replication with an error: %s", buf + 1); goto error; } else if (buf[0] == '\0') { // mobj->repl_transfer_lastio = server.unixtime; @@ -292,11 +290,7 @@ void syncDataWithRedis(int fd, void *user_data, int mask) { mobj->repl_stat = REPL_STATE_RECEIVE_PSYNC_REPLY; return; } - if (mobj->repl_stat != REPL_STATE_RECEIVE_PSYNC_REPLY) { - serverLog(LL_WARNING, "syncDataWithRedis(): state machine error, state should be RECEIVE_PSYNC but is %d", - mobj->repl_stat); - goto error; - } + if (mobj->repl_stat == REPL_STATE_RECEIVE_PSYNC_REPLY) { int psync_result = receiveDataFromRedis(); if (psync_result == PSYNC_WAIT_REPLY) @@ -313,6 +307,7 @@ void syncDataWithRedis(int fd, void *user_data, int mask) { // 接受全部数据 if (mobj->repl_stat == REPL_STATE_FULL_SYNC) { serverLog(LL_NOTICE, "begin receive full data"); + mobj->repl_stat = REPL_STATE_READING_FULL_DATA; readFullData(); } if (mobj->repl_stat == REPL_STATE_CONTINUE_SYNC) { diff --git a/src/redis-migrate.h b/src/redis-migrate.h index 832bd02..0bed185 100644 --- a/src/redis-migrate.h +++ b/src/redis-migrate.h @@ -69,6 +69,7 @@ typedef enum REPL_STATE_SEND_PSYNC, /* Send PSYNC */ REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ REPL_STATE_FULL_SYNC, + REPL_STATE_READING_FULL_DATA, REPL_STATE_CONTINUE_SYNC, /* --- End of handshake states --- */ REPL_STATE_TRANSFER, /* Receiving .rdb from master */