连接已经可以,等待实现同步数据 #5

Merged
zeekling merged 5 commits from init into master 2022-06-05 15:17:19 +00:00
4 changed files with 113 additions and 34 deletions
Showing only changes of commit c532b742a9 - Show all commits

View File

@ -209,13 +209,6 @@ redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
return redisAsyncConnectWithOptions(&options); return redisAsyncConnectWithOptions(&options);
} }
redisAsyncContext *redisAsyncConnectBindWithTimeout(const char *ip, int port, const char *source_addr) {
redisOptions options = {0};
REDIS_OPTIONS_SET_TCP(&options, ip, port);
options.endpoint.tcp.source_addr = source_addr;
return redisAsyncConnectWithOptions(&options);
}
redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
const char *source_addr) { const char *source_addr) {
redisOptions options = {0}; redisOptions options = {0};

View File

@ -115,7 +115,6 @@ typedef struct redisAsyncContext {
redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options); redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options);
redisAsyncContext *redisAsyncConnect(const char *ip, int port); redisAsyncContext *redisAsyncConnect(const char *ip, int port);
redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, const char *source_addr); redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, const char *source_addr);
redisAsyncContext *redisAsyncConnectBindWithTimeout(const char *ip, int port, const char *source_addr);
redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
const char *source_addr); const char *source_addr);
redisAsyncContext *redisAsyncConnectUnix(const char *path); redisAsyncContext *redisAsyncConnectUnix(const char *path);

View File

