Python实现TikTok云控自定义任务模块的二次开发与工程化
TK云控作为跨境电商与内容创作者实现TikTok账号矩阵批量运营的核心自动化工具,其核心价值在于通过自定义任务适配多变的平台规则与业务场景。但商用版本往往存在功能固化、风控适配弱、二次开发难度高等问题,难以满足个性化运营需求。本文基于Python语言,结合Playwright异步框架与模块化设计思想,从环境搭建、架构设计、核心模块开发到工程化部署,完整实现TK云控自定义任务模块的二次开发,提供可直接复用的工业级代码方案,兼顾灵活性、稳定性与风控规避能力,助力开发者快速搭建专属自动化运营系统。
一、开发环境搭建与依赖配置
开发环境的稳定性是二次开发的基础,本文采用Python 3.10+版本,搭配Playwright浏览器自动化框架、Pydantic数据验证库、Asyncio异步调度库等核心依赖,确保跨平台兼容性与高并发处理能力。以下是环境搭建与依赖安装的详细代码及配置文件。
# 环境检查与依赖安装脚本(setup_env.py)
import sys
import subprocess
import pkg_resources
from typing import List, Dict
# 定义核心依赖包及版本要求
REQUIRED_PACKAGES: List[Dict[str, str]] = [
{"name": "playwright", "version": ">=1.44.0"},
{"name": "pydantic", "version": ">=2.7.0"},
{"name": "asyncio", "version": ">=3.4.3"},
{"name": "aiohttp", "version": ">=3.9.5"},
{"name": "loguru", "version": ">=0.7.2"},
{"name": "pyyaml", "version": ">=6.0.1"},
{"name": "tenacity", "version": ">=8.4.1"},
{"name": "fake-useragent", "version": ">=1.5.1"}
]
def check_python_version() -> bool:
"""检查Python版本是否符合要求(3.10+)"""
current_version = sys.version_info
if current_version < (3, 10):
print(f"❌ Python版本过低,当前版本:{current_version.major}.{current_version.minor},请升级至3.10及以上")
return False
print(f"✅ Python版本检查通过:{current_version.major}.{current_version.minor}")
return True
def install_missing_packages() -> None:
"""安装缺失的依赖包"""
installed_packages = {pkg.key: pkg.version for pkg in pkg_resources.working_set}
missing_packages = []
for package in REQUIRED_PACKAGES:
pkg_name = package["name"]
pkg_version = package["version"]
if pkg_name not in installed_packages or not pkg_resources.parse_version(installed_packages[pkg_name]) >= pkg_resources.parse_version(pkg_version):
missing_packages.append(f"{pkg_name}{pkg_version}")
if missing_packages:
print(f"📦 发现缺失依赖包,开始安装:{', '.join(missing_packages)}")
subprocess.check_call([sys.executable, "-m", "pip", "install", *missing_packages])
print("✅ 依赖包安装完成")
else:
print("✅ 所有依赖包已满足要求")
def install_playwright_browsers() -> None:
"""安装Playwright所需浏览器驱动"""
try:
subprocess.check_call(["playwright", "install", "chromium"])
print("✅ Playwright浏览器驱动安装完成")
except subprocess.CalledProcessError as e:
print(f"❌ Playwright浏览器驱动安装失败:{str(e)}")
raise SystemExit(1)
if __name__ == "__main__":
if not check_python_version():
raise SystemExit(1)
install_missing_packages()
install_playwright_browsers()
print("🎉 开发环境搭建完成,可开始二次开发")
# 项目依赖配置文件(requirements.txt)
playwright>=1.44.0
pydantic>=2.7.0
asyncio>=3.4.3
aiohttp>=3.9.5
loguru>=0.7.2
pyyaml>=6.0.1
tenacity>=8.4.1
fake-useragent>=1.5.1
执行python setup_env.py即可自动完成环境检查、依赖安装与浏览器驱动配置,确保开发环境一致性,避免因环境差异导致的运行错误。
二、系统整体架构设计与模块划分
基于“高内聚、低耦合”的设计原则,将TK云控自定义任务模块划分为核心调度层、任务执行层、数据管理层、风控防护层、配置中心层五大模块,各模块通过标准化接口通信,支持独立开发、测试与部署,便于后续功能扩展与维护。以下是架构设计代码实现与模块交互逻辑。
# 系统架构核心定义(core/architecture.py)
from enum import Enum
from typing import Any, Optional, Dict
from pydantic import BaseModel, Field
from loguru import logger
# 任务类型枚举
class TaskType(str, Enum):
LIKE = "like" # 点赞任务
FOLLOW = "follow" # 关注任务
COMMENT = "comment" # 评论任务
UPLOAD = "upload" # 视频上传任务
SCROLL = "scroll" # 滑动浏览任务
CUSTOM = "custom" # 自定义任务
# 任务状态枚举
class TaskStatus(str, Enum):
PENDING = "pending" # 待执行
RUNNING = "running" # 执行中
SUCCESS = "success" # 执行成功
FAILED = "failed" # 执行失败
PAUSED = "paused" # 已暂停
# 基础任务模型
class BaseTask(BaseModel):
task_id: str = Field(..., description="任务唯一ID")
task_type: TaskType = Field(..., description="任务类型")
account_id: str = Field(..., description="关联账号ID")
params: Dict[str, Any] = Field(default_factory=dict, description="任务参数")
status: TaskStatus = Field(default=TaskStatus.PENDING, description="任务状态")
retry_count: int = Field(default=3, description="最大重试次数")
current_retry: int = Field(default=0, description="当前重试次数")
create_time: str = Field(..., description="任务创建时间")
update_time: Optional[str] = Field(None, description="任务更新时间")
# 模块通信接口
class ModuleInterface:
@staticmethod
def send_message(module_name: str, message: Dict[str, Any]) -> bool:
"""模块间发送消息"""
try:
logger.info(f"📨 向{module_name}模块发送消息:{message}")
# 实际项目中可替换为MQ、RPC等通信方式
return True
except Exception as e:
logger.error(f"❌ 向{module_name}模块发送消息失败:{str(e)}")
return False
@staticmethod
def receive_message(module_name: str) -> Optional[Dict[str, Any]]:
"""模块间接收消息"""
try:
logger.info(f"📥 从{module_name}模块接收消息")
# 实际项目中可替换为MQ、RPC等通信方式
return {"status": "success", "data": {}}
except Exception as e:
logger.error(f"❌ 从{module_name}模块接收消息失败:{str(e)}")
return None
# 核心调度器类
class CoreScheduler:
def __init__(self):
self.task_queue: Dict[str, BaseTask] = {} # 任务队列
self.running_tasks: Dict[str, BaseTask] = {} # 运行中任务
logger.info("🚀 核心调度器初始化完成")
def add_task(self, task: BaseTask) -> bool:
"""添加任务到队列"""
if task.task_id in self.task_queue:
logger.warning(f"⚠️ 任务{task.task_id}已存在,无需重复添加")
return False
self.task_queue[task.task_id] = task
logger.info(f"✅ 任务{task.task_id}添加至队列成功,当前队列任务数:{len(self.task_queue)}")
return True
def get_task(self, task_id: str) -> Optional[BaseTask]:
"""获取任务信息"""
return self.task_queue.get(task_id) or self.running_tasks.get(task_id)
def update_task_status(self, task_id: str, status: TaskStatus) -> bool:
"""更新任务状态"""
task = self.get_task(task_id)
if not task:
logger.error(f"❌ 任务{task_id}不存在,无法更新状态")
return False
task.status = status
logger.info(f"🔄 任务{task_id}状态更新为:{status}")
return True
架构设计采用分层思想,核心调度器负责任务的接收、分发与状态管理,任务执行层基于Playwright实现具体自动化操作,数据管理层负责账号、任务数据的持久化存储,风控防护层通过拟人化操作、随机延迟、设备指纹模拟等机制规避平台检测,配置中心层统一管理系统参数与任务模板,确保系统可扩展性与可维护性。
三、账号管理模块开发(含登录与状态维护)
账号管理是TK云控的核心基础功能,需支持多账号批量导入、登录状态维护、账号信息加密存储与异常账号检测,避免因账号关联或登录失败导致风控风险。以下是账号管理模块的完整代码实现,包含数据模型、加密存储、自动化登录与状态检测功能。
# 账号管理模块(core/account_manager.py)
import hashlib
import base64
import time
from typing import List, Optional, Dict
from pydantic import BaseModel, Field
from loguru import logger
from playwright.async_api import async_playwright, Page, BrowserContext
from tenacity import retry, stop_after_attempt, wait_random, retry_if_exception_type
# 账号状态枚举
class AccountStatus(str, Enum):
UNLOGIN = "unlogin" # 未登录
LOGIN_SUCCESS = "login_success" # 登录成功
LOGIN_FAILED = "login_failed" # 登录失败
BANNED = "banned" # 账号被封禁
EXPIRED = "expired" # 登录状态过期
# 账号数据模型
class TikTokAccount(BaseModel):
account_id: str = Field(..., description="账号唯一ID")
username: str = Field(..., description="TikTok用户名/手机号")
password: str = Field(..., description="加密后的密码")
proxy: Optional[str] = Field(None, description="代理地址(ip:port)")
status: AccountStatus = Field(default=AccountStatus.UNLOGIN, description="账号状态")
cookie: Optional[str] = Field(None, description="登录Cookie(加密)")
create_time: str = Field(..., description="账号创建时间")
last_login_time: Optional[str] = Field(None, description="最后登录时间")
# 账号管理器类
class AccountManager:
def __init__(self):
self.accounts: Dict[str, TikTokAccount] = {} # 账号存储
self.encryption_key = "tk_cloud_control_2026" # 加密密钥(实际项目中从环境变量读取)
logger.info("👤 账号管理器初始化完成")
def encrypt_password(self, password: str) -> str:
"""密码加密(Base64+MD5)"""
try:
md5_hash = hashlib.md5((password + self.encryption_key).encode()).hexdigest()
encrypted_pwd = base64.b64encode(md5_hash.encode()).decode()
return encrypted_pwd
except Exception as e:
logger.error(f"❌ 密码加密失败:{str(e)}")
raise
def decrypt_password(self, encrypted_pwd: str) -> str:
"""密码解密"""
try:
md5_hash = base64.b64decode(encrypted_pwd).decode()
# 实际项目中可根据加密逻辑反向解密
return md5_hash
except Exception as e:
logger.error(f"❌ 密码解密失败:{str(e)}")
raise
def add_account(self, username: str, password: str, proxy: Optional[str] = None) -> bool:
"""添加账号(自动加密密码)"""
account_id = hashlib.md5(username.encode()).hexdigest()[:16]
if account_id in self.accounts:
logger.warning(f"⚠️ 账号{username}已存在,无需重复添加")
return False
encrypted_pwd = self.encrypt_password(password)
account = TikTokAccount(
account_id=account_id,
username=username,
password=encrypted_pwd,
proxy=proxy,
create_time=time.strftime("%Y-%m-%d %H:%M:%S")
)
self.accounts[account_id] = account
logger.info(f"✅ 账号{username}添加成功,账号ID:{account_id}")
return True
def get_account(self, account_id: str) -> Optional[TikTokAccount]:
"""获取账号信息"""
return self.accounts.get(account_id)
@retry(
stop=stop_after_attempt(3),
wait=wait_random(min=2, max=5),
retry=retry_if_exception_type(Exception)
)
async def login_account(self, account_id: str) -> bool:
"""账号自动化登录(含风控规避)"""
account = self.get_account(account_id)
if not account:
logger.error(f"❌ 账号{account_id}不存在,无法登录")
return False
if account.status == AccountStatus.LOGIN_SUCCESS:
logger.info(f"ℹ️ 账号{account.username}已登录,无需重复操作")
return True
try:
# 初始化Playwright浏览器(配置代理、指纹模拟)
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=False, # 开发环境关闭无头模式,生产环境可开启
proxy={"server": account.proxy} if account.proxy else None,
args=[
"--no-sandbox",
"--disable-blink-features=AutomationControlled", # 禁用自动化检测
"--disable-dev-shm-usage",
"--disable-gpu"
]
)
context: BrowserContext = await browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
viewport={"width": 1920, "height": 1080},
locale="en-US"
)
page: Page = await context.new_page()
# 导航至TikTok登录页
await page.goto("https://www.tiktok.com/login", timeout=60000)
logger.info(f"🌐 账号{account.username}导航至登录页成功")
# 输入用户名(拟人化输入,随机延迟50-150ms)
await page.click("input[name='username']")
for char in account.username:
await page.keyboard.type(char, delay=__import__("random").randint(50, 150))
logger.info(f"✍️ 账号{account.username}用户名输入完成")
# 输入密码(拟人化输入)
decrypted_pwd = self.decrypt_password(account.password)
await page.click("input[name='password']")
for char in decrypted_pwd:
await page.keyboard.type(char, delay=__import__("random").randint(50, 150))
logger.info(f"✍️ 账号{account.username}密码输入完成")
# 点击登录按钮
await page.click("button[type='submit']")
await page.wait_for_navigation(timeout=30000)
# 验证登录结果(检测首页元素)
if await page.locator("div[data-testid='user-profile']").is_visible():
# 保存登录Cookie(加密)
cookies = await context.cookies()
cookie_str = base64.b64encode(str(cookies).encode()).decode()
account.cookie = cookie_str
account.status = AccountStatus.LOGIN_SUCCESS
account.last_login_time = time.strftime("%Y-%m-%d %H:%M:%S")
logger.success(f"🎉 账号{account.username}登录成功")
await browser.close()
return True
else:
account.status = AccountStatus.LOGIN_FAILED
logger.error(f"❌ 账号{account.username}登录失败,未检测到登录成功标识")
await browser.close()
return False
except Exception as e:
account.status = AccountStatus.LOGIN_FAILED
logger.error(f"❌ 账号{account.username}登录异常:{str(e)}")
return False
async def check_account_status(self, account_id: str) -> AccountStatus:
"""检测账号登录状态是否有效"""
account = self.get_account(account_id)
if not account or not account.cookie:
return AccountStatus.UNLOGIN
try:
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context(
cookies=eval(base64.b64decode(account.cookie).decode())
)
page = await context.new_page()
await page.goto("https://www.tiktok.com/", timeout=30000)
if await page.locator("div[data-testid='user-profile']").is_visible():
return AccountStatus.LOGIN_SUCCESS
else:
return AccountStatus.EXPIRED
except Exception as e:
logger.error(f"❌ 账号{account.username}状态检测失败:{str(e)}")
return AccountStatus.EXPIRED
账号管理模块实现了账号的安全存储、自动化登录与状态维护,核心亮点包括:密码采用 Base64+MD5加密存储,避免明文泄露;登录过程模拟真人输入行为,随机延迟规避输入风控;集成Tenacity重试机制,应对网络波动导致的登录失败;支持代理配置,适配多地区账号运营需求,有效降低账号关联风险。
四、自定义任务模板设计与参数化配置
自定义任务模板是实现灵活自动化的核心,需支持用户自定义任务类型、配置任务参数、设置执行规则与风控策略,同时提供模板导入导出功能,便于批量任务创建与复用。以下是任务模板设计与参数化配置的代码实现,包含模板模型、参数校验、模板管理与动态生成功能。
# 自定义任务模板模块(core/task_template.py)
import time
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field, validator
from loguru import logger
from core.architecture import TaskType, TaskStatus, BaseTask
# 任务参数模板模型
class TaskParamTemplate(BaseModel):
param_name: str = Field(..., description="参数名称")
param_type: str = Field(..., description="参数类型(str/int/bool/list)")
default_value: Any = Field(None, description="默认参数值")
required: bool = Field(default=False, description="是否必填")
description: str = Field(..., description="参数描述")
# 任务模板模型
class CustomTaskTemplate(BaseModel):
template_id: str = Field(..., description="模板唯一ID")
template_name: str = Field(..., description="模板名称")
task_type: TaskType = Field(..., description="关联任务类型")
params: List[TaskParamTemplate] = Field(default_factory=list, description="任务参数模板")
execute_interval: int = Field(default=60, description="任务执行间隔(秒)")
max_execution: int = Field(default=10, description="最大执行次数")
risk_level: str = Field(default="medium", description="风控等级(low/medium/high)")
create_time: str = Field(..., description="模板创建时间")
update_time: Optional[str] = Field(None, description="模板更新时间")
@validator("risk_level")
def check_risk_level(cls, v):
"""校验风控等级合法性"""
if v not in ["low", "medium", "high"]:
raise ValueError("风控等级必须为low/medium/high")
return v
# 任务模板管理器
class TaskTemplateManager:
def __init__(self):
self.templates: Dict[str, CustomTaskTemplate] = {} # 模板存储
logger.info("📋 任务模板管理器初始化完成")
def create_template(self, template_name: str, task_type: TaskType, params: List[Dict[str, Any]], execute_interval: int = 60, max_execution: int = 10, risk_level: str = "medium") -> Optional[CustomTaskTemplate]:
"""创建自定义任务模板"""
try:
template_id = f"template_{hash(template_name + str(time.time())) % 1000000}"
# 转换参数模板
param_templates = [TaskParamTemplate(**p) for p in params]
template = CustomTaskTemplate(
template_id=template_id,
template_name=template_name,
task_type=task_type,
params=param_templates,
execute_interval=execute_interval,
max_execution=max_execution,
risk_level=risk_level,
create_time=time.strftime("%Y-%m-%d %H:%M:%S")
)
self.templates[template_id] = template
logger.info(f"✅ 任务模板{template_name}创建成功,模板ID:{template_id}")
return template
except Exception as e:
logger.error(f"❌ 任务模板创建失败:{str(e)}")
return None
def get_template(self, template_id: str) -> Optional[CustomTaskTemplate]:
"""获取模板信息"""
return self.templates.get(template_id)
def generate_task_from_template(self, template_id: str, account_id: str, task_params: Dict[str, Any]) -> Optional[BaseTask]:
"""根据模板生成具体任务实例"""
template = self.get_template(template_id)
if not template:
logger.error(f"❌ 模板{template_id}不存在,无法生成任务")
return None
try:
# 校验任务参数完整性
required_params = [p.param_name for p in template.params if p.required]
missing_params = [p for p in required_params if p not in task_params]
if missing_params:
raise ValueError(f"缺少必填参数:{', '.join(missing_params)}")
# 生成任务ID
task_id = f"task_{hash(account_id + template_id + str(time.time())) % 1000000}"
# 创建任务实例
task = BaseTask(
task_id=task_id,
task_type=template.task_type,
account_id=account_id,
params=task_params,
status=TaskStatus.PENDING,
create_time=time.strftime("%Y-%m-%d %H:%M:%S")
)
logger.info(f"✅ 从模板{template_id}生成任务{task_id}成功")
return task
except Exception as e:
logger.error(f"❌ 从模板{template_id}生成任务失败:{str(e)}")
return None
def export_template(self, template_id: str, export_path: str) -> bool:
"""导出模板为JSON文件"""
import json
template = self.get_template(template_id)
if not template:
logger.error(f"❌ 模板{template_id}不存在,无法导出")
return False
try:
with open(export_path, "w", encoding="utf-8") as f:
json.dump(template.dict(), f, ensure_ascii=False, indent=4)
logger.info(f"📤 模板{template_id}导出成功,路径:{export_path}")
return True
except Exception as e:
logger.error(f"❌ 模板{template_id}导出失败:{str(e)}")
return False
def import_template(self, import_path: str) -> Optional[CustomTaskTemplate]:
"""从JSON文件导入模板"""
import json
try:
with open(import_path, "r", encoding="utf-8") as f:
template_data = json.load(f)
template = CustomTaskTemplate(**template_data)
self.templates[template.template_id] = template
logger.info(f"📥 模板{template.template_id}导入成功")
return template
except Exception as e:
logger.error(f"❌ 模板导入失败:{str(e)}")
return None
任务模板模块支持用户灵活定义任务规则,核心功能包括:可视化配置任务参数(必填 / 选填、参数类型、默认值);设置任务执行间隔、最大执行次数与风控等级;支持模板导入导出,便于批量任务复用;参数校验机制确保任务配置合法性,避免无效任务生成。用户可基于模板快速生成点赞、关注、评论、上传等各类自定义任务,大幅提升任务创建效率。
五、核心自动化任务执行逻辑实现(点赞 / 关注 / 评论)
任务执行层是TK云控的核心功能载体,基于Playwright异步框架实现点赞、关注、评论、滑动浏览等核心自动化操作,集成拟人化交互、随机延迟、元素智能等待等机制,确保操作稳定性与风控规避能力。以下是核心任务执行逻辑的完整代码实现,包含原子操作封装、任务执行器、异常处理与结果回调功能。
# 核心任务执行模块(core/task_executor.py)
import random
import time
from typing import Optional, Dict, Any
from loguru import logger
from playwright.async_api import Page, TimeoutError, ElementHandle
from tenacity import retry, stop_after_attempt, wait_random, retry_if_exception_type
from core.architecture import BaseTask, TaskType, TaskStatus
from core.account_manager import AccountManager
from core.risk_control import RiskController
# 原子操作封装类
class TkAtomicOperations:
def __init__(self, page: Page):
self.page = page
self.risk_controller = RiskController()
logger.info("⚙️ 原子操作封装初始化完成")
async def smart_wait(self, selector: str, timeout: int = 10000) -> Optional[ElementHandle]:
"""智能等待元素加载(支持动态渲染元素)"""
try:
element = await self.page.wait_for_selector(selector, timeout=timeout, state="visible")
# 随机等待0.5-1.5秒,模拟真人反应延迟
await self.page.wait_for_timeout(random.randint(500, 1500))
return element
except TimeoutError:
logger.warning(f"⏱️ 元素{selector}等待超时")
return None
except Exception as e:
logger.error(f"❌ 元素{selector}等待异常:{str(e)}")
return None
async def human_click(self, selector: str) -> bool:
"""拟人化点击操作(随机坐标、随机延迟)"""
try:
element = await self.smart_wait(selector)
if not element:
return False
# 获取元素边界框,生成随机点击坐标(元素内)
bounding_box = await element.bounding_box()
if not bounding_box:
await element.click()
return True
x = bounding_box["x"] + random.randint(5, int(bounding_box["width"]) - 5)
y = bounding_box["y"] + random.randint(5, int(bounding_box["height"]) - 5)
# 随机延迟后点击
await self.page.wait_for_timeout(random.randint(200, 800))
await self.page.mouse.click(x, y)
logger.info(f"🖱️ 拟人化点击{selector}成功")
return True
except Exception as e:
logger.error(f"❌ 拟人化点击{selector}失败:{str(e)}")
return False
async def human_input(self, selector: str, content: str) -> bool:
"""拟人化输入操作(逐字符输入、随机延迟)"""
try:
element = await self.smart_wait(selector)
if not element:
return False
await element.click()
await self.page.keyboard.press("Control+A")
await self.page.keyboard.press("Backspace")
# 逐字符输入,随机延迟50-150ms
for char in content:
await self.page.keyboard.type(char, delay=random.randint(50, 150))
# 10%概率额外停顿
if random.random() < 0.1:
await self.page.wait_for_timeout(random.randint(300, 600))
logger.info(f"✍️ 拟人化输入{content}完成")
return True
except Exception as e:
logger.error(f"❌ 拟人化输入失败:{str(e)}")
return False
async def scroll_video(self, direction: str = "up") -> bool:
"""视频滑动操作(上滑/下滑,模拟真人滑动轨迹)"""
try:
viewport = self.page.viewport_size
if not viewport:
return False
start_x = viewport["width"] // 2
start_y = viewport["height"] - 200
end_x = viewport["width"] // 2
end_y = 200 if direction == "up" else viewport["height"] - 200
# 模拟滑动轨迹(分3步滑动,随机延迟)
await self.page.mouse.move(start_x, start_y)
await self.page.wait_for_timeout(random.randint(100, 300))
await self.page.mouse.move(end_x, end_y, steps=random.randint(5, 10))
await self.page.wait_for_timeout(random.randint(200, 500))
await self.page.mouse.up()
logger.info(f"📜 {direction}滑动视频成功")
return True
except Exception as e:
logger.error(f"❌ 滑动视频失败:{str(e)}")
return False
# 任务执行器类
class TaskExecutor:
def __init__(self, account_manager: AccountManager):
self.account_manager = account_manager
logger.info("🚀 任务执行器初始化完成")
@retry(
stop=stop_after_attempt(3),
wait=wait_random(min=3, max=8),
retry=retry_if_exception_type(Exception)
)
async def execute_task(self, task: BaseTask) -> bool:
"""执行单个自定义任务"""
account = self.account_manager.get_account(task.account_id)
if not account or account.status != "login_success":
logger.error(f"❌ 账号{task.account_id}未登录,无法执行任务{task.task_id}")
return False
logger.info(f"▶️ 开始执行任务{task.task_id},任务类型:{task.task_type}")
task.status = TaskStatus.RUNNING
try:
# 初始化Playwright浏览器(复用账号Cookie)
from playwright.async_api import async_playwright
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=True,
proxy={"server": account.proxy} if account.proxy else None,
args=["--no-sandbox", "--disable-blink-features=AutomationControlled"]
)
context = await browser.new_context(
cookies=eval(__import__("base64").b64decode(account.cookie).decode()),
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/124.0.0.0 Safari/537.36"
)
page = await context.new_page()
atomic_ops = TkAtomicOperations(page)
# 根据任务类型执行对应操作
if task.task_type == TaskType.LIKE:
# 点赞任务:导航至视频页 -> 点击点赞按钮
video_url = task.params.get("video_url")
if not video_url:
raise ValueError("点赞任务缺少video_url参数")
await page.goto(video_url, timeout=60000)
like_success = await atomic_ops.human_click("button[data-testid='like-button']")
if like_success:
logger.success(f"❤️ 任务{task.task_id}点赞执行成功")
else:
raise Exception("点赞按钮点击失败")
elif task.task_type == TaskType.FOLLOW:
# 关注任务:导航至用户主页 -> 点击关注按钮
user_url = task.params.get("user_url")
if not user_url:
raise ValueError("关注任务缺少user_url参数")
await page.goto(user_url, timeout=60000)
follow_success = await atomic_ops.human_click("button[data-testid='follow-button']")
if follow_success:
logger.success(f"➕ 任务{task.task_id}关注执行成功")
else:
raise Exception("关注按钮点击失败")
elif task.task_type == TaskType.COMMENT:
# 评论任务:导航至视频页 -> 输入评论 -> 提交评论
video_url = task.params.get("video_url")
comment_content = task.params.get("content")
if not video_url or not comment_content:
raise ValueError("评论任务缺少video_url或content参数")
await page.goto(video_url, timeout=60000)
# 点击评论输入框
await atomic_ops.human_click("textarea[data-testid='comment-input']")
# 输入评论内容
await atomic_ops.human_input("textarea[data-testid='comment-input']", comment_content)
# 点击提交按钮
comment_success = await atomic_ops.human_click("button[data-testid='comment-submit']")
if comment_success:
logger.success(f"💬 任务{task.task_id}评论执行成功:{comment_content}")
else:
raise Exception("评论提交失败")
elif task.task_type == TaskType.SCROLL:
# 滑动浏览任务:导航至首页 -> 循环滑动
scroll_count = task.params.get("count", 5)
await page.goto("https://www.tiktok.com/", timeout=60000)
for i in range(scroll_count):
await atomic_ops.scroll_video(direction="up")
await page.wait_for_timeout(random.randint(1000, 3000))
logger.success(f"📜 任务{task.task_id}滑动浏览执行成功,滑动次数:{scroll_count}")
else:
raise ValueError(f"不支持的任务类型:{task.task_type}")
await browser.close()
task.status = TaskStatus.SUCCESS
task.update_time = time.strftime("%Y-%m-%d %H:%M:%S")
return True
except Exception as e:
task.current_retry += 1
if task.current_retry >= task.retry_count:
task.status = TaskStatus.FAILED
logger.error(f"❌ 任务{task.task_id}执行失败,已达最大重试次数:{str(e)}")
else:
logger.warning(f"⚠️ 任务{task.task_id}执行失败,正在重试({task.current_retry}/{task.retry_count}):{str(e)}")
task.update_time = time.strftime("%Y-%m-%d %H:%M:%S")
return False
核心任务执行逻辑通过原子操作封装实现代码复用,关键特性包括:拟人化交互(随机点击坐标、逐字符输入、随机延迟);智能元素等待适配动态渲染页面;Tenacity重试机制应对临时异常Cookie复用减少登录操作,提升执行效率;模块化设计便于扩展分享、收藏、视频上传等更多任务类型,满足多样化运营需求。
六、风控防护机制集成(防检测、防关联)
风控防护是TK云控长期稳定运行的关键,需从设备指纹模拟、行为特征伪装、请求频率控制、异常行为检测四个维度构建防护体系,规避TikTok平台的自动化检测与账号关联风控。以下是风控防护模块的代码实现,包含设备指纹伪装、行为随机化、频率限制、异常检测与自动风控策略调整功能。
# 风控防护模块(core/risk_control.py)
import random
import time
from typing import Dict, Any, Optional
from loguru import logger
from fake_useragent import UserAgent
# 风控等级枚举
class RiskLevel(str, Enum):
LOW = "low" # 低风控(宽松策略)
MEDIUM = "medium" # 中等风控(默认策略)
HIGH = "high" # 高风控(严格策略)
# 风控配置模型
class RiskControlConfig(BaseModel):
risk_level: RiskLevel = Field(default=RiskLevel.MEDIUM, description="风控等级")
min_delay: int = Field(default=1000, description="最小操作延迟(毫秒)")
max_delay: int = Field(default=3000, description="最大操作延迟(毫秒)")
max_daily_tasks: int = Field(default=50, description="单账号每日最大任务数")
min_task_interval: int = Field(default=60, description="任务最小间隔(秒)")
device_fingerprint_randomize: bool = Field(default=True, description="是否随机化设备指纹")
behavior_randomize: bool = Field(default=True, description="是否随机化行为特征")
# 风控控制器类
class RiskController:
def __init__(self):
self.user_agent = UserAgent()
self.account_task_count: Dict[str, int] = {} # 账号每日任务计数
self.last_task_time: Dict[str, int] = {} # 账号上次任务执行时间
logger.info("🛡️ 风控控制器初始化完成")
def get_random_user_agent(self) -> str:
"""获取随机User-Agent(模拟不同设备)"""
try:
return self.user_agent.random
except:
# 备用User-Agent列表
ua_list = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:125.0) Gecko/20100101 Firefox/125.0"
]
return random.choice(ua_list)
def generate_random_fingerprint(self) -> Dict[str, Any]:
"""生成随机设备指纹(适配Playwright)"""
return {
"screen": {
"width": random.choice([1366, 1920, 2560]),
"height": random.choice([768, 1080, 1440])
},
"language": random.choice(["en-US", "en-GB", "es-ES", "fr-FR"]),
"timezone": random.choice(["UTC-5", "UTC+0", "UTC+1", "UTC+8"]),
"webgl_vendor": random.choice(["Intel Inc.", "NVIDIA Corporation", "AMD Inc."]),
"canvas_noise": random.random() # Canvas指纹噪声
}
def get_operation_delay(self, risk_level: RiskLevel) -> int:
"""根据风控等级获取随机操作延迟(毫秒)"""
config = self._get_config_by_risk_level(risk_level)
return random.randint(config["min_delay"], config["max_delay"])
def check_task_frequency(self, account_id: str, risk_level: RiskLevel) -> bool:
"""检查任务频率是否超限(时间间隔+每日数量)"""
config = self._get_config_by_risk_level(risk_level)
current_time = int(time.time())
# 检查任务时间间隔
if account_id in self.last_task_time:
time_diff = current_time - self.last_task_time[account_id]
if time_diff < config["min_task_interval"]:
logger.warning(f"⏱️ 账号{account_id}任务间隔过短,需等待{config['min_task_interval'] - time_diff}秒")
return False
# 检查每日任务数量
if account_id not in self.account_task_count:
self.account_task_count[account_id] = 0
if self.account_task_count[account_id] >= config["max_daily_tasks"]:
logger.warning(f"📊 账号{account_id}今日任务数已达上限{config['max_daily_tasks']},请明日再试")
return False
# 更新计数与时间
self.account_task_count[account_id] += 1
self.last_task_time[account_id] = current_time
logger.info(f"✅ 账号{account_id}任务频率检查通过,今日已执行{self.account_task_count[account_id]}个任务")
return True
def detect_abnormal_behavior(self, account_id: str, operation_type: str) -> bool:
"""检测异常行为(短时间内高频相同操作)"""
# 实际项目中可基于操作日志分析行为特征
# 简化版:随机模拟异常检测结果(95%正常,5%异常)
is_normal = random.random() > 0.05
if not is_normal:
logger.warning(f"⚠️ 账号{account_id}检测到{operation_type}异常行为,触发风控保护")
return is_normal
def adjust_risk_strategy(self, account_id: str, failure_count: int) -> RiskLevel:
"""根据任务失败次数动态调整风控策略"""
if failure_count >= 5:
logger.warning(f"🔒 账号{account_id}任务失败次数过多,升级为高风控策略")
return RiskLevel.HIGH
elif failure_count >= 3:
logger.info(f"🔐 账号{account_id}任务失败次数较多,升级为中等风控策略")
return RiskLevel.MEDIUM
else:
return RiskLevel.LOW
def _get_config_by_risk_level(self, risk_level: RiskLevel) -> Dict[str, Any]:
"""根据风控等级获取配置参数"""
configs = {
RiskLevel.LOW: {
"min_delay": 500,
"max_delay": 1500,
"max_daily_tasks": 100,
"min_task_interval": 30
},
RiskLevel.MEDIUM: {
"min_delay": 1000,
"max_delay": 3000,
"max_daily_tasks": 50,
"min_task_interval": 60
},
RiskLevel.HIGH: {
"min_delay": 2000,
"max_delay": 5000,
"max_daily_tasks": 20,
"min_task_interval": 120
}
}
return configs[risk_level]
风控防护模块从多维度构建安全防护体系,核心能力包括:随机化设备指纹(User-Agent、屏幕分辨率、时区、WebGL信息),避免设备关联;行为特征随机化(操作延迟、点击坐标、输入速度),模拟真人行为模式;任务频率控制(时间间隔、每日数量),规避高频操作风控;异常行为检测与策略动态调整,任务失败次数过多时自动升级风控等级,最大程度降低账号封禁风险。
七、工程化部署与日志监控体系
工程化部署是确保TK云控系统稳定运行、便于维护与扩展的关键,需支持多进程并发调度、日志分级存储、任务状态实时监控、异常告警与自动恢复功能,适配生产环境大规模账号矩阵运营需求。以下是工程化部署与日志监控体系的代码实现,包含异步调度器、日志配置、监控告警与进程管理功能。
# 工程化部署与监控模块(core/deployment.py)
import asyncio
import logging
import sys
import time
from typing import List, Dict, Optional
from loguru import logger
from core.architecture import CoreScheduler, BaseTask, TaskStatus
from core.account_manager import AccountManager
from core.task_executor import TaskExecutor
from core.risk_control import RiskController
# 日志配置(分级存储:info/warning/error)
def configure_logger():
"""配置Loguru日志系统"""
# 移除默认处理器
logger.remove()
# 添加控制台处理器(INFO及以上)
logger.add(
sys.stdout,
level="INFO",
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
enqueue=True
)
# 添加文件处理器(INFO日志,按天轮转)
logger.add(
"logs/tk_cloud_control_info_{time:YYYY-MM-DD}.log",
level="INFO",
rotation="1 day",
retention="30 days",
encoding="utf-8",
enqueue=True
)
# 添加文件处理器(WARNING日志)
logger.add(
"logs/tk_cloud_control_warning_{time:YYYY-MM-DD}.log",
level="WARNING",
rotation="1 day",
retention="30 days",
encoding="utf-8",
enqueue=True
)
# 添加文件处理器(ERROR日志)
logger.add(
"logs/tk_cloud_control_error_{time:YYYY-MM-DD}.log",
level="ERROR",
rotation="1 day",
retention="90 days",
encoding="utf-8",
enqueue=True
)
logger.info("📝 日志系统配置完成")
# 异步任务调度器(支持多进程并发)
class AsyncTaskScheduler:
def __init__(self, max_workers: int = 5):
self.core_scheduler = CoreScheduler()
self.account_manager = AccountManager()
self.task_executor = TaskExecutor(self.account_manager)
self.risk_controller = RiskController()
self.max_workers = max_workers # 最大并发任务数
self.running = False
logger.info(f"🚀 异步任务调度器初始化完成,最大并发数:{max_workers}")
async def start(self):
"""启动调度器"""
self.running = True
logger.info("▶️ 异步任务调度器开始运行")
while self.running:
await self._dispatch_tasks()
await asyncio.sleep(1) # 每秒轮询一次任务队列
async def stop(self):
"""停止调度器"""
self.running = False
logger.info("⏹️ 异步任务调度器已停止")
async def _dispatch_tasks(self):
"""分发待执行任务"""
# 获取待执行任务
pending_tasks = [
task for task in self.core_scheduler.task_queue.values()
if task.status == TaskStatus.PENDING
]
if not pending_tasks:
return
# 控制并发数
running_count = len(self.core_scheduler.running_tasks)
if running_count >= self.max_workers:
return
# 分发任务至执行器
for task in pending_tasks[:self.max_workers - running_count]:
# 风控检查
if not self.risk_controller.check_task_frequency(task.account_id, task.risk_level):
continue
# 加入运行中任务列表
self.core_scheduler.running_tasks[task.task_id] = task
# 异步执行任务
asyncio.create_task(self._run_single_task(task))
async def _run_single_task(self, task: BaseTask):
"""执行单个任务并更新状态"""
try:
success = await self.task_executor.execute_task(task)
if success:
self.core_scheduler.update_task_status(task.task_id, TaskStatus.SUCCESS)
else:
self.core_scheduler.update_task_status(task.task_id, TaskStatus.FAILED)
except Exception as e:
logger.error(f"❌ 任务{task.task_id}执行异常:{str(e)}")
self.core_scheduler.update_task_status(task.task_id, TaskStatus.FAILED)
finally:
# 从运行中任务列表移除
if task.task_id in self.core_scheduler.running_tasks:
del self.core_scheduler.running_tasks[task.task_id]
# 监控告警系统
class MonitorSystem:
def __init__(self, scheduler: AsyncTaskScheduler):
self.scheduler = scheduler
self.alert_threshold = 10 # 连续失败10次触发告警
self.continuous_failure_count = 0
logger.info("📊 监控告警系统初始化完成")
async def start_monitor(self):
"""启动监控循环"""
logger.info("▶️ 监控告警系统开始运行")
while self.scheduler.running:
await self._check_system_status()
await asyncio.sleep(5) # 每5秒监控一次
async def _check_system_status(self):
"""检查系统运行状态"""
# 统计任务执行结果
success_count = 0
failed_count = 0
for task in self.scheduler.core_scheduler.task_queue.values():
if task.status == TaskStatus.SUCCESS:
success_count += 1
elif task.status == TaskStatus.FAILED:
failed_count += 1
# 连续失败计数
if failed_count > 0:
self.continuous_failure_count += 1
else:
self.continuous_failure_count = 0
# 触发告警
if self.continuous_failure_count >= self.alert_threshold:
self._send_alert()
self.continuous_failure_count = 0
logger.info(f"📈 系统状态:待执行任务{len(self.scheduler.core_scheduler.task_queue)}个,运行中任务{len(self.scheduler.core_scheduler.running_tasks)}个,今日成功{success_count}个,失败{failed_count}个")
def _send_alert(self):
"""发送告警通知(邮件/企业微信/短信,此处简化为日志告警)"""
logger.critical("🚨 系统告警:连续任务失败次数超限,请立即检查系统状态与账号安全!")
# 实际项目中可集成邮件、企业微信、短信等告警渠道
# 主程序入口
async def main():
# 初始化日志
configure_logger()
# 创建调度器(最大并发5个任务)
scheduler = AsyncTaskScheduler(max_workers=5)
# 创建监控系统
monitor = MonitorSystem(scheduler)
# 启动调度器与监控系统
await asyncio.gather(
scheduler.start(),
monitor.start_monitor()
)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("👋 系统接收到停止信号,正在退出...")
except Exception as e:
logger.critical(f"❌ 系统运行异常:{str(e)}")
工程化部署模块实现了系统的工业化运行能力,核心特性包括:分级日志存储(按天轮转、分级保留),便于问题排查与审计;异步调度器支持多进程并发,提升任务执行效率;监控告警系统实时监控任务执行状态,连续失败次数超限自动触发告警;模块化设计支持水平扩展,可通过增加服务器节点提升系统并发能力,适配大规模账号矩阵运营场景。
以上基于Python的TK云控自定义任务模块二次开发方案,从环境搭建、架构设计、核心模块开发到工程化部署,提供了完整的工业级代码实现。通过模块化、异步化、风控防护与工程化设计,解决了商用云控系统功能固化、风控弱、维护难的痛点,实现了自定义任务的灵活配置、稳定执行与安全防护。开发者可基于此方案快速搭建专属TikTok自动化运营系统,后续可进一步扩展视频上传、数据采集、AI内容生成等功能,适配更多跨境运营场景需求。
更多推荐
所有评论(0)