分享10个我在工作中实际使用的Python自动化脚本,涵盖Excel处理、邮件发送、文件整理、数据备份等常见场景,每个脚本都可以直接复制使用。

## 环境准备

```bash

pip install pandas openpyxl python-docx schedule smtplib-docs requests beautifulsoup4 watchdog pyautogui pillow

```

---

## 一、Excel自动化:合并多个工作簿

**场景**:每天需要把10个门店的销售数据合并成一张总表

```python

import pandas as pd

import os

from datetime import datetime

def merge_excel_files(folder_path, output_file):

    """

    合并文件夹下所有Excel文件到一个总表

    """

    all_data = []

   

    for filename in os.listdir(folder_path):

        if filename.endswith(('.xlsx', '.xls')):

            filepath = os.path.join(folder_path, filename)

            # 读取Excel,自动识别表头

            df = pd.read_excel(filepath)

            # 添加来源文件名列,方便溯源

            df['数据来源'] = filename

            all_data.append(df)

            print(f"已处理: {filename}")

   

    # 合并所有数据

    merged_df = pd.concat(all_data, ignore_index=True)

   

    # 添加汇总时间

    merged_df['汇总时间'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

   

    # 导出

    merged_df.to_excel(output_file, index=False)

    print(f"\n合并完成!共处理 {len(all_data)} 个文件,输出到: {output_file}")

# 使用示例

merge_excel_files(

    folder_path=r"D:\销售数据\2026年6月",

    output_file=r"D:\销售数据\6月汇总.xlsx"

)

```

**进阶版:带数据清洗的合并**

```python

def merge_and_clean(folder_path, output_file):

    all_data = []

   

    for filename in os.listdir(folder_path):

        if filename.endswith(('.xlsx', '.xls')):

            filepath = os.path.join(folder_path, filename)

            df = pd.read_excel(filepath)

           

            # 数据清洗

            df = df.dropna(subset=['订单号'])  # 删除没有订单号的行

            df['金额'] = pd.to_numeric(df['金额'], errors='coerce')  # 统一金额格式

            df['日期'] = pd.to_datetime(df['日期']).dt.strftime('%Y-%m-%d')  # 统一日期格式

            df['门店'] = filename.replace('.xlsx', '').replace('.xls', '')

           

            all_data.append(df)

   

    merged_df = pd.concat(all_data, ignore_index=True)

   

    # 生成数据透视表

    pivot = merged_df.pivot_table(

        values='金额',

        index='门店',

        columns='日期',

        aggfunc='sum',

        fill_value=0

    )

   

    # 同时输出明细表和汇总表

    with pd.ExcelWriter(output_file) as writer:

        merged_df.to_excel(writer, sheet_name='明细', index=False)

        pivot.to_excel(writer, sheet_name='门店汇总')

   

    print(f"完成!共 {len(merged_df)} 条记录")

merge_and_clean(r"D:\销售数据\6月", r"D:\销售数据\6月报表.xlsx")

```

---

## 二、批量重命名文件

**场景**:下载的文件名乱七八糟,需要统一格式

```python

import os

import re

def batch_rename(folder_path, pattern, replacement):

    """

    批量重命名文件

    pattern: 正则表达式

    replacement: 替换内容

    """

    count = 0

    for filename in os.listdir(folder_path):

        if os.path.isfile(os.path.join(folder_path, filename)):

            # 分离文件名和扩展名

            name, ext = os.path.splitext(filename)

           

            # 使用正则替换

            new_name = re.sub(pattern, replacement, name)

           

            if new_name != name:

                old_path = os.path.join(folder_path, filename)

                new_path = os.path.join(folder_path, new_name + ext)

                os.rename(old_path, new_path)

                print(f"重命名: {filename} -> {new_name + ext}")

                count += 1

   

    print(f"\n共重命名 {count} 个文件")

# 示例1:删除文件名中的多余空格和特殊字符

batch_rename(r"D:\下载", r'[\s\-\_]+', '_')

# 示例2:给文件添加日期前缀

def add_date_prefix(folder_path):

    from datetime import datetime

    date_str = datetime.now().strftime('%Y%m%d')

   

    for filename in os.listdir(folder_path):

        if os.path.isfile(os.path.join(folder_path, filename)):

            if not filename.startswith(date_str):

                old_path = os.path.join(folder_path, filename)

                new_path = os.path.join(folder_path, f"{date_str}_{filename}")

                os.rename(old_path, new_path)

                print(f"添加前缀: {filename}")

add_date_prefix(r"D:\日报")

```

