Linux下精简线程池的实现
文章目录
在Linux下使用C++调用pthread API实现的一个线程池。
简介
这个线程池是在学习完《Linux/UNIX系统编程手册》中线程相关知识后用来练手的小项目,线程相关函数都是直接调用Linux的API,并且使用了C++中的queue和vector。 虽然C++中也提供了线程创建、互斥锁等函数库,但是也是对系统函数的封装。并且作为初学,先学会用原生函数比较好。
基础知识
pthread API
函数命名
pthread_ 线程本身和各种相关函数
pthread_attr_ 线程属性对象
pthread_mutex_ 互斥量
pthread_mutexattr_ 互斥量属性对象
pthread_cond_ 条件变量
pthread_condattr_ 条件变量属性对象
pthread_key_ 线程数据键(Thread-specific data keys)
线程管理相关函数
int pthread_create(pthread_t *restrict tidp,const pthread_attr_t *restrict_attr,void*(*start_rtn)(void*),void *restrict arg);
pthread_create是UNIX环境创建线程函数。
返回值:若成功则返回0,否则返回出错编号
参数:
第一个参数为指向线程标识符的指针。
第二个参数用来设置线程属性。
第三个参数是线程运行函数的地址。
最后一个参数是运行函数的参数。
int pthread_join(pthread_t thread, void **retval);
pthread_join()函数,以阻塞的方式等待thread指定的线程结束。当函数返回时,被等待线程的资源被收回。如果线程已经结束,那么该函数会立即返回。
返回值:0代表成功,失败返回错误号。
参数:
thread: 线程标识符,即线程ID,标识唯一线程。
retval: 用户定义的指针,用来存储被等待线程的返回值。
int pthread_detach(pthread_t thread);
使主线程与该线程分离,线程结束后,其退出状态不由其他线程获取,而直接自己自动释放。
返回值:0代表成功,失败返回错误号。
参数:线程标识符
int pthread_cancel(pthread_t thread);
该函数使目标线程停止执行,调用该方法后,被终止的线程并不一定立马被终止,只有在下次系统调用或调用了pthread_testcancel()方法后,才真正终止线程。
void pthread_exit(void* retval);
使当前线程结束运行。
int pthread_kill(pthread_t thread, int sig);
向指定ID的线程发送sig信号,如果线程代码内不做处理,则按照信号默认的行为影响整个进程,也就是说,如果你给一个线程发送了SIGQUIT,但线程却没有实现signal处理函数,则整个进程退出。
如果int sig是0呢,这是一个保留信号,一个作用是用来判断线程是不是还活着。
互斥锁
当不同的线程需要对同一块资源进行访问时,为了保证资源的安全,可以给其加锁。
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restric attr);
对锁进行初始化。
PTHREAD_MUTEX_TIMED_N 这是缺省值,也就是普通锁。当一个线程加锁以后,其余请求锁的线程将形成一个等待队列,并在解锁后按优先级获得锁。这种锁策略保证了资源分配的公平性。
PTHREAD_MUTEX_RECURSIVE_NP 嵌套锁,允许同一个线程对同一个锁成功获得多次,并通过多次unlock解锁。如果是不同线程请求,则在加锁线程解锁时重新竞争。
PTHREAD_MUTEX_ERRORCHECK_NP 检错锁,如果同一个线程请求同一个锁,则返回EDEADLK,否则与PTHREAD_MUTEX_TIMED_NP类型动作相同。这样就保证当不允许多次加锁时不会出现最简单情况下的死锁。
PTHREAD_MUTEX_ADAPTIVE_NP 适应锁,动作最简单的锁类型,仅等待解锁后重新竞争。
int pthread_mutex_lock(pthread_mutex_t *mutex);
给互斥量加锁,如果该互斥量已经加锁,那么该线程将等待互斥量解锁。
返回值:成功:0,失败:错误码
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功:0,失败:错误码
int pthread_mutex_destory(pthread_mutex_t *mutex);
返回值:成功:0,失败:错误码
条件变量
与互斥锁不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞一个线程,直 到某特殊情况发生为止。通常条件变量和互斥锁同时使用。 条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步 的一种机制,主要包括两个动作:1.一个线程等待”条件变量的条件成立”而挂起;2.另一个线程使 “条件成立”(给出条件成立信号)。
代码示例:
In Thread1:
pthread_mutex_lock(&m_mutex);
pthread_cond_wait(&m_cond,&m_mutex);
pthread_mutex_unlock(&m_mutex);
In Thread2:
pthread_mutex_lock(&m_mutex);
pthread_cond_signal(&m_cond);
pthread_mutex_unlock(&m_mutex);
pthread_cond_wait(cond, mutex)的功能有3个: 调用者线程首先释放mutex 然后阻塞,等待被别的线程唤醒 当调用者线程被唤醒后,调用者线程会再次获取mutex
线程清理函数
pthread_cleanup_push & pthread_cleanup_pop pthread_cleanup_push()/pthread_cleanup_pop()的详解
在POSIX线程API中提供了一个pthread_cleanup_push()/pthread_cleanup_pop()函数对用于自动释放资源 --从pthread_cleanup_push()的调用点到pthread_cleanup_pop()之间的程序段中的终止动作(pthread_exit(),在执行return时不会执行清理函数)都将执行pthread_cleanup_push()所指定的清理函数。API定义如下:
void pthread_cleanup_push(void (*routine) (void *), void *arg)
void pthread_cleanup_pop(int execute)
pthread_cleanup_push()/pthread_cleanup_pop()采用先入后出的栈结构管理,void routine(void *arg)函数在调用pthread_cleanup_push()时压入清理函数栈,多次对pthread_cleanup_push()的调用将在清理函数栈中形成一个函数链,在执行该函数链时按照压栈的相反顺序弹出。execute参数表示执行到pthread_cleanup_pop()时是否在弹出清理函数的同时执行该函数,为0表示不执行,非0为执行;这个参数并不影响异常终止时清理函数的执行。
pthread_cleanup_push()/pthread_cleanup_pop()是以宏方式实现的,这是pthread.h中的宏定义:
#define pthread_cleanup_push(routine,arg)
{ struct _pthread_cleanup_buffer _buffer;
_pthread_cleanup_push (&_buffer, (routine), (arg));
#define pthread_cleanup_pop(execute)
_pthread_cleanup_pop (&_buffer, (execute)); }
可见,pthread_cleanup_push()带有一个"{",而pthread_cleanup_pop()带有一个"}",因此这两个函数必须成对出现,且必须位于程序的同一级别的代码段中才能通过编译。在下面的例子里,当线程在"do some work"中终止时,将主动调用pthread_mutex_unlock(mut),以完成解锁动作。
work"中终止时,将主动调用pthread_mutex_unlock(mut),以完成解锁动作。
pthread_cleanup_push(pthread_mutex_unlock, (void *) &mut);
pthread_mutex_lock(&mut);
/* do some work */
pthread_mutex_unlock(&mut);
pthread_cleanup_pop(0);
必须要注意的是,如果线程处于PTHREAD_CANCEL_ASYNCHRONOUS状态,上述代码段就有可能出错,因为CANCEL事件有可能在
pthread_cleanup_push()和pthread_mutex_lock()之间发生,或者在pthread_mutex_unlock()和pthread_cleanup_pop()之间发生,从而导致清理函数unlock一个并没有加锁的
mutex变量,造成错误。因此,在使用清理函数的时候,都应该暂时设置成PTHREAD_CANCEL_DEFERRED模式。
结构
为了缓存添加进来的任务,需要创建一个队列来存储任务,并且还用一个vector数组来存储线程。 使用的是生产者-消费者模型,其中addOneTask()是往队列中添加任务,是生产者。而执行任务的线程有多个,所以每一个线程都是消费者。 每次添加完任务之后,会使用条件变量通知“一个”空闲线程来执行任务
遇到的问题
- 惊群问题 在每次添加任务之后,需要通知一个线程来执行任务,这里如果使用pthread_cond_broadcast(), 就会唤醒所有线程,而只能有一个线程得到任务,其他线程只能回去继续等待。这样就造成了不必要的浪费。 但即使使用pthread_cond_signal(),好像有些系统的实现,也可能会唤醒不止一个线程。 为了解决这个问题,可以给每一个线程添加一个条件变量,如果有任务被添加,此时只要看哪个线程处于空闲状态,只通知那一个线程即可。
- 静态函数访问非静态成员 在使用pthread_create()创建线程的时候,往里传的函数必须是静态函数,但是我们经常会需要在这个静态函数里访问类的非静态成员变量,那怎么办呢? 这里已知有两种方法解决这个问题: 1.创建线程时,需要用arg往里传递回调函数的参数,可以在这里把当前对象的地址封装到回调函数的参数arg里面,然后在回调函数中使用这个对象地址来调用他的非静态成员变量。 2.直接把需要访问的普通成员变量改成静态的。由于这种方法比较简单,并且已经满足当前需求,在线程池使用的这种方法。
代码
github链接:https://github.com/TWS-YIFEI/ThreadPool
注释版代码:ThreadPool.h
//ThreadPool.h
#ifndef THREAD_POOL
#define THREAD_POOL
#include <iostream>
#include <queue>
#include <vector>
#include <string>
#include <pthread.h>
using namespace std;
class ThreadPool;
//要处理的任务以及参数
typedef struct{
void *(*function)(void *);
void *arg;
}Task;
//线程结构体
typedef struct{
pthread_t threadid;
bool state;//false休闲 true忙碌
pthread_cond_t cond;
}Thread;
class ThreadPool{
private:
static queue<Task> task_queue; //任务队列
static pthread_mutex_t task_queue_mutex; //任务队列的互斥体
int maxqueuetaskcount; //最大队列数量,暂时没有用到该变量
static vector<Thread> thread_pool; //线程池
static pthread_mutex_t thread_pool_mutex; //线程池的互斥体
static bool shutdown; //销毁线程池的标志
private:
void initThreadPool(unsigned int count); //初始化线程池
int chooseLeisureThread(); //选择空闲线程
static void *threadFunction(void *arg); //
static void cleanupFunction(void *arg); //
static void excuteAndTest(int s,string str); //测试函数,如果s==0,说明函数调用出错,然后打印错误信息。
public:
ThreadPool(const ThreadPool&)=delete; //禁用红拷贝构造函数
ThreadPool& operator=(const ThreadPool&)=delete; //禁用拷贝赋值运算符
//explicit禁用隐式类型转换
explicit ThreadPool(int threadcount,int maxtaskcount); //带参构造(要创建的线程数量以及最大任务数量)
bool addOneTask(Task task); //添加一个任务
void destroyThreadPool(); //销毁线程池
};
#endif
ThreadPool.cpp
//ThreadPool.cpp
#include "ThreadPool.h"
std::queue<Task> ThreadPool::task_queue;
pthread_mutex_t ThreadPool::task_queue_mutex=PTHREAD_MUTEX_INITIALIZER;
std::vector<Thread> ThreadPool::thread_pool;
pthread_mutex_t ThreadPool::thread_pool_mutex=PTHREAD_MUTEX_INITIALIZER;
bool ThreadPool::shutdown=false;
//带参构造
ThreadPool::ThreadPool(int threadcount,int maxtaskcount):maxqueuetaskcount(maxtaskcount){
initThreadPool(threadcount);
}
void ThreadPool::initThreadPool(unsigned int count){
Thread tmp;
//该静态变量属于类(是不是直接改成类的成员比较好?)
static int pthcrearg[100005];
tmp.state=false;
for(unsigned int i=0;i<count;i++){
pthcrearg[i]=i;
excuteAndTest(
//初始化每个线程的条件变量
pthread_cond_init(&(tmp.cond),NULL),
"pthread_cond_init(tmp,cond)"
);
cout<<i<<" pthread_cond init sucess"<<endl;
excuteAndTest(
pthread_create(&(tmp.threadid),NULL,threadFunction,(void *)&pthcrearg[i]),
"pthread_create()"
);
thread_pool.emplace_back(tmp);
}
}
//选择空闲的线程,这里state需要改为atomic的!
int ThreadPool::chooseLeisureThread(){
for(int i=0;i<(int)thread_pool.size();i++){
if(thread_pool[i].state==false) return i;
}
return -1;
}
//每个线程的函数,在里面执行回调函数arg
void * ThreadPool::threadFunction(void *arg){
while(1){
excuteAndTest(
//给队列解锁,准备访问队列
pthread_mutex_lock(&(task_queue_mutex)),
"task_queu_mutex_lock in threadFunction"
);
//如果队列中没有任务,就解锁然后阻塞,等待新任务
while(task_queue.size()==0&&shutdown==false){
excuteAndTest(
pthread_cond_wait(&(thread_pool[*(int*)arg].cond),&(task_queue_mutex)),
"pthread_cond_wait in threadFunction"
);
}
if(shutdown==true){
break;
}
Task tt;
pthread_cleanup_push(cleanupFunction,arg); //设置线程清理函数
tt=task_queue.front(); //从队列中取任务
task_queue.pop();
excuteAndTest(
//解锁
pthread_mutex_unlock(&(task_queue_mutex)),
"pthread_mutex_unlock in threadFunction"
);
//从cleanup_push到这里 如果意外退出,清理函数会将task_queue_mutex解锁,此时不需要了,直接将函数出栈即可
pthread_cleanup_pop(0);
(tt.function)(tt.arg); //执行任务
thread_pool[*(int*)arg].state=false;
//设置进程取消点,检测是否有cancel信号
//线程取消功能处于启用状态且取消状态设置为延迟状态时,pthread_testcancel()函数有效。
//如果在取消功能处处于禁用状态下调用pthread_testcancel(),则该函数不起作用。
pthread_testcancel();
}
cout<<"thread "<<*(int*)arg<<" ready to shutdown"<<endl;
excuteAndTest(
pthread_mutex_unlock(&(task_queue_mutex)),
"pthread_mutex_unlock in threadFunction"
);
excuteAndTest(
//销毁条件变量
pthread_cond_destroy(&(thread_pool[*(int*)arg].cond)),
"destroy pthread in threadFunction"
);
cout<<"thread "<<*(int*)arg<<" cond destroy sucess"<<endl;
pthread_exit(NULL);
}
//线程清理函数
void ThreadPool::cleanupFunction(void *arg){
excuteAndTest(
pthread_mutex_unlock(&(task_queue_mutex)),
"pthread_mutex_unlock in cleanupFunction"
);
}
//添加任务
bool ThreadPool::addOneTask(Task task){
excuteAndTest(
//给任务队列加锁
pthread_mutex_lock(&task_queue_mutex),
"pthread_mutex_lock in addOneTask"
);
task_queue.push(task); //任务入队
excuteAndTest(
pthread_mutex_unlock(&task_queue_mutex),
"pthread_mutex_unlock in addOneTask"
);
excuteAndTest(
//给线程池加锁
pthread_mutex_lock(&thread_pool_mutex),
"pthread_mutex_lock in addOneTask"
);
int leisurethread=chooseLeisureThread(); //选择空闲线程
if(leisurethread>=0){
thread_pool[leisurethread].state=true;
excuteAndTest(
pthread_mutex_unlock(&thread_pool_mutex),
"pthread_mutex_unlock in addOneTask"
);
excuteAndTest(
//通知空闲线程工作,此时空闲线程未处于阻塞状态怎么办!
pthread_cond_signal(&(thread_pool[leisurethread].cond)),
"pthread_cond_signal in addOneTask"
);
return true;
}else{
cout<<"no leisure thread"<<endl;
excuteAndTest(
pthread_mutex_unlock(&thread_pool_mutex),
"pthread_mutex_unlock in addOneTask"
);
return true;
}
}
void ThreadPool::destroyThreadPool(){
//设置关闭标志
shutdown=true;
for(int i=0;i<(int)thread_pool.size();i++){
excuteAndTest(
//在销毁线程池时,空闲线程都阻塞在pthread_cond_wait了,需要先将其唤醒
pthread_cond_signal(&thread_pool[i].cond),
"cond_signal in destroyThreadPool"
);
cout<<"notified "<<i<<" pthread_cond to shutdown"<<endl;
}
}
void ThreadPool::excuteAndTest(int s,string str){
if(s!=0) cout<<str<<endl;
}
待解决
- 如果销毁线程池时,给每个线程发送了pthread_cond_signal,但是有线程还没有执行pthread_cond_wait,此时信号会丢失https://www.cnblogs.com/super119/archive/2011/07/29/2120761.html。
- 在添加任务后,对空闲线程发送pthread_cond_signal时,空闲线程未处于阻塞状态怎么处理?
- bool变量需要改为原子的atomic!
- 使用RAII机制的锁。
- queue锁的细粒度(无锁队列) STL中queue不是线程安全的,所以如果加锁的话只能给整个队列加锁,而不能给入队和出队两个操作分别加锁。 所以添加任务和执行任务两个操作并不能同时进行。
- 线程优先级
- 销毁线程池时将自己设置的shutdown改成利用cancel信号可以吗。
其他
- 关于线程数量的设置 N核服务器,通过执行业务的单线程分析出本地计算时间为x,等待时间为y,则工作线程数(线程池线程数)设置为 N*(x+y)/x,能让CPU的利用率最大化。
- 思路不难,但是真正写代码才发现有好多需要考虑的细节。
- 待学习:使用gdb调试多线程线程的查看以及利用gdb调试多线程
- 阅读jdk里的线程池源码
参考
- 线程数究竟设多少合理
- C++并发实战17:线程安全的stack和queue
- linyacool WebServer中的线程池
- 用C++写线程池是怎样一种体验?
- 基于c++11的100行实现简单线程池
- 使用C++11实现线程池的两种方法
欢迎与我分享你的看法。 转载请注明出处:http://taowusheng.cn/
相关文章
- Linux查看可执行文件位置的方法(linux执行文件位置)
- Linux配置文件:实现强大灵活性(linux的conf文件)
- 用Linux实现安全访问:建立堡垒机(linux堡垒机)
- 安装Deb文件:Linux下的轻松操作(linux安装deb文件)
- 杭州Linux运维:实现智能化管理(杭州linux运维)
- Linux线程查询研究(linux线程查询)
- 管理Linux权限管理:实现安全控制的必要步骤(linux所有权限)
- 使用Linux系统的备份工具(linux备份工具)
- 品质无双:Linux虚拟主机的优势(虚拟主机linux)
- 揭秘Linux中的统计力量(linux统计工具)
- Linux镜像烧录轻松实现(linux镜像写入)
- 配置Linux环境变量路径配置完美指南(linux环境变量路径)
- 使用Linux实现多WAN网络连接管理(linux多wan)
- Linux文件写入教程:简单易懂的文件写入方法(linux将文件写入文件)
- 图片Linux实现网页图片抓取功能(linux抓取网页)
- Linux:实现U盘的数字化存储(可以装u盘的linux)
- Linux 实现局域网挂载:快速共享文件和数据(linux挂载局域网)
- Linux 主机管理实现可靠运行(linux 主机管理系统)