增加连接 #4

Merged
zeekling merged 1 commits from init into master 2022-05-25 16:07:30 +00:00
5 changed files with 39 additions and 69 deletions

View File

@ -13,7 +13,7 @@ else
SHOBJ_LDFLAGS ?= -bundle -undefined dynamic_lookup
endif
FINAL_CFLAGS+= -I./hiredis $(SHOBJ_CFLAGS)
FINAL_CFLAGS+= $(SHOBJ_CFLAGS)
.PHONY: all

View File

@ -209,6 +209,13 @@ redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
return redisAsyncConnectWithOptions(&options);
}
redisAsyncContext *redisAsyncConnectBindWithTimeout(const char *ip, int port, const char *source_addr) {
redisOptions options = {0};
REDIS_OPTIONS_SET_TCP(&options, ip, port);
options.endpoint.tcp.source_addr = source_addr;
return redisAsyncConnectWithOptions(&options);
}
redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
const char *source_addr) {
redisOptions options = {0};

View File

@ -115,6 +115,7 @@ typedef struct redisAsyncContext {
redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options);
redisAsyncContext *redisAsyncConnect(const char *ip, int port);
redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, const char *source_addr);
redisAsyncContext *redisAsyncConnectBindWithTimeout(const char *ip, int port, const char *source_addr);
redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
const char *source_addr);
redisAsyncContext *redisAsyncConnectUnix(const char *path);

View File

@ -1,73 +1,20 @@
#include <sys/time.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
#include "redis-migrate.h"
#include "hiredis.h"
#include "async.h"
#include "ae.h"
static redisAsyncContext *context;
char *bind_source_addr;
static aeEventLoop *loop;
static redisContext *context;
int sync_state;
RedisModuleCtx *moduleCtx;
/* Return the UNIX time in microseconds */
long long ustime(void) {
struct timeval tv;
long long ust;
gettimeofday(&tv, NULL);
ust = ((long long) tv.tv_sec) * 1000000;
ust += tv.tv_usec;
return ust;
void *syncWithRedis(void *arg) {
RedisModuleCtx *ctx = arg;
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin sync data");
return NULL;
}
/* Return the UNIX time in milliseconds */
mstime_t mstime(void) {
return ustime() / 1000;
}
void syncWithMaster() {
}
void connectCallback(const redisAsyncContext *c, int status) {
}
void disconnectCallback(const redisAsyncContext *c, int status) {
}
int connectWithSourceRedis(RedisModuleCtx *ctx, robj *host, robj *p) {
int port = atoi(p->ptr);
context = redisAsyncConnect(host->ptr, port);
if (context->err) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%s",
(char *) host->ptr,
(char *) p->ptr);
goto err;
}
loop = aeCreateEventLoop(64);
redisAeAttach(loop, context);
redisAsyncSetConnectCallback(context, connectCallback);
redisAsyncSetDisconnectCallback(context, disconnectCallback);
// send commands
return C_OK;
err:
redisAsyncFree((redisAsyncContext *) context);
context = NULL;
if (loop != NULL) {
aeStop(loop);
loop = NULL;
}
return C_ERR;
}
/**
* migrate data to current instance.
* migrate host port begin-slot end-slot
@ -81,17 +28,35 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
return RedisModule_WrongArity(ctx);
}
robj *host = (robj *) argv[1];
robj *port = (robj *) argv[2];
robj *p = (robj *) argv[2];
robj *begin_slot = (robj *) argv[3];
robj *end_slot = (robj *) argv[4];
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *) host->ptr,
(char *) port->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr);
(char *) p->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr);
if (context != NULL) {
return RedisModule_ReplyWithError(ctx, "-ERR is migrating, please waiting");
}
if (connectWithSourceRedis(ctx, host, port) == C_ERR) {
return RedisModule_ReplyWithError(ctx, "-ERR Can't connect source");
int port = atoi(p->ptr);
struct timeval timeout = {0, 500000}; // 0.5s
context = redisConnectWithTimeout(host->ptr, port, timeout);
if (context == NULL || context->err) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%s",
(char *) host->ptr,
(char *) p->ptr);
redisFree((redisContext *) context);
context = NULL;
return RedisModule_ReplyWithError(ctx, "Can't connect source redis");
}
pthread_t pthread;
int flag = pthread_create(&pthread, NULL, syncWithRedis, ctx);
if (flag == 0) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Can't start thread");
redisFree((redisContext *) context);
context = NULL;
return RedisModule_ReplyWithError(ctx, "Can't start thread");
}
pthread_join(pthread, NULL);
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
@ -112,6 +77,5 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "init rm.migrate failed");
return REDISMODULE_ERR;
}
moduleCtx = ctx;
return REDISMODULE_OK;
}

View File

@ -38,8 +38,6 @@ typedef enum {
REPL_STATE_CONNECTED, /* Connected to master */
} repl_state;
mstime_t mstime(void);
long long ustime(void);
int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);