---

## 三、自动发送邮件

**场景**:每天早上9点自动发送昨日数据报表给领导

```python

import smtplib

from email.mime.text import MIMEText

from email.mime.multipart import MIMEMultipart

from email.mime.application import MIMEApplication

import os

class EmailSender:

    def __init__(self, smtp_server, smtp_port, email, password):

        self.smtp_server = smtp_server

        self.smtp_port = smtp_port

        self.email = email

        self.password = password

   

    def send_email(self, to_addrs, subject, body, attachments=None):

        """

        发送邮件,支持附件

        """

        msg = MIMEMultipart()

        msg['From'] = self.email

        msg['To'] = ', '.join(to_addrs) if isinstance(to_addrs, list) else to_addrs

        msg['Subject'] = subject

       

        # 正文

        msg.attach(MIMEText(body, 'html', 'utf-8'))

       

        # 附件

        if attachments:

            for filepath in attachments:

                if os.path.exists(filepath):

                    with open(filepath, 'rb') as f:

                        attachment = MIMEApplication(f.read())

                        attachment.add_header(

                            'Content-Disposition',

                            'attachment',

                            filename=os.path.basename(filepath)

                        )

                        msg.attach(attachment)

       

        # 发送

        with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port) as server:

            server.login(self.email, self.password)

            server.send_message(msg)

       

        print(f"邮件已发送至: {to_addrs}")

# 使用示例

sender = EmailSender(

    smtp_server='smtp.qq.com',

    smtp_port=465,

    email='your_email@qq.com',

    password='your_authorization_code'  # QQ邮箱需要使用授权码

)

# 发送带附件的报表邮件

sender.send_email(

    to_addrs=['leader@company.com'],

    subject='【日报】2026年6月2日销售数据',

    body='''

    <h3>领导好,</h3>

    <p>以下是今日销售数据报表,请查收:</p>

    <ul>

        <li>今日订单数:<b>156单</b></li>

        <li>今日销售额:<b>¥45,890</b></li>

        <li>环比昨日:<b style="color:green">↑12.5%</b></li>

    </ul>

    <p>详情请见附件。</p>

    <br>

    <p>此邮件由系统自动发送</p>

    ''',

    attachments=[

        r'D:\报表\20260602_销售日报.xlsx',

        r'D:\报表\20260602_趋势图.png'

    ]

)

```

---

## 四、文件自动分类整理

**场景**:下载文件夹乱成一锅粥,自动按类型分类

```python

import os

import shutil

from pathlib import Path

def organize_files(source_folder):

    """

    自动整理文件夹,按文件类型分类

    """

    # 定义分类规则

    file_categories = {

        '图片': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg'],

        '文档': ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt'],

        '视频': ['.mp4', '.avi', '.mkv', '.mov', '.wmv'],

        '音频': ['.mp3', '.wav', '.flac', '.aac'],

        '压缩包': ['.zip', '.rar', '.7z', '.tar', '.gz'],

        '代码': ['.py', '.js', '.html', '.css', '.java', '.cpp', '.c'],

        '安装包': ['.exe', '.msi', '.dmg', '.deb', '.rpm'],

    }

   

    # 创建分类文件夹

    for category in file_categories:

        category_path = os.path.join(source_folder, category)

        os.makedirs(category_path, exist_ok=True)

   

    # 创建"其他"文件夹

    other_path = os.path.join(source_folder, '其他')

    os.makedirs(other_path, exist_ok=True)

   

    moved_count = 0

   

    for filename in os.listdir(source_folder):

        filepath = os.path.join(source_folder, filename)

       

        # 跳过文件夹

        if os.path.isdir(filepath):

            continue

       

        # 获取文件扩展名

        ext = Path(filename).suffix.lower()

       

        # 确定分类

        target_category = '其他'

        for category, extensions in file_categories.items():

            if ext in extensions:

                target_category = category

                break

       

        # 移动文件

        target_path = os.path.join(source_folder, target_category, filename)

       

        # 处理重名文件

        if os.path.exists(target_path):

            name, extension = os.path.splitext(filename)

            counter = 1

            while os.path.exists(target_path):

                target_path = os.path.join(

                    source_folder,

                    target_category,

                    f"{name}_{counter}{extension}"

                )

                counter += 1

       

        shutil.move(filepath, target_path)

        print(f"移动: {filename} -> {target_category}/")

        moved_count += 1

   

    print(f"\n整理完成!共移动 {moved_count} 个文件")

# 使用示例

organize_files(r"C:\Users\Downloads")

```

