日常工作中经常需要批量转换文件格式,手动一个个转换太费时。今天分享一套完整的文件格式转换自动化方案,支持图片、文档、音频、视频等多种类型。

转换器框架设计

from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Dict
import logging

# Configure the root logger at import time so INFO-level messages are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by the converters below.
logger = logging.getLogger(__name__)

@dataclass
class ConvertResult:
    """Outcome of processing a single input file."""
    success: bool  # True when the operation succeeded
    input_file: str  # source file path, stored as a string
    output_file: str  # typically the produced file's path; '' is used on failure
    message: str  # human-readable status or error text
    size_before: int = 0  # input size in bytes; 0 when not recorded
    size_after: int = 0  # output size in bytes; 0 when not recorded or on failure

class BaseConverter(ABC):
    """Base class that drives a batch conversion over a directory.

    Subclasses supply ``convert`` (how one file is converted) and
    ``get_output_name`` (how the output file is named). ``process`` walks the
    input directory and appends one ``ConvertResult`` per file to
    ``self.results``; ``get_summary`` aggregates those results.
    """

    def __init__(self, input_dir: str, output_dir: str):
        """Store both directories and create ``output_dir`` if it is missing."""
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.results: List[ConvertResult] = []

    @abstractmethod
    def convert(self, input_path: Path, output_path: Path) -> bool:
        """Convert ``input_path`` into ``output_path``; return True on success."""
        pass

    def process(self, pattern: str = '*', recursive: bool = False) -> List[ConvertResult]:
        """Convert every regular file under ``input_dir`` that matches ``pattern``.

        A failure on one file (exception, False return, or missing output) is
        recorded and logged; the remaining files are still processed.

        Args:
            pattern: Glob pattern evaluated relative to ``input_dir``.
            recursive: Use ``rglob`` (descend into subdirectories) when True.

        Returns:
            ``self.results``. The list is cumulative, so it also holds results
            from earlier ``process`` calls on this instance.
        """
        matcher = self.input_dir.rglob if recursive else self.input_dir.glob
        # Filter up front so the logged count does not include directories.
        files = [path for path in matcher(pattern) if path.is_file()]

        logger.info(f"找到 {len(files)} 个文件待转换")

        for file_path in files:
            size_before = 0
            try:
                # stat() and naming live inside the try so that a vanished or
                # unreadable file becomes a recorded failure, not a crash.
                size_before = file_path.stat().st_size
                output_name = self.get_output_name(file_path)
                output_path = self.output_dir / output_name

                success = bool(self.convert(file_path, output_path))
                message = '成功' if success else '失败'
                if success and not output_path.is_file():
                    # The converter claimed success but wrote nothing at the
                    # expected location; report that explicitly.
                    success = False
                    message = f'失败: 未生成输出文件 {output_path.name}'
                size_after = output_path.stat().st_size if success else 0
            except Exception as e:
                # Batch boundary: any per-file error is recorded and skipped.
                logger.error(f"✗ {file_path.name}: {e}")
                self.results.append(ConvertResult(
                    success=False,
                    input_file=str(file_path),
                    output_file='',
                    message=str(e),
                    size_before=size_before
                ))
                continue

            self.results.append(ConvertResult(
                success=success,
                input_file=str(file_path),
                output_file=str(output_path) if success else '',
                message=message,
                size_before=size_before,
                size_after=size_after
            ))

            # Only announce success when the conversion actually succeeded.
            if success:
                logger.info(f"✓ {file_path.name} -> {output_name}")
            else:
                logger.error(f"✗ {file_path.name}: {message}")

        return self.results

    @abstractmethod
    def get_output_name(self, input_path: Path) -> str:
        """Return the output file name (not a full path) for ``input_path``."""
        pass

    def get_summary(self) -> Dict:
        """Aggregate ``self.results`` into counts and byte totals.

        Returns:
            A dict with keys ``total``, ``success``, ``failed``,
            ``total_size_before`` (all inputs), ``total_size_after`` (all
            outputs) and ``compression_ratio``. The ratio is computed over
            successfully converted files only, so failed inputs do not inflate
            it; it is ``'N/A'`` when there is nothing to compare.
        """
        total = len(self.results)
        succeeded = [r for r in self.results if r.success]
        success = len(succeeded)
        failed = total - success

        total_size_before = sum(r.size_before for r in self.results)
        total_size_after = sum(r.size_after for r in self.results)

        # Compare like with like: inputs and outputs of the same files.
        converted_before = sum(r.size_before for r in succeeded)
        converted_after = sum(r.size_after for r in succeeded)
        if converted_before and converted_after:
            ratio = f"{(1 - converted_after / converted_before) * 100:.1f}%"
        else:
            ratio = 'N/A'

        return {
            'total': total,
            'success': success,
            'failed': failed,
            'total_size_before': total_size_before,
            'total_size_after': total_size_after,
            'compression_ratio': ratio
        }

