full data sync #8

Merged
zeekling merged 5 commits from sync into master 2022-07-10 08:50:20 +00:00
Showing only changes of commit a5551d371a - Show all commits

View File

@ -28,13 +28,21 @@ int rmLoadRioWithLoading(migrateObj *mobj) {
return C_ERR; return C_ERR;
} }
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
// syncReadLine(mobj->source_cc->fd, buf, 4, mobj->timeout);
// buf[4] = '\0';
// serverLog(LL_NOTICE, "buf:%s", buf);
// type = atoi(buf);
// serverLog(LL_NOTICE, "type:%d", type);
char type_buf[1];
while (1) { while (1) {
sds key; sds key;
robj *val; robj *val;
if ((type = rmLoadType(mobj)) == -1) { if (read(mobj->source_cc->fd, &type_buf, 4) == 0) {
serverLog(LL_WARNING, "read type failed"); serverLog(LL_WARNING, "read type failed");
return C_ERR; return C_ERR;
} }
int type = type_buf[0] & 0xff;
serverLog(LL_NOTICE, "type: %d", type); serverLog(LL_NOTICE, "type: %d", type);
if (type == RDB_OPCODE_EXPIRETIME) { if (type == RDB_OPCODE_EXPIRETIME) {
@ -244,7 +252,7 @@ uint64_t rmLoadLen(migrateObj *mobj, int *isencoded) {
int rmLoadType(migrateObj *mobj) { int rmLoadType(migrateObj *mobj) {
unsigned char type; unsigned char type;
if (rmRead(mobj, &type, 1) == -1) { if (rmRead(mobj, &type, 1) == 0) {
return -1; return -1;
} }
@ -253,9 +261,9 @@ int rmLoadType(migrateObj *mobj) {
int rmRead(migrateObj *mobj, void *buf, size_t len) { int rmRead(migrateObj *mobj, void *buf, size_t len) {
if (read(mobj->source_cc->fd, &buf, len) == 0) { if (read(mobj->source_cc->fd, &buf, len) == 0) {
return -1; return 0;
} }
// buf = (char*)buf + len; buf = (char*)buf + len;
return 1; return 1;
} }