Python自动化实现文件格式批量转换
·
日常工作中经常需要批量转换文件格式，手动逐个转换太费时。今天分享一套完整的文件格式转换自动化方案，支持图片、文档、音频、视频等多种类型。
转换器框架设计
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Dict
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ConvertResult:
    """Outcome record for one processed input file.

    The field order doubles as the positional constructor order, so it is
    kept exactly as callers already use it.
    """

    # True when the operation reported success
    success: bool
    # Source file path, stored as a string
    input_file: str
    # Output location (or a short description of the output); '' on failure
    output_file: str
    # Human-readable status or error text
    message: str
    # Source size in bytes; stays 0 when the caller did not measure it
    size_before: int = 0
    # Output size in bytes; stays 0 when not measured or when nothing was written
    size_after: int = 0
class BaseConverter(ABC):
    """Template for batch converters that read one directory and write another.

    Subclasses supply :meth:`convert` (how one file is converted) and
    :meth:`get_output_name` (how the output file is named). :meth:`process`
    drives the batch and appends one ``ConvertResult`` per file to
    ``self.results``; :meth:`get_summary` aggregates those results.
    """

    def __init__(self, input_dir: str, output_dir: str):
        """Store both directories and make sure the output directory exists.

        Args:
            input_dir: Directory that is scanned for source files.
            output_dir: Directory that receives converted files. It is
                created, including missing parents, if it does not exist.
        """
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.results: List[ConvertResult] = []

    @abstractmethod
    def convert(self, input_path: Path, output_path: Path) -> bool:
        """Convert one file; return True on success. Implemented by subclasses."""

    def process(self, pattern: str = '*', recursive: bool = False) -> "List[ConvertResult]":
        """Convert every file in the input directory that matches ``pattern``.

        A failure (``convert`` returning False or raising) is recorded and
        logged, and the batch continues with the next file.

        Args:
            pattern: Glob pattern applied inside ``input_dir``.
            recursive: When True, sub-directories are searched as well.

        Returns:
            ``self.results`` - the list accumulated over all ``process``
            calls made on this instance, including this one.
        """
        finder = self.input_dir.rglob if recursive else self.input_dir.glob
        # Directories can match the pattern too; drop them up front so the
        # logged count is the number of files that will really be handled.
        # Sorting makes the processing order deterministic.
        files = sorted(p for p in finder(pattern) if p.is_file())
        logger.info("找到 %d 个文件待转换", len(files))

        for file_path in files:
            output_name = self.get_output_name(file_path)
            output_path = self.output_dir / output_name
            size_before = file_path.stat().st_size
            try:
                success = self.convert(file_path, output_path)
                # A converter that reports success without creating
                # output_path ends up in the except branch below.
                size_after = output_path.stat().st_size if success else 0
            except Exception as e:
                logger.error("✗ %s: %s", file_path.name, e)
                self.results.append(ConvertResult(
                    success=False,
                    input_file=str(file_path),
                    output_file='',
                    message=str(e),
                    size_before=size_before,
                ))
                continue

            message = '成功' if success else '失败'
            self.results.append(ConvertResult(
                success=success,
                input_file=str(file_path),
                output_file=str(output_path) if success else '',
                message=message,
                size_before=size_before,
                size_after=size_after,
            ))
            # Only report a check mark when the converter really succeeded
            if success:
                logger.info("✓ %s -> %s", file_path.name, output_name)
            else:
                logger.error("✗ %s: %s", file_path.name, message)
        return self.results

    @abstractmethod
    def get_output_name(self, input_path: Path) -> str:
        """Return the output file name for ``input_path``. Implemented by subclasses."""

    def get_summary(self) -> Dict:
        """Aggregate ``self.results`` into a statistics dictionary.

        Returns:
            A dict with the keys ``total``, ``success``, ``failed``,
            ``total_size_before`` (bytes of all inputs),
            ``total_size_after`` (bytes of all outputs) and
            ``compression_ratio``. The ratio is computed from successful
            conversions only, so failed inputs do not inflate it; it is
            ``'N/A'`` when there is nothing to compare.
        """
        total = len(self.results)
        succeeded = [r for r in self.results if r.success]

        total_size_before = sum(r.size_before for r in self.results)
        total_size_after = sum(r.size_after for r in self.results)

        # Compare like with like: only inputs that actually produced output
        converted_before = sum(r.size_before for r in succeeded)
        converted_after = sum(r.size_after for r in succeeded)
        if converted_before and converted_after:
            ratio = f"{(1 - converted_after / converted_before) * 100:.1f}%"
        else:
            ratio = 'N/A'

        return {
            'total': total,
            'success': len(succeeded),
            'failed': total - len(succeeded),
            'total_size_before': total_size_before,
            'total_size_after': total_size_after,
            'compression_ratio': ratio,
        }
