引言

视频文件动辄几百兆甚至几个GB的体量,给存储、传输和分享带来了巨大挑战。Python凭借其丰富的生态和FFmpeg这一强大的底层工具,为视频压缩和处理提供了灵活高效的解决方案。本文将系统介绍使用Python进行视频压缩的各种方案,涵盖从入门级到专业级的完整技术路径。

一、核心技术基础:FFmpeg

几乎所有Python视频处理库的底层都依赖同一个强大的工具——FFmpeg。它是一个开源的跨平台多媒体框架,支持几乎所有的视频、音频格式,并提供了先进的编解码器如H.264和H.265。

1.1 FFmpeg安装

在使用任何Python视频处理库之前,需要先安装FFmpeg:

bash

# macOS
brew install ffmpeg

# Ubuntu/Debian
sudo apt update && sudo apt install ffmpeg

# Windows (使用Scoop)
scoop install ffmpeg

# 或直接从官网下载并添加到PATH环境变量

1.2 理解视频编码的核心概念

在深入代码之前,需要明确一个重要认知:MP4、AVI、MOV只是容器格式,真正决定压缩效果的是内部的视频编码算法

编码器 压缩效率 兼容性 编码速度 适用场景
H.264 (libx264) 基准 极佳(所有设备) 通用首选
H.265 (libx265) 高50% 较新设备 高压缩率需求
VP9 Web浏览器 Web视频

H.265的压缩效率显著优于H.264——在相同画质下,文件体积可减小约50%,但编码时间会更长,且对旧设备的兼容性较差。

二、方案一:MoviePy(最易上手)

MoviePy是一个专为视频编辑设计的Python模块,API设计简洁直观,基本操作通常一行代码即可完成。

2.1 安装

bash

pip install moviepy
# MoviePy依赖ImageMagick用于文本效果(可选)
# macOS: brew install imagemagick

2.2 基础压缩

python

from moviepy.editor import VideoFileClip

def compress_video_moviepy(input_path: str, output_path: str, bitrate: str = "500k"):
    """
    使用MoviePy压缩视频
    
    Args:
        input_path: 输入视频路径
        output_path: 输出视频路径
        bitrate: 视频比特率,如 "500k", "1000k", "2M"
    """
    # 加载视频
    clip = VideoFileClip(input_path)
    
    # 压缩并保存
    clip.write_videofile(
        output_path,
        bitrate=bitrate,           # 视频码率
        audio_bitrate="128k",      # 音频码率
        preset="medium",           # 编码速度:ultrafast/fast/medium/slow/veryslow
        threads=4                  # 并行线程数
    )
    
    # 关闭释放资源
    clip.close()

# 使用示例
compress_video_moviepy("input.mp4", "output_moviepy.mp4", bitrate="800k")

2.3 高级压缩选项

python

def advanced_compress_moviepy(input_path: str, output_path: str, 
                               scale_factor: float = 0.5, quality: str = "medium"):
    """
    高级压缩:同时调整分辨率和编码参数
    """
    clip = VideoFileClip(input_path)
    
    # 缩放分辨率(scale_factor=0.5 表示宽高各减半,面积减少75%)
    if scale_factor < 1.0:
        new_width = int(clip.w * scale_factor)
        new_height = int(clip.h * scale_factor)
        clip = clip.resize(newsize=(new_width, new_height))
    
    # 编码预设映射
    presets = {
        "fast": "ultrafast",
        "medium": "medium", 
        "high": "veryslow"
    }
    
    clip.write_videofile(
        output_path,
        codec="libx264",           # 使用H.264编码器
        audio_codec="aac",
        bitrate="1000k",
        preset=presets.get(quality, "medium"),
        ffmpeg_params=["-crf", "23"]  # CRF: 0-51,数值越大压缩越狠,18-28为常用范围
    )
    
    clip.close()

三、方案二:vidpack(命令行工具)

vidpack是一个专门为视频压缩设计的Python CLI工具,封装了FFmpeg的复杂参数,使用极其简单。

3.1 安装与基本使用

bash

pip install -U vidpack

bash

# 最简单的用法:使用默认设置压缩单个视频
pack video.mp4

# 指定输出文件
pack video.mp4 --output compressed/small_video.mp4