图片格式转换

from PIL import Image
import pillow_avif  # 支持AVIF格式

class ImageConverter(BaseConverter):
    """Image converter built on Pillow: format conversion, resizing, thumbnails."""

    SUPPORTED_FORMATS = {'.jpg', '.jpeg', '.png', '.webp', '.bmp', '.gif', '.tiff', '.avif'}
    # Modes the JPEG writer stores directly; any other mode is converted to RGB.
    _JPEG_MODES = ('1', 'L', 'RGB', 'CMYK')
    # Output suffixes for which the source EXIF block is forwarded to save().
    _EXIF_SUFFIXES = ('.jpg', '.jpeg', '.png', '.webp', '.tiff', '.avif')

    def __init__(self, input_dir: str, output_dir: str, quality: int = 85,
                 target_format: str = 'png'):
        """Create the converter.

        Args:
            input_dir: Directory scanned for source images.
            output_dir: Directory that receives the results.
            quality: Encoder quality passed to ``save`` for non-PNG outputs.
            target_format: Extension (with or without a leading dot) used by
                ``get_output_name``; defaults to ``'png'``.
        """
        super().__init__(input_dir, output_dir)
        self.quality = quality
        self.target_format = target_format.lower().lstrip('.')

    def convert(self, input_path: Path, output_path: Path) -> bool:
        """Re-encode ``input_path`` in the format implied by ``output_path``.

        Returns True on success; Pillow errors propagate to the caller.
        """
        suffix = output_path.suffix.lower()

        with Image.open(input_path) as source:
            # Read EXIF before any mode conversion so it can be carried over.
            exif = source.info.get('exif')

            frame = source
            if suffix in ('.jpg', '.jpeg') and source.mode not in self._JPEG_MODES:
                # JPEG cannot store alpha or palette data (RGBA, LA, P, ...).
                frame = source.convert('RGB')

            kwargs = {}
            if suffix != '.png':
                kwargs['quality'] = self.quality
            if exif and suffix in self._EXIF_SUFFIXES:
                kwargs['exif'] = exif

            frame.save(output_path, **kwargs)

        return True

    def get_output_name(self, input_path: Path) -> str:
        """Return ``<stem>_converted.<target_format>``."""
        return f"{input_path.stem}_converted.{self.target_format}"

    def _iter_images(self, pattern: str):
        """Yield regular files in ``input_dir`` matching ``pattern`` with a supported suffix."""
        for candidate in self.input_dir.glob(pattern):
            if candidate.is_file() and candidate.suffix.lower() in self.SUPPORTED_FORMATS:
                yield candidate

    def resize_images(self, width: int, height: int, pattern: str = '*') -> List[ConvertResult]:
        """Resize every supported image to exactly ``width`` x ``height``.

        The aspect ratio is not preserved. Each output keeps the source suffix
        and is named ``<stem>_<width>x<height><suffix>``. A file that fails is
        recorded as a failed result and the batch continues.
        """
        results = []

        for file_path in self._iter_images(pattern):
            output_path = self.output_dir / f"{file_path.stem}_{width}x{height}{file_path.suffix}"

            try:
                with Image.open(file_path) as img:
                    resized = img.resize((width, height), Image.Resampling.LANCZOS)
                    resized.save(output_path, quality=self.quality)
            except Exception as e:
                logger.error(f"✗ {file_path.name}: {e}")
                results.append(ConvertResult(
                    success=False,
                    input_file=str(file_path),
                    output_file='',
                    message=str(e)
                ))
                continue

            results.append(ConvertResult(
                success=True,
                input_file=str(file_path),
                output_file=str(output_path),
                message=f'调整为 {width}x{height}'
            ))

        return results

    def create_thumbnail(self, size: tuple = (200, 200), pattern: str = '*') -> List[ConvertResult]:
        """Write a JPEG thumbnail, centred on a white canvas, for every supported image.

        Each thumbnail is ``size`` pixels and named ``<stem>_thumb.jpg``. A file
        that fails is recorded as a failed result and the batch continues.
        """
        results = []

        for file_path in self._iter_images(pattern):
            output_path = self.output_dir / f"{file_path.stem}_thumb.jpg"

            try:
                with Image.open(file_path) as img:
                    img.thumbnail(size, Image.Resampling.LANCZOS)

                    has_alpha = img.mode in ('RGBA', 'LA', 'PA') or 'transparency' in img.info
                    tile = img.convert('RGBA' if has_alpha else 'RGB')

                    background = Image.new('RGB', size, (255, 255, 255))
                    offset = ((size[0] - tile.width) // 2, (size[1] - tile.height) // 2)
                    # Use the alpha band as the paste mask; without a mask the
                    # transparent pixels would overwrite the white canvas.
                    background.paste(tile, offset, mask=tile if has_alpha else None)
                    background.save(output_path, quality=80)
            except Exception as e:
                logger.error(f"✗ {file_path.name}: {e}")
                results.append(ConvertResult(
                    success=False,
                    input_file=str(file_path),
                    output_file='',
                    message=str(e)
                ))
                continue

            results.append(ConvertResult(
                success=True,
                input_file=str(file_path),
                output_file=str(output_path),
                message=f'生成缩略图 {size}'
            ))

        return results

PDF格式转换

import fitz  # PyMuPDF

class PDFConverter(BaseConverter):
    """PDF converter built on PyMuPDF: first-page image, text and embedded images."""

    def __init__(self, input_dir: str, output_dir: str, target_format: str = 'png'):
        """Create the converter.

        Args:
            input_dir: Directory scanned for PDFs.
            output_dir: Directory that receives the results.
            target_format: Extension used by ``get_output_name`` (``'png'``,
                ``'jpg'``, ``'jpeg'`` or ``'txt'``); defaults to ``'png'``.
        """
        super().__init__(input_dir, output_dir)
        self.target_format = target_format.lower().lstrip('.')

    def convert(self, input_path: Path, output_path: Path) -> bool:
        """Dispatch on the suffix of ``output_path``.

        Image suffixes render the first page and ``.txt`` extracts text.
        Returns False for any other suffix.
        """
        ext = output_path.suffix.lower()

        if ext in ('.png', '.jpg', '.jpeg'):
            return self._pdf_to_image(input_path, output_path)
        if ext == '.txt':
            return self._pdf_to_text(input_path, output_path)

        return False

    def _pdf_to_image(self, pdf_path: Path, output_path: Path) -> bool:
        """Render the first page at 2x scale and save it to ``output_path``.

        Returns False when the document has no pages.
        """
        with fitz.open(str(pdf_path)) as doc:
            if len(doc) == 0:
                return False

            # Only the first page is rendered; Matrix(2, 2) doubles the resolution.
            pix = doc[0].get_pixmap(matrix=fitz.Matrix(2, 2))
            # Save exactly where the caller asked, so the reported output path
            # matches the file on disk.
            # NOTE(review): JPEG output appears to need PyMuPDF >= 1.22 - confirm
            # against the installed version.
            pix.save(str(output_path))

        return True

    def _pdf_to_text(self, pdf_path: Path, output_path: Path) -> bool:
        """Write the text of every page, separated by blank lines, as UTF-8."""
        with fitz.open(str(pdf_path)) as doc:
            pages = [page.get_text() for page in doc]

        output_path.write_text('\n\n'.join(pages), encoding='utf-8')
        return True

    def get_output_name(self, input_path: Path) -> str:
        """Return ``<stem>_converted.<target_format>``."""
        return f"{input_path.stem}_converted.{self.target_format}"

    def extract_images(self, pattern: str = '*.pdf') -> List[ConvertResult]:
        """Save the images embedded in each matching PDF as PNG files.

        Files are named ``<stem>_p<page>_img<index>.png``. A PDF that fails is
        recorded as a failed result and the batch continues.
        """
        results = []

        for pdf_path in self.input_dir.glob(pattern):
            img_count = 0

            try:
                with fitz.open(str(pdf_path)) as doc:
                    for page_num, page in enumerate(doc):
                        for img_index, img in enumerate(page.get_images(full=True)):
                            pix = fitz.Pixmap(doc, img[0])  # img[0] is the image xref

                            if pix.n - pix.alpha >= 4:
                                # Four or more colour components (e.g. CMYK)
                                # cannot be written as PNG; convert to RGB
                                # instead of silently skipping the image.
                                pix = fitz.Pixmap(fitz.csRGB, pix)

                            output_path = self.output_dir / f"{pdf_path.stem}_p{page_num+1}_img{img_index+1}.png"
                            pix.save(str(output_path))
                            img_count += 1
            except Exception as e:
                logger.error(f"✗ {pdf_path.name}: {e}")
                results.append(ConvertResult(
                    success=False,
                    input_file=str(pdf_path),
                    output_file='',
                    message=str(e)
                ))
                continue

            results.append(ConvertResult(
                success=True,
                input_file=str(pdf_path),
                output_file=f'{img_count} 张图片',
                message=f'提取了 {img_count} 张图片'
            ))

        return results

文档格式转换

from docx import Document
from docx.shared import Inches
import mammoth

class DocxConverter(BaseConverter):
    """Word (.docx) converter: HTML via mammoth, text via python-docx, PDF via LibreOffice."""

    def __init__(self, input_dir: str, output_dir: str, target_format: str = 'html'):
        """Create the converter.

        Args:
            input_dir: Directory scanned for .docx files.
            output_dir: Directory that receives the results.
            target_format: Extension used by ``get_output_name`` (``'html'``,
                ``'txt'`` or ``'pdf'``); defaults to ``'html'``.
        """
        super().__init__(input_dir, output_dir)
        self.target_format = target_format.lower().lstrip('.')

    def convert(self, input_path: Path, output_path: Path) -> bool:
        """Dispatch on the suffix of ``output_path``; return False for unsupported suffixes."""
        ext = output_path.suffix.lower()

        if ext == '.pdf':
            # Requires the `libreoffice` executable on PATH.
            return self._docx_to_pdf(input_path, output_path)
        if ext == '.html':
            return self._docx_to_html(input_path, output_path)
        if ext == '.txt':
            return self._docx_to_text(input_path, output_path)

        return False

    def _docx_to_html(self, docx_path: Path, output_path: Path) -> bool:
        """Convert to HTML with mammoth and write the result as UTF-8."""
        with open(docx_path, 'rb') as docx_file:
            result = mammoth.convert_to_html(docx_file)

        output_path.write_text(result.value, encoding='utf-8')
        return True

    def _docx_to_text(self, docx_path: Path, output_path: Path) -> bool:
        """Write the text of ``doc.paragraphs``, one paragraph per line, as UTF-8."""
        doc = Document(str(docx_path))
        lines = [para.text for para in doc.paragraphs]

        output_path.write_text('\n'.join(lines), encoding='utf-8')
        return True

    def _docx_to_pdf(self, docx_path: Path, output_path: Path) -> bool:
        """Convert to PDF by running headless LibreOffice.

        LibreOffice names its output ``<input stem>.pdf`` inside ``--outdir``,
        so the file is produced next to ``output_path`` and then renamed if the
        requested name differs. Returns False when LibreOffice fails or
        produces no file.
        """
        import subprocess

        out_dir = output_path.parent
        cmd = [
            'libreoffice', '--headless', '--convert-to', 'pdf',
            '--outdir', str(out_dir),
            str(docx_path)
        ]

        result = subprocess.run(cmd, capture_output=True, text=True, errors='replace')
        if result.returncode != 0:
            logger.error(f"LibreOffice 转换失败: {result.stderr.strip()}")
            return False

        produced = out_dir / f"{docx_path.stem}.pdf"
        if not produced.is_file():
            # A zero exit code alone does not prove that a file was written.
            logger.error(f"LibreOffice 未生成输出文件: {produced}")
            return False

        if produced != output_path:
            produced.replace(output_path)
        return True

    def get_output_name(self, input_path: Path) -> str:
        """Return ``<stem>.<target_format>``."""
        return f"{input_path.stem}.{self.target_format}"
音频格式转换

import subprocess
from pydub import AudioSegment

class AudioConverter(BaseConverter):
    """Audio converter built on pydub, with a direct ffmpeg fallback."""

    # File extensions whose ffmpeg muxer name differs from the extension.
    _FORMAT_ALIASES = {'m4a': 'ipod', 'aac': 'adts'}

    def __init__(self, input_dir: str, output_dir: str, bitrate: str = '192k'):
        """Create the converter.

        Args:
            input_dir: Directory scanned for audio files.
            output_dir: Directory that receives the results.
            bitrate: Target audio bitrate (e.g. ``'192k'``), stored on
                ``self.bitrates``.
        """
        super().__init__(input_dir, output_dir)
        self.bitrates = bitrate

    def _format_for(self, extension: str) -> str:
        """Map a file extension (dot optional) to the format name given to pydub."""
        key = extension.lower().lstrip('.')
        return self._FORMAT_ALIASES.get(key, key)

    def convert(self, input_path: Path, output_path: Path) -> bool:
        """Transcode ``input_path`` to the format implied by ``output_path``.

        pydub is tried first with the configured bitrate. If it raises, the
        error is logged and ffmpeg is invoked directly.
        """
        try:
            audio = AudioSegment.from_file(str(input_path))
            audio.export(
                str(output_path),
                format=self._format_for(output_path.suffix),
                bitrate=self.bitrates
            )
            return True
        except Exception as e:
            logger.error(f"音频转换失败: {e}")
            # Fall back to calling ffmpeg directly.
            return self._convert_with_ffmpeg(input_path, output_path)

    def _convert_with_ffmpeg(self, input_path: Path, output_path: Path) -> bool:
        """Run ffmpeg directly; return True when it exits with status 0."""
        cmd = ['ffmpeg', '-i', str(input_path)]
        if self.bitrates:
            cmd.extend(['-b:a', self.bitrates])
        cmd.extend(['-y', str(output_path)])

        result = subprocess.run(cmd, capture_output=True, text=True, errors='replace')
        if result.returncode != 0:
            # Surface ffmpeg's own diagnostics instead of dropping them.
            logger.error(f"ffmpeg 转换失败: {result.stderr.strip()[-500:]}")
        return result.returncode == 0

    def get_output_name(self, input_path: Path) -> str:
        """Return ``<stem>.mp3``."""
        return input_path.stem + '.mp3'

    def extract_audio(self, video_path: Path, output_format: str = 'mp3') -> ConvertResult:
        """Extract the audio of ``video_path`` into ``<stem>.<output_format>``.

        Returns a ``ConvertResult``. Errors are captured in its ``message``
        rather than raised.
        """
        output_path = self.output_dir / f"{video_path.stem}.{output_format}"

        try:
            video = AudioSegment.from_file(str(video_path))
            video.export(
                str(output_path),
                format=self._format_for(output_format),
                bitrate=self.bitrates
            )
        except Exception as e:
            return ConvertResult(
                success=False,
                input_file=str(video_path),
                output_file='',
                message=str(e)
            )

        return ConvertResult(
            success=True,
            input_file=str(video_path),
            output_file=str(output_path),
            message='成功提取音频'
        )

    def adjust_volume(self, db_change: int, pattern: str = '*.mp3') -> List[ConvertResult]:
        """Apply a gain of ``db_change`` dB to every matching file.

        Each output keeps the input file name and is encoded in the format its
        extension implies (MP3 when there is no extension). A file that fails is
        recorded as a failed result and the batch continues.
        """
        results = []

        for file_path in self.input_dir.glob(pattern):
            output_path = self.output_dir / file_path.name
            # Encode to match the file's own extension, not always MP3.
            export_format = self._format_for(file_path.suffix) or 'mp3'

            try:
                audio = AudioSegment.from_file(str(file_path))
                adjusted = audio + db_change  # pydub: "+" applies gain in dB
                adjusted.export(str(output_path), format=export_format)
            except Exception as e:
                logger.error(f"✗ {file_path.name}: {e}")
                results.append(ConvertResult(
                    success=False,
                    input_file=str(file_path),
                    output_file='',
                    message=str(e)
                ))
                continue

            results.append(ConvertResult(
                success=True,
                input_file=str(file_path),
                output_file=str(output_path),
                message=f'音量调整 {db_change}dB'
            ))

        return results

视频格式转换

class VideoConverter(BaseConverter):
    """Video converter that shells out to ffmpeg (H.264 video, AAC audio)."""

    def __init__(self, input_dir: str, output_dir: str,
                 resolution: Optional[str] = None, bitrate: Optional[str] = None):
        """Create the converter.

        Args:
            input_dir: Directory scanned for videos.
            output_dir: Directory that receives the results.
            resolution: Value inserted into ffmpeg's ``scale=`` filter
                (e.g. ``'1280:720'``); no scaling when None.
            bitrate: Video bitrate passed as ``-b:v``; omitted when None.
                Stored on ``self.bitrates``.
        """
        super().__init__(input_dir, output_dir)
        self.resolution = resolution
        self.bitrates = bitrate

    def _run_ffmpeg(self, cmd: List[str]) -> bool:
        """Run ``cmd``; log the tail of stderr on failure; return True on exit status 0."""
        # errors='replace' keeps non-UTF-8 ffmpeg output from raising while decoding.
        result = subprocess.run(cmd, capture_output=True, text=True, errors='replace')
        if result.returncode != 0:
            logger.error(f"ffmpeg 执行失败: {result.stderr.strip()[-500:]}")
        return result.returncode == 0

    def convert(self, input_path: Path, output_path: Path) -> bool:
        """Transcode ``input_path`` to ``output_path`` with libx264 video and AAC audio."""
        cmd = ['ffmpeg', '-i', str(input_path), '-y']

        # Video codec
        cmd.extend(['-c:v', 'libx264', '-preset', 'medium'])

        # Optional scaling
        if self.resolution:
            cmd.extend(['-vf', f'scale={self.resolution}'])

        # Optional video bitrate
        if self.bitrates:
            cmd.extend(['-b:v', self.bitrates])

        # Audio codec
        cmd.extend(['-c:a', 'aac', '-b:a', '128k'])

        cmd.append(str(output_path))

        return self._run_ffmpeg(cmd)

    def get_output_name(self, input_path: Path) -> str:
        """Return ``<stem>_converted.mp4``."""
        return input_path.stem + '_converted.mp4'

    def compress_video(self, crf: int = 28, pattern: str = '*.mp4') -> List[ConvertResult]:
        """Re-encode every matching video with the given CRF.

        Outputs are named ``<stem>_compressed.mp4``. Input and output sizes are
        recorded on each result. A file that fails is recorded as a failed
        result and the batch continues.
        """
        results = []

        for file_path in self.input_dir.glob(pattern):
            output_path = self.output_dir / f"{file_path.stem}_compressed.mp4"

            cmd = [
                'ffmpeg', '-i', str(file_path),
                '-c:v', 'libx264', '-crf', str(crf),
                '-c:a', 'aac',
                '-y', str(output_path)
            ]

            try:
                size_before = file_path.stat().st_size
                ok = self._run_ffmpeg(cmd)
                size_after = output_path.stat().st_size if ok else 0
            except Exception as e:
                # e.g. ffmpeg missing from PATH, or the file vanished mid-run.
                logger.error(f"✗ {file_path.name}: {e}")
                results.append(ConvertResult(
                    success=False,
                    input_file=str(file_path),
                    output_file='',
                    message=str(e)
                ))
                continue

            results.append(ConvertResult(
                success=ok,
                input_file=str(file_path),
                output_file=str(output_path) if ok else '',
                message='压缩完成' if ok else '压缩失败',
                size_before=size_before,
                size_after=size_after
            ))

        return results

使用示例

def main() -> None:
    """Run the demo: images, thumbnails, PDFs, Word documents, audio, then video."""
    # Images: convert the PNG inputs and print the summary.
    image_job = ImageConverter('./inputs/images', './outputs/images', quality=85)
    image_job.process('*.png')
    print("图片转换结果:", image_job.get_summary())

    # Thumbnails for every supported image in the same input directory.
    image_job.create_thumbnail((200, 200))

    # PDFs
    PDFConverter('./inputs/docs', './outputs/docs').process('*.pdf')

    # Word documents
    DocxConverter('./inputs/docs', './outputs/html').process('*.docx')

    # Audio: WAV inputs
    AudioConverter('./inputs/audio', './outputs/audio', bitrate='192k').process('*.wav')

    # Video: AVI inputs scaled to 1280:720
    VideoConverter('./inputs/video', './outputs/video', resolution='1280:720').process('*.avi')


if __name__ == '__main__':
    main()

总结

文件格式转换自动化要点:

  1. 统一框架:基类定义标准流程,子类实现具体转换
  2. 异常处理:process() 会捕获单个文件的转换异常并记录到结果中,不影响其他文件
  3. 结果统计:记录每个文件的转换状态
  4. 工具选择:合理使用Pillow、PyMuPDF、pydub等库
  5. ffmpeg集成:处理音视频的专业工具,需提前安装并加入 PATH(pydub 也依赖它)

更多推荐