---

## 五、定时任务:自动备份数据库

**场景**:每天凌晨3点自动备份MySQL数据库

```python

import subprocess

import os

from datetime import datetime

import schedule

import time

def backup_mysql(host, port, user, password, database, backup_dir):

    """

    备份MySQL数据库

    """

    # 生成备份文件名

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    backup_file = os.path.join(backup_dir, f"{database}_{timestamp}.sql")

   

    # 确保备份目录存在

    os.makedirs(backup_dir, exist_ok=True)

   

    # 构建mysqldump命令

    cmd = [

        'mysqldump',

        f'--host={host}',

        f'--port={port}',

        f'--user={user}',

        f'--password={password}',

        '--single-transaction',

        '--routines',

        '--triggers',

        database

    ]

   

    try:

        # 执行备份

        with open(backup_file, 'w', encoding='utf-8') as f:

            result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True)

       

        if result.returncode == 0:

            file_size = os.path.getsize(backup_file) / (1024 * 1024)  # MB

            print(f"备份成功: {backup_file} ({file_size:.2f} MB)")

           

            # 清理30天前的旧备份

            cleanup_old_backups(backup_dir, days=30)

        else:

            print(f"备份失败: {result.stderr}")

   

    except Exception as e:

        print(f"备份异常: {e}")

def cleanup_old_backups(backup_dir, days=30):

    """清理超过指定天数的备份文件"""

    import glob

    from datetime import timedelta

   

    cutoff = datetime.now() - timedelta(days=days)

   

    for filepath in glob.glob(os.path.join(backup_dir, '*.sql')):

        filename = os.path.basename(filepath)

        # 从文件名提取日期

        try:

            date_str = filename.split('_')[1][:8]

            file_date = datetime.strptime(date_str, '%Y%m%d')

            if file_date < cutoff:

                os.remove(filepath)

                print(f"已删除过期备份: {filename}")

        except (IndexError, ValueError):

            continue

# 配置备份任务

def daily_backup():

    backup_mysql(

        host='localhost',

        port=3306,

        user='root',

        password='your_password',

        database='your_database',

        backup_dir=r'D:\数据库备份'

    )

# 设置每天凌晨3点执行

schedule.every().day.at("03:00").do(daily_backup)

print("备份任务已启动,每天凌晨3点自动执行...")

print("按 Ctrl+C 停止")

while True:

    schedule.run_pending()

    time.sleep(60)

```

---

## 六、网页数据抓取

**场景**:每天抓取竞品价格数据

```python

import requests

from bs4 import BeautifulSoup

import pandas as pd

from datetime import datetime

import time

import random

class PriceScraper:

    def __init__(self):

        self.headers = {

            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'

        }

        self.results = []

   

    def scrape_page(self, url):

        """

        抓取单页商品数据(示例结构,需根据实际网页调整)

        """

        try:

            response = requests.get(url, headers=self.headers, timeout=10)

            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')

           

            # 示例:抓取商品列表(根据实际网页结构调整选择器)

            products = soup.find_all('div', class_='product-item')

           

            for product in products:

                try:

                    name = product.find('h3', class_='title').text.strip()

                    price = product.find('span', class_='price').text.strip()

                    sales = product.find('span', class_='sales').text.strip()

                   

                    self.results.append({

                        '商品名称': name,

                        '价格': price,

                        '销量': sales,

                        '抓取时间': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),

                        '来源': url

                    })

                except AttributeError:

                    continue

           

            print(f"成功抓取 {len(products)} 条数据")

           

        except Exception as e:

            print(f"抓取失败: {e}")

   

    def scrape_multiple_pages(self, base_url, pages=10):

        """

        抓取多页数据

        """

        for page in range(1, pages + 1):

            url = f"{base_url}?page={page}"

            self.scrape_page(url)

            # 随机延迟,避免被封

            time.sleep(random.uniform(1, 3))

   

    def save_to_excel(self, output_file):

        """

        保存到Excel

        """

        df = pd.DataFrame(self.results)

        df.to_excel(output_file, index=False)

        print(f"数据已保存到: {output_file}")

# 使用示例

scraper = PriceScraper()

scraper.scrape_multiple_pages('https://example.com/products', pages=5)

scraper.save_to_excel(r'D:\竞品分析\价格数据.xlsx')

```

---

## 七、批量图片处理

**场景**:电商运营需要批量压缩、加水印、调整尺寸

