手搓一个Python增量备份器:160行代码实现MD5差异检测+镜像同步+JSON日志(零第三方库)

写过文件整理器之后,我发现不少人还有另一个刚需——Python 增量备份。Windows自带的文件历史记录不好控制,第三方同步工具(FreeFileSync 之类)功能强大但界面复杂。这次就手搓一个纯 Python增量备份小工具,标准库为主,零依赖,支持 MD5 差异检测、增量复制、镜像同步,跑完还能生成一份 JSON 操作日志。

代码量在 160 行左右,直接复制就能跑。


一、需求拆解

我们要实现的核心能力:

  1. 对比两边:递归扫描源目录和目标目录,列出所有文件。
  2. 判断变化:不只看文件名,而是用 MD5 校验判断内容是否真的变了(防止只改时间戳但没改内容的文件被重复复制)。
  3. 增量复制:只复制新增或修改过的文件,保留目标端已有的、未改动的文件。
  4. 镜像模式(可选):目标端存在、但源端已删除的文件,可以选择同步删除。
  5. 过滤规则:支持白名单/黑名单,比如只备份 .py.md,或者跳过 __pycache__.git
  6. 日志记录:把本次操作复制了谁、删除了谁、跳过了谁,写成一份带时间戳的 JSON 日志,方便回溯。

二、环境准备

纯标准库实现,不需要 pip 安装任何东西:

模块 用途
pathlib 跨平台路径操作,比 os.path 舒服很多
hashlib 计算文件 MD5
shutil 复制文件、创建目录
json 写操作日志
datetime 打时间戳
argparse 命令行参数解析
sys 错误输出与退出码

Python 版本:3.8+(pathlibrelative_to 在 3.8 以上行为更稳定,海象运算符 := 也是 3.8 引入的)。


三、核心代码逐段讲解

3.1 计算文件 MD5

判断文件是否变更,不能只看修改时间——有时候 Git 切分支、解压压缩包会把时间戳全刷一遍,但内容没变。MD5 虽然不算加密安全,但做文件一致性校验完全够用,速度也快。

import hashlib

def file_md5(filepath, block_size=65536):
    """计算文件 MD5,大文件分块读取(默认 64KB,平衡速度与内存)"""
    h = hashlib.md5()
    file_size = filepath.stat().st_size
    bytes_read = 0
    with open(filepath, 'rb') as f:
        while chunk := f.read(block_size):
            h.update(chunk)
            bytes_read += len(chunk)
            # 大文件(>10MB)打印进度,避免看起来像卡死
            if file_size > 10 * 1024 * 1024:
                pct = bytes_read / file_size * 100
                print(f"\r  计算 MD5: {filepath.name} ({pct:.0f}%)", end='', flush=True)
    if file_size > 10 * 1024 * 1024:
        print(f"\r  计算 MD5: {filepath.name} (100%)  ")
    return h.hexdigest()

注意点block_size 默认 64KB,不是一次性 read(),避免几百兆文件直接爆内存。同时只对超过 10MB 的大文件显示进度百分比,小文件静默处理不刷屏。


3.2 扫描目录,生成 “相对路径 → MD5” 的字典

pathlib.Path.rglob('*') 递归遍历,过滤掉目录,只留文件。键用相对路径(如 src/main.py),这样源目录和目标目录才能对齐比较。

from pathlib import Path

def scan_directory(directory, exclude=None):
    """
    扫描目录,返回 {相对路径: md5} 字典
    exclude: 黑名单列表,匹配目录名或文件后缀
    """
    if exclude is None:
        exclude = []
    exclude_set = set(exclude)  # 用集合加速查找
    directory = Path(directory)
    result = {}
    for filepath in directory.rglob('*'):
        # 跳过符号链接,防止循环递归
        if filepath.is_symlink():
            continue
        if not filepath.is_file():
            continue
        rel_path = filepath.relative_to(directory).as_posix()
        # 黑名单过滤:目录名或后缀匹配就跳过
        if any(part in exclude_set for part in filepath.parts):
            continue
        if any(filepath.name.endswith(ext) for ext in exclude_set if ext.startswith('.')):
            continue
        result[rel_path] = file_md5(filepath)
    return result

注意点

  • as_posix() 把反斜杠转成正斜杠,Windows 和 Linux 日志里路径格式统一,看着不别扭。
  • exclude 转成 set 再查找,从 O(n) 变 O(1),黑白名单长的时候有明显区别。
  • 必须跳过符号链接rglob 默认会跟随符号链接,如果目录里有循环链接(比如 Linux 上的软链接指向父目录),会无限递归直到栈溢出。is_symlink() 判断加在最前面。

