连接已经可以,等待实现同步数据 #5
@ -1,8 +1,10 @@
|
||||
kind: pipeline
|
||||
type: exec
|
||||
name: default
|
||||
type: docker
|
||||
|
||||
steps:
|
||||
- name: build
|
||||
image: gcc
|
||||
commands:
|
||||
- make clean && make
|
||||
trigger:
|
||||
|
@ -209,13 +209,6 @@ redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
|
||||
return redisAsyncConnectWithOptions(&options);
|
||||
}
|
||||
|
||||
redisAsyncContext *redisAsyncConnectBindWithTimeout(const char *ip, int port, const char *source_addr) {
|
||||
redisOptions options = {0};
|
||||
REDIS_OPTIONS_SET_TCP(&options, ip, port);
|
||||
options.endpoint.tcp.source_addr = source_addr;
|
||||
return redisAsyncConnectWithOptions(&options);
|
||||
}
|
||||
|
||||
redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
|
||||
const char *source_addr) {
|
||||
redisOptions options = {0};
|
||||
|
@ -115,7 +115,6 @@ typedef struct redisAsyncContext {
|
||||
redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options);
|
||||
redisAsyncContext *redisAsyncConnect(const char *ip, int port);
|
||||
redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, const char *source_addr);
|
||||
redisAsyncContext *redisAsyncConnectBindWithTimeout(const char *ip, int port, const char *source_addr);
|
||||
redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
|
||||
const char *source_addr);
|
||||
redisAsyncContext *redisAsyncConnectUnix(const char *path);
|
||||
|
@ -2,19 +2,83 @@
|
||||
#include <sys/time.h>
|
||||
#include <unistd.h>
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
#include <pthread.h>
|
||||
#include "redis-migrate.h"
|
||||
#include "hiredis.h"
|
||||
#include "async.h"
|
||||
|
||||
static redisContext *context;
|
||||
static migrateObj *mobj;
|
||||
|
||||
migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot) {
|
||||
migrateObj *m;
|
||||
m = hi_malloc(sizeof(*m));
|
||||
m->host = host->ptr;
|
||||
m->port = port;
|
||||
m->begin_slot = begin_slot;
|
||||
m->end_slot = end_slot;
|
||||
m->repl_stat = REPL_STATE_NONE;
|
||||
return m;
|
||||
}
|
||||
|
||||
void freeMigrateObj(migrateObj *m) {
|
||||
redisFree(m->source_cc);
|
||||
hi_free(m);
|
||||
mobj = NULL;
|
||||
}
|
||||
|
||||
int sendReplCommand(RedisModuleCtx *ctx, char *format, ...) {
|
||||
va_list ap;
|
||||
va_start(ap, format);
|
||||
redisReply *reply = redisvCommand(mobj->source_cc, format, ap);
|
||||
va_end(ap);
|
||||
if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "send %s failed ip:%s,port:%d, error:%s",
|
||||
format, mobj->host, mobj->port, reply->str);
|
||||
freeReplyObject(reply);
|
||||
return 0;
|
||||
}
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "%s %s", format, reply->str);
|
||||
freeReplyObject(reply);
|
||||
return 1;
|
||||
}
|
||||
|
||||
void *syncWithRedis(void *arg) {
|
||||
RedisModuleCtx *ctx = arg;
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin sync data");
|
||||
if (!sendReplCommand(ctx, "PING")) {
|
||||
goto error;
|
||||
}
|
||||
//todo auth with master
|
||||
sds portstr = sdsfromlonglong(mobj->port);
|
||||
if (!sendReplCommand(ctx, "REPLCONF listening-port %s", portstr)) {
|
||||
sdsfree(portstr);
|
||||
goto error;
|
||||
}
|
||||
sdsfree(portstr);
|
||||
if (!sendReplCommand(ctx, "REPLCONF ip-address %s", mobj->host)) {
|
||||
goto error;
|
||||
}
|
||||
if (!sendReplCommand(ctx, "REPLCONF %s %s %s %s", "capa", "eof", "capa", "psync2")) {
|
||||
goto error;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
error:
|
||||
freeMigrateObj(mobj);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int connectRedis(RedisModuleCtx *ctx) {
|
||||
pthread_t pthread;
|
||||
int flag = pthread_create(&pthread, NULL, syncWithRedis, ctx);
|
||||
if (flag != 0) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Can't start thread");
|
||||
freeMigrateObj(mobj);
|
||||
return RedisModule_ReplyWithError(ctx, "Can't start thread");
|
||||
}
|
||||
pthread_join(pthread, NULL);
|
||||
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
}
|
||||
|
||||
/**
|
||||
* migrate data to current instance.
|
||||
* migrate host port begin-slot end-slot
|
||||
@ -28,36 +92,28 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
robj *host = (robj *) argv[1];
|
||||
robj *p = (robj *) argv[2];
|
||||
robj *port = (robj *) argv[2];
|
||||
robj *begin_slot = (robj *) argv[3];
|
||||
robj *end_slot = (robj *) argv[4];
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "host:%s, port:%s, begin:%s, end:%s", (char *) host->ptr,
|
||||
(char *) p->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr);
|
||||
if (context != NULL) {
|
||||
(char *) port->ptr, (char *) begin_slot->ptr, (char *) end_slot->ptr);
|
||||
if (mobj != NULL) {
|
||||
return RedisModule_ReplyWithError(ctx, "-ERR is migrating, please waiting");
|
||||
}
|
||||
int port = atoi(p->ptr);
|
||||
struct timeval timeout = {0, 500000}; // 0.5s
|
||||
context = redisConnectWithTimeout(host->ptr, port, timeout);
|
||||
if (context == NULL || context->err) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%s",
|
||||
(char *) host->ptr,
|
||||
(char *) p->ptr);
|
||||
redisFree((redisContext *) context);
|
||||
context = NULL;
|
||||
mobj = createMigrateObject(host, atoi(port->ptr), atoi(begin_slot->ptr), atoi(end_slot->ptr));
|
||||
struct timeval timeout = {1, 500000}; // 1.5s
|
||||
redisOptions options = {0};
|
||||
REDIS_OPTIONS_SET_TCP(&options, (const char *) mobj->host, mobj->port);
|
||||
options.connect_timeout = &timeout;
|
||||
mobj->source_cc = redisConnectWithOptions(&options);
|
||||
if (mobj->source_cc == NULL || mobj->source_cc->err) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Could not connect to Redis at ip:%s,port:%d, error:%s",
|
||||
mobj->host, mobj->port, mobj->source_cc->errstr);
|
||||
freeMigrateObj(mobj);
|
||||
return RedisModule_ReplyWithError(ctx, "Can't connect source redis");
|
||||
}
|
||||
return connectRedis(ctx);
|
||||
|
||||
pthread_t pthread;
|
||||
int flag = pthread_create(&pthread, NULL, syncWithRedis, ctx);
|
||||
if (flag == 0) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Can't start thread");
|
||||
redisFree((redisContext *) context);
|
||||
context = NULL;
|
||||
return RedisModule_ReplyWithError(ctx, "Can't start thread");
|
||||
}
|
||||
pthread_join(pthread, NULL);
|
||||
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
}
|
||||
|
||||
/* This function must be present on each Redis module. It is used in order to
|
||||
@ -71,11 +127,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
if (flag == REDISMODULE_ERR) {
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "init %s success", MODULE_NAME);
|
||||
RedisModule_SetClusterFlags(ctx, REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION);
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "begin init commands of %s", MODULE_NAME);
|
||||
flag = RedisModule_CreateCommand(ctx, "rm.migrate", rm_migrateCommand, "write deny-oom admin", 1, 1, 0);
|
||||
if (flag == REDISMODULE_ERR) {
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "init rm.migrate failed");
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "init %s success", MODULE_NAME);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
@ -3,6 +3,9 @@
|
||||
#define REDIS_MIGRATE_REDIS_MIGRATE_H
|
||||
|
||||
#include "redismodule.h"
|
||||
#include "ae.h"
|
||||
#include "sds.h"
|
||||
#include "sdscompat.h"
|
||||
|
||||
#define MODULE_NAME "redis-migrate"
|
||||
#define REDIS_MIGRATE_VERSION 1
|
||||
@ -10,6 +13,9 @@
|
||||
#define C_ERR -1
|
||||
#define C_OK 1
|
||||
|
||||
/* Anti-warning macro... */
|
||||
#define UNUSED(V) ((void) V)
|
||||
|
||||
typedef struct redisObject {
|
||||
unsigned type: 4;
|
||||
unsigned encoding: 4;
|
||||
@ -20,6 +26,18 @@ typedef struct redisObject {
|
||||
void *ptr;
|
||||
} robj;
|
||||
|
||||
typedef struct migrateObject {
|
||||
char *address;
|
||||
int repl_stat;
|
||||
redisContext *source_cc;
|
||||
char *host;
|
||||
int port;
|
||||
int begin_slot;
|
||||
int end_slot;
|
||||
char *psync_replid;
|
||||
char psync_offset[32];
|
||||
} migrateObj;
|
||||
|
||||
typedef enum {
|
||||
REPL_STATE_NONE = 0, /* No active replication */
|
||||
REPL_STATE_CONNECT, /* Must connect to master */
|
||||
@ -36,10 +54,22 @@ typedef enum {
|
||||
/* --- End of handshake states --- */
|
||||
REPL_STATE_TRANSFER, /* Receiving .rdb from master */
|
||||
REPL_STATE_CONNECTED, /* Connected to master */
|
||||
STATE_CONNECT_ERROR,
|
||||
STATE_DISCONNECT
|
||||
} repl_state;
|
||||
|
||||
long long ustime(void);
|
||||
|
||||
migrateObj *createMigrateObject(robj *host, int port, int begin_slot, int end_slot);
|
||||
|
||||
void freeMigrateObj(migrateObj *m);
|
||||
|
||||
int sendReplCommand(RedisModuleCtx *ctx, char *format, ...);
|
||||
|
||||
void *syncWithRedis(void *arg);
|
||||
|
||||
int connectRedis(RedisModuleCtx *ctx) ;
|
||||
|
||||
int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
|
||||
|
||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
|
||||
|
Loading…
Reference in New Issue
Block a user