@ -2,19 +2,83 @@
#include <sys/time.h> #include <sys/time.h>
#include <unistd.h> #include <unistd.h>
#include <string.h> #include <string.h>
#include <signal.h>
#include <pthread.h> #include <pthread.h>
#include "redis-migrate.h" #include "redis-migrate.h"
#include "hiredis.h" #include "hiredis.h"
#include "async.h"
static redisContext *context; static migrateObj *mobj;
migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot) {
migrateObj *m;
m = hi_malloc(sizeof(*m));
m->host = host->ptr;
m->port = port;
m->begin_slot = begin_slot;
m->end_slot = end_slot;
m->repl_stat = REPL_STATE_NONE;
return m;
}
void freeMigrateObj(migrateObj *m) {
redisFree(m->source_cc);
hi_free(m);
mobj = NULL;
}
int sendReplCommand(RedisModuleCtx *ctx, char *format, ...) {
va_list ap;
va_start(ap, format);
redisReply *reply = redisvCommand(mobj->source_cc, format, ap);
va_end(ap);
if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "send %s failed ip:%s,port:%d, error:%s",
format, mobj->host, mobj->port, reply->str);
freeReplyObject(reply);
return 0;
}
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "%s %s", format, reply->str);
freeReplyObject(reply);
return 1;
}
void *syncWithRedis(void *arg) { void *syncWithRedis(void *arg) {
RedisModuleCtx *ctx = arg; RedisModuleCtx *ctx = arg;
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin sync data"); RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin sync data");
if (!sendReplCommand(ctx, "PING")) {
goto error;
}
//todo auth with master
sds portstr = sdsfromlonglong(mobj->port);
if (!sendReplCommand(ctx, "REPLCONF listening-port %s", portstr)) {
sdsfree(portstr);
goto error;
}
sdsfree(portstr);
if (!sendReplCommand(ctx, "REPLCONF ip-address %s", mobj->host)) {
goto error;
}
if (!sendReplCommand(ctx, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2")) {
goto error;
}
return NULL;
error:
freeMigrateObj(mobj);
return NULL; return NULL;
} }
int connectRedis(RedisModuleCtx *ctx) {
pthread_t pthread;
int flag = pthread_create(&pthread, NULL, syncWithRedis, ctx);
if (flag != 0) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Can't start thread");
freeMigrateObj(mobj);
return RedisModule_ReplyWithError(ctx, "Can't start thread");
}
pthread_join(pthread, NULL);
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
/** /**
* migrate data to current instance. * migrate data to current instance.
* migrate host port begin-slot end-slot * migrate host port begin-slot end-slot
@ -28,36 +92,28 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
return RedisModule_WrongArity(ctx); return RedisModule_WrongArity(ctx);
} }
robj *host = (robj *) argv[1]; robj *host = (robj *) argv[1];
robj *p = (robj *) argv[2]; robj *port = (robj *) argv[2];
robj *begin_slot = (robj *) argv[3]; robj *begin_slot = (robj *) argv[3];
robj *end_slot = (robj *) argv[4]; robj *end_slot = (robj *) argv[4];
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *) host->ptr, RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *) host->ptr,
(char *) p->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr); (char *) port->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr);
if (context != NULL) { if (mobj != NULL) {
return RedisModule_ReplyWithError(ctx, "-ERR is migrating, please waiting"); return RedisModule_ReplyWithError(ctx, "-ERR is migrating, please waiting");
} }
int port = atoi(p->ptr); mobj = createMigrateObject(host, atoi(port->ptr), atoi(begin_slot->ptr), atoi(end_slot->ptr));
struct timeval timeout = {0, 500000}; // 0.5s struct timeval timeout = {1, 500000}; // 1.5s
context = redisConnectWithTimeout(host->ptr, port, timeout); redisOptions options = {0};
if (context == NULL || context->err) { REDIS_OPTIONS_SET_TCP(&options, (const char *) mobj->host, mobj->port);
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%s", options.connect_timeout = &timeout;
(char *) host->ptr, mobj->source_cc = redisConnectWithOptions(&options);
(char *) p->ptr); if (mobj->source_cc == NULL || mobj->source_cc->err) {
redisFree((redisContext *) context); RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%d, error:%s",
context = NULL; mobj->host, mobj->port, mobj->source_cc->errstr);
freeMigrateObj(mobj);
return RedisModule_ReplyWithError(ctx, "Can't connect source redis"); return RedisModule_ReplyWithError(ctx, "Can't connect source redis");
} }
return connectRedis(ctx);
pthread_t pthread;
int flag = pthread_create(&pthread, NULL, syncWithRedis, ctx);
if (flag == 0) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Can't start thread");
redisFree((redisContext *) context);
context = NULL;
return RedisModule_ReplyWithError(ctx, "Can't start thread");
}
pthread_join(pthread, NULL);
return RedisModule_ReplyWithSimpleString(ctx, "OK");
} }
/* This function must be present on each Redis module. It is used in order to /* This function must be present on each Redis module. It is used in order to
@ -71,11 +127,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (flag == REDISMODULE_ERR) { if (flag == REDISMODULE_ERR) {
return REDISMODULE_ERR; return REDISMODULE_ERR;
} }
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "init %s success", MODULE_NAME); RedisModule_SetClusterFlags(ctx, REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION);
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin init commands of %s", MODULE_NAME);
flag = RedisModule_CreateCommand(ctx, "rm.migrate", rm_migrateCommand, "write deny-oom admin", 1, 1, 0); flag = RedisModule_CreateCommand(ctx, "rm.migrate", rm_migrateCommand, "write deny-oom admin", 1, 1, 0);
if (flag == REDISMODULE_ERR) { if (flag == REDISMODULE_ERR) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "init rm.migrate failed"); RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "init rm.migrate failed");
return REDISMODULE_ERR; return REDISMODULE_ERR;
} }
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "init %s success", MODULE_NAME);
return REDISMODULE_OK; return REDISMODULE_OK;
} }

View File

@ -3,6 +3,9 @@
#define REDIS_MIGRATE_REDIS_MIGRATE_H #define REDIS_MIGRATE_REDIS_MIGRATE_H
#include "redismodule.h" #include "redismodule.h"
#include "ae.h"
#include "sds.h"
#include "sdscompat.h"
#define MODULE_NAME "redis-migrate" #define MODULE_NAME "redis-migrate"
#define REDIS_MIGRATE_VERSION 1 #define REDIS_MIGRATE_VERSION 1
@ -10,6 +13,9 @@
#define C_ERR -1 #define C_ERR -1
#define C_OK 1 #define C_OK 1
/* Anti-warning macro... */
#define UNUSED(V) ((void) V)
typedef struct redisObject { typedef struct redisObject {
unsigned type: 4; unsigned type: 4;
unsigned encoding: 4; unsigned encoding: 4;
@ -20,6 +26,17 @@ typedef struct redisObject {
void *ptr; void *ptr;
} robj; } robj;
typedef struct migrateObject {
char *address;
int repl_stat;
redisContext *source_cc;
char *host;
int port;
int begin_slot;
int end_slot;
char psync_offset[32];
} migrateObj;
typedef enum { typedef enum {
REPL_STATE_NONE = 0, /* No active replication */ REPL_STATE_NONE = 0, /* No active replication */
REPL_STATE_CONNECT, /* Must connect to master */ REPL_STATE_CONNECT, /* Must connect to master */
@ -36,10 +53,22 @@ typedef enum {
/* --- End of handshake states --- */ /* --- End of handshake states --- */
REPL_STATE_TRANSFER, /* Receiving .rdb from master */ REPL_STATE_TRANSFER, /* Receiving .rdb from master */
REPL_STATE_CONNECTED, /* Connected to master */ REPL_STATE_CONNECTED, /* Connected to master */
STATE_CONNECT_ERROR,
STATE_DISCONNECT
} repl_state; } repl_state;
long long ustime(void); long long ustime(void);
migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot);
void freeMigrateObj(migrateObj *m);
int sendReplCommand(RedisModuleCtx *ctx, char *format, ...);
void *syncWithRedis(void *arg);
int connectRedis(RedisModuleCtx *ctx) ;
int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);