From c532b742a938d472509e397999947b967e73ad34 Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 5 Jun 2022 22:53:47 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E5=B7=B2=E7=BB=8F?= =?UTF-8?q?=E5=8F=AF=E4=BB=A5=EF=BC=8C=E7=AD=89=E5=BE=85=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/async.c | 7 --- src/async.h | 1 - src/redis-migrate.c | 110 +++++++++++++++++++++++++++++++++----------- src/redis-migrate.h | 29 ++++++++++++ 4 files changed, 113 insertions(+), 34 deletions(-) diff --git a/src/async.c b/src/async.c index 243e882..e13c090 100644 --- a/src/async.c +++ b/src/async.c @@ -209,13 +209,6 @@ redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, return redisAsyncConnectWithOptions(&options); } -redisAsyncContext *redisAsyncConnectBindWithTimeout(const char *ip, int port, const char *source_addr) { - redisOptions options = {0}; - REDIS_OPTIONS_SET_TCP(&options, ip, port); - options.endpoint.tcp.source_addr = source_addr; - return redisAsyncConnectWithOptions(&options); -} - redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, const char *source_addr) { redisOptions options = {0}; diff --git a/src/async.h b/src/async.h index b935744..4c65203 100644 --- a/src/async.h +++ b/src/async.h @@ -115,7 +115,6 @@ typedef struct redisAsyncContext { redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options); redisAsyncContext *redisAsyncConnect(const char *ip, int port); redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, const char *source_addr); -redisAsyncContext *redisAsyncConnectBindWithTimeout(const char *ip, int port, const char *source_addr); redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, const char *source_addr); redisAsyncContext *redisAsyncConnectUnix(const char *path); diff --git a/src/redis-migrate.c b/src/redis-migrate.c index 9243b30..9da53c8 100644 --- a/src/redis-migrate.c +++ b/src/redis-migrate.c @@ -2,19 +2,83 @@ #include #include #include -#include #include #include "redis-migrate.h" #include "hiredis.h" -#include "async.h" -static redisContext *context; +static migrateObj *mobj; + +migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot) { + migrateObj *m; + m = hi_malloc(sizeof(*m)); + m->host = host->ptr; + m->port = port; + m->begin_slot = begin_slot; + m->end_slot = end_slot; + m->repl_stat = REPL_STATE_NONE; + return m; +} + +void freeMigrateObj(migrateObj *m) { + redisFree(m->source_cc); + hi_free(m); + mobj = NULL; +} + +int sendReplCommand(RedisModuleCtx *ctx, char *format, ...) { + va_list ap; + va_start(ap, format); + redisReply *reply = redisvCommand(mobj->source_cc, format, ap); + va_end(ap); + if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "send %s failed ip:%s,port:%d, error:%s", + format, mobj->host, mobj->port, reply->str); + freeReplyObject(reply); + return 0; + } + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "%s %s", format, reply->str); + freeReplyObject(reply); + return 1; +} void *syncWithRedis(void *arg) { RedisModuleCtx *ctx = arg; RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin sync data"); + if (!sendReplCommand(ctx, "PING")) { + goto error; + } + //todo auth with master + sds portstr = sdsfromlonglong(mobj->port); + if (!sendReplCommand(ctx, "REPLCONF listening-port %s", portstr)) { + sdsfree(portstr); + goto error; + } + sdsfree(portstr); + if (!sendReplCommand(ctx, "REPLCONF ip-address %s", mobj->host)) { + goto error; + } + if (!sendReplCommand(ctx, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2")) { + goto error; + } + + return NULL; + error: + freeMigrateObj(mobj); return NULL; } + +int connectRedis(RedisModuleCtx *ctx) { + pthread_t pthread; + int flag = pthread_create(&pthread, NULL, syncWithRedis, ctx); + if (flag != 0) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Can't start thread"); + freeMigrateObj(mobj); + return RedisModule_ReplyWithError(ctx, "Can't start thread"); + } + pthread_join(pthread, NULL); + return RedisModule_ReplyWithSimpleString(ctx, "OK"); +} + /** * migrate data to current instance. * migrate host port begin-slot end-slot @@ -28,36 +92,28 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return RedisModule_WrongArity(ctx); } robj *host = (robj *) argv[1]; - robj *p = (robj *) argv[2]; + robj *port = (robj *) argv[2]; robj *begin_slot = (robj *) argv[3]; robj *end_slot = (robj *) argv[4]; RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *) host->ptr, - (char *) p->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr); - if (context != NULL) { + (char *) port->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr); + if (mobj != NULL) { return RedisModule_ReplyWithError(ctx, "-ERR is migrating, please waiting"); } - int port = atoi(p->ptr); - struct timeval timeout = {0, 500000}; // 0.5s - context = redisConnectWithTimeout(host->ptr, port, timeout); - if (context == NULL || context->err) { - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%s", - (char *) host->ptr, - (char *) p->ptr); - redisFree((redisContext *) context); - context = NULL; + mobj = createMigrateObject(host, atoi(port->ptr), atoi(begin_slot->ptr), atoi(end_slot->ptr)); + struct timeval timeout = {1, 500000}; // 1.5s + redisOptions options = {0}; + REDIS_OPTIONS_SET_TCP(&options, (const char *) mobj->host, mobj->port); + options.connect_timeout = &timeout; + mobj->source_cc = redisConnectWithOptions(&options); + if (mobj->source_cc == NULL || mobj->source_cc->err) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%d, error:%s", + mobj->host, mobj->port, mobj->source_cc->errstr); + freeMigrateObj(mobj); return RedisModule_ReplyWithError(ctx, "Can't connect source redis"); } + return connectRedis(ctx); - pthread_t pthread; - int flag = pthread_create(&pthread, NULL, syncWithRedis, ctx); - if (flag == 0) { - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Can't start thread"); - redisFree((redisContext *) context); - context = NULL; - return RedisModule_ReplyWithError(ctx, "Can't start thread"); - } - pthread_join(pthread, NULL); - return RedisModule_ReplyWithSimpleString(ctx, "OK"); } /* This function must be present on each Redis module. It is used in order to @@ -71,11 +127,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (flag == REDISMODULE_ERR) { return REDISMODULE_ERR; } - RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "init %s success", MODULE_NAME); + RedisModule_SetClusterFlags(ctx, REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION); + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin init commands of %s", MODULE_NAME); flag = RedisModule_CreateCommand(ctx, "rm.migrate", rm_migrateCommand, "write deny-oom admin", 1, 1, 0); if (flag == REDISMODULE_ERR) { RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "init rm.migrate failed"); return REDISMODULE_ERR; } + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "init %s success", MODULE_NAME); return REDISMODULE_OK; } diff --git a/src/redis-migrate.h b/src/redis-migrate.h index 78cd768..e74f4b5 100644 --- a/src/redis-migrate.h +++ b/src/redis-migrate.h @@ -3,6 +3,9 @@ #define REDIS_MIGRATE_REDIS_MIGRATE_H #include "redismodule.h" +#include "ae.h" +#include "sds.h" +#include "sdscompat.h" #define MODULE_NAME "redis-migrate" #define REDIS_MIGRATE_VERSION 1 @@ -10,6 +13,9 @@ #define C_ERR -1 #define C_OK 1 +/* Anti-warning macro... */ +#define UNUSED(V) ((void) V) + typedef struct redisObject { unsigned type: 4; unsigned encoding: 4; @@ -20,6 +26,17 @@ typedef struct redisObject { void *ptr; } robj; +typedef struct migrateObject { + char *address; + int repl_stat; + redisContext *source_cc; + char *host; + int port; + int begin_slot; + int end_slot; + char psync_offset[32]; +} migrateObj; + typedef enum { REPL_STATE_NONE = 0, /* No active replication */ REPL_STATE_CONNECT, /* Must connect to master */ @@ -36,10 +53,22 @@ typedef enum { /* --- End of handshake states --- */ REPL_STATE_TRANSFER, /* Receiving .rdb from master */ REPL_STATE_CONNECTED, /* Connected to master */ + STATE_CONNECT_ERROR, + STATE_DISCONNECT } repl_state; long long ustime(void); +migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot); + +void freeMigrateObj(migrateObj *m); + +int sendReplCommand(RedisModuleCtx *ctx, char *format, ...); + +void *syncWithRedis(void *arg); + +int connectRedis(RedisModuleCtx *ctx) ; + int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); -- 2.45.2 From 9e8abd76dbb5c28cffb42d65452efe7c6fd742a2 Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 5 Jun 2022 23:01:54 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E5=B7=B2=E7=BB=8F?= =?UTF-8?q?=E5=8F=AF=E4=BB=A5=EF=BC=8C=E7=AD=89=E5=BE=85=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .drone.yml | 7 +++++++ src/redis-migrate.h | 1 + 2 files changed, 8 insertions(+) diff --git a/.drone.yml b/.drone.yml index aea69f8..e73b513 100644 --- a/.drone.yml +++ b/.drone.yml @@ -2,6 +2,13 @@ kind: pipeline type: exec name: default steps: +- name: code-analysis + image: aosapps/drone-sonar-plugin + settings: + sonar_host: + from_secret: sonar_host + sonar_token: + from_secret: sonar_token - name: build commands: - make clean && make diff --git a/src/redis-migrate.h b/src/redis-migrate.h index e74f4b5..57f786f 100644 --- a/src/redis-migrate.h +++ b/src/redis-migrate.h @@ -34,6 +34,7 @@ typedef struct migrateObject { int port; int begin_slot; int end_slot; + char *psync_replid; char psync_offset[32]; } migrateObj; -- 2.45.2 From 782136917849c621ad2f8364489528a83a694b76 Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 5 Jun 2022 23:02:36 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E5=B7=B2=E7=BB=8F?= =?UTF-8?q?=E5=8F=AF=E4=BB=A5=EF=BC=8C=E7=AD=89=E5=BE=85=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .drone.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.drone.yml b/.drone.yml index e73b513..637ee16 100644 --- a/.drone.yml +++ b/.drone.yml @@ -1,6 +1,6 @@ kind: pipeline -type: exec name: default + steps: - name: code-analysis image: aosapps/drone-sonar-plugin -- 2.45.2 From e32bcff150334ef920c01216a34767ae76e98e99 Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 5 Jun 2022 23:09:12 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E5=B7=B2=E7=BB=8F?= =?UTF-8?q?=E5=8F=AF=E4=BB=A5=EF=BC=8C=E7=AD=89=E5=BE=85=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .drone.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.drone.yml b/.drone.yml index 637ee16..527b831 100644 --- a/.drone.yml +++ b/.drone.yml @@ -1,5 +1,6 @@ kind: pipeline name: default +type: docker steps: - name: code-analysis @@ -10,6 +11,7 @@ steps: sonar_token: from_secret: sonar_token - name: build + image: gcc commands: - make clean && make trigger: -- 2.45.2 From 41bf7bf6488406590bc572cc5252a2db805220a8 Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 5 Jun 2022 23:10:33 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E5=B7=B2=E7=BB=8F?= =?UTF-8?q?=E5=8F=AF=E4=BB=A5=EF=BC=8C=E7=AD=89=E5=BE=85=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .drone.yml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/.drone.yml b/.drone.yml index 527b831..7bffb79 100644 --- a/.drone.yml +++ b/.drone.yml @@ -3,13 +3,6 @@ name: default type: docker steps: -- name: code-analysis - image: aosapps/drone-sonar-plugin - settings: - sonar_host: - from_secret: sonar_host - sonar_token: - from_secret: sonar_token - name: build image: gcc commands: -- 2.45.2