diff --git a/src/Makefile b/src/Makefile index 996357e..f948b2c 100644 --- a/src/Makefile +++ b/src/Makefile @@ -13,7 +13,7 @@ else SHOBJ_LDFLAGS ?= -bundle -undefined dynamic_lookup endif -FINAL_CFLAGS+= -I./hiredis $(SHOBJ_CFLAGS) +FINAL_CFLAGS+= $(SHOBJ_CFLAGS) .PHONY: all diff --git a/src/async.c b/src/async.c index e13c090..243e882 100644 --- a/src/async.c +++ b/src/async.c @@ -209,6 +209,13 @@ redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, return redisAsyncConnectWithOptions(&options); } +redisAsyncContext *redisAsyncConnectBindWithTimeout(const char *ip, int port, const char *source_addr) { + redisOptions options = {0}; + REDIS_OPTIONS_SET_TCP(&options, ip, port); + options.endpoint.tcp.source_addr = source_addr; + return redisAsyncConnectWithOptions(&options); +} + redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, const char *source_addr) { redisOptions options = {0}; diff --git a/src/async.h b/src/async.h index 4c65203..b935744 100644 --- a/src/async.h +++ b/src/async.h @@ -115,6 +115,7 @@ typedef struct redisAsyncContext { redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options); redisAsyncContext *redisAsyncConnect(const char *ip, int port); redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, const char *source_addr); +redisAsyncContext *redisAsyncConnectBindWithTimeout(const char *ip, int port, const char *source_addr); redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, const char *source_addr); redisAsyncContext *redisAsyncConnectUnix(const char *path); diff --git a/src/redis-migrate.c b/src/redis-migrate.c index 242c61f..9243b30 100644 --- a/src/redis-migrate.c +++ b/src/redis-migrate.c @@ -1,73 +1,20 @@ #include #include +#include +#include +#include #include "redis-migrate.h" #include "hiredis.h" #include "async.h" -#include "ae.h" -static redisAsyncContext *context; -char *bind_source_addr; -static aeEventLoop *loop; +static redisContext *context; -int sync_state; - -RedisModuleCtx *moduleCtx; - -/* Return the UNIX time in microseconds */ -long long ustime(void) { - struct timeval tv; - long long ust; - - gettimeofday(&tv, NULL); - ust = ((long long) tv.tv_sec) * 1000000; - ust += tv.tv_usec; - return ust; +void *syncWithRedis(void *arg) { + RedisModuleCtx *ctx = arg; + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin sync data"); + return NULL; } - -/* Return the UNIX time in milliseconds */ -mstime_t mstime(void) { - return ustime() / 1000; -} - -void syncWithMaster() { - -} - -void connectCallback(const redisAsyncContext *c, int status) { - -} - -void disconnectCallback(const redisAsyncContext *c, int status) { - -} - -int connectWithSourceRedis(RedisModuleCtx *ctx, robj *host, robj *p) { - int port = atoi(p->ptr); - context = redisAsyncConnect(host->ptr, port); - if (context->err) { - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%s", - (char *) host->ptr, - (char *) p->ptr); - goto err; - } - loop = aeCreateEventLoop(64); - redisAeAttach(loop, context); - redisAsyncSetConnectCallback(context, connectCallback); - redisAsyncSetDisconnectCallback(context, disconnectCallback); - // send commands - return C_OK; - err: - redisAsyncFree((redisAsyncContext *) context); - context = NULL; - - if (loop != NULL) { - aeStop(loop); - loop = NULL; - } - return C_ERR; -} - /** * migrate data to current instance. * migrate host port begin-slot end-slot @@ -81,17 +28,35 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return RedisModule_WrongArity(ctx); } robj *host = (robj *) argv[1]; - robj *port = (robj *) argv[2]; + robj *p = (robj *) argv[2]; robj *begin_slot = (robj *) argv[3]; robj *end_slot = (robj *) argv[4]; RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *) host->ptr, - (char *) port->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr); + (char *) p->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr); if (context != NULL) { return RedisModule_ReplyWithError(ctx, "-ERR is migrating, please waiting"); } - if (connectWithSourceRedis(ctx, host, port) == C_ERR) { - return RedisModule_ReplyWithError(ctx, "-ERR Can't connect source"); + int port = atoi(p->ptr); + struct timeval timeout = {0, 500000}; // 0.5s + context = redisConnectWithTimeout(host->ptr, port, timeout); + if (context == NULL || context->err) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%s", + (char *) host->ptr, + (char *) p->ptr); + redisFree((redisContext *) context); + context = NULL; + return RedisModule_ReplyWithError(ctx, "Can't connect source redis"); } + + pthread_t pthread; + int flag = pthread_create(&pthread, NULL, syncWithRedis, ctx); + if (flag == 0) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Can't start thread"); + redisFree((redisContext *) context); + context = NULL; + return RedisModule_ReplyWithError(ctx, "Can't start thread"); + } + pthread_join(pthread, NULL); return RedisModule_ReplyWithSimpleString(ctx, "OK"); } @@ -112,6 +77,5 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "init rm.migrate failed"); return REDISMODULE_ERR; } - moduleCtx = ctx; return REDISMODULE_OK; } diff --git a/src/redis-migrate.h b/src/redis-migrate.h index 5b50e67..78cd768 100644 --- a/src/redis-migrate.h +++ b/src/redis-migrate.h @@ -38,8 +38,6 @@ typedef enum { REPL_STATE_CONNECTED, /* Connected to master */ } repl_state; -mstime_t mstime(void); - long long ustime(void); int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);