Linux没有真正意义上的线性结构,Linux是用进程PCB模拟线程的,
Linux并不能直接给我们提供线程的接口,只能提供轻量级进程的接口!
#include
#include
#include
#include
#include
int ticket = 100;
void *route(void *arg)
{char *id = (char*)arg;while ( 1 ) {if ( ticket > 0 ) {usleep(1000);printf("%s sells ticket:%d\n", id, ticket);ticket--;} else {break;}}
}int main( void )
{pthread_t t1, t2, t3, t4;pthread_create(&t1, NULL, route, (void*)"thread 1");pthread_create(&t2, NULL, route, (void*)"thread 2");pthread_create(&t3, NULL, route, (void*)"thread 3");pthread_create(&t4, NULL, route, (void*)"thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);
}
为什么可能无法获得争取结果?
要解决以上问题,需要做到三点:
要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。
#include
#include
#include
#include
#include
#include
#include
#include
#include using namespace std;int tickets = 10000; // 在并发访问的时候,导致了我们数据不一致的问题!临界资源#define THREAD_NUM 10// 线程数量class ThreadData
{
public:ThreadData(const std::string& n, pthread_mutex_t* pm) :tname(n), pmtx(pm){}
public:std::string tname;pthread_mutex_t* pmtx;
};void* getTickets(void* args)
{ThreadData* td = (ThreadData*)args;while (true){// 抢票逻辑int n = pthread_mutex_lock(td->pmtx);// 加锁assert(n == 0);// 临界区if (tickets > 0) // 1. 判断的本质也是计算的一种{usleep(rand() % 1500);printf("%s: %d\n", td->tname.c_str(), tickets);tickets--; // 2. 也可能出现问题n = pthread_mutex_unlock(td->pmtx);// 解锁assert(n == 0);}else {n = pthread_mutex_unlock(td->pmtx);// 解锁assert(n == 0);break;}// 抢完票,其实还需要后续的动作usleep(rand() % 2000);}delete td;return nullptr;
}int main()
{time_t start = time(nullptr);pthread_mutex_t mtx;// 局部变量 + pthread_mutex_init初始化pthread_mutex_init(&mtx, nullptr);srand((unsigned long)time(nullptr) ^ getpid() ^ 0x147);// 让随机数更随机pthread_t t[THREAD_NUM];// 多线程抢票的逻辑for (int i = 0; i < THREAD_NUM; i++){std::string name = "thread ";name += std::to_string(i + 1);ThreadData* td = new ThreadData(name, &mtx);pthread_create(t + i, nullptr, getTickets, (void*)td);}for (int i = 0; i < THREAD_NUM; i++){pthread_join(t[i], nullptr);}pthread_mutex_destroy(&mtx);time_t end = time(nullptr);cout << "cast: " << (int)(end - start) << "S" << endl;
}
补充一点: 可重入函数就是安全的
当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
引入同步: 主要是为了解决访问临界资源合理性问题的,使线程按照一定顺序,进行临界资源的访问,线程同步
#include
#include
#include
#include #define TNUM 4
typedef void (*func_t)(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond);
volatile bool quit = false;class ThreadData
{
public:ThreadData(const std::string &name, func_t func, pthread_mutex_t *pmtx, pthread_cond_t *pcond):name_(name), func_(func), pmtx_(pmtx), pcond_(pcond){}
public:std::string name_;func_t func_;pthread_mutex_t *pmtx_;pthread_cond_t *pcond_;
};void func1(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while(!quit){// wait一定要在加锁和解锁之间进行wait!// v2: pthread_mutex_lock(pmtx);// if(临界资源是否就绪-- 否) pthread_cond_waitpthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞std::cout << name << " running -- 播放" << std::endl;pthread_mutex_unlock(pmtx);}
}void func2(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while(!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞if(!quit) std::cout << name << " running -- 下载" << std::endl;pthread_mutex_unlock(pmtx);}
}
void func3(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while(!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞std::cout << name << " running -- 刷新" << std::endl;pthread_mutex_unlock(pmtx);}
}
void func4(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while(!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞std::cout << name << " running -- 扫码用户信息" << std::endl;pthread_mutex_unlock(pmtx);}
}void *Entry(void *args)
{ThreadData *td = (ThreadData*)args; //td在每一个线程自己私有的栈空间中保存td->func_(td->name_, td->pmtx_, td->pcond_); // 它是一个函数,调用完成就要返回!delete td;return nullptr;
}int main()
{pthread_mutex_t mtx;pthread_cond_t cond;// 条件变量pthread_mutex_init(&mtx, nullptr);pthread_cond_init(&cond, nullptr);// 初始化条件变量pthread_t tids[TNUM];func_t funcs[TNUM] = {func1, func2, func3, func4};// 函数指针数组for (int i = 0; i < TNUM; i++){std::string name = "Thread ";name += std::to_string(i + 1);ThreadData *td = new ThreadData(name, funcs[i], &mtx, &cond);pthread_create(tids + i, nullptr, Entry, (void*)td);}sleep(5);// ctrl new threadint cnt = 10;while(cnt){std::cout << "resume thread run code ...." << cnt-- << std::endl;pthread_cond_signal(&cond);// 唤醒等待sleep(1);}std::cout << "ctrl done" << std::endl;quit = true;pthread_cond_broadcast(&cond);// 唤醒等待for(int i = 0; i < TNUM; i++){pthread_join(tids[i], nullptr);std::cout << "thread: " << tids[i] << "quit" << std::endl;}pthread_mutex_destroy(&mtx);pthread_cond_destroy(&cond);return 0;
}
#pragma once#include
#include
#include
#include
#include "lockGuard.hpp"const int gDefaultCap = 5;// 默认容量template
class BlockQueue
{
private:bool isQueueEmpty(){return bq_.size() == 0;}bool isQueueFull(){return bq_.size() == capacity_;}
public:// 构造BlockQueue(int capacity = gDefaultCap) : capacity_(capacity){pthread_mutex_init(&mtx_, nullptr);pthread_cond_init(&Empty_, nullptr);pthread_cond_init(&Full_, nullptr);}void push(const T &in) // 生产者{// 1. 先检测当前的临界资源是否能够满足访问条件lockGuard lockgrard(&mtx_); // 自动调用构造函数while (isQueueFull()){pthread_cond_wait(&Full_, &mtx_);}// 2. 访问临界资源,100%确定,资源是就绪的!bq_.push(in);pthread_cond_signal(&Empty_);// 局部变量 出了作用域会自动调用lockgrard 析构函数} void pop(T *out){lockGuard lockguard(&mtx_);while (isQueueEmpty())pthread_cond_wait(&Empty_, &mtx_);*out = bq_.front();bq_.pop();pthread_cond_signal(&Full_);}~BlockQueue(){pthread_mutex_destroy(&mtx_);pthread_cond_destroy(&Empty_);pthread_cond_destroy(&Full_);}
private:std::queue bq_; // 阻塞队列int capacity_; // 容量上限pthread_mutex_t mtx_; // 通过互斥锁保证队列安全pthread_cond_t Empty_; // 用它来表示bq 是否空的条件pthread_cond_t Full_; // 用它来表示bq 是否满的条件
};
#include "BlockQueue.hpp"
#include "Task.hpp"#include
#include
#include int myAdd(int x, int y)
{return x + y;
}void* consumer(void *args)
{BlockQueue *bqueue = (BlockQueue *)args;while(true){// 获取任务Task t;bqueue->pop(&t);// 完成任务std::cout << pthread_self() <<" consumer: "<< t.x_ << "+" << t.y_ << "=" << t() << std::endl;// sleep(1);}return nullptr;
}// 生产者
void* productor(void *args)
{BlockQueue *bqueue = (BlockQueue *)args;while(true){// 制作任务int x = rand()%10 + 1;usleep(rand()%1000);int y = rand()%5 + 1;Task t(x, y, myAdd);// 生产任务bqueue->push(t);// 输出消息std::cout < *bqueue = new BlockQueue();pthread_t c[2],p[2];pthread_create(c, nullptr, consumer, bqueue);pthread_create(c + 1, nullptr, consumer, bqueue);pthread_create(p, nullptr, productor, bqueue);pthread_create(p + 1, nullptr, productor, bqueue);pthread_join(c[0], nullptr);// 等待pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bqueue;return 0;
}
#pragma once#include
#include typedef std::function func_t;class Task
{public:Task(){}Task(int x, int y, func_t func):x_(x), y_(y), func_(func){}int operator ()(){return func_(x_, y_);}
public:int x_;int y_;func_t func_;
};
#pragma once#include
#include
// 定义一把锁
class Mutex
{
public:Mutex(pthread_mutex_t *mtx):pmtx_(mtx){}void lock() {std::cout << "要进行加锁" << std::endl;pthread_mutex_lock(pmtx_);}void unlock(){std::cout << "要进行解锁" << std::endl;pthread_mutex_unlock(pmtx_);}~Mutex(){}
private:pthread_mutex_t *pmtx_;
};// RAII风格的加锁方式
class lockGuard
{
public:lockGuard(pthread_mutex_t *mtx):mtx_(mtx){mtx_.lock();}~lockGuard(){mtx_.unlock();}
private:Mutex mtx_;
};
cp:ConProd.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f cp
生产者: 最关心的是空间资源->spaceSem->N
消费者: 最关心的是数据资源->dataSem->0
ring_queue:testMain.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f ring_queue
#ifndef _Ring_QUEUE_HPP_
#define _Ring_QUEUE_HPP_#include
#include
#include
#include "sem.hpp"const int g_default_num = 5;// 多线程
template
class RingQueue
{
public:RingQueue(int default_num = g_default_num): ring_queue_(default_num), num_(default_num),c_step(0),p_step(0),space_sem_(default_num),data_sem_(0){// 初始化锁pthread_mutex_init(&clock, nullptr);pthread_mutex_init(&plock, nullptr);}~RingQueue(){// 销毁锁pthread_mutex_destroy(&clock);pthread_mutex_destroy(&plock);}// 生产者: 空间资源, 生产者们的临界资源是什么?下标void push(const T &in){// 先申请信号量(0)space_sem_.p();// --pthread_mutex_lock(&plock); // ?// 一定是竞争成功的生产者线程 -- 就一个!ring_queue_[p_step++] = in;p_step %= num_;pthread_mutex_unlock(&plock);data_sem_.v();// ++ }// 消费者: 数据资源, 消费者们的临界资源是什么?下标void pop(T *out){data_sem_.p();pthread_mutex_lock(&clock);// 一定是竞争成功的消费者线程 -- 就一个!*out = ring_queue_[c_step++];c_step %= num_;pthread_mutex_unlock(&clock);space_sem_.v();}// void debug()// {// std::cerr << "size: " << ring_queue_.size() << " num: " << num << std::endl;// }
private:std::vector ring_queue_;int num_;int c_step; // 消费下标int p_step; // 生产下标Sem space_sem_;// 记录空间的信号量Sem data_sem_;// 记录空间数据的信号量pthread_mutex_t clock;// 消费者的锁pthread_mutex_t plock;// 生产者的锁
};#endif
#ifndef _SEM_HPP_
#define _SEM_HPP_#include
#include class Sem
{
public:Sem(int value){// 信号量初始化sem_init(&sem_, 0, value);}void p(){// 等待信号量,会将信号量的值减1sem_wait(&sem_);}void v(){// 发布信号量,会将信号量的值加1sem_post(&sem_);}~Sem(){// 销毁相互量sem_destroy(&sem_);}
private:sem_t sem_;
};#endif
#include "ringQueue.hpp"
#include
#include
#include
#include // 消费者
void *consumer(void *args)
{// 生产者和消费者看到的是同一个rqRingQueue *rq = (RingQueue *)args;while(true){sleep(1);int x;// 1. 从环形队列中获取任务或者数据rq->pop(&x);// 2. 进行一定的处理 -- 不要忽略它的时间消耗问题std::cout << "生产: " << x << " [" << pthread_self() << "]" << std::endl;}
}// 生产者
void *productor(void *args)
{// 生产者和消费者看到的是同一个rqRingQueue *rq = (RingQueue *)args;while(true){// sleep(1);// 1. 构建数据或者任务对象 -- 一般是可以从外部来 -- 不要忽略它的时间消耗问题int x = rand() % 100 + 1;std::cout << "消费:" << x << " [" << pthread_self() << "]" << std::endl;// 2. 推送到环形队列中rq->push(x); // 完成生产的过程}
}int main()
{srand((uint64_t)time(nullptr) ^ getpid());// 产生随机数RingQueue *rq = new RingQueue();// 多线程模式pthread_t c[3],p[2];pthread_create(c, nullptr, consumer, (void*)rq);pthread_create(c+1, nullptr, consumer, (void*)rq);pthread_create(c+2, nullptr, consumer, (void*)rq);pthread_create(p, nullptr, productor, (void*)rq);pthread_create(p+1, nullptr, productor, (void*)rq);for(int i = 0; i < 3; i++) pthread_join(c[i], nullptr);for(int i = 0; i < 2; i++) pthread_join(p[i], nullptr);return 0;
}
将数据或者任务生产前和拿到之后处理,才是最耗费时间的。
生产的本质:私有的任务-> 公共空间中
消费的本质:公共空间中的任务-> 私有化
thread_pool:testMain.ccg++ -o $@ $^ -std=c++11 -lpthread #-DDEBUG_SHOW
.PHONY:clean
clean:rm -f thread_pool
#pragma once#include
#include
#include
#include "log.hpp"typedef std::function func_t;class Task
{
public:Task(){}Task(int x, int y, func_t func):x_(x), y_(y), func_(func){}void operator ()(const std::string &name){// std::cout << "线程 " << name << " 处理完成, 结果是: " << x_ << "+" << y_ << "=" << func_(x_, y_) << std::endl;logMessage(WARNING, "%s处理完成: %d+%d=%d | %s | %d",name.c_str(), x_, y_, func_(x_, y_), __FILE__, __LINE__);}
public:int x_;int y_;// int type;func_t func_;
};
#pragma once#include
#include class Mutex
{
public:Mutex(pthread_mutex_t *mtx):pmtx_(mtx){}void lock() {// std::cout << "要进行加锁" << std::endl;pthread_mutex_lock(pmtx_);}void unlock(){// std::cout << "要进行解锁" << std::endl;pthread_mutex_unlock(pmtx_);}~Mutex(){}
private:pthread_mutex_t *pmtx_;
};// RAII风格的加锁方式
class lockGuard
{
public:lockGuard(pthread_mutex_t *mtx):mtx_(mtx){mtx_.lock();}~lockGuard(){mtx_.unlock();}
private:Mutex mtx_;
};
#pragma once#include
#include
#include
#include
#include // 日志是有日志级别的
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4const char *gLevelMap[] = {"DEBUG","NORMAL","WARNING","ERROR","FATAL"
};#define LOGFILE "./threadpool.log"// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOWif(level== DEBUG) return;
#endif// va_list ap;// va_start(ap, format);// while()// int x = va_arg(ap, int);// va_end(ap); //ap=nullptrchar stdBuffer[1024]; //标准部分time_t timestamp = time(nullptr);// struct tm *localtime = localtime(×tamp);snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);char logBuffer[1024]; //自定义部分va_list args;va_start(args, format);// vprintf(format, args);vsnprintf(logBuffer, sizeof logBuffer, format, args);va_end(args);FILE *fp = fopen(LOGFILE, "a");// printf("%s%s\n", stdBuffer, logBuffer);fprintf(fp, "%s%s\n", stdBuffer, logBuffer);fclose(fp);
}
#pragma once
#include
#include
#include
#include // typedef std::function fun_t;
typedef void* (*fun_t)(void*);class ThreadData
{
public:void* args_;std::string name_;
};class Thread
{
public:Thread(int num, fun_t callback, void* args) : func_(callback){char nameBuffer[64];snprintf(nameBuffer, sizeof nameBuffer, "Thread-%d", num);name_ = nameBuffer;tdata_.args_ = args;tdata_.name_ = name_;}void start(){pthread_create(&tid_, nullptr, func_, (void*)&tdata_);}void join(){pthread_join(tid_, nullptr);}std::string name(){return name_;}~Thread(){}private:std::string name_;fun_t func_;ThreadData tdata_;pthread_t tid_;
};
#pragma once#include
#include
#include
#include
#include
#include "thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"const int g_thread_num = 3;
// 本质是: 生产消费模型
template
class ThreadPool
{
public:pthread_mutex_t* getMutex(){return &lock;}bool isEmpty(){return task_queue_.empty();}void waitCond(){pthread_cond_wait(&cond, &lock);}T getTask(){T t = task_queue_.front();task_queue_.pop();return t;}private:ThreadPool(int thread_num = g_thread_num) : num_(thread_num){pthread_mutex_init(&lock, nullptr);pthread_cond_init(&cond, nullptr);for (int i = 1; i <= num_; i++){threads_.push_back(new Thread(i, routine, this));}}ThreadPool(const ThreadPool& other) = delete;const ThreadPool& operator=(const ThreadPool& other) = delete;public:// 考虑一下多线程使用单例的过程static ThreadPool* getThreadPool(int num = g_thread_num){// 可以有效减少未来必定要进行加锁检测的问题// 拦截大量的在已经创建好单例的时候,剩余线程请求单例的而直接访问锁的行为if (nullptr == thread_ptr){lockGuard lockguard(&mutex);// 但是,未来任何一个线程想获取单例,都必须调用getThreadPool接口// 但是,一定会存在大量的申请和释放锁的行为,这个是无用且浪费资源的// pthread_mutex_lock(&mutex);if (nullptr == thread_ptr){thread_ptr = new ThreadPool(num);}// pthread_mutex_unlock(&mutex);}return thread_ptr;}// 1. run()void run(){for (auto& iter : threads_){iter->start();// std::cout << iter->name() << " 启动成功" << std::endl;logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功");}}// 线程池本质也是一个生产消费模型// void *routine(void *args)// 消费过程static void* routine(void* args){ThreadData* td = (ThreadData*)args;ThreadPool* tp = (ThreadPool *)td->args_;while (true){T task;{lockGuard lockguard(tp->getMutex());while (tp->isEmpty())tp->waitCond();// 读取任务task = tp->getTask(); // 任务队列是共享的-> 将任务从共享,拿到自己的私有空间}task(td->name_);// lock// while(task_queue_.empty()) wait();// 获取任务// unlock// 处理任务}}// 2. pushTask()void pushTask(const T& task){lockGuard lockguard(&lock);task_queue_.push(task);pthread_cond_signal(&cond);}// test func// void joins()// {// for (auto &iter : threads_)// {// iter->join();// }// }~ThreadPool(){for (auto& iter : threads_){iter->join();delete iter;}pthread_mutex_destroy(&lock);pthread_cond_destroy(&cond);}private:std::vector threads_;int num_;std::queue task_queue_;static ThreadPool* thread_ptr;static pthread_mutex_t mutex;// 方案2:// queue1,queue2// std::queue *p_queue, *c_queue// p_queue->queue1// c_queue->queue2// p_queue -> 生产一批任务之后,swap(p_queue,c_queue),唤醒所有线程/一个线程// 当消费者处理完毕的时候,你也可以进行swap(p_queue,c_queue)// 因为我们生产和消费用的是不同的队列,未来我们要进行资源的处理的时候,仅仅是指针pthread_mutex_t lock;pthread_cond_t cond;
};
template
ThreadPool* ThreadPool::thread_ptr = nullptr;template
pthread_mutex_t ThreadPool::mutex = PTHREAD_MUTEX_INITIALIZER;
#include "threadPool.hpp"
#include "Task.hpp"
#include
#include
#include
#include // void *run(void *args)
// {
// while(true)
// {
// ThreadPool::getThreadPool();
// }
// }int main()
{// logMessage(NORMAL, "%s %d %c %f \n", "这是一条日志信息", 1234, 'c', 3.14);srand((unsigned long)time(nullptr) ^ getpid());// ThreadPool *tp = new ThreadPool();// ThreadPool *tp = ThreadPool::getThreadPool();// 那么,如果单例本身也在被多线程申请使用呢??ThreadPool::getThreadPool()->run();//thread1,2,3,4while(true){//生产的过程,制作任务的时候,要花时间int x = rand()%100 + 1;usleep(7721);int y = rand()%30 + 1;Task t(x, y, [](int x, int y)->int{return x + y;});// std::cout << "制作任务完成: " << x << "+" << y << "=?" << std::endl;logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);// 推送任务到线程池中ThreadPool::getThreadPool()->pushTask(t);sleep(1);}return 0;
}
[NORMAL] [1675916038] Thread-1 启动成功
[NORMAL] [1675916038] Thread-2 启动成功
[NORMAL] [1675916038] Thread-3 启动成功
[WARNING] [1675916038] Thread-1处理完成: 21+20=41 | Task.hpp | 20
[WARNING] [1675916039] Thread-2处理完成: 69+30=99 | Task.hpp | 20
[WARNING] [1675916040] Thread-3处理完成: 97+21=118 | Task.hpp | 20
[WARNING] [1675916041] Thread-1处理完成: 74+21=95 | Task.hpp | 20
[WARNING] [1675916042] Thread-2处理完成: 30+24=54 | Task.hpp | 20
[WARNING] [1675916043] Thread-3处理完成: 95+17=112 | Task.hpp | 20
[WARNING] [1675916044] Thread-1处理完成: 76+27=103 | Task.hpp | 20
[WARNING] [1675916045] Thread-2处理完成: 67+7=74 | Task.hpp | 20
[WARNING] [1675916046] Thread-3处理完成: 38+4=42 | Task.hpp | 20
[WARNING] [1675916047] Thread-1处理完成: 62+19=81 | Task.hpp | 20
[WARNING] [1675916048] Thread-2处理完成: 93+21=114 | Task.hpp | 20
[WARNING] [1675916049] Thread-3处理完成: 64+13=77 | Task.hpp | 20
[WARNING] [1675916050] Thread-1处理完成: 81+3=84 | Task.hpp | 20
[WARNING] [1675916051] Thread-2处理完成: 86+26=112 | Task.hpp | 20
[WARNING] [1675916052] Thread-3处理完成: 24+21=45 | Task.hpp | 20
[WARNING] [1675916053] Thread-1处理完成: 69+6=75 | Task.hpp | 20