前言

在后端服务开发中,定时任务调度是一个高频需求:定期备份数据、清理临时文件、定时发送心跳包、日志归档等场景都离不开它。而 ScheduledThreadPool(定时线程池)正是实现这一需求的核心组件。


一、为什么需要定时线程池?

传统的定时任务实现方式(如 sleep + 循环、alarm 信号)存在诸多缺陷:

  • 阻塞式等待sleep 会阻塞整个线程,无法处理其他任务;
  • 精度不足:信号定时易受系统调度影响,时间精度难以保证;
  • 扩展性差:难以同时管理多个定时任务,更无法支持动态添加 / 取消;
  • 任务调度复杂:缺乏统一的调度模型,任务执行顺序和并发控制难以保障。

定时线程池的核心价值,正是解决这些痛点:

  • 支持单次延迟执行固定时间点执行周期性重复执行三种模式;
  • 基于 timerfd + epoll 实现高效事件驱动,非阻塞管理海量定时任务;
  • 提供线程安全的任务添加 / 取消接口,扩展性极强;
  • 统一调度模型,任务执行与定时逻辑解耦,支持并发执行。

二、整体架构概览

采用了分层设计,结构清晰、职责分明的定时线程池实现:

  1. Timer 定时器:封装 timerfd,管理单个定时任务的创建、设置、触发与销毁;
  2. TimerQueue 事件队列:基于 epoll 管理所有定时器事件,循环监听触发的定时器并执行回调;
  3. ScheduledThreadPool 线程池:对外提供统一的定时任务提交接口,支持多种定时模式。

三、核心组件解析

3.1 Timer:单个定时器的封装

Timer 类是定时任务的最小单元,封装了 Linux 系统的 timerfd 定时器接口,负责单个任务的定时管理。

核心成员与接口
#include <functional>
#include <utility> // pair

using namespace std;

#include "Timestamp.hpp"

#ifndef TIMER_HPP
#define TIMER_HPP
namespace tulun
{
    // 定时器回调函数类型:无参无返回值
    using TimerCallback = std::function<void(void)>;

    class Timer
    {
    private:
        int m_timerfd;                 // Linux timerfd 文件描述符
        TimerCallback m_callback;      // 超时回调函数
        tulun::Timestamp m_expiration; // 超时时间点
        size_t m_interval;             // 重复间隔(毫秒)
        bool m_repeat;                 // 是否循环定时器
        // 设置定时器超时时间(内部调用)
        bool settimer();

    public:
        Timer();  // 构造:初始化成员
        ~Timer(); // 析构:关闭 timerfd

        // 初始化定时器
        // cb:回调  when:首次超时时间  interval:重复间隔
        bool init(const TimerCallback &cb, const Timestamp &when, size_t interval);
        // 重置定时器(循环任务用)
        bool resetTimer(const Timestamp &newtime);
        // 定时器触发时处理(读 timerfd + 执行回调)
        void handleEvent();
        // 获取 timerfd
        int getTimerFd() const;
        // 关闭定时器,释放资源
        bool closeTimer();
    };
    // 定时器唯一标识:fd + Timer 对象指针
    using TimerId = std::pair<int, Timer *>;
} // namespace tulun
#endif

