🚀 使用OpenClaw + DeepSeek模型实现小红书自动发布与智能回帖

📖 前言:AI赋能内容创作

项目目标
通过OpenClaw框架结合DeepSeek模型,实现小红书内容的:

  1. ✅ 自动文章创作
  2. ✅ 智能配图生成
  3. ✅ 自动发布管理
  4. ✅ 智能评论回复
  5. ✅ 数据分析优化

技术栈

  • OpenClaw:AI助手框架,负责任务调度
  • DeepSeek模型:内容生成,免费且强大
  • 小红书API/模拟:内容发布接口
  • 图像处理:配图生成与优化

适合人群

  • 小红书内容创作者
  • 社交媒体运营人员
  • AI应用开发者
  • 自动化工具爱好者

🎯 第一章:环境搭建与配置

1.1 安装OpenClaw并配置DeepSeek模型

# 1. 安装OpenClaw
pnpm add -g openclaw

# 2. 创建工作空间
mkdir ~/xiaohongshu-bot && cd ~/xiaohongshu-bot
openclaw init

# 3. 配置DeepSeek模型
cat > config.yaml << 'EOF'
gateway:
  port: 3000
  host: localhost

model:
  provider: "deepseek"
  # DeepSeek API配置
  apiKey: "${DEEPSEEK_API_KEY}"
  model: "deepseek-chat"
  baseUrl: "https://api.deepseek.com"
  
  # 模型参数
  temperature: 0.7
  maxTokens: 2000
  topP: 0.9

memory:
  enabled: true
  path: ./workspace/memory

skills:
  enabled: true
  path: ./workspace/skills

# 小红书配置
xiaohongshu:
  enabled: true
  username: "${XHS_USERNAME}"
  password: "${XHS_PASSWORD}"
  # 或使用cookie登录
  cookies: "${XHS_COOKIES}"
  
  # 发布设置
  post:
    interval: 3600  # 发布间隔(秒)
    maxPerDay: 3    # 每日最多发布数
    categories: ["美食", "生活", "科技", "旅行"]
    
  # 评论设置
  comment:
    autoReply: true
    replyDelay: 60  # 回复延迟(秒)
    maxReplies: 10  # 单帖最多回复数
EOF

# 4. 设置环境变量
cat > .env << 'EOF'
# DeepSeek API密钥
DEEPSEEK_API_KEY=your-deepseek-api-key-here

# 小红书账号
XHS_USERNAME=your-xiaohongshu-username
XHS_PASSWORD=your-xiaohongshu-password

# 可选:直接使用cookie
XHS_COOKIES=your-xiaohongshu-cookies

# 代理设置(如果需要)
HTTP_PROXY=http://127.0.0.1:7890
HTTPS_PROXY=http://127.0.0.1:7890
EOF

# 5. 加载环境变量
export $(grep -v '^#' .env | xargs)

1.2 安装必要的Python库

# 1. 创建Python虚拟环境
python3 -m venv venv
source venv/bin/activate

# 2. 安装依赖
pip install --upgrade pip
pip install requests beautifulsoup4 pillow openai python-dotenv
pip install selenium webdriver-manager  # 用于模拟浏览器
pip install schedule  # 定时任务

# 3. 安装图像处理库
pip install pillow opencv-python numpy

# 4. 安装中文NLP工具
pip install jieba pypinyin

1.3 安装Node.js依赖

# 1. 初始化Node.js项目
npm init -y

# 2. 安装必要的包
npm install axios cheerio node-schedule
npm install sharp canvas  # 图像处理
npm install puppeteer  # 浏览器自动化

# 3. 安装OpenClaw相关
npm install @openclaw/core @openclaw/memory

🤖 第二章:DeepSeek内容生成模块

2.1 创建DeepSeek API客户端

# scripts/deepseek_client.py
import os
import json
import requests
from typing import List, Dict, Optional
from dotenv import load_dotenv

load_dotenv()