图片格式转换
from PIL import Image
import pillow_avif # 支持AVIF格式
class ImageConverter(BaseConverter):
    """Batch image converter built on Pillow.

    ``process`` (inherited) converts matching files to ``target_format``;
    ``resize_images`` and ``create_thumbnail`` are separate batch helpers
    that return their own result lists and do not touch ``self.results``.
    """

    SUPPORTED_FORMATS = {'.jpg', '.jpeg', '.png', '.webp', '.bmp', '.gif', '.tiff', '.avif'}
    # Modes that are written to JPEG unchanged; any other mode is converted to RGB first
    _JPEG_MODES = ('RGB', 'L', 'CMYK')

    def __init__(self, input_dir: str, output_dir: str, quality: int = 85,
                 target_format: str = 'png'):
        """Create the converter.

        Args:
            input_dir: Directory scanned for source images.
            output_dir: Directory receiving the results.
            quality: Encoder quality passed to Pillow for non-PNG output.
            target_format: Extension (with or without a leading dot) used by
                ``get_output_name``; defaults to ``'png'``.
        """
        super().__init__(input_dir, output_dir)
        self.quality = quality
        self.target_format = target_format.lower().lstrip('.')

    def convert(self, input_path: Path, output_path: Path) -> bool:
        """Re-encode ``input_path`` in the format implied by ``output_path``.

        Returns:
            True once the file has been saved; Pillow errors propagate to
            the caller.
        """
        suffix = output_path.suffix.lower()
        with Image.open(input_path) as img:
            save_kwargs = {}
            # The quality setting is not passed for PNG output
            if suffix != '.png':
                save_kwargs['quality'] = self.quality
            # Carry over EXIF metadata when the source exposes it
            exif = img.info.get('exif')
            if exif:
                save_kwargs['exif'] = exif

            frame = img
            # Modes outside _JPEG_MODES (alpha, palette, ...) are flattened to RGB
            if suffix in ('.jpg', '.jpeg') and img.mode not in self._JPEG_MODES:
                frame = img.convert('RGB')
            frame.save(output_path, **save_kwargs)
        return True

    def get_output_name(self, input_path: Path) -> str:
        """Return ``<stem>_converted.<target_format>`` for ``input_path``."""
        return f"{input_path.stem}_converted.{self.target_format}"

    def _iter_images(self, pattern: str):
        """Yield the files under ``input_dir`` matching ``pattern`` that have a supported suffix."""
        for candidate in sorted(self.input_dir.glob(pattern)):
            if candidate.is_file() and candidate.suffix.lower() in self.SUPPORTED_FORMATS:
                yield candidate

    def resize_images(self, width: int, height: int, pattern: str = '*') -> List[ConvertResult]:
        """Resize every supported image to exactly ``width`` x ``height``.

        The aspect ratio is not preserved. Output keeps the source format
        and is named ``<stem>_<width>x<height><suffix>``. A file that
        cannot be processed yields a failed result instead of aborting the
        batch.

        Returns:
            One ``ConvertResult`` per supported file.
        """
        results = []
        for file_path in self._iter_images(pattern):
            output_path = self.output_dir / f"{file_path.stem}_{width}x{height}{file_path.suffix}"
            try:
                with Image.open(file_path) as img:
                    resized = img.resize((width, height), Image.Resampling.LANCZOS)
                    resized.save(output_path, quality=self.quality)
            except (OSError, ValueError) as e:
                logger.error("✗ %s: %s", file_path.name, e)
                results.append(ConvertResult(
                    success=False,
                    input_file=str(file_path),
                    output_file='',
                    message=str(e),
                ))
                continue
            results.append(ConvertResult(
                success=True,
                input_file=str(file_path),
                output_file=str(output_path),
                message=f'调整为 {width}x{height}',
                size_before=file_path.stat().st_size,
                size_after=output_path.stat().st_size,
            ))
        return results

    def create_thumbnail(self, size: tuple = (200, 200), pattern: str = '*') -> List[ConvertResult]:
        """Create a JPEG thumbnail, centred on a white canvas, for every supported image.

        The image is shrunk to fit inside ``size`` (aspect ratio kept) and
        pasted onto a white ``size`` canvas saved as ``<stem>_thumb.jpg``.
        A file that cannot be processed yields a failed result instead of
        aborting the batch.

        Returns:
            One ``ConvertResult`` per supported file.
        """
        results = []
        for file_path in self._iter_images(pattern):
            output_path = self.output_dir / f"{file_path.stem}_thumb.jpg"
            try:
                with Image.open(file_path) as img:
                    img.thumbnail(size, Image.Resampling.LANCZOS)
                    # Go through RGBA so the alpha band can serve as paste
                    # mask; without a mask transparent areas would not
                    # show the white background.
                    thumb = img.convert('RGBA')
                    background = Image.new('RGB', size, (255, 255, 255))
                    offset = ((size[0] - thumb.width) // 2, (size[1] - thumb.height) // 2)
                    background.paste(thumb, offset, mask=thumb)
                    background.save(output_path, quality=80)
            except (OSError, ValueError) as e:
                logger.error("✗ %s: %s", file_path.name, e)
                results.append(ConvertResult(
                    success=False,
                    input_file=str(file_path),
                    output_file='',
                    message=str(e),
                ))
                continue
            results.append(ConvertResult(
                success=True,
                input_file=str(file_path),
                output_file=str(output_path),
                message=f'生成缩略图 {size}',
                size_before=file_path.stat().st_size,
                size_after=output_path.stat().st_size,
            ))
        return results
PDF格式转换
import fitz # PyMuPDF
class PDFConverter(BaseConverter):
    """PDF converter built on PyMuPDF (``fitz``).

    ``process`` (inherited) renders the first page to an image or extracts
    the text, depending on ``target_format``; ``extract_images`` dumps the
    images embedded in each PDF.
    """

    def __init__(self, input_dir: str, output_dir: str, target_format: str = 'png'):
        """Create the converter.

        Args:
            input_dir: Directory scanned for PDF files.
            output_dir: Directory receiving the results.
            target_format: Extension (with or without a leading dot) used by
                ``get_output_name``. ``convert`` handles ``png``, ``jpg``,
                ``jpeg`` and ``txt``; defaults to ``'png'``.
        """
        super().__init__(input_dir, output_dir)
        self.target_format = target_format.lower().lstrip('.')

    def convert(self, input_path: Path, output_path: Path) -> bool:
        """Dispatch on the suffix of ``output_path``.

        Returns:
            The result of the image or text helper, or False for an
            unsupported suffix.
        """
        ext = output_path.suffix.lower()
        if ext in ('.png', '.jpg', '.jpeg'):
            return self._pdf_to_image(input_path, output_path)
        if ext == '.txt':
            return self._pdf_to_text(input_path, output_path)
        return False

    def _pdf_to_image(self, pdf_path: Path, output_path: Path) -> bool:
        """Render the first page at 2x zoom and save it to ``output_path``.

        The file is written at exactly ``output_path`` so the caller can
        find it; the image format follows that path's extension.
        NOTE(review): JPEG output depends on the installed PyMuPDF version
        supporting it in ``Pixmap.save`` - confirm for the target version.

        Returns:
            True when the page was saved, False for a document without pages.
        """
        with fitz.open(pdf_path) as doc:
            if len(doc) == 0:
                return False
            # Only the first page is rendered
            pix = doc[0].get_pixmap(matrix=fitz.Matrix(2, 2))
            pix.save(str(output_path))
        return True

    def _pdf_to_text(self, pdf_path: Path, output_path: Path) -> bool:
        """Write the text of all pages, separated by blank lines, as UTF-8."""
        with fitz.open(pdf_path) as doc:
            text = '\n\n'.join(page.get_text() for page in doc)
        output_path.write_text(text, encoding='utf-8')
        return True

    def get_output_name(self, input_path: Path) -> str:
        """Return ``<stem>_converted.<target_format>`` for ``input_path``."""
        return f"{input_path.stem}_converted.{self.target_format}"

    def extract_images(self, pattern: str = '*.pdf') -> List[ConvertResult]:
        """Save every image embedded in the matching PDFs as PNG files.

        Files are named ``<stem>_p<page>_img<index>.png`` (both 1-based).
        A PDF that cannot be processed yields a failed result instead of
        aborting the batch.

        Returns:
            One ``ConvertResult`` per PDF; ``output_file`` holds a textual
            image count rather than a path.
        """
        results = []
        for pdf_path in sorted(self.input_dir.glob(pattern)):
            if not pdf_path.is_file():
                continue
            img_count = 0
            try:
                with fitz.open(pdf_path) as doc:
                    for page_num, page in enumerate(doc, start=1):
                        for img_index, img in enumerate(page.get_images(full=True), start=1):
                            pix = fitz.Pixmap(doc, img[0])  # img[0] is the image xref
                            # Four or more colour components (e.g. CMYK) are
                            # converted to RGB instead of being skipped.
                            if pix.n - pix.alpha >= 4:
                                pix = fitz.Pixmap(fitz.csRGB, pix)
                            output_path = self.output_dir / f"{pdf_path.stem}_p{page_num}_img{img_index}.png"
                            pix.save(str(output_path))
                            img_count += 1
            except Exception as e:
                # Per-file boundary: record the failure and keep going
                logger.error("✗ %s: %s", pdf_path.name, e)
                results.append(ConvertResult(
                    success=False,
                    input_file=str(pdf_path),
                    output_file='',
                    message=str(e),
                ))
                continue
            results.append(ConvertResult(
                success=True,
                input_file=str(pdf_path),
                output_file=f'{img_count} 张图片',
                message=f'提取了 {img_count} 张图片',
            ))
        return results
文档格式转换
from docx import Document
from docx.shared import Inches
import mammoth
class DocxConverter(BaseConverter):
    """Word (.docx) converter producing HTML, plain text or PDF.

    HTML goes through ``mammoth``, plain text through ``python-docx`` and
    PDF through a headless ``libreoffice`` process.
    """

    def __init__(self, input_dir: str, output_dir: str, target_format: str = 'html'):
        """Create the converter.

        Args:
            input_dir: Directory scanned for .docx files.
            output_dir: Directory receiving the results.
            target_format: Extension (with or without a leading dot) used by
                ``get_output_name``. ``convert`` handles ``html``, ``txt``
                and ``pdf``; defaults to ``'html'``.
        """
        super().__init__(input_dir, output_dir)
        self.target_format = target_format.lower().lstrip('.')

    def convert(self, input_path: Path, output_path: Path) -> bool:
        """Dispatch on the suffix of ``output_path``.

        Returns:
            The result of the matching helper, or False for an unsupported
            suffix.
        """
        ext = output_path.suffix.lower()
        if ext == '.pdf':
            # Requires a libreoffice executable on PATH
            return self._docx_to_pdf(input_path, output_path)
        if ext == '.html':
            return self._docx_to_html(input_path, output_path)
        if ext == '.txt':
            return self._docx_to_text(input_path, output_path)
        return False

    def _docx_to_html(self, docx_path: Path, output_path: Path) -> bool:
        """Convert the document to HTML with mammoth and write it as UTF-8."""
        with open(docx_path, 'rb') as docx_file:
            result = mammoth.convert_to_html(docx_file)
        # Surface the messages mammoth attached to the result instead of dropping them
        for note in result.messages:
            logger.warning("%s: %s", docx_path.name, note)
        output_path.write_text(result.value, encoding='utf-8')
        return True

    def _docx_to_text(self, docx_path: Path, output_path: Path) -> bool:
        """Write the text of all body paragraphs, one per line, as UTF-8."""
        doc = Document(str(docx_path))
        lines = [para.text for para in doc.paragraphs]
        output_path.write_text('\n'.join(lines), encoding='utf-8')
        return True

    def _docx_to_pdf(self, docx_path: Path, output_path: Path) -> bool:
        """Convert to PDF by running ``libreoffice --headless``.

        The command is given an output directory, not a file name, and the
        result is expected at ``<stem>.pdf`` inside it; that file is moved
        to ``output_path`` when the two differ.

        Returns:
            True when the PDF exists at ``output_path`` afterwards.
        """
        import subprocess
        out_dir = output_path.parent
        cmd = [
            'libreoffice', '--headless', '--convert-to', 'pdf',
            '--outdir', str(out_dir),
            str(docx_path),
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            logger.error("libreoffice 转换失败 %s: %s", docx_path.name, result.stderr.strip())
            return False

        produced = out_dir / f"{docx_path.stem}.pdf"
        if not produced.exists():
            return False
        if produced != output_path:
            produced.replace(output_path)
        return True

    def get_output_name(self, input_path: Path) -> str:
        """Return ``<stem>.<target_format>`` for ``input_path``."""
        return f"{input_path.stem}.{self.target_format}"
音频格式转换
import subprocess
from pydub import AudioSegment
class AudioConverter(BaseConverter):
    """Audio converter using pydub, with a direct ffmpeg call as fallback."""

    def __init__(self, input_dir: str, output_dir: str, bitrate: str = '192k',
                 target_format: str = 'mp3'):
        """Create the converter.

        Args:
            input_dir: Directory scanned for audio files.
            output_dir: Directory receiving the results.
            bitrate: Audio bitrate passed to the encoder, e.g. ``'192k'``.
            target_format: Extension (with or without a leading dot) used by
                ``get_output_name``; defaults to ``'mp3'``.
        """
        super().__init__(input_dir, output_dir)
        # Attribute name kept as-is for code that already reads it
        self.bitrates = bitrate
        self.target_format = target_format.lower().lstrip('.')

    def convert(self, input_path: Path, output_path: Path) -> bool:
        """Convert one file to the format implied by ``output_path``.

        pydub is tried first with the configured bitrate; if it raises, the
        error is logged and ffmpeg is invoked directly.

        Returns:
            True when either path produced the output.
        """
        fmt = output_path.suffix[1:].lower()
        try:
            audio = AudioSegment.from_file(str(input_path))
            # export() hands back the open output file object; close it here
            audio.export(str(output_path), format=fmt, bitrate=self.bitrates).close()
            return True
        except Exception as e:
            logger.error(f"音频转换失败: {e}")
            # Fall back to calling ffmpeg directly
            return self._convert_with_ffmpeg(input_path, output_path)

    def _convert_with_ffmpeg(self, input_path: Path, output_path: Path) -> bool:
        """Run ffmpeg with the configured audio bitrate; True on exit code 0."""
        cmd = [
            'ffmpeg', '-i', str(input_path),
            '-b:a', self.bitrates,
            '-y', str(output_path),
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            logger.error("ffmpeg 转换失败 %s: %s", input_path.name, result.stderr.strip())
        return result.returncode == 0

    def get_output_name(self, input_path: Path) -> str:
        """Return ``<stem>.<target_format>`` for ``input_path``."""
        return f"{input_path.stem}.{self.target_format}"

    def extract_audio(self, video_path: Path, output_format: str = 'mp3') -> ConvertResult:
        """Extract the audio track of ``video_path`` into ``output_dir``.

        Returns:
            A ``ConvertResult``; on failure ``message`` carries the error
            text and ``output_file`` is empty.
        """
        output_path = self.output_dir / f"{video_path.stem}.{output_format}"
        try:
            track = AudioSegment.from_file(str(video_path))
            track.export(str(output_path), format=output_format, bitrate=self.bitrates).close()
        except Exception as e:
            return ConvertResult(
                success=False,
                input_file=str(video_path),
                output_file='',
                message=str(e),
            )
        return ConvertResult(
            success=True,
            input_file=str(video_path),
            output_file=str(output_path),
            message='成功提取音频',
        )

    def adjust_volume(self, db_change: int, pattern: str = '*.mp3') -> List[ConvertResult]:
        """Apply a gain of ``db_change`` dB to every matching file.

        Output keeps the source file name, so it is encoded in the format
        named by the source extension rather than always as MP3. A file
        that cannot be processed yields a failed result instead of aborting
        the batch.

        Returns:
            One ``ConvertResult`` per matching file.
        """
        results = []
        for file_path in sorted(self.input_dir.glob(pattern)):
            if not file_path.is_file():
                continue
            output_path = self.output_dir / file_path.name
            # Files without an extension fall back to the previous MP3 behaviour
            fmt = file_path.suffix[1:].lower() or 'mp3'
            try:
                audio = AudioSegment.from_file(str(file_path))
                adjusted = audio + db_change  # positive raises, negative lowers the level
                adjusted.export(str(output_path), format=fmt, bitrate=self.bitrates).close()
            except Exception as e:
                # Per-file boundary: record the failure and keep going
                logger.error("✗ %s: %s", file_path.name, e)
                results.append(ConvertResult(
                    success=False,
                    input_file=str(file_path),
                    output_file='',
                    message=str(e),
                ))
                continue
            results.append(ConvertResult(
                success=True,
                input_file=str(file_path),
                output_file=str(output_path),
                message=f'音量调整 {db_change}dB',
            ))
        return results
视频格式转换
class VideoConverter(BaseConverter):
    """Video converter that shells out to ffmpeg (H.264 video, AAC audio)."""

    def __init__(self, input_dir: str, output_dir: str,
                 resolution: Optional[str] = None, bitrate: Optional[str] = None):
        """Create the converter.

        Args:
            input_dir: Directory scanned for video files.
            output_dir: Directory receiving the results.
            resolution: Value inserted into ffmpeg's ``scale=`` filter, e.g.
                ``'1280:720'``; None keeps the source resolution.
            bitrate: Video bitrate passed as ``-b:v``; None leaves it to
                the encoder.
        """
        super().__init__(input_dir, output_dir)
        self.resolution = resolution
        # Attribute name kept as-is for code that already reads it
        self.bitrates = bitrate

    def convert(self, input_path: Path, output_path: Path) -> bool:
        """Transcode one file with libx264 / AAC.

        Returns:
            True when ffmpeg exits with code 0; on failure its stderr is
            logged.
        """
        cmd = ['ffmpeg', '-i', str(input_path), '-y']
        # Video codec
        cmd += ['-c:v', 'libx264', '-preset', 'medium']
        # Optional scaling
        if self.resolution:
            cmd += ['-vf', f'scale={self.resolution}']
        # Optional video bitrate
        if self.bitrates:
            cmd += ['-b:v', self.bitrates]
        # Audio codec and bitrate
        cmd += ['-c:a', 'aac', '-b:a', '128k']
        cmd.append(str(output_path))

        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            logger.error("ffmpeg 转换失败 %s: %s", input_path.name, result.stderr.strip())
        return result.returncode == 0

    def get_output_name(self, input_path: Path) -> str:
        """Return ``<stem>_converted.mp4`` for ``input_path``."""
        return input_path.stem + '_converted.mp4'

    def compress_video(self, crf: int = 28, pattern: str = '*.mp4') -> List[ConvertResult]:
        """Re-encode every matching file with the given libx264 CRF value.

        Output is named ``<stem>_compressed.mp4``. A file whose ffmpeg run
        fails, or cannot be started at all, yields a failed result instead
        of aborting the batch.

        Returns:
            One ``ConvertResult`` per matching file, including the input
            and output sizes for successful runs.
        """
        results = []
        for file_path in sorted(self.input_dir.glob(pattern)):
            if not file_path.is_file():
                continue
            output_path = self.output_dir / f"{file_path.stem}_compressed.mp4"
            cmd = [
                'ffmpeg', '-i', str(file_path),
                '-c:v', 'libx264', '-crf', str(crf),
                '-c:a', 'aac',
                '-y', str(output_path),
            ]
            try:
                result = subprocess.run(cmd, capture_output=True, text=True)
            except OSError as e:
                # Raised when the ffmpeg executable cannot be started
                logger.error("✗ %s: %s", file_path.name, e)
                results.append(ConvertResult(
                    success=False,
                    input_file=str(file_path),
                    output_file='',
                    message=str(e),
                ))
                continue

            succeeded = result.returncode == 0
            if not succeeded:
                logger.error("ffmpeg 压缩失败 %s: %s", file_path.name, result.stderr.strip())
            results.append(ConvertResult(
                success=succeeded,
                input_file=str(file_path),
                output_file=str(output_path) if succeeded else '',
                message='压缩完成' if succeeded else '压缩失败',
                size_before=file_path.stat().st_size,
                size_after=output_path.stat().st_size if succeeded and output_path.exists() else 0,
            ))
        return results
使用示例
if __name__ == '__main__':
    # Images: convert every PNG, print the statistics, then build thumbnails
    image_job = ImageConverter('./inputs/images', './outputs/images', quality=85)
    image_results = image_job.process('*.png')
    print("图片转换结果:", image_job.get_summary())
    image_job.create_thumbnail((200, 200))

    # PDFs: run the default conversion on every PDF
    pdf_job = PDFConverter('./inputs/docs', './outputs/docs')
    pdf_job.process('*.pdf')

    # Word documents: run the default conversion on every .docx
    docx_job = DocxConverter('./inputs/docs', './outputs/html')
    docx_job.process('*.docx')

    # Audio: convert WAV files at 192k
    audio_job = AudioConverter('./inputs/audio', './outputs/audio', bitrate='192k')
    audio_results = audio_job.process('*.wav')

    # Video: transcode AVI files scaled to 1280x720
    video_job = VideoConverter('./inputs/video', './outputs/video', resolution='1280:720')
    video_results = video_job.process('*.avi')
总结
文件格式转换自动化要点：
- 统一框架：基类定义标准流程，子类实现具体转换
- 异常处理：单个文件转换失败不影响其他文件
- 结果统计：记录每个文件的转换状态
- 工具选择：合理使用 Pillow、PyMuPDF、pydub 等库
- ffmpeg 集成：处理音视频的专业工具
更多推荐
所有评论(0)