本文将演示如何使用 Python 调用 OpenAI Sora 2 API,实现自媒体视频的批量生成。通过简单的代码,你可以快速将创意文案转化为高质量的 AI 视频内容。

功能概述

在自媒体运营中,视频内容的制作往往耗时耗力。本文将展示如何利用 Sora 2 API 实现:

  • 批量生成视频:一次性提交多个创意文案,自动生成对应视频
  • 任务状态追踪:实时监控视频生成进度
  • 结果自动保存:将生成的视频链接保存到本地,方便后续使用

最终效果:输入一组视频创意文案,程序自动调用 API 生成视频,并将结果保存为 JSON 文件。整个过程无需人工干预,适合批量生产自媒体内容。

注意,网上非官网API多为带SORA水印的版本,当前版本不带任何水印。

准备工作

环境要求

  • Python 3.7 或更高版本
  • 可访问互联网的开发环境

获取 API 密钥

  1. 访问 DefAPI 官网 注册账号
  2. 在用户中心创建 API Key
  3. 重要提示:请妥善保管 API Key,不要在代码中硬编码或上传到公开仓库

建议使用环境变量管理 API Key:

# Linux/macOS
export SORA_API_KEY="your-api-key-here"

# Windows CMD
set SORA_API_KEY=your-api-key-here

# Windows PowerShell
$env:SORA_API_KEY="your-api-key-here"

安装依赖

本项目只需要 Python 标准库中的 requests 模块(如果未安装):

pip install requests

API 接口说明

1. 视频生成接口

接口地址POST https://api.defapi.org/api/sora2/gen

认证方式:在请求头中添加 Authorization: Bearer YOUR_API_KEY

核心请求参数

{
  "prompt": "视频描述文案",
  "images": ["https://example.com/reference.jpg"],  // 可选,最多1张参考图
  "callback_url": "https://example.com/callback"    // 可选,任务完成后回调
}

响应数据

{
  "code": 0,
  "message": "ok",
  "data": {
    "task_id": "ta12345678-1234-1234-1234-123456789abc"
  }
}

2. 任务查询接口

接口地址GET https://api.defapi.org/api/task/query?task_id=xxx

任务状态说明

  • pending:等待处理
  • submitted:已提交
  • in_progress:生成中
  • success:生成成功
  • failed:生成失败

成功响应示例

{
  "code": 0,
  "message": "ok",
  "data": {
    "task_id": "ta823dfb-eaac-44fd-aec2-3e2c7ba8e071",
    "status": "success",
    "result": {
      "video": "https://example.com/generated-video.mp4"
    },
    "consumed": "0.00100000",
    "created_at": "2025-08-03T10:22:20.010Z"
  }
}

代码实现

步骤 1:导入必要的库

import os
import time
import json
import requests
from typing import List, Dict, Optional

步骤 2:配置 API 参数

# 从环境变量读取 API Key(推荐做法,避免硬编码)
API_KEY = os.getenv("SORA_API_KEY")
if not API_KEY:
    raise ValueError("请设置环境变量 SORA_API_KEY")

# API 基础配置
BASE_URL = "https://api.defapi.org"
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

步骤 3:封装核心功能函数

def generate_video(prompt: str, images: Optional[List[str]] = None) -> Optional[str]:
    """
    提交视频生成任务
    
    Args:
        prompt: 视频描述文案
        images: 参考图片URL列表(最多1张)
    
    Returns:
        task_id: 任务ID,用于后续查询
    """
    url = f"{BASE_URL}/api/sora2/gen"
    
    payload = {"prompt": prompt}
    if images:
        payload["images"] = images[:1]  # 确保最多1张图片
    
    try:
        response = requests.post(url, headers=HEADERS, json=payload, timeout=30)
        response.raise_for_status()
        
        result = response.json()
        if result.get("code") == 0:
            task_id = result["data"]["task_id"]
            print(f"✓ 任务已提交: {prompt[:30]}... [Task ID: {task_id}]")
            return task_id
        else:
            print(f"✗ 提交失败: {result.get('message')}")
            return None
            
    except requests.exceptions.RequestException as e:
        print(f"✗ 请求异常: {e}")
        return None


def query_task(task_id: str) -> Dict:
    """
    查询任务状态和结果
    
    Args:
        task_id: 任务ID
    
    Returns:
        任务详情字典
    """
    url = f"{BASE_URL}/api/task/query"
    params = {"task_id": task_id}
    
    try:
        response = requests.get(url, headers=HEADERS, params=params, timeout=30)
        response.raise_for_status()
        
        result = response.json()
        if result.get("code") == 0:
            return result["data"]
        else:
            print(f"✗ 查询失败: {result.get('message')}")
            return {}
            
    except requests.exceptions.RequestException as e:
        print(f"✗ 查询异常: {e}")
        return {}


