基于Python的邮件自动收取与GLM大模型邮件简报系统

一、解决方案思路

方案1:本地IMAP + GLM API + 定时任务

  • 架构:Python脚本定期连接邮箱IMAP服务器 → 获取邮件 → 调用GLM API进行总结 → 生成简报
  • 优点:实现简单,依赖少,成本低
  • 缺点:需要暴露邮箱密码,安全性较低,无法处理大量邮件

方案2:OAuth2认证 + 异步处理 + 缓存机制

  • 架构:使用OAuth2安全认证 → 异步获取邮件 → 本地缓存避免重复处理 → 批量调用GLM API
  • 优点:安全性高,支持大规模邮件处理,性能较好
  • 缺点:实现复杂度高,需要配置OAuth2

方案3:微服务架构 + 消息队列 + 容器化部署

  • 架构:邮件收取服务 + 处理服务 + 简报生成服务,通过消息队列解耦,容器化部署
  • 优点:可扩展性强,高可用,易于维护
  • 缺点:架构复杂,资源消耗大,适合企业级应用

方案对比分析

方案 安全性 复杂度 性能 成本 适用场景
方案1 个人使用、原型验证
方案2 中小企业、生产环境
方案3 很高 大型企业、高并发场景

选择方案2作为最优方案:在安全性、复杂度和性能之间取得最佳平衡,适合大多数实际应用场景。
在这里插入图片描述

二、完整项目结构

email-digest-system/
├── config/
│   ├── __init__.py
│   ├── settings.py
│   └── email_config.py
├── core/
│   ├── __init__.py
│   ├── email_fetcher.py
│   ├── email_processor.py
│   ├── glm_client.py
│   └── digest_generator.py
├── models/
│   ├── __init__.py
│   └── email_model.py
├── utils/
│   ├── __init__.py
│   ├── cache_manager.py
│   ├── logger.py
│   └── security.py
├── services/
│   ├── __init__.py
│   └── scheduler.py
├── tests/
│   ├── __init__.py
│   └── test_email_fetcher.py
├── templates/
│   └── digest_template.html
├── .env
├── requirements.txt
├── main.py
├── README.md
└── docker-compose.yml

三、核心代码实现

1. 配置文件

config/settings.py

import os
from pathlib import Path

# 项目根目录
BASE_DIR = Path(__file__).resolve().parent.parent

# 环境变量加载
from dotenv import load_dotenv
load_dotenv(BASE_DIR / '.env')

class Settings:
    # 邮箱配置
    EMAIL_HOST = os.getenv('EMAIL_HOST', 'imap.gmail.com')
    EMAIL_PORT = int(os.getenv('EMAIL_PORT', '993'))
    EMAIL_ADDRESS = os.getenv('EMAIL_ADDRESS')
    EMAIL_PASSWORD = os.getenv('EMAIL_PASSWORD')
    EMAIL_OAUTH2 = os.getenv('EMAIL_OAUTH2', 'false').lower() == 'true'
    
    # GLM配置
    GLM_API_KEY = os.getenv('GLM_API_KEY')
    GLM_MODEL = os.getenv('GLM_MODEL', 'glm-4')
    GLM_BASE_URL = os.getenv('GLM_BASE_URL', 'https://open.bigmodel.cn/api/paas/v4')
    
    # 缓存配置
    CACHE_DIR = BASE_DIR / 'cache'
    CACHE_EXPIRE_HOURS = int(os.getenv('CACHE_EXPIRE_HOURS', '24'))
    
    # 调度配置
    CHECK_INTERVAL_MINUTES = int(os.getenv('CHECK_INTERVAL_MINUTES', '30'))
    MAX_EMAILS_PER_RUN = int(os.getenv('MAX_EMAILS_PER_RUN', '50'))
    
    # 日志配置
    LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')
    LOG_FILE = BASE_DIR / 'logs' / 'email_digest.log'

settings = Settings()

.env 示例

# 邮箱配置
EMAIL_HOST=imap.gmail.com
EMAIL_PORT=993
EMAIL_ADDRESS=your_email@gmail.com
EMAIL_PASSWORD=your_app_password
EMAIL_OAUTH2=false

# GLM配置
GLM_API_KEY=your_glm_api_key
GLM_MODEL=glm-4
GLM_BASE_URL=https://open.bigmodel.cn/api/paas/v4

# 应用配置
CHECK_INTERVAL_MINUTES=30
MAX_EMAILS_PER_RUN=20
CACHE_EXPIRE_HOURS=24
LOG_LEVEL=INFO

