Linux - C++实现简单线程池
迪丽瓦拉
2024-05-31 14:32:19
0

概念:

一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着 监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利 用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 线程池的应用场景: 1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技 术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个 Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。 2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。 3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情 况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限, 出现错误。

threadPool.hpp

// 线程池类定义
#pragma once
#include 
#include 
#include 
#include 
#include "thread.hpp"
#include "mutex.hpp"
#include "task.hpp"const int g_thread_num = 3;// 这样不太合适,因为这样的话,除非把它设计为单例模式,否则所有线程池共用一个任务队列,锁,条件变量,显然是不行的。
// template 
// class ThreadPool
// {
// public:
//     // 构造函数
//     ThreadPool(int num = g_thread_num)
//         : thread_num_(num)
//     {
//         pthread_mutex_init(&mutex_, nullptr);
//         pthread_cond_init(&cond_, nullptr);
//     }
//     ~ThreadPool()
//     {
//         pthread_mutex_destroy(&mutex_);
//         pthread_cond_destroy(&cond_);
//     }// public:
//     void run()
//     {
//         pthread_t tid;
//         // 让所有线程执行起来,等待任务,执行任务
//         for (int i = 0; i < thread_num_; ++i)
//         {
//             pthread_create(&tid, nullptr, routine, (void *)this);
//         }
//     }
//     // 因为类成员函数,有一个隐藏的this指针,不能直接作为void* (*p)(void*)函数
//     //因此设置为static成员函数,但是routine内部要访问锁和条件变量,以及任务队列,因此把它们设置为static数据成员。
//     //但是,这样的话,所有线程池都使用一个锁,一个条件变量,一个任务队列,这无疑不是最佳方案。
//     static void *routine(void *args) // 消费者pop任务,此处为线程。
//     {
//         ThreadPool *tp = (ThreadPool *)args;
//         while (true)
//         {
//             pthread_mutex_lock(&mutex_);
//             while (task_queue_.empty())
//                 pthread_cond_wait(&cond_, &mutex_); // 线程等待任务投放,并解锁
//             T t;
//             t = task_queue_.front();
//             task_queue_.pop();
//             std::cout << "线程 " << pthread_self() << " 执行任务 : " << t() << std::endl; // 执行任务
//             pthread_mutex_unlock(&mutex_);
//         }
//         return nullptr;
//     }
//     void pushTask(const T &task) // 生产者push任务,此处为主线程。
//     {
//         pthread_mutex_lock(&mutex_);
//         task_queue_.push(task);
//         pthread_cond_signal(&cond_); // signal一下条件变量,通知线程
//         pthread_mutex_unlock(&mutex_);
//     }// private:
//     int thread_num_;
//     static std::queue task_queue_; // 缓冲区,存放任务
//     static pthread_mutex_t mutex_;    // 锁,因为static函数访问了这个
//     static pthread_cond_t cond_;      // 条件变量,因为static函数访问了这个
// };// template 
// std::queue ThreadPool::task_queue_;// template 
// pthread_mutex_t ThreadPool::mutex_;// template 
// pthread_cond_t ThreadPool::cond_;// 单例模式:把构造函数私有化,把拷贝和赋值设为delete,设置一个static接口获取单例指针。
template 
class ThreadPool
{
public:static ThreadPool *getThreadPool(int thread_num = g_thread_num){// 这样是不安全的,因为可能多个线程执行这个getThreadPool,这样可能new出多个单例对象。// 因此要加锁保护// if(thread_pool_ptr == nullptr)//     thread_pool_ptr = new ThreadPool(thread_num);// return thread_pool_ptr;// 可能多次调用这个getThreadPool,但是没必要每次进来都加锁。// 因为只有第一次进来的时候加锁保护,防止创建多个单例对象是有意义的。// if (nullptr == thread_pool_ptr){lockGuard lockguard(&mutex);// pthread_mutex_lock(&mutex);if (nullptr == thread_pool_ptr){thread_pool_ptr = new ThreadPool(thread_num);}// pthread_mutex_unlock(&mutex);}return thread_pool_ptr;}
private:ThreadPool(int num = g_thread_num): thread_num_(num){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&cond_, nullptr);for (int i = 1; i <= thread_num_; ++i){threads_.push_back(new Thread(i, routine, (void *)this));}}ThreadPool(const ThreadPool& tp) = delete;ThreadPool& operator=(const ThreadPool& tp) = delete;
public:~ThreadPool(){for (auto &iter : threads_){iter->join();delete iter;}pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&cond_);logMessage(NORMAL, "线程池内所有线程退出成功");}void run(){// 线程池内所有线程开始执行for (auto &i : threads_){i->create();logMessage(NORMAL, "%s启动成功", i->name().c_str());}}// 避免this指针static void *routine(void *args){ThreadData *td = (ThreadData *)args;ThreadPool *tp = (ThreadPool *)td->args_;while (true){// T task;// {//     lockGuard lockguard(tp->getMutex()); // RAII加锁//     while (tp->isEmpty())//         tp->waitCond();//     task = tp->getTask();// }// task(td->name_);T task;{// 临界区,临界资源是任务队列task_queue_lockGuard lockguard(&tp->mutex_);while(tp->task_queue_.empty())pthread_cond_wait(&tp->cond_, &tp->mutex_);task = tp->task_queue_.front();tp->task_queue_.pop();}task(td->name_);}}void pushTask(const T &task){lockGuard lockguard(&mutex_);task_queue_.push(task);pthread_cond_signal(&cond_);}
public:pthread_mutex_t *getMutex(){return &mutex_;}bool isEmpty(){return task_queue_.empty();}void waitCond(){pthread_cond_wait(&cond_, &mutex_);}T getTask(){Task ret = task_queue_.front();task_queue_.pop();return ret;}
private:std::vector threads_;int thread_num_;std::queue task_queue_;pthread_mutex_t mutex_;pthread_cond_t cond_;static ThreadPool *thread_pool_ptr; // 单例模式指针static pthread_mutex_t mutex;
};template 
ThreadPool *ThreadPool::thread_pool_ptr = nullptr;
template 
pthread_mutex_t ThreadPool::mutex = PTHREAD_MUTEX_INITIALIZER;

