监控邮箱/邮箱自动回复/python
·
主题:QQ邮箱的实时监控和自动回复
实现QQ邮箱的实时监控和自动回复
思路(代码):
1. 获取QQ邮箱授权码
只有开启了QQ邮箱的IMAP SMTP服务,才能
路径:登录QQ邮箱->设置->账号与安全->开启 IMAP/SMTP服务


大概在这个位置你会看到一个 ““POP3/IMAP/SMTP/Exchange/CardDAV/CalDAV服务”类似的内容,点击下方的“开启服务”,我这边因为已经开启过了所以看不到
补充(了解即可):


开启 IMAP/SMTP服务,按照提示生成一串16位的授权码,务必保存好

2. 读取配置文件
创建配置文件:

读取配置文件:
def load_config(config_file):
"""加载配置文件(如果需要)"""
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = {}
for line in f:
if line.strip() and not line.startswith('#'):
key, value = line.split('=', 1)
config[key.strip()] = value.strip()
return config
except FileNotFoundError:
print(f"配置文件 {config_file} 未找到。使用默认配置。")
return {}
CONFIG = load_config(r'./QQ_mail.txt')
# ---------- 配置区域 ----------
IMAP_SERVER = CONFIG.get('IMAP_SERVER', 'imap.qq.com')
EMAIL_ACCOUNT = CONFIG.get('EMAIL_ACCOUNT') # 你的QQ邮箱
EMAIL_PASSWORD = CONFIG.get('EMAIL_PASSWORD') # 替换成16位授权码
CHECK_INTERVAL = CONFIG.get('CHECK_INTERVAL', 180) # 检查间隔(秒)
SMTP_SERVER = CONFIG.get('SMTP_SERVER', 'smtp.qq.com')
# ---------------------------
3. 解码邮件头部信息
def decode_email_header(header_value):
"""解码邮件头部信息(处理中文乱码)"""
if header_value is None:
return ""
decoded_parts = []
for part, encoding in decode_header(header_value):
if isinstance(part, bytes):
try:
if encoding:
decoded_parts.append(part.decode(encoding))
else:
decoded_parts.append(part.decode('utf-8', errors='ignore'))
except:
try:
decoded_parts.append(part.decode('gbk', errors='ignore'))
except:
decoded_parts.append(part.decode('utf-8', errors='ignore'))
else:
decoded_parts.append(part)
return ''.join(decoded_parts)
4. 从发件人字符串中提取邮箱地址
def get_sender_email(sender_string):
"""从发件人字符串中提取邮箱地址"""
# 匹配邮箱地址的正则表达式
email_pattern = r'[\w\.-]+@[\w\.-]+\.\w+'
match = re.search(email_pattern, sender_string)
if match:
return match.group(0)
return sender_string
5. 判断哪些邮箱应该自动回复
有的邮件是系统邮件,可以不用回复
def should_auto_reply(sender_email):
"""判断是否应该自动回复"""
# 检查黑名单
if sender_email in BLACKLIST:
return False
# 检查是否是系统邮件
system_keywords = ['postmaster', 'mailer-daemon', 'noreply', 'no-reply', 'service']
sender_lower = sender_email.lower()
for keyword in system_keywords:
if keyword in sender_lower:
return False
# 检查是否是自己的邮件
if sender_email == EMAIL_ACCOUNT:
return False
return True
6. 提取邮件正文
def get_email_body(msg):
"""提取邮件正文"""
body = ""
try:
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
# 只提取纯文本内容,跳过附件
if content_type == "text/plain" and "attachment" not in content_disposition:
try:
payload = part.get_payload(decode=True)
if payload:
body = payload.decode('utf-8', errors='ignore')
break
except:
try:
body = part.get_payload(decode=True).decode('gbk', errors='ignore')
break
except:
pass
else:
# 非 multipart 邮件
payload = msg.get_payload(decode=True)
if payload:
try:
body = payload.decode('utf-8', errors='ignore')
except:
body = payload.decode('gbk', errors='ignore')
except Exception as e:
print(f" 提取正文时出错: {e}")
return body[:500] # 限制长度
7. 生成回复内容
def generate_reply_content(original_sender, original_subject, original_content=""):
"""生成回复内容"""
reply = f"""您好,
感谢您的来信!
我是自动回复机器人,已收到您的邮件:
主题:{original_subject}
您的邮件对我们很重要,我会尽快人工回复您。
祝好!
{EMAIL_ACCOUNT}
(自动回复,请勿直接回复此邮件)"""
return reply
8. 发送回复邮件
def send_reply(to_email, subject, content):
"""发送回复邮件 - 增强错误处理"""
try:
# 创建邮件对象
msg = MIMEMultipart()
msg['From'] = EMAIL_ACCOUNT
msg['To'] = to_email
msg['Subject'] = f"Re: {subject}"
# 添加邮件正文
msg.attach(MIMEText(content, 'plain', 'utf-8'))
# 使用更稳定的SMTP连接方式
server = smtplib.SMTP_SSL(SMTP_SERVER, 465, timeout=30)
# 添加调试信息(可选)
# server.set_debuglevel(1)
# 登录
server.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
# 发送邮件
server.send_message(msg)
# 关闭连接
server.quit()
print(f" ✓ 已回复: {to_email}")
return True
except smtplib.SMTPServerDisconnected as e:
print(f" ✗ SMTP连接断开: {e}")
return False
except smtplib.SMTPAuthenticationError as e:
print(f" ✗ 认证失败,请检查授权码: {e}")
return False
except Exception as e:
print(f" ✗ 发送失败: {e}")
return False
def mark_as_replied(mail, email_id):
"""标记邮件为已处理"""
try:
# 标记为已读
mail.store(email_id, '+FLAGS', '\\Seen')
print(f" 已标记邮件为已读")
except Exception as e:
print(f" 标记失败: {e}")
9. 获取未读邮件并逐一回复
def process_and_reply():
"""主函数:获取未读邮件并逐一回复"""
try:
# 连接IMAP服务器
print("\n🔌 正在连接IMAP服务器...")
mail = imaplib.IMAP4_SSL(IMAP_SERVER, timeout=30)
mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
mail.select('INBOX')
# 搜索未读邮件
status, messages = mail.search(None, 'UNSEEN')
if status != 'OK':
return 0
email_ids = messages[0].split()
if not email_ids:
print("📭 没有未读邮件")
return 0
print(f"\n📬 发现 {len(email_ids)} 封未读邮件")
# 过滤掉系统邮件和黑名单
valid_emails = []
for email_id in email_ids:
status, data = mail.fetch(email_id, '(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT)])')
if status == 'OK':
raw_header = data[0][1]
msg = email.message_from_bytes(raw_header)
from_raw = msg.get('From', '')
sender_email = get_sender_email(from_raw)
if should_auto_reply(sender_email):
valid_emails.append(email_id)
print(f" ✓ 有效邮件: {sender_email}")
else:
print(f" ⏭ 跳过黑名单: {sender_email}")
# 标记为已读避免重复处理
mail.store(email_id, '+FLAGS', '\\Seen')
if not valid_emails:
print("📭 没有需要回复的有效邮件")
return 0
print(f"\n🤖 开始回复 {len(valid_emails)} 封有效邮件...")
replied_count = 0
for email_id in valid_emails:
print(f"\n--- 处理邮件 ID: {email_id.decode()} ---")
# 获取完整邮件内容
status, data = mail.fetch(email_id, '(RFC822)')
if status != 'OK':
print(f" 无法获取邮件内容")
continue
raw_email = data[0][1]
msg = email.message_from_bytes(raw_email)
# 提取邮件信息
from_raw = msg.get('From', '')
from_addr = decode_email_header(from_raw)
subject_raw = msg.get('Subject', '无主题')
subject = decode_email_header(subject_raw)
sender_email = get_sender_email(from_addr)
print(f" 发件人: {from_addr}")
print(f" 邮箱: {sender_email}")
print(f" 主题: {subject}")
# 提取邮件正文
body = get_email_body(msg)
# 生成回复内容
reply_content = generate_reply_content(sender_email, subject, body[:200])
# 发送回复
if send_reply(sender_email, subject, reply_content):
replied_count += 1
# 标记已处理
mark_as_replied(mail, email_id)
# 等待一下,避免发送过快
time.sleep(2)
print(f"\n✅ 处理完成!共回复 {replied_count} 封邮件")
return replied_count
except imaplib.IMAP4.error as e:
print(f"❌ IMAP连接错误: {e}")
return 0
except Exception as e:
print(f"❌ 处理过程中出错: {e}")
import traceback
traceback.print_exc()
return 0
finally:
try:
mail.close()
mail.logout()
except:
pass
10. 定时监控并自动回复
在循环内定时执行 process_and_reply函数
def auto_reply_loop():
"""持续监控并自动回复"""
print("=" * 60)
print(f"🤖 自动回复系统已启动")
print(f"📧 监控邮箱: {EMAIL_ACCOUNT}")
print(f"⏱️ 检查间隔: {CHECK_INTERVAL}秒")
print(f"🚫 黑名单: {', '.join(BLACKLIST)}")
print("=" * 60)
while True:
try:
process_and_reply()
print(f"\n⏰ 等待 {CHECK_INTERVAL} 秒后继续监控...")
time.sleep(CHECK_INTERVAL)
except KeyboardInterrupt:
print("\n\n👋 用户中断,自动回复系统已停止")
break
except Exception as e:
print(f"⚠️ 运行错误: {e}")
time.sleep(CHECK_INTERVAL)
11. 测试
if __name__ == '__main__':
print(load_config(r'./QQ_mail.txt') )
print(f"开始监控邮箱: {EMAIL_ACCOUNT}")
auto_reply_loop()
12. 程序运行结果
PS D:\workspace\email-automation> uv run python monitor.py
{'IMAP_SERVER': 'imap.qq.com', 'EMAIL_ACCOUNT': 'spz_0911@qq.com', 'EMAIL_PASSWORD': 'jiugrsrwpmddbadi'}
开始监控邮箱: spz_0911@qq.com
============================================================
🤖 自动回复系统已启动
📧 监控邮箱: spz_0911@qq.com
⏱️ 检查间隔: 180秒
🚫 黑名单: PostMaster@qq.com, mailer-daemon@qq.com, noreply@qq.com, service@qq.com, spz_0911@qq.com
============================================================
🔌 正在连接IMAP服务器...
📬 发现 32 封未读邮件
⏭ 跳过黑名单: noreply@redditmail.com
⏭ 跳过黑名单: noreply@redditmail.com
⏭ 跳过黑名单: noreply@redditmail.com
⏭ 跳过黑名单: noreply@redditmail.com
⏭ 跳过黑名单: noreply@redditmail.com
⏭ 跳过黑名单: noreply@redditmail.com
⏭ 跳过黑名单: noreply@redditmail.com
⏭ 跳过黑名单: noreply@redditmail.com
⏭ 跳过黑名单: noreply@redditmail.com
✓ 有效邮件: campus@mailf.wisdomore.com
✓ 有效邮件: campus@mailf.wisdomore.com
⏭ 跳过黑名单: noreply@redditmail.com
⏭ 跳过黑名单: service@quickjobs.51job.com
⏭ 跳过黑名单: noreply@redditmail.com
⏭ 跳过黑名单: noreply@redditmail.com
⏭ 跳过黑名单: noreply@redditmail.com
⏭ 跳过黑名单: noreply@redditmail.com
⏭ 跳过黑名单: noreply@redditmail.com
✓ 有效邮件: MyClaw@aisecret.us
⏭ 跳过黑名单: no-reply@github.com
✓ 有效邮件: hi@cursor.com
✓ 有效邮件: position-tracking@semrush.com
✓ 有效邮件: product@mailsend1.jijyun.cn
⏭ 跳过黑名单: PostMaster@qq.com
⏭ 跳过黑名单: PostMaster@qq.com
⏭ 跳过黑名单: PostMaster@qq.com
⏭ 跳过黑名单: PostMaster@qq.com
⏭ 跳过黑名单: PostMaster@qq.com
⏭ 跳过黑名单: PostMaster@qq.com
⏭ 跳过黑名单: PostMaster@qq.com
⏭ 跳过黑名单: PostMaster@qq.com
⏭ 跳过黑名单: PostMaster@qq.com
🤖 开始回复 6 封有效邮件...
--- 处理邮件 ID: 155 ---
发件人: 智联招聘 <campus@mailf.wisdomore.com>
邮箱: campus@mailf.wisdomore.com
主题: 尊敬的曾同学【智联推荐】盒马前置仓校招专项来啦!
✓ 已回复: campus@mailf.wisdomore.com
已标记邮件为已读
--- 处理邮件 ID: 156 ---
发件人: 智联招聘 <campus@mailf.wisdomore.com>
邮箱: campus@mailf.wisdomore.com
主题: 尊敬的曾同学【智联推荐】盒马前置仓校招专项来啦!
✓ 已回复: campus@mailf.wisdomore.com
已标记邮件为已读
--- 处理邮件 ID: 198 ---
发件人: MyClaw Newsletter <MyClaw@aisecret.us>
邮箱: MyClaw@aisecret.us
主题: 🦞 OpenClaw Instances Widely Exposed
✓ 已回复: MyClaw@aisecret.us
已标记邮件为已读
--- 处理邮件 ID: 274 ---
发件人: Cursor <hi@cursor.com>
邮箱: hi@cursor.com
主题: Here to help
✓ 已回复: hi@cursor.com
已标记邮件为已读
--- 处理邮件 ID: 278 ---
发件人: Semrush Position Tracking <position-tracking@semrush.com>
邮箱: position-tracking@semrush.com
主题: Position Tracking Update (originmattress.com.au)
✓ 已回复: position-tracking@semrush.com
已标记邮件为已读
--- 处理邮件 ID: 279 ---
发件人: 集简云 <product@mailsend1.jijyun.cn>
邮箱: product@mailsend1.jijyun.cn
主题: 欢迎加入集简云!
✓ 已回复: product@mailsend1.jijyun.cn
已标记邮件为已读
✅ 处理完成!共回复 6 封邮件
⏰ 等待 180 秒后继续监控...
完整代码
import imaplib
import smtplib
import time
import email
from email.header import decode_header
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import re
def load_config(config_file):
"""加载配置文件(如果需要)"""
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = {}
for line in f:
if line.strip() and not line.startswith('#'):
key, value = line.split('=', 1)
config[key.strip()] = value.strip()
return config
except FileNotFoundError:
print(f"配置文件 {config_file} 未找到。使用默认配置。")
return {}
CONFIG = load_config(r'./QQ_mail.txt')
# ---------- 配置区域 ----------
IMAP_SERVER = CONFIG.get('IMAP_SERVER', 'imap.qq.com')
EMAIL_ACCOUNT = CONFIG.get('EMAIL_ACCOUNT') # 你的QQ邮箱
EMAIL_PASSWORD = CONFIG.get('EMAIL_PASSWORD') # 替换成16位授权码
CHECK_INTERVAL = CONFIG.get('CHECK_INTERVAL', 180) # 检查间隔(秒)
SMTP_SERVER = CONFIG.get('SMTP_SERVER', 'smtp.qq.com')
# ---------------------------
BLACKLIST = [
'PostMaster@qq.com',
'mailer-daemon@qq.com',
'noreply@qq.com',
'service@qq.com',
'spz_0911@qq.com' # 自己的邮箱
]
def decode_email_header(header_value):
"""解码邮件头部信息(处理中文乱码)"""
if header_value is None:
return ""
decoded_parts = []
for part, encoding in decode_header(header_value):
if isinstance(part, bytes):
try:
if encoding:
decoded_parts.append(part.decode(encoding))
else:
decoded_parts.append(part.decode('utf-8', errors='ignore'))
except:
try:
decoded_parts.append(part.decode('gbk', errors='ignore'))
except:
decoded_parts.append(part.decode('utf-8', errors='ignore'))
else:
decoded_parts.append(part)
return ''.join(decoded_parts)
def get_sender_email(sender_string):
"""从发件人字符串中提取邮箱地址"""
# 匹配邮箱地址的正则表达式
email_pattern = r'[\w\.-]+@[\w\.-]+\.\w+'
match = re.search(email_pattern, sender_string)
if match:
return match.group(0)
return sender_string
def should_auto_reply(sender_email):
"""判断是否应该自动回复"""
# 检查黑名单
if sender_email in BLACKLIST:
return False
# 检查是否是系统邮件
system_keywords = ['postmaster', 'mailer-daemon', 'noreply', 'no-reply', 'service']
sender_lower = sender_email.lower()
for keyword in system_keywords:
if keyword in sender_lower:
return False
# 检查是否是自己的邮件
if sender_email == EMAIL_ACCOUNT:
return False
return True
def generate_reply_content(original_sender, original_subject, original_content=""):
"""生成回复内容"""
reply = f"""您好,
感谢您的来信!
我是自动回复机器人,已收到您的邮件:
主题:{original_subject}
您的邮件对我们很重要,我会尽快人工回复您。
祝好!
{EMAIL_ACCOUNT}
(自动回复,请勿直接回复此邮件)"""
return reply
def send_reply(to_email, subject, content):
"""发送回复邮件 - 增强错误处理"""
try:
# 创建邮件对象
msg = MIMEMultipart()
msg['From'] = EMAIL_ACCOUNT
msg['To'] = to_email
msg['Subject'] = f"Re: {subject}"
# 添加邮件正文
msg.attach(MIMEText(content, 'plain', 'utf-8'))
# 使用更稳定的SMTP连接方式
server = smtplib.SMTP_SSL(SMTP_SERVER, 465, timeout=30)
# 添加调试信息(可选)
# server.set_debuglevel(1)
# 登录
server.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
# 发送邮件
server.send_message(msg)
# 关闭连接
server.quit()
print(f" ✓ 已回复: {to_email}")
return True
except smtplib.SMTPServerDisconnected as e:
print(f" ✗ SMTP连接断开: {e}")
return False
except smtplib.SMTPAuthenticationError as e:
print(f" ✗ 认证失败,请检查授权码: {e}")
return False
except Exception as e:
print(f" ✗ 发送失败: {e}")
return False
def mark_as_replied(mail, email_id):
"""标记邮件为已处理"""
try:
# 标记为已读
mail.store(email_id, '+FLAGS', '\\Seen')
print(f" 已标记邮件为已读")
except Exception as e:
print(f" 标记失败: {e}")
def get_email_body(msg):
"""提取邮件正文"""
body = ""
try:
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
# 只提取纯文本内容,跳过附件
if content_type == "text/plain" and "attachment" not in content_disposition:
try:
payload = part.get_payload(decode=True)
if payload:
body = payload.decode('utf-8', errors='ignore')
break
except:
try:
body = part.get_payload(decode=True).decode('gbk', errors='ignore')
break
except:
pass
else:
# 非 multipart 邮件
payload = msg.get_payload(decode=True)
if payload:
try:
body = payload.decode('utf-8', errors='ignore')
except:
body = payload.decode('gbk', errors='ignore')
except Exception as e:
print(f" 提取正文时出错: {e}")
return body[:500] # 限制长度
def process_and_reply():
"""主函数:获取未读邮件并逐一回复"""
try:
# 连接IMAP服务器
print("\n🔌 正在连接IMAP服务器...")
mail = imaplib.IMAP4_SSL(IMAP_SERVER, timeout=30)
mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
mail.select('INBOX')
# 搜索未读邮件
status, messages = mail.search(None, 'UNSEEN')
if status != 'OK':
return 0
email_ids = messages[0].split()
if not email_ids:
print("📭 没有未读邮件")
return 0
print(f"\n📬 发现 {len(email_ids)} 封未读邮件")
# 过滤掉系统邮件和黑名单
valid_emails = []
for email_id in email_ids:
status, data = mail.fetch(email_id, '(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT)])')
if status == 'OK':
raw_header = data[0][1]
msg = email.message_from_bytes(raw_header)
from_raw = msg.get('From', '')
sender_email = get_sender_email(from_raw)
if should_auto_reply(sender_email):
valid_emails.append(email_id)
print(f" ✓ 有效邮件: {sender_email}")
else:
print(f" ⏭ 跳过黑名单: {sender_email}")
# 标记为已读避免重复处理
mail.store(email_id, '+FLAGS', '\\Seen')
if not valid_emails:
print("📭 没有需要回复的有效邮件")
return 0
print(f"\n🤖 开始回复 {len(valid_emails)} 封有效邮件...")
replied_count = 0
for email_id in valid_emails:
print(f"\n--- 处理邮件 ID: {email_id.decode()} ---")
# 获取完整邮件内容
status, data = mail.fetch(email_id, '(RFC822)')
if status != 'OK':
print(f" 无法获取邮件内容")
continue
raw_email = data[0][1]
msg = email.message_from_bytes(raw_email)
# 提取邮件信息
from_raw = msg.get('From', '')
from_addr = decode_email_header(from_raw)
subject_raw = msg.get('Subject', '无主题')
subject = decode_email_header(subject_raw)
sender_email = get_sender_email(from_addr)
print(f" 发件人: {from_addr}")
print(f" 邮箱: {sender_email}")
print(f" 主题: {subject}")
# 提取邮件正文
body = get_email_body(msg)
# 生成回复内容
reply_content = generate_reply_content(sender_email, subject, body[:200])
# 发送回复
if send_reply(sender_email, subject, reply_content):
replied_count += 1
# 标记已处理
mark_as_replied(mail, email_id)
# 等待一下,避免发送过快
time.sleep(2)
print(f"\n✅ 处理完成!共回复 {replied_count} 封邮件")
return replied_count
except imaplib.IMAP4.error as e:
print(f"❌ IMAP连接错误: {e}")
return 0
except Exception as e:
print(f"❌ 处理过程中出错: {e}")
import traceback
traceback.print_exc()
return 0
finally:
try:
mail.close()
mail.logout()
except:
pass
def auto_reply_loop():
"""持续监控并自动回复"""
print("=" * 60)
print(f"🤖 自动回复系统已启动")
print(f"📧 监控邮箱: {EMAIL_ACCOUNT}")
print(f"⏱️ 检查间隔: {CHECK_INTERVAL}秒")
print(f"🚫 黑名单: {', '.join(BLACKLIST)}")
print("=" * 60)
while True:
try:
process_and_reply()
print(f"\n⏰ 等待 {CHECK_INTERVAL} 秒后继续监控...")
time.sleep(CHECK_INTERVAL)
except KeyboardInterrupt:
print("\n\n👋 用户中断,自动回复系统已停止")
break
except Exception as e:
print(f"⚠️ 运行错误: {e}")
time.sleep(CHECK_INTERVAL)
if __name__ == '__main__':
print(load_config(r'./QQ_mail.txt') )
print(f"开始监控邮箱: {EMAIL_ACCOUNT}")
auto_reply_loop()
更多推荐

所有评论(0)