2. 邮件模型

models/email_model.py

from dataclasses import dataclass
from datetime import datetime
from typing import Optional, List

@dataclass
class Email:
    id: str
    subject: str
    sender: str
    recipients: List[str]
    date: datetime
    body: str
    body_html: Optional[str] = None
    attachments: List[str] = None
    summary: Optional[str] = None
    
    def __post_init__(self):
        if self.attachments is None:
            self.attachments = []

3. 邮件获取器

core/email_fetcher.py

import imaplib
import email
from email.header import decode_header
import time
from typing import List, Optional
from datetime import datetime, timedelta
import logging

from config.settings import settings
from models.email_model import Email
from utils.logger import get_logger

logger = get_logger(__name__)

class EmailFetcher:
    def __init__(self):
        self.imap_server = None
        self.connected = False
        
    def connect(self) -> bool:
        """连接到IMAP服务器"""
        try:
            self.imap_server = imaplib.IMAP4_SSL(
                settings.EMAIL_HOST, 
                settings.EMAIL_PORT
            )
            self.imap_server.login(settings.EMAIL_ADDRESS, settings.EMAIL_PASSWORD)
            self.connected = True
            logger.info(f"成功连接到邮箱: {settings.EMAIL_ADDRESS}")
            return True
        except Exception as e:
            logger.error(f"连接邮箱失败: {e}")
            return False
            
    def disconnect(self):
        """断开连接"""
        if self.imap_server and self.connected:
            self.imap_server.logout()
            self.connected = False
            logger.info("邮箱连接已断开")
            
    def _decode_mime_words(self, s: str) -> str:
        """解码MIME编码的字符串"""
        decoded_fragments = decode_header(s)
        fragments = []
        for fragment, encoding in decoded_fragments:
            if isinstance(fragment, bytes):
                if encoding:
                    fragment = fragment.decode(encoding)
                else:
                    fragment = fragment.decode('utf-8', errors='ignore')
            fragments.append(fragment)
        return ''.join(fragments)
        
    def _get_email_body(self, msg) -> tuple:
        """提取邮件正文"""
        body = ""
        body_html = ""
        
        if msg.is_multipart():
            for part in msg.walk():
                content_type = part.get_content_type()
                content_disposition = str(part.get("Content-Disposition"))
                
                if "attachment" not in content_disposition:
                    charset = part.get_content_charset() or 'utf-8'
                    try:
                        payload = part.get_payload(decode=True)
                        if payload:
                            text = payload.decode(charset, errors='ignore')
                            if content_type == "text/plain":
                                body += text
                            elif content_type == "text/html":
                                body_html += text
                    except Exception as e:
                        logger.warning(f"解析邮件正文失败: {e}")
        else:
            charset = msg.get_content_charset() or 'utf-8'
            try:
                payload = msg.get_payload(decode=True)
                if payload:
                    body = payload.decode(charset, errors='ignore')
            except Exception as e:
                logger.warning(f"解析邮件正文失败: {e}")
                
        return body, body_html
        
    def fetch_emails(self, since_days: int = 1) -> List[Email]:
        """获取指定天数内的邮件"""
        if not self.connected:
            if not self.connect():
                return []
                
        try:
            # 选择收件箱
            self.imap_server.select('INBOX')
            
            # 计算日期范围
            since_date = (datetime.now() - timedelta(days=since_days)).strftime("%d-%b-%Y")
            search_criteria = f'(SINCE "{since_date}")'
            
            # 搜索邮件
            status, messages = self.imap_server.search(None, search_criteria)
            if status != 'OK':
                logger.error("搜索邮件失败")
                return []
                
            email_ids = messages[0].split()
            logger.info(f"找到 {len(email_ids)} 封邮件")
            
            # 限制处理数量
            email_ids = email_ids[-settings.MAX_EMAILS_PER_RUN:] if email_ids else []
            
            emails = []
            for email_id in email_ids:
                try:
                    # 获取邮件
                    status, msg_data = self.imap_server.fetch(email_id, '(RFC822)')
                    if status != 'OK':
                        continue
                        
                    msg = email.message_from_bytes(msg_data[0][1])
                    
                    # 解析邮件信息
                    subject = self._decode_mime_words(msg.get("Subject", ""))
                    sender = self._decode_mime_words(msg.get("From", ""))
                    recipients = [self._decode_mime_words(r) for r in msg.get_all("To", [])]
                    date_str = msg.get("Date", "")
                    
                    try:
                        email_date = email.utils.parsedate_to_datetime(date_str)
                    except:
                        email_date = datetime.now()
                    
                    body, body_html = self._get_email_body(msg)
                    
                    email_obj = Email(
                        id=email_id.decode('utf-8'),
                        subject=subject,
                        sender=sender,
                        recipients=recipients,
                        date=email_date,
                        body=body[:10000],  # 限制正文长度
                        body_html=body_html[:10000] if body_html else None
                    )
                    
                    emails.append(email_obj)
                    
                except Exception as e:
                    logger.error(f"解析邮件 {email_id} 失败: {e}")
                    continue
                    
            return emails
            
        except Exception as e:
            logger.error(f"获取邮件失败: {e}")
            return []
            
    def __enter__(self):
        self.connect()
        return self
        
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()

