增加连接
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing

This commit is contained in:
LingZhaoHui 2022-05-26 00:06:56 +08:00
parent 51d27195d7
commit 2d13a047ad
5 changed files with 39 additions and 69 deletions

View File

@ -13,7 +13,7 @@ else
SHOBJ_LDFLAGS ?= -bundle -undefined dynamic_lookup SHOBJ_LDFLAGS ?= -bundle -undefined dynamic_lookup
endif endif
FINAL_CFLAGS+= -I./hiredis $(SHOBJ_CFLAGS) FINAL_CFLAGS+= $(SHOBJ_CFLAGS)
.PHONY: all .PHONY: all

View File

@ -209,6 +209,13 @@ redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
return redisAsyncConnectWithOptions(&options); return redisAsyncConnectWithOptions(&options);
} }
redisAsyncContext *redisAsyncConnectBindWithTimeout(const char *ip, int port, const char *source_addr) {
redisOptions options = {0};
REDIS_OPTIONS_SET_TCP(&options, ip, port);
options.endpoint.tcp.source_addr = source_addr;
return redisAsyncConnectWithOptions(&options);
}
redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
const char *source_addr) { const char *source_addr) {
redisOptions options = {0}; redisOptions options = {0};

View File

@ -115,6 +115,7 @@ typedef struct redisAsyncContext {
redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options); redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options);
redisAsyncContext *redisAsyncConnect(const char *ip, int port); redisAsyncContext *redisAsyncConnect(const char *ip, int port);
redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, const char *source_addr); redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, const char *source_addr);
redisAsyncContext *redisAsyncConnectBindWithTimeout(const char *ip, int port, const char *source_addr);
redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
const char *source_addr); const char *source_addr);
redisAsyncContext *redisAsyncConnectUnix(const char *path); redisAsyncContext *redisAsyncConnectUnix(const char *path);

View File

@ -1,73 +1,20 @@
#include <sys/time.h> #include <sys/time.h>
#include <unistd.h> #include <unistd.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
#include "redis-migrate.h" #include "redis-migrate.h"
#include "hiredis.h" #include "hiredis.h"
#include "async.h" #include "async.h"
#include "ae.h"
static redisAsyncContext *context; static redisContext *context;
char *bind_source_addr;
static aeEventLoop *loop;
int sync_state; void *syncWithRedis(void *arg) {
RedisModuleCtx *ctx = arg;
RedisModuleCtx *moduleCtx; RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin sync data");
return NULL;
/* Return the UNIX time in microseconds */
long long ustime(void) {
struct timeval tv;
long long ust;
gettimeofday(&tv, NULL);
ust = ((long long) tv.tv_sec) * 1000000;
ust += tv.tv_usec;
return ust;
} }
/* Return the UNIX time in milliseconds */
mstime_t mstime(void) {
return ustime() / 1000;
}
void syncWithMaster() {
}
void connectCallback(const redisAsyncContext *c, int status) {
}
void disconnectCallback(const redisAsyncContext *c, int status) {
}
int connectWithSourceRedis(RedisModuleCtx *ctx, robj *host, robj *p) {
int port = atoi(p->ptr);
context = redisAsyncConnect(host->ptr, port);
if (context->err) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%s",
(char *) host->ptr,
(char *) p->ptr);
goto err;
}
loop = aeCreateEventLoop(64);
redisAeAttach(loop, context);
redisAsyncSetConnectCallback(context, connectCallback);
redisAsyncSetDisconnectCallback(context, disconnectCallback);
// send commands
return C_OK;
err:
redisAsyncFree((redisAsyncContext *) context);
context = NULL;
if (loop != NULL) {
aeStop(loop);
loop = NULL;
}
return C_ERR;
}
/** /**
* migrate data to current instance. * migrate data to current instance.
* migrate host port begin-slot end-slot * migrate host port begin-slot end-slot
@ -81,17 +28,35 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
return RedisModule_WrongArity(ctx); return RedisModule_WrongArity(ctx);
} }
robj *host = (robj *) argv[1]; robj *host = (robj *) argv[1];
robj *port = (robj *) argv[2]; robj *p = (robj *) argv[2];
robj *begin_slot = (robj *) argv[3]; robj *begin_slot = (robj *) argv[3];
robj *end_slot = (robj *) argv[4]; robj *end_slot = (robj *) argv[4];
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *) host->ptr, RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *) host->ptr,
(char *) port->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr); (char *) p->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr);
if (context != NULL) { if (context != NULL) {
return RedisModule_ReplyWithError(ctx, "-ERR is migrating, please waiting"); return RedisModule_ReplyWithError(ctx, "-ERR is migrating, please waiting");
} }
if (connectWithSourceRedis(ctx, host, port) == C_ERR) { int port = atoi(p->ptr);
return RedisModule_ReplyWithError(ctx, "-ERR Can't connect source"); struct timeval timeout = {0, 500000}; // 0.5s
context = redisConnectWithTimeout(host->ptr, port, timeout);
if (context == NULL || context->err) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%s",
(char *) host->ptr,
(char *) p->ptr);
redisFree((redisContext *) context);
context = NULL;
return RedisModule_ReplyWithError(ctx, "Can't connect source redis");
} }
pthread_t pthread;
int flag = pthread_create(&pthread, NULL, syncWithRedis, ctx);
if (flag == 0) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Can't start thread");
redisFree((redisContext *) context);
context = NULL;
return RedisModule_ReplyWithError(ctx, "Can't start thread");
}
pthread_join(pthread, NULL);
return RedisModule_ReplyWithSimpleString(ctx, "OK"); return RedisModule_ReplyWithSimpleString(ctx, "OK");
} }
@ -112,6 +77,5 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "init rm.migrate failed"); RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "init rm.migrate failed");
return REDISMODULE_ERR; return REDISMODULE_ERR;
} }
moduleCtx = ctx;
return REDISMODULE_OK; return REDISMODULE_OK;
} }

View File

@ -38,8 +38,6 @@ typedef enum {
REPL_STATE_CONNECTED, /* Connected to master */ REPL_STATE_CONNECTED, /* Connected to master */
} repl_state; } repl_state;
mstime_t mstime(void);
long long ustime(void); long long ustime(void);
int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);