class DeepSeekClient:
    def __init__(self):
        self.api_key = os.getenv("DEEPSEEK_API_KEY")
        self.base_url = "https://api.deepseek.com/v1"
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
    
    def generate_content(self, prompt: str, system_prompt: str = None, **kwargs) -> str:
        """生成文本内容"""
        messages = []
        
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        
        messages.append({"role": "user", "content": prompt})
        
        data = {
            "model": "deepseek-chat",
            "messages": messages,
            "temperature": kwargs.get("temperature", 0.7),
            "max_tokens": kwargs.get("max_tokens", 2000),
            "top_p": kwargs.get("top_p", 0.9),
            "stream": False
        }
        
        try:
            response = requests.post(
                f"{self.base_url}/chat/completions",
                headers=self.headers,
                json=data,
                timeout=30
            )
            response.raise_for_status()
            result = response.json()
            return result["choices"][0]["message"]["content"]
        except Exception as e:
            print(f"DeepSeek API调用失败: {e}")
            return None
    
    def generate_xiaohongshu_post(self, topic: str, style: str = "活泼") -> Dict:
        """生成小红书帖子"""
        system_prompt = """你是一个专业的小红书内容创作者,擅长创作吸引人的笔记。
        请按照以下格式生成内容:
        1. 标题:吸引眼球,包含emoji
        2. 正文:分段清晰,使用适当的emoji
        3. 标签:相关的话题标签
        
        风格要求:{style},面向年轻女性用户。"""
        
        prompt = f"""请创作一篇关于{topic}的小红书笔记。
        
        要求:
        1. 标题要吸引人,包含2-3个emoji
        2. 正文分3-5段,每段不要太长
        3. 使用小红书常用的语气和表达方式
        4. 添加5-8个相关的话题标签
        5. 可以适当添加互动引导(如"评论区告诉我")
        
        请用JSON格式返回:
        {{
            "title": "标题",
            "content": "正文内容",
            "hashtags": ["#标签1", "#标签2"],
            "interaction": "互动引导语"
        }}"""
        
        result = self.generate_content(prompt, system_prompt.format(style=style))
        
        if result:
            try:
                # 提取JSON部分
                start = result.find('{')
                end = result.rfind('}') + 1
                json_str = result[start:end]
                return json.loads(json_str)
            except:
                # 如果解析失败,返回默认结构
                return {
                    "title": f"🔥 {topic} | 超实用分享",
                    "content": result,
                    "hashtags": [f"#{topic}", "#生活分享", "#实用技巧"],
                    "interaction": "大家有什么问题评论区问我哦~"
                }
        return None
    
    def generate_comment_reply(self, post_content: str, comment: str) -> str:
        """生成评论回复"""
        system_prompt = "你是一个友善的小红书博主,正在回复粉丝的评论。回复要亲切、有帮助,可以适当使用emoji。"
        
        prompt = f"""原帖内容:{post_content[:200]}...
        
        粉丝评论:{comment}
        
        请生成一个亲切、有帮助的回复,长度在20-50字之间。"""
        
        reply = self.generate_content(prompt, system_prompt, max_tokens=100)
        return reply or "谢谢你的评论!❤️"
    
    def generate_hashtags(self, content: str, count: int = 5) -> List[str]:
        """生成话题标签"""
        prompt = f"""根据以下内容生成{count}个小红书话题标签:
        
        {content[:300]}
        
        要求:
        1. 标签要相关、热门
        2. 格式为#标签名
        3. 返回JSON数组格式"""
        
        result = self.generate_content(prompt, max_tokens=200)
        
        if result:
            try:
                start = result.find('[')
                end = result.rfind(']') + 1
                json_str = result[start:end]
                tags = json.loads(json_str)
                return [f"#{tag}" if not tag.startswith('#') else tag for tag in tags[:count]]
            except:
                pass
        
        # 默认标签
        default_tags = ["#生活分享", "#实用技巧", "#好物推荐", "#经验分享", "#日常"]
        return default_tags[:count]

# 测试DeepSeek客户端
if __name__ == "__main__":
    client = DeepSeekClient()
    
    # 测试生成小红书帖子
    post = client.generate_xiaohongshu_post("夏季穿搭技巧")
    print("生成的帖子:")
    print(json.dumps(post, ensure_ascii=False, indent=2))
    
    # 测试生成评论回复
    reply = client.generate_comment_reply(
        "今天分享夏季穿搭技巧...",
        "博主搭配好好看!求链接"
    )
    print("\n生成的回复:", reply)