4. GLM客户端

core/glm_client.py

import asyncio
import aiohttp
import json
from typing import Optional, List
import logging

from config.settings import settings
from utils.logger import get_logger

logger = get_logger(__name__)

class GLMClient:
    def __init__(self):
        self.api_key = settings.GLM_API_KEY
        self.base_url = settings.GLM_BASE_URL
        self.model = settings.GLM_MODEL
        
    async def summarize_email(self, email_content: str, subject: str = "") -> Optional[str]:
        """使用GLM模型总结邮件内容"""
        if not self.api_key:
            logger.error("GLM API key 未配置")
            return None
            
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        # 构建提示词
        prompt = f"""
        请为以下邮件生成简洁的中文摘要,突出重点信息:
        
        邮件主题:{subject}
        邮件内容:{email_content}
        
        要求:
        1. 摘要控制在100字以内
        2. 突出关键信息和行动项
        3. 语言简洁明了
        """
        
        payload = {
            "model": self.model,
            "messages": [
                {"role": "user", "content": prompt}
            ],
            "temperature": 0.3,
            "max_tokens": 150
        }
        
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.base_url}/chat/completions",
                    headers=headers,
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    if response.status == 200:
                        result = await response.json()
                        summary = result['choices'][0]['message']['content'].strip()
                        return summary
                    else:
                        error_text = await response.text()
                        logger.error(f"GLM API 调用失败: {response.status} - {error_text}")
                        return None
                        
        except Exception as e:
            logger.error(f"GLM API 调用异常: {e}")
            return None
            
    async def batch_summarize(self, emails: List) -> List:
        """批量总结邮件"""
        tasks = []
        for email_obj in emails:
            if email_obj.body.strip():
                task = self.summarize_email(email_obj.body, email_obj.subject)
                tasks.append((email_obj, task))
            else:
                email_obj.summary = "邮件内容为空"
                
        if tasks:
            results = await asyncio.gather(*[task for _, task in tasks], return_exceptions=True)
            
            for i, (email_obj, _) in enumerate(tasks):
                if isinstance(results[i], Exception):
                    email_obj.summary = f"总结失败: {str(results[i])}"
                else:
                    email_obj.summary = results[i] or "总结失败"
                    
        return emails

5. 缓存管理器

utils/cache_manager.py

import json
import os
import time
from pathlib import Path
from typing import Optional, Dict, Any
from datetime import datetime, timedelta

from config.settings import settings
from utils.logger import get_logger

logger = get_logger(__name__)

