C++ 定时线程池(ScheduledThreadPool)深度解析
·
前言
在后端服务开发中,定时任务调度是一个高频需求:定期备份数据、清理临时文件、定时发送心跳包、日志归档等场景都离不开它。而 ScheduledThreadPool(定时线程池)正是实现这一需求的核心组件。
一、为什么需要定时线程池?
传统的定时任务实现方式(如 sleep + 循环、alarm 信号)存在诸多缺陷:
- 阻塞式等待:
sleep会阻塞整个线程,无法处理其他任务;- 精度不足:信号定时易受系统调度影响,时间精度难以保证;
- 扩展性差:难以同时管理多个定时任务,更无法支持动态添加 / 取消;
- 任务调度复杂:缺乏统一的调度模型,任务执行顺序和并发控制难以保障。
定时线程池的核心价值,正是解决这些痛点:
- 支持单次延迟执行、固定时间点执行、周期性重复执行三种模式;
- 基于
timerfd+epoll实现高效事件驱动,非阻塞管理海量定时任务;- 提供线程安全的任务添加 / 取消接口,扩展性极强;
- 统一调度模型,任务执行与定时逻辑解耦,支持并发执行。
二、整体架构概览
采用了分层设计,结构清晰、职责分明的定时线程池实现:
- Timer 定时器:封装
timerfd,管理单个定时任务的创建、设置、触发与销毁; - TimerQueue 事件队列:基于
epoll管理所有定时器事件,循环监听触发的定时器并执行回调; - ScheduledThreadPool 线程池:对外提供统一的定时任务提交接口,支持多种定时模式。
三、核心组件解析
3.1 Timer:单个定时器的封装
Timer 类是定时任务的最小单元,封装了 Linux 系统的 timerfd 定时器接口,负责单个任务的定时管理。
核心成员与接口
#include <functional>
#include <utility> // pair
using namespace std;
#include "Timestamp.hpp"
#ifndef TIMER_HPP
#define TIMER_HPP
namespace tulun
{
// 定时器回调函数类型:无参无返回值
using TimerCallback = std::function<void(void)>;
class Timer
{
private:
int m_timerfd; // Linux timerfd 文件描述符
TimerCallback m_callback; // 超时回调函数
tulun::Timestamp m_expiration; // 超时时间点
size_t m_interval; // 重复间隔(毫秒)
bool m_repeat; // 是否循环定时器
// 设置定时器超时时间(内部调用)
bool settimer();
public:
Timer(); // 构造:初始化成员
~Timer(); // 析构:关闭 timerfd
// 初始化定时器
// cb:回调 when:首次超时时间 interval:重复间隔
bool init(const TimerCallback &cb, const Timestamp &when, size_t interval);
// 重置定时器(循环任务用)
bool resetTimer(const Timestamp &newtime);
// 定时器触发时处理(读 timerfd + 执行回调)
void handleEvent();
// 获取 timerfd
int getTimerFd() const;
// 关闭定时器,释放资源
bool closeTimer();
};
// 定时器唯一标识:fd + Timer 对象指针
using TimerId = std::pair<int, Timer *>;
} // namespace tulun
#endif
关键实现细节
-
定时器初始化(
init)- 通过
timerfd_create(CLOCK_MONOTONIC, 0)创建定时器,使用系统单调时间,不受系统时间修改影响; - 保存回调函数、超时时间、间隔等参数,调用
settimer()设置定时器超时时间。
- 通过
-
定时器设置(
settimer)- 计算目标时间与当前时间的差值,转换为
timespec结构; - 通过
timerfd_settime设置定时器的首次超时时间和重复间隔。
- 计算目标时间与当前时间的差值,转换为
-
超时事件处理(
handleEvent)- 读取
timerfd,获取超时次数; - 执行用户定义的回调函数;
- 重复定时器会自动重置超时时间,非重复定时器则置为无效。
- 读取
#include <sys/timerfd.h> // Linux timerfd 定时器接口
#include <errno.h> // 错误码
#include <unistd.h> // close() 关闭文件描述符
#include <string.h> // strerror() 打印错误信息
#include "Logger.hpp" // 日志模块
#include "Timer.hpp" // 定时器类定义
namespace tulun
{
/*
计算当前时间到目标时间的间隔,返回 timespec 结构(秒 + 纳秒)
when 目标超时时间戳
return timespec 距离超时还剩多久
*/
static struct timespec howMuchTimeFormNow(const Timestamp &when)
{
// 计算目标时间 - 当前时间,得到剩余微秒数
int64_t microseconds = when.getMicro() - Timestamp::Now().getMicro();
// 最小间隔 100 微秒,防止时间为负导致定时器异常
if (microseconds < 100)
{
microseconds = 100;
}
struct timespec ts;
ts.tv_sec = (microseconds / Timestamp::KMinPerSec);
ts.tv_nsec = (microseconds % Timestamp::KMinPerSec) * 1000;
return ts;
}
/*
设置 timerfd 定时器的超时时间
return true 设置成功 / false 失败
*/
bool Timer::settimer()
{
bool ret = true;
// 定时器配置结构体
struct itimerspec new_value = {};
// m_interval 单位:毫秒
new_value.it_interval.tv_sec = (m_interval / 1000); // 1500
new_value.it_interval.tv_nsec = (m_interval % 1000) * 1000 * 1000;
// 配置第一次超时的时间
// 调用工具函数计算距离现在多久
new_value.it_value = howMuchTimeFormNow(m_expiration);
// 系统调用:设置定时器
if (::timerfd_settime(m_timerfd, 0, &new_value, nullptr) < 0)
{
LOG_ERROR << "fimerfd_settime fail : " << strerror(errno);
ret = false;
}
return ret;
}
Timer::Timer()
: m_timerfd(-1), // 定时器fd初始化为-1
m_callback(nullptr), // 回调函数为空
m_expiration(), // 超时时间初始化
m_interval(0), // 间隔时间0
m_repeat(false) // 默认不重复
{
}
Timer::~Timer()
{
closeTimer();
}
/*
初始化定时器:创建timerfd、设置回调、超时时间、间隔
cb 超时回调函数
when 第一次超时时间
interval 超时间隔(毫秒)
return 初始化是否成功
*/
bool Timer::init(const TimerCallback &cb, const Timestamp &when, size_t interval)
{
bool ret = true;
// 创建 timerfd 定时器(CLOCK_MONOTONIC:系统绝对时间,不受系统时间修改影响)
m_timerfd = ::timerfd_create(CLOCK_MONOTONIC, 0);
LOG_DEBUG << "m_timerfd: " << m_timerfd;
if (m_timerfd < 0)
{
LOG_FATAL << "timerfd_create fail : " << strerror(errno);
ret = false;
}
else
{
// 保存回调、超时时间、间隔、是否重复
m_callback = cb;
m_expiration = when;
m_interval = interval;
m_repeat = (interval > 0);// 间隔>0 → 循环定时器
// 设置定时器
settimer();// 启动计时
}
return ret;
}
// 重置定时器(循环任务)
bool Timer::resetTimer(const Timestamp &newtime)
{
bool ret = true;
if (m_repeat)
{
// 如果是重复定时器,重新计算下一次超时时间
m_expiration = tulun::addTimeMilloc(newtime, m_interval);
settimer();
}
else
{
// 非重复定时器,置为无效
m_expiration = tulun::Timestamp::Invalid();
ret = false;
}
return ret;
}
// 定时器触发:必须读 timerfd,否则一直触发
void Timer::handleEvent()
{
uint64_t expire_cnt = 0;
if (::read(m_timerfd, &expire_cnt, sizeof(expire_cnt)) != sizeof(expire_cnt))
{
LOG_ERROR << "read m_fimerfd fail ";
return;
}
// 执行用户回调
if (m_callback != nullptr)
{
m_callback();
}
}
// 获取 timerfd
int Timer::getTimerFd() const
{
return m_timerfd;
}
/*
关闭定时器,释放资源
return 关闭成功/失败
*/
bool Timer::closeTimer()
{
bool ret = false;
if (m_timerfd > 0)
{
close(m_timerfd);
m_timerfd = -1;
m_callback = nullptr;
m_expiration = Timestamp::Invalid();
m_repeat = false;
ret = true;
}
return ret;
}
} // namespace tulun
3.2 TimerQueue:定时器事件队列
TimerQueue 是整个定时线程池的核心调度器,基于 epoll 实现高效事件监听,管理所有定时器的生命周期。
核心成员与接口
#include <sys/epoll.h>
#include <unordered_map>
#include <vector>
#include <thread>
#include <atomic>
#include <mutex>
using namespace std;
#include "Logger.hpp"
#include "Timestamp.hpp"
#include "Timer.hpp"
#ifndef TIMERQUEUE_HPP
#define TIMERQUEUE_HPP
namespace tulun
{
// 定时器队列:单线程 + epoll 管理所有定时器
class TimerQueue
{
private:
static const int eventsize = 16; // epoll 初始事件数组大小
int m_epollfd; // epoll 文件描述符
int m_timeout; // epoll_wait 超时时间
std::vector<struct epoll_event> m_events; // 接收事件
std::unordered_map<int, Timer *> m_timers; // fd -> Timer*
std::atomic_bool m_stop; // 停止标记
std::once_flag m_flag; // 保证 stop 只执行一次
std::thread m_worderThread; // 监听线程
void loop(); // 线程主循环:epoll_wait
void init(); // 初始化 epoll + 启动线程
void stopQueue(); // 真正停止队列
public:
TimerQueue(int timeout = -1);
~TimerQueue();
// 添加定时器:回调、超时时间、重复间隔
tulun::TimerId addTimer(const TimerCallback &cb, const Timestamp &when, size_t interval);
// 取消定时器
void cancel(TimerId timerid);
// 停止队列
void stop();
};
} // namespace tulun
#endif
关键实现细节
-
事件循环(
loop)- 调用
epoll_wait监听所有timerfd的可读事件; - 当定时器超时触发时,调用
Timer::handleEvent()执行回调; - 动态扩展事件数组,支持海量定时器管理。
- 调用
-
添加定时器(
addTimer)- 创建
Timer对象并初始化; - 将
timerfd添加到epoll监听集合; - 保存
fd与Timer的映射关系,返回TimerId用于后续取消。
- 创建
-
取消定时器(
cancel)- 根据
TimerId找到对应的Timer对象; - 关闭
timerfd,释放资源并从映射表中移除。
- 根据
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include "Timer.hpp"
#include "TimerQueue.hpp"
namespace tulun
{
// epoll 循环:监听所有 timerfd
void TimerQueue::loop()
{
while (!m_stop)
{
// 等待事件(实际触发的事件数量)
int n = ::epoll_wait(m_epollfd, m_events.data(), m_events.size(), m_timeout);
// 处理所有触发的定时器
for (int i = 0; i < n; ++i)
{
int fd = m_events[i].data.fd;
auto it = m_timers.find(fd);
if (it != m_timers.end())
{
// 执行定时器触发逻辑
it->second->handleEvent();
}
}
// 自动扩容
if (n >= m_events.size())
{
m_events.resize(m_events.size() * 2);
}
}
}
// 初始化:创建 epoll,启动监听线程
void TimerQueue::init()
{
// EPOLL_CLOEXEC:子进程不继承 fd,安全
m_epollfd = ::epoll_create1(EPOLL_CLOEXEC); //?
if (m_epollfd < 0)
{
LOG_FATAL << "epoll_create1 fail: " << strerror(errno);
return;
}
try
{
m_stop = false;
// 创建线程执行 loop
m_worderThread = std::thread(&TimerQueue::loop, this);
}
catch (const std::exception &e)
{
LOG_FATAL << e.what();
close(m_epollfd);
m_epollfd = -1;
m_stop = true;
}
}
// 停止队列:关闭线程 + 释放所有定时器
void TimerQueue::stopQueue()
{
m_stop = true;
if (m_worderThread.joinable())
{
m_worderThread.join();
}
// 释放所有 Timer
for (auto &id : m_timers)
{
id.second->closeTimer();
delete id.second;
}
m_timers.clear();
close(m_epollfd);
m_epollfd = -1;
}
TimerQueue::TimerQueue(int timeout)
: m_epollfd(-1),
m_timeout(timeout),
m_stop(true)
{
m_events.resize(eventsize);
init();
}
TimerQueue::~TimerQueue()
{
stop();
}
// 添加定时器到 epoll
tulun::TimerId TimerQueue::addTimer(const TimerCallback &cb,
const Timestamp &when,
size_t interval)
{
TimerId ret{-1, nullptr};
Timer *ptimer = new Timer();
if (!ptimer->init(cb, when, interval))
{
return ret;
}
// 添加到 epoll 监听(可读事件)
struct epoll_event evt;
evt.events = EPOLLIN;
evt.data.fd = ptimer->getTimerFd();
if (::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, ptimer->getTimerFd(), &evt) < 0)
{
LOG_ERROR << "epoll_ctl add fail " << strerror(errno);
return ret;
}
// 保存到 map
m_timers[ptimer->getTimerFd()] = ptimer;
ret.first = ptimer->getTimerFd();
ret.second = ptimer;
return ret;
}
// 取消定时器
void TimerQueue::cancel(TimerId timerid)
{
auto it = m_timers.find(timerid.first);
if (it != m_timers.end())
{
m_timers.erase(it);
it->second->closeTimer();
delete it->second;
}
}
// 线程安全停止(只执行一次)
void TimerQueue::stop()
{
std::call_once(m_flag, &TimerQueue::stopQueue, this);
}
} // namespace tulun
3.3 ScheduledThreadPool:对外统一接口
ScheduledThreadPool 封装了 TimerQueue,为用户提供了三种常用的定时任务提交接口,使用便捷。
#include "Timestamp.hpp"
#include "Timer.hpp"
#include "TimerQueue.hpp"
#include "Logger.hpp"
#include <functional>
#include <future>
#include <memory>
#include <chrono>
using namespace std;
#ifndef SCHEDULEDTHREADPOOL_HPP
#define SCHEDULEDTHREADPOOL_HPP
namespace tulun
{
// 定时线程池:对外提供 3 种定时任务
class ScheduledThreadPool
{
private:
tulun::TimerQueue m_queue;// 内部定时器队列
public:
ScheduledThreadPool() {}
~ScheduledThreadPool() {}
// 在指定时间点执行一次
TimerId AddRunAt(const Timestamp &time, const TimerCallback &cb)
{
return m_queue.addTimer(cb, time, 0);
}
// 延迟 delay 毫秒执行一次
TimerId AddRunAfter(size_t delay, const TimerCallback &cb)
{
tulun::Timestamp time(tulun::addTimeMilloc(tulun::Timestamp::Now(), delay));
return AddRunAt(time, cb);
}
// 每隔 interval 毫秒循环执行
TimerId AddRunEvery(size_t interval, const TimerCallback &cb)
{
tulun::Timestamp time(tulun::addTimeMilloc(tulun::Timestamp::Now(), interval));
return m_queue.addTimer(cb, time, interval);
}
// 取消任务
void Cancel(TimerId timerid)
{
m_queue.cancel(timerid);
}
};
} // namespace tulun
#endif
四、核心定时模式详解
1. 单次延迟执行(AddRunAfter)
在当前时间延迟指定毫秒后执行一次任务。
// 延迟 1000ms 执行一次
spool.AddRunAfter(1000, [](){ cout << "延迟任务执行" << endl; });
2. 指定时间点执行(AddRunAt)
在指定的时间点执行一次任务,支持精确到微秒级的时间戳。
// 在指定时间点执行
tulun::Timestamp targetTime = tulun::Timestamp::Now() + 3000;
spool.AddRunAt(targetTime, [](){ cout << "定时任务执行" << endl; });
3. 周期性重复执行(AddRunEvery)
每隔指定毫秒重复执行任务,适用于心跳、备份等周期性场景。
// 每隔 1000ms 执行一次
spool.AddRunEvery(1000, func);
五、适用场景与最佳实践
5.1 典型适用场景
结合定时线程池的特性,它最适合以下场景:
- 定时任务调度:定期备份数据、清理临时文件、定时发送邮件;
- 心跳检测:周期性检测系统状态、发送心跳包,维持网络连接;
- 任务调度器:程序启动后执行初始化任务、结束前执行清理任务;
- 定时数据处理:特定时间点的数据库优化、日志归档、报表生成。
5.2 工程实践建议
- 任务耗时控制:定时任务应尽量避免阻塞操作,否则会影响后续任务的调度;
- 异常处理:回调函数中需捕获异常,防止单个任务崩溃影响整个线程池;
- 资源管理:长时间不使用的定时任务应调用
Cancel()取消,避免资源泄漏; - 精度要求:
timerfd定时精度可达微秒级,适用于大多数定时场景;对精度要求极高的场景,需结合业务逻辑做额外补偿。
六、完整测试示例
#include "ScheduledThreadPool.hpp"
#include <iostream>
using namespace std;
void func()
{
static int i=0;
cout << "func" << ++i <<endl;
}
int main()
{
tulun::ScheduledThreadPool spool;
// 每隔 1 秒执行一次
spool.AddRunEvery(1000, func);
// 阻塞 60 秒
std::this_thread::sleep_for(std::chrono::seconds(60));
return 0;
}
运行后,程序会每隔 1 秒输出 func1、func2...,持续 60 秒,验证周期性定时任务的正确性。
七、总结:定时线程池的核心优势
| 特性 | 说明 |
|---|---|
| 高效事件驱动 | 基于 timerfd + epoll 实现,非阻塞管理海量定时任务 |
| 多种定时模式 | 支持单次延迟、指定时间点、周期性重复三种模式 |
| 线程安全 | 提供线程安全的任务添加 / 取消接口 |
| 精度高 | 定时精度可达微秒级,不受系统时间修改影响 |
| 易扩展 | 分层设计,易于添加新的定时模式和回调逻辑 |
更多推荐

所有评论(0)