手动管理定时任务太麻烦了，cron表达式又难记。今天分享一套完整的定时任务调度管理系统，让任务管理变得可视化、智能化。

任务调度器框架

import schedule
import time
import threading
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Optional
from enum import Enum
import json
from pathlib import Path
from abc import ABC, abstractmethod

# Module-wide logger used by every component below.
logger = logging.getLogger(__name__)
# Emit INFO and above to the console. basicConfig only takes effect if the
# root logger has no handlers yet.
logging.basicConfig(level=logging.INFO)

class TaskStatus(Enum):
    """Lifecycle state of a task; ``.value`` is the string emitted by ``Task.to_dict``."""

    PENDING = "pending"    # default state of a freshly created Task
    RUNNING = "running"    # set right before the task callable is invoked
    SUCCESS = "success"    # last run returned normally
    FAILED = "failed"      # last run raised an exception
    STOPPED = "stopped"    # not assigned anywhere in this file

@dataclass
class Task:
    """Definition of a scheduled task plus its runtime statistics."""

    # --- configuration ---
    id: str
    name: str
    func: Callable              # zero-argument callable executed on each run
    interval: str               # e.g. "30s", "30m", "6h", "1d"; parsed by the scheduler
    enabled: bool = True
    timeout: int = 3600         # per-run timeout in seconds
    retry_times: int = 0        # how many times a failed run is retried
    retry_interval: int = 60    # delay between retries in seconds

    # --- runtime state, updated by the scheduler ---
    status: TaskStatus = TaskStatus.PENDING
    last_run: Optional[datetime] = None
    next_run: Optional[datetime] = None
    run_count: int = 0
    success_count: int = 0
    failed_count: int = 0
    last_error: Optional[str] = None

    def to_dict(self) -> Dict:
        """Return a JSON-friendly snapshot of the task.

        The callable and the timeout/retry settings are omitted. Datetimes are
        rendered with ``isoformat()``, or None when unset.
        """
        def _iso(moment: Optional[datetime]) -> Optional[str]:
            return moment.isoformat() if moment else None

        snapshot = dict(id=self.id, name=self.name,
                        interval=self.interval, enabled=self.enabled)
        snapshot['status'] = self.status.value
        snapshot.update(last_run=_iso(self.last_run), next_run=_iso(self.next_run))
        for attr in ('run_count', 'success_count', 'failed_count', 'last_error'):
            snapshot[attr] = getattr(self, attr)
        return snapshot

class TaskHook(ABC):
    """Base class for callbacks fired around every task execution.

    Subclasses must implement both methods. ``TaskScheduler`` catches and logs
    any exception a hook raises, so a faulty hook cannot abort a task run.
    """

    @abstractmethod
    def before_run(self, task: Task):
        """Called just before the task's callable is invoked."""

    @abstractmethod
    def after_run(self, task: Task, result: object, error: Optional[str]):
        """Called after a run has finished and its statistics were updated.

        Args:
            task: the task that ran.
            result: return value of ``task.func()``; None when the run failed.
            error: error message when the run failed, None on success.
        """

class LogTaskHook(TaskHook):
    """Hook that logs the start and the outcome of every task run."""

    def before_run(self, task: Task):
        logger.info("[%s] 开始执行", task.name)

    def after_run(self, task: Task, result: object, error: Optional[str]):
        # Compare against None: a failure whose message is empty must still
        # be reported as a failure, not as success.
        if error is not None:
            logger.error("[%s] 执行失败: %s", task.name, error)
        else:
            logger.info("[%s] 执行完成", task.name)

