diff --git a/src/hiredis.c b/src/hiredis.c index e2a483f..dd21d60 100644 --- a/src/hiredis.c +++ b/src/hiredis.c @@ -1028,6 +1028,20 @@ static int redisNextInBandReplyFromReader(redisContext *c, void **reply) { return REDIS_OK; } +int redisFlush(redisContext *c) { + int wdone = 0; + void *aux = NULL; + /* Try to read pending replies */ + if (redisNextInBandReplyFromReader(c,&aux) == REDIS_ERR) + return REDIS_ERR; + /* Write until done */ + do { + if (redisBufferWrite(c,&wdone) == REDIS_ERR) + return REDIS_ERR; + } while (!wdone); + return REDIS_OK; +} + int redisGetReply(redisContext *c, void **reply) { int wdone = 0; void *aux = NULL; diff --git a/src/hiredis.h b/src/hiredis.h index 332db5b..057fafd 100644 --- a/src/hiredis.h +++ b/src/hiredis.h @@ -315,6 +315,7 @@ redisFD redisFreeKeepFd(redisContext *c); int redisBufferRead(redisContext *c); int redisBufferWrite(redisContext *c, int *done); +int redisFlush(redisContext *c); /* In a blocking context, this function first checks if there are unconsumed * replies to return and returns one if so. Otherwise, it flushes the output * buffer to the socket and reads until it has a reply. In a non-blocking diff --git a/src/redis-migrate.c b/src/redis-migrate.c index 9da53c8..572ccd0 100644 --- a/src/redis-migrate.c +++ b/src/redis-migrate.c @@ -16,6 +16,7 @@ migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_sl m->begin_slot = begin_slot; m->end_slot = end_slot; m->repl_stat = REPL_STATE_NONE; + m->isCache = 0; return m; } @@ -25,44 +26,82 @@ void freeMigrateObj(migrateObj *m) { mobj = NULL; } -int sendReplCommand(RedisModuleCtx *ctx, char *format, ...) { - va_list ap; - va_start(ap, format); - redisReply *reply = redisvCommand(mobj->source_cc, format, ap); - va_end(ap); - if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "send %s failed ip:%s,port:%d, error:%s", - format, mobj->host, mobj->port, reply->str); - freeReplyObject(reply); +int sendSyncCommand(RedisModuleCtx *ctx) { + + if (!mobj->isCache) { + mobj->isCache = 1; + mobj->psync_replid = "?"; + memcpy(mobj->psync_offset, "-1", 3); + } + if (redisAppendCommand(mobj->source_cc, "PSYNC %s %s", mobj->psync_replid, mobj->psync_offset) != REDIS_OK) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, + "append PSYNC %s %s failed ip:%s,port:%d, ", + mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port); return 0; } - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "%s %s", format, reply->str); - freeReplyObject(reply); + if (redisFlush(mobj->source_cc) != REDIS_OK) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, + "send PSYNC %s %s failed ip:%s,port:%d, ", + mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port); + return 0; + } + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "PSYNC %s %s ", + mobj->psync_replid, mobj->psync_offset); return 1; } void *syncWithRedis(void *arg) { RedisModuleCtx *ctx = arg; + redisReply *reply; RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin sync data"); - if (!sendReplCommand(ctx, "PING")) { + reply = redisCommand(mobj->source_cc, "PING"); + if (reply->type == REDIS_REPLY_ERROR) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "send PING failed ip:%s,port:%d, error:%s", + mobj->host, mobj->port, reply->str); goto error; } + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "PING "); + freeReplyObject(reply); //todo auth with master sds portstr = sdsfromlonglong(mobj->port); - if (!sendReplCommand(ctx, "REPLCONF listening-port %s", portstr)) { + reply = redisCommand(mobj->source_cc, "REPLCONF listening-port %s", portstr); + if (reply->type == REDIS_REPLY_ERROR) { sdsfree(portstr); + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "REPLCONF listening-port %s failed ip:%s,port:%d, error:%s", + portstr, mobj->host, mobj->port, reply->str); goto error; } + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "REPLCONF listening-port %s ", portstr); sdsfree(portstr); - if (!sendReplCommand(ctx, "REPLCONF ip-address %s", mobj->host)) { + freeReplyObject(reply); + reply = redisCommand(mobj->source_cc, "REPLCONF ip-address %s", mobj->host); + if (reply->type == REDIS_REPLY_ERROR) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "REPLCONF ip-address %s failed ip:%s,port:%d, error:%s", + mobj->host, mobj->host, mobj->port, reply->str); goto error; } - if (!sendReplCommand(ctx, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2")) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "REPLCONF ip-address %s ", mobj->host); + freeReplyObject(reply); + reply = redisCommand(mobj->source_cc, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2"); + if (reply->type == REDIS_REPLY_ERROR) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, + "REPLCONF capa eof capa psync2 failed ip:%s,port:%d, error:%s", + mobj->host, mobj->port, reply->str); + goto error; + } + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "REPLCONF %s %s %s %s", + "capa", "eof", "capa", "psync2"); + freeReplyObject(reply); + if (!sendSyncCommand(ctx)) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, + "send PSYNC %s %s failed ip:%s,port:%d, ", + mobj->psync_replid, mobj->psync_offset, mobj->host, mobj->port); goto error; } return NULL; error: + freeReplyObject(reply); freeMigrateObj(mobj); return NULL; } diff --git a/src/redis-migrate.h b/src/redis-migrate.h index 57f786f..ede2ef3 100644 --- a/src/redis-migrate.h +++ b/src/redis-migrate.h @@ -35,6 +35,7 @@ typedef struct migrateObject { int begin_slot; int end_slot; char *psync_replid; + int isCache; char psync_offset[32]; } migrateObj; @@ -64,7 +65,7 @@ migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_sl void freeMigrateObj(migrateObj *m); -int sendReplCommand(RedisModuleCtx *ctx, char *format, ...); +int sendSyncCommand(RedisModuleCtx *ctx); void *syncWithRedis(void *arg);