2.2 内容优化模块

# scripts/content_optimizer.py
import re
import jieba
from typing import List, Dict

class ContentOptimizer:
    def __init__(self):
        # 加载停用词
        self.stopwords = self.load_stopwords()
        
        # 小红书热门关键词
        self.hot_keywords = [
            "穿搭", "美妆", "护肤", "美食", "旅行",
            "家居", "健身", "学习", "职场", "情感"
        ]
    
    def load_stopwords(self) -> set:
        """加载中文停用词"""
        stopwords = set()
        # 这里可以加载停用词文件
        common_stopwords = ["的", "了", "在", "是", "我", "有", "和", "就", 
                           "不", "人", "都", "一", "一个", "上", "也", "很"]
        stopwords.update(common_stopwords)
        return stopwords
    
    def optimize_title(self, title: str) -> str:
        """优化标题"""
        # 确保标题有emoji
        if not any(ord(char) > 127 for char in title[:10]):
            emojis = ["🔥", "✨", "🌟", "💫", "🎯", "📝", "💡", "👑"]
            import random
            title = f"{random.choice(emojis)} {title}"
        
        # 限制标题长度
        if len(title) > 30:
            title = title[:27] + "..."
        
        return title
    
    def optimize_content(self, content: str) -> str:
        """优化正文内容"""
        # 分段处理
        paragraphs = content.split('\n')
        optimized_paragraphs = []
        
        for para in paragraphs:
            if not para.strip():
                continue
            
            # 添加适当的emoji
            para = self.add_emoji_to_paragraph(para)
            
            # 限制段落长度
            if len(para) > 200:
                para = para[:197] + "..."
            
            optimized_paragraphs.append(para)
        
        # 重新组合
        optimized_content = '\n\n'.join(optimized_paragraphs)
        
        # 添加互动引导
        if "评论区" not in optimized_content and "留言" not in optimized_content:
            interactions = [
                "大家有什么问题可以在评论区问我哦~",
                "喜欢的话记得点赞收藏呀!",
                "你们还有什么想看的主题吗?评论区告诉我!"
            ]
            import random
            optimized_content += f"\n\n{random.choice(interactions)}"
        
        return optimized_content
    
    def add_emoji_to_paragraph(self, paragraph: str) -> str:
        """给段落添加合适的emoji"""
        emoji_map = {
            "推荐": "👍",
            "技巧": "💡",
            "分享": "📝",
            "教程": "🎯",
            "测评": "⭐",
            "对比": "⚖️",
            "清单": "📋",
            "总结": "📊",
            "注意": "⚠️",
            "重要": "❗",
            "惊喜": "🎁",
            "省钱": "💰",
            "变美": "💄",
            "健康": "💪",
            "学习": "📚",
            "旅行": "✈️",
            "美食": "🍽️",
            "家居": "🏠"
        }
        
        for keyword, emoji in emoji_map.items():
            if keyword in paragraph and emoji not in paragraph:
                # 在关键词后添加emoji
                paragraph = paragraph.replace(keyword, f"{keyword}{emoji}", 1)
                break
        
        return paragraph
    
    def extract_keywords(self, content: str, top_n: int = 5) -> List[str]:
        """提取关键词"""
        # 使用jieba分词
        words = jieba.cut(content)
        
        # 过滤停用词和短词
        keywords = []
        for word in words:
            word = word.strip()
            if (len(word) >= 2 and 
                word not in self.stopwords and
                not word.isdigit()):
                keywords.append(word)
        
        # 统计词频
        from collections import Counter
        word_freq = Counter(keywords)
        
        # 返回高频词
        return [word for word, _ in word_freq.most_common(top_n)]
    
    def generate_hashtags_from_content(self, content: str, count: int = 8) -> List[str]:
        """从内容生成话题标签"""
        keywords = self.extract_keywords(content, count * 2)
        
        # 结合热门关键词
        all_keywords = list(set(keywords + self.hot_keywords))
        
        # 生成标签
        hashtags = []
        for keyword in all_keywords[:count]:
            hashtag = f"#{keyword}"
            if len(hashtag) <= 20:  # 小红书标签长度限制
                hashtags.append(hashtag)
        
        # 确保有足够数量的标签
        while len(hashtags) < count:
            hashtags.append("#生活分享")
        
        return hashtags

