add thread pool

This commit is contained in:
LingZhaoHui 2022-10-06 22:15:10 +08:00
parent 83a2cdcc96
commit d0dfa04129
6 changed files with 145 additions and 4 deletions

View File

@ -26,7 +26,8 @@ set(migrate_sources
src/hiredis/sds.c
src/hiredis/sockcompat.c
src/log.cpp
src/redis-migrate.cpp)
src/thread_pool.cpp
src/redis_migrate.cpp)
set(migrate_sources ${migrate_sources})

View File

@ -6,7 +6,7 @@
#include <sys/time.h>
#include <time.h>
#include "log.h"
#include "redis-migrate.h"
#include "redis_migrate.h"
#include "hiredis/hiredis.h"
static int is_leap_year(time_t year) {

View File

@ -5,7 +5,8 @@
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "redis-migrate.h"
#include <pthread.h>
#include "redis_migrate.h"
#include "hiredis/hiredis.h"
#include "redismodule.h"
#include "log.h"
@ -149,7 +150,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
}
RedisModule_Log(ctx, NOTICE, "init commands of %s success", "rm.migrate");
filter = RedisModule_RegisterCommandFilter(ctx, rm_migrateFilter, 0);
if (filter == NULL) {

63
src/thread_pool.cpp Normal file
View File

@ -0,0 +1,63 @@
#include "thread_pool.h"
template <typename T>
ThreadPool<T>::ThreadPool(int num, int max) :
stop(false) {
if (max < num) {
max = MAX_THREADS;
}
if (num <= 0 || num > max) {
throw std::exception();
}
for (int i = 0; i < num; i++) {
work_threads.emplace_back(worker, this);
}
}
template <typename T>
inline ThreadPool<T>::~ThreadPool() {
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
condition.notify_all();
for (auto &w : work_threads) {
w.join();
}
}
template <typename T>
bool ThreadPool<T>::submit(T *task) {
queue_mutex.lock();
tasks_queue.push(task);
queue_mutex.unlock();
condition.notify_one();
return true;
}
template <typename T>
void *ThreadPool<T>::worker(void *arg) {
ThreadPool *pool = (ThreadPool *)arg;
pool->run();
return pool;
}
template <typename T>
void ThreadPool<T>::run() {
while (!stop) {
std::unique_lock<std::mutex> lk(this->queue_mutex);
this->condition.wait(lk, [this] {
return !this->tasks_queue.empty();
});
if (this->tasks_queue.empty()) {
continue;
}
T *request = tasks_queue.front();
tasks_queue.pop();
if (request) {
request->process();
}
}
}

76
src/thread_pool.h Normal file
View File

@ -0,0 +1,76 @@
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <stdexcept>
#include <condition_variable>
#include <memory>
const int MAX_THREADS = 200;
template <typename T>
class ThreadPool {
public:
/**
* @brief Create a new thread pool
*
* @param num default thread num
* @param max max of thread in thread pool
*/
ThreadPool(int num=1, int max = MAX_THREADS);
/**
* @brief submit a new task to thread pool
*
* @param t
* @return true
* @return false
*/
bool submit(T *task);
/**
* @brief Destroy the Thread Pool object
*
*/
~ThreadPool();
private:
/**
* @brief work thread
*
*/
std::vector<std::thread> work_threads;
/**
* @brief task queue
*
*/
std::queue<T *> tasks_queue;
/**
* @brief task lock
*
*/
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
private:
/**
* @brief worker task
*
* @param arg
* @return void*
*/
static void *worker(void *arg);
/**
* @brief run the threads
*
* @param arg th args of task
* @return void*
*/
void run();
};
#endif