def wait_for_task_completion(task_id: str, max_wait_time: int = 600) -> Dict:
    """
    等待任务完成(轮询查询)
    
    Args:
        task_id: 任务ID
        max_wait_time: 最大等待时间(秒),默认10分钟
    
    Returns:
        最终任务结果
    """
    start_time = time.time()
    interval = 5  # 每5秒查询一次
    
    while True:
        elapsed = time.time() - start_time
        if elapsed > max_wait_time:
            print(f"✗ 任务超时 [{task_id}]")
            return {"status": "timeout", "task_id": task_id}
        
        task_info = query_task(task_id)
        if not task_info:
            time.sleep(interval)
            continue
        
        status = task_info.get("status")
        
        if status == "success":
            video_url = task_info.get("result", {}).get("video")
            print(f"✓ 视频生成成功 [{task_id}]")
            print(f"  视频地址: {video_url}")
            return task_info
        
        elif status == "failed":
            reason = task_info.get("status_reason", {}).get("message", "未知错误")
            print(f"✗ 视频生成失败 [{task_id}]: {reason}")
            return task_info
        
        elif status in ["pending", "submitted", "in_progress"]:
            print(f"⏳ 生成中... [{task_id}] ({int(elapsed)}s)")
            time.sleep(interval)
        
        else:
            print(f"? 未知状态: {status}")
            time.sleep(interval)

步骤 4:实现批量生成功能