# 测试内容优化
if __name__ == "__main__":
    optimizer = ContentOptimizer()
    
    test_content = """
    今天给大家分享几个夏季穿搭的小技巧。
    首先选择透气面料,比如棉麻材质。
    其次颜色要清爽,白色蓝色都不错。
    最后配饰也很重要,草帽和墨镜是必备。
    """
    
    print("原始内容:", test_content)
    print("\n优化后内容:", optimizer.optimize_content(test_content))
    print("\n关键词:", optimizer.extract_keywords(test_content))
    print("\n话题标签:", optimizer.generate_hashtags_from_content(test_content))

🖼️ 第三章:图像生成与处理模块

3.1 使用AI生成配图

# scripts/image_generator.py
import os
import requests
import base64
from PIL import Image, ImageDraw, ImageFont
import io
from typing import Optional, List

class ImageGenerator:
    def __init__(self):
        # 可以使用多种AI图像生成API
        self.apis = {
            "deepseek": "https://api.deepseek.com/v1/images/generations",
            # 其他API可以在这里添加
        }
        
        # 字体路径
        self.font_path = self.find_font()
    
    def find_font(self) -> str:
        """查找可用的中文字体"""
        font_paths = [
            "/System/Library/Fonts/PingFang.ttc",  # macOS
            "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",  # Linux
            "C:/Windows/Fonts/msyh.ttc",  # Windows
        ]
        
        for path in font_paths:
            if os.path.exists(path):
                return path
        
        return None
    
    def generate_ai_image(self, prompt: str, api: str = "deepseek") -> Optional[Image.Image]:
        """使用AI生成图像"""
        if api not in self.apis:
            print(f"不支持的API: {api}")
            return None
        
        headers = {
            "Authorization": f"Bearer {os.getenv('DEEPSEEK_API_KEY')}",
            "Content-Type": "application/json"
        }
        
        data = {
            "model": "dall-e-3",  # DeepSeek可能支持图像生成
            "prompt": prompt,
            "n": 1,
            "size": "1024x1024",
            "quality": "standard"
        }
        
        try:
            response = requests.post(
                self.apis[api],
                headers=headers,
                json=data,
                timeout=60
            )
            
            if response.status_code == 200:
                result = response.json()
                image_url = result["data"][0]["url"]
                
                # 下载图像
                img_response = requests.get(image_url, timeout=30)
                img_response.raise_for_status()
                
                image = Image.open(io.BytesIO(img_response.content))
                return image
            else:
                print(f"AI图像生成失败: {response.status_code}")
                return None
                
        except Exception as e:
            print(f"AI图像生成错误: {e}")
            return None
    
    def create_text_image(self, text: str, size: tuple = (800, 600), 
                         bg_color: str = "#FFFFFF") -> Image.Image:
        """创建文字图片(小红书常用风格)"""
        # 创建画布
        image = Image.new("RGB", size, bg_color)
        draw = ImageDraw.Draw(image)
        
        # 加载字体
        try:
            if self.font_path:
                font = ImageFont.truetype(self.font_path, 40)
            else:
                font = ImageFont.load_default()
        except:
            font = ImageFont.load_default()
        
        # 计算文字位置
        lines = self.wrap_text(text, font, size[0] - 100)
        total_height = len(lines) * 50
        
        # 绘制文字
        y = (size[1] - total_height) // 2
        for line in lines:
            bbox = draw.textbbox((0, 0), line, font=font)
            text_width = bbox[2] - bbox[0]
            x = (size[0] - text_width) // 2
            
            # 添加文字阴影(可选)
            draw.text((x+2, y+2), line, font=font, fill="#CCCCCC")
            draw.text((x, y), line, font=font, fill="#333333")
            
            y += 50
        
        return image
    
    def wrap_text(self, text: str, font, max_width: int) -> List[str]:
        """文本换行"""
        lines = []
        words = list(text)
        
        current_line = []
        current_width = 0
        
        for word in words:
            bbox = font.getbbox(''.join(current_line + [word]))
            word_width = bbox[2] - bbox[0]
            
            if current_width + word_width <= max_width:
                current_line.append(word)
                current_width += word_width
            else:
                if current_line:
                    lines.append(''.join(current_line))
                current_line = [word]
                current_width = word_width
        
        if current_line:
            lines.append(''.join(current_line))
        
        return lines
    
    def create_collage(self, images: List[Image.Image], layout: str = "grid") -> Image.Image:
        """创建拼图"""
        if not images:
            return self.create_text_image("暂无图片")
        
        if layout == "grid":
            # 网格布局
            cols = min(3, len(images))
            rows = (len(images) + cols - 1) // cols
            
            cell_size = 300
            collage = Image.new("RGB", (cols * cell_size, rows * cell_size), "#FFFFFF")
            
            for i, img in enumerate(images):
                # 调整图片大小
                img = img.resize((cell_size, cell_size), Image.Resampling.LANCZOS)
                
                row = i // cols
                col = i % cols
                collage.paste(img, (col * cell_size, row * cell_size))
            
            return collage
        
        elif layout == "vertical":
            # 垂直拼接
            widths = [img.width for img in images]
            height = sum(img.height for img in images)
            width = max(widths)
            
            collage = Image.new("RGB", (width, height), "#FFFFFF")
            
            y = 0
            for img in images:
                if img.width != width:
                    img = img.resize((width, int(img.height * width / img.width)), 
                                   Image.Resampling.LANCZOS)
                collage.paste(img, (0, y))
                y += img.height
            
            return collage
        
        return images[0]  # 默认返回第一张
    
    def add_watermark(self, image: Image.Image, text: str = "小红书自动发布") -> Image.Image:
        """添加水印"""
        draw = ImageDraw.Draw(image)
        
        try:
            if self.font_path:
                font = ImageFont.truetype(self.font_path, 30)
            else:
                font = ImageFont.load_default()
        except:
            font = ImageFont.load_default()
        
        # 在右下角添加水印
        bbox = draw.textbbox((0, 0), text, font=font)
        text_width = bbox[2] - bbox[0]
        text_height = bbox[3] - bbox[1]
        
        x = image.width - text_width - 20
        y = image.height - text_height - 20
        
        # 半透明背景
        overlay = Image.new("RGBA", image.size, (0, 0, 0, 0))
        overlay_draw = ImageDraw.Draw(overlay)
        overlay_draw.rectangle([x-10, y-10, x+text_width+10, y+text_height+10], 
                             fill=(255, 255, 255, 180))
        
        image = Image.alpha_composite(image.convert("RGBA"), overlay)
        draw = ImageDraw.Draw(image)
        
        draw.text((x, y), text, font=font, fill=(0, 0, 0, 255))
        
        return image.convert("RGB")
    
    def generate_xiaohongshu_images(self, content: str, count: int = 3) -> List[Image.Image]:
        """生成小红书配图"""
        images = []
        
        # 1. 生成封面图
        cover_prompt = f"小红书风格封面图,内容关于:{content[:50]}...,清新简洁风格"
        cover_image = self.generate_ai_image(cover_prompt)
        if cover_image:
            images.append(cover_image)
        
        # 2. 生成内容图
        for i in range(count - 1):
            if i == 0:
                # 文字图
                text_image = self.create_text_image(
                    content[:100] + "...",
                    size=(800, 600),
                    bg_color="#FFF9F0"  # 小红书常用背景色
                )
                images.append(text_image)
            else:
                # 相关配图
                related_prompt = f"小红书配图,{content[:30]}...,生活化场景"
                related_image = self.generate_ai_image(related_prompt)
                if related_image:
                    images.append(related_image)
        
        # 确保有足够图片
        while len(images) < count:
            default_image = self.create_text_image(
                "精彩内容分享",
                size=(800, 600),
                bg_color="#FFE4E1"
            )
            images.append(default_image)
        
        # 添加水印
        images = [self.add_watermark(img) for img in images]
        
        return images