3.3 差异分析:找出需要复制、删除、保留的文件

对比源端和目标端的两个字典:

  • 新增:源端有,目标端没有。
  • 修改:两边都有,但 MD5 不同。
  • 保留:两边都有,MD5 相同。
  • 待删:目标端有,源端没有(仅在镜像模式下处理)。
def diff_directories(src_dict, dst_dict):
    """
    对比两个字典,返回四类文件列表
    """
    to_copy = []
    to_delete = []
    unchanged = []
    for rel_path, md5 in src_dict.items():
        if rel_path not in dst_dict:
            to_copy.append((rel_path, 'new'))
        elif dst_dict[rel_path] != md5:
            to_copy.append((rel_path, 'modified'))
        else:
            unchanged.append(rel_path)
    for rel_path in dst_dict:
        if rel_path not in src_dict:
            to_delete.append(rel_path)
    return to_copy, to_delete, unchanged

先别急着复制代码,文章最后的避坑表里总结了7个容易翻车的点,建议看完再跑。


3.4 执行同步操作

import shutil

def sync_files(src_dir, dst_dir, to_copy, to_delete, mirror=False, dry_run=False):
    """
    执行实际的复制/删除操作
    dry_run=True 时只打印不执行,适合先预览
    """
    src_dir = Path(src_dir)
    dst_dir = Path(dst_dir)
    log = {'copied': [], 'deleted': [], 'errors': []}

    for rel_path, reason in to_copy:
        src_file = src_dir / rel_path
        dst_file = dst_dir / rel_path
        print(f"[复制] {rel_path} ({reason})")
        if not dry_run:
            try:
                dst_file.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(src_file, dst_file)
                log['copied'].append({'file': rel_path, 'reason': reason})
            except Exception as e:
                print(f"  [失败] {e}")
                log['errors'].append({'file': rel_path, 'error': str(e)})

    if mirror:
        for rel_path in to_delete:
            dst_file = dst_dir / rel_path
            print(f"[删除] {rel_path} (镜像模式)")
            if not dry_run:
                try:
                    dst_file.unlink()
                    log['deleted'].append(rel_path)
                except Exception as e:
                    print(f"  [失败] {e}")
                    log['errors'].append({'file': rel_path, 'error': str(e)})
        # 镜像模式:清理目标端空目录(从最深层开始)
        if not dry_run and log['deleted']:
            _clean_empty_dirs(dst_dir, src_dir)
    else:
        for rel_path in to_delete:
            print(f"[跳过删除] {rel_path} (非镜像模式)")

    return log


def _clean_empty_dirs(dst_dir, src_dir):
    """清理目标端中源端不存在的空目录,从深到浅遍历"""
    src_dir = Path(src_dir)
    for dirpath in sorted(Path(dst_dir).rglob('*'), reverse=True):
        if dirpath.is_dir() and not dirpath.is_symlink():
            if dirpath == Path(dst_dir):
                continue
            try:
                rel = dirpath.relative_to(dst_dir)
                if (src_dir / rel).is_dir():
                    continue
            except ValueError:
                continue
            try:
                dirpath.rmdir()  # rmdir 只能删空目录
            except OSError:
                pass  # 非空目录,跳过

注意点

  • shutil.copy2copy 多保留了原文件的修改时间、权限等元数据,备份场景更实用。
  • mkdir(parents=True, exist_ok=True) 自动创建中间目录,不用自己一层层判断。
  • dry_run 模式很重要——第一次跑不确定对不对,先 --dry-run 看它会动哪些文件,确认无误再去掉这个参数。
  • 镜像模式会自动清理空目录:文件删了但父目录还在的话会很别扭,_clean_empty_dirsrmdir() 从最深层开始逐级清理(rmdir 只删空目录,非空则自动跳过,不会误删有内容的目录)。

3.5 生成 JSON 日志

import json
from datetime import datetime