```python

from PIL import Image, ImageDraw, ImageFont

import os

class ImageProcessor:

    def __init__(self, input_folder, output_folder):

        self.input_folder = input_folder

        self.output_folder = output_folder

        os.makedirs(output_folder, exist_ok=True)

   

    def resize_image(self, input_path, output_path, size=(800, 800)):

        """

        调整图片尺寸,保持比例

        """

        with Image.open(input_path) as img:

            # 保持宽高比

            img.thumbnail(size, Image.Resampling.LANCZOS)

            img.save(output_path, quality=95)

   

    def compress_image(self, input_path, output_path, quality=85):

        """

        压缩图片

        """

        with Image.open(input_path) as img:

            # 如果是PNG且不需要透明度,转为JPEG

            if img.mode == 'RGBA':

                img = img.convert('RGB')

            img.save(output_path, optimize=True, quality=quality)

   

    def add_watermark(self, input_path, output_path, text, position='bottom-right'):

        """

        添加文字水印

        """

        with Image.open(input_path) as img:

            if img.mode == 'RGBA':

                img = img.convert('RGB')

           

            draw = ImageDraw.Draw(img)

           

            # 尝试加载字体

            try:

                font = ImageFont.truetype("msyh.ttc", 36)  # 微软雅黑

            except:

                font = ImageFont.load_default()

           

            # 计算文字位置

            bbox = draw.textbbox((0, 0), text, font=font)

            text_width = bbox[2] - bbox[0]

            text_height = bbox[3] - bbox[1]

           

            if position == 'bottom-right':

                x = img.width - text_width - 20

                y = img.height - text_height - 20

            elif position == 'center':

                x = (img.width - text_width) // 2

                y = (img.height - text_height) // 2

            else:

                x, y = 20, 20

           

            # 绘制半透明背景

            draw.rectangle(

                [x - 10, y - 10, x + text_width + 10, y + text_height + 10],

                fill=(0, 0, 0, 128)

            )

           

            # 绘制文字

            draw.text((x, y), text, fill='white', font=font)

            img.save(output_path, quality=95)

   

    def batch_process(self, operation, **kwargs):

        """

        批量处理图片

        """

        count = 0

        for filename in os.listdir(self.input_folder):

            if filename.lower().endswith(('.jpg', '.jpeg', '.png', '.webp')):

                input_path = os.path.join(self.input_folder, filename)

                output_path = os.path.join(self.output_folder, filename)

               

                if operation == 'resize':

                    self.resize_image(input_path, output_path, kwargs.get('size', (800, 800)))

                elif operation == 'compress':

                    self.compress_image(input_path, output_path, kwargs.get('quality', 85))

                elif operation == 'watermark':

                    self.add_watermark(input_path, output_path, kwargs.get('text', '水印'))

               

                count += 1

                print(f"已处理: {filename}")

       

        print(f"\n处理完成!共 {count} 张图片")

# 使用示例

processor = ImageProcessor(

    input_folder=r'D:\商品图片\原图',

    output_folder=r'D:\商品图片\处理后'

)

# 批量压缩

processor.batch_process('compress', quality=80)

# 批量添加水印

processor.batch_process('watermark', text='© 2026 我的店铺')

# 批量调整尺寸(适合淘宝主图)

processor.batch_process('resize', size=(800, 800))

```

---

## 八、自动监控文件夹变化

**场景**:监控文件夹,有新文件自动处理