# 测试图像生成
if __name__ == "__main__":
    generator = ImageGenerator()
    
    # 测试文字图片
    text_img = generator.create_text_image("夏季穿搭技巧分享", size=(800, 400))
    text_img.save("test_text_image.jpg")
    print("文字图片已保存: test_text_image.jpg")
    
    # 测试拼图
    images = [generator.create_text_image(f"图片{i+1}") for i in range(4)]
    collage = generator.create_collage(images, "grid")
    collage.save("test_collage.jpg")
    print("拼图已保存: test_collage.jpg")

3.2 图片优化器

# scripts/image_optimizer.py
from PIL import Image, ImageEnhance, ImageFilter
import numpy as np

class ImageOptimizer:
    def __init__(self):
        self.xiaohongshu_aspect_ratios = [
            (3, 4),    # 竖图
            (1, 1),    # 方图
            (4, 3),    # 横图
        ]
    
    def optimize_for_xiaohongshu(self, image: Image.Image, 
                                target_ratio: tuple = (3, 4)) -> Image.Image:
        """优化图片为小红书尺寸"""
        # 计算目标尺寸
        target_width, target_height = self.calculate_target_size(
            image.size, target_ratio
        )
        
        # 调整尺寸
        optimized = image.resize(
            (target_width, target_height),
            Image.Resampling.LANCZOS
        )
        
        # 增强效果
        optimized = self.enhance_image(optimized)
        
        return optimized
    
    def calculate_target_size(self, original_size: tuple, target_ratio: tuple) -> tuple:
        """计算目标尺寸"""
        orig_width, orig_height = original_size
        target_width, target_height = target_ratio
        
        # 保持原图比例,填充到目标比例
        if orig_width / orig_height > target_width / target_height:
            # 原图更宽,以高度为准
            new_height = orig_height
            new_width = int(new_height * target_width / target_height)
            
            # 裁剪宽度
            left = (orig_width - new_width) // 2
            right = left + new_width
            return (new_width, new_height)
        else:
            # 原图更高,以宽度为准
            new_width = orig_width
            new_height = int(new_width * target_height / target_width)
            
            # 裁剪高度
            top = (orig_height - new_height) // 2
            bottom = top + new_height
            return (new_width, new_height)
    
    def enhance_image(self, image: Image.Image) -> Image.Image:
        """增强图片效果"""
        # 提高亮度
        enhancer = ImageEnhance.Brightness(image)
        image = enhancer.enhance(1.1)
        
        # 提高对比度
        enhancer = ImageEnhance.Contrast(image)
        image = enhancer.enhance(1.05)
        
        # 提高饱和度
        enhancer = ImageEnhance.Color(image)
        image = enhancer.enhance(1.1)
        
        # 轻微锐化
        image = image.filter(ImageFilter.SHARPEN)
        
        return image
    
    def add_xiaohongshu_frame(self, image: Image.Image) -> Image.Image:
        """添加小红书风格边框"""
        # 创建边框
        border_size = 20
        frame_color = "#FFFFFF"
        
        new_width = image.width + border_size * 2
        new_height = image.height + border_size * 2
        
        framed = Image.new("RGB", (new_width, new_height), frame_color)
        framed.paste(image, (border_size, border_size))
        
        return framed
    
    def create_gradient_background(self, size: tuple, 
                                  colors: list = ["#FF9A9E", "#FAD0C4"]) -> Image.Image:
        """创建渐变背景"""
        width, height = size
        
        # 创建渐变
        gradient = Image.new("RGB", (width, height))
        
        for y in range(height):
            # 计算渐变颜色
            ratio = y / height
            r = int(int(colors[0][1:3], 16) * (1 - ratio) + 
                    int(colors[1][1:3], 16) * ratio)
            g = int(int(colors[0][3:5], 16) * (1 - ratio) + 
                    int(colors[1][3:5], 16) * ratio)
            b = int(int(colors[0][5:7], 16) * (1 - ratio) + 
                    int(colors[1][5:7], 16) * ratio)
            
            color = (r, g, b)
            
            # 绘制水平线
            for x in range(width):
                gradient.putpixel((x, y), color)
        
        return gradient