class NotificationTaskHook(TaskHook):
    """Hook that POSTs a webhook message whenever a task run fails.

    Args:
        webhook_url: endpoint that receives failure notifications; when None
            no notification is sent.
        email_to: stored on the instance but not used by this class (no
            e-mail is sent).
    """

    def __init__(self, webhook_url: Optional[str] = None, email_to: Optional[List[str]] = None):
        self.webhook_url = webhook_url
        self.email_to = email_to

    def before_run(self, task: Task):
        """No-op: notifications are only sent after a failed run."""

    def after_run(self, task: Task, result: object, error: Optional[str]):
        # `is not None` so a failure with an empty message is still notified.
        if error is not None and self.webhook_url:
            self._send_webhook(task, error)

    def _send_webhook(self, task: Task, error: str):
        """Send the failure notification; any problem is logged, never raised.

        NOTE(review): the payload uses the {'msg_type', 'content': {'text'}}
        shape. Some bot platforms (e.g. DingTalk robots) document a different
        body ({'msgtype': 'text', 'text': {'content': ...}}) — confirm it
        matches the configured endpoint.
        """
        try:
            # Imported lazily so `requests` is only needed when notifications
            # are actually sent; a missing package is logged like any failure.
            import requests
            response = requests.post(self.webhook_url, json={
                'msg_type': 'text',
                'content': {
                    'text': f"任务执行失败\n任务: {task.name}\n错误: {error}"
                }
            }, timeout=10)
            # requests does not raise on 4xx/5xx by itself.
            response.raise_for_status()
        except Exception as e:
            logger.error(f"通知发送失败: {e}")