# 调整质量(0-100,默认75)
pack video.mp4 -q 60

# 使用H.265获得更高压缩率
pack video.mp4 --codec libx265

# 压缩整个目录
pack /path/to/my/videos

# 组合使用:质量80 + H.265 + 覆盖原文件 + 删除原始文件
pack video.mp4 -q 80 --codec libx265 --overwrite --delete-original -v

3.2 在Python中调用

python

import subprocess

def compress_with_vidpack(input_path: str, quality: int = 75, use_h265: bool = False):
    """通过subprocess调用vidpack"""
    cmd = ["pack", input_path, "-q", str(quality)]
    
    if use_h265:
        cmd.extend(["--codec", "libx265"])
    
    result = subprocess.run(cmd, capture_output=True, text=True)
    
    if result.returncode == 0:
        print(f"压缩成功: {input_path}")
    else:
        print(f"压缩失败: {result.stderr}")
    
    return result

# 使用示例
compress_with_vidpack("video.mp4", quality=70, use_h265=True)

四、方案三:ffmpeg-python(最灵活)

对于需要精细控制每个参数的专业场景,ffmpeg-python提供了对FFmpeg的完整Python绑定。

4.1 安装

bash

pip install ffmpeg-python

4.2 基础压缩

python

import ffmpeg

def compress_video_ffmpeg(input_path: str, output_path: str, target_size_mb: int = 50):
    """
    根据目标文件大小自动计算码率进行压缩
    """
    # 获取输入视频信息
    probe = ffmpeg.probe(input_path)
    
    # 提取时长(秒)
    duration = float(probe['format']['duration'])
    
    # 提取音频比特率
    audio_stream = next((s for s in probe['streams'] if s['codec_type'] == 'audio'), None)
    audio_bitrate = float(audio_stream.get('bit_rate', 128000)) if audio_stream else 128000
    
    # 计算目标总比特率(kbps)
    # 公式: (目标大小MB * 8 * 1024) / 时长(秒) = 总比特率 kbps
    target_total_bitrate = (target_size_mb * 8 * 1024) / (1.073741824 * duration)
    
    # 视频比特率 = 总比特率 - 音频比特率(kbps)
    video_bitrate = max(target_total_bitrate - audio_bitrate / 1000, 100)  # 最低100kbps
    
    print(f"视频时长: {duration:.2f}秒")
    print(f"目标大小: {target_size_mb}MB")
    print(f"计算视频码率: {video_bitrate:.0f}kbps")
    
    # 执行压缩
    stream = ffmpeg.input(input_path)
    stream = ffmpeg.output(
        stream, output_path,
        video_bitrate=f"{video_bitrate:.0f}k",
        audio_bitrate=f"{audio_bitrate/1000:.0f}k",
        codec='libx264',
        preset='medium',
        crf=None  # 使用目标码率模式而非CRF
    )
    
    ffmpeg.run(stream, overwrite_output=True)
    print(f"压缩完成: {output_path}")

# 使用示例
compress_video_ffmpeg("input.mp4", "output_50mb.mp4", target_size_mb=50)

4.3 CRF模式压缩(恒定质量)

python

def compress_with_crf(input_path: str, output_path: str, crf_value: int = 23):
    """
    使用CRF模式压缩 - 保持恒定视觉质量
    
    Args:
        crf_value: 0-51,数值越小质量越高(文件越大)
                  18: 视觉无损
                  23: 默认,良好平衡
                  28: 高压缩,质量可接受
    """
    stream = ffmpeg.input(input_path)
    stream = ffmpeg.output(
        stream, output_path,
        vcodec='libx264',
        crf=crf_value,
        preset='medium',
        acodec='aac',
        audio_bitrate='128k'
    )
    
    ffmpeg.run(stream, overwrite_output=True)
    print(f"CRF={crf_value}压缩完成")

# CRF值对文件大小的影响示例
for crf in [18, 23, 28, 33]:
    compress_with_crf("input.mp4", f"output_crf{crf}.mp4", crf)

4.4 分辨率自适应压缩

python