```python

import time

from watchdog.observers import Observer

from watchdog.events import FileSystemEventHandler

import os

import shutil

class FileHandler(FileSystemEventHandler):

    def __init__(self, target_folder):

        self.target_folder = target_folder

        os.makedirs(target_folder, exist_ok=True)

   

    def on_created(self, event):

        """

        当有新文件创建时触发

        """

        if event.is_directory:

            return

       

        filepath = event.src_path

        filename = os.path.basename(filepath)

       

        print(f"检测到新文件: {filename}")

       

        # 等待文件写入完成

        time.sleep(1)

       

        # 根据文件类型自动处理

        if filename.endswith(('.xlsx', '.xls')):

            self.process_excel(filepath, filename)

        elif filename.endswith(('.jpg', '.png')):

            self.process_image(filepath, filename)

        elif filename.endswith('.pdf'):

            self.process_pdf(filepath, filename)

   

    def process_excel(self, filepath, filename):

        """自动处理Excel文件"""

        import pandas as pd

       

        print(f"正在处理Excel: {filename}")

       

        # 读取并汇总

        df = pd.read_excel(filepath)

       

        # 添加处理时间列

        df['处理时间'] = time.strftime('%Y-%m-%d %H:%M:%S')

       

        # 保存到目标文件夹

        output_path = os.path.join(self.target_folder, f"已处理_{filename}")

        df.to_excel(output_path, index=False)

       

        print(f"处理完成: {filename}")

   

    def process_image(self, filepath, filename):

        """自动压缩图片"""

        from PIL import Image

       

        print(f"正在压缩图片: {filename}")

       

        with Image.open(filepath) as img:

            if img.mode == 'RGBA':

                img = img.convert('RGB')

           

            output_path = os.path.join(self.target_folder, filename)

            img.save(output_path, optimize=True, quality=85)

       

        print(f"压缩完成: {filename}")

   

    def process_pdf(self, filepath, filename):

        """移动PDF到指定文件夹"""

        output_path = os.path.join(self.target_folder, filename)

        shutil.copy2(filepath, output_path)

        print(f"已备份: {filename}")

def start_monitor(watch_folder, target_folder):

    """

    启动文件夹监控

    """

    event_handler = FileHandler(target_folder)

    observer = Observer()

    observer.schedule(event_handler, watch_folder, recursive=False)

    observer.start()

   

    print(f"开始监控文件夹: {watch_folder}")

    print(f"处理后的文件保存到: {target_folder}")

    print("按 Ctrl+C 停止监控\n")

   

    try:

        while True:

            time.sleep(1)

    except KeyboardInterrupt:

        observer.stop()

        print("\n监控已停止")

   

    observer.join()

# 使用示例

start_monitor(

    watch_folder=r'C:\Users\Downloads',

    target_folder=r'D:\已处理文件'

)

```

---

## 九、自动生成周报

**场景**:每周五自动汇总本周工作,生成周报

```python

import os

from datetime import datetime, timedelta

import pandas as pd

class WeeklyReportGenerator:

    def __init__(self, work_log_folder):

        self.work_log_folder = work_log_folder

   

    def get_week_range(self):

        """获取本周日期范围"""

        today = datetime.now()

        # 找到本周一

        monday = today - timedelta(days=today.weekday())

        # 本周日

        sunday = monday + timedelta(days=6)

        return monday.strftime('%Y-%m-%d'), sunday.strftime('%Y-%m-%d')

   

    def collect_week_logs(self):

        """收集本周的工作日志"""

        monday, sunday = self.get_week_range()

        logs = []

       

        for filename in os.listdir(self.work_log_folder):

            if filename.endswith('.txt'):

                # 从文件名提取日期(假设格式:20260601_日志.txt)

                try:

                    date_str = filename[:8]

                    file_date = datetime.strptime(date_str, '%Y%m%d').strftime('%Y-%m-%d')

                   

                    if monday <= file_date <= sunday:

                        filepath = os.path.join(self.work_log_folder, filename)

                        with open(filepath, 'r', encoding='utf-8') as f:

                            content = f.read()

                       

                        logs.append({

                            '日期': file_date,

                            '内容': content

                        })

                except (ValueError, IndexError):

                    continue

       

        return sorted(logs, key=lambda x: x['日期'])

   

    def generate_report(self):

        """生成周报"""

        monday, sunday = self.get_week_range()

        logs = self.collect_week_logs()

       

        report = f"""

# 工作周报

**周期**:{monday} 至 {sunday}

**生成时间**:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

---

## 一、本周工作总结

"""

        for log in logs:

            report += f"### {log['日期']}\n"

            report += f"{log['内容']}\n\n"

       

        report += """

---

## 二、下周工作计划

- [ ] 待补充

- [ ] 待补充

- [ ] 待补充

---

## 三、需要协调的问题

1. 待补充

2. 待补充

---

## 四、本周数据指标

| 指标 | 目标 | 实际 | 完成率 |

|------|------|------|--------|

| 待补充 | - | - | - |

| 待补充 | - | - | - |

"""

        return report

   

    def save_report(self, output_path):

        """保存周报"""

        report = self.generate_report()

       

        with open(output_path, 'w', encoding='utf-8') as f:

            f.write(report)

       

        print(f"周报已生成: {output_path}")

# 使用示例

generator = WeeklyReportGenerator(r'D:\工作日志')

generator.save_report(r'D:\工作日志\本周周报.md')

```

---

## 十、系统监控告警

**场景**:服务器CPU/内存/磁盘使用率过高时自动告警

