lesson8-Linux多线程
迪丽瓦拉
2024-05-31 14:25:56
0

Linux线程概念

  •  线程在进程内部执行,是OS调度的基本单位
  • OS是可以做到让进程进行资源的细粒度划分的

  •  物理内存是以4kb为单位
  • 我们的.exe可执行程序本来就是按照地址空间的方式进行编译的

 页表映射 - 详细图

 理解线程

  •  线程在进程的地址空间内运行,
  •  进程内部具有多个执行流的,而线程只有一个执行流,
  • 线程独有一组寄存器,栈等等

CPU的视角

  • Linux下,PCB <= 其他OS内的PCB的,所以Linux下的进程:统一称之为轻量级进程
  • CPU其实不关心执行流是进程还是线程,只关心PCB

Linux没有真正意义上的线性结构,Linux是用进程PCB模拟线程的,

Linux并不能直接给我们提供线程的接口,只能提供轻量级进程的接口!

  •   它也在用户层实现了一套用户层多线程方案,以库的方式提供给用户进行使用pthread线程库 -- 原生线程库

为什么线程切换的成本更低

  • CPU内部是有L1~L3cache 对应内存的代码和数据,根据局部性原理,预读CPU内部!!!
  •  线程是不用切换页表 && 改变地址空间,
  • 进程需要切换cache就会立即失效,新进程过来,只能重新缓存,

创建线程

pthread_create

 

  •  这段代码也可以证明CPU是用LWP来区分线程

线程ID及进程地址空间布局

  • pthread_ create函数会产生一个线程ID,存放在第一个参数指向的地址中。该线程ID和前面说的线程ID 不是一回事。
  • 前面讲的线程ID属于进程调度的范畴。因为线程是轻量级进程,是操作系统调度器的最小单位,所以需要一个数值来唯一表示该线程。
  • pthread_ create函数第一个参数指向一个虚拟内存单元,该内存单元的地址即为新创建线程的线程ID,

 pthread_self

  •  可以获得线程自身的ID

 pthread_t类型的理解

  • 对于Linux目前实现的NPTL实现而言,pthread_t类型的线程ID,
  • 本质就是一个进程地址空间上的一个地址。

 线程终止

如果需要只终止某个线程而不终止整个进程,可以有三种方法:
  1. 从线程函数return。这种方法对主线程不适用,从main函数return相当于调用exit。
  2. 线程可以调用pthread_ exit终止自己
  3. 一个线程可以调用pthread_ cancel终止同一进程中的另一个线程。

 pthread_cancel

 线程等待

为什么需要线程等待?
  • 已经退出的线程,其空间没有被释放,仍然在进程的地址空间内。
  • 创建新的线程不会复用刚才退出线程的地址空间。

pthread_join 

分离线程

  • 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。
  • 如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源。

pthread_detach

 Linux线程互斥

  • 临界资源:多线程执行流共享的资源就叫做临界资源
  • 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
  • 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用原子性
  • 不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