class TaskScheduler:
    """Interval-based task scheduler built on the ``schedule`` library.

    Tasks are registered with :meth:`add_task` and scheduled on the
    module-level ``schedule`` scheduler, tagged with the task id. They are
    executed by the background thread started with :meth:`start`. Hooks added
    via :meth:`add_hook` run before and after every execution. Only task
    configuration (not callables or statistics) is persisted to
    ``~/.task_scheduler/tasks.json``.
    """

    # "<n><unit>" suffix -> ``schedule`` unit attribute.
    _INTERVAL_UNITS = {'s': 'seconds', 'm': 'minutes', 'h': 'hours', 'd': 'days'}
    # Named shortcuts accepted in addition to the "<n><unit>" form.
    _INTERVAL_ALIASES = {'hourly': '1h', 'daily': '1d', 'weekly': '7d'}

    def __init__(self):
        self.tasks: Dict[str, Task] = {}
        self.hooks: List[TaskHook] = []
        self.running = False
        # Re-entrant because remove_task() holds it while calling _save_tasks().
        self._lock = threading.RLock()
        # Stop signal of the current run loop (a fresh one per start()).
        self._stop_event: Optional[threading.Event] = None

        # Default hook
        self.add_hook(LogTaskHook())

        # Look for a saved configuration
        self._load_tasks()

    def add_hook(self, hook: TaskHook):
        """Register *hook*; hooks run in registration order."""
        self.hooks.append(hook)

    def add_task(self, task: Task):
        """Register *task* (replacing one with the same id), schedule it and persist the config."""
        with self._lock:
            self.tasks[task.id] = task
        logger.info(f"任务已添加: {task.name} ({task.interval})")
        self._schedule_task(task)
        self._save_tasks()

    def remove_task(self, task_id: str):
        """Unschedule and forget the task with *task_id*; unknown ids are ignored."""
        with self._lock:
            task = self.tasks.pop(task_id, None)
            if task is None:
                return
            task.enabled = False
            task.next_run = None
            schedule.clear(tag=task_id)
            logger.info(f"任务已移除: {task.name}")
            self._save_tasks()

    def get_task(self, task_id: str) -> Optional[Task]:
        """Return the task with *task_id*, or None."""
        return self.tasks.get(task_id)

    def list_tasks(self) -> List[Task]:
        """Return a snapshot list of all registered tasks."""
        return list(self.tasks.values())

    def enable_task(self, task_id: str):
        """Enable and (re)schedule the task with *task_id*."""
        task = self.tasks.get(task_id)
        if task:
            task.enabled = True
            self._schedule_task(task)
            self._save_tasks()
            logger.info(f"任务已启用: {task.name}")

    def disable_task(self, task_id: str):
        """Disable the task with *task_id* and drop its scheduled job."""
        task = self.tasks.get(task_id)
        if task:
            task.enabled = False
            task.next_run = None
            schedule.clear(tag=task_id)
            self._save_tasks()
            logger.info(f"任务已禁用: {task.name}")

    def run_task_now(self, task_id: str):
        """Execute the task with *task_id* synchronously in the calling thread."""
        task = self.tasks.get(task_id)
        if task:
            self._execute_task(task)

    @classmethod
    def _parse_interval(cls, interval: str) -> Optional[tuple]:
        """Parse "30s" / "30m" / "6h" / "1d" or an alias into ``(amount, unit)``.

        Returns None when the expression is not understood or not positive.
        """
        text = interval.strip().lower()
        text = cls._INTERVAL_ALIASES.get(text, text)
        if len(text) < 2 or text[-1] not in cls._INTERVAL_UNITS:
            return None
        try:
            amount = int(text[:-1])
        except ValueError:
            return None
        if amount <= 0:
            return None
        return amount, cls._INTERVAL_UNITS[text[-1]]

    def _schedule_task(self, task: Task):
        """(Re)create the ``schedule`` job for *task*; disabled tasks get none."""
        # Always drop existing jobs first so a replaced or disabled task never
        # keeps a stale job running.
        schedule.clear(tag=task.id)
        if not task.enabled:
            task.next_run = None
            return

        parsed = self._parse_interval(task.interval)
        if parsed is None:
            logger.warning(f"无法解析间隔: {task.interval}")
            task.next_run = None
            return

        amount, unit = parsed
        # e.g. schedule.every(30).minutes.do(...)
        getattr(schedule.every(amount), unit).do(self._execute_task, task).tag(task.id)
        self._update_next_run(task)

    @staticmethod
    def _call_with_timeout(task: Task):
        """Call ``task.func()`` and return its result, enforcing ``task.timeout``.

        The callable runs in a daemon worker thread. This works from any thread
        and on any platform, unlike SIGALRM, which is Unix-only and restricted
        to the main thread. Python cannot kill a thread, so on timeout the
        worker keeps running in the background and only the wait is abandoned.
        A non-positive timeout waits indefinitely.

        Raises:
            TimeoutError: the callable did not finish in time.
            Exception: whatever ``task.func()`` raised.
        """
        outcome: Dict[str, object] = {}

        def target():
            try:
                outcome['result'] = task.func()
            except Exception as exc:  # re-raised in the calling thread below
                outcome['error'] = exc

        worker = threading.Thread(target=target, name=f"task-{task.id}", daemon=True)
        worker.start()
        worker.join(task.timeout if task.timeout and task.timeout > 0 else None)
        if worker.is_alive():
            raise TimeoutError(f"任务执行超时 ({task.timeout}s)")
        if 'error' in outcome:
            raise outcome['error']
        return outcome.get('result')

    def _execute_task(self, task: Task, attempt: int = 0):
        """Run *task* once, update its statistics and fire the hooks.

        Args:
            task: the task to run.
            attempt: retries already performed in the current failure chain
                (0 for a scheduled or manual run). Retrying stops once it
                reaches ``task.retry_times``.
        """
        for hook in self.hooks:
            try:
                hook.before_run(task)
            except Exception as e:
                logger.error(f"钩子执行失败: {e}")

        task.status = TaskStatus.RUNNING
        task.last_run = datetime.now()
        task.run_count += 1

        result = None
        error = None

        try:
            result = self._call_with_timeout(task)
        except Exception as e:
            # repr() fallback: an exception with an empty message must still
            # yield a non-empty error so hooks treat it as a failure.
            error = str(e) or repr(e)
            task.status = TaskStatus.FAILED
            task.failed_count += 1
            task.last_error = error

            if attempt < task.retry_times:
                self._schedule_retry(task, error, attempt + 1)
        else:
            task.status = TaskStatus.SUCCESS
            task.success_count += 1
            task.last_error = None

        for hook in self.hooks:
            try:
                hook.after_run(task, result, error)
            except Exception as e:
                logger.error(f"钩子执行失败: {e}")

        # Only configuration is persisted and a run never changes it, so the
        # config file is not rewritten here.
        self._update_next_run(task)

    def _schedule_retry(self, task: Task, error: str, attempt: int = 1):
        """Re-run *task* after ``task.retry_interval`` seconds as retry number *attempt*."""
        logger.warning(
            f"任务 {task.name} 失败,将于 {task.retry_interval}s 后重试 "
            f"({attempt}/{task.retry_times})")

        def retry():
            # Skip the retry if the task was disabled or removed meanwhile.
            if not task.enabled or self.tasks.get(task.id) is not task:
                return
            self._execute_task(task, attempt)

        timer = threading.Timer(task.retry_interval, retry)
        # A pending retry should not keep the process alive after shutdown.
        timer.daemon = True
        timer.start()

    def _update_next_run(self, task: Task):
        """Refresh ``task.next_run`` from the task's job (None when unscheduled)."""
        for job in schedule.jobs:
            if task.id in job.tags:
                next_run = job.next_run
                if next_run is None or next_run <= datetime.now():
                    # Called from inside the running job: ``schedule`` only
                    # recomputes next_run (as now + period) after the job
                    # function returns, so estimate it the same way.
                    next_run = datetime.now() + job.period
                task.next_run = next_run
                return
        task.next_run = None

    def start(self):
        """Start the background loop that runs due jobs about once per second.

        Calling start() while already running only logs a warning, so jobs are
        never executed by two loops at once.
        """
        with self._lock:
            if self.running:
                logger.warning("任务调度器已在运行")
                return
            self.running = True
            # A fresh event per start: a loop from a previous start() still
            # winding down keeps its own (already set) event and exits.
            stop_event = threading.Event()
            self._stop_event = stop_event

        def run_loop():
            while not stop_event.is_set():
                try:
                    schedule.run_pending()
                except Exception:
                    # An exception escaping run_pending() would otherwise kill
                    # this thread and silently stop all scheduling.
                    logger.exception("调度循环出错")
                stop_event.wait(1)

        thread = threading.Thread(target=run_loop, name="task-scheduler", daemon=True)
        thread.start()
        logger.info("任务调度器已启动")

    def stop(self):
        """Signal the background loop to exit; a job already running finishes first."""
        with self._lock:
            self.running = False
            if self._stop_event is not None:
                self._stop_event.set()
        logger.info("任务调度器已停止")

    def get_status(self) -> Dict:
        """Return scheduler state, per-task snapshots and an overall success rate."""
        tasks = self.list_tasks()
        total_runs = sum(t.run_count for t in tasks)
        total_success = sum(t.success_count for t in tasks)

        return {
            'running': self.running,
            'total_tasks': len(tasks),
            'enabled_tasks': sum(1 for t in tasks if t.enabled),
            'tasks': [t.to_dict() for t in tasks],
            'summary': {
                'success_rate': f"{total_success / max(total_runs, 1) * 100:.1f}%",
                'total_runs': total_runs
            }
        }

    def _save_tasks(self):
        """Persist task configuration to ``~/.task_scheduler/tasks.json``.

        Callables and runtime statistics are not stored. Best effort: an I/O
        error is logged rather than raised, so a read-only home directory
        cannot break scheduling.
        """
        config_dir = Path.home() / '.task_scheduler'
        config_file = config_dir / 'tasks.json'

        with self._lock:
            tasks_config = [
                {
                    'id': task.id,
                    'name': task.name,
                    'interval': task.interval,
                    'enabled': task.enabled,
                    'timeout': task.timeout,
                    'retry_times': task.retry_times,
                    'retry_interval': task.retry_interval
                }
                for task in self.tasks.values()
            ]
            try:
                config_dir.mkdir(parents=True, exist_ok=True)
                # Write a temp file, then rename it over the target, so an
                # interrupted write never leaves a truncated tasks.json.
                tmp_file = config_file.with_name(config_file.name + '.tmp')
                with open(tmp_file, 'w', encoding='utf-8') as f:
                    json.dump(tasks_config, f, ensure_ascii=False, indent=2)
                tmp_file.replace(config_file)
            except OSError as e:
                logger.error(f"任务配置保存失败: {e}")

    def _load_tasks(self):
        """Check for a saved configuration; nothing is restored automatically.

        Callables cannot be serialized, so tasks must be re-registered with
        add_task(). This method only logs a reminder when the file exists.
        """
        config_file = Path.home() / '.task_scheduler' / 'tasks.json'

        if not config_file.exists():
            return

        logger.info("找到保存的任务配置,请使用 add_task() 添加实际的任务函数")