def write_log(log_data, log_dir='backup_logs'):
    """把操作记录写成带时间戳的 JSON 文件"""
    log_dir = Path(log_dir)
    log_dir.mkdir(exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_file = log_dir / f"backup_{timestamp}.json"
    with open(log_file, 'w', encoding='utf-8') as f:
        json.dump(log_data, f, ensure_ascii=False, indent=2)
    print(f"\n日志已保存: {log_file}")

3.6 入口:参数解析与前置校验

import sys

def main():
    parser = argparse.ArgumentParser(description='文件夹增量备份工具')
    parser.add_argument('src', help='源目录路径')
    parser.add_argument('dst', help='目标目录路径')
    parser.add_argument('--mirror', action='store_true', help='镜像模式:删除目标端多余文件')
    parser.add_argument('--dry-run', action='store_true', help='试运行,只打印不执行')
    parser.add_argument('--exclude', nargs='+', default=['__pycache__', '.git', 'node_modules', '.tmp'],
                        help='黑名单,空格分隔')
    parser.add_argument('--log-dir', default='backup_logs', help='日志保存目录')
    args = parser.parse_args()

    # 前置校验:源目录必须存在
    src_path = Path(args.src)
    if not src_path.is_dir():
        print(f"错误: 源目录不存在 — {args.src}", file=sys.stderr)
        sys.exit(1)

    dst_path = Path(args.dst)
    # 目标目录不存在则自动创建
    if not dst_path.exists():
        print(f"目标目录不存在,自动创建: {args.dst}")
        dst_path.mkdir(parents=True, exist_ok=True)

    # ... 后续扫描、对比、同步逻辑

注意点

  • 源目录不存在时直接 sys.exit(1) 报错退出,比跑到一半 FileNotFoundError 好排查得多。
  • 目标目录不存在不是致命错误,自动创建就行,减少使用前的准备工作。

四、完整源码(直接复制可运行)

把上面所有片段拼起来,加上 if __name__ == '__main__': 入口和命令行参数解析:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文件夹增量备份器
支持:差异检测(MD5)、增量复制、镜像同步、黑名单过滤、操作日志
"""

import argparse
import hashlib
import json
import shutil
import sys
from datetime import datetime
from pathlib import Path


def file_md5(filepath, block_size=65536):
    """计算文件 MD5,大文件分块读取(默认 64KB,平衡速度与内存)"""
    h = hashlib.md5()
    file_size = filepath.stat().st_size
    bytes_read = 0
    with open(filepath, 'rb') as f:
        while chunk := f.read(block_size):
            h.update(chunk)
            bytes_read += len(chunk)
            # 大文件(>10MB)打印进度,避免看起来像卡死
            if file_size > 10 * 1024 * 1024:
                pct = bytes_read / file_size * 100
                print(f"\r  计算 MD5: {filepath.name} ({pct:.0f}%)", end='', flush=True)
    if file_size > 10 * 1024 * 1024:
        print(f"\r  计算 MD5: {filepath.name} (100%)  ")
    return h.hexdigest()


def scan_directory(directory, exclude=None):
    """扫描目录,返回 {相对路径: md5} 字典,跳过符号链接"""
    if exclude is None:
        exclude = []
    exclude_set = set(exclude)  # 用集合加速查找
    directory = Path(directory)
    result = {}

    for filepath in directory.rglob('*'):
        # 跳过符号链接,防止循环递归
        if filepath.is_symlink():
            continue
        if not filepath.is_file():
            continue

        rel_path = filepath.relative_to(directory).as_posix()

        # 按目录名过滤(如 __pycache__, .git, node_modules)
        if any(part in exclude_set for part in filepath.parts):
            continue
        # 按后缀过滤(如 .tmp, .log)——只处理 exclude 中以 '.' 开头的项
        if any(filepath.name.endswith(ext) for ext in exclude_set if ext.startswith('.')):
            continue

        result[rel_path] = file_md5(filepath)

    return result


def diff_directories(src_dict, dst_dict):
    """对比源和目标字典,返回待复制、待删除、未变更列表"""
    to_copy = []
    to_delete = []
    unchanged = []
    for rel_path, md5 in src_dict.items():
        if rel_path not in dst_dict:
            to_copy.append((rel_path, 'new'))
        elif dst_dict[rel_path] != md5:
            to_copy.append((rel_path, 'modified'))
        else:
            unchanged.append(rel_path)
    for rel_path in dst_dict:
        if rel_path not in src_dict:
            to_delete.append(rel_path)
    return to_copy, to_delete, unchanged


def sync_files(src_dir, dst_dir, to_copy, to_delete, mirror=False, dry_run=False):
    """执行复制/删除,返回操作日志字典"""
    src_dir = Path(src_dir)
    dst_dir = Path(dst_dir)
    log = {'copied': [], 'deleted': [], 'errors': []}

    for rel_path, reason in to_copy:
        src_file = src_dir / rel_path
        dst_file = dst_dir / rel_path
        print(f"[复制] {rel_path} ({reason})")
        if not dry_run:
            try:
                dst_file.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(src_file, dst_file)
                log['copied'].append({'file': rel_path, 'reason': reason})
            except Exception as e:
                print(f"  [失败] {e}")
                log['errors'].append({'file': rel_path, 'error': str(e)})

    if mirror:
        for rel_path in to_delete:
            dst_file = dst_dir / rel_path
            print(f"[删除] {rel_path} (镜像模式)")
            if not dry_run:
                try:
                    dst_file.unlink()
                    log['deleted'].append(rel_path)
                except Exception as e:
                    print(f"  [失败] {e}")
                    log['errors'].append({'file': rel_path, 'error': str(e)})
        # 镜像模式:清理目标端空目录(从最深层开始)
        if not dry_run and log['deleted']:
            _clean_empty_dirs(dst_dir, src_dir)
    else:
        for rel_path in to_delete:
            print(f"[跳过删除] {rel_path} (非镜像模式)")

    return log


def _clean_empty_dirs(dst_dir, src_dir):
    """清理目标端中源端不存在的空目录,从深到浅遍历"""
    src_dir = Path(src_dir)
    for dirpath in sorted(Path(dst_dir).rglob('*'), reverse=True):
        if dirpath.is_dir() and not dirpath.is_symlink():
            # 跳过目标端根目录本身
            if dirpath == Path(dst_dir):
                continue
            # 对应的源端目录如果存在,不清理
            try:
                rel = dirpath.relative_to(dst_dir)
                if (src_dir / rel).is_dir():
                    continue
            except ValueError:
                continue
            # 目录为空才删除
            try:
                dirpath.rmdir()  # rmdir 只能删空目录
            except OSError:
                pass  # 非空目录,跳过


def write_log(log_data, log_dir='backup_logs'):
    """写入带时间戳的 JSON 日志"""
    log_dir = Path(log_dir)
    log_dir.mkdir(exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_file = log_dir / f"backup_{timestamp}.json"
    with open(log_file, 'w', encoding='utf-8') as f:
        json.dump(log_data, f, ensure_ascii=False, indent=2)
    print(f"\n日志已保存: {log_file}")


def main():
    parser = argparse.ArgumentParser(description='文件夹增量备份工具')
    parser.add_argument('src', help='源目录路径')
    parser.add_argument('dst', help='目标目录路径')
    parser.add_argument('--mirror', action='store_true', help='镜像模式:删除目标端多余文件')
    parser.add_argument('--dry-run', action='store_true', help='试运行,只打印不执行')
    parser.add_argument('--exclude', nargs='+', default=['__pycache__', '.git', 'node_modules', '.tmp'],
                        help='黑名单,空格分隔,默认跳过 __pycache__ .git node_modules .tmp')
    parser.add_argument('--log-dir', default='backup_logs', help='日志保存目录')
    args = parser.parse_args()

    # 前置校验:源目录必须存在
    src_path = Path(args.src)
    if not src_path.is_dir():
        print(f"错误: 源目录不存在 — {args.src}", file=sys.stderr)
        sys.exit(1)

    dst_path = Path(args.dst)
    # 目标目录不存在则自动创建
    if not dst_path.exists():
        print(f"目标目录不存在,自动创建: {args.dst}")
        dst_path.mkdir(parents=True, exist_ok=True)

    print(f"扫描源目录: {args.src}")
    src_dict = scan_directory(args.src, exclude=args.exclude)
    print(f"  发现 {len(src_dict)} 个文件")

    print(f"扫描目标目录: {args.dst}")
    dst_dict = scan_directory(args.dst, exclude=args.exclude)
    print(f"  发现 {len(dst_dict)} 个文件")

    to_copy, to_delete, unchanged = diff_directories(src_dict, dst_dict)

    print(f"\n分析结果:")
    print(f"  待复制: {len(to_copy)} 个")
    print(f"  待删除: {len(to_delete)} 个 (镜像模式下执行)")
    print(f"  未变更: {len(unchanged)} 个")

    if not to_copy and (not args.mirror or not to_delete):
        print("\n无需同步,已是最新。")
        return

    if args.dry_run:
        print("\n[试运行模式] 以下操作不会实际执行:")

    log = sync_files(args.src, args.dst, to_copy, to_delete,
                     mirror=args.mirror, dry_run=args.dry_run)

    log_summary = {
        'timestamp': datetime.now().isoformat(),
        'source': args.src,
        'destination': args.dst,
        'mirror': args.mirror,
        'dry_run': args.dry_run,
        'stats': {
            'source_files': len(src_dict),
            'dest_files': len(dst_dict),
            'copied': len(log['copied']),
            'deleted': len(log['deleted']),
            'errors': len(log['errors'])
        },
        'details': log
    }

    if not args.dry_run:
        write_log(log_summary, log_dir=args.log_dir)
    else:
        print("\n[试运行结束] 未生成日志。去掉 --dry-run 后正式执行。")


if __name__ == '__main__':
    main()

五、运行测试

5.1 先建两个测试目录

Windows(CMD / PowerShell)

mkdir test_src\sub
mkdir test_dst\sub

echo hello > test_src\a.txt
echo world > test_src\sub\b.txt
echo keep > test_src\c.txt

echo hello > test_dst\a.txt
echo old > test_dst\c.txt

Linux / macOS / Git Bash

mkdir -p test_src/sub
mkdir -p test_dst/sub

echo "hello" > test_src/a.txt
echo "world" > test_src/sub/b.txt
echo "keep" > test_src/c.txt

echo "hello" > test_dst/a.txt
echo "old" > test_dst/c.txt

5.2 试运行(dry-run)

python backup_sync.py test_src test_dst --dry-run

预期输出:

扫描源目录: test_src
  发现 3 个文件
扫描目标目录: test_dst
  发现 2 个文件

分析结果:
  待复制: 2 个
  待删除: 0 个 (镜像模式下执行)
  未变更: 1 个

[试运行模式] 以下操作不会实际执行:
[复制] sub/b.txt (new)
[复制] c.txt (modified)

[试运行结束] 未生成日志。去掉 --dry-run 后正式执行。

a.txt 内容没变(MD5 相同),所以被识别为未变更,不会重复复制。

5.3 正式执行

python backup_sync.py test_src test_dst

执行后 test_dst 会与 test_src 完全一致,并在 backup_logs/ 目录下生成类似 backup_20260518_113300.json 的日志文件。

5.4 镜像模式测试

先在目标端手动加一个源端没有的文件:

echo orphan > test_dst\orphan.txt
python backup_sync.py test_src test_dst --mirror --dry-run

输出里你会看到 [删除] orphan.txt (镜像模式),确认无误后去掉 --dry-run 正式执行。


六、功能拓展方向

这个小工具骨架已经搭好,往实用方向再改几行就能升级:

  1. 多线程加速:复制大文件时,用 concurrent.futures.ThreadPoolExecutorto_copy 列表并行处理,备份几百 GB 素材时提速明显。
  2. 保留历史版本:复制前把目标端旧文件重命名为 文件名.时间戳.bak,实现简易版本回溯。
  3. 配置文件化:把黑名单、日志路径、是否默认开启镜像等参数写进 backup_config.json,不用每次敲命令行。
  4. 定时任务:Windows 用任务计划程序 / Linux 用 cron,每天凌晨自动跑一遍,备份开发代码或文档目录。
  5. 白名单模式:现在默认黑名单过滤,可以再加一个 --include 参数,实现只备份指定类型(如仅 .py.md)。

七、关键避坑总结

坑点 现象 解决办法
只看修改时间 Git 切换分支后所有文件时间戳刷新,导致全部重复复制 用 MD5 校验内容,不依赖 mtime
一次性 read() 大文件 备份视频/镜像文件时内存暴涨 分块读取(block_size=65536,64KB)
符号链接循环递归 rglob 跟随软链接,如果链接指向父目录会栈溢出 is_symlink() 开头直接跳过
路径分隔符混乱 Windows 下日志里反斜杠,Linux 下正斜杠,对比时匹配失败 统一用 as_posix() 存相对路径
目标目录中间层级缺失 shutil.copy2 报错 No such file or directory mkdir(parents=True, exist_ok=True) 提前建好
误删目标端文件 第一次跑 --mirror 把不该删的删了 养成先 --dry-run 的习惯
镜像删除后残留空目录 文件删了但父目录还在,目标端一堆空文件夹 _clean_empty_dirsrmdir() 从深到浅清理
黑名单查找性能差 exclude 列表用 in 查找是 O(n) 转成 set,查找变 O(1)

核心就这几点:MD5 比时间戳靠谱、--dry-run 是保命参数、日志留底方便查问题。代码不依赖任何第三方库,随手丢进哪个环境都能跑。

先把上面的测试跑通,再根据自己的需求改。如果你加了多线程或者白名单功能,欢迎在评论区贴出来,互相借鉴。

更多推荐