方案和选型:基于Python的邮件自动收取与GLM大模型邮件简报系统
本文提出了一种基于Python和GLM大模型的邮件自动处理系统,通过三种技术方案对比,最终选择OAuth2认证+异步处理+缓存机制作为最优方案。系统架构包含邮件获取、处理、简报生成等模块,采用分层设计实现安全高效的邮件管理。核心代码展示了IMAP连接、邮件模型和异步处理实现,支持定时自动收取、智能摘要生成和HTML格式简报输出,适用于个人和企业级邮件自动化处理场景。
·
基于Python的邮件自动收取与GLM大模型邮件简报系统
一、解决方案思路
方案1:本地IMAP + GLM API + 定时任务
- 架构:Python脚本定期连接邮箱IMAP服务器 → 获取邮件 → 调用GLM API进行总结 → 生成简报
- 优点:实现简单,依赖少,成本低
- 缺点:需要暴露邮箱密码,安全性较低,无法处理大量邮件
方案2:OAuth2认证 + 异步处理 + 缓存机制
- 架构:使用OAuth2安全认证 → 异步获取邮件 → 本地缓存避免重复处理 → 批量调用GLM API
- 优点:安全性高,支持大规模邮件处理,性能较好
- 缺点:实现复杂度高,需要配置OAuth2
方案3:微服务架构 + 消息队列 + 容器化部署
- 架构:邮件收取服务 + 处理服务 + 简报生成服务,通过消息队列解耦,容器化部署
- 优点:可扩展性强,高可用,易于维护
- 缺点:架构复杂,资源消耗大,适合企业级应用
方案对比分析
方案 | 安全性 | 复杂度 | 性能 | 成本 | 适用场景 |
---|---|---|---|---|---|
方案1 | 低 | 低 | 中 | 低 | 个人使用、原型验证 |
方案2 | 高 | 中 | 高 | 中 | 中小企业、生产环境 |
方案3 | 高 | 高 | 很高 | 高 | 大型企业、高并发场景 |
选择方案2作为最优方案:在安全性、复杂度和性能之间取得最佳平衡,适合大多数实际应用场景。
二、完整项目结构
email-digest-system/
├── config/
│ ├── __init__.py
│ ├── settings.py
│ └── email_config.py
├── core/
│ ├── __init__.py
│ ├── email_fetcher.py
│ ├── email_processor.py
│ ├── glm_client.py
│ └── digest_generator.py
├── models/
│ ├── __init__.py
│ └── email_model.py
├── utils/
│ ├── __init__.py
│ ├── cache_manager.py
│ ├── logger.py
│ └── security.py
├── services/
│ ├── __init__.py
│ └── scheduler.py
├── tests/
│ ├── __init__.py
│ └── test_email_fetcher.py
├── templates/
│ └── digest_template.html
├── .env
├── requirements.txt
├── main.py
├── README.md
└── docker-compose.yml
三、核心代码实现
1. 配置文件
config/settings.py
import os
from pathlib import Path
# 项目根目录
BASE_DIR = Path(__file__).resolve().parent.parent
# 环境变量加载
from dotenv import load_dotenv
load_dotenv(BASE_DIR / '.env')
class Settings:
# 邮箱配置
EMAIL_HOST = os.getenv('EMAIL_HOST', 'imap.gmail.com')
EMAIL_PORT = int(os.getenv('EMAIL_PORT', '993'))
EMAIL_ADDRESS = os.getenv('EMAIL_ADDRESS')
EMAIL_PASSWORD = os.getenv('EMAIL_PASSWORD')
EMAIL_OAUTH2 = os.getenv('EMAIL_OAUTH2', 'false').lower() == 'true'
# GLM配置
GLM_API_KEY = os.getenv('GLM_API_KEY')
GLM_MODEL = os.getenv('GLM_MODEL', 'glm-4')
GLM_BASE_URL = os.getenv('GLM_BASE_URL', 'https://open.bigmodel.cn/api/paas/v4')
# 缓存配置
CACHE_DIR = BASE_DIR / 'cache'
CACHE_EXPIRE_HOURS = int(os.getenv('CACHE_EXPIRE_HOURS', '24'))
# 调度配置
CHECK_INTERVAL_MINUTES = int(os.getenv('CHECK_INTERVAL_MINUTES', '30'))
MAX_EMAILS_PER_RUN = int(os.getenv('MAX_EMAILS_PER_RUN', '50'))
# 日志配置
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')
LOG_FILE = BASE_DIR / 'logs' / 'email_digest.log'
settings = Settings()
.env 示例
# 邮箱配置
EMAIL_HOST=imap.gmail.com
EMAIL_PORT=993
EMAIL_ADDRESS=your_email@gmail.com
EMAIL_PASSWORD=your_app_password
EMAIL_OAUTH2=false
# GLM配置
GLM_API_KEY=your_glm_api_key
GLM_MODEL=glm-4
GLM_BASE_URL=https://open.bigmodel.cn/api/paas/v4
# 应用配置
CHECK_INTERVAL_MINUTES=30
MAX_EMAILS_PER_RUN=20
CACHE_EXPIRE_HOURS=24
LOG_LEVEL=INFO
2. 邮件模型
models/email_model.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, List
@dataclass
class Email:
id: str
subject: str
sender: str
recipients: List[str]
date: datetime
body: str
body_html: Optional[str] = None
attachments: List[str] = None
summary: Optional[str] = None
def __post_init__(self):
if self.attachments is None:
self.attachments = []
3. 邮件获取器
core/email_fetcher.py
import imaplib
import email
from email.header import decode_header
import time
from typing import List, Optional
from datetime import datetime, timedelta
import logging
from config.settings import settings
from models.email_model import Email
from utils.logger import get_logger
logger = get_logger(__name__)
class EmailFetcher:
def __init__(self):
self.imap_server = None
self.connected = False
def connect(self) -> bool:
"""连接到IMAP服务器"""
try:
self.imap_server = imaplib.IMAP4_SSL(
settings.EMAIL_HOST,
settings.EMAIL_PORT
)
self.imap_server.login(settings.EMAIL_ADDRESS, settings.EMAIL_PASSWORD)
self.connected = True
logger.info(f"成功连接到邮箱: {settings.EMAIL_ADDRESS}")
return True
except Exception as e:
logger.error(f"连接邮箱失败: {e}")
return False
def disconnect(self):
"""断开连接"""
if self.imap_server and self.connected:
self.imap_server.logout()
self.connected = False
logger.info("邮箱连接已断开")
def _decode_mime_words(self, s: str) -> str:
"""解码MIME编码的字符串"""
decoded_fragments = decode_header(s)
fragments = []
for fragment, encoding in decoded_fragments:
if isinstance(fragment, bytes):
if encoding:
fragment = fragment.decode(encoding)
else:
fragment = fragment.decode('utf-8', errors='ignore')
fragments.append(fragment)
return ''.join(fragments)
def _get_email_body(self, msg) -> tuple:
"""提取邮件正文"""
body = ""
body_html = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
if "attachment" not in content_disposition:
charset = part.get_content_charset() or 'utf-8'
try:
payload = part.get_payload(decode=True)
if payload:
text = payload.decode(charset, errors='ignore')
if content_type == "text/plain":
body += text
elif content_type == "text/html":
body_html += text
except Exception as e:
logger.warning(f"解析邮件正文失败: {e}")
else:
charset = msg.get_content_charset() or 'utf-8'
try:
payload = msg.get_payload(decode=True)
if payload:
body = payload.decode(charset, errors='ignore')
except Exception as e:
logger.warning(f"解析邮件正文失败: {e}")
return body, body_html
def fetch_emails(self, since_days: int = 1) -> List[Email]:
"""获取指定天数内的邮件"""
if not self.connected:
if not self.connect():
return []
try:
# 选择收件箱
self.imap_server.select('INBOX')
# 计算日期范围
since_date = (datetime.now() - timedelta(days=since_days)).strftime("%d-%b-%Y")
search_criteria = f'(SINCE "{since_date}")'
# 搜索邮件
status, messages = self.imap_server.search(None, search_criteria)
if status != 'OK':
logger.error("搜索邮件失败")
return []
email_ids = messages[0].split()
logger.info(f"找到 {len(email_ids)} 封邮件")
# 限制处理数量
email_ids = email_ids[-settings.MAX_EMAILS_PER_RUN:] if email_ids else []
emails = []
for email_id in email_ids:
try:
# 获取邮件
status, msg_data = self.imap_server.fetch(email_id, '(RFC822)')
if status != 'OK':
continue
msg = email.message_from_bytes(msg_data[0][1])
# 解析邮件信息
subject = self._decode_mime_words(msg.get("Subject", ""))
sender = self._decode_mime_words(msg.get("From", ""))
recipients = [self._decode_mime_words(r) for r in msg.get_all("To", [])]
date_str = msg.get("Date", "")
try:
email_date = email.utils.parsedate_to_datetime(date_str)
except:
email_date = datetime.now()
body, body_html = self._get_email_body(msg)
email_obj = Email(
id=email_id.decode('utf-8'),
subject=subject,
sender=sender,
recipients=recipients,
date=email_date,
body=body[:10000], # 限制正文长度
body_html=body_html[:10000] if body_html else None
)
emails.append(email_obj)
except Exception as e:
logger.error(f"解析邮件 {email_id} 失败: {e}")
continue
return emails
except Exception as e:
logger.error(f"获取邮件失败: {e}")
return []
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.disconnect()
4. GLM客户端
core/glm_client.py
import asyncio
import aiohttp
import json
from typing import Optional, List
import logging
from config.settings import settings
from utils.logger import get_logger
logger = get_logger(__name__)
class GLMClient:
def __init__(self):
self.api_key = settings.GLM_API_KEY
self.base_url = settings.GLM_BASE_URL
self.model = settings.GLM_MODEL
async def summarize_email(self, email_content: str, subject: str = "") -> Optional[str]:
"""使用GLM模型总结邮件内容"""
if not self.api_key:
logger.error("GLM API key 未配置")
return None
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
# 构建提示词
prompt = f"""
请为以下邮件生成简洁的中文摘要,突出重点信息:
邮件主题:{subject}
邮件内容:{email_content}
要求:
1. 摘要控制在100字以内
2. 突出关键信息和行动项
3. 语言简洁明了
"""
payload = {
"model": self.model,
"messages": [
{"role": "user", "content": prompt}
],
"temperature": 0.3,
"max_tokens": 150
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=payload,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status == 200:
result = await response.json()
summary = result['choices'][0]['message']['content'].strip()
return summary
else:
error_text = await response.text()
logger.error(f"GLM API 调用失败: {response.status} - {error_text}")
return None
except Exception as e:
logger.error(f"GLM API 调用异常: {e}")
return None
async def batch_summarize(self, emails: List) -> List:
"""批量总结邮件"""
tasks = []
for email_obj in emails:
if email_obj.body.strip():
task = self.summarize_email(email_obj.body, email_obj.subject)
tasks.append((email_obj, task))
else:
email_obj.summary = "邮件内容为空"
if tasks:
results = await asyncio.gather(*[task for _, task in tasks], return_exceptions=True)
for i, (email_obj, _) in enumerate(tasks):
if isinstance(results[i], Exception):
email_obj.summary = f"总结失败: {str(results[i])}"
else:
email_obj.summary = results[i] or "总结失败"
return emails
5. 缓存管理器
utils/cache_manager.py
import json
import os
import time
from pathlib import Path
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
from config.settings import settings
from utils.logger import get_logger
logger = get_logger(__name__)
class CacheManager:
def __init__(self):
self.cache_dir = settings.CACHE_DIR
self.cache_dir.mkdir(exist_ok=True)
self.cache_file = self.cache_dir / 'processed_emails.json'
self.cache_data = self._load_cache()
def _load_cache(self) -> Dict[str, Any]:
"""加载缓存数据"""
if self.cache_file.exists():
try:
with open(self.cache_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.warning(f"加载缓存失败: {e}")
return {}
def _save_cache(self):
"""保存缓存数据"""
try:
with open(self.cache_file, 'w', encoding='utf-8') as f:
json.dump(self.cache_data, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"保存缓存失败: {e}")
def is_processed(self, email_id: str) -> bool:
"""检查邮件是否已处理"""
if email_id in self.cache_data:
processed_time = datetime.fromisoformat(self.cache_data[email_id])
expire_time = processed_time + timedelta(hours=settings.CACHE_EXPIRE_HOURS)
if datetime.now() < expire_time:
return True
else:
# 缓存过期,删除记录
del self.cache_data[email_id]
self._save_cache()
return False
def mark_processed(self, email_id: str):
"""标记邮件为已处理"""
self.cache_data[email_id] = datetime.now().isoformat()
self._save_cache()
def cleanup_expired(self):
"""清理过期缓存"""
current_time = datetime.now()
expired_ids = []
for email_id, processed_time_str in self.cache_data.items():
processed_time = datetime.fromisoformat(processed_time_str)
expire_time = processed_time + timedelta(hours=settings.CACHE_EXPIRE_HOURS)
if current_time >= expire_time:
expired_ids.append(email_id)
for email_id in expired_ids:
del self.cache_data[email_id]
if expired_ids:
self._save_cache()
logger.info(f"清理了 {len(expired_ids)} 个过期缓存记录")
6. 简报生成器
core/digest_generator.py
import json
from datetime import datetime
from typing import List
from jinja2 import Environment, FileSystemLoader
import os
from models.email_model import Email
from config.settings import settings
from utils.logger import get_logger
logger = get_logger(__name__)
class DigestGenerator:
def __init__(self):
template_dir = os.path.join(os.path.dirname(__file__), '..', 'templates')
self.env = Environment(loader=FileSystemLoader(template_dir))
def generate_text_digest(self, emails: List[Email]) -> str:
"""生成文本格式的简报"""
digest = f"邮件简报 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
digest += "=" * 50 + "\n\n"
for i, email in enumerate(emails, 1):
digest += f"{i}. 主题: {email.subject}\n"
digest += f" 发件人: {email.sender}\n"
digest += f" 时间: {email.date.strftime('%Y-%m-%d %H:%M')}\n"
digest += f" 摘要: {email.summary or '无摘要'}\n"
digest += "-" * 50 + "\n"
return digest
def generate_html_digest(self, emails: List[Email]) -> str:
"""生成HTML格式的简报"""
try:
template = self.env.get_template('digest_template.html')
return template.render(
emails=emails,
generated_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
total_count=len(emails)
)
except Exception as e:
logger.error(f"生成HTML简报失败: {e}")
return self.generate_text_digest(emails)
def save_digest(self, digest_content: str, format_type: str = 'html'):
"""保存简报到文件"""
output_dir = settings.BASE_DIR / 'output'
output_dir.mkdir(exist_ok=True)
filename = f"email_digest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{format_type}"
filepath = output_dir / filename
with open(filepath, 'w', encoding='utf-8') as f:
f.write(digest_content)
logger.info(f"简报已保存: {filepath}")
return filepath
templates/digest_template.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>邮件简报</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }
.container { max-width: 800px; margin: 0 auto; background: white; padding: 20px; border-radius: 10px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }
.header { text-align: center; margin-bottom: 30px; }
.header h1 { color: #333; margin-bottom: 10px; }
.header .time { color: #666; font-size: 14px; }
.email-item { border: 1px solid #e0e0e0; margin-bottom: 20px; padding: 15px; border-radius: 5px; }
.email-item h3 { color: #2c3e50; margin-top: 0; }
.email-meta { color: #7f8c8d; font-size: 12px; margin-bottom: 10px; }
.email-summary { color: #34495e; line-height: 1.5; }
.total-count { text-align: right; color: #7f8c8d; margin-top: 20px; }
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>📧 邮件简报</h1>
<div class="time">生成时间: {{ generated_time }}</div>
</div>
{% for email in emails %}
<div class="email-item">
<h3>{{ loop.index }}. {{ email.subject }}</h3>
<div class="email-meta">
发件人: {{ email.sender }} |
时间: {{ email.date.strftime('%Y-%m-%d %H:%M') }}
</div>
<div class="email-summary">
{% if email.summary %}
{{ email.summary }}
{% else %}
<em>摘要生成失败</em>
{% endif %}
</div>
</div>
{% endfor %}
<div class="total-count">共 {{ total_count }} 封邮件</div>
</div>
</body>
</html>
7. 主调度服务
services/scheduler.py
import asyncio
import time
from datetime import datetime
import logging
from core.email_fetcher import EmailFetcher
from core.glm_client import GLMClient
from core.digest_generator import DigestGenerator
from utils.cache_manager import CacheManager
from utils.logger import get_logger
from config.settings import settings
logger = get_logger(__name__)
class EmailDigestScheduler:
def __init__(self):
self.fetcher = EmailFetcher()
self.glm_client = GLMClient()
self.digest_generator = DigestGenerator()
self.cache_manager = CacheManager()
async def process_emails(self):
"""处理邮件并生成简报"""
logger.info("开始处理邮件...")
# 获取邮件
with self.fetcher as fetcher:
emails = fetcher.fetch_emails(since_days=1)
if not emails:
logger.info("没有新邮件需要处理")
return
# 过滤已处理的邮件
new_emails = []
for email in emails:
if not self.cache_manager.is_processed(email.id):
new_emails.append(email)
else:
logger.debug(f"邮件 {email.id} 已处理,跳过")
if not new_emails:
logger.info("没有新的未处理邮件")
return
logger.info(f"发现 {len(new_emails)} 封新邮件,开始生成摘要...")
# 生成摘要
summarized_emails = await self.glm_client.batch_summarize(new_emails)
# 标记为已处理
for email in summarized_emails:
self.cache_manager.mark_processed(email.id)
# 生成简报
html_digest = self.digest_generator.generate_html_digest(summarized_emails)
text_digest = self.digest_generator.generate_text_digest(summarized_emails)
# 保存简报
html_file = self.digest_generator.save_digest(html_digest, 'html')
text_file = self.digest_generator.save_digest(text_digest, 'txt')
logger.info(f"简报生成完成!HTML: {html_file}, TXT: {text_file}")
async def run_scheduler(self):
"""运行调度器"""
logger.info("邮件简报调度器启动")
while True:
try:
await self.process_emails()
# 等待下次执行
logger.info(f"等待 {settings.CHECK_INTERVAL_MINUTES} 分钟后再次检查...")
await asyncio.sleep(settings.CHECK_INTERVAL_MINUTES * 60)
except KeyboardInterrupt:
logger.info("收到中断信号,正在停止...")
break
except Exception as e:
logger.error(f"调度器运行异常: {e}")
# 出错后等待一段时间再重试
await asyncio.sleep(60)
logger.info("邮件简报调度器已停止")
8. 主程序入口
main.py
import asyncio
import sys
import os
from pathlib import Path
# 添加项目根目录到Python路径
sys.path.insert(0, str(Path(__file__).parent))
from services.scheduler import EmailDigestScheduler
from utils.logger import setup_logging
from config.settings import settings
def main():
# 设置日志
log_dir = settings.BASE_DIR / 'logs'
log_dir.mkdir(exist_ok=True)
setup_logging()
print("📧 邮件简报系统启动中...")
print(f"邮箱地址: {settings.EMAIL_ADDRESS}")
print(f"检查间隔: {settings.CHECK_INTERVAL_MINUTES} 分钟")
print(f"最大处理邮件数: {settings.MAX_EMAILS_PER_RUN}")
print("-" * 50)
# 创建调度器并运行
scheduler = EmailDigestScheduler()
try:
asyncio.run(scheduler.run_scheduler())
except KeyboardInterrupt:
print("\n👋 系统已停止")
except Exception as e:
print(f"❌ 系统异常: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
9. 工具函数
utils/logger.py
import logging
import sys
from pathlib import Path
from config.settings import settings
def setup_logging():
"""设置日志配置"""
log_dir = settings.BASE_DIR / 'logs'
log_dir.mkdir(exist_ok=True)
# 创建日志格式
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 文件处理器
file_handler = logging.FileHandler(settings.LOG_FILE)
file_handler.setFormatter(formatter)
file_handler.setLevel(getattr(logging, settings.LOG_LEVEL))
# 控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
console_handler.setLevel(getattr(logging, settings.LOG_LEVEL))
# 根日志器
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, settings.LOG_LEVEL))
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
def get_logger(name: str) -> logging.Logger:
"""获取日志器"""
return logging.getLogger(name)
utils/security.py
import base64
from cryptography.fernet import Fernet
import os
class SecurityManager:
@staticmethod
def generate_key():
"""生成加密密钥"""
return Fernet.generate_key()
@staticmethod
def encrypt_data(data: str, key: bytes) -> str:
"""加密数据"""
f = Fernet(key)
encrypted_data = f.encrypt(data.encode())
return base64.urlsafe_b64encode(encrypted_data).decode()
@staticmethod
def decrypt_data(encrypted_data: str, key: bytes) -> str:
"""解密数据"""
f = Fernet(key)
encrypted_bytes = base64.urlsafe_b64decode(encrypted_data.encode())
decrypted_data = f.decrypt(encrypted_bytes)
return decrypted_data.decode()
10. 依赖文件
requirements.txt
aiohttp==3.8.5
python-dotenv==1.0.0
jinja2==3.1.2
cryptography==41.0.4
imaplib2==3.6
四、部署说明
1. 环境准备
# 克隆项目
git clone https://github.com/your-username/email-digest-system.git
cd email-digest-system
# 创建虚拟环境
python -m venv venv
source venv/bin/activate # Linux/Mac
# venv\Scripts\activate # Windows
# 安装依赖
pip install -r requirements.txt
2. 配置环境变量
# 复制环境变量模板
cp .env.example .env
# 编辑 .env 文件,填入实际配置
nano .env
重要配置说明:
EMAIL_PASSWORD
: 对于Gmail,需要使用应用专用密码而非账户密码GLM_API_KEY
: 从智谱AI开放平台获取API密钥EMAIL_OAUTH2
: 如需使用OAuth2认证,设为true并实现相应逻辑
3. 运行应用
# 直接运行
python main.py
# 后台运行(Linux/Mac)
nohup python main.py > app.log 2>&1 &
# 使用screen运行
screen -S email-digest
python main.py
# Ctrl+A, D 退出screen
4. Docker部署
Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
RUN mkdir -p /app/logs /app/cache /app/output
CMD ["python", "main.py"]
docker-compose.yml
version: '3.8'
services:
email-digest:
build: .
environment:
- EMAIL_HOST=${EMAIL_HOST}
- EMAIL_PORT=${EMAIL_PORT}
- EMAIL_ADDRESS=${EMAIL_ADDRESS}
- EMAIL_PASSWORD=${EMAIL_PASSWORD}
- GLM_API_KEY=${GLM_API_KEY}
- CHECK_INTERVAL_MINUTES=${CHECK_INTERVAL_MINUTES}
volumes:
- ./logs:/app/logs
- ./cache:/app/cache
- ./output:/app/output
restart: unless-stopped
Docker部署命令:
# 构建镜像
docker-compose build
# 启动服务
docker-compose up -d
# 查看日志
docker-compose logs -f
5. 系统服务部署(Linux)
创建systemd服务文件 /etc/systemd/system/email-digest.service
:
[Unit]
Description=Email Digest Service
After=network.target
[Service]
Type=simple
User=your-username
WorkingDirectory=/path/to/email-digest-system
ExecStart=/path/to/email-digest-system/venv/bin/python main.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
启用并启动服务:
sudo systemctl daemon-reload
sudo systemctl enable email-digest.service
sudo systemctl start email-digest.service
sudo systemctl status email-digest.service
五、安全注意事项
- 邮箱密码安全:建议使用应用专用密码,不要使用主账户密码
- API密钥保护:GLM API密钥不要硬编码,使用环境变量
- 网络访问控制:如部署在服务器上,限制外部访问
- 数据加密:敏感数据可使用SecurityManager进行加密存储
- 定期更新:保持依赖库更新,修复安全漏洞
六、扩展功能建议
- 多邮箱支持:配置多个邮箱账户
- 邮件分类:根据发件人或主题自动分类
- 通知推送:通过企业微信、钉钉等推送简报
- Web界面:提供Web界面查看和管理简报
- 统计分析:邮件数量、处理时间等统计信息
- 自定义模板:支持用户自定义简报模板
这个解决方案提供了完整的邮件自动收取、GLM大模型总结和简报生成功能,具有良好的安全性、可扩展性和易用性。
更多推荐
所有评论(0)