class CacheManager:
    def __init__(self):
        self.cache_dir = settings.CACHE_DIR
        self.cache_dir.mkdir(exist_ok=True)
        self.cache_file = self.cache_dir / 'processed_emails.json'
        self.cache_data = self._load_cache()
        
    def _load_cache(self) -> Dict[str, Any]:
        """加载缓存数据"""
        if self.cache_file.exists():
            try:
                with open(self.cache_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except Exception as e:
                logger.warning(f"加载缓存失败: {e}")
        return {}
        
    def _save_cache(self):
        """保存缓存数据"""
        try:
            with open(self.cache_file, 'w', encoding='utf-8') as f:
                json.dump(self.cache_data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            logger.error(f"保存缓存失败: {e}")
            
    def is_processed(self, email_id: str) -> bool:
        """检查邮件是否已处理"""
        if email_id in self.cache_data:
            processed_time = datetime.fromisoformat(self.cache_data[email_id])
            expire_time = processed_time + timedelta(hours=settings.CACHE_EXPIRE_HOURS)
            if datetime.now() < expire_time:
                return True
            else:
                # 缓存过期,删除记录
                del self.cache_data[email_id]
                self._save_cache()
        return False
        
    def mark_processed(self, email_id: str):
        """标记邮件为已处理"""
        self.cache_data[email_id] = datetime.now().isoformat()
        self._save_cache()
        
    def cleanup_expired(self):
        """清理过期缓存"""
        current_time = datetime.now()
        expired_ids = []
        
        for email_id, processed_time_str in self.cache_data.items():
            processed_time = datetime.fromisoformat(processed_time_str)
            expire_time = processed_time + timedelta(hours=settings.CACHE_EXPIRE_HOURS)
            if current_time >= expire_time:
                expired_ids.append(email_id)
                
        for email_id in expired_ids:
            del self.cache_data[email_id]
            
        if expired_ids:
            self._save_cache()
            logger.info(f"清理了 {len(expired_ids)} 个过期缓存记录")

6. 简报生成器

core/digest_generator.py

import json
from datetime import datetime
from typing import List
from jinja2 import Environment, FileSystemLoader
import os

from models.email_model import Email
from config.settings import settings
from utils.logger import get_logger

logger = get_logger(__name__)

class DigestGenerator:
    def __init__(self):
        template_dir = os.path.join(os.path.dirname(__file__), '..', 'templates')
        self.env = Environment(loader=FileSystemLoader(template_dir))
        
    def generate_text_digest(self, emails: List[Email]) -> str:
        """生成文本格式的简报"""
        digest = f"邮件简报 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
        digest += "=" * 50 + "\n\n"
        
        for i, email in enumerate(emails, 1):
            digest += f"{i}. 主题: {email.subject}\n"
            digest += f"   发件人: {email.sender}\n"
            digest += f"   时间: {email.date.strftime('%Y-%m-%d %H:%M')}\n"
            digest += f"   摘要: {email.summary or '无摘要'}\n"
            digest += "-" * 50 + "\n"
            
        return digest
        
    def generate_html_digest(self, emails: List[Email]) -> str:
        """生成HTML格式的简报"""
        try:
            template = self.env.get_template('digest_template.html')
            return template.render(
                emails=emails,
                generated_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                total_count=len(emails)
            )
        except Exception as e:
            logger.error(f"生成HTML简报失败: {e}")
            return self.generate_text_digest(emails)
            
    def save_digest(self, digest_content: str, format_type: str = 'html'):
        """保存简报到文件"""
        output_dir = settings.BASE_DIR / 'output'
        output_dir.mkdir(exist_ok=True)
        
        filename = f"email_digest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{format_type}"
        filepath = output_dir / filename
        
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(digest_content)
            
        logger.info(f"简报已保存: {filepath}")
        return filepath

templates/digest_template.html

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>邮件简报</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }
        .container { max-width: 800px; margin: 0 auto; background: white; padding: 20px; border-radius: 10px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }
        .header { text-align: center; margin-bottom: 30px; }
        .header h1 { color: #333; margin-bottom: 10px; }
        .header .time { color: #666; font-size: 14px; }
        .email-item { border: 1px solid #e0e0e0; margin-bottom: 20px; padding: 15px; border-radius: 5px; }
        .email-item h3 { color: #2c3e50; margin-top: 0; }
        .email-meta { color: #7f8c8d; font-size: 12px; margin-bottom: 10px; }
        .email-summary { color: #34495e; line-height: 1.5; }
        .total-count { text-align: right; color: #7f8c8d; margin-top: 20px; }
    </style>
</head>
<body>
    <div class="container">
        <div class="header">
            <h1>📧 邮件简报</h1>
            <div class="time">生成时间: {{ generated_time }}</div>
        </div>
        
        {% for email in emails %}
        <div class="email-item">
            <h3>{{ loop.index }}. {{ email.subject }}</h3>
            <div class="email-meta">
                发件人: {{ email.sender }} | 
                时间: {{ email.date.strftime('%Y-%m-%d %H:%M') }}
            </div>
            <div class="email-summary">
                {% if email.summary %}
                    {{ email.summary }}
                {% else %}
                    <em>摘要生成失败</em>
                {% endif %}
            </div>
        </div>
        {% endfor %}
        
        <div class="total-count">共 {{ total_count }} 封邮件</div>
    </div>
</body>
</html>

7. 主调度服务

services/scheduler.py

import asyncio
import time
from datetime import datetime
import logging

from core.email_fetcher import EmailFetcher
from core.glm_client import GLMClient
from core.digest_generator import DigestGenerator
from utils.cache_manager import CacheManager
from utils.logger import get_logger
from config.settings import settings

logger = get_logger(__name__)

class EmailDigestScheduler:
    def __init__(self):
        self.fetcher = EmailFetcher()
        self.glm_client = GLMClient()
        self.digest_generator = DigestGenerator()
        self.cache_manager = CacheManager()
        
    async def process_emails(self):
        """处理邮件并生成简报"""
        logger.info("开始处理邮件...")
        
        # 获取邮件
        with self.fetcher as fetcher:
            emails = fetcher.fetch_emails(since_days=1)
            
        if not emails:
            logger.info("没有新邮件需要处理")
            return
            
        # 过滤已处理的邮件
        new_emails = []
        for email in emails:
            if not self.cache_manager.is_processed(email.id):
                new_emails.append(email)
            else:
                logger.debug(f"邮件 {email.id} 已处理,跳过")
                
        if not new_emails:
            logger.info("没有新的未处理邮件")
            return
            
        logger.info(f"发现 {len(new_emails)} 封新邮件,开始生成摘要...")
        
        # 生成摘要
        summarized_emails = await self.glm_client.batch_summarize(new_emails)
        
        # 标记为已处理
        for email in summarized_emails:
            self.cache_manager.mark_processed(email.id)
            
        # 生成简报
        html_digest = self.digest_generator.generate_html_digest(summarized_emails)
        text_digest = self.digest_generator.generate_text_digest(summarized_emails)
        
        # 保存简报
        html_file = self.digest_generator.save_digest(html_digest, 'html')
        text_file = self.digest_generator.save_digest(text_digest, 'txt')
        
        logger.info(f"简报生成完成!HTML: {html_file}, TXT: {text_file}")
        
    async def run_scheduler(self):
        """运行调度器"""
        logger.info("邮件简报调度器启动")
        
        while True:
            try:
                await self.process_emails()
                
                # 等待下次执行
                logger.info(f"等待 {settings.CHECK_INTERVAL_MINUTES} 分钟后再次检查...")
                await asyncio.sleep(settings.CHECK_INTERVAL_MINUTES * 60)
                
            except KeyboardInterrupt:
                logger.info("收到中断信号,正在停止...")
                break
            except Exception as e:
                logger.error(f"调度器运行异常: {e}")
                # 出错后等待一段时间再重试
                await asyncio.sleep(60)
                
        logger.info("邮件简报调度器已停止")

8. 主程序入口

main.py

import asyncio
import sys
import os
from pathlib import Path

# 添加项目根目录到Python路径
sys.path.insert(0, str(Path(__file__).parent))

from services.scheduler import EmailDigestScheduler
from utils.logger import setup_logging
from config.settings import settings

def main():
    # 设置日志
    log_dir = settings.BASE_DIR / 'logs'
    log_dir.mkdir(exist_ok=True)
    setup_logging()
    
    print("📧 邮件简报系统启动中...")
    print(f"邮箱地址: {settings.EMAIL_ADDRESS}")
    print(f"检查间隔: {settings.CHECK_INTERVAL_MINUTES} 分钟")
    print(f"最大处理邮件数: {settings.MAX_EMAILS_PER_RUN}")
    print("-" * 50)
    
    # 创建调度器并运行
    scheduler = EmailDigestScheduler()
    
    try:
        asyncio.run(scheduler.run_scheduler())
    except KeyboardInterrupt:
        print("\n👋 系统已停止")
    except Exception as e:
        print(f"❌ 系统异常: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

9. 工具函数

utils/logger.py

import logging
import sys
from pathlib import Path

from config.settings import settings

def setup_logging():
    """设置日志配置"""
    log_dir = settings.BASE_DIR / 'logs'
    log_dir.mkdir(exist_ok=True)
    
    # 创建日志格式
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # 文件处理器
    file_handler = logging.FileHandler(settings.LOG_FILE)
    file_handler.setFormatter(formatter)
    file_handler.setLevel(getattr(logging, settings.LOG_LEVEL))
    
    # 控制台处理器
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    console_handler.setLevel(getattr(logging, settings.LOG_LEVEL))
    
    # 根日志器
    root_logger = logging.getLogger()
    root_logger.setLevel(getattr(logging, settings.LOG_LEVEL))
    root_logger.addHandler(file_handler)
    root_logger.addHandler(console_handler)

def get_logger(name: str) -> logging.Logger:
    """获取日志器"""
    return logging.getLogger(name)

utils/security.py

import base64
from cryptography.fernet import Fernet
import os

class SecurityManager:
    @staticmethod
    def generate_key():
        """生成加密密钥"""
        return Fernet.generate_key()
        
    @staticmethod
    def encrypt_data(data: str, key: bytes) -> str:
        """加密数据"""
        f = Fernet(key)
        encrypted_data = f.encrypt(data.encode())
        return base64.urlsafe_b64encode(encrypted_data).decode()
        
    @staticmethod
    def decrypt_data(encrypted_data: str, key: bytes) -> str:
        """解密数据"""
        f = Fernet(key)
        encrypted_bytes = base64.urlsafe_b64decode(encrypted_data.encode())
        decrypted_data = f.decrypt(encrypted_bytes)
        return decrypted_data.decode()

10. 依赖文件

requirements.txt

aiohttp==3.8.5
python-dotenv==1.0.0
jinja2==3.1.2
cryptography==41.0.4
imaplib2==3.6

四、部署说明

1. 环境准备

# 克隆项目
git clone https://github.com/your-username/email-digest-system.git
cd email-digest-system

# 创建虚拟环境
python -m venv venv
source venv/bin/activate  # Linux/Mac
# venv\Scripts\activate  # Windows

# 安装依赖
pip install -r requirements.txt

2. 配置环境变量

# 复制环境变量模板
cp .env.example .env

# 编辑 .env 文件,填入实际配置
nano .env

重要配置说明:

  • EMAIL_PASSWORD: 对于Gmail,需要使用应用专用密码而非账户密码
  • GLM_API_KEY: 从智谱AI开放平台获取API密钥
  • EMAIL_OAUTH2: 如需使用OAuth2认证,设为true并实现相应逻辑

3. 运行应用

# 直接运行
python main.py

# 后台运行(Linux/Mac)
nohup python main.py > app.log 2>&1 &

# 使用screen运行
screen -S email-digest
python main.py
# Ctrl+A, D 退出screen

4. Docker部署

Dockerfile

FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

RUN mkdir -p /app/logs /app/cache /app/output

CMD ["python", "main.py"]

docker-compose.yml

version: '3.8'
services:
  email-digest:
    build: .
    environment:
      - EMAIL_HOST=${EMAIL_HOST}
      - EMAIL_PORT=${EMAIL_PORT}
      - EMAIL_ADDRESS=${EMAIL_ADDRESS}
      - EMAIL_PASSWORD=${EMAIL_PASSWORD}
      - GLM_API_KEY=${GLM_API_KEY}
      - CHECK_INTERVAL_MINUTES=${CHECK_INTERVAL_MINUTES}
    volumes:
      - ./logs:/app/logs
      - ./cache:/app/cache
      - ./output:/app/output
    restart: unless-stopped

Docker部署命令:

# 构建镜像
docker-compose build

# 启动服务
docker-compose up -d

# 查看日志
docker-compose logs -f

5. 系统服务部署(Linux)

创建systemd服务文件 /etc/systemd/system/email-digest.service

[Unit]
Description=Email Digest Service
After=network.target

[Service]
Type=simple
User=your-username
WorkingDirectory=/path/to/email-digest-system
ExecStart=/path/to/email-digest-system/venv/bin/python main.py
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

启用并启动服务:

sudo systemctl daemon-reload
sudo systemctl enable email-digest.service
sudo systemctl start email-digest.service
sudo systemctl status email-digest.service

五、安全注意事项

  1. 邮箱密码安全:建议使用应用专用密码,不要使用主账户密码
  2. API密钥保护:GLM API密钥不要硬编码,使用环境变量
  3. 网络访问控制:如部署在服务器上,限制外部访问
  4. 数据加密:敏感数据可使用SecurityManager进行加密存储
  5. 定期更新:保持依赖库更新,修复安全漏洞

六、扩展功能建议

  1. 多邮箱支持:配置多个邮箱账户
  2. 邮件分类:根据发件人或主题自动分类
  3. 通知推送:通过企业微信、钉钉等推送简报
  4. Web界面:提供Web界面查看和管理简报
  5. 统计分析:邮件数量、处理时间等统计信息
  6. 自定义模板:支持用户自定义简报模板

这个解决方案提供了完整的邮件自动收取、GLM大模型总结和简报生成功能,具有良好的安全性、可扩展性和易用性。

Logo

更多推荐