def adaptive_compress(input_path: str, output_path: str, 
                      max_width: int = 1280, max_height: int = 720):
    """
    自适应压缩:确保视频不超过指定分辨率,同时控制码率
    """
    probe = ffmpeg.probe(input_path)
    video_stream = next(s for s in probe['streams'] if s['codec_type'] == 'video')
    
    original_width = int(video_stream['width'])
    original_height = int(video_stream['height'])
    
    # 计算缩放尺寸(保持宽高比)
    scale = min(max_width / original_width, max_height / original_height, 1.0)
    
    if scale < 1.0:
        new_width = int(original_width * scale / 2) * 2  # 确保偶数
        new_height = int(original_height * scale / 2) * 2
        scale_filter = f'scale={new_width}:{new_height}'
        print(f"缩分辨率: {original_width}x{original_height} -> {new_width}x{new_height}")
    else:
        scale_filter = 'scale=iw:ih'  # 不缩放
        print("保持原始分辨率")
    
    # 根据分辨率动态调整码率
    if max_width <= 854:  # 480p
        video_bitrate = "500k"
    elif max_width <= 1280:  # 720p
        video_bitrate = "1000k"
    else:  # 1080p+
        video_bitrate = "2000k"
    
    stream = ffmpeg.input(input_path)
    stream = ffmpeg.output(
        stream, output_path,
        vf=scale_filter,
        vcodec='libx264',
        video_bitrate=video_bitrate,
        preset='fast',
        acodec='aac',
        audio_bitrate='96k'
    )
    
    ffmpeg.run(stream, overwrite_output=True)
    print(f"自适应压缩完成,输出大小: {output_path}")

五、方案四:批量处理工具库

5.1 使用video_compressing库

video_compressing是一个专注于批量压缩和合并的视频处理库。

bash

# 安装
git clone https://github.com/Gabriel-melki/video-compressing.git
cd video-compressing
poetry install

python

from video_compressing.tools import reduce_and_merge_videos

# 批量压缩并合并
def batch_compress_and_merge(video_list: list, output_path: str, reduction: float = 0.5):
    """
    压缩多个视频并合并为一个
    
    Args:
        video_list: 视频文件路径列表
        output_path: 输出文件路径
        reduction: 压缩因子(0.5 = 减少50%体积)
    """
    reduce_and_merge_videos(
        input_files=video_list,
        reduction_factor=reduction,
        output_file=output_path
    )
    print(f"已压缩 {len(video_list)} 个视频并合并到 {output_path}")

# 使用示例
batch_compress_and_merge(
    ["video1.mp4", "video2.mp4", "video3.mp4"],
    "merged_compressed.mp4",
    reduction=0.4
)

5.2 自定义批量处理脚本

python

import os
import glob
from pathlib import Path
from tqdm import tqdm
import ffmpeg