thread.hpp

// 封装一下线程,线程池里包含线程
#include 
#include 
#include // typedef std::function func_t;
typedef void *(*func_t)(void*);// 实际创建线程时,传递的参数是ThreadData类型
struct ThreadData
{
public:std::string name_;void* args_;        // 创建Thread对象时,传来的例程参数,void*
};class Thread
{
public:Thread(int num, func_t callback, void *args): func_(callback){char nameBuffer[1024];snprintf(nameBuffer, sizeof nameBuffer, "thread_%d", num);name_ = nameBuffer;tdata_.name_ = name_;tdata_.args_ = args;    // 线程池那里传来的this指针}~Thread() = default;void create(){pthread_create(&tid_, nullptr, func_, (void*)&tdata_);}void join(){pthread_join(tid_, nullptr);}std::string name(){return name_;}
private:std::string name_;pthread_t tid_;   // 线程id,是在pthread_create的时候才产生的func_t func_;   // 线程执行例程函数ThreadData tdata_;  // 执行例程函数的参数
};

testMain.cc

// 创建线程池的main源文件
#include 
#include 
#include 
#include "threadPool.hpp"
#include "task.hpp"
#include "log.hpp"int main()
{srand((unsigned int)time(nullptr) ^ getpid());ThreadPool *tp = ThreadPool::getThreadPool();// ThreadPool tp1 = *tp;// tp1 = *tp;tp->run();// 创建线程池,往线程池内push任务,供给线程池内的线程执行。while(true){int x = rand() % 10 + 1;usleep(3333);int y = rand() % 10 + 1;Task t(x, y, [](int x, int y)->int {return x + y;});tp->pushTask(t);logMessage(NORMAL, "主线程生产完成: %d+%d=?", x, y);sleep(1);}return 0;
}

task.hpp

#pragma once
#include 
#include 
#include "log.hpp"typedef std::function fun_t; class Task
{
public:Task() = default;Task(int x, int y, fun_t func): x_(x), y_(y), func_(func){}int operator()(const std::string& name){logMessage(NORMAL, "%s处理完成: %d+%d=%d | %s | %d", name.c_str() , x_, y_, func_(x_, y_), __FILE__, __LINE__);}
public:int x_;int y_;fun_t func_;
};

mutex.hpp

// RAII式的mutex互斥锁封装
#pragma once
#include // 对锁指针的封装,需要一个锁指针
class Mutex
{
public:Mutex(pthread_mutex_t *pmtx):pmtx_(pmtx){}void lock(){pthread_mutex_lock(pmtx_);}void unlock(){pthread_mutex_unlock(pmtx_);}
private:pthread_mutex_t *pmtx_;
};// 需要一个锁指针,RAII式的加锁解锁。
class lockGuard
{
public:lockGuard(pthread_mutex_t *mtx): mtx_(mtx){mtx_.lock();}~lockGuard(){mtx_.unlock();}
private:Mutex mtx_;
};

log.hpp

// 一个打印,提示的功能
#pragma once
#include 
#include 
#include // 日志是有级别的
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4const char *gLevelMap[] = {"DEBUG","NORMAL","WARNING","ERROR","FATAL"
};// 第二个参数本质上就是一个字符串,你忘了吗 "你好,哈哈,%s%d"
void logMessage(int level, const char *format, ...)
{// 如果没有定义DEBUG_SHOW,你就把DEBUGlevel的log都屏蔽掉
#ifndef DEBUG_SHOWif(level == DEBUG)  return;
#endifchar stdBuffer[1024];   // 标准部分的存储空间time_t timestamp = time(nullptr);   // 获取时间戳以代替时间snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%d] ", gLevelMap[level], timestamp);char logBuffer[1024];   // 自定义部分的存储空间va_list args;va_start(args, format);vsnprintf(logBuffer, sizeof logBuffer, format, args);va_end(args);printf("%s%s\n", stdBuffer, logBuffer);
}

log.hpp就是一个打印日志的功能。mutex.hpp实现了一个RAII式的加锁解锁类,参数是一个锁指针。

task.hpp就是对线程池内线程执行的任务的一个简单模拟。main函数生产task,调用线程池的pushTask方法,将task放入线程池的任务队列中,线程池内等待任务的线程被signal然后执行任务。

thread.hpp对于线程的封装,线程的数据成员有线程名,执行例程方法,参数,tid。就是一个很简单的封装。并不是构造时直接pthread_create,而是需要调用一个create方法。线程池构造时构造若干个thread对象。线程池的run方法中调用线程的create方法,使线程执行routine等待任务队列中的任务。


线程池的基本原理就是生产消费模型,main函数充当生产者,放入线程池的任务队列缓冲区中,然后利用条件变量唤醒pthread_cond_wait的线程,线程执行任务。这里需要加锁保护临界资源,在生产消费之间,消费消费之间都需要加锁实现互斥(因为此处仅main在生产,故没有生产生产之间的关系,但是也实现了互斥),还需要条件变量实现同步机制。

对于routine函数,因为类的普通成员方法有一个隐藏的this指针参数,因此不能直接作为线程的执行例程方法(void*(*p)(void*)),因此需要将其定为static,但是static方法不能访问非static数据成员。也就导致了不能直接访问task_queue_,mutex,cond等数据成员,如果想直接访问,需要加static。但是如果这样的话,所有线程池都共用一个任务列表,锁,条件变量是不合适的。

解决方法是:将routine设为static方法,然后线程传参时传一个线程池对象的this指针过去,static成员方法不能访问非static数据成员的原因就是因为没有this指针。现在传一个过去,这个routine就可以访问任何非static数据成员和非static方法了。

有关加锁,和条件变量的使用,就是常规的生产消费模型中锁和条件变量的使用。

单例模式

有懒汉模式和饿汉模式。

将构造设为private,拷贝和赋值设为delete。对外提供一个static方法,内部有一个static对象指针。static方法用于获取这个static对象指针。也就是static ThreadPool* threadpool_ptr。在static接口getThreadPool中,实现懒汉模式逻辑。

getThreadPool有一个线程安全问题,也就是多线程调用时,可能出现内存中创建出两份单例对象的情况。后面再调用时,因为threadpool_ptr指针为nullptr,就不会出现这种情况了。

为了避免上述情况发生,需要加锁保护。加锁保护之后,可以有一个优化。也就是再套一层判断。因为只有第一批进入这个函数的线程可能有安全问题,后面当对象创建出来之后,单例指针不为nullptr,再进入方法中不再需要加锁解锁这样的过程。因此又加了一层判断,提高效率。

其他

STL中的容器不是线程安全的。自旋锁,悲观锁,乐观锁....略了。读写锁,略了。

相关内容