#include 
#include 
#include 
#include 
#include 
int ticket = 100;
void *route(void *arg)
{char *id = (char*)arg;while ( 1 ) {if ( ticket > 0 ) {usleep(1000);printf("%s sells ticket:%d\n", id, ticket);ticket--;} else {break;}}
}int main( void )
{pthread_t t1, t2, t3, t4;pthread_create(&t1, NULL, route, (void*)"thread 1");pthread_create(&t2, NULL, route, (void*)"thread 2");pthread_create(&t3, NULL, route, (void*)"thread 3");pthread_create(&t4, NULL, route, (void*)"thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);
}

  • 多个线程并发的操作共享变量,会带来一些问题。 

为什么可能无法获得争取结果?

  • if 语句判断条件为真以后,代码可以并发的切换到其他线程
  • usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
  • --ticket 操作本身就不是一个原子操作

 要解决以上问题,需要做到三点:

  1. 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
  2. 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
  3. 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。

要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。 

pthread_mutex_lock && pthread_mutex_unlock

 

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include using namespace std;int tickets = 10000; // 在并发访问的时候,导致了我们数据不一致的问题!临界资源#define THREAD_NUM 10// 线程数量class ThreadData
{
public:ThreadData(const std::string& n, pthread_mutex_t* pm) :tname(n), pmtx(pm){}
public:std::string tname;pthread_mutex_t* pmtx;
};void* getTickets(void* args)
{ThreadData* td = (ThreadData*)args;while (true){// 抢票逻辑int n = pthread_mutex_lock(td->pmtx);// 加锁assert(n == 0);// 临界区if (tickets > 0) // 1. 判断的本质也是计算的一种{usleep(rand() % 1500);printf("%s: %d\n", td->tname.c_str(), tickets);tickets--; // 2. 也可能出现问题n = pthread_mutex_unlock(td->pmtx);// 解锁assert(n == 0);}else {n = pthread_mutex_unlock(td->pmtx);// 解锁assert(n == 0);break;}// 抢完票,其实还需要后续的动作usleep(rand() % 2000);}delete td;return nullptr;
}int main()
{time_t start = time(nullptr);pthread_mutex_t mtx;// 局部变量 + pthread_mutex_init初始化pthread_mutex_init(&mtx, nullptr);srand((unsigned long)time(nullptr) ^ getpid() ^ 0x147);// 让随机数更随机pthread_t t[THREAD_NUM];// 多线程抢票的逻辑for (int i = 0; i < THREAD_NUM; i++){std::string name = "thread ";name += std::to_string(i + 1);ThreadData* td = new ThreadData(name, &mtx);pthread_create(t + i, nullptr, getTickets, (void*)td);}for (int i = 0; i < THREAD_NUM; i++){pthread_join(t[i], nullptr);}pthread_mutex_destroy(&mtx);time_t end = time(nullptr);cout << "cast: " << (int)(end - start) << "S" << endl;
}

  • pthread_mutex_t 就是原生线程库提供的一个数据类型 
  •  如果多线程访问同一个全局变量,并对它进行数据计算,多线程不会互相影响
    • 可以定义一个全局变量 pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
  • 加锁保护:加锁的时候,一定要保证加锁的力度,越小越好!!
  • 加锁之后就是串行执行的了,线程在临界区,切换也不会有什么问题,
    • 锁只有一把,线程切换走了,持有锁也会被带走,而其他抢票的线程要执行临界区的代码
    • 又要申请锁,这时锁是无法申请成功的,也就不会让其他线程进入临界区,保证了临界区中数据的一致性

对锁的理解 

  •  在上那段代码中,锁只有一把,拿1的汇编只有1条
  • 一个线程被切换走,它会将自己的数据对应执行到的汇编 

补充一点: 可重入函数就是安全的

常见锁概念 

死锁

  •  一把锁也可能发生死锁问题

死锁四个必要条件

  1. 互斥条件:一个资源每次只能被一个执行流使用
  2. 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
  3. 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
  4. 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系

避免死锁

  • 破坏死锁的四个必要条件
  • 加锁顺序一致
  • 避免锁未释放的场景
  • 资源一次性分配

Linux线程同步

条件变量

  •  当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。

  • 例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量

同步概念与竞态条件 

  • 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
  • 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解

 引入同步: 主要是为了解决访问临界资源合理性问题的,使线程按照一定顺序,进行临界资源的访问,线程同步

方案一: 条件变量

  • 当我们申请临界资源前,先要做临界资源是否存在的检测,而检测的本质: 也是访问临界资源
  • 所以对临界资源的检测,也一定需要在加锁和解锁之间的

 

#include 
#include 
#include 
#include #define TNUM 4
typedef void (*func_t)(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond);
volatile bool quit = false;class ThreadData
{
public:ThreadData(const std::string &name, func_t func, pthread_mutex_t *pmtx, pthread_cond_t *pcond):name_(name), func_(func), pmtx_(pmtx), pcond_(pcond){}
public:std::string name_;func_t func_;pthread_mutex_t *pmtx_;pthread_cond_t *pcond_;
};void func1(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while(!quit){// wait一定要在加锁和解锁之间进行wait!// v2: pthread_mutex_lock(pmtx);// if(临界资源是否就绪-- 否) pthread_cond_waitpthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞std::cout << name << " running -- 播放" << std::endl;pthread_mutex_unlock(pmtx);}
}void func2(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while(!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞if(!quit) std::cout << name << " running  -- 下载" << std::endl;pthread_mutex_unlock(pmtx);}
}
void func3(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while(!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞std::cout << name << " running  -- 刷新" << std::endl;pthread_mutex_unlock(pmtx);}
}
void func4(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while(!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞std::cout << name << " running  -- 扫码用户信息" << std::endl;pthread_mutex_unlock(pmtx);}
}void *Entry(void *args)
{ThreadData *td = (ThreadData*)args; //td在每一个线程自己私有的栈空间中保存td->func_(td->name_, td->pmtx_, td->pcond_); // 它是一个函数,调用完成就要返回!delete td;return nullptr;
}int main()
{pthread_mutex_t mtx;pthread_cond_t cond;// 条件变量pthread_mutex_init(&mtx, nullptr);pthread_cond_init(&cond, nullptr);// 初始化条件变量pthread_t tids[TNUM];func_t funcs[TNUM] = {func1, func2, func3, func4};// 函数指针数组for (int i = 0; i < TNUM; i++){std::string name = "Thread ";name += std::to_string(i + 1);ThreadData *td = new ThreadData(name, funcs[i], &mtx, &cond);pthread_create(tids + i, nullptr, Entry, (void*)td);}sleep(5);// ctrl new threadint cnt = 10;while(cnt){std::cout << "resume thread run code ...." << cnt-- << std::endl;pthread_cond_signal(&cond);// 唤醒等待sleep(1);}std::cout << "ctrl done" << std::endl;quit = true;pthread_cond_broadcast(&cond);// 唤醒等待for(int i = 0; i < TNUM; i++){pthread_join(tids[i], nullptr);std::cout << "thread: " << tids[i] << "quit" << std::endl;}pthread_mutex_destroy(&mtx);pthread_cond_destroy(&cond);return 0;
}

生产者消费者模型 

 

 321原则

  • 3种关系:
    • 生产者和生产者(竞争,互斥)
    • 消费者和消费者(竞争,互斥)
    • 生产者和消费者(互斥,同步)
  • 2种角色:
    • 生产者和消费者
  • 1个交易场所
    • 超市

BlockQueue.hpp

#pragma once#include 
#include 
#include 
#include 
#include "lockGuard.hpp"const int gDefaultCap = 5;// 默认容量template 
class BlockQueue
{
private:bool isQueueEmpty(){return bq_.size() == 0;}bool isQueueFull(){return bq_.size() == capacity_;}
public:// 构造BlockQueue(int capacity = gDefaultCap) : capacity_(capacity){pthread_mutex_init(&mtx_, nullptr);pthread_cond_init(&Empty_, nullptr);pthread_cond_init(&Full_, nullptr);}void push(const T &in) // 生产者{// 1. 先检测当前的临界资源是否能够满足访问条件lockGuard lockgrard(&mtx_); // 自动调用构造函数while (isQueueFull()){pthread_cond_wait(&Full_, &mtx_);}// 2. 访问临界资源,100%确定,资源是就绪的!bq_.push(in);pthread_cond_signal(&Empty_);// 局部变量 出了作用域会自动调用lockgrard 析构函数} void pop(T *out){lockGuard lockguard(&mtx_);while (isQueueEmpty())pthread_cond_wait(&Empty_, &mtx_);*out = bq_.front();bq_.pop();pthread_cond_signal(&Full_);}~BlockQueue(){pthread_mutex_destroy(&mtx_);pthread_cond_destroy(&Empty_);pthread_cond_destroy(&Full_);}
private:std::queue bq_;     // 阻塞队列int capacity_;         // 容量上限pthread_mutex_t mtx_;  // 通过互斥锁保证队列安全pthread_cond_t Empty_; // 用它来表示bq 是否空的条件pthread_cond_t Full_;  //  用它来表示bq 是否满的条件
};

ConProd.cc

#include "BlockQueue.hpp"
#include "Task.hpp"#include 
#include 
#include int myAdd(int x, int y)
{return x + y;
}void* consumer(void *args)
{BlockQueue *bqueue = (BlockQueue *)args;while(true){// 获取任务Task t;bqueue->pop(&t);// 完成任务std::cout << pthread_self() <<" consumer: "<< t.x_ << "+" << t.y_ << "=" << t() << std::endl;// sleep(1);}return nullptr;
}// 生产者
void* productor(void *args)
{BlockQueue *bqueue = (BlockQueue *)args;while(true){// 制作任务int x = rand()%10 + 1;usleep(rand()%1000);int y = rand()%5 + 1;Task t(x, y, myAdd);// 生产任务bqueue->push(t);// 输出消息std::cout < *bqueue = new BlockQueue();pthread_t c[2],p[2];pthread_create(c, nullptr, consumer, bqueue);pthread_create(c + 1, nullptr, consumer, bqueue);pthread_create(p, nullptr, productor, bqueue);pthread_create(p + 1, nullptr, productor, bqueue);pthread_join(c[0], nullptr);// 等待pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bqueue;return 0;
}

Task.hpp

#pragma once#include 
#include typedef std::function func_t;class Task
{public:Task(){}Task(int x, int y, func_t func):x_(x), y_(y), func_(func){}int operator ()(){return func_(x_, y_);}
public:int x_;int y_;func_t func_;
};

lockGuard.hpp

#pragma once#include 
#include 
// 定义一把锁
class Mutex
{
public:Mutex(pthread_mutex_t *mtx):pmtx_(mtx){}void lock() {std::cout << "要进行加锁" << std::endl;pthread_mutex_lock(pmtx_);}void unlock(){std::cout << "要进行解锁" << std::endl;pthread_mutex_unlock(pmtx_);}~Mutex(){}
private:pthread_mutex_t *pmtx_;
};// RAII风格的加锁方式
class lockGuard
{
public:lockGuard(pthread_mutex_t *mtx):mtx_(mtx){mtx_.lock();}~lockGuard(){mtx_.unlock();}
private:Mutex mtx_;
};

Makefile

cp:ConProd.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f cp
  • pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放!
  • 从哪里阻塞挂起,就从哪里唤醒, 被唤醒的时候,我们还是在临界区被唤醒的啊
  • RAII风格的加锁(封装了一个锁的类)

POSIX信号量

信号量的概念

 

  • 共享资源->任何时候都只有一个执行流在进行访问,
  • 如果一个共享资源,不当做一个整体,而让不同的执行流访问不同的局域的话,那就可以并发
  • 我们用信号量来表示这段共享资源中还剩多少个资源,

 信号量本质:

  • 是一个计数器,访问临界资源的时候,必须先申请信号量资源(sem--,预订资源,P),
  • 使用完毕信号量资源(sem++,释放资源,V)

 环形队列实现生产者消费者模型 

  • 生产者不能将消费者套圈,消费者不能超过生产者,
  • 为空: 一定要让生产者先运行
  • 为满: 一定要让消费者先运行

 生产者: 最关心的是空间资源->spaceSem->N

  •  P(spaceSem),将会在特定位置生产,然后V(dataSem)

消费者: 最关心的是数据资源->dataSem->0

  • P(dataSem),消费特定的数据,然后V(spaceSem)

Makefile

ring_queue:testMain.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f ring_queue

 ringQueue.hpp

#ifndef _Ring_QUEUE_HPP_
#define _Ring_QUEUE_HPP_#include 
#include 
#include 
#include "sem.hpp"const int g_default_num = 5;// 多线程
template
class RingQueue
{
public:RingQueue(int default_num = g_default_num): ring_queue_(default_num), num_(default_num),c_step(0),p_step(0),space_sem_(default_num),data_sem_(0){// 初始化锁pthread_mutex_init(&clock, nullptr);pthread_mutex_init(&plock, nullptr);}~RingQueue(){// 销毁锁pthread_mutex_destroy(&clock);pthread_mutex_destroy(&plock);}// 生产者: 空间资源, 生产者们的临界资源是什么?下标void push(const T &in){// 先申请信号量(0)space_sem_.p();// --pthread_mutex_lock(&plock); // ?// 一定是竞争成功的生产者线程 -- 就一个!ring_queue_[p_step++] = in;p_step %= num_;pthread_mutex_unlock(&plock);data_sem_.v();// ++ }// 消费者: 数据资源, 消费者们的临界资源是什么?下标void pop(T *out){data_sem_.p();pthread_mutex_lock(&clock);// 一定是竞争成功的消费者线程 -- 就一个!*out = ring_queue_[c_step++];c_step %= num_;pthread_mutex_unlock(&clock);space_sem_.v();}// void debug()// {//     std::cerr << "size: " << ring_queue_.size() << " num: " << num << std::endl;// } 
private:std::vector ring_queue_;int num_;int c_step; // 消费下标int p_step; // 生产下标Sem space_sem_;// 记录空间的信号量Sem data_sem_;// 记录空间数据的信号量pthread_mutex_t clock;// 消费者的锁pthread_mutex_t plock;// 生产者的锁
};#endif

sem.hpp

#ifndef _SEM_HPP_
#define _SEM_HPP_#include 
#include class Sem
{
public:Sem(int value){// 信号量初始化sem_init(&sem_, 0, value);}void p(){// 等待信号量,会将信号量的值减1sem_wait(&sem_);}void v(){// 发布信号量,会将信号量的值加1sem_post(&sem_);}~Sem(){// 销毁相互量sem_destroy(&sem_);}
private:sem_t sem_;
};#endif

 testMain.cc

#include "ringQueue.hpp"
#include 
#include 
#include 
#include // 消费者
void *consumer(void *args)
{// 生产者和消费者看到的是同一个rqRingQueue *rq = (RingQueue *)args;while(true){sleep(1);int x;// 1. 从环形队列中获取任务或者数据rq->pop(&x);// 2. 进行一定的处理 -- 不要忽略它的时间消耗问题std::cout << "生产: " << x << " [" << pthread_self() << "]" << std::endl;} 
}// 生产者
void *productor(void *args)
{// 生产者和消费者看到的是同一个rqRingQueue *rq = (RingQueue *)args;while(true){// sleep(1);// 1. 构建数据或者任务对象 -- 一般是可以从外部来 -- 不要忽略它的时间消耗问题int x = rand() % 100 + 1;std::cout << "消费:" << x << " [" << pthread_self() << "]" << std::endl;// 2. 推送到环形队列中rq->push(x); // 完成生产的过程}
}int main()
{srand((uint64_t)time(nullptr) ^ getpid());// 产生随机数RingQueue *rq = new RingQueue();// 多线程模式pthread_t c[3],p[2];pthread_create(c, nullptr, consumer, (void*)rq);pthread_create(c+1, nullptr, consumer, (void*)rq);pthread_create(c+2, nullptr, consumer, (void*)rq);pthread_create(p, nullptr, productor, (void*)rq);pthread_create(p+1, nullptr, productor, (void*)rq);for(int i = 0; i < 3; i++) pthread_join(c[i], nullptr);for(int i = 0; i < 2; i++) pthread_join(p[i], nullptr);return 0;
}

 多生产多消费的意义

  • 将数据或者任务生产前和拿到之后处理,才是最耗费时间的。

  • 生产的本质:私有的任务-> 公共空间中

  • 消费的本质:公共空间中的任务-> 私有化

信号量的意义

  • 信号量本质是一把计数器, 可以不用进入临界区,就可以得知资源情况,甚至可以减少临界区内部的判断!
  • 信号量可以提前预设资源的情况,而且在pv变化过程中,我们可以在外部就能知晓临界资源的情况

线程池

Makefile

thread_pool:testMain.ccg++ -o $@ $^ -std=c++11 -lpthread #-DDEBUG_SHOW
.PHONY:clean
clean:rm -f thread_pool
  • -DDEBUG_SHOW命令中定义了一个宏,多和条件变量配合使用

 Task.hpp

#pragma once#include 
#include 
#include 
#include "log.hpp"typedef std::function func_t;class Task
{
public:Task(){}Task(int x, int y, func_t func):x_(x), y_(y), func_(func){}void operator ()(const std::string &name){// std::cout << "线程 " << name << " 处理完成, 结果是: " << x_ << "+" << y_ << "=" << func_(x_, y_) << std::endl;logMessage(WARNING, "%s处理完成: %d+%d=%d | %s | %d",name.c_str(), x_, y_, func_(x_, y_), __FILE__, __LINE__);}
public:int x_;int y_;// int type;func_t func_;
};

lockGuard.hpp

#pragma once#include 
#include class Mutex
{
public:Mutex(pthread_mutex_t *mtx):pmtx_(mtx){}void lock() {// std::cout << "要进行加锁" << std::endl;pthread_mutex_lock(pmtx_);}void unlock(){// std::cout << "要进行解锁" << std::endl;pthread_mutex_unlock(pmtx_);}~Mutex(){}
private:pthread_mutex_t *pmtx_;
};// RAII风格的加锁方式
class lockGuard
{
public:lockGuard(pthread_mutex_t *mtx):mtx_(mtx){mtx_.lock();}~lockGuard(){mtx_.unlock();}
private:Mutex mtx_;
};

log.hpp

#pragma once#include 
#include 
#include 
#include 
#include // 日志是有日志级别的
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4const char *gLevelMap[] = {"DEBUG","NORMAL","WARNING","ERROR","FATAL"
};#define LOGFILE "./threadpool.log"// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOWif(level== DEBUG) return;
#endif// va_list ap;// va_start(ap, format);// while()// int x = va_arg(ap, int);// va_end(ap); //ap=nullptrchar stdBuffer[1024]; //标准部分time_t timestamp = time(nullptr);// struct tm *localtime = localtime(×tamp);snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);char logBuffer[1024]; //自定义部分va_list args;va_start(args, format);// vprintf(format, args);vsnprintf(logBuffer, sizeof logBuffer, format, args);va_end(args);FILE *fp = fopen(LOGFILE, "a");// printf("%s%s\n", stdBuffer, logBuffer);fprintf(fp, "%s%s\n", stdBuffer, logBuffer);fclose(fp);
}

 

thread.hpp 

#pragma once
#include 
#include 
#include 
#include // typedef std::function fun_t;
typedef void* (*fun_t)(void*);class ThreadData
{
public:void* args_;std::string name_;
};class Thread
{
public:Thread(int num, fun_t callback, void* args) : func_(callback){char nameBuffer[64];snprintf(nameBuffer, sizeof nameBuffer, "Thread-%d", num);name_ = nameBuffer;tdata_.args_ = args;tdata_.name_ = name_;}void start(){pthread_create(&tid_, nullptr, func_, (void*)&tdata_);}void join(){pthread_join(tid_, nullptr);}std::string name(){return name_;}~Thread(){}private:std::string name_;fun_t func_;ThreadData tdata_;pthread_t tid_;
};

threadPool.hpp

#pragma once#include 
#include 
#include 
#include 
#include 
#include "thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"const int g_thread_num = 3;
// 本质是: 生产消费模型
template 
class ThreadPool
{
public:pthread_mutex_t* getMutex(){return &lock;}bool isEmpty(){return task_queue_.empty();}void waitCond(){pthread_cond_wait(&cond, &lock);}T getTask(){T t = task_queue_.front();task_queue_.pop();return t;}private:ThreadPool(int thread_num = g_thread_num) : num_(thread_num){pthread_mutex_init(&lock, nullptr);pthread_cond_init(&cond, nullptr);for (int i = 1; i <= num_; i++){threads_.push_back(new Thread(i, routine, this));}}ThreadPool(const ThreadPool& other) = delete;const ThreadPool& operator=(const ThreadPool& other) = delete;public:// 考虑一下多线程使用单例的过程static ThreadPool* getThreadPool(int num = g_thread_num){// 可以有效减少未来必定要进行加锁检测的问题// 拦截大量的在已经创建好单例的时候,剩余线程请求单例的而直接访问锁的行为if (nullptr == thread_ptr){lockGuard lockguard(&mutex);// 但是,未来任何一个线程想获取单例,都必须调用getThreadPool接口// 但是,一定会存在大量的申请和释放锁的行为,这个是无用且浪费资源的// pthread_mutex_lock(&mutex);if (nullptr == thread_ptr){thread_ptr = new ThreadPool(num);}// pthread_mutex_unlock(&mutex);}return thread_ptr;}// 1. run()void run(){for (auto& iter : threads_){iter->start();// std::cout << iter->name() << " 启动成功" << std::endl;logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功");}}// 线程池本质也是一个生产消费模型// void *routine(void *args)// 消费过程static void* routine(void* args){ThreadData* td = (ThreadData*)args;ThreadPool* tp = (ThreadPool *)td->args_;while (true){T task;{lockGuard lockguard(tp->getMutex());while (tp->isEmpty())tp->waitCond();// 读取任务task = tp->getTask(); // 任务队列是共享的-> 将任务从共享,拿到自己的私有空间}task(td->name_);// lock// while(task_queue_.empty()) wait();// 获取任务// unlock// 处理任务}}// 2. pushTask()void pushTask(const T& task){lockGuard lockguard(&lock);task_queue_.push(task);pthread_cond_signal(&cond);}// test func// void joins()// {//     for (auto &iter : threads_)//     {//         iter->join();//     }// }~ThreadPool(){for (auto& iter : threads_){iter->join();delete iter;}pthread_mutex_destroy(&lock);pthread_cond_destroy(&cond);}private:std::vector threads_;int num_;std::queue task_queue_;static ThreadPool* thread_ptr;static pthread_mutex_t mutex;// 方案2://  queue1,queue2//  std::queue *p_queue, *c_queue//  p_queue->queue1//  c_queue->queue2//  p_queue -> 生产一批任务之后,swap(p_queue,c_queue),唤醒所有线程/一个线程//  当消费者处理完毕的时候,你也可以进行swap(p_queue,c_queue)//  因为我们生产和消费用的是不同的队列,未来我们要进行资源的处理的时候,仅仅是指针pthread_mutex_t lock;pthread_cond_t cond;
};
template 
ThreadPool* ThreadPool::thread_ptr = nullptr;template 
pthread_mutex_t ThreadPool::mutex = PTHREAD_MUTEX_INITIALIZER;

  • 进入{}创建lockGuard对象,并调用构造函数
  • 出{}调用lockGuard对象的析构函数

 

  •  这里使用双重判断,主要为了拦截大量的在已经创建好单例的时候,
  • 剩余线程请求单例的而直接访问锁的行为

testMain.cc

#include "threadPool.hpp"
#include "Task.hpp"
#include 
#include 
#include 
#include // void *run(void *args)
// {
//     while(true)
//     {
//         ThreadPool::getThreadPool();
//     }
// }int main()
{// logMessage(NORMAL, "%s %d %c %f \n", "这是一条日志信息", 1234, 'c', 3.14);srand((unsigned long)time(nullptr) ^ getpid());// ThreadPool *tp = new ThreadPool();// ThreadPool *tp = ThreadPool::getThreadPool();// 那么,如果单例本身也在被多线程申请使用呢??ThreadPool::getThreadPool()->run();//thread1,2,3,4while(true){//生产的过程,制作任务的时候,要花时间int x = rand()%100 + 1;usleep(7721);int y = rand()%30 + 1;Task t(x, y, [](int x, int y)->int{return x + y;});// std::cout << "制作任务完成: " << x << "+" << y << "=?" << std::endl;logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);// 推送任务到线程池中ThreadPool::getThreadPool()->pushTask(t);sleep(1);}return 0;
}

threadpool.log

[NORMAL] [1675916038] Thread-1 启动成功
[NORMAL] [1675916038] Thread-2 启动成功
[NORMAL] [1675916038] Thread-3 启动成功
[WARNING] [1675916038] Thread-1处理完成: 21+20=41 | Task.hpp | 20
[WARNING] [1675916039] Thread-2处理完成: 69+30=99 | Task.hpp | 20
[WARNING] [1675916040] Thread-3处理完成: 97+21=118 | Task.hpp | 20
[WARNING] [1675916041] Thread-1处理完成: 74+21=95 | Task.hpp | 20
[WARNING] [1675916042] Thread-2处理完成: 30+24=54 | Task.hpp | 20
[WARNING] [1675916043] Thread-3处理完成: 95+17=112 | Task.hpp | 20
[WARNING] [1675916044] Thread-1处理完成: 76+27=103 | Task.hpp | 20
[WARNING] [1675916045] Thread-2处理完成: 67+7=74 | Task.hpp | 20
[WARNING] [1675916046] Thread-3处理完成: 38+4=42 | Task.hpp | 20
[WARNING] [1675916047] Thread-1处理完成: 62+19=81 | Task.hpp | 20
[WARNING] [1675916048] Thread-2处理完成: 93+21=114 | Task.hpp | 20
[WARNING] [1675916049] Thread-3处理完成: 64+13=77 | Task.hpp | 20
[WARNING] [1675916050] Thread-1处理完成: 81+3=84 | Task.hpp | 20
[WARNING] [1675916051] Thread-2处理完成: 86+26=112 | Task.hpp | 20
[WARNING] [1675916052] Thread-3处理完成: 24+21=45 | Task.hpp | 20
[WARNING] [1675916053] Thread-1处理完成: 69+6=75 | Task.hpp | 20

相关内容