From d0775f0de3de8a010c14652486f47aa9b1946d90 Mon Sep 17 00:00:00 2001 From: zeekling Date: Sat, 15 Oct 2022 21:31:11 +0800 Subject: [PATCH] add thread pool --- CMakeLists.txt | 1 + src/migrate_task.cpp | 23 +++++++++++++++++++++++ src/migrate_task.h | 16 ++++++++++++++++ src/redis_migrate.cpp | 8 ++++++++ src/redis_migrate.h | 5 ++--- 5 files changed, 50 insertions(+), 3 deletions(-) create mode 100644 src/migrate_task.cpp create mode 100644 src/migrate_task.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 6d95a11..a128f33 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -26,6 +26,7 @@ set(migrate_sources src/hiredis/sds.c src/hiredis/sockcompat.c src/log.cpp + src/migrate_task.cpp src/thread_pool.cpp src/redis_migrate.cpp) diff --git a/src/migrate_task.cpp b/src/migrate_task.cpp new file mode 100644 index 0000000..3ce083e --- /dev/null +++ b/src/migrate_task.cpp @@ -0,0 +1,23 @@ +#include "migrate_task.h" + +MigrateTask::MigrateTask(migrateObj m, RedisModuleCtx *ctx, logObj mLog) { + this->mObj = m; + this->ctx = ctx; +} + +void MigrateTask::process() { + RedisModuleCallReply *reply = RedisModule_Call(this->ctx, "type", "c", this->mObj.key); + long long items = RedisModule_CallReplyLength(reply); + if (items != 1) { + RedisModule_Log(ctx, WARNING, "type is error"); + return; + } + RedisModuleCallReply *item1 = RedisModule_CallReplyArrayElement(reply, 0); + RedisModuleString *str = RedisModule_CreateStringFromCallReply(item1); + size_t len; + const char *type = RedisModule_StringPtrLen(str, &len); + migrateLog(this->mLog, LL_NOTICE, "type=%s", type); +} + +MigrateTask::~MigrateTask() { +} \ No newline at end of file diff --git a/src/migrate_task.h b/src/migrate_task.h new file mode 100644 index 0000000..e6b2100 --- /dev/null +++ b/src/migrate_task.h @@ -0,0 +1,16 @@ +#include +#include "redis_migrate.h" + +class MigrateTask { +private: + migrateObj mObj; + RedisModuleCtx *ctx; + logObj mLog; + +public: + MigrateTask(migrateObj m, RedisModuleCtx *ctx, logObj mLog); + + void process(); + + ~MigrateTask(); +}; diff --git a/src/redis_migrate.cpp b/src/redis_migrate.cpp index 5f3eac9..c40b160 100644 --- a/src/redis_migrate.cpp +++ b/src/redis_migrate.cpp @@ -10,6 +10,8 @@ #include "hiredis/hiredis.h" #include "redismodule.h" #include "log.h" +#include "thread_pool.h" +#include "migrate_task.h" static logObj mLog = {}; @@ -17,6 +19,10 @@ static RedisModuleCommandFilter *filter; static bool isMigrating = false; +static ThreadPool pool; + +static std::map migrating; + migrateObj createMigrateObject(RedisModuleString *host, int port, int slot, RedisModuleString *key, migrateObj m) { size_t hostLen, keyLen; const char *hostStr = RedisModule_StringPtrLen(host, &hostLen); @@ -94,6 +100,8 @@ int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { m = createMigrateObject(host, (int)portDouble, (int)slotDouble, key, m); migrating[keyStr] = m; isMigrating = true; + MigrateTask task(m, ctx, mLog); + pool.submit(&task); return RedisModule_ReplyWithSimpleString(ctx, "OK"); } diff --git a/src/redis_migrate.h b/src/redis_migrate.h index 552e848..c614d97 100644 --- a/src/redis_migrate.h +++ b/src/redis_migrate.h @@ -41,13 +41,12 @@ typedef struct migrateObject { int isCache; } migrateObj; -static std::map migrating; - - migrateObj createMigrateObject(RedisModuleString *host, int port, int slot, RedisModuleString *key, migrateObj m); int rm_migrateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); +void migrateFinished(migrateObj m); + void rm_migrateFilter(RedisModuleCommandFilterCtx *filter); #ifdef __cplusplus