使用OpenClaw + DeepSeek模型实现小红书自动发布与智能回帖
本文介绍了使用OpenClaw框架和DeepSeek模型实现小红书自动化运营的技术方案。主要内容包括: 系统架构:整合OpenClaw任务调度框架与DeepSeek内容生成模型,实现小红书内容创作、发布和评论回复的全流程自动化。 环境配置: OpenClaw安装与初始化 DeepSeek API密钥配置 Python和Node.js依赖安装 小红书账号认证设置 核心功能模块: DeepSeek内容
·
🚀 使用OpenClaw + DeepSeek模型实现小红书自动发布与智能回帖
📖 前言:AI赋能内容创作
项目目标:
通过OpenClaw框架结合DeepSeek模型,实现小红书内容的:
- ✅ 自动文章创作
- ✅ 智能配图生成
- ✅ 自动发布管理
- ✅ 智能评论回复
- ✅ 数据分析优化
技术栈:
- OpenClaw:AI助手框架,负责任务调度
- DeepSeek模型:内容生成,免费且强大
- 小红书API/模拟:内容发布接口
- 图像处理:配图生成与优化
适合人群:
- 小红书内容创作者
- 社交媒体运营人员
- AI应用开发者
- 自动化工具爱好者
🎯 第一章:环境搭建与配置
1.1 安装OpenClaw并配置DeepSeek模型
# 1. 安装OpenClaw
pnpm add -g openclaw
# 2. 创建工作空间
mkdir ~/xiaohongshu-bot && cd ~/xiaohongshu-bot
openclaw init
# 3. 配置DeepSeek模型
cat > config.yaml << 'EOF'
gateway:
port: 3000
host: localhost
model:
provider: "deepseek"
# DeepSeek API配置
apiKey: "${DEEPSEEK_API_KEY}"
model: "deepseek-chat"
baseUrl: "https://api.deepseek.com"
# 模型参数
temperature: 0.7
maxTokens: 2000
topP: 0.9
memory:
enabled: true
path: ./workspace/memory
skills:
enabled: true
path: ./workspace/skills
# 小红书配置
xiaohongshu:
enabled: true
username: "${XHS_USERNAME}"
password: "${XHS_PASSWORD}"
# 或使用cookie登录
cookies: "${XHS_COOKIES}"
# 发布设置
post:
interval: 3600 # 发布间隔(秒)
maxPerDay: 3 # 每日最多发布数
categories: ["美食", "生活", "科技", "旅行"]
# 评论设置
comment:
autoReply: true
replyDelay: 60 # 回复延迟(秒)
maxReplies: 10 # 单帖最多回复数
EOF
# 4. 设置环境变量
cat > .env << 'EOF'
# DeepSeek API密钥
DEEPSEEK_API_KEY=your-deepseek-api-key-here
# 小红书账号
XHS_USERNAME=your-xiaohongshu-username
XHS_PASSWORD=your-xiaohongshu-password
# 可选:直接使用cookie
XHS_COOKIES=your-xiaohongshu-cookies
# 代理设置(如果需要)
HTTP_PROXY=http://127.0.0.1:7890
HTTPS_PROXY=http://127.0.0.1:7890
EOF
# 5. 加载环境变量
export $(grep -v '^#' .env | xargs)
1.2 安装必要的Python库
# 1. 创建Python虚拟环境
python3 -m venv venv
source venv/bin/activate
# 2. 安装依赖
pip install --upgrade pip
pip install requests beautifulsoup4 pillow openai python-dotenv
pip install selenium webdriver-manager # 用于模拟浏览器
pip install schedule # 定时任务
# 3. 安装图像处理库
pip install pillow opencv-python numpy
# 4. 安装中文NLP工具
pip install jieba pypinyin
1.3 安装Node.js依赖
# 1. 初始化Node.js项目
npm init -y
# 2. 安装必要的包
npm install axios cheerio node-schedule
npm install sharp canvas # 图像处理
npm install puppeteer # 浏览器自动化
# 3. 安装OpenClaw相关
npm install @openclaw/core @openclaw/memory
🤖 第二章:DeepSeek内容生成模块
2.1 创建DeepSeek API客户端
# scripts/deepseek_client.py
import os
import json
import requests
from typing import List, Dict, Optional
from dotenv import load_dotenv
load_dotenv()
class DeepSeekClient:
def __init__(self):
self.api_key = os.getenv("DEEPSEEK_API_KEY")
self.base_url = "https://api.deepseek.com/v1"
self.headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
def generate_content(self, prompt: str, system_prompt: str = None, **kwargs) -> str:
"""生成文本内容"""
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": prompt})
data = {
"model": "deepseek-chat",
"messages": messages,
"temperature": kwargs.get("temperature", 0.7),
"max_tokens": kwargs.get("max_tokens", 2000),
"top_p": kwargs.get("top_p", 0.9),
"stream": False
}
try:
response = requests.post(
f"{self.base_url}/chat/completions",
headers=self.headers,
json=data,
timeout=30
)
response.raise_for_status()
result = response.json()
return result["choices"][0]["message"]["content"]
except Exception as e:
print(f"DeepSeek API调用失败: {e}")
return None
def generate_xiaohongshu_post(self, topic: str, style: str = "活泼") -> Dict:
"""生成小红书帖子"""
system_prompt = """你是一个专业的小红书内容创作者,擅长创作吸引人的笔记。
请按照以下格式生成内容:
1. 标题:吸引眼球,包含emoji
2. 正文:分段清晰,使用适当的emoji
3. 标签:相关的话题标签
风格要求:{style},面向年轻女性用户。"""
prompt = f"""请创作一篇关于{topic}的小红书笔记。
要求:
1. 标题要吸引人,包含2-3个emoji
2. 正文分3-5段,每段不要太长
3. 使用小红书常用的语气和表达方式
4. 添加5-8个相关的话题标签
5. 可以适当添加互动引导(如"评论区告诉我")
请用JSON格式返回:
{{
"title": "标题",
"content": "正文内容",
"hashtags": ["#标签1", "#标签2"],
"interaction": "互动引导语"
}}"""
result = self.generate_content(prompt, system_prompt.format(style=style))
if result:
try:
# 提取JSON部分
start = result.find('{')
end = result.rfind('}') + 1
json_str = result[start:end]
return json.loads(json_str)
except:
# 如果解析失败,返回默认结构
return {
"title": f"🔥 {topic} | 超实用分享",
"content": result,
"hashtags": [f"#{topic}", "#生活分享", "#实用技巧"],
"interaction": "大家有什么问题评论区问我哦~"
}
return None
def generate_comment_reply(self, post_content: str, comment: str) -> str:
"""生成评论回复"""
system_prompt = "你是一个友善的小红书博主,正在回复粉丝的评论。回复要亲切、有帮助,可以适当使用emoji。"
prompt = f"""原帖内容:{post_content[:200]}...
粉丝评论:{comment}
请生成一个亲切、有帮助的回复,长度在20-50字之间。"""
reply = self.generate_content(prompt, system_prompt, max_tokens=100)
return reply or "谢谢你的评论!❤️"
def generate_hashtags(self, content: str, count: int = 5) -> List[str]:
"""生成话题标签"""
prompt = f"""根据以下内容生成{count}个小红书话题标签:
{content[:300]}
要求:
1. 标签要相关、热门
2. 格式为#标签名
3. 返回JSON数组格式"""
result = self.generate_content(prompt, max_tokens=200)
if result:
try:
start = result.find('[')
end = result.rfind(']') + 1
json_str = result[start:end]
tags = json.loads(json_str)
return [f"#{tag}" if not tag.startswith('#') else tag for tag in tags[:count]]
except:
pass
# 默认标签
default_tags = ["#生活分享", "#实用技巧", "#好物推荐", "#经验分享", "#日常"]
return default_tags[:count]
# 测试DeepSeek客户端
if __name__ == "__main__":
client = DeepSeekClient()
# 测试生成小红书帖子
post = client.generate_xiaohongshu_post("夏季穿搭技巧")
print("生成的帖子:")
print(json.dumps(post, ensure_ascii=False, indent=2))
# 测试生成评论回复
reply = client.generate_comment_reply(
"今天分享夏季穿搭技巧...",
"博主搭配好好看!求链接"
)
print("\n生成的回复:", reply)
2.2 内容优化模块
# scripts/content_optimizer.py
import re
import jieba
from typing import List, Dict
class ContentOptimizer:
def __init__(self):
# 加载停用词
self.stopwords = self.load_stopwords()
# 小红书热门关键词
self.hot_keywords = [
"穿搭", "美妆", "护肤", "美食", "旅行",
"家居", "健身", "学习", "职场", "情感"
]
def load_stopwords(self) -> set:
"""加载中文停用词"""
stopwords = set()
# 这里可以加载停用词文件
common_stopwords = ["的", "了", "在", "是", "我", "有", "和", "就",
"不", "人", "都", "一", "一个", "上", "也", "很"]
stopwords.update(common_stopwords)
return stopwords
def optimize_title(self, title: str) -> str:
"""优化标题"""
# 确保标题有emoji
if not any(ord(char) > 127 for char in title[:10]):
emojis = ["🔥", "✨", "🌟", "💫", "🎯", "📝", "💡", "👑"]
import random
title = f"{random.choice(emojis)} {title}"
# 限制标题长度
if len(title) > 30:
title = title[:27] + "..."
return title
def optimize_content(self, content: str) -> str:
"""优化正文内容"""
# 分段处理
paragraphs = content.split('\n')
optimized_paragraphs = []
for para in paragraphs:
if not para.strip():
continue
# 添加适当的emoji
para = self.add_emoji_to_paragraph(para)
# 限制段落长度
if len(para) > 200:
para = para[:197] + "..."
optimized_paragraphs.append(para)
# 重新组合
optimized_content = '\n\n'.join(optimized_paragraphs)
# 添加互动引导
if "评论区" not in optimized_content and "留言" not in optimized_content:
interactions = [
"大家有什么问题可以在评论区问我哦~",
"喜欢的话记得点赞收藏呀!",
"你们还有什么想看的主题吗?评论区告诉我!"
]
import random
optimized_content += f"\n\n{random.choice(interactions)}"
return optimized_content
def add_emoji_to_paragraph(self, paragraph: str) -> str:
"""给段落添加合适的emoji"""
emoji_map = {
"推荐": "👍",
"技巧": "💡",
"分享": "📝",
"教程": "🎯",
"测评": "⭐",
"对比": "⚖️",
"清单": "📋",
"总结": "📊",
"注意": "⚠️",
"重要": "❗",
"惊喜": "🎁",
"省钱": "💰",
"变美": "💄",
"健康": "💪",
"学习": "📚",
"旅行": "✈️",
"美食": "🍽️",
"家居": "🏠"
}
for keyword, emoji in emoji_map.items():
if keyword in paragraph and emoji not in paragraph:
# 在关键词后添加emoji
paragraph = paragraph.replace(keyword, f"{keyword}{emoji}", 1)
break
return paragraph
def extract_keywords(self, content: str, top_n: int = 5) -> List[str]:
"""提取关键词"""
# 使用jieba分词
words = jieba.cut(content)
# 过滤停用词和短词
keywords = []
for word in words:
word = word.strip()
if (len(word) >= 2 and
word not in self.stopwords and
not word.isdigit()):
keywords.append(word)
# 统计词频
from collections import Counter
word_freq = Counter(keywords)
# 返回高频词
return [word for word, _ in word_freq.most_common(top_n)]
def generate_hashtags_from_content(self, content: str, count: int = 8) -> List[str]:
"""从内容生成话题标签"""
keywords = self.extract_keywords(content, count * 2)
# 结合热门关键词
all_keywords = list(set(keywords + self.hot_keywords))
# 生成标签
hashtags = []
for keyword in all_keywords[:count]:
hashtag = f"#{keyword}"
if len(hashtag) <= 20: # 小红书标签长度限制
hashtags.append(hashtag)
# 确保有足够数量的标签
while len(hashtags) < count:
hashtags.append("#生活分享")
return hashtags
# 测试内容优化
if __name__ == "__main__":
optimizer = ContentOptimizer()
test_content = """
今天给大家分享几个夏季穿搭的小技巧。
首先选择透气面料,比如棉麻材质。
其次颜色要清爽,白色蓝色都不错。
最后配饰也很重要,草帽和墨镜是必备。
"""
print("原始内容:", test_content)
print("\n优化后内容:", optimizer.optimize_content(test_content))
print("\n关键词:", optimizer.extract_keywords(test_content))
print("\n话题标签:", optimizer.generate_hashtags_from_content(test_content))
🖼️ 第三章:图像生成与处理模块
3.1 使用AI生成配图
# scripts/image_generator.py
import os
import requests
import base64
from PIL import Image, ImageDraw, ImageFont
import io
from typing import Optional, List
class ImageGenerator:
def __init__(self):
# 可以使用多种AI图像生成API
self.apis = {
"deepseek": "https://api.deepseek.com/v1/images/generations",
# 其他API可以在这里添加
}
# 字体路径
self.font_path = self.find_font()
def find_font(self) -> str:
"""查找可用的中文字体"""
font_paths = [
"/System/Library/Fonts/PingFang.ttc", # macOS
"/usr/share/fonts/truetype/wqy/wqy-microhei.ttc", # Linux
"C:/Windows/Fonts/msyh.ttc", # Windows
]
for path in font_paths:
if os.path.exists(path):
return path
return None
def generate_ai_image(self, prompt: str, api: str = "deepseek") -> Optional[Image.Image]:
"""使用AI生成图像"""
if api not in self.apis:
print(f"不支持的API: {api}")
return None
headers = {
"Authorization": f"Bearer {os.getenv('DEEPSEEK_API_KEY')}",
"Content-Type": "application/json"
}
data = {
"model": "dall-e-3", # DeepSeek可能支持图像生成
"prompt": prompt,
"n": 1,
"size": "1024x1024",
"quality": "standard"
}
try:
response = requests.post(
self.apis[api],
headers=headers,
json=data,
timeout=60
)
if response.status_code == 200:
result = response.json()
image_url = result["data"][0]["url"]
# 下载图像
img_response = requests.get(image_url, timeout=30)
img_response.raise_for_status()
image = Image.open(io.BytesIO(img_response.content))
return image
else:
print(f"AI图像生成失败: {response.status_code}")
return None
except Exception as e:
print(f"AI图像生成错误: {e}")
return None
def create_text_image(self, text: str, size: tuple = (800, 600),
bg_color: str = "#FFFFFF") -> Image.Image:
"""创建文字图片(小红书常用风格)"""
# 创建画布
image = Image.new("RGB", size, bg_color)
draw = ImageDraw.Draw(image)
# 加载字体
try:
if self.font_path:
font = ImageFont.truetype(self.font_path, 40)
else:
font = ImageFont.load_default()
except:
font = ImageFont.load_default()
# 计算文字位置
lines = self.wrap_text(text, font, size[0] - 100)
total_height = len(lines) * 50
# 绘制文字
y = (size[1] - total_height) // 2
for line in lines:
bbox = draw.textbbox((0, 0), line, font=font)
text_width = bbox[2] - bbox[0]
x = (size[0] - text_width) // 2
# 添加文字阴影(可选)
draw.text((x+2, y+2), line, font=font, fill="#CCCCCC")
draw.text((x, y), line, font=font, fill="#333333")
y += 50
return image
def wrap_text(self, text: str, font, max_width: int) -> List[str]:
"""文本换行"""
lines = []
words = list(text)
current_line = []
current_width = 0
for word in words:
bbox = font.getbbox(''.join(current_line + [word]))
word_width = bbox[2] - bbox[0]
if current_width + word_width <= max_width:
current_line.append(word)
current_width += word_width
else:
if current_line:
lines.append(''.join(current_line))
current_line = [word]
current_width = word_width
if current_line:
lines.append(''.join(current_line))
return lines
def create_collage(self, images: List[Image.Image], layout: str = "grid") -> Image.Image:
"""创建拼图"""
if not images:
return self.create_text_image("暂无图片")
if layout == "grid":
# 网格布局
cols = min(3, len(images))
rows = (len(images) + cols - 1) // cols
cell_size = 300
collage = Image.new("RGB", (cols * cell_size, rows * cell_size), "#FFFFFF")
for i, img in enumerate(images):
# 调整图片大小
img = img.resize((cell_size, cell_size), Image.Resampling.LANCZOS)
row = i // cols
col = i % cols
collage.paste(img, (col * cell_size, row * cell_size))
return collage
elif layout == "vertical":
# 垂直拼接
widths = [img.width for img in images]
height = sum(img.height for img in images)
width = max(widths)
collage = Image.new("RGB", (width, height), "#FFFFFF")
y = 0
for img in images:
if img.width != width:
img = img.resize((width, int(img.height * width / img.width)),
Image.Resampling.LANCZOS)
collage.paste(img, (0, y))
y += img.height
return collage
return images[0] # 默认返回第一张
def add_watermark(self, image: Image.Image, text: str = "小红书自动发布") -> Image.Image:
"""添加水印"""
draw = ImageDraw.Draw(image)
try:
if self.font_path:
font = ImageFont.truetype(self.font_path, 30)
else:
font = ImageFont.load_default()
except:
font = ImageFont.load_default()
# 在右下角添加水印
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
x = image.width - text_width - 20
y = image.height - text_height - 20
# 半透明背景
overlay = Image.new("RGBA", image.size, (0, 0, 0, 0))
overlay_draw = ImageDraw.Draw(overlay)
overlay_draw.rectangle([x-10, y-10, x+text_width+10, y+text_height+10],
fill=(255, 255, 255, 180))
image = Image.alpha_composite(image.convert("RGBA"), overlay)
draw = ImageDraw.Draw(image)
draw.text((x, y), text, font=font, fill=(0, 0, 0, 255))
return image.convert("RGB")
def generate_xiaohongshu_images(self, content: str, count: int = 3) -> List[Image.Image]:
"""生成小红书配图"""
images = []
# 1. 生成封面图
cover_prompt = f"小红书风格封面图,内容关于:{content[:50]}...,清新简洁风格"
cover_image = self.generate_ai_image(cover_prompt)
if cover_image:
images.append(cover_image)
# 2. 生成内容图
for i in range(count - 1):
if i == 0:
# 文字图
text_image = self.create_text_image(
content[:100] + "...",
size=(800, 600),
bg_color="#FFF9F0" # 小红书常用背景色
)
images.append(text_image)
else:
# 相关配图
related_prompt = f"小红书配图,{content[:30]}...,生活化场景"
related_image = self.generate_ai_image(related_prompt)
if related_image:
images.append(related_image)
# 确保有足够图片
while len(images) < count:
default_image = self.create_text_image(
"精彩内容分享",
size=(800, 600),
bg_color="#FFE4E1"
)
images.append(default_image)
# 添加水印
images = [self.add_watermark(img) for img in images]
return images
# 测试图像生成
if __name__ == "__main__":
generator = ImageGenerator()
# 测试文字图片
text_img = generator.create_text_image("夏季穿搭技巧分享", size=(800, 400))
text_img.save("test_text_image.jpg")
print("文字图片已保存: test_text_image.jpg")
# 测试拼图
images = [generator.create_text_image(f"图片{i+1}") for i in range(4)]
collage = generator.create_collage(images, "grid")
collage.save("test_collage.jpg")
print("拼图已保存: test_collage.jpg")
3.2 图片优化器
# scripts/image_optimizer.py
from PIL import Image, ImageEnhance, ImageFilter
import numpy as np
class ImageOptimizer:
def __init__(self):
self.xiaohongshu_aspect_ratios = [
(3, 4), # 竖图
(1, 1), # 方图
(4, 3), # 横图
]
def optimize_for_xiaohongshu(self, image: Image.Image,
target_ratio: tuple = (3, 4)) -> Image.Image:
"""优化图片为小红书尺寸"""
# 计算目标尺寸
target_width, target_height = self.calculate_target_size(
image.size, target_ratio
)
# 调整尺寸
optimized = image.resize(
(target_width, target_height),
Image.Resampling.LANCZOS
)
# 增强效果
optimized = self.enhance_image(optimized)
return optimized
def calculate_target_size(self, original_size: tuple, target_ratio: tuple) -> tuple:
"""计算目标尺寸"""
orig_width, orig_height = original_size
target_width, target_height = target_ratio
# 保持原图比例,填充到目标比例
if orig_width / orig_height > target_width / target_height:
# 原图更宽,以高度为准
new_height = orig_height
new_width = int(new_height * target_width / target_height)
# 裁剪宽度
left = (orig_width - new_width) // 2
right = left + new_width
return (new_width, new_height)
else:
# 原图更高,以宽度为准
new_width = orig_width
new_height = int(new_width * target_height / target_width)
# 裁剪高度
top = (orig_height - new_height) // 2
bottom = top + new_height
return (new_width, new_height)
def enhance_image(self, image: Image.Image) -> Image.Image:
"""增强图片效果"""
# 提高亮度
enhancer = ImageEnhance.Brightness(image)
image = enhancer.enhance(1.1)
# 提高对比度
enhancer = ImageEnhance.Contrast(image)
image = enhancer.enhance(1.05)
# 提高饱和度
enhancer = ImageEnhance.Color(image)
image = enhancer.enhance(1.1)
# 轻微锐化
image = image.filter(ImageFilter.SHARPEN)
return image
def add_xiaohongshu_frame(self, image: Image.Image) -> Image.Image:
"""添加小红书风格边框"""
# 创建边框
border_size = 20
frame_color = "#FFFFFF"
new_width = image.width + border_size * 2
new_height = image.height + border_size * 2
framed = Image.new("RGB", (new_width, new_height), frame_color)
framed.paste(image, (border_size, border_size))
return framed
def create_gradient_background(self, size: tuple,
colors: list = ["#FF9A9E", "#FAD0C4"]) -> Image.Image:
"""创建渐变背景"""
width, height = size
# 创建渐变
gradient = Image.new("RGB", (width, height))
for y in range(height):
# 计算渐变颜色
ratio = y / height
r = int(int(colors[0][1:3], 16) * (1 - ratio) +
int(colors[1][1:3], 16) * ratio)
g = int(int(colors[0][3:5], 16) * (1 - ratio) +
int(colors[1][3:5], 16) * ratio)
b = int(int(colors[0][5:7], 16) * (1 - ratio) +
int(colors[1][5:7], 16) * ratio)
color = (r, g, b)
# 绘制水平线
for x in range(width):
gradient.putpixel((x, y), color)
return gradient
📱 第四章:小红书自动化发布模块
4.1 小红书Web自动化
# scripts/xiaohongshu_automator.py
import time
import json
import random
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
import undetected_chromedriver as uc
class XiaohongshuAutomator:
def __init__(self, headless: bool = False):
self.driver = None
self.wait = None
self.headless = headless
self.is_logged_in = False
def init_driver(self):
"""初始化浏览器驱动"""
options = uc.ChromeOptions()
if self.headless:
options.add_argument("--headless")
# 添加常用参数
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--no-sandbox")
options.add_argument("--disable-gpu")
options.add_argument("--window-size=1920,1080")
# 添加用户代理
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")
try:
self.driver = uc.Chrome(options=options)
self.wait = WebDriverWait(self.driver, 20)
return True
except Exception as e:
print(f"初始化浏览器失败: {e}")
return False
def login_with_cookies(self, cookies_file: str = "cookies.json"):
"""使用cookies登录"""
try:
# 打开小红书
self.driver.get("https://www.xiaohongshu.com")
time.sleep(3)
# 加载cookies
with open(cookies_file, 'r') as f:
cookies = json.load(f)
for cookie in cookies:
if 'sameSite' in cookie:
del cookie['sameSite']
self.driver.add_cookie(cookie)
# 刷新页面
self.driver.refresh()
time.sleep(3)
# 检查是否登录成功
if self.check_login():
self.is_logged_in = True
print("Cookies登录成功")
return True
else:
print("Cookies登录失败")
return False
except Exception as e:
print(f"Cookies登录失败: {e}")
return False
def check_login(self) -> bool:
"""检查是否已登录"""
try:
# 检查登录元素
self.driver.find_element(By.CSS_SELECTOR, ".login-container")
return False
except:
try:
# 检查用户头像
self.driver.find_element(By.CSS_SELECTOR, ".avatar")
return True
except:
return False
def create_post(self, title: str, content: str, images: list,
hashtags: list, publish: bool = False) -> bool:
"""创建帖子"""
if not self.is_logged_in:
print("未登录,无法创建帖子")
return False
try:
# 打开发布页面
self.driver.get("https://www.xiaohongshu.com/creation")
time.sleep(3)
# 输入标题
title_input = self.wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, "textarea[placeholder*='标题']"))
)
title_input.clear()
title_input.send_keys(title)
time.sleep(1)
# 输入内容
content_input = self.driver.find_element(
By.CSS_SELECTOR, "div[contenteditable='true']"
)
content_input.clear()
content_input.send_keys(content)
time.sleep(1)
# 添加话题标签
for hashtag in hashtags:
content_input.send_keys(f" {hashtag}")
time.sleep(0.5)
# 上传图片
self.upload_images(images)
time.sleep(3)
# 选择分类
self.select_category()
time.sleep(1)
if publish:
# 发布
publish_btn = self.wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, "button:contains('发布')"))
)
publish_btn.click()
time.sleep(5)
print("帖子发布成功")
return True
else:
print("帖子已保存为草稿")
return True
except Exception as e:
print(f"创建帖子失败: {e}")
return False
def upload_images(self, image_paths: list):
"""上传图片"""
try:
# 找到上传按钮
upload_input = self.driver.find_element(
By.CSS_SELECTOR, "input[type='file'][accept*='image']"
)
# 上传多张图片
for image_path in image_paths:
upload_input.send_keys(image_path)
time.sleep(2) # 等待上传完成
print(f"成功上传 {len(image_paths)} 张图片")
except Exception as e:
print(f"上传图片失败: {e}")
def select_category(self, category: str = "生活"):
"""选择分类"""
try:
# 点击分类选择器
category_btn = self.driver.find_element(
By.CSS_SELECTOR, ".category-selector"
)
category_btn.click()
time.sleep(1)
# 选择分类
category_option = self.driver.find_element(
By.XPATH, f"//div[contains(text(), '{category}')]"
)
category_option.click()
time.sleep(1)
更多推荐




所有评论(0)