可视化管理界面

import tkinter as tk
from tkinter import ttk, messagebox
from datetime import datetime
import threading

class TaskManagerUI:
    """Tkinter management window for a TaskScheduler.

    The left pane lists every task (name, status, interval). The right pane
    shows the details and statistics of the selected task. The toolbar and
    action buttons start or stop the scheduler and run, enable, disable or
    delete the selected task. The list refreshes itself every ``refresh_ms``
    milliseconds.

    Args:
        scheduler: the scheduler to display and control.
        refresh_ms: auto-refresh period in milliseconds (default 5000).
    """

    # Row text colour, keyed by TaskStatus value.
    STATUS_COLORS = {'pending': 'gray', 'running': 'blue',
                     'success': 'green', 'failed': 'red', 'stopped': 'gray'}

    def __init__(self, scheduler: TaskScheduler, refresh_ms: int = 5000):
        self.scheduler = scheduler
        self.refresh_ms = refresh_ms
        self.root = tk.Tk()
        self.root.title("任务调度管理器 v1.0")
        self.root.geometry("900x600")

        self._create_layout()
        self._start_auto_refresh()

    def _create_layout(self):
        """Build the toolbar, task list, detail/statistics panels and log area."""
        # Top toolbar
        toolbar = tk.Frame(self.root, bg='#3498db', height=50)
        toolbar.pack(fill='x', side='top')

        tk.Button(toolbar, text="▶ 启动", command=self._start_scheduler,
                  bg='#27ae60', fg='white', relief='flat').pack(side='left', padx=10, pady=10)
        tk.Button(toolbar, text="⏹ 停止", command=self._stop_scheduler,
                  bg='#e74c3c', fg='white', relief='flat').pack(side='left', padx=10, pady=10)
        tk.Button(toolbar, text="🔄 刷新", command=self._refresh_tasks,
                  bg='#f39c12', fg='white', relief='flat').pack(side='left', padx=10, pady=10)

        # Status label. It reflects the real state, because the scheduler may
        # already have been started before this window was created.
        self.status_label = tk.Label(
            toolbar, text="状态: 运行中" if self.scheduler.running else "状态: 未启动",
            bg='#3498db', fg='white', font=('微软雅黑', 10))
        self.status_label.pack(side='right', padx=10)

        # Left pane: task list
        left_frame = tk.Frame(self.root, width=300)
        left_frame.pack(fill='y', side='left', padx=5, pady=5)

        tk.Label(left_frame, text="任务列表", font=('微软雅黑', 12, 'bold')).pack(anchor='w', pady=5)

        tree_frame = tk.Frame(left_frame)
        tree_frame.pack(fill='both', expand=True)

        # The name column is what lets the user tell rows apart.
        self.tree = ttk.Treeview(tree_frame, columns=('名称', '状态', '间隔'), show='headings')
        for column, width in (('名称', 120), ('状态', 80), ('间隔', 80)):
            self.tree.heading(column, text=column)
            self.tree.column(column, width=width)
        # One tag per status value colours the row text.
        for status, color in self.STATUS_COLORS.items():
            self.tree.tag_configure(status, foreground=color)
        self.tree.pack(side='left', fill='both', expand=True)

        scrollbar = ttk.Scrollbar(tree_frame, orient='vertical', command=self.tree.yview)
        scrollbar.pack(side='right', fill='y')
        self.tree.configure(yscrollcommand=scrollbar.set)

        # Right pane: details of the selected task
        right_frame = tk.Frame(self.root)
        right_frame.pack(fill='both', expand=True, padx=5, pady=5)

        tk.Label(right_frame, text="任务详情", font=('微软雅黑', 12, 'bold')).pack(anchor='w', pady=5)

        detail_frame = tk.LabelFrame(right_frame, text="基本信息")
        detail_frame.pack(fill='x', pady=5)
        self.detail_labels = self._create_field_grid(
            detail_frame, ['任务ID:', '任务名称:', '执行间隔:', '超时时间:'])

        stats_frame = tk.LabelFrame(right_frame, text="执行统计")
        stats_frame.pack(fill='x', pady=5)
        self.stats_labels = self._create_field_grid(
            stats_frame, ['执行次数:', '成功次数:', '失败次数:', '成功率:', '最近执行:', '下次执行:'])

        # Action buttons (operate on the selected task)
        btn_frame = tk.Frame(right_frame)
        btn_frame.pack(fill='x', pady=10)

        tk.Button(btn_frame, text="立即执行", command=self._run_now,
                  bg='#3498db', fg='white').pack(side='left', padx=5)
        tk.Button(btn_frame, text="启用", command=self._enable_task,
                  bg='#27ae60', fg='white').pack(side='left', padx=5)
        tk.Button(btn_frame, text="禁用", command=self._disable_task,
                  bg='#f39c12', fg='white').pack(side='left', padx=5)
        tk.Button(btn_frame, text="删除", command=self._delete_task,
                  bg='#e74c3c', fg='white').pack(side='left', padx=5)

        self.tree.bind('<<TreeviewSelect>>', self._on_task_select)

        # Bottom: log area
        log_frame = tk.LabelFrame(self.root, text="执行日志")
        log_frame.pack(fill='both', expand=True, padx=5, pady=5)

        self.log_text = tk.Text(log_frame, font=('Consolas', 9), height=8, state='disabled')
        self.log_text.pack(fill='both', expand=True, padx=5, pady=5)

    @staticmethod
    def _create_field_grid(parent, captions):
        """Grid caption/value label pairs into *parent*; return {caption: value label}."""
        value_labels = {}
        for row, caption in enumerate(captions):
            tk.Label(parent, text=caption, font=('微软雅黑', 9)).grid(
                row=row, column=0, sticky='w', padx=5, pady=3)
            value = tk.Label(parent, text='-', font=('Consolas', 9))
            value.grid(row=row, column=1, sticky='w', padx=5, pady=3)
            value_labels[caption] = value
        return value_labels

    def _selected_task_id(self) -> Optional[str]:
        """Return the id of the selected row (rows use the task id as iid), or None."""
        selection = self.tree.selection()
        return selection[0] if selection else None

    def _require_selected_task(self, prompt: str):
        """Return the selected Task, or show *prompt* and return None."""
        task_id = self._selected_task_id()
        task = self.scheduler.get_task(task_id) if task_id else None
        if task is None:
            messagebox.showinfo("提示", prompt)
        return task

    def _refresh_tasks(self):
        """Rebuild the task list from the scheduler, keeping the current selection."""
        selected = self._selected_task_id()

        children = self.tree.get_children()
        if children:
            self.tree.delete(*children)

        for task in self.scheduler.list_tasks():
            status = task.status.value
            # The task id is the row iid, so a selection maps back to its task.
            # The status tag picks the row colour.
            self.tree.insert('', 'end', iid=task.id,
                             values=(task.name, status, task.interval),
                             tags=(status,))

        task = self.scheduler.get_task(selected) if selected else None
        if task is not None and self.tree.exists(task.id):
            self.tree.selection_set(task.id)
            self._update_task_detail(task)
        elif selected:
            # The previously selected task no longer exists.
            self._clear_task_detail()

    def _on_task_select(self, event):
        """Show the details of the newly selected task."""
        task_id = self._selected_task_id()
        task = self.scheduler.get_task(task_id) if task_id else None
        if task:
            self._update_task_detail(task)

    def _clear_task_detail(self):
        """Reset every detail and statistics field to '-'."""
        for label in (*self.detail_labels.values(), *self.stats_labels.values()):
            label.config(text='-')

    def _update_task_detail(self, task):
        """Fill the detail and statistics panels from *task*."""
        self.detail_labels['任务ID:'].config(text=task.id)
        self.detail_labels['任务名称:'].config(text=task.name)
        self.detail_labels['执行间隔:'].config(text=task.interval)
        self.detail_labels['超时时间:'].config(text=f"{task.timeout}秒")

        self.stats_labels['执行次数:'].config(text=str(task.run_count))
        self.stats_labels['成功次数:'].config(text=str(task.success_count))
        self.stats_labels['失败次数:'].config(text=str(task.failed_count))

        success_rate = task.success_count / task.run_count * 100 if task.run_count else 0
        self.stats_labels['成功率:'].config(text=f"{success_rate:.1f}%")

        self.stats_labels['最近执行:'].config(
            text=task.last_run.strftime('%Y-%m-%d %H:%M:%S') if task.last_run else '从未')

        # next_run may also be unset for an enabled task (e.g. not yet computed),
        # so only report "disabled" when the task really is disabled.
        if task.next_run:
            next_text = task.next_run.strftime('%Y-%m-%d %H:%M:%S')
        elif not task.enabled:
            next_text = '已禁用'
        else:
            next_text = '-'
        self.stats_labels['下次执行:'].config(text=next_text)

    def _start_scheduler(self):
        self.scheduler.start()
        self.status_label.config(text="状态: 运行中")
        self._log("调度器已启动")

    def _stop_scheduler(self):
        self.scheduler.stop()
        self.status_label.config(text="状态: 已停止")
        self._log("调度器已停止")

    def _run_now(self):
        """Execute the selected task immediately.

        Runs synchronously on the Tk thread, so the window is unresponsive
        until the task finishes. This keeps the call on the main thread, which
        signal-based timeout implementations require.
        """
        task = self._require_selected_task("请先选中要执行的任务")
        if task is None:
            return
        self._log(f"立即执行任务: {task.name}")
        self.scheduler.run_task_now(task.id)
        if task.status == TaskStatus.FAILED:
            self._log(f"任务执行失败: {task.name} - {task.last_error}")
        else:
            self._log(f"任务执行完成: {task.name}")
        self._refresh_tasks()

    def _enable_task(self):
        task = self._require_selected_task("请先选中要启用的任务")
        if task is None:
            return
        self.scheduler.enable_task(task.id)
        self._log(f"启用任务: {task.name}")
        self._refresh_tasks()

    def _disable_task(self):
        task = self._require_selected_task("请先选中要禁用的任务")
        if task is None:
            return
        self.scheduler.disable_task(task.id)
        self._log(f"禁用任务: {task.name}")
        self._refresh_tasks()

    def _delete_task(self):
        task = self._require_selected_task("请先选中要删除的任务")
        if task is None:
            return
        if messagebox.askyesno("确认", "确定要删除选中的任务吗?"):
            self.scheduler.remove_task(task.id)
            self._log(f"删除任务: {task.name}")
            self._refresh_tasks()

    def _log(self, message: str):
        """Append a timestamped line to the read-only log area."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.log_text.config(state='normal')
        self.log_text.insert('end', f"[{timestamp}] {message}\n")
        self.log_text.see('end')
        self.log_text.config(state='disabled')

    def _start_auto_refresh(self):
        """Re-populate the task list every ``refresh_ms`` milliseconds."""
        def refresh():
            self._refresh_tasks()
            self.root.after(self.refresh_ms, refresh)

        self.root.after(self.refresh_ms, refresh)

    def run(self):
        """Populate the list once and enter the Tk main loop (blocks until closed)."""
        self._refresh_tasks()
        self.root.mainloop()

使用示例

# Create the scheduler (its constructor also registers a LogTaskHook).
scheduler = TaskScheduler()

# Add a webhook notification hook; it only posts when a task run fails.
# NOTE(review): NotificationTaskHook sends {'msg_type': 'text', 'content': {'text': ...}};
# DingTalk custom robots document {'msgtype': 'text', 'text': {'content': ...}} —
# confirm the payload matches this endpoint, and replace "xxx" with a real token.
scheduler.add_hook(NotificationTaskHook(
    webhook_url='https://oapi.dingtalk.com/robot/send?access_token=xxx'
))

# 定义任务函数
def backup_database(simulated_seconds: float = 2):
    """Example task: pretend to back up the database.

    Args:
        simulated_seconds: how long to sleep to simulate the backup work.
            The default matches the original 2 s; non-positive values skip
            the sleep.

    Returns:
        True once the (placeholder) backup has finished.
    """
    print("开始备份数据库...")
    # Placeholder: replace with the real backup logic.
    if simulated_seconds > 0:
        time.sleep(simulated_seconds)
    print("备份完成")
    return True

def send_daily_report():
    """Example task: pretend to generate and send the daily report.

    Returns:
        True after the placeholder report step.
    """
    print("生成日报...")
    # Placeholder: the real delivery logic goes here.
    delivered = True
    return delivered

def check_system_health():
    """Example task: pretend to check system health.

    Returns:
        A dict of placeholder metrics: ``{'cpu': 50, 'memory': 70}``.
    """
    print("检查系统状态...")
    # Placeholder readings; replace with real measurements.
    metrics = dict(cpu=50, memory=70)
    return metrics

# Register the example tasks
scheduler.add_task(Task(
    id='backup_db',
    name='数据库备份',
    func=backup_database,
    interval='6h',  # every 6 hours
    timeout=3600,
    retry_times=2
))

scheduler.add_task(Task(
    id='send_report',
    name='发送日报',
    func=send_daily_report,
    interval='1d',  # once a day
    timeout=300
))

scheduler.add_task(Task(
    id='health_check',
    name='健康检查',
    func=check_system_health,
    interval='30m',  # every 30 minutes
    timeout=60
))

# Start the background scheduler thread
scheduler.start()

# Pick ONE front-end: the Tk management window (True) or a headless
# keep-alive loop (False). Previously both ran, so closing the window left
# the process hanging in the keep-alive loop with the scheduler still active.
USE_UI = True

if USE_UI:
    ui = TaskManagerUI(scheduler)
    try:
        ui.run()  # blocks until the window is closed
    finally:
        # Closing the window ends the program.
        scheduler.stop()
else:
    # Headless: keep the main thread alive until Ctrl+C.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        scheduler.stop()

总结

任务调度系统要点：

  1. 灵活配置：支持秒、分钟、小时、天等多种时间间隔表达式（如 30s、30m、6h、1d）
  2. 状态管理：完整记录任务执行状态
  3. 钩子机制：执行前后可插入自定义逻辑
  4. 重试机制：失败任务自动重试
  5. 可视化界面：直观管理所有任务

下期预告：Python自动化脚本的Docker容器化部署。

更多推荐