diff --git a/.drone.yml b/.drone.yml index aea69f8..7bffb79 100644 --- a/.drone.yml +++ b/.drone.yml @@ -1,8 +1,10 @@ kind: pipeline -type: exec name: default +type: docker + steps: - name: build + image: gcc commands: - make clean && make trigger: diff --git a/src/async.c b/src/async.c index 243e882..e13c090 100644 --- a/src/async.c +++ b/src/async.c @@ -209,13 +209,6 @@ redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, return redisAsyncConnectWithOptions(&options); } -redisAsyncContext *redisAsyncConnectBindWithTimeout(const char *ip, int port, const char *source_addr) { - redisOptions options = {0}; - REDIS_OPTIONS_SET_TCP(&options, ip, port); - options.endpoint.tcp.source_addr = source_addr; - return redisAsyncConnectWithOptions(&options); -} - redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, const char *source_addr) { redisOptions options = {0}; diff --git a/src/async.h b/src/async.h index b935744..4c65203 100644 --- a/src/async.h +++ b/src/async.h @@ -115,7 +115,6 @@ typedef struct redisAsyncContext { redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options); redisAsyncContext *redisAsyncConnect(const char *ip, int port); redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, const char *source_addr); -redisAsyncContext *redisAsyncConnectBindWithTimeout(const char *ip, int port, const char *source_addr); redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, const char *source_addr); redisAsyncContext *redisAsyncConnectUnix(const char *path); diff --git a/src/redis-migrate.c b/src/redis-migrate.c index 9243b30..9da53c8 100644 --- a/src/redis-migrate.c +++ b/src/redis-migrate.c @@ -2,19 +2,83 @@ #include #include #include -#include #include #include "redis-migrate.h" #include "hiredis.h" -#include "async.h" -static redisContext *context; +static migrateObj *mobj; + +migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot) { + migrateObj *m; + m = hi_malloc(sizeof(*m)); + m->host = host->ptr; + m->port = port; + m->begin_slot = begin_slot; + m->end_slot = end_slot; + m->repl_stat = REPL_STATE_NONE; + return m; +} + +void freeMigrateObj(migrateObj *m) { + redisFree(m->source_cc); + hi_free(m); + mobj = NULL; +} + +int sendReplCommand(RedisModuleCtx *ctx, char *format, ...) { + va_list ap; + va_start(ap, format); + redisReply *reply = redisvCommand(mobj->source_cc, format, ap); + va_end(ap); + if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "send %s failed ip:%s,port:%d, error:%s", + format, mobj->host, mobj->port, reply->str); + freeReplyObject(reply); + return 0; + } + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "%s %s", format, reply->str); + freeReplyObject(reply); + return 1; +} void *syncWithRedis(void *arg) { RedisModuleCtx *ctx = arg; RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin sync data"); + if (!sendReplCommand(ctx, "PING")) { + goto error; + } + //todo auth with master + sds portstr = sdsfromlonglong(mobj->port); + if (!sendReplCommand(ctx, "REPLCONF listening-port %s", portstr)) { + sdsfree(portstr); + goto error; + } + sdsfree(portstr); + if (!sendReplCommand(ctx, "REPLCONF ip-address %s", mobj->host)) { + goto error; + } + if (!sendReplCommand(ctx, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2")) { + goto error; + } + + return NULL; + error: + freeMigrateObj(mobj); return NULL; } + +int connectRedis(RedisModuleCtx *ctx) { + pthread_t pthread; + int flag = pthread_create(&pthread, NULL, syncWithRedis, ctx); + if (flag != 0) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Can't start thread"); + freeMigrateObj(mobj); + return RedisModule_ReplyWithError(ctx, "Can't start thread"); + } + pthread_join(pthread, NULL); + return RedisModule_ReplyWithSimpleString(ctx, "OK"); +} + /** * migrate data to current instance. * migrate host port begin-slot end-slot @@ -28,36 +92,28 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return RedisModule_WrongArity(ctx); } robj *host = (robj *) argv[1]; - robj *p = (robj *) argv[2]; + robj *port = (robj *) argv[2]; robj *begin_slot = (robj *) argv[3]; robj *end_slot = (robj *) argv[4]; RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *) host->ptr, - (char *) p->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr); - if (context != NULL) { + (char *) port->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr); + if (mobj != NULL) { return RedisModule_ReplyWithError(ctx, "-ERR is migrating, please waiting"); } - int port = atoi(p->ptr); - struct timeval timeout = {0, 500000}; // 0.5s - context = redisConnectWithTimeout(host->ptr, port, timeout); - if (context == NULL || context->err) { - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%s", - (char *) host->ptr, - (char *) p->ptr); - redisFree((redisContext *) context); - context = NULL; + mobj = createMigrateObject(host, atoi(port->ptr), atoi(begin_slot->ptr), atoi(end_slot->ptr)); + struct timeval timeout = {1, 500000}; // 1.5s + redisOptions options = {0}; + REDIS_OPTIONS_SET_TCP(&options, (const char *) mobj->host, mobj->port); + options.connect_timeout = &timeout; + mobj->source_cc = redisConnectWithOptions(&options); + if (mobj->source_cc == NULL || mobj->source_cc->err) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%d, error:%s", + mobj->host, mobj->port, mobj->source_cc->errstr); + freeMigrateObj(mobj); return RedisModule_ReplyWithError(ctx, "Can't connect source redis"); } + return connectRedis(ctx); - pthread_t pthread; - int flag = pthread_create(&pthread, NULL, syncWithRedis, ctx); - if (flag == 0) { - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Can't start thread"); - redisFree((redisContext *) context); - context = NULL; - return RedisModule_ReplyWithError(ctx, "Can't start thread"); - } - pthread_join(pthread, NULL); - return RedisModule_ReplyWithSimpleString(ctx, "OK"); } /* This function must be present on each Redis module. It is used in order to @@ -71,11 +127,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (flag == REDISMODULE_ERR) { return REDISMODULE_ERR; } - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "init %s success", MODULE_NAME); + RedisModule_SetClusterFlags(ctx, REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION); + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin init commands of %s", MODULE_NAME); flag = RedisModule_CreateCommand(ctx, "rm.migrate", rm_migrateCommand, "write deny-oom admin", 1, 1, 0); if (flag == REDISMODULE_ERR) { RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "init rm.migrate failed"); return REDISMODULE_ERR; } + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "init %s success", MODULE_NAME); return REDISMODULE_OK; } diff --git a/src/redis-migrate.h b/src/redis-migrate.h index 78cd768..57f786f 100644 --- a/src/redis-migrate.h +++ b/src/redis-migrate.h @@ -3,6 +3,9 @@ #define REDIS_MIGRATE_REDIS_MIGRATE_H #include "redismodule.h" +#include "ae.h" +#include "sds.h" +#include "sdscompat.h" #define MODULE_NAME "redis-migrate" #define REDIS_MIGRATE_VERSION 1 @@ -10,6 +13,9 @@ #define C_ERR -1 #define C_OK 1 +/* Anti-warning macro... */ +#define UNUSED(V) ((void) V) + typedef struct redisObject { unsigned type: 4; unsigned encoding: 4; @@ -20,6 +26,18 @@ typedef struct redisObject { void *ptr; } robj; +typedef struct migrateObject { + char *address; + int repl_stat; + redisContext *source_cc; + char *host; + int port; + int begin_slot; + int end_slot; + char *psync_replid; + char psync_offset[32]; +} migrateObj; + typedef enum { REPL_STATE_NONE = 0, /* No active replication */ REPL_STATE_CONNECT, /* Must connect to master */ @@ -36,10 +54,22 @@ typedef enum { /* --- End of handshake states --- */ REPL_STATE_TRANSFER, /* Receiving .rdb from master */ REPL_STATE_CONNECTED, /* Connected to master */ + STATE_CONNECT_ERROR, + STATE_DISCONNECT } repl_state; long long ustime(void); +migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot); + +void freeMigrateObj(migrateObj *m); + +int sendReplCommand(RedisModuleCtx *ctx, char *format, ...); + +void *syncWithRedis(void *arg); + +int connectRedis(RedisModuleCtx *ctx) ; + int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);