def batch_generate_videos(prompts: List[str], save_file: str = "video_results.json"):
    """
    批量生成视频并保存结果
    
    Args:
        prompts: 视频文案列表
        save_file: 结果保存文件名
    """
    print(f"\n{'='*60}")
    print(f"开始批量生成 {len(prompts)} 个视频")
    print(f"{'='*60}\n")
    
    # 第一步:提交所有任务
    tasks = []
    for idx, prompt in enumerate(prompts, 1):
        print(f"[{idx}/{len(prompts)}] 提交任务...")
        task_id = generate_video(prompt)
        if task_id:
            tasks.append({"task_id": task_id, "prompt": prompt})
        time.sleep(1)  # 避免请求过快
    
    print(f"\n已成功提交 {len(tasks)} 个任务\n")
    
    # 第二步:等待所有任务完成
    results = []
    for idx, task in enumerate(tasks, 1):
        print(f"\n[{idx}/{len(tasks)}] 等待任务完成...")
        print(f"文案: {task['prompt'][:50]}...")
        
        task_result = wait_for_task_completion(task["task_id"])
        results.append({
            "prompt": task["prompt"],
            "task_id": task["task_id"],
            "status": task_result.get("status"),
            "video_url": task_result.get("result", {}).get("video"),
            "consumed": task_result.get("consumed"),
            "created_at": task_result.get("created_at")
        })
    
    # 第三步:保存结果
    with open(save_file, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    
    # 统计结果
    success_count = sum(1 for r in results if r["status"] == "success")
    print(f"\n{'='*60}")
    print(f"批量生成完成!成功: {success_count}/{len(results)}")
    print(f"结果已保存到: {save_file}")
    print(f"{'='*60}\n")
    
    return results

完整示例代码

#!/usr/bin/env python3
"""
Sora 2 API 批量视频生成工具
用于批量生成自媒体视频内容
"""

import os
import time
import json
import requests
from typing import List, Dict, Optional

# ==================== 配置部分 ====================

# 从环境变量读取 API Key(推荐做法,避免硬编码)
API_KEY = os.getenv("SORA_API_KEY")
if not API_KEY:
    raise ValueError("请设置环境变量 SORA_API_KEY")

BASE_URL = "https://api.defapi.org"
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

# ==================== 核心功能函数 ====================

def generate_video(prompt: str, images: Optional[List[str]] = None) -> Optional[str]:
    """提交视频生成任务"""
    url = f"{BASE_URL}/api/sora2/gen"
    payload = {"prompt": prompt}
    if images:
        payload["images"] = images[:1]
    
    try:
        response = requests.post(url, headers=HEADERS, json=payload, timeout=30)
        response.raise_for_status()
        result = response.json()
        
        if result.get("code") == 0:
            task_id = result["data"]["task_id"]
            print(f"✓ 任务已提交: {prompt[:30]}... [Task ID: {task_id}]")
            return task_id
        else:
            print(f"✗ 提交失败: {result.get('message')}")
            return None
    except requests.exceptions.RequestException as e:
        print(f"✗ 请求异常: {e}")
        return None


def query_task(task_id: str) -> Dict:
    """查询任务状态和结果"""
    url = f"{BASE_URL}/api/task/query"
    params = {"task_id": task_id}
    
    try:
        response = requests.get(url, headers=HEADERS, params=params, timeout=30)
        response.raise_for_status()
        result = response.json()
        
        if result.get("code") == 0:
            return result["data"]
        else:
            print(f"✗ 查询失败: {result.get('message')}")
            return {}
    except requests.exceptions.RequestException as e:
        print(f"✗ 查询异常: {e}")
        return {}


def wait_for_task_completion(task_id: str, max_wait_time: int = 600) -> Dict:
    """等待任务完成(轮询查询)"""
    start_time = time.time()
    interval = 5
    
    while True:
        elapsed = time.time() - start_time
        if elapsed > max_wait_time:
            print(f"✗ 任务超时 [{task_id}]")
            return {"status": "timeout", "task_id": task_id}
        
        task_info = query_task(task_id)
        if not task_info:
            time.sleep(interval)
            continue
        
        status = task_info.get("status")
        
        if status == "success":
            video_url = task_info.get("result", {}).get("video")
            print(f"✓ 视频生成成功 [{task_id}]")
            print(f"  视频地址: {video_url}")
            return task_info
        elif status == "failed":
            reason = task_info.get("status_reason", {}).get("message", "未知错误")
            print(f"✗ 视频生成失败 [{task_id}]: {reason}")
            return task_info
        elif status in ["pending", "submitted", "in_progress"]:
            print(f"⏳ 生成中... [{task_id}] ({int(elapsed)}s)")
            time.sleep(interval)
        else:
            print(f"? 未知状态: {status}")
            time.sleep(interval)


def batch_generate_videos(prompts: List[str], save_file: str = "video_results.json"):
    """批量生成视频并保存结果"""
    print(f"\n{'='*60}")
    print(f"开始批量生成 {len(prompts)} 个视频")
    print(f"{'='*60}\n")
    
    # 提交所有任务
    tasks = []
    for idx, prompt in enumerate(prompts, 1):
        print(f"[{idx}/{len(prompts)}] 提交任务...")
        task_id = generate_video(prompt)
        if task_id:
            tasks.append({"task_id": task_id, "prompt": prompt})
        time.sleep(1)
    
    print(f"\n已成功提交 {len(tasks)} 个任务\n")
    
    # 等待所有任务完成
    results = []
    for idx, task in enumerate(tasks, 1):
        print(f"\n[{idx}/{len(tasks)}] 等待任务完成...")
        print(f"文案: {task['prompt'][:50]}...")
        
        task_result = wait_for_task_completion(task["task_id"])
        results.append({
            "prompt": task["prompt"],
            "task_id": task["task_id"],
            "status": task_result.get("status"),
            "video_url": task_result.get("result", {}).get("video"),
            "consumed": task_result.get("consumed"),
            "created_at": task_result.get("created_at")
        })
    
    # 保存结果
    with open(save_file, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    
    success_count = sum(1 for r in results if r["status"] == "success")
    print(f"\n{'='*60}")
    print(f"批量生成完成!成功: {success_count}/{len(results)}")
    print(f"结果已保存到: {save_file}")
    print(f"{'='*60}\n")
    
    return results


# ==================== 使用示例 ====================

if __name__ == "__main__":
    # 定义要生成的视频文案列表
    video_prompts = [
        "一位年轻的程序员在咖啡厅专注地敲代码,窗外是城市的夜景,温暖的灯光营造出舒适的氛围",
        "美食博主正在制作一道精美的甜品,镜头特写展示巧克力酱缓缓流下的瞬间",
        "清晨的公园里,一位老人在打太极拳,阳光透过树叶洒下斑驳的光影",
        "科技感十足的办公室内,团队成员围坐在一起进行头脑风暴,白板上写满了创意想法",
        "夕阳下的海滩,一对恋人牵手漫步在沙滩上,海浪轻轻拍打着岸边"
    ]
    
    # 批量生成视频
    results = batch_generate_videos(video_prompts, "my_videos.json")
    
    # 打印成功的视频链接
    print("\n成功生成的视频链接:")
    for idx, result in enumerate(results, 1):
        if result["status"] == "success" and result["video_url"]:
            print(f"{idx}. {result['video_url']}")
            
            print(f"   文案: {result['prompt'][:40]}...")
            print(f"   消费: {result['consumed']} 积分\n")
Logo

助力合肥开发者学习交流的技术社区,不定期举办线上线下活动,欢迎大家的加入

更多推荐