class VideoBatchCompressor:
    """批量视频压缩器"""
    
    def __init__(self, input_dir: str, output_dir: str):
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.results = []
    
    def compress_all(self, bitrate: str = "1000k", crf: int = 23, 
                     max_width: int = None, extensions: list = None):
        """
        压缩目录下所有视频
        
        Args:
            bitrate: 目标码率(使用码率模式时)
            crf: CRF值(使用CRF模式时,设置后bitrate无效)
            max_width: 最大宽度(可选)
            extensions: 处理的扩展名列表
        """
        if extensions is None:
            extensions = ['.mp4', '.mov', '.avi', '.mkv', '.m4v']
        
        # 收集所有视频文件
        video_files = []
        for ext in extensions:
            video_files.extend(self.input_dir.glob(f"**/*{ext}"))
            video_files.extend(self.input_dir.glob(f"**/*{ext.upper()}"))
        
        print(f"找到 {len(video_files)} 个视频文件")
        
        for video_path in tqdm(video_files, desc="压缩进度"):
            # 保持相对路径结构
            rel_path = video_path.relative_to(self.input_dir)
            output_path = self.output_dir / rel_path.with_suffix('.mp4')
            output_path.parent.mkdir(parents=True, exist_ok=True)
            
            try:
                self._compress_single(video_path, output_path, bitrate, crf, max_width)
                
                original_size = video_path.stat().st_size / (1024 * 1024)
                compressed_size = output_path.stat().st_size / (1024 * 1024)
                ratio = (1 - compressed_size / original_size) * 100
                
                self.results.append({
                    "file": str(video_path),
                    "original_mb": original_size,
                    "compressed_mb": compressed_size,
                    "ratio": ratio
                })
                
            except Exception as e:
                print(f"压缩失败 {video_path}: {e}")
        
        self._print_summary()
        return self.results
    
    def _compress_single(self, input_path: Path, output_path: Path,
                         bitrate: str, crf: int, max_width: int):
        """压缩单个视频"""
        stream = ffmpeg.input(str(input_path))
        
        # 可选:缩放分辨率
        if max_width:
            stream = ffmpeg.filter(stream, 'scale', max_width, -2)
        
        # 编码参数
        if crf:
            output_params = {
                'vcodec': 'libx264',
                'crf': crf,
                'preset': 'medium',
                'acodec': 'aac',
                'audio_bitrate': '128k'
            }
        else:
            output_params = {
                'vcodec': 'libx264',
                'video_bitrate': bitrate,
                'preset': 'fast',
                'acodec': 'aac',
                'audio_bitrate': '128k'
            }
        
        stream = ffmpeg.output(stream, str(output_path), **output_params)
        ffmpeg.run(stream, overwrite_output=True, quiet=True)
    
    def _print_summary(self):
        """打印压缩报告"""
        if not self.results:
            return
        
        total_original = sum(r['original_mb'] for r in self.results)
        total_compressed = sum(r['compressed_mb'] for r in self.results)
        total_ratio = (1 - total_compressed / total_original) * 100
        
        print("\n" + "=" * 50)
        print("📊 批量压缩报告")
        print(f"   文件数量: {len(self.results)}")
        print(f"   原始总大小: {total_original:.2f} MB")
        print(f"   压缩后总大小: {total_compressed:.2f} MB")
        print(f"   🎯 总体压缩率: {total_ratio:.1f}%")
        print("=" * 50)

# 使用示例
compressor = VideoBatchCompressor("raw_videos", "compressed_videos")
compressor.compress_all(crf=26, max_width=1280)

六、方案对比与选择指南

方案 难度 灵活性 适用场景 推荐度
MoviePy 初学者、快速脚本、简单压缩 ⭐⭐⭐⭐⭐
vidpack 极低 命令行批量处理、不写代码 ⭐⭐⭐⭐
ffmpeg-python 中高 极高 专业需求、精细控制、自动化 ⭐⭐⭐⭐⭐
video_compressing 合并+压缩批量处理 ⭐⭐⭐

七、性能优化建议

7.1 硬件加速编码

python

# 使用GPU加速(如果有NVIDIA显卡)
def gpu_accelerated_compress(input_path: str, output_path: str):
    """使用NVIDIA GPU加速压缩"""
    stream = ffmpeg.input(input_path)
    stream = ffmpeg.output(
        stream, output_path,
        vcodec='h264_nvenc',  # NVIDIA GPU H.264编码器
        preset='p4',           # GPU预设
        rc='vbr',              # 可变码率
        b=v='2M',              # 目标码率
        acodec='aac'
    )
    ffmpeg.run(stream, overwrite_output=True)

# AMD GPU使用h264_amf,Intel使用h264_qsv

7.2 并行处理

python

from concurrent.futures import ProcessPoolExecutor, as_completed

def parallel_compress(video_files: list, output_dir: str, max_workers: int = 4):
    """并行压缩多个视频"""
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(compress_with_crf, video, f"{output_dir}/{Path(video).stem}_compressed.mp4", 26): video
            for video in video_files
        }
        
        for future in as_completed(futures):
            video = futures[future]
            try:
                future.result()
                print(f"✅ 完成: {video}")
            except Exception as e:
                print(f"❌ 失败: {video}, 错误: {e}")

八、完整示例:视频压缩服务类

python

from dataclasses import dataclass
from typing import Optional, Literal
import ffmpeg
import os

@dataclass
class CompressionResult:
    """压缩结果"""
    input_path: str
    output_path: str
    original_size_mb: float
    compressed_size_mb: float
    compression_ratio: float
    duration: float
    method: str

