一、什么是线程池?
如果你使用过线程,就会知道,每当你想要启用一个任务时,就必须手动去启动一个线程来执行指定的任务。
这是很自然的想法,并且也并没有什么不对。
当问题就出在,电脑CPU可以同时执行的线程数量并不是无限多的,比如我目前电脑CPU是4核8线程的,意思就是同时最多也只能执行8个线程。
如果超过了这个数量,那么CPU就需要不断切换线程,调动资源,让其它线程也能进来运行一会。
这个过程相当损耗性能。
为了解决这个问题,就有了线程池的概念。
既然电脑本身最多可同时运行的线程数是有限的,那我只要保证我整个程序最多只开启指定个数的线程数不就行了吗?
无论当前应用程序中有多少任务,我都交给这指定个数的线程来完成,如果任务数量少,那就让线程休眠,如果任务数量比线程多,那就等在后面排队即可,这样就不会出现多线程去竞争CPU了影响时间了。
这就是线程池的概念:提前准备好指定个数的线程,无论你想要执行多少个任务,都把任务交给线程池去做就行了,至于线程是如何调度的,你就不用管了,那是线程池的事情。
二、手写一个线程池
写一个通用、可靠的线程池并不是一件容易的事,但写一个简单、实用版本的线程池我们还是可以做到的。
但在开始写之前,我们还需要了解一下线程同步的概念。
1.了解线程同步
一旦你开启了一个线程,那这个线程你是无法控制其执行进度的,但有些时候我们有需要知道其执行情况,比如下载文件的进度、是否出现了错误等等。
这就涉及到了多线程访问同一资源的问题,如果都是访问那还没什么,但就怕有线程会去更改它的数据。
比如4个线程都依靠某个全局变量来获取数据,而其中两个线程都去更改了它,那么最后这个全局变量的结果就是不可预测的。
所以这个是否我们就需要让不同线程达到同步,也就是同一时间只能有一个线程去修改、使用某个变量。
一般在C++中用的比较多的就是原子、互斥体。
原子很简单,它本事就是最小操作单元,它只有两个状态:要么执行成功、要么执行失败,而不会出现执行错误的事情。
比如前面两个线程想要修改同一个全局变量,如果是原子操作那就不会出现问题,比如都+1,那最后的结果一定是+2,但如果不是原子操作却不一定了。
虽然只是一个+1操作,但在计算机底层同样得分几步完成,不是原子操作的话这一过程就无法控制,导致最后的结果不可控。
但原子操作只适合基本数据类型,比如int、bool等等。
它所在头文件为:
#include<atomic>
使用方法很简单,比如要一个原子操作的int数据:
std::atomic<int> num
它是一个模板,之后你就可以正常将它当作一个数字变量使用即可。
但很多时候我们想要在不同线程之间共享的不仅仅只是基本数据类型,所以就需要互斥体。
它所在的头文件为:
#include<mutex>
使用方法为:
std::mutex m_mutex;
它并不是一个模板,所以只需要将其申明为一个对象即可。
在你想要访问共享数据的是否,你就必须要像下面这样写:
m_mutex.lock();
//访问、否则修改多线程共享数据
m_mutex.unlock();
总共就两个函数,一个是lock
,代表你想要访问数据,如果你当前不能访问数据(比如其它线程正在使用),那这个线程就会阻塞在这里。
在你使用完成后,也一定要记得用unlock
把互斥体打开,这样其它线程才能使用,否则就进入了死锁状态。
所谓死锁状态,就是所有线程都在等,但却不存在资源可以被利用的时候,比如上面这个例子就是,如果你不解锁,那么其它线程想要使用资源就只能一直等待下去。
所以这两个函数都是成对出现的。
另一个使用互斥体的要点是,互斥体本身并不保护任何东西,我们仅仅只是借用了它这个特性:同一时间只能有一个线程去锁住它,以此来保护我们的数据。
所以如果你的多线程共享数据(状态、或者操作),你都得自己记得要添加互斥体才行,这也是使用它比较麻烦的地方。
2. 线程池类
上面的内容只是看还是有点抽象,所以下面还是直接来实战吧。
首先我们要定义一个线程池类:
class ThreadPool {
}
后面写的变量、函数都是在这个类中。
既然是线程池,首先我们得需要有线程吧!
所以需要头文件:
#include<thread>
然后声明一个成员变量用来存放线程:
std::vector<std::thread> m_thread; //存放所有线程
使用的基本容器vector来存放。
但使用过这个线程类的都知道,它还需要一个线程函数,这个线程函数肯定不能是类的成员函数,所以我们要将其写为静态函数:
static void process_task() {}
至于其中的内容,我们后面再说。
除此之外,为了让我们这个线程池类更好用,还需要记录两个状态:是否需要停止所有线程、以及当前正在执行任务的线程数。
但由于这两个状态需要在上面的静态线程函数中使用,所以必须要写为静态成员变量:
static std::atomic<bool> m_stop; //是否结束所有线程,原子操作,无需使用mutex
static std::atomic<int> m_exe; //记录当前正在线程中执行的任务数,原子操作,无需使用mutex
然后我们还需要一个线程池的构造函数,来构造出指定个数的线程类,并初始化状态:
ThreadPool(int nThread = 4)
{
m_stop = false;
m_exe = 0;
//初始化指定数量的线程
for (int i = 0; i < nThread; i++) {
m_thread.emplace_back(thread(process_task));
}
}
除此之外当然还有析构函数了:
~ThreadPool()
{
m_stop = true;//叫停所有线程
for (auto& i : m_thread) { //等待所有线程结束
if (i.joinable()) {
i.join();
}
}
}
然后我们就可以来到我们的重中之重了:线程处理函数process_task
。
既然是处理任意任务的线程池,我们得能获取到用户传入得任务吧!
所以就需要三个成员变量:
static std::mutex m_mutex; //管理任务队列的互斥体
static std::queue<std::function<void(void*)>> m_task;//任务队列
static std::queue<void*> m_param; //传递给任务队列中函数的参数,一一对应
第一个就是互斥体,用来限制多线程访问任务队列得。
第二个就是任务队列了,它用来存储用户传入的任务函数,std::queue
为标准库中的队列数据结构,只能先进先出。
std::function<void(void*)>
是标准库中的函数模板,实际上就是要接受void(void*)
类型的函数,前面的void代表无返回值,中间的void*
代表接受一个指针参数
如果不使用函数模板,你就得像下面这样定义一个函数类型,比较麻烦且不好理解: