#!/usr/bin/env python3

"""163 邮箱监听 & 内容抓取

通过 IMAP 连接 163 邮箱,获取指定日期范围内的邮件并输出摘要。

使用 IMAP ID 命令绕过网易 "Unsafe Login" 限制。

用法:

    python3 mail_monitor.py                # 获取当天邮件

    python3 mail_monitor.py --days 7       # 获取最近 7 天邮件

    python3 mail_monitor.py --all          # 获取所有未读邮件

配置:

    凭证读取优先级:

    1. 环境变量 MAIL163_USER / MAIL163_PASS

    2. 配置文件 ~/.mail163.conf 或同目录下 .mail163.conf(两行: 账号、授权码)

"""

import argparse

import email

import email.header

import email.utils

import imaplib

import os

import re

import sys

from datetime import datetime, timedelta

from html.parser import HTMLParser

# ── 配置 ──────────────────────────────────────────────────

IMAP_HOST = "imap.163.com"

IMAP_PORT = 993

PREVIEW_LENGTH = 200

CONF_PATHS = [

    os.path.join(os.path.dirname(os.path.abspath(__file__)), ".mail163.conf"),

    os.path.expanduser("~/.mail163.conf"),

]

def load_credentials():

    """加载邮箱凭证(环境变量 > 配置文件)"""

    user = os.environ.get("MAIL163_USER", "")

    passwd = os.environ.get("MAIL163_PASS", "")

    if user and passwd:

        return user, passwd

    for path in CONF_PATHS:

        if os.path.isfile(path):

            with open(path) as f:

                lines = [l.strip() for l in f if l.strip()]

            if len(lines) >= 2:

                return lines[0], lines[1]

    return "", ""

# ── HTML → 纯文本 ────────────────────────────────────────

class _HTMLTextExtractor(HTMLParser):

    _BLOCK_TAGS = {"p", "br", "div", "li", "tr", "h1", "h2", "h3", "h4", "h5", "h6"}

    _SKIP_TAGS = {"script", "style"}

    def __init__(self):

        super().__init__()

        self._parts, self._skip = [], False

    def handle_starttag(self, tag, attrs):

        if tag in self._SKIP_TAGS:

            self._skip = True

    def handle_endtag(self, tag):

        if tag in self._SKIP_TAGS:

            self._skip = False

        if tag in self._BLOCK_TAGS:

            self._parts.append("\n")

    def handle_data(self, data):

        if not self._skip:

            self._parts.append(data)

    def get_text(self):

        return re.sub(r"\n{3,}", "\n\n", "".join(self._parts)).strip()

def html_to_text(html_content):

    ext = _HTMLTextExtractor()

    try:

        ext.feed(html_content)

        return ext.get_text()

    except Exception:

        return re.sub(r"<[^>]+>", "", html_content).strip()

# ── 邮件解析 ─────────────────────────────────────────────

def _decode_header(value):

    """解码邮件头(处理 RFC 2047 编码的中文等)"""

    if not value:

        return ""

    parts = []

    for data, charset in email.header.decode_header(value):

        if isinstance(data, bytes):

            try:

                parts.append(data.decode(charset or "utf-8", errors="replace"))

            except (LookupError, UnicodeDecodeError):

                parts.append(data.decode("utf-8", errors="replace"))

        else:

            parts.append(data)

    return "".join(parts)

def _decode_payload(part):

    """解码邮件正文 payload,自动尝试多种编码"""

    raw = part.get_payload(decode=True)

    if not raw:

        return ""

    charset = part.get_content_charset()

    if charset:

        try:

            return raw.decode(charset, errors="replace")

        except (LookupError, UnicodeDecodeError):

            pass

    for enc in ("utf-8", "gbk", "gb2312", "gb18030", "big5", "latin-1"):

        try:

            return raw.decode(enc)

        except (UnicodeDecodeError, LookupError):

            continue

    return raw.decode("utf-8", errors="replace")

def _parse_sender(msg):

    name, addr = email.utils.parseaddr(msg.get("From", ""))

    name = _decode_header(name)

    return f"{name} <{addr}>" if name else addr

def _parse_date(msg):

    try:

        return email.utils.parsedate_to_datetime(msg["Date"]).strftime("%Y-%m-%d %H:%M")

    except Exception:

        return msg.get("Date", "")

def _get_body(msg):

    """提取正文,优先纯文本,降级 HTML"""

    text_body = html_body = None

    parts = msg.walk() if msg.is_multipart() else [msg]

    for part in parts:

        disp = str(part.get("Content-Disposition", ""))

        if "attachment" in disp:

            continue

        ct = part.get_content_type()

        if ct == "text/plain" and text_body is None:

            text_body = _decode_payload(part)

        elif ct == "text/html" and html_body is None:

            html_body = _decode_payload(part)

    if text_body:

        return text_body.strip()

    if html_body:

        return html_to_text(html_body)

    return "(无法提取正文)"

# ── IMAP 连接 ────────────────────────────────────────────

def connect(user, passwd):

    """连接 163 IMAP 并登录,返回 IMAP4_SSL 连接"""

    try:

        conn = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)

    except Exception as e:

        sys.exit(f"错误: 无法连接 {IMAP_HOST}:{IMAP_PORT}{e}")

    # 发送 ID 命令绕过 163 Unsafe Login 限制

    try:

        conn.xatom("ID", '("name" "mail_monitor" "version" "1.0" "vendor" "imaplib")')

    except Exception:

        pass

    try:

        conn.login(user, passwd)

    except imaplib.IMAP4.error as e:

        sys.exit(f"错误: 登录失败 — {e}\n  请确认授权码是否正确且 IMAP 已开启。")

    return conn

def fetch_mails(conn, days=0):

    """获取邮件列表。days=0 当天,>0 最近 N 天,<0 所有未读"""

    conn.select("INBOX", readonly=True)

    if days < 0:

        criteria = "UNSEEN"

    else:

        since = datetime.now() - timedelta(days=days)

        criteria = f'SINCE {since.strftime("%d-%b-%Y")}'

    status, data = conn.search(None, criteria)

    if status != "OK" or not data[0]:

        return []

    messages = []

    for mid in data[0].split():

        st, msg_data = conn.fetch(mid, "(RFC822)")

        if st == "OK" and msg_data and msg_data[0]:

            messages.append(email.message_from_bytes(msg_data[0][1]))

    return messages

# ── 输出 ──────────────────────────────────────────────────

def print_report(messages, days):

    today = datetime.now().strftime("%Y-%m-%d")

    if days < 0:

        label = "所有未读"

    elif days > 0:

        label = f"最近 {days} 天"

    else:

        label = "今日"

    print(f"===== 163 邮箱日报 ({today}) — {label} =====")

    print(f"共 {len(messages)} 封邮件\n")

    if not messages:

        print("(无新邮件)")

        return

    for i, msg in enumerate(messages, 1):

        body = _get_body(msg)

        preview = re.sub(r"\s+", " ", body[:PREVIEW_LENGTH]).strip()

        if len(body) > PREVIEW_LENGTH:

            preview += "..."

        print(f"[{i}] 📨 来自: {_parse_sender(msg)}")

        print(f"    主题: {_decode_header(msg.get('Subject', '(无主题)'))}")

        print(f"    时间: {_parse_date(msg)}")

        print(f"    内容: {preview}")

        print("-" * 40)

# ── 入口 ──────────────────────────────────────────────────

def main():

    ap = argparse.ArgumentParser(description="163 邮箱监听 & 内容抓取")

    ap.add_argument("--days", type=int, default=0, help="获取最近 N 天的邮件(默认 0 = 当天)")

    ap.add_argument("--all", action="store_true", help="获取所有未读邮件")

    args = ap.parse_args()

    user, passwd = load_credentials()

    if not user or not passwd:

        sys.exit(

            "错误: 未找到邮箱凭证。\n"

            "  方式一: export MAIL163_USER=xxx MAIL163_PASS=xxx\n"

            "  方式二: 创建配置文件 ~/.mail163.conf(第一行账号,第二行授权码)"

        )

    days = -1 if args.all else args.days

    conn = connect(user, passwd)

    try:

        print_report(fetch_mails(conn, days), days)

    finally:

        try:

            conn.logout()

        except Exception:

            pass

if __name__ == "__main__":

    main()

我有一个163邮箱监听脚本(mail_monitor.py),已经可以正常运行,能通过IMAP获取邮件内容并输出摘要。

请基于这个脚本帮我创建一个skill,实现以下功能:

每天早上8点自动运行脚本,获取当天邮件

将邮件内容喂给大模型,提炼成"今日AI热点TOP5"格式的摘要

摘要格式参考:标题+emoji分类标签+3个要点

输出结果展示在对话界面

注意事项:

163邮箱已通过xatom ID命令绕过Unsafe Login限制,不需要额外处理

凭证通过环境变量MAIL163_USER和MAIL163_PASS传入,不要硬编码

如果当天没有新邮件,输出"今日暂无新邮件"

脚本文件已附上,请直接基于现有代码生成skill配置

以上是提示词直接复制给openclaw 即可 注意最后还是要提供自己的邮箱和 授权码

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