class VideoCompressor:
    """统一视频压缩服务类"""
    
    def __init__(self, input_path: str):
        self.input_path = input_path
        self._probe = None
    
    def _get_probe(self):
        if self._probe is None:
            self._probe = ffmpeg.probe(self.input_path)
        return self._probe
    
    def get_info(self) -> dict:
        """获取视频信息"""
        probe = self._get_probe()
        video_stream = next(s for s in probe['streams'] if s['codec_type'] == 'video')
        
        return {
            "duration": float(probe['format']['duration']),
            "size_mb": float(probe['format']['size']) / (1024 * 1024),
            "width": int(video_stream['width']),
            "height": int(video_stream['height']),
            "codec": video_stream['codec_name'],
            "bitrate": int(video_stream.get('bit_rate', 0)) / 1000
        }
    
    def compress(self, output_path: Optional[str] = None,
                 mode: Literal['bitrate', 'crf', 'size'] = 'crf',
                 value: int = 23,
                 preset: str = 'medium',
                 max_width: Optional[int] = None) -> CompressionResult:
        """
        压缩视频
        
        Args:
            output_path: 输出路径(默认自动生成)
            mode: 压缩模式 - 'bitrate'(码率kbps), 'crf'(质量值), 'size'(目标大小MB)
            value: 对应的值
            preset: 编码速度预设
            max_width: 最大宽度(缩放)
        """
        if output_path is None:
            name, ext = os.path.splitext(self.input_path)
            output_path = f"{name}_compressed{ext}"
        
        stream = ffmpeg.input(self.input_path)
        
        # 分辨率缩放
        if max_width:
            stream = ffmpeg.filter(stream, 'scale', max_width, -2)
        
        # 根据模式设置编码参数
        if mode == 'crf':
            output_params = {
                'vcodec': 'libx264',
                'crf': value,
                'preset': preset,
                'acodec': 'aac',
                'audio_bitrate': '128k'
            }
        elif mode == 'bitrate':
            output_params = {
                'vcodec': 'libx264',
                'video_bitrate': f"{value}k",
                'preset': preset,
                'acodec': 'aac',
                'audio_bitrate': '128k'
            }
        elif mode == 'size':
            # 自动计算码率
            info = self.get_info()
            target_total_bitrate = (value * 8 * 1024) / (1.073741824 * info['duration'])
            video_bitrate = max(target_total_bitrate - 128, 100)
            output_params = {
                'vcodec': 'libx264',
                'video_bitrate': f"{video_bitrate:.0f}k",
                'preset': preset,
                'acodec': 'aac',
                'audio_bitrate': '128k'
            }
        else:
            raise ValueError(f"未知模式: {mode}")
        
        stream = ffmpeg.output(stream, output_path, **output_params)
        ffmpeg.run(stream, overwrite_output=True, quiet=True)
        
        # 计算结果
        original_size = os.path.getsize(self.input_path) / (1024 * 1024)
        compressed_size = os.path.getsize(output_path) / (1024 * 1024)
        
        return CompressionResult(
            input_path=self.input_path,
            output_path=output_path,
            original_size_mb=original_size,
            compressed_size_mb=compressed_size,
            compression_ratio=(1 - compressed_size / original_size) * 100,
            duration=self.get_info()['duration'],
            method=f"{mode}={value}"
        )

# 使用示例
compressor = VideoCompressor("large_video.mp4")
print("视频信息:", compressor.get_info())

# CRF模式压缩(推荐)
result = compressor.compress(mode='crf', value=26, preset='fast', max_width=1280)
print(f"压缩率: {result.compression_ratio:.1f}%")

# 按目标大小压缩
result = compressor.compress("output.mp4", mode='size', value=50)  # 50MB

九、总结

Python视频压缩的核心是掌握FFmpeg这一强大的多媒体框架。通过不同层次的封装库,可以满足从简单压缩到专业级精细控制的各种需求:

  • 快速上手:使用MoviePy,几行代码完成压缩

  • 批量处理:使用vidpack或自定义批量脚本

  • 专业控制:使用ffmpeg-python,精细调节每个参数

选择合适的编码器(H.264 vs H.265)和压缩参数(CRF vs 固定码率),可以在文件大小和视频质量之间找到最佳平衡点。对于大多数场景,CRF=23~26配合H.264编码器是最稳妥的选择,能在保证较好画质的同时显著减小文件体积。

更多推荐