关键实现细节
  1. 定时器初始化(init

    • 通过 timerfd_create(CLOCK_MONOTONIC, 0) 创建定时器,使用系统单调时间,不受系统时间修改影响;
    • 保存回调函数、超时时间、间隔等参数,调用 settimer() 设置定时器超时时间。
  2. 定时器设置(settimer

    • 计算目标时间与当前时间的差值,转换为 timespec 结构;
    • 通过 timerfd_settime 设置定时器的首次超时时间和重复间隔。
  3. 超时事件处理(handleEvent

    • 读取 timerfd,获取超时次数;
    • 执行用户定义的回调函数;
    • 重复定时器会自动重置超时时间,非重复定时器则置为无效。
#include <sys/timerfd.h> // Linux timerfd 定时器接口
#include <errno.h>       // 错误码
#include <unistd.h>      // close() 关闭文件描述符
#include <string.h>      // strerror() 打印错误信息

#include "Logger.hpp" // 日志模块
#include "Timer.hpp"  // 定时器类定义

namespace tulun
{
    /*
        计算当前时间到目标时间的间隔,返回 timespec 结构(秒 + 纳秒)
        when 目标超时时间戳
        return timespec 距离超时还剩多久
     */
    static struct timespec howMuchTimeFormNow(const Timestamp &when)
    {
        // 计算目标时间 - 当前时间,得到剩余微秒数
        int64_t microseconds = when.getMicro() - Timestamp::Now().getMicro();
        // 最小间隔 100 微秒,防止时间为负导致定时器异常
        if (microseconds < 100)
        {
            microseconds = 100;
        }
        struct timespec ts;
        ts.tv_sec = (microseconds / Timestamp::KMinPerSec);
        ts.tv_nsec = (microseconds % Timestamp::KMinPerSec) * 1000;
        return ts;
    }
    /*
        设置 timerfd 定时器的超时时间
        return true 设置成功 / false 失败
    */
    bool Timer::settimer()
    {
        bool ret = true;
        // 定时器配置结构体
        struct itimerspec new_value = {};
        // m_interval 单位:毫秒
        new_value.it_interval.tv_sec = (m_interval / 1000); // 1500
        new_value.it_interval.tv_nsec = (m_interval % 1000) * 1000 * 1000;
        // 配置第一次超时的时间
        // 调用工具函数计算距离现在多久
        new_value.it_value = howMuchTimeFormNow(m_expiration);
        // 系统调用:设置定时器
        if (::timerfd_settime(m_timerfd, 0, &new_value, nullptr) < 0)
        {
            LOG_ERROR << "fimerfd_settime fail : " << strerror(errno);
            ret = false;
        }
        return ret;
    }

    Timer::Timer()
        : m_timerfd(-1),         // 定时器fd初始化为-1
          m_callback(nullptr),   // 回调函数为空
          m_expiration(),        // 超时时间初始化
          m_interval(0),         // 间隔时间0
          m_repeat(false)        // 默认不重复
    {
    }
    Timer::~Timer()
    {
        closeTimer();
    }

    /*
        初始化定时器:创建timerfd、设置回调、超时时间、间隔
        cb 超时回调函数
        when 第一次超时时间
        interval 超时间隔(毫秒)
        return 初始化是否成功
    */
    bool Timer::init(const TimerCallback &cb, const Timestamp &when, size_t interval)
    {
        bool ret = true;
        // 创建 timerfd 定时器(CLOCK_MONOTONIC:系统绝对时间,不受系统时间修改影响)
        m_timerfd = ::timerfd_create(CLOCK_MONOTONIC, 0);
        LOG_DEBUG << "m_timerfd: " << m_timerfd;
        if (m_timerfd < 0)
        {
            LOG_FATAL << "timerfd_create fail : " << strerror(errno);
            ret = false;
        }
        else
        {
            // 保存回调、超时时间、间隔、是否重复
            m_callback = cb;
            m_expiration = when;
            m_interval = interval;
            m_repeat = (interval > 0);// 间隔>0 → 循环定时器
            // 设置定时器
            settimer();// 启动计时
        }
        return ret;
    }
    // 重置定时器(循环任务)
    bool Timer::resetTimer(const Timestamp &newtime)
    {
        bool ret = true;
        if (m_repeat)
        {
            // 如果是重复定时器,重新计算下一次超时时间
            m_expiration = tulun::addTimeMilloc(newtime, m_interval);
            settimer();
        }
        else
        {
            // 非重复定时器,置为无效
            m_expiration = tulun::Timestamp::Invalid();
            ret = false;
        }
        return ret;
    }
    // 定时器触发:必须读 timerfd,否则一直触发
    void Timer::handleEvent()
    {
        uint64_t expire_cnt = 0;
        if (::read(m_timerfd, &expire_cnt, sizeof(expire_cnt)) != sizeof(expire_cnt))
        {
            LOG_ERROR << "read m_fimerfd fail ";
            return;
        }
        // 执行用户回调
        if (m_callback != nullptr)
        {
            m_callback();
        }
    }
    // 获取 timerfd
    int Timer::getTimerFd() const
    {
        return m_timerfd;
    }
    /*
        关闭定时器,释放资源
        return 关闭成功/失败
    */
    bool Timer::closeTimer()
    {
        bool ret = false;
        if (m_timerfd > 0)
        {
            close(m_timerfd);
            m_timerfd = -1;
            m_callback = nullptr;
            m_expiration = Timestamp::Invalid();
            m_repeat = false;
            ret = true;
        }
        return ret;
    }
} // namespace tulun

3.2 TimerQueue:定时器事件队列

TimerQueue 是整个定时线程池的核心调度器,基于 epoll 实现高效事件监听,管理所有定时器的生命周期。

核心成员与接口


#include <sys/epoll.h>

#include <unordered_map>
#include <vector>
#include <thread>
#include <atomic>
#include <mutex>

using namespace std;

#include "Logger.hpp"
#include "Timestamp.hpp"
#include "Timer.hpp"

#ifndef TIMERQUEUE_HPP
#define TIMERQUEUE_HPP
namespace tulun
{
    // 定时器队列:单线程 + epoll 管理所有定时器
    class TimerQueue
    {
    private:
        static const int eventsize = 16; // epoll 初始事件数组大小

        int m_epollfd;                             // epoll 文件描述符
        int m_timeout;                             // epoll_wait 超时时间
        std::vector<struct epoll_event> m_events;  // 接收事件
        std::unordered_map<int, Timer *> m_timers; // fd -> Timer*

        std::atomic_bool m_stop;    // 停止标记
        std::once_flag m_flag;      // 保证 stop 只执行一次
        std::thread m_worderThread; // 监听线程

        void loop();      // 线程主循环:epoll_wait
        void init();      // 初始化 epoll + 启动线程
        void stopQueue(); // 真正停止队列

    public:
        TimerQueue(int timeout = -1);
        ~TimerQueue();

        // 添加定时器:回调、超时时间、重复间隔
        tulun::TimerId addTimer(const TimerCallback &cb, const Timestamp &when, size_t interval);
        // 取消定时器
        void cancel(TimerId timerid);
        // 停止队列
        void stop();
    };
} // namespace tulun
#endif

关键实现细节
  1. 事件循环(loop

    • 调用 epoll_wait 监听所有 timerfd 的可读事件;
    • 当定时器超时触发时,调用 Timer::handleEvent() 执行回调;
    • 动态扩展事件数组,支持海量定时器管理。
  2. 添加定时器(addTimer

    • 创建 Timer 对象并初始化;
    • timerfd 添加到 epoll 监听集合;
    • 保存 fdTimer 的映射关系,返回 TimerId 用于后续取消。
  3. 取消定时器(cancel

    • 根据 TimerId 找到对应的 Timer 对象;
    • 关闭 timerfd,释放资源并从映射表中移除。


#include <unistd.h>
#include <errno.h>

#include <string.h>

#include "Timer.hpp"
#include "TimerQueue.hpp"

namespace tulun
{
    // epoll 循环:监听所有 timerfd
    void TimerQueue::loop()
    {
        while (!m_stop)
        {
            // 等待事件(实际触发的事件数量)
            int n = ::epoll_wait(m_epollfd, m_events.data(), m_events.size(), m_timeout);
            // 处理所有触发的定时器
            for (int i = 0; i < n; ++i)
            {
                int fd = m_events[i].data.fd;
                auto it = m_timers.find(fd);
                if (it != m_timers.end())
                {
                    // 执行定时器触发逻辑
                    it->second->handleEvent();
                }
            }
            // 自动扩容
            if (n >= m_events.size())
            {
                m_events.resize(m_events.size() * 2);
            }
        }
    }
    // 初始化:创建 epoll,启动监听线程
    void TimerQueue::init()
    {
        // EPOLL_CLOEXEC:子进程不继承 fd,安全
        m_epollfd = ::epoll_create1(EPOLL_CLOEXEC); //?
        if (m_epollfd < 0)
        {
            LOG_FATAL << "epoll_create1 fail: " << strerror(errno);
            return;
        }
        try
        {
            m_stop = false;
            // 创建线程执行 loop
            m_worderThread = std::thread(&TimerQueue::loop, this);
        }
        catch (const std::exception &e)
        {
            LOG_FATAL << e.what();
            close(m_epollfd);
            m_epollfd = -1;
            m_stop = true;
        }
    }
    // 停止队列:关闭线程 + 释放所有定时器
    void TimerQueue::stopQueue()
    {
        m_stop = true;
        if (m_worderThread.joinable())
        {
            m_worderThread.join();
        }
        // 释放所有 Timer
        for (auto &id : m_timers)
        {
            id.second->closeTimer();
            delete id.second;
        }
        m_timers.clear();
        close(m_epollfd);
        m_epollfd = -1;
    }

    TimerQueue::TimerQueue(int timeout)
        : m_epollfd(-1),
          m_timeout(timeout),
          m_stop(true)
    {
        m_events.resize(eventsize);
        init();
    }
    TimerQueue::~TimerQueue()
    {
        stop();
    }
    // 添加定时器到 epoll
    tulun::TimerId TimerQueue::addTimer(const TimerCallback &cb,
                                        const Timestamp &when,
                                        size_t interval)
    {
        TimerId ret{-1, nullptr};
        Timer *ptimer = new Timer();
        if (!ptimer->init(cb, when, interval))
        {
            return ret;
        }
        // 添加到 epoll 监听(可读事件)
        struct epoll_event evt;
        evt.events = EPOLLIN;
        evt.data.fd = ptimer->getTimerFd();
        if (::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, ptimer->getTimerFd(), &evt) < 0)
        {
            LOG_ERROR << "epoll_ctl add fail " << strerror(errno);
            return ret;
        }
        // 保存到 map
        m_timers[ptimer->getTimerFd()] = ptimer;
        ret.first = ptimer->getTimerFd();
        ret.second = ptimer;
        return ret;
    }
    // 取消定时器
    void TimerQueue::cancel(TimerId timerid)
    {
        auto it = m_timers.find(timerid.first);
        if (it != m_timers.end())
        {
            m_timers.erase(it);
            it->second->closeTimer();
            delete it->second;
        }
    }
    // 线程安全停止(只执行一次)
    void TimerQueue::stop()
    {
        std::call_once(m_flag, &TimerQueue::stopQueue, this);
    }

} // namespace tulun

3.3 ScheduledThreadPool:对外统一接口

ScheduledThreadPool 封装了 TimerQueue,为用户提供了三种常用的定时任务提交接口,使用便捷。

#include "Timestamp.hpp"
#include "Timer.hpp"
#include "TimerQueue.hpp"
#include "Logger.hpp"

#include <functional>
#include <future>
#include <memory>
#include <chrono>
using namespace std;

#ifndef SCHEDULEDTHREADPOOL_HPP
#define SCHEDULEDTHREADPOOL_HPP
namespace tulun
{
    // 定时线程池:对外提供 3 种定时任务
    class ScheduledThreadPool
    {
    private:
        tulun::TimerQueue m_queue;// 内部定时器队列
    public:
        ScheduledThreadPool() {}
        ~ScheduledThreadPool() {}
        // 在指定时间点执行一次
        TimerId AddRunAt(const Timestamp &time, const TimerCallback &cb)
        {
            return m_queue.addTimer(cb, time, 0);
        }
        // 延迟 delay 毫秒执行一次
        TimerId AddRunAfter(size_t delay, const TimerCallback &cb)
        {
            tulun::Timestamp time(tulun::addTimeMilloc(tulun::Timestamp::Now(), delay));
            return AddRunAt(time, cb);
        }
        // 每隔 interval 毫秒循环执行
        TimerId AddRunEvery(size_t interval, const TimerCallback &cb)
        {
            tulun::Timestamp time(tulun::addTimeMilloc(tulun::Timestamp::Now(), interval));
            return m_queue.addTimer(cb, time, interval);
        }
        // 取消任务
        void Cancel(TimerId timerid)
        {
            m_queue.cancel(timerid);
        }
    };
} // namespace tulun

#endif

四、核心定时模式详解

1. 单次延迟执行(AddRunAfter)

在当前时间延迟指定毫秒后执行一次任务。

// 延迟 1000ms 执行一次
spool.AddRunAfter(1000, [](){ cout << "延迟任务执行" << endl; });

2. 指定时间点执行(AddRunAt)

在指定的时间点执行一次任务,支持精确到微秒级的时间戳。

// 在指定时间点执行
tulun::Timestamp targetTime = tulun::Timestamp::Now() + 3000;
spool.AddRunAt(targetTime, [](){ cout << "定时任务执行" << endl; });

3. 周期性重复执行(AddRunEvery)

每隔指定毫秒重复执行任务,适用于心跳、备份等周期性场景。

// 每隔 1000ms 执行一次
spool.AddRunEvery(1000, func);

五、适用场景与最佳实践

5.1 典型适用场景

结合定时线程池的特性,它最适合以下场景:

  1. 定时任务调度:定期备份数据、清理临时文件、定时发送邮件;
  2. 心跳检测:周期性检测系统状态、发送心跳包,维持网络连接;
  3. 任务调度器:程序启动后执行初始化任务、结束前执行清理任务;
  4. 定时数据处理:特定时间点的数据库优化、日志归档、报表生成。

5.2 工程实践建议

  • 任务耗时控制:定时任务应尽量避免阻塞操作,否则会影响后续任务的调度;
  • 异常处理:回调函数中需捕获异常,防止单个任务崩溃影响整个线程池;
  • 资源管理:长时间不使用的定时任务应调用 Cancel() 取消,避免资源泄漏;
  • 精度要求timerfd 定时精度可达微秒级,适用于大多数定时场景;对精度要求极高的场景,需结合业务逻辑做额外补偿。

六、完整测试示例

#include "ScheduledThreadPool.hpp"
#include <iostream>
using namespace std;

void func()
{
    static int i=0;
    cout << "func" << ++i <<endl;
}

int main()
{
    tulun::ScheduledThreadPool spool;
    // 每隔 1 秒执行一次
    spool.AddRunEvery(1000, func);
    // 阻塞 60 秒
    std::this_thread::sleep_for(std::chrono::seconds(60));
    return 0;
}

运行后,程序会每隔 1 秒输出 func1func2...,持续 60 秒,验证周期性定时任务的正确性。


七、总结:定时线程池的核心优势

特性 说明
高效事件驱动 基于 timerfd + epoll 实现,非阻塞管理海量定时任务
多种定时模式 支持单次延迟、指定时间点、周期性重复三种模式
线程安全 提供线程安全的任务添加 / 取消接口
精度高 定时精度可达微秒级,不受系统时间修改影响
易扩展 分层设计,易于添加新的定时模式和回调逻辑

更多推荐