增加连接 #4
@ -13,7 +13,7 @@ else
|
||||
SHOBJ_LDFLAGS ?= -bundle -undefined dynamic_lookup
|
||||
endif
|
||||
|
||||
FINAL_CFLAGS+= -I./hiredis $(SHOBJ_CFLAGS)
|
||||
FINAL_CFLAGS+= $(SHOBJ_CFLAGS)
|
||||
|
||||
.PHONY: all
|
||||
|
||||
|
@ -209,6 +209,13 @@ redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
|
||||
return redisAsyncConnectWithOptions(&options);
|
||||
}
|
||||
|
||||
redisAsyncContext *redisAsyncConnectBindWithTimeout(const char *ip, int port, const char *source_addr) {
|
||||
redisOptions options = {0};
|
||||
REDIS_OPTIONS_SET_TCP(&options, ip, port);
|
||||
options.endpoint.tcp.source_addr = source_addr;
|
||||
return redisAsyncConnectWithOptions(&options);
|
||||
}
|
||||
|
||||
redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
|
||||
const char *source_addr) {
|
||||
redisOptions options = {0};
|
||||
|
@ -115,6 +115,7 @@ typedef struct redisAsyncContext {
|
||||
redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options);
|
||||
redisAsyncContext *redisAsyncConnect(const char *ip, int port);
|
||||
redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, const char *source_addr);
|
||||
redisAsyncContext *redisAsyncConnectBindWithTimeout(const char *ip, int port, const char *source_addr);
|
||||
redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
|
||||
const char *source_addr);
|
||||
redisAsyncContext *redisAsyncConnectUnix(const char *path);
|
||||
|
@ -1,73 +1,20 @@
|
||||
|
||||
#include <sys/time.h>
|
||||
#include <unistd.h>
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
#include <pthread.h>
|
||||
#include "redis-migrate.h"
|
||||
#include "hiredis.h"
|
||||
#include "async.h"
|
||||
#include "ae.h"
|
||||
|
||||
static redisAsyncContext *context;
|
||||
char *bind_source_addr;
|
||||
static aeEventLoop *loop;
|
||||
static redisContext *context;
|
||||
|
||||
int sync_state;
|
||||
|
||||
RedisModuleCtx *moduleCtx;
|
||||
|
||||
/* Return the UNIX time in microseconds */
|
||||
long long ustime(void) {
|
||||
struct timeval tv;
|
||||
long long ust;
|
||||
|
||||
gettimeofday(&tv, NULL);
|
||||
ust = ((long long) tv.tv_sec) * 1000000;
|
||||
ust += tv.tv_usec;
|
||||
return ust;
|
||||
void *syncWithRedis(void *arg) {
|
||||
RedisModuleCtx *ctx = arg;
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin sync data");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Return the UNIX time in milliseconds */
|
||||
mstime_t mstime(void) {
|
||||
return ustime() / 1000;
|
||||
}
|
||||
|
||||
void syncWithMaster() {
|
||||
|
||||
}
|
||||
|
||||
void connectCallback(const redisAsyncContext *c, int status) {
|
||||
|
||||
}
|
||||
|
||||
void disconnectCallback(const redisAsyncContext *c, int status) {
|
||||
|
||||
}
|
||||
|
||||
int connectWithSourceRedis(RedisModuleCtx *ctx, robj *host, robj *p) {
|
||||
int port = atoi(p->ptr);
|
||||
context = redisAsyncConnect(host->ptr, port);
|
||||
if (context->err) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%s",
|
||||
(char *) host->ptr,
|
||||
(char *) p->ptr);
|
||||
goto err;
|
||||
}
|
||||
loop = aeCreateEventLoop(64);
|
||||
redisAeAttach(loop, context);
|
||||
redisAsyncSetConnectCallback(context, connectCallback);
|
||||
redisAsyncSetDisconnectCallback(context, disconnectCallback);
|
||||
// send commands
|
||||
return C_OK;
|
||||
err:
|
||||
redisAsyncFree((redisAsyncContext *) context);
|
||||
context = NULL;
|
||||
|
||||
if (loop != NULL) {
|
||||
aeStop(loop);
|
||||
loop = NULL;
|
||||
}
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
/**
|
||||
* migrate data to current instance.
|
||||
* migrate host port begin-slot end-slot
|
||||
@ -81,17 +28,35 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
robj *host = (robj *) argv[1];
|
||||
robj *port = (robj *) argv[2];
|
||||
robj *p = (robj *) argv[2];
|
||||
robj *begin_slot = (robj *) argv[3];
|
||||
robj *end_slot = (robj *) argv[4];
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *) host->ptr,
|
||||
(char *) port->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr);
|
||||
(char *) p->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr);
|
||||
if (context != NULL) {
|
||||
return RedisModule_ReplyWithError(ctx, "-ERR is migrating, please waiting");
|
||||
}
|
||||
if (connectWithSourceRedis(ctx, host, port) == C_ERR) {
|
||||
return RedisModule_ReplyWithError(ctx, "-ERR Can't connect source");
|
||||
int port = atoi(p->ptr);
|
||||
struct timeval timeout = {0, 500000}; // 0.5s
|
||||
context = redisConnectWithTimeout(host->ptr, port, timeout);
|
||||
if (context == NULL || context->err) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%s",
|
||||
(char *) host->ptr,
|
||||
(char *) p->ptr);
|
||||
redisFree((redisContext *) context);
|
||||
context = NULL;
|
||||
return RedisModule_ReplyWithError(ctx, "Can't connect source redis");
|
||||
}
|
||||
|
||||
pthread_t pthread;
|
||||
int flag = pthread_create(&pthread, NULL, syncWithRedis, ctx);
|
||||
if (flag == 0) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Can't start thread");
|
||||
redisFree((redisContext *) context);
|
||||
context = NULL;
|
||||
return RedisModule_ReplyWithError(ctx, "Can't start thread");
|
||||
}
|
||||
pthread_join(pthread, NULL);
|
||||
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
}
|
||||
|
||||
@ -112,6 +77,5 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "init rm.migrate failed");
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
moduleCtx = ctx;
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
@ -38,8 +38,6 @@ typedef enum {
|
||||
REPL_STATE_CONNECTED, /* Connected to master */
|
||||
} repl_state;
|
||||
|
||||
mstime_t mstime(void);
|
||||
|
||||
long long ustime(void);
|
||||
|
||||
int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
|
||||
|
Loading…
Reference in New Issue
Block a user