#include
#include
#include
#include #define NUM 8class BlockQueue
{
private:std::queue q;int cap;pthread_mutex_t lock;pthread_cond_t full;pthread_cond_t empty;private:void LockQueue(){pthread_mutex_lock(&lock);}void UnLockQueue(){pthread_mutex_unlock(&lock);}void ProductWait(){pthread_cond_wait(&full, &lock);}void ConsumeWait(){pthread_cond_wait(&empty, &lock);}void NotifyProduct(){pthread_cond_signal(&full);}void NotifyConsume(){pthread_cond_signal(&empty);}bool IsEmpty(){return (q.size() == 0 ? true : false);}bool IsFull(){return (q.size() == cap ? true : false);}public:BlockQueue(int _cap = NUM) :cap(_cap){pthread_mutex_init(&lock, NULL);pthread_cond_init(&full, NULL);pthread_cond_init(&empty, NULL);}void PushData(const int& data){LockQueue();while (IsFull()){NotifyConsume();std::cout << "queue full, notify consume data, product stop." << std::endl;ProductWait();}q.push(data);// NotifyConsume();UnLockQueue();}void PopData(int& data){LockQueue();while (IsEmpty()){NotifyProduct();std::cout << "queue empty, notify product data, consume stop." << std::endl;ConsumeWait();}data = q.front();q.pop();// NotifyProduct();UnLockQueue();}~BlockQueue(){pthread_mutex_destroy(&lock);pthread_cond_destroy(&full);pthread_cond_destroy(&empty);}
};
pthread_t c, p;
pthread_create(&c, NULL, consumer, (void*)&bq);
pthread_create(&p, NULL, producter, (void*)&bq);pthread_join(c, NULL);
pthread_join(p, NULL);
return 0;
}void* consumer(void* arg)
{BlockQueue* bqp = (BlockQueue*)arg;int data;for (; ; ) {bqp->PopData(data);std::cout << "Consume data done : " << data << std::endl;}
}//more faster
void* producter(void* arg)
{BlockQueue* bqp = (BlockQueue*)arg;srand((unsigned long)time(NULL));for (; ; ) {int data = rand() % 1024;bqp->PushData(data);std::cout << "Prodoct data done: " << data << std::endl;// sleep(1);}
}
int main()
{BlockQueue bq;pthread_t c, p;pthread_create(&c, NULL, consumer, (void*)&bq);pthread_create(&p, NULL, producter, (void*)&bq);pthread_join(c, NULL);pthread_join(p, NULL);return 0;
}
#include
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:pshared:0表示线程间共享,非零表示进程间共享value:信号量初始值
int sem_destroy(sem_t *sem);
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
#include
#include
#include
#include
#include #define NUM 16class RingQueue
{
private:std::vector q;int cap;sem_t data_sem;sem_t space_sem;int consume_step;int product_step;public:RingQueue(int _cap = NUM) :q(_cap), cap(_cap){sem_init(&data_sem, 0, 0);sem_init(&space_sem, 0, cap);consume_step = 0;product_step = 0;}void PutData(const int& data){sem_wait(&space_sem); // Pq[consume_step] = data;consume_step++;consume_step %= cap;sem_post(&data_sem); //V}void GetData(int& data){sem_wait(&data_sem);data = q[product_step];product_step++;product_step %= cap;sem_post(&space_sem);}~RingQueue(){sem_destroy(&data_sem);sem_destroy(&space_sem);}
};
void* consumer(void* arg)
{RingQueue* rqp = (RingQueue*)arg;int data;for (; ; ) {rqp->GetData(data);std::cout << "Consume data done : " << data << std::endl;sleep(1);}
}//more faster
void* producter(void* arg)
{RingQueue* rqp = (RingQueue*)arg;srand((unsigned long)time(NULL));for (; ; ) {int data = rand() % 1024;rqp->PutData(data);std::cout << "Prodoct data done: " << data << std::endl;// sleep(1);}
}int main()
{RingQueue rq;pthread_t c, p;pthread_create(&c, NULL, consumer, (void*)&rq);pthread_create(&p, NULL, producter, (void*)&rq);pthread_join(c, NULL);pthread_join(p, NULL);
}
/*threadpool.h*/
/* 线程池:
* 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着
监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利
用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
* 线程池的应用场景:
* 1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技
术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个
Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
* 2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
* 3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情
况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,
出现错误.
* 线程池的种类:
* 线程池示例:
* 1. 创建固定数量线程池,循环从任务队列中获取任务对象,
* 2. 获取到任务对象后,执行任务对象中的任务接口
*//*threadpool.hpp*/
#ifndef __M_TP_H__
#define __M_TP_H__
#include
#include
#include #define MAX_THREAD 5typedef bool (*handler_t)(int);class ThreadTask
{
private:int _data;handler_t _handler;
public:ThreadTask() :_data(-1), _handler(NULL){}ThreadTask(int data, handler_t handler){_data = data;_handler = handler;}void SetTask(int data, handler_t handler){_data = data;_handler = handler;}void Run() {_handler(_data);}
};class ThreadPool
{
private:int _thread_max;int _thread_cur;bool _tp_quit;std::queue _task_queue;pthread_mutex_t _lock;pthread_cond_t _cond;
private:void LockQueue() {pthread_mutex_lock(&_lock);}void UnLockQueue(){pthread_mutex_unlock(&_lock);}void WakeUpOne() {pthread_cond_signal(&_cond);}void WakeUpAll() {pthread_cond_broadcast(&_cond);}void ThreadQuit() {_thread_cur--;UnLockQueue();pthread_exit(NULL);}void ThreadWait(){if (_tp_quit){ThreadQuit();}pthread_cond_wait(&_cond, &_lock);}bool IsEmpty() {return _task_queue.empty();}static void* thr_start(void* arg){ThreadPool* tp = (ThreadPool*)arg;while (1) {tp->LockQueue();while (tp->IsEmpty()){tp->ThreadWait();}ThreadTask* tt;tp->PopTask(&tt);tp->UnLockQueue();tt->Run();delete tt;}return NULL;}
public:ThreadPool(int max = MAX_THREAD) :_thread_max(max), _thread_cur(max),_tp_quit(false) {pthread_mutex_init(&_lock, NULL);pthread_cond_init(&_cond, NULL);}~ThreadPool() {pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_cond);}bool PoolInit() {pthread_t tid;for (int i = 0; i < _thread_max; i++) {int ret = pthread_create(&tid, NULL, thr_start, this);if (ret != 0) {std::cout << "create pool thread error\n";return false;}}return true;}bool PushTask(ThreadTask* tt){LockQueue();if (_tp_quit) {UnLockQueue();return false;}_task_queue.push(tt);WakeUpOne();UnLockQueue();return true;}bool PopTask(ThreadTask** tt) {*tt = _task_queue.front();_task_queue.pop();return true;}bool PoolQuit(){LockQueue();_tp_quit = true;UnLockQueue();while (_thread_cur > 0) {WakeUpAll();usleep(1000);}return true;}
};
#endif/*main.cpp*/
bool handler(int data)
{srand(time(NULL));int n = rand() % 5;printf("Thread: %p Run Tast: %d--sleep %d sec\n", pthread_self(), data, n);sleep(n);return true;
}int main()
{int i;ThreadPool pool;pool.PoolInit();for (i = 0; i < 10; i++){ThreadTask* tt = new ThreadTask(i, handler);pool.PushTask(tt);}pool.PoolQuit();return 0;
}
g++ -std=c++0x test.cpp -o test -pthread -lrt
吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭.
吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式.
懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度。
template
class Singleton
{static T data;
public:static T* GetInstance(){return &data;}
};
只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个T对象的实例。
template
class Singleton
{static T* inst;
public:static T* GetInstance() {if (inst == NULL){inst = new T();}return inst;}
};
存在一个严重的问题, 线程不安全。第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份T对象的实例。但是后续再次调用, 就没有问题了。
// 懒汉模式, 线程安全
template
class Singleton
{volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.static std::mutex lock;
public:static T* GetInstance() {if (inst == NULL) { // 双重判定空指针, 降低锁冲突的概率, 提高性能.lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.if (inst == NULL) {inst = new T();}lock.unlock();}return inst;}
};
注意事项:
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
/*
pref 共有 3 种选择PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和
PTHREAD_RWLOCK_PREFER_READER_NP 一致PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
*/
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t
*restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
#include
#include
#include
#include
#include
#include
#include volatile int ticket = 1000;
pthread_rwlock_t rwlock;void* reader(void* arg)
{char* id = (char*)arg;while (1){pthread_rwlock_rdlock(&rwlock);if (ticket <= 0) {pthread_rwlock_unlock(&rwlock);break;}printf("%s: %d\n", id, ticket);pthread_rwlock_unlock(&rwlock);usleep(1);}return nullptr;
}void* writer(void* arg)
{char* id = (char*)arg;while (1) {pthread_rwlock_wrlock(&rwlock);if (ticket <= 0) {pthread_rwlock_unlock(&rwlock);break;}printf("%s: %d\n", id, --ticket);pthread_rwlock_unlock(&rwlock);usleep(1);}return nullptr;
}struct ThreadAttr
{pthread_t tid;std::string id;
};std::string create_reader_id(std::size_t i)
{// 利用 ostringstream 进行 string 拼接std::ostringstream oss("thread reader ", std::ios_base::ate);oss << i;return oss.str();
}std::string create_writer_id(std::size_t i)
{// 利用 ostringstream 进行 string 拼接std::ostringstream oss("thread writer ", std::ios_base::ate);oss << i;return oss.str();
}void init_readers(std::vector& vec)
{for (std::size_t i = 0; i < vec.size(); ++i) {vec[i].id = create_reader_id(i);pthread_create(&vec[i].tid, nullptr, reader, (void*)vec[i].id.c_str());}
}void init_writers(std::vector& vec)
{for (std::size_t i = 0; i < vec.size(); ++i) {vec[i].id = create_writer_id(i);pthread_create(&vec[i].tid, nullptr, writer, (void*)vec[i].id.c_str());}
}void join_threads(std::vector const& vec)
{// 我们按创建的 逆序 来进行线程的回收for (std::vector::const_reverse_iterator it = vec.rbegin(); it !=vec.rend(); ++it) {pthread_t const& tid = it->tid;pthread_join(tid, nullptr);}
}void init_rwlock()
{
#if 0 // 写优先pthread_rwlockattr_t attr;pthread_rwlockattr_init(&attr);pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);pthread_rwlock_init(&rwlock, &attr);pthread_rwlockattr_destroy(&attr);
#else // 读优先,会造成写饥饿pthread_rwlock_init(&rwlock, nullptr);
#endif
}int main()
{// 测试效果不明显的情况下,可以加大 reader_nr// 但也不能太大,超过一定阈值后系统就调度不了主线程了const std::size_t reader_nr = 1000;const std::size_t writer_nr = 2;std::vector readers(reader_nr);std::vector writers(writer_nr);init_rwlock();init_readers(readers);init_writers(writers);join_threads(writers);join_threads(readers);pthread_rwlock_destroy(&rwlock);
}
main: main.cppg++ -std=c++11 -Wall -Werror $^ -o $@ -lpthread