📱 第四章:小红书自动化发布模块

4.1 小红书Web自动化

# scripts/xiaohongshu_automator.py
import time
import json
import random
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
import undetected_chromedriver as uc

class XiaohongshuAutomator:
    def __init__(self, headless: bool = False):
        self.driver = None
        self.wait = None
        self.headless = headless
        self.is_logged_in = False
        
    def init_driver(self):
        """初始化浏览器驱动"""
        options = uc.ChromeOptions()
        
        if self.headless:
            options.add_argument("--headless")
        
        # 添加常用参数
        options.add_argument("--disable-blink-features=AutomationControlled")
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-gpu")
        options.add_argument("--window-size=1920,1080")
        
        # 添加用户代理
        options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")
        
        try:
            self.driver = uc.Chrome(options=options)
            self.wait = WebDriverWait(self.driver, 20)
            return True
        except Exception as e:
            print(f"初始化浏览器失败: {e}")
            return False
    
    def login_with_cookies(self, cookies_file: str = "cookies.json"):
        """使用cookies登录"""
        try:
            # 打开小红书
            self.driver.get("https://www.xiaohongshu.com")
            time.sleep(3)
            
            # 加载cookies
            with open(cookies_file, 'r') as f:
                cookies = json.load(f)
            
            for cookie in cookies:
                if 'sameSite' in cookie:
                    del cookie['sameSite']
                self.driver.add_cookie(cookie)
            
            # 刷新页面
            self.driver.refresh()
            time.sleep(3)
            
            # 检查是否登录成功
            if self.check_login():
                self.is_logged_in = True
                print("Cookies登录成功")
                return True
            else:
                print("Cookies登录失败")
                return False
                
        except Exception as e:
            print(f"Cookies登录失败: {e}")
            return False
    
    def check_login(self) -> bool:
        """检查是否已登录"""
        try:
            # 检查登录元素
            self.driver.find_element(By.CSS_SELECTOR, ".login-container")
            return False
        except:
            try:
                # 检查用户头像
                self.driver.find_element(By.CSS_SELECTOR, ".avatar")
                return True
            except:
                return False
    
    def create_post(self, title: str, content: str, images: list, 
                   hashtags: list, publish: bool = False) -> bool:
        """创建帖子"""
        if not self.is_logged_in:
            print("未登录,无法创建帖子")
            return False
        
        try:
            # 打开发布页面
            self.driver.get("https://www.xiaohongshu.com/creation")
            time.sleep(3)
            
            # 输入标题
            title_input = self.wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "textarea[placeholder*='标题']"))
            )
            title_input.clear()
            title_input.send_keys(title)
            time.sleep(1)
            
            # 输入内容
            content_input = self.driver.find_element(
                By.CSS_SELECTOR, "div[contenteditable='true']"
            )
            content_input.clear()
            content_input.send_keys(content)
            time.sleep(1)
            
            # 添加话题标签
            for hashtag in hashtags:
                content_input.send_keys(f" {hashtag}")
                time.sleep(0.5)
            
            # 上传图片
            self.upload_images(images)
            time.sleep(3)
            
            # 选择分类
            self.select_category()
            time.sleep(1)
            
            if publish:
                # 发布
                publish_btn = self.wait.until(
                    EC.element_to_be_clickable((By.CSS_SELECTOR, "button:contains('发布')"))
                )
                publish_btn.click()
                time.sleep(5)
                
                print("帖子发布成功")
                return True
            else:
                print("帖子已保存为草稿")
                return True
                
        except Exception as e:
            print(f"创建帖子失败: {e}")
            return False
    
    def upload_images(self, image_paths: list):
        """上传图片"""
        try:
            # 找到上传按钮
            upload_input = self.driver.find_element(
                By.CSS_SELECTOR, "input[type='file'][accept*='image']"
            )
            
            # 上传多张图片
            for image_path in image_paths:
                upload_input.send_keys(image_path)
                time.sleep(2)  # 等待上传完成
            
            print(f"成功上传 {len(image_paths)} 张图片")
            
        except Exception as e:
            print(f"上传图片失败: {e}")
    
    def select_category(self, category: str = "生活"):
        """选择分类"""
        try:
            # 点击分类选择器
            category_btn = self.driver.find_element(
                By.CSS_SELECTOR, ".category-selector"
            )
            category_btn.click()
            time.sleep(1)
            
            # 选择分类
            category_option = self.driver.find_element(
                By.XPATH, f"//div[contains(text(), '{category}')]"
            )
            category_option.click()
            time.sleep(1)

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