```python

import psutil

import time

from datetime import datetime

class SystemMonitor:

    def __init__(self, cpu_threshold=80, memory_threshold=80, disk_threshold=90):

        self.cpu_threshold = cpu_threshold

        self.memory_threshold = memory_threshold

        self.disk_threshold = disk_threshold

        self.alerts = []

   

    def check_cpu(self):

        """检查CPU使用率"""

        cpu_percent = psutil.cpu_percent(interval=1)

        if cpu_percent > self.cpu_threshold:

            alert = f"[CPU告警] 使用率 {cpu_percent}% 超过阈值 {self.cpu_threshold}%"

            self.alerts.append(alert)

            print(f"⚠️ {alert}")

        return cpu_percent

   

    def check_memory(self):

        """检查内存使用率"""

        memory = psutil.virtual_memory()

        if memory.percent > self.memory_threshold:

            alert = f"[内存告警] 使用率 {memory.percent}% 超过阈值 {self.memory_threshold}%"

            self.alerts.append(alert)

            print(f"⚠️ {alert}")

        return memory.percent

   

    def check_disk(self):

        """检查磁盘使用率"""

        alerts = []

        for partition in psutil.disk_partitions():

            try:

                usage = psutil.disk_usage(partition.mountpoint)

                if usage.percent > self.disk_threshold:

                    alert = f"[磁盘告警] {partition.mountpoint} 使用率 {usage.percent}%"

                    alerts.append(alert)

                    self.alerts.append(alert)

                    print(f"⚠️ {alert}")

            except PermissionError:

                continue

        return alerts

   

    def get_system_info(self):

        """获取系统信息"""

        cpu = self.check_cpu()

        memory = self.check_memory()

        self.check_disk()

       

        info = f"""

系统状态 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

CPU使用率: {cpu}%

内存使用率: {memory}%

告警数量: {len(self.alerts)}

"""

        return info

   

    def send_alert(self, message):

        """

        发送告警(可对接钉钉/企业微信/邮件)

        """

        # 这里可以接入钉钉机器人、企业微信、邮件等

        # 示例:钉钉机器人

        import requests

       

        webhook = "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN"

       

        data = {

            "msgtype": "text",

            "text": {

                "content": f"【服务器告警】\n{message}"

            }

        }

       

        try:

            response = requests.post(webhook, json=data)

            if response.status_code == 200:

                print("告警已发送")

        except Exception as e:

            print(f"发送告警失败: {e}")

   

    def run(self, interval=60):

        """持续监控"""

        print(f"系统监控已启动,检查间隔 {interval} 秒")

        print(f"告警阈值 - CPU: {self.cpu_threshold}%, 内存: {self.memory_threshold}%, 磁盘: {self.disk_threshold}%")

        print("按 Ctrl+C 停止\n")

       

        try:

            while True:

                self.alerts = []  # 重置告警

                info = self.get_system_info()

                print(info)

               

                # 如果有告警,发送通知

                if self.alerts:

                    alert_message = "\n".join(self.alerts)

                    self.send_alert(alert_message)

               

                time.sleep(interval)

        except KeyboardInterrupt:

            print("\n监控已停止")

# 使用示例

monitor = SystemMonitor(

    cpu_threshold=80,

    memory_threshold=80,

    disk_threshold=90

)

monitor.run(interval=300)  # 每5分钟检查一次

```

---

## 总结

以上10个脚本涵盖了日常办公中最常见的自动化场景:

| 脚本 | 应用场景 | 难度 |

|------|---------|------|

| Excel合并 | 数据汇总 | ⭐ |

| 批量重命名 | 文件整理 | ⭐ |

| 自送邮件 | 日报/周报 | ⭐⭐ |

| 文件分类 | 下载整理 | ⭐ |

| 数据库备份 | 运维 | ⭐⭐ |

| 网页抓取 | 数据采集 | ⭐⭐⭐ |

| 图片处理 | 电商运营 | ⭐⭐ |

| 文件夹监控 | 自动化流程 | ⭐⭐ |

| 自动生成周报 | 办公效率 | ⭐⭐ |

| 系统监控 | 运维告警 | ⭐⭐⭐ |

**建议**:

1. 根据自己的需求选择2-3个脚本先用起来

2. 理解代码逻辑后,根据实际情况修改参数

3. 可以用Windows任务计划程序或Linux crontab设置定时执行

---

> 如果这篇文章对你有帮助,点个赞👍收藏⭐支持一下,有问题欢迎评论区讨论!

更多推荐