一、库的简介

PyTorch是由Facebook人工智能研究院开发的深度学习框架,以其动态计算图、直观的API设计和优秀的GPU加速能力而闻名。在实际生活中,PyTorch的应用已经深入到人工智能的各个角落:研究人员用它快速验证新的深度学习算法,工程师用它构建智能对话系统,艺术家用它生成创意艺术图像,医生用它辅助医学影像分析。当你在TikTok上看到AI生成的滤镜特效,在翻译软件中使用实时语音翻译,或在医疗诊断中看到AI辅助的病理分析时,背后往往都有PyTorch的身影。它的设计哲学强调灵活性、易用性和高效性,使得深度学习模型从构思到部署的全流程变得更加自然和高效,尤其适合需要快速迭代和实验的研究环境。

二、安装库

安装PyTorch有多种方式,根据硬件配置和需求选择:

python

# 基础CPU版本安装(最新稳定版)
pip install torch torchvision torchaudio

# 指定CUDA版本的安装(例如CUDA 11.8)
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

# 安装夜间构建版(最新功能)
pip install --pre torch torchvision torchaudio --index-url https://download.pytorch.org/whl/nightly/cu118

# 使用conda安装
conda install pytorch torchvision torchaudio pytorch-cuda=11.8 -c pytorch -c nvidia

# 验证安装
import torch
print(f"PyTorch版本: {torch.__version__}")
print(f"CUDA是否可用: {torch.cuda.is_available()}")
print(f"CUDA版本: {torch.version.cuda}")
print(f"GPU数量: {torch.cuda.device_count()}")

# 安装可选依赖
pip install torchviz  # 可视化计算图
pip install tensorboard  # 可视化工具
pip install torchtext  # 自然语言处理工具
pip install torchaudio  # 音频处理工具

对于需要特定硬件加速的用户:

bash

# 安装支持AMD ROCm的版本
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/rocm5.6

# 安装支持Apple MPS(Metal Performance Shaders)的版本
pip install torch torchvision torchaudio

三、基本用法

1. 张量操作和自动微分

python

import torch
import numpy as np
import matplotlib.pyplot as plt

# 设置随机种子保证可重复性
torch.manual_seed(42)
np.random.seed(42)

# 1.1 创建张量
print("1. 张量创建:")
# 从数据创建
scalar = torch.tensor(42)
print(f"标量: {scalar}, 形状: {scalar.shape}, 数据类型: {scalar.dtype}")

vector = torch.tensor([1.0, 2.0, 3.0, 4.0])
print(f"向量: {vector}, 形状: {vector.shape}")

matrix = torch.tensor([[1, 2], [3, 4]], dtype=torch.float32)
print(f"矩阵:\n{matrix}")

# 特殊张量
zeros = torch.zeros(2, 3)
ones = torch.ones(3, 2)
eye = torch.eye(3)
range_tensor = torch.arange(0, 10, 2)
rand_tensor = torch.rand(2, 2)  # 均匀分布
randn_tensor = torch.randn(2, 2)  # 标准正态分布

# 1.2 张量操作
print("\n2. 张量操作:")
a = torch.tensor([[1, 2], [3, 4]], dtype=torch.float32)
b = torch.tensor([[5, 6], [7, 8]], dtype=torch.float32)

print(f"加法:\n{a + b}")
print(f"逐元素乘法:\n{a * b}")
print(f"矩阵乘法:\n{torch.matmul(a, b)}")
print(f"转置:\n{a.T}")

# 广播机制
c = torch.tensor([10, 20], dtype=torch.float32)
print(f"广播加法:\n{a + c}")

# 聚合操作
print(f"求和: {torch.sum(a)}")
print(f"均值: {torch.mean(a)}")
print(f"最大值: {torch.max(a)}")
print(f"按行求和: {torch.sum(a, dim=1)}")
print(f"按列均值: {torch.mean(a, dim=0)}")

# 1.3 张量变形
print("\n3. 张量变形:")
tensor = torch.arange(12)
print(f"原始张量: {tensor}, 形状: {tensor.shape}")

# 改变形状
reshaped = tensor.reshape(3, 4)
print(f"重塑为3x4:\n{reshaped}")

# 扩展维度
expanded = tensor.unsqueeze(0)  # 在0维扩展
print(f"扩展维度后形状: {expanded.shape}")

# 压缩维度
squeezed = expanded.squeeze(0)  # 压缩维度0
print(f"压缩维度后形状: {squeezed.shape}")

# 1.4 索引和切片
print("\n4. 索引和切片:")
tensor = torch.tensor([[1, 2, 3], [4, 5, 6], [7, 8, 9]], dtype=torch.float32)
print(f"原始张量:\n{tensor}")

print(f"第一个元素: {tensor[0, 0]}")
print(f"第一行: {tensor[0, :]}")
print(f"第一列: {tensor[:, 0]}")
print(f"子张量:\n{tensor[1:, 1:]}")

# 布尔索引
mask = tensor > 5
print(f"布尔掩码:\n{mask}")
print(f"使用掩码选择:\n{tensor[mask]}")

# 1.5 张量连接和分割
print("\n5. 张量连接和分割:")
t1 = torch.tensor([[1, 2], [3, 4]])
t2 = torch.tensor([[5, 6], [7, 8]])

# 连接
concatenated = torch.cat([t1, t2], dim=0)  # 沿行方向连接
print(f"行连接:\n{concatenated}")

stacked = torch.stack([t1, t2], dim=0)  # 堆叠,创建新维度
print(f"堆叠后形状: {stacked.shape}")

# 分割
split_tensors = torch.split(tensor, split_size_or_sections=1, dim=1)  # 沿列分割
print(f"分割为3个张量:")
for i, t in enumerate(split_tensors):
    print(f"张量{i}:\n{t}")

# 1.6 自动微分
print("\n6. 自动微分:")

# 创建需要梯度的张量
x = torch.tensor(2.0, requires_grad=True)

# 定义函数
def f(x):
    return 3 * x**2 + 2 * x + 1

# 计算函数值
y = f(x)

# 反向传播计算梯度
y.backward()

print(f"f(x) = 3x² + 2x + 1")
print(f"在 x={x.item()} 处的函数值: {y.item()}")
print(f"在 x={x.item()} 处的梯度: {x.grad.item()}")

# 多变量函数的梯度
x1 = torch.tensor(3.0, requires_grad=True)
x2 = torch.tensor(4.0, requires_grad=True)

def g(x1, x2):
    return x1**2 + x2**2

y = g(x1, x2)
y.backward()

print(f"\ng(x1, x2) = x1² + x2²")
print(f"在 x1={x1.item()}, x2={x2.item()} 处的函数值: {y.item()}")
print(f"偏导数 ∂g/∂x1: {x1.grad.item()}")
print(f"偏导数 ∂g/∂x2: {x2.grad.item()}")

# 1.7 张量和NumPy互操作
print("\n7. 张量和NumPy互操作:")
# PyTorch张量转NumPy数组
tensor = torch.tensor([[1, 2], [3, 4]])
numpy_array = tensor.numpy()
print(f"PyTorch张量转NumPy:\n{numpy_array}, 类型: {type(numpy_array)}")

# NumPy数组转PyTorch张量
numpy_array = np.array([[5, 6], [7, 8]])
tensor_from_numpy = torch.from_numpy(numpy_array)
print(f"NumPy转PyTorch张量:\n{tensor_from_numpy}")

# 1.8 设备管理
print("\n8. 设备管理:")
# 检查可用设备
print("可用设备:")
if torch.cuda.is_available():
    print(f"  GPU: {torch.cuda.get_device_name(0)}")
    print(f"  GPU数量: {torch.cuda.device_count()}")
else:
    print("  未检测到GPU设备")

# 创建张量并指定设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"当前设备: {device}")

# 在CPU上创建张量
cpu_tensor = torch.tensor([1, 2, 3])
print(f"CPU上的张量: {cpu_tensor.device}")

# 移动到GPU(如果可用)
if torch.cuda.is_available():
    gpu_tensor = cpu_tensor.to('cuda')
    print(f"GPU上的张量: {gpu_tensor.device}")
else:
    print("GPU不可用,跳过GPU测试")

# 1.9 性能优化
print("\n9. 性能优化:")
# 启用cudnn自动优化器
torch.backends.cudnn.benchmark = True

# 使用in-place操作减少内存使用
a = torch.ones(1000, 1000)
b = torch.ones(1000, 1000)

# 普通操作(创建新张量)
result1 = a + b

# in-place操作(修改原张量)
a.add_(b)  # 注意:这会修改a的值
print(f"in-place操作后a的形状: {a.shape}")

# 1.10 内存管理
print("\n10. 内存管理:")
# 查看张量内存占用
large_tensor = torch.randn(1000, 1000)
print(f"张量大小: {large_tensor.element_size() * large_tensor.nelement() / 1024**2:.2f} MB")

# 手动释放内存
del large_tensor
torch.cuda.empty_cache() if torch.cuda.is_available() else None
print("内存已释放")

# 1.11 随机数生成器
print("\n11. 随机数生成器:")
# 设置随机种子
torch.manual_seed(42)

# 生成随机数
rand1 = torch.rand(3, 3)
rand2 = torch.rand(3, 3)

print(f"固定种子后的随机数1:\n{rand1}")
print(f"固定种子后的随机数2:\n{rand2}")

# 重置随机种子
torch.seed()
rand_new = torch.rand(3, 3)
print(f"重置种子后的随机数:\n{rand_new}")

2. 神经网络基础模块

python

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_moons
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)

# 1. 准备数据
print("1. 准备数据:")
X, y = make_moons(n_samples=1000, noise=0.2, random_state=42)

# 可视化数据
plt.figure(figsize=(8, 6))
plt.scatter(X[y==0, 0], X[y==0, 1], color='red', label='类别0', alpha=0.6)
plt.scatter(X[y==1, 0], X[y==1, 1], color='blue', label='类别1', alpha=0.6)
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('月亮数据集')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# 数据分割和标准化
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# 转换为PyTorch张量
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

print(f"训练集形状: {X_train_tensor.shape}, {y_train_tensor.shape}")
print(f"测试集形状: {X_test_tensor.shape}, {y_test_tensor.shape}")

# 2. 构建神经网络模型
print("\n2. 构建神经网络模型:")

# 方法1: 使用nn.Sequential
class MoonClassifierSequential(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(2, 16),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(16, 8),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(8, 2)
        )
    
    def forward(self, x):
        return self.model(x)

# 方法2: 使用模块列表
class MoonClassifierModuleList(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.Linear(2, 16),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(16, 8),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(8, 2)
        ])
    
    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

# 方法3: 详细定义每一层
class MoonClassifierDetailed(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(2, 16)
        self.fc2 = nn.Linear(16, 8)
        self.fc3 = nn.Linear(8, 2)
        self.dropout = nn.Dropout(0.2)
        self.relu = nn.ReLU()
    
    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

# 创建模型实例
model = MoonClassifierDetailed()
print("模型结构:")
print(model)

# 3. 模型参数分析
print("\n3. 模型参数分析:")
print(f"总参数数量: {sum(p.numel() for p in model.parameters()):,}")
print(f"可训练参数数量: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")

# 打印每层参数
print("\n各层参数:")
for name, param in model.named_parameters():
    print(f"{name}: {param.shape} - {param.numel():,} 个参数")

# 4. 损失函数和优化器
print("\n4. 损失函数和优化器:")

# 定义损失函数
criterion = nn.CrossEntropyLoss()
print(f"损失函数: {criterion}")

# 定义优化器
optimizer = optim.Adam(model.parameters(), lr=0.01, weight_decay=1e-4)
print(f"优化器: {optimizer}")

# 定义学习率调度器
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=10, verbose=True
)
print(f"学习率调度器: {scheduler}")

# 5. 训练模型
print("\n5. 训练模型:")

# 创建数据加载器
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# 训练参数
epochs = 100
train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []

# 训练循环
for epoch in range(epochs):
    # 训练阶段
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for batch_X, batch_y in train_loader:
        # 前向传播
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        
        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # 统计
        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()
    
    train_loss = running_loss / len(train_loader)
    train_accuracy = 100 * correct / total
    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)
    
    # 验证阶段
    model.eval()
    with torch.no_grad():
        val_outputs = model(X_test_tensor)
        val_loss = criterion(val_outputs, y_test_tensor).item()
        _, val_predicted = torch.max(val_outputs.data, 1)
        val_correct = (val_predicted == y_test_tensor).sum().item()
        val_accuracy = 100 * val_correct / len(y_test_tensor)
        
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)
    
    # 更新学习率
    scheduler.step(val_loss)
    
    # 打印进度
    if (epoch + 1) % 20 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], '
              f'Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, '
              f'Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}%')

print("训练完成!")

# 6. 模型评估
print("\n6. 模型评估:")

# 最终评估
model.eval()
with torch.no_grad():
    test_outputs = model(X_test_tensor)
    test_loss = criterion(test_outputs, y_test_tensor).item()
    _, test_predicted = torch.max(test_outputs.data, 1)
    test_correct = (test_predicted == y_test_tensor).sum().item()
    test_accuracy = 100 * test_correct / len(y_test_tensor)
    
    print(f"测试集损失: {test_loss:.4f}")
    print(f"测试集准确率: {test_accuracy:.2f}%")
    
    # 计算精确率、召回率、F1分数
    from sklearn.metrics import precision_score, recall_score, f1_score
    
    y_pred_np = test_predicted.numpy()
    y_true_np = y_test_tensor.numpy()
    
    precision = precision_score(y_true_np, y_pred_np, average='weighted')
    recall = recall_score(y_true_np, y_pred_np, average='weighted')
    f1 = f1_score(y_true_np, y_pred_np, average='weighted')
    
    print(f"精确率: {precision:.4f}")
    print(f"召回率: {recall:.4f}")
    print(f"F1分数: {f1:.4f}")

# 7. 可视化结果
print("\n7. 可视化结果:")

fig, axes = plt.subplots(2, 3, figsize=(15, 10))

# 训练历史
axes[0, 0].plot(train_losses, label='训练损失')
axes[0, 0].plot(val_losses, label='验证损失')
axes[0, 0].set_xlabel('Epoch')
axes[0, 0].set_ylabel('损失')
axes[0, 0].set_title('训练和验证损失')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

axes[0, 1].plot(train_accuracies, label='训练准确率')
axes[0, 1].plot(val_accuracies, label='验证准确率')
axes[0, 1].set_xlabel('Epoch')
axes[0, 1].set_ylabel('准确率 (%)')
axes[0, 1].set_title('训练和验证准确率')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# 决策边界
def plot_decision_boundary(model, X, y, ax, title):
    # 创建网格
    x_min, x_max = X[:, 0].min() - 0.5, X[:, 0].max() + 0.5
    y_min, y_max = X[:, 1].min() - 0.5, X[:, 1].max() + 0.5
    xx, yy = np.meshgrid(np.linspace(x_min, x_max, 200),
                         np.linspace(y_min, y_max, 200))
    
    # 预测
    X_grid = np.c_[xx.ravel(), yy.ravel()]
    X_grid_tensor = torch.tensor(X_grid, dtype=torch.float32)
    
    model.eval()
    with torch.no_grad():
        Z = model(X_grid_tensor)
        Z = torch.softmax(Z, dim=1)
        Z = Z[:, 1].reshape(xx.shape).numpy()
    
    # 绘制
    ax.contourf(xx, yy, Z, levels=50, cmap='RdBu', alpha=0.8)
    ax.scatter(X[:, 0], X[:, 1], c=y, cmap='bwr', edgecolors='k', s=50)
    ax.set_xlim(x_min, x_max)
    ax.set_ylim(y_min, y_max)
    ax.set_title(title)
    ax.set_xlabel('特征1')
    ax.set_ylabel('特征2')

plot_decision_boundary(model, X_test_scaled, y_test, axes[0, 2], '决策边界')

# 混淆矩阵
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
cm = confusion_matrix(y_test, test_predicted.numpy())
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['类别0', '类别1'])
disp.plot(ax=axes[1, 0], cmap='Blues')
axes[1, 0].set_title('混淆矩阵')

# ROC曲线
from sklearn.metrics import roc_curve, auc
model.eval()
with torch.no_grad():
    test_probs = torch.softmax(model(X_test_tensor), dim=1)
    test_probs_np = test_probs[:, 1].numpy()

fpr, tpr, _ = roc_curve(y_test, test_probs_np)
roc_auc = auc(fpr, tpr)

axes[1, 1].plot(fpr, tpr, color='darkorange', lw=2, 
               label=f'ROC曲线 (AUC = {roc_auc:.3f})')
axes[1, 1].plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--', label='随机分类器')
axes[1, 1].set_xlim([0.0, 1.0])
axes[1, 1].set_ylim([0.0, 1.05])
axes[1, 1].set_xlabel('假正率')
axes[1, 1].set_ylabel('真正率')
axes[1, 1].set_title('ROC曲线')
axes[1, 1].legend(loc="lower right")
axes[1, 1].grid(True, alpha=0.3)

# 特征重要性
def compute_feature_importance(model, X, y, criterion):
    """通过梯度计算特征重要性"""
    X_tensor = torch.tensor(X, dtype=torch.float32, requires_grad=True)
    y_tensor = torch.tensor(y, dtype=torch.long)
    
    model.eval()
    outputs = model(X_tensor)
    loss = criterion(outputs, y_tensor)
    
    # 计算梯度
    loss.backward()
    
    # 获取梯度并计算重要性
    gradients = X_tensor.grad.abs().mean(dim=0).numpy()
    return gradients

importance = compute_feature_importance(model, X_test_scaled, y_test, criterion)
features = ['特征1', '特征2']
axes[1, 2].bar(features, importance, color=['skyblue', 'lightcoral'])
axes[1, 2].set_ylabel('平均梯度绝对值')
axes[1, 2].set_title('特征重要性')
axes[1, 2].grid(True, alpha=0.3, axis='y')

plt.suptitle('神经网络分类器完整分析', fontsize=16, fontweight='bold')
plt.tight_layout()
plt.show()

# 8. 模型保存和加载
print("\n8. 模型保存和加载:")

# 保存整个模型
torch.save(model, 'moon_classifier.pth')
print("模型已保存为 moon_classifier.pth")

# 保存模型参数
torch.save(model.state_dict(), 'moon_classifier_params.pth')
print("模型参数已保存为 moon_classifier_params.pth")

# 加载整个模型
loaded_model = torch.load('moon_classifier.pth')
print("模型加载成功")

# 验证加载的模型
loaded_model.eval()
with torch.no_grad():
    loaded_outputs = loaded_model(X_test_tensor)
    _, loaded_predicted = torch.max(loaded_outputs.data, 1)
    loaded_accuracy = (loaded_predicted == y_test_tensor).sum().item() / len(y_test_tensor) * 100
    
print(f"加载模型的测试准确率: {loaded_accuracy:.2f}%")

# 9. 模型转换
print("\n9. 模型转换:")

# 转换为TorchScript(用于生产部署)
scripted_model = torch.jit.script(model)
scripted_model.save('moon_classifier_scripted.pt')
print("模型已转换为TorchScript格式")

# 测试TorchScript模型
scripted_model.eval()
with torch.no_grad():
    scripted_outputs = scripted_model(X_test_tensor)
    scripted_accuracy = (torch.max(scripted_outputs.data, 1)[1] == y_test_tensor).sum().item() / len(y_test_tensor) * 100
    
print(f"TorchScript模型的测试准确率: {scripted_accuracy:.2f}%")

# 10. 性能优化
print("\n10. 性能优化:")

# 使用@torch.jit.script装饰器优化关键函数
@torch.jit.script
def optimized_forward(x: torch.Tensor, weights: torch.Tensor, biases: torch.Tensor) -> torch.Tensor:
    """优化的前向传播函数"""
    x = torch.relu(torch.matmul(x, weights[0]) + biases[0])
    x = torch.relu(torch.matmul(x, weights[1]) + biases[1])
    x = torch.matmul(x, weights[2]) + biases[2]
    return x

# 提取模型参数
weights = [p for p in model.parameters() if len(p.shape) == 2]
biases = [p for p in model.parameters() if len(p.shape) == 1]

# 测试优化后的函数
test_input = torch.randn(1, 2)
optimized_output = optimized_forward(test_input, weights, biases)
print(f"优化函数输出形状: {optimized_output.shape}")

# 11. 内存和性能分析
print("\n11. 内存和性能分析:")

import time

# 内存使用分析
model_size = sum(p.numel() * p.element_size() for p in model.parameters()) / 1024**2
print(f"模型内存占用: {model_size:.2f} MB")

# 推理速度测试
test_batch = torch.randn(100, 2)

# 预热
for _ in range(10):
    _ = model(test_batch)

# 测量推理时间
start_time = time.time()
for _ in range(100):
    _ = model(test_batch)
inference_time = time.time() - start_time

print(f"推理100批次的时间: {inference_time:.4f}秒")
print(f"平均每批次推理时间: {inference_time/100*1000:.2f}毫秒")

# 12. 模型量化(减小模型大小)
print("\n12. 模型量化:")

# 动态量化
quantized_model = torch.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8
)

# 保存量化模型
torch.save(quantized_model.state_dict(), 'moon_classifier_quantized.pth')

# 计算量化后的模型大小
quantized_size = sum(p.numel() * p.element_size() for p in quantized_model.parameters()) / 1024**2
print(f"量化后模型内存占用: {quantized_size:.2f} MB")
print(f"压缩比: {model_size/quantized_size:.2f}x")

# 测试量化模型
quantized_model.eval()
with torch.no_grad():
    quantized_outputs = quantized_model(X_test_tensor)
    quantized_accuracy = (torch.max(quantized_outputs.data, 1)[1] == y_test_tensor).sum().item() / len(y_test_tensor) * 100
    
print(f"量化模型的测试准确率: {quantized_accuracy:.2f}%")

3. 卷积神经网络(CNN)

python

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report

# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)

# 1. 数据准备和增强
print("1. 准备CIFAR-10数据集:")

# 定义数据变换
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

# 加载数据集
train_dataset = datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform_train
)
test_dataset = datasets.CIFAR10(
    root='./data', train=False, download=True, transform=transform_test
)

# 类别名称
class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer',
               'dog', 'frog', 'horse', 'ship', 'truck']

print(f"训练集大小: {len(train_dataset)}")
print(f"测试集大小: {len(test_dataset)}")
print(f"类别数: {len(class_names)}")

# 创建数据加载器
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2)

# 2. 构建CNN模型
print("\n2. 构建卷积神经网络:")

class CIFAR10CNN(nn.Module):
    def __init__(self):
        super().__init__()
        
        # 卷积块1
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(64)
        self.conv2 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.dropout1 = nn.Dropout2d(0.25)
        
        # 卷积块2
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(128)
        self.pool2 = nn.MaxPool2d(2, 2)
        self.dropout2 = nn.Dropout2d(0.25)
        
        # 卷积块3
        self.conv5 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.bn5 = nn.BatchNorm2d(256)
        self.conv6 = nn.Conv2d(256, 256, kernel_size=3, padding=1)
        self.bn6 = nn.BatchNorm2d(256)
        self.pool3 = nn.MaxPool2d(2, 2)
        self.dropout3 = nn.Dropout2d(0.25)
        
        # 全连接层
        self.fc1 = nn.Linear(256 * 4 * 4, 512)
        self.bn7 = nn.BatchNorm1d(512)
        self.dropout4 = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, 256)
        self.bn8 = nn.BatchNorm1d(256)
        self.dropout5 = nn.Dropout(0.5)
        self.fc3 = nn.Linear(256, 10)
        
    def forward(self, x):
        # 卷积块1
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool1(x)
        x = self.dropout1(x)
        
        # 卷积块2
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))
        x = self.pool2(x)
        x = self.dropout2(x)
        
        # 卷积块3
        x = F.relu(self.bn5(self.conv5(x)))
        x = F.relu(self.bn6(self.conv6(x)))
        x = self.pool3(x)
        x = self.dropout3(x)
        
        # 展平
        x = x.view(-1, 256 * 4 * 4)
        
        # 全连接层
        x = F.relu(self.bn7(self.fc1(x)))
        x = self.dropout4(x)
        x = F.relu(self.bn8(self.fc2(x)))
        x = self.dropout5(x)
        x = self.fc3(x)
        
        return x

# 创建模型
model = CIFAR10CNN()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

print("模型结构:")
print(model)
print(f"模型已移动到: {device}")

# 3. 模型参数统计
print("\n3. 模型参数统计:")
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"总参数: {total_params:,}")
print(f"可训练参数: {trainable_params:,}")

# 4. 定义损失函数和优化器
print("\n4. 定义损失函数和优化器:")
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)  # 标签平滑减少过拟合
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)

# 学习率调度器
scheduler = optim.lr_scheduler.CosineAnnealingLR(
    optimizer, T_max=200, eta_min=1e-6
)

# 5. 训练模型
print("\n5. 训练模型:")

def train_epoch(model, loader, criterion, optimizer, device):
    """训练一个epoch"""
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for batch_idx, (inputs, targets) in enumerate(loader):
        inputs, targets = inputs.to(device), targets.to(device)
        
        # 前向传播
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        
        # 反向传播
        optimizer.zero_grad()
        loss.backward()
        
        # 梯度裁剪防止梯度爆炸
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        optimizer.step()
        
        # 统计
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
        
        # 打印进度
        if batch_idx % 100 == 0:
            print(f'批次 [{batch_idx}/{len(loader)}], 损失: {loss.item():.4f}')
    
    epoch_loss = running_loss / len(loader)
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_acc

def test_epoch(model, loader, criterion, device):
    """测试一个epoch"""
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            
            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()
    
    epoch_loss = running_loss / len(loader)
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_acc

# 训练循环
epochs = 50
train_losses, train_accuracies = [], []
test_losses, test_accuracies = [], []

for epoch in range(epochs):
    print(f'\nEpoch {epoch+1}/{epochs}')
    
    # 训练
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    train_losses.append(train_loss)
    train_accuracies.append(train_acc)
    
    # 测试
    test_loss, test_acc = test_epoch(model, test_loader, criterion, device)
    test_losses.append(test_loss)
    test_accuracies.append(test_acc)
    
    # 更新学习率
    scheduler.step()
    
    # 打印结果
    print(f'训练损失: {train_loss:.4f}, 训练准确率: {train_acc:.2f}%')
    print(f'测试损失: {test_loss:.4f}, 测试准确率: {test_acc:.2f}%')
    print(f'当前学习率: {scheduler.get_last_lr()[0]:.6f}')

print("训练完成!")

# 6. 可视化训练结果
print("\n6. 可视化训练结果:")

fig, axes = plt.subplots(2, 3, figsize=(15, 10))

# 训练历史
axes[0, 0].plot(train_losses, label='训练损失')
axes[0, 0].plot(test_losses, label='测试损失')
axes[0, 0].set_xlabel('Epoch')
axes[0, 0].set_ylabel('损失')
axes[0, 0].set_title('训练和测试损失')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

axes[0, 1].plot(train_accuracies, label='训练准确率')
axes[0, 1].plot(test_accuracies, label='测试准确率')
axes[0, 1].set_xlabel('Epoch')
axes[0, 1].set_ylabel('准确率 (%)')
axes[0, 1].set_title('训练和测试准确率')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# 学习率变化
learning_rates = [scheduler.get_last_lr()[0] for _ in range(epochs)]
axes[0, 2].plot(learning_rates)
axes[0, 2].set_xlabel('Epoch')
axes[0, 2].set_ylabel('学习率')
axes[0, 2].set_title('学习率变化')
axes[0, 2].grid(True, alpha=0.3)

# 混淆矩阵
print("\n生成混淆矩阵...")
model.eval()
all_preds = []
all_targets = []

with torch.no_grad():
    for inputs, targets in test_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)
        _, preds = outputs.max(1)
        
        all_preds.extend(preds.cpu().numpy())
        all_targets.extend(targets.numpy())

cm = confusion_matrix(all_targets, all_preds)

# 绘制归一化的混淆矩阵
cm_normalized = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

im = axes[1, 0].imshow(cm_normalized, interpolation='nearest', cmap=plt.cm.Blues)
axes[1, 0].set_title('归一化混淆矩阵')
plt.colorbar(im, ax=axes[1, 0])

# 添加文本标注
thresh = cm_normalized.max() / 2.
for i in range(cm_normalized.shape[0]):
    for j in range(cm_normalized.shape[1]):
        axes[1, 0].text(j, i, format(cm_normalized[i, j], '.2f'),
                       ha="center", va="center",
                       color="white" if cm_normalized[i, j] > thresh else "black")

axes[1, 0].set_xticks(range(len(class_names)))
axes[1, 0].set_yticks(range(len(class_names)))
axes[1, 0].set_xticklabels(class_names, rotation=45, ha='right')
axes[1, 0].set_yticklabels(class_names)

# 各类别准确率
class_accuracies = []
for i in range(len(class_names)):
    class_mask = np.array(all_targets) == i
    class_correct = np.sum(np.array(all_preds)[class_mask] == i)
    class_total = np.sum(class_mask)
    class_accuracies.append(class_correct / class_total if class_total > 0 else 0)

axes[1, 1].bar(range(len(class_names)), class_accuracies, color='skyblue')
axes[1, 1].set_xlabel('类别')
axes[1, 1].set_ylabel('准确率')
axes[1, 1].set_title('各类别准确率')
axes[1, 1].set_xticks(range(len(class_names)))
axes[1, 1].set_xticklabels(class_names, rotation=45, ha='right')
axes[1, 1].set_ylim([0, 1])
axes[1, 1].grid(True, alpha=0.3, axis='y')

# 显示一些预测结果
print("\n显示预测结果示例...")
model.eval()
with torch.no_grad():
    # 获取一批测试数据
    test_images, test_labels = next(iter(test_loader))
    test_images = test_images[:5].to(device)
    test_labels = test_labels[:5]
    
    # 预测
    outputs = model(test_images)
    _, preds = outputs.max(1)
    
    # 反归一化图像以便显示
    mean = torch.tensor([0.4914, 0.4822, 0.4465]).view(1, 3, 1, 1)
    std = torch.tensor([0.2023, 0.1994, 0.2010]).view(1, 3, 1, 1)
    test_images_denorm = test_images.cpu() * std + mean
    test_images_denorm = torch.clamp(test_images_denorm, 0, 1)
    
    # 显示图像和预测
    for i in range(5):
        ax = axes[1, 2] if i == 0 else None
        if i == 0:
            ax = axes[1, 2]
            img = test_images_denorm[i].permute(1, 2, 0).numpy()
            ax.imshow(img)
            
            true_label = class_names[test_labels[i]]
            pred_label = class_names[preds[i].item()]
            color = 'green' if true_label == pred_label else 'red'
            
            ax.set_title(f'真实: {true_label}\n预测: {pred_label}', color=color)
            ax.axis('off')

plt.suptitle('CNN在CIFAR-10上的完整分析', fontsize=16, fontweight='bold')
plt.tight_layout()
plt.show()

# 7. 模型解释性分析
print("\n7. 模型解释性分析:")

# Grad-CAM可视化
class GradCAM:
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradients = None
        self.activations = None
        
        # 注册钩子
        target_layer.register_forward_hook(self.save_activation)
        target_layer.register_full_backward_hook(self.save_gradient)
    
    def save_activation(self, module, input, output):
        self.activations = output
    
    def save_gradient(self, module, grad_input, grad_output):
        self.gradients = grad_output[0]
    
    def generate(self, input_tensor, target_class=None):
        """生成Grad-CAM热图"""
        self.model.eval()
        
        # 前向传播
        output = self.model(input_tensor)
        
        if target_class is None:
            target_class = output.argmax(dim=1).item()
        
        # 反向传播
        self.model.zero_grad()
        one_hot = torch.zeros_like(output)
        one_hot[0, target_class] = 1
        output.backward(gradient=one_hot)
        
        # 计算权重
        gradients = self.gradients
        activations = self.activations
        
        # 全局平均池化梯度
        pooled_gradients = torch.mean(gradients, dim=[0, 2, 3])
        
        # 加权特征图
        for i in range(activations.size(1)):
            activations[:, i, :, :] *= pooled_gradients[i]
        
        # 生成热图
        heatmap = torch.mean(activations, dim=1).squeeze()
        heatmap = F.relu(heatmap)  # ReLU激活
        heatmap = heatmap.detach().cpu().numpy()
        
        # 归一化
        heatmap = (heatmap - np.min(heatmap)) / (np.max(heatmap) - np.min(heatmap) + 1e-8)
        
        return heatmap

# 选择目标层(最后一个卷积层)
target_layer = model.conv6
grad_cam = GradCAM(model, target_layer)

# 选择一个测试样本
sample_idx = 10
sample_image, sample_label = test_dataset[sample_idx]
sample_image = sample_image.unsqueeze(0).to(device)

# 生成Grad-CAM热图
heatmap = grad_cam.generate(sample_image)

# 可视化热图
fig, axes = plt.subplots(1, 3, figsize=(12, 4))

# 原始图像
sample_image_cpu = sample_image.squeeze().cpu()
mean = torch.tensor([0.4914, 0.4822, 0.4465]).view(3, 1, 1)
std = torch.tensor([0.2023, 0.1994, 0.2010]).view(3, 1, 1)
sample_image_denorm = sample_image_cpu * std + mean
sample_image_denorm = torch.clamp(sample_image_denorm, 0, 1)

axes[0].imshow(sample_image_denorm.permute(1, 2, 0).numpy())
axes[0].set_title(f'原始图像\n真实类别: {class_names[sample_label]}')
axes[0].axis('off')

# 热图
import cv2
heatmap_resized = cv2.resize(heatmap, (32, 32))
axes[1].imshow(heatmap_resized, cmap='jet')
axes[1].set_title('Grad-CAM热图')
axes[1].axis('off')

# 叠加图
heatmap_colored = cv2.applyColorMap(np.uint8(255 * heatmap_resized), cv2.COLORMAP_JET)
heatmap_colored = cv2.cvtColor(heatmap_colored, cv2.COLOR_BGR2RGB)

img_np = sample_image_denorm.permute(1, 2, 0).numpy()
superimposed_img = heatmap_colored * 0.4 + img_np * 255
superimposed_img = np.clip(superimposed_img, 0, 255).astype('uint8')

axes[2].imshow(superimposed_img)
axes[2].set_title('热图叠加')
axes[2].axis('off')

plt.suptitle('模型解释性: Grad-CAM可视化', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

# 8. 模型保存和部署
print("\n8. 模型保存和部署:")

# 保存完整模型
torch.save(model.state_dict(), 'cifar10_cnn_model.pth')
print("模型参数已保存为 cifar10_cnn_model.pth")

# 保存为TorchScript格式
scripted_model = torch.jit.script(model)
scripted_model.save('cifar10_cnn_scripted.pt')
print("模型已转换为TorchScript格式")

# 模型量化(减小大小)
print("\n模型量化:")
try:
    # 动态量化
    quantized_model = torch.quantization.quantize_dynamic(
        model, {nn.Conv2d, nn.Linear}, dtype=torch.qint8
    )
    
    # 保存量化模型
    torch.save(quantized_model.state_dict(), 'cifar10_cnn_quantized.pth')
    
    # 计算模型大小
    def get_model_size(model_path):
        import os
        return os.path.getsize(model_path) / (1024 * 1024)
    
    original_size = get_model_size('cifar10_cnn_model.pth')
    quantized_size = get_model_size('cifar10_cnn_quantized.pth')
    
    print(f"原始模型大小: {original_size:.2f} MB")
    print(f"量化模型大小: {quantized_size:.2f} MB")
    print(f"压缩比: {original_size/quantized_size:.2f}x")
    
except Exception as e:
    print(f"量化失败: {e}")

# 9. 性能基准测试
print("\n9. 性能基准测试:")

import time

# 测试推理速度
test_batch = torch.randn(64, 3, 32, 32).to(device)

# 预热
for _ in range(10):
    _ = model(test_batch)

# 测量推理时间
n_runs = 100
start_time = time.time()

for _ in range(n_runs):
    _ = model(test_batch)

# 等待所有CUDA操作完成
if torch.cuda.is_available():
    torch.cuda.synchronize()

inference_time = time.time() - start_time

print(f"推理 {n_runs} 批次的时间: {inference_time:.4f}秒")
print(f"平均每批次推理时间: {inference_time/n_runs*1000:.2f}毫秒")
print(f"每秒推理图像数: {64 * n_runs / inference_time:.0f}")

# 10. 模型集成
print("\n10. 模型集成演示:")

class ModelEnsemble(nn.Module):
    """模型集成类"""
    def __init__(self, models):
        super().__init__()
        self.models = nn.ModuleList(models)
    
    def forward(self, x):
        # 收集所有模型的输出
        outputs = [model(x) for model in self.models]
        
        # 平均所有输出
        avg_output = torch.stack(outputs).mean(dim=0)
        
        return avg_output

# 创建多个模型(这里用同一个模型的不同初始化)
def create_models(num_models=3):
    models = []
    for i in range(num_models):
        model_i = CIFAR10CNN().to(device)
        # 使用不同的随机种子初始化
        torch.manual_seed(42 + i)
        model_i.apply(weights_init)
        models.append(model_i)
    
    # 恢复原始随机种子
    torch.manual_seed(42)
    return models

def weights_init(m):
    """权重初始化函数"""
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
        nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)
    elif isinstance(m, nn.BatchNorm2d):
        nn.init.constant_(m.weight, 1)
        nn.init.constant_(m.bias, 0)

# 创建集成模型
ensemble_models = create_models(3)
ensemble = ModelEnsemble(ensemble_models).to(device)

print(f"集成模型包含 {len(ensemble_models)} 个子模型")

# 测试集成模型性能
ensemble.eval()
with torch.no_grad():
    correct = 0
    total = 0
    
    for inputs, targets in test_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)
        
        outputs = ensemble(inputs)
        _, predicted = outputs.max(1)
        
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
    
    ensemble_accuracy = 100. * correct / total

print(f"集成模型测试准确率: {ensemble_accuracy:.2f}%")
print(f"相比单个模型提升: {ensemble_accuracy - test_accuracies[-1]:.2f}%")

# 11. 知识蒸馏演示
print("\n11. 知识蒸馏演示:")

class KnowledgeDistillation(nn.Module):
    """知识蒸馏"""
    def __init__(self, teacher_model, student_model, temperature=3.0, alpha=0.5):
        super().__init__()
        self.teacher = teacher_model
        self.student = student_model
        self.temperature = temperature
        self.alpha = alpha
    
    def forward(self, x):
        return self.student(x)
    
    def compute_loss(self, student_outputs, teacher_outputs, targets, criterion):
        """计算知识蒸馏损失"""
        # 学生损失
        student_loss = criterion(student_outputs, targets)
        
        # 蒸馏损失
        distillation_loss = nn.KLDivLoss(reduction='batchmean')(
            F.log_softmax(student_outputs / self.temperature, dim=1),
            F.softmax(teacher_outputs / self.temperature, dim=1)
        ) * (self.temperature ** 2)
        
        # 总损失
        total_loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss
        
        return total_loss, student_loss, distillation_loss

# 创建学生模型(更小的模型)
class SmallCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(64 * 16 * 16, 256)
        self.fc2 = nn.Linear(256, 10)
        self.dropout = nn.Dropout(0.5)
    
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 64 * 16 * 16)
        x = self.dropout(F.relu(self.fc1(x)))
        x = self.fc2(x)
        return x

teacher_model = model  # 使用训练好的模型作为教师
student_model = SmallCNN().to(device)

print(f"教师模型参数: {sum(p.numel() for p in teacher_model.parameters()):,}")
print(f"学生模型参数: {sum(p.numel() for p in student_model.parameters()):,}")
print(f"压缩率: {sum(p.numel() for p in teacher_model.parameters()) / sum(p.numel() for p in student_model.parameters()):.1f}x")

# 创建知识蒸馏实例
kd = KnowledgeDistillation(teacher_model, student_model, temperature=3.0, alpha=0.5).to(device)

# 训练学生模型
print("\n开始知识蒸馏训练...")
optimizer_kd = optim.Adam(student_model.parameters(), lr=0.001)
criterion_kd = nn.CrossEntropyLoss()

# 训练几个epoch
for epoch in range(10):
    student_model.train()
    teacher_model.eval()
    
    running_loss = 0.0
    correct = 0
    total = 0
    
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        inputs, targets = inputs.to(device), targets.to(device)
        
        # 教师预测
        with torch.no_grad():
            teacher_outputs = teacher_model(inputs)
        
        # 学生预测
        student_outputs = student_model(inputs)
        
        # 计算损失
        loss, student_loss, distillation_loss = kd.compute_loss(
            student_outputs, teacher_outputs, targets, criterion_kd
        )
        
        # 反向传播
        optimizer_kd.zero_grad()
        loss.backward()
        optimizer_kd.step()
        
        # 统计
        running_loss += loss.item()
        _, predicted = student_outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
    
    epoch_loss = running_loss / len(train_loader)
    epoch_acc = 100. * correct / total
    
    print(f'Epoch [{epoch+1}/10], 损失: {epoch_loss:.4f}, 准确率: {epoch_acc:.2f}%')

print("知识蒸馏训练完成!")

# 评估学生模型
student_model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        
        outputs = student_model(inputs)
        _, predicted = outputs.max(1)
        
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
    
    student_accuracy = 100. * correct / total

print(f"学生模型测试准确率: {student_accuracy:.2f}%")
print(f"教师模型测试准确率: {test_accuracies[-1]:.2f}%")
print(f"准确率差距: {test_accuracies[-1] - student_accuracy:.2f}%")

4. 循环神经网络(RNN)和LSTM

python

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
import yfinance as yf
import warnings
warnings.filterwarnings('ignore')

# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)

# 1. 获取和准备时间序列数据
print("1. 获取股票价格数据:")

def download_stock_data(ticker='AAPL', period='2y'):
    """下载股票数据"""
    try:
        stock = yf.Ticker(ticker)
        df = stock.history(period=period)
        return df
    except:
        print(f"无法下载{ticker}数据,使用模拟数据")
        return None

# 尝试下载数据
df = download_stock_data('AAPL', '2y')

if df is not None:
    print(f"下载到 {len(df)} 天数据")
    # 使用收盘价
    data = df['Close'].values.reshape(-1, 1)
    dates = df.index
else:
    # 生成模拟股票数据
    print("使用模拟数据...")
    np.random.seed(42)
    n_points = 500
    time = np.arange(n_points)
    # 模拟股票价格:趋势 + 季节性 + 噪声
    trend = 0.001 * time
    seasonal = 10 * np.sin(2 * np.pi * time / 50)
    noise = np.random.normal(0, 5, n_points)
    data = 100 + trend + seasonal + noise
    data = data.reshape(-1, 1)
    dates = pd.date_range(start='2022-01-01', periods=n_points, freq='D')

print(f"数据形状: {data.shape}")
print(f"价格范围: [{data.min():.2f}, {data.max():.2f}]")

# 可视化原始数据
plt.figure(figsize=(12, 5))
plt.plot(dates, data, linewidth=1.5, color='blue', alpha=0.7)
plt.xlabel('日期')
plt.ylabel('价格')
plt.title('股票价格时间序列')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# 2. 数据预处理
print("\n2. 数据预处理:")

# 归一化
scaler = MinMaxScaler(feature_range=(0, 1))
data_scaled = scaler.fit_transform(data)

# 创建时间序列数据集
def create_sequences(data, seq_length):
    """创建时间序列序列"""
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i+seq_length])
        y.append(data[i+seq_length])
    return np.array(X), np.array(y)

# 设置序列长度
SEQ_LENGTH = 20
X, y = create_sequences(data_scaled, SEQ_LENGTH)

print(f"序列数据形状: X={X.shape}, y={y.shape}")

# 分割数据集
split_ratio = 0.8
split_idx = int(len(X) * split_ratio)

X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]

print(f"训练集: {X_train.shape}, 测试集: {X_test.shape}")

# 转换为PyTorch张量
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

# 3. 构建LSTM模型
print("\n3. 构建LSTM模型:")

class LSTMForecaster(nn.Module):
    def __init__(self, input_size=1, hidden_size=64, num_layers=2, output_size=1, dropout=0.2):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # LSTM层
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True  # 双向LSTM
        )
        
        # 注意力机制
        self.attention = nn.Sequential(
            nn.Linear(hidden_size * 2, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, 1)
        )
        
        # 全连接层
        self.fc = nn.Sequential(
            nn.Linear(hidden_size * 2, hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size, output_size)
        )
        
        # 初始化权重
        self.init_weights()
    
    def init_weights(self):
        """初始化权重"""
        for name, param in self.lstm.named_parameters():
            if 'weight_ih' in name:
                nn.init.xavier_uniform_(param.data)
            elif 'weight_hh' in name:
                nn.init.orthogonal_(param.data)
            elif 'bias' in name:
                nn.init.constant_(param.data, 0)
    
    def forward(self, x, return_attention=False):
        # LSTM前向传播
        lstm_out, (hidden, cell) = self.lstm(x)
        
        # 注意力机制
        attention_weights = self.attention(lstm_out)
        attention_weights = torch.softmax(attention_weights, dim=1)
        
        # 加权求和
        context = torch.sum(attention_weights * lstm_out, dim=1)
        
        # 全连接层
        output = self.fc(context)
        
        if return_attention:
            return output, attention_weights
        else:
            return output

# 创建模型
model = LSTMForecaster(
    input_size=1,
    hidden_size=64,
    num_layers=2,
    output_size=1,
    dropout=0.2
)

print("模型结构:")
print(model)
print(f"总参数: {sum(p.numel() for p in model.parameters()):,}")

# 4. 定义损失函数和优化器
print("\n4. 定义损失函数和优化器:")

# 自定义损失函数(结合MSE和MAE)
class HybridLoss(nn.Module):
    def __init__(self, alpha=0.7):
        super().__init__()
        self.alpha = alpha
        self.mse = nn.MSELoss()
        self.mae = nn.L1Loss()
    
    def forward(self, pred, target):
        mse_loss = self.mse(pred, target)
        mae_loss = self.mae(pred, target)
        return self.alpha * mse_loss + (1 - self.alpha) * mae_loss

criterion = HybridLoss(alpha=0.7)
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)

# 学习率调度器
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=10, verbose=True
)

# 5. 训练模型
print("\n5. 训练模型:")

# 创建数据加载器
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# 训练参数
epochs = 100
train_losses = []
val_losses = []

# 训练循环
for epoch in range(epochs):
    # 训练阶段
    model.train()
    running_loss = 0.0
    
    for batch_X, batch_y in train_loader:
        # 前向传播
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        
        # 反向传播
        optimizer.zero_grad()
        loss.backward()
        
        # 梯度裁剪
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        optimizer.step()
        
        running_loss += loss.item()
    
    train_loss = running_loss / len(train_loader)
    train_losses.append(train_loss)
    
    # 验证阶段
    model.eval()
    with torch.no_grad():
        val_outputs = model(X_test_tensor)
        val_loss = criterion(val_outputs, y_test_tensor).item()
        val_losses.append(val_loss)
    
    # 更新学习率
    scheduler.step(val_loss)
    
    # 打印进度
    if (epoch + 1) % 20 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], '
              f'Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}')

print("训练完成!")

# 6. 模型评估
print("\n6. 模型评估:")

# 预测
model.eval()
with torch.no_grad():
    y_train_pred = model(X_train_tensor)
    y_test_pred = model(X_test_tensor)
    
    # 反归一化
    y_train_pred_actual = scaler.inverse_transform(y_train_pred.numpy())
    y_test_pred_actual = scaler.inverse_transform(y_test_pred.numpy())
    
    y_train_actual = scaler.inverse_transform(y_train.numpy())
    y_test_actual = scaler.inverse_transform(y_test.numpy())

# 计算误差指标
def calculate_metrics(y_true, y_pred):
    """计算各种误差指标"""
    mse = mean_squared_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_true, y_pred)
    mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100
    
    return {
        'MSE': mse,
        'RMSE': rmse,
        'MAE': mae,
        'MAPE': mape
    }

train_metrics = calculate_metrics(y_train_actual, y_train_pred_actual)
test_metrics = calculate_metrics(y_test_actual, y_test_pred_actual)

print("训练集误差指标:")
for metric, value in train_metrics.items():
    print(f"  {metric}: {value:.4f}")

print("\n测试集误差指标:")
for metric, value in test_metrics.items():
    print(f"  {metric}: {value:.4f}")

# 7. 可视化结果
print("\n7. 可视化结果:")

fig, axes = plt.subplots(2, 3, figsize=(15, 10))

# 训练历史
axes[0, 0].plot(train_losses, label='训练损失')
axes[0, 0].plot(val_losses, label='验证损失')
axes[0, 0].set_xlabel('Epoch')
axes[0, 0].set_ylabel('损失')
axes[0, 0].set_title('训练历史')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# 整体预测对比
train_dates = dates[SEQ_LENGTH:split_idx+SEQ_LENGTH]
test_dates = dates[split_idx+SEQ_LENGTH:]

axes[0, 1].plot(dates[SEQ_LENGTH:], scaler.inverse_transform(data_scaled[SEQ_LENGTH:]), 
                label='实际价格', alpha=0.7, linewidth=1)
axes[0, 1].plot(train_dates, y_train_pred_actual, label='训练集预测', alpha=0.7)
axes[0, 1].plot(test_dates, y_test_pred_actual, label='测试集预测', alpha=0.7)
axes[0, 1].set_xlabel('日期')
axes[0, 1].set_ylabel('价格')
axes[0, 1].set_title('整体预测对比')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
axes[0, 1].tick_params(axis='x', rotation=45)

# 测试集详细对比
axes[0, 2].plot(test_dates, y_test_actual, label='实际价格', linewidth=1.5)
axes[0, 2].plot(test_dates, y_test_pred_actual, label='预测价格', linewidth=1.5, linestyle='--')
axes[0, 2].set_xlabel('日期')
axes[0, 2].set_ylabel('价格')
axes[0, 2].set_title('测试集预测对比')
axes[0, 2].legend()
axes[0, 2].grid(True, alpha=0.3)
axes[0, 2].tick_params(axis='x', rotation=45)

# 残差分析
train_residuals = y_train_actual - y_train_pred_actual
test_residuals = y_test_actual - y_test_pred_actual

axes[1, 0].hist(train_residuals, bins=30, alpha=0.7, label='训练集残差', density=True)
axes[1, 0].hist(test_residuals, bins=30, alpha=0.7, label='测试集残差', density=True)
axes[1, 0].axvline(x=0, color='r', linestyle='--', alpha=0.5)
axes[1, 0].set_xlabel('残差')
axes[1, 0].set_ylabel('密度')
axes[1, 0].set_title('残差分布')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

# 预测vs实际散点图
axes[1, 1].scatter(y_train_actual, y_train_pred_actual, alpha=0.6, s=20, label='训练集')
axes[1, 1].scatter(y_test_actual, y_test_pred_actual, alpha=0.6, s=20, label='测试集')
axes[1, 1].plot([y_train_actual.min(), y_train_actual.max()], 
                [y_train_actual.min(), y_train_actual.max()], 
                'r--', label='理想预测', linewidth=2)
axes[1, 1].set_xlabel('实际价格')
axes[1, 1].set_ylabel('预测价格')
axes[1, 1].set_title('预测vs实际')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)

# 注意力权重可视化(最后一个序列)
model.eval()
with torch.no_grad():
    sample_sequence = X_test_tensor[-1:].unsqueeze(-1)
    _, attention_weights = model(sample_sequence, return_attention=True)
    
    attention_weights = attention_weights.squeeze().numpy()
    time_steps = range(SEQ_LENGTH)
    
    axes[1, 2].bar(time_steps, attention_weights, color=plt.cm.viridis(attention_weights))
    axes[1, 2].set_xlabel('时间步')
    axes[1, 2].set_ylabel('注意力权重')
    axes[1, 2].set_title('注意力权重分布')
    axes[1, 2].grid(True, alpha=0.3, axis='y')

plt.suptitle('LSTM时间序列预测完整分析', fontsize=16, fontweight='bold')
plt.tight_layout()
plt.show()

# 8. 多步预测
print("\n8. 多步预测演示:")

def rolling_forecast(model, initial_sequence, n_steps):
    """进行滚动预测"""
    predictions = []
    current_sequence = initial_sequence.clone()
    
    model.eval()
    with torch.no_grad():
        for _ in range(n_steps):
            # 预测下一步
            pred = model(current_sequence.unsqueeze(0))
            predictions.append(pred.item())
            
            # 更新序列:移除第一个,添加新预测
            current_sequence = torch.cat([current_sequence[1:], pred])
    
    return np.array(predictions)

# 选择测试集开始的一个序列
initial_idx = 0
initial_sequence = X_test_tensor[initial_idx]
n_forecast_steps = 30

# 进行滚动预测
rolling_preds_scaled = rolling_forecast(model, initial_sequence, n_forecast_steps)
rolling_preds = scaler.inverse_transform(rolling_preds_scaled.reshape(-1, 1)).flatten()

# 获取对应的实际值
actual_start_idx = split_idx + SEQ_LENGTH + initial_idx
actual_values = data[actual_start_idx:actual_start_idx+n_forecast_steps].flatten()
actual_dates = dates[actual_start_idx:actual_start_idx+n_forecast_steps]

# 可视化滚动预测
plt.figure(figsize=(12, 6))
plt.plot(actual_dates, actual_values, 'o-', label='实际价格', linewidth=2, markersize=8)
plt.plot(actual_dates, rolling_preds, 's-', label='滚动预测', linewidth=2, markersize=8)
plt.xlabel('日期')
plt.ylabel('价格')
plt.title(f'{n_forecast_steps}步滚动预测')
plt.legend()
plt.grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# 计算滚动预测的误差
rolling_mae = mean_absolute_error(actual_values, rolling_preds)
rolling_rmse = np.sqrt(mean_squared_error(actual_values, rolling_preds))
rolling_mape = np.mean(np.abs((actual_values - rolling_preds) / actual_values)) * 100

print(f"滚动预测误差指标:")
print(f"  MAE: {rolling_mae:.4f}")
print(f"  RMSE: {rolling_rmse:.4f}")
print(f"  MAPE: {rolling_mape:.2f}%")

# 9. 模型解释性分析
print("\n9. 模型解释性分析:")

# 计算特征重要性(通过梯度)
def compute_feature_importance(model, X, y, criterion):
    """通过梯度计算特征重要性"""
    X_tensor = X.clone().requires_grad_()
    
    model.eval()
    output = model(X_tensor.unsqueeze(0))
    loss = criterion(output, y.unsqueeze(0))
    
    # 计算梯度
    loss.backward()
    
    # 获取梯度并计算重要性
    gradients = X_tensor.grad.abs().mean(dim=0).numpy()
    return gradients

# 选择一个序列进行分析
sample_idx = 50
sample_X = X_test_tensor[sample_idx]
sample_y = y_test_tensor[sample_idx]

importance = compute_feature_importance(model, sample_X, sample_y, criterion)

# 可视化特征重要性
plt.figure(figsize=(10, 6))
time_steps = range(SEQ_LENGTH)
plt.bar(time_steps, importance, color=plt.cm.viridis(importance))
plt.xlabel('时间步')
plt.ylabel('梯度重要性')
plt.title('时间步重要性分析')
plt.grid(True, alpha=0.3, axis='y')
plt.tight_layout()
plt.show()

# 10. 模型部署
print("\n10. 模型部署:")

# 保存模型
torch.save(model.state_dict(), 'lstm_forecaster.pth')
print("模型参数已保存为 lstm_forecaster.pth")

# 保存为TorchScript
traced_model = torch.jit.trace(model, X_test_tensor[:1])
traced_model.save('lstm_forecaster_traced.pt')
print("模型已转换为TorchScript格式")

# 保存归一化器
import joblib
joblib.dump(scaler, 'scaler.pkl')
print("归一化器已保存为 scaler.pkl")

# 创建推理类
class LSTMPredictor:
    def __init__(self, model_path, scaler_path, seq_length=20):
        # 加载模型
        self.model = LSTMForecaster()
        self.model.load_state_dict(torch.load(model_path, map_location='cpu'))
        self.model.eval()
        
        # 加载归一化器
        self.scaler = joblib.load(scaler_path)
        
        self.seq_length = seq_length
    
    def predict(self, data):
        """预测单个序列"""
        # 归一化
        data_scaled = self.scaler.transform(data.reshape(-1, 1)).flatten()
        
        # 转换为张量
        data_tensor = torch.tensor(data_scaled, dtype=torch.float32).unsqueeze(0)
        
        # 预测
        with torch.no_grad():
            prediction_scaled = self.model(data_tensor).item()
        
        # 反归一化
        prediction = self.scaler.inverse_transform([[prediction_scaled]])[0, 0]
        
        return prediction
    
    def predict_sequence(self, initial_data, n_steps):
        """进行多步预测"""
        predictions = []
        current_sequence = initial_data[-self.seq_length:].copy()
        
        for _ in range(n_steps):
            # 预测下一步
            pred = self.predict(current_sequence)
            predictions.append(pred)
            
            # 更新序列
            current_sequence = np.append(current_sequence[1:], pred)
        
        return np.array(predictions)

# 测试推理类
predictor = LSTMPredictor('lstm_forecaster.pth', 'scaler.pkl')

# 使用测试数据进行预测
test_initial_data = data[-SEQ_LENGTH-30:-30].flatten()
predictions = predictor.predict_sequence(test_initial_data, n_steps=30)

# 可视化预测结果
actual_values = data[-30:].flatten()
plt.figure(figsize=(12, 6))
plt.plot(range(30), actual_values, 'o-', label='实际价格', linewidth=2)
plt.plot(range(30), predictions, 's-', label='预测价格', linewidth=2)
plt.xlabel('时间步')
plt.ylabel('价格')
plt.title('30步预测结果')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# 11. 性能优化
print("\n11. 性能优化:")

# 使用@torch.jit.script优化关键函数
@torch.jit.script
def optimized_lstm_forward(x: torch.Tensor, 
                          lstm_weight_ih: torch.Tensor,
                          lstm_weight_hh: torch.Tensor,
                          lstm_bias_ih: torch.Tensor,
                          lstm_bias_hh: torch.Tensor,
                          fc_weight: torch.Tensor,
                          fc_bias: torch.Tensor) -> torch.Tensor:
    """优化的LSTM前向传播函数"""
    batch_size, seq_len, input_size = x.shape
    hidden_size = lstm_weight_ih.shape[0] // 4
    
    # 初始化隐藏状态和细胞状态
    h = torch.zeros(1, batch_size, hidden_size)
    c = torch.zeros(1, batch_size, hidden_size)
    
    # LSTM计算
    outputs = []
    for t in range(seq_len):
        xt = x[:, t, :]
        
        # 计算LSTM门
        gates = torch.matmul(xt, lstm_weight_ih.T) + lstm_bias_ih + \
                torch.matmul(h.squeeze(0), lstm_weight_hh.T) + lstm_bias_hh
        
        # 分割门
        i, f, g, o = gates.chunk(4, 1)
        
        # 激活函数
        i = torch.sigmoid(i)
        f = torch.sigmoid(f)
        g = torch.tanh(g)
        o = torch.sigmoid(o)
        
        # 更新细胞状态和隐藏状态
        c = f * c + i * g
        h = o * torch.tanh(c)
        
        outputs.append(h)
    
    # 取最后一个隐藏状态
    last_hidden = outputs[-1]
    
    # 全连接层
    output = torch.matmul(last_hidden.squeeze(0), fc_weight.T) + fc_bias
    
    return output

# 提取模型参数
lstm = model.lstm
fc = model.fc[0]  # 取第一个全连接层

# 测试优化后的函数
test_input = torch.randn(1, SEQ_LENGTH, 1)
optimized_output = optimized_lstm_forward(
    test_input,
    lstm.weight_ih_l0,
    lstm.weight_hh_l0,
    lstm.bias_ih_l0,
    lstm.bias_hh_l0,
    fc.weight,
    fc.bias
)

print(f"优化函数输出形状: {optimized_output.shape}")

# 性能比较
import time

# 原始模型推理时间
start_time = time.time()
for _ in range(100):
    _ = model(test_input)
original_time = time.time() - start_time

# 优化函数推理时间
start_time = time.time()
for _ in range(100):
    _ = optimized_lstm_forward(
        test_input,
        lstm.weight_ih_l0,
        lstm.weight_hh_l0,
        lstm.bias_ih_l0,
        lstm.bias_hh_l0,
        fc.weight,
        fc.bias
    )
optimized_time = time.time() - start_time

print(f"原始模型推理时间: {original_time:.4f}秒")
print(f"优化函数推理时间: {optimized_time:.4f}秒")
print(f"加速比: {original_time/optimized_time:.2f}x")

# 12. 交易策略回测
print("\n12. 交易策略回测:")

def backtest_trading_strategy(predictions, actual_prices, initial_capital=10000):
    """回测交易策略"""
    capital = initial_capital
    position = 0  # 0: 空仓, 1: 持仓
    shares = 0
    trades = []
    equity_curve = [capital]
    
    for i in range(1, len(predictions)):
        current_price = actual_prices[i]
        previous_price = actual_prices[i-1]
        
        # 预测明天的价格变化
        predicted_return = (predictions[i] - previous_price) / previous_price
        
        # 简单的交易策略
        if predicted_return > 0.01 and position == 0:  # 预测上涨超过1%
            # 买入
            position = 1
            shares = capital / current_price
            capital = 0
            trades.append(('BUY', i, current_price))
        
        elif predicted_return < -0.01 and position == 1:  # 预测下跌超过1%
            # 卖出
            position = 0
            capital = shares * current_price
            shares = 0
            trades.append(('SELL', i, current_price))
        
        # 计算当前权益
        if position == 1:
            current_equity = shares * current_price
        else:
            current_equity = capital
        
        equity_curve.append(current_equity)
    
    # 如果最后还持有仓位,强制平仓
    if position == 1:
        capital = shares * actual_prices[-1]
        trades.append(('SELL', len(predictions)-1, actual_prices[-1]))
    
    final_capital = equity_curve[-1]
    total_return = (final_capital - initial_capital) / initial_capital * 100
    
    return {
        'final_capital': final_capital,
        'total_return': total_return,
        'trades': trades,
        'equity_curve': equity_curve
    }

# 使用测试集进行回测
test_predictions = y_test_pred_actual.flatten()
test_actuals = y_test_actual.flatten()

results = backtest_trading_strategy(test_predictions, test_actuals)

print(f"初始资金: $10,000")
print(f"最终资金: ${results['final_capital']:.2f}")
print(f"总收益率: {results['total_return']:.2f}%")
print(f"交易次数: {len(results['trades'])}")

# 绘制权益曲线
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(results['equity_curve'], linewidth=2)
plt.axhline(y=10000, color='r', linestyle='--', alpha=0.5, label='初始资金')
plt.xlabel('交易日')
plt.ylabel('资金')
plt.title('交易策略权益曲线')
plt.legend()
plt.grid(True, alpha=0.3)

# 绘制买卖点
plt.subplot(1, 2, 2)
plt.plot(test_actuals, label='实际价格', linewidth=1)

buy_points = [i for action, i, price in results['trades'] if action == 'BUY']
sell_points = [i for action, i, price in results['trades'] if action == 'SELL']

plt.scatter(buy_points, test_actuals[buy_points], color='green', 
           s=100, marker='^', label='买入点', zorder=5)
plt.scatter(sell_points, test_actuals[sell_points], color='red', 
           s=100, marker='v', label='卖出点', zorder=5)

plt.xlabel('交易日')
plt.ylabel('价格')
plt.title('交易信号')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# 基准比较(买入持有策略)
buy_hold_return = (test_actuals[-1] - test_actuals[0]) / test_actuals[0] * 100
print(f"\n基准比较:")
print(f"买入持有策略收益率: {buy_hold_return:.2f}%")
print(f"主动交易策略收益率: {results['total_return']:.2f}%")
print(f"超额收益: {results['total_return'] - buy_hold_return:.2f}%")

# 计算风险调整后收益
returns = np.diff(results['equity_curve']) / results['equity_curve'][:-1]
sharpe_ratio = np.mean(returns) / np.std(returns) * np.sqrt(252)  # 年化夏普比率
max_drawdown = (np.maximum.accumulate(results['equity_curve']) - results['equity_curve']).max() / np.maximum.accumulate(results['equity_curve']).max() * 100

print(f"年化夏普比率: {sharpe_ratio:.3f}")
print(f"最大回撤: {max_drawdown:.2f}%")

print("\nPyTorch LSTM时间序列预测和交易策略演示完成!")

四、高级用法

1. 自定义自动微分和分布式训练

python

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, DistributedSampler
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
import numpy as np
import os

# 1. 自定义自动微分函数
print("1. 自定义自动微分函数:")

# 自定义ReLU激活函数(带手动梯度计算)
class CustomReLU(torch.autograd.Function):
    @staticmethod
    def forward(ctx, input):
        # 保存输入用于反向传播
        ctx.save_for_backward(input)
        return input.clamp(min=0)
    
    @staticmethod
    def backward(ctx, grad_output):
        # 获取保存的输入
        input, = ctx.saved_tensors
        # 计算梯度:输入>0时梯度为1,否则为0
        grad_input = grad_output.clone()
        grad_input[input < 0] = 0
        return grad_input

# 使用自定义函数
custom_relu = CustomReLU.apply

# 测试自定义ReLU
x = torch.tensor([-2.0, -1.0, 0.0, 1.0, 2.0], requires_grad=True)
y = custom_relu(x)
print(f"输入: {x}")
print(f"CustomReLU输出: {y}")

# 计算梯度
y.sum().backward()
print(f"梯度: {x.grad}")

# 对比内置ReLU
x2 = torch.tensor([-2.0, -1.0, 0.0, 1.0, 2.0], requires_grad=True)
y2 = F.relu(x2)
y2.sum().backward()
print(f"内置ReLU梯度: {x2.grad}")

# 2. 自定义优化器
print("\n2. 自定义优化器:")

class CustomAdam(optim.Optimizer):
    def __init__(self, params, lr=0.001, betas=(0.9, 0.999), eps=1e-8, weight_decay=0):
        defaults = dict(lr=lr, betas=betas, eps=eps, weight_decay=weight_decay)
        super().__init__(params, defaults)
    
    @torch.no_grad()
    def step(self, closure=None):
        loss = None
        if closure is not None:
            with torch.enable_grad():
                loss = closure()
        
        for group in self.param_groups:
            for p in group['params']:
                if p.grad is None:
                    continue
                
                grad = p.grad
                state = self.state[p]
                
                # 初始化状态
                if len(state) == 0:
                    state['step'] = 0
                    state['exp_avg'] = torch.zeros_like(p)
                    state['exp_avg_sq'] = torch.zeros_like(p)
                
                exp_avg, exp_avg_sq = state['exp_avg'], state['exp_avg_sq']
                beta1, beta2 = group['betas']
                
                state['step'] += 1
                
                # 权重衰减
                if group['weight_decay'] != 0:
                    grad = grad.add(p, alpha=group['weight_decay'])
                
                # 更新一阶和二阶矩估计
                exp_avg.mul_(beta1).add_(grad, alpha=1 - beta1)
                exp_avg_sq.mul_(beta2).addcmul_(grad, grad, value=1 - beta2)
                
                # 偏置校正
                bias_correction1 = 1 - beta1 ** state['step']
                bias_correction2 = 1 - beta2 ** state['step']
                
                step_size = group['lr'] / bias_correction1
                
                # 更新参数
                denom = (exp_avg_sq.sqrt() / (bias_correction2 ** 0.5)).add_(group['eps'])
                p.addcdiv_(exp_avg, denom, value=-step_size)
        
        return loss

# 测试自定义优化器
model = nn.Linear(10, 2)
optimizer = CustomAdam(model.parameters(), lr=0.01)

# 模拟训练步骤
for epoch in range(3):
    optimizer.zero_grad()
    inputs = torch.randn(5, 10)
    targets = torch.randn(5, 2)
    outputs = model(inputs)
    loss = F.mse_loss(outputs, targets)
    loss.backward()
    optimizer.step()
    print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")

# 3. 混合精度训练
print("\n3. 混合精度训练:")

from torch.cuda.amp import autocast, GradScaler

# 创建模型和优化器
model = nn.Sequential(
    nn.Linear(100, 200),
    nn.ReLU(),
    nn.Linear(200, 100),
    nn.ReLU(),
    nn.Linear(100, 10)
).cuda()

optimizer = optim.Adam(model.parameters(), lr=0.001)
scaler = GradScaler()  # 梯度缩放器

# 混合精度训练循环
for epoch in range(3):
    for batch in range(10):
        # 生成数据
        inputs = torch.randn(32, 100).cuda()
        targets = torch.randn(32, 10).cuda()
        
        # 前向传播(使用自动混合精度)
        with autocast():
            outputs = model(inputs)
            loss = F.mse_loss(outputs, targets)
        
        # 反向传播
        optimizer.zero_grad()
        scaler.scale(loss).backward()  # 缩放损失
        
        # 更新参数
        scaler.step(optimizer)
        scaler.update()  # 更新缩放器
        
    print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")

# 4. 梯度累积
print("\n4. 梯度累积:")

def train_with_gradient_accumulation(model, dataloader, optimizer, accumulation_steps=4):
    """使用梯度累积进行训练"""
    model.train()
    optimizer.zero_grad()
    
    for batch_idx, (inputs, targets) in enumerate(dataloader):
        inputs, targets = inputs.cuda(), targets.cuda()
        
        # 前向传播
        outputs = model(inputs)
        loss = F.cross_entropy(outputs, targets)
        
        # 缩放损失(考虑累积步数)
        loss = loss / accumulation_steps
        
        # 反向传播
        loss.backward()
        
        # 每accumulation_steps步更新一次参数
        if (batch_idx + 1) % accumulation_steps == 0:
            optimizer.step()
            optimizer.zero_grad()
            print(f"Batch {batch_idx+1}, Loss: {loss.item() * accumulation_steps:.4f}")

# 5. 模型并行
print("\n5. 模型并行:")

class ModelParallelNN(nn.Module):
    def __init__(self, device_ids=[0, 1]):
        super().__init__()
        # 将模型分割到不同GPU
        self.seq1 = nn.Sequential(
            nn.Linear(100, 200),
            nn.ReLU(),
            nn.Linear(200, 100)
        ).to(device_ids[0])
        
        self.seq2 = nn.Sequential(
            nn.Linear(100, 50),
            nn.ReLU(),
            nn.Linear(50, 10)
        ).to(device_ids[1])
        
        self.device_ids = device_ids
    
    def forward(self, x):
        # 在第一个GPU上计算
        x = self.seq1(x.to(self.device_ids[0]))
        
        # 将中间结果移动到第二个GPU
        x = x.to(self.device_ids[1])
        
        # 在第二个GPU上计算
        x = self.seq2(x)
        
        return x

# 测试模型并行
if torch.cuda.device_count() >= 2:
    model_parallel = ModelParallelNN(device_ids=[0, 1])
    inputs = torch.randn(32, 100)
    outputs = model_parallel(inputs)
    print(f"模型并行输出形状: {outputs.shape}")
else:
    print("需要至少2个GPU进行模型并行测试")

# 6. 数据并行
print("\n6. 数据并行:")

def setup_ddp(rank, world_size):
    """设置分布式训练环境"""
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup_ddp():
    """清理分布式训练环境"""
    dist.destroy_process_group()

def train_ddp(rank, world_size):
    """分布式数据并行训练函数"""
    setup_ddp(rank, world_size)
    
    # 创建模型并包装为DDP
    model = nn.Sequential(
        nn.Linear(100, 200),
        nn.ReLU(),
        nn.Linear(200, 10)
    ).to(rank)
    
    ddp_model = DDP(model, device_ids=[rank])
    
    # 创建数据加载器
    dataset = TensorDataset(torch.randn(1000, 100), torch.randn(1000, 10))
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)
    
    # 训练
    optimizer = optim.Adam(ddp_model.parameters(), lr=0.001)
    
    for epoch in range(2):
        sampler.set_epoch(epoch)
        for batch_idx, (inputs, targets) in enumerate(dataloader):
            inputs, targets = inputs.to(rank), targets.to(rank)
            
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = F.mse_loss(outputs, targets)
            loss.backward()
            optimizer.step()
            
            if batch_idx % 10 == 0:
                print(f"Rank {rank}, Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}")
    
    cleanup_ddp()

# 启动分布式训练(注释掉,实际使用时取消注释)
# if __name__ == "__main__":
#     world_size = 2
#     mp.spawn(train_ddp, args=(world_size,), nprocs=world_size, join=True)

# 7. 梯度检查点(内存优化)
print("\n7. 梯度检查点(内存优化):")

from torch.utils.checkpoint import checkpoint

class MemoryEfficientNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.Linear(100, 200),
            nn.Linear(200, 300),
            nn.Linear(300, 400),
            nn.Linear(400, 300),
            nn.Linear(300, 200),
            nn.Linear(200, 100),
            nn.Linear(100, 10)
        ])
    
    def forward(self, x):
        # 使用梯度检查点节省内存
        for layer in self.layers[:-1]:
            x = checkpoint(lambda x, l=layer: F.relu(l(x)), x)
        x = self.layers[-1](x)
        return x

# 测试内存高效模型
model_efficient = MemoryEfficientNN()
inputs = torch.randn(32, 100, requires_grad=True)
outputs = model_efficient(inputs)
print(f"内存高效模型输出形状: {outputs.shape}")

# 8. 自定义数据加载器
print("\n8. 自定义数据加载器:")

class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, size=1000):
        self.data = torch.randn(size, 100)
        self.labels = torch.randint(0, 10, (size,))
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        # 数据增强
        data = self.data[idx]
        label = self.labels[idx]
        
        # 添加随机噪声作为数据增强
        if torch.rand(1) > 0.5:
            noise = torch.randn_like(data) * 0.1
            data = data + noise
        
        return data, label

# 使用自定义数据集
custom_dataset = CustomDataset(1000)
custom_loader = DataLoader(custom_dataset, batch_size=32, shuffle=True)

# 9. 模型剪枝
print("\n9. 模型剪枝:")

from torch.nn.utils import prune

# 创建模型
model_to_prune = nn.Sequential(
    nn.Linear(100, 200),
    nn.ReLU(),
    nn.Linear(200, 100),
    nn.ReLU(),
    nn.Linear(100, 10)
)

# 随机剪枝
parameters_to_prune = (
    (model_to_prune[0], 'weight'),
    (model_to_prune[2], 'weight'),
    (model_to_prune[4], 'weight'),
)

prune.global_unstructured(
    parameters_to_prune,
    pruning_method=prune.L1Unstructured,
    amount=0.3  # 剪枝30%的参数
)

# 计算剪枝后的稀疏度
def compute_sparsity(model):
    total_params = 0
    zero_params = 0
    
    for name, param in model.named_parameters():
        total_params += param.numel()
        zero_params += torch.sum(param == 0).item()
    
    return zero_params / total_params * 100

sparsity = compute_sparsity(model_to_prune)
print(f"模型稀疏度: {sparsity:.2f}%")

# 10. 量化感知训练
print("\n10. 量化感知训练:")

from torch.quantization import QuantStub, DeQuantStub, prepare_qat, convert

class QATModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.quant = QuantStub()
        self.dequant = DeQuantStub()
        
        self.layers = nn.Sequential(
            nn.Linear(100, 200),
            nn.ReLU(),
            nn.Linear(200, 100),
            nn.ReLU(),
            nn.Linear(100, 10)
        )
    
    def forward(self, x):
        x = self.quant(x)
        x = self.layers(x)
        x = self.dequant(x)
        return x

# 创建量化感知模型
qat_model = QATModel()

# 准备量化感知训练
qat_model.train()
qat_model = prepare_qat(qat_model)

# 模拟训练
optimizer = optim.Adam(qat_model.parameters(), lr=0.001)
for epoch in range(3):
    inputs = torch.randn(32, 100)
    targets = torch.randn(32, 10)
    
    optimizer.zero_grad()
    outputs = qat_model(inputs)
    loss = F.mse_loss(outputs, targets)
    loss.backward()
    optimizer.step()
    
    print(f"QAT Epoch {epoch+1}, Loss: {loss.item():.4f}")

# 转换为量化模型
qat_model.eval()
quantized_model = convert(qat_model)

print("量化感知训练完成!")

# 11. 模型融合
print("\n11. 模型融合:")

def fuse_model(model):
    """融合模型中的Conv-BN-ReLU层"""
    # 这里以Sequential模型为例
    torch.quantization.fuse_modules(model, [['0', '1', '2']], inplace=True)
    return model

# 12. ONNX导出
print("\n12. ONNX导出:")

def export_to_onnx(model, input_shape, filename='model.onnx'):
    """导出模型到ONNX格式"""
    dummy_input = torch.randn(*input_shape)
    
    # 导出模型
    torch.onnx.export(
        model,
        dummy_input,
        filename,
        export_params=True,
        opset_version=11,
        do_constant_folding=True,
        input_names=['input'],
        output_names=['output'],
        dynamic_axes={
            'input': {0: 'batch_size'},
            'output': {0: 'batch_size'}
        }
    )
    
    print(f"模型已导出到 {filename}")

# 测试ONNX导出
simple_model = nn.Linear(10, 2)
export_to_onnx(simple_model, (1, 10), 'simple_model.onnx')

# 13. 性能分析
print("\n13. 性能分析:")

from torch.profiler import profile, record_function, ProfilerActivity

def profile_model(model, input_shape):
    """分析模型性能"""
    model.eval()
    inputs = torch.randn(*input_shape)
    
    with profile(
        activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
        record_shapes=True,
        profile_memory=True,
        with_stack=True
    ) as prof:
        with record_function("model_inference"):
            _ = model(inputs)
    
    # 打印分析结果
    print(prof.key_averages().table(sort_by="cuda_time_total", row_limit=10))

# 测试性能分析
if torch.cuda.is_available():
    profile_model(model.cuda(), (32, 100))

# 14. 自定义分布式策略
print("\n14. 自定义分布式策略:")

class CustomDistributedStrategy:
    def __init__(self, model, optimizer, world_size):
        self.model = model
        self.optimizer = optimizer
        self.world_size = world_size
    
    def all_reduce_gradients(self):
        """自定义梯度同步策略"""
        for param in self.model.parameters():
            if param.grad is not None:
                # 简单的梯度平均
                dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
                param.grad.data /= self.world_size
    
    def step(self):
        """自定义优化步骤"""
        # 同步梯度
        self.all_reduce_gradients()
        
        # 更新参数
        self.optimizer.step()

# 15. 模型监控和可视化
print("\n15. 模型监控和可视化:")

from torch.utils.tensorboard import SummaryWriter

class ModelMonitor:
    def __init__(self, log_dir='runs/experiment'):
        self.writer = SummaryWriter(log_dir)
        self.step = 0
    
    def log_scalar(self, tag, value):
        self.writer.add_scalar(tag, value, self.step)
    
    def log_histogram(self, tag, values):
        self.writer.add_histogram(tag, values, self.step)
    
    def log_model_graph(self, model, input_tensor):
        self.writer.add_graph(model, input_tensor)
    
    def increment_step(self):
        self.step += 1
    
    def close(self):
        self.writer.close()

# 使用模型监控
monitor = ModelMonitor()

# 记录训练过程
for epoch in range(3):
    for batch in range(10):
        # 模拟训练
        loss = torch.rand(1).item()
        accuracy = torch.rand(1).item()
        
        # 记录标量
        monitor.log_scalar('Loss/train', loss)
        monitor.log_scalar('Accuracy/train', accuracy)
        
        # 记录直方图
        weights = torch.randn(100)
        monitor.log_histogram('Weights/fc1', weights)
        
        monitor.increment_step()

monitor.close()
print("训练过程已记录到TensorBoard")

print("\nPyTorch高级用法演示完成!")

五、实际应用场景

1. 智能对话系统

python

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import json
import re
from collections import Counter
import matplotlib.pyplot as plt
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

class IntelligentDialogSystem:
    def __init__(self):
        self.vocab = None
        self.vocab_size = 0
        self.max_len = 0
        self.model = None
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
    def prepare_data(self):
        """准备对话数据集"""
        print("准备对话数据集...")
        
        # 创建模拟对话数据
        conversations = [
            ["你好", "你好!有什么可以帮助你的吗?"],
            ["今天天气怎么样", "今天天气很好,阳光明媚。"],
            ["你会做什么", "我可以回答问题、聊天、提供建议。"],
            ["给我讲个笑话", "为什么程序员喜欢黑暗模式?因为光会吸引bug。"],
            ["谢谢", "不客气!很高兴能帮到你。"],
            ["再见", "再见!祝你有个美好的一天。"],
            ["你叫什么名字", "我是智能助手,很高兴认识你!"],
            ["今天几号", "今天是美好的一天,具体日期请查看日历。"],
            ["推荐一部电影", "《肖申克的救赎》是一部经典电影,值得一看。"],
            ["你会唱歌吗", "我是文本模型,不会唱歌,但我可以推荐好听的歌曲。"],
            ["什么是人工智能", "人工智能是模拟人类智能的计算机系统。"],
            ["Python好学吗", "Python是一门很好的入门语言,相对容易学习。"],
            ["深度学习是什么", "深度学习是机器学习的一个分支,使用多层神经网络。"],
            ["PyTorch有什么优点", "PyTorch灵活易用,支持动态计算图,适合研究和生产。"],
            ["明天会下雨吗", "我无法预测天气,建议查看天气预报应用。"]
        ]
        
        return conversations
    
    def build_vocabulary(self, conversations, min_freq=1):
        """构建词汇表"""
        print("构建词汇表...")
        
        # 收集所有词语
        all_words = []
        for conv in conversations:
            for sentence in conv:
                # 简单的中文分词(按字符分割)
                words = list(sentence)
                all_words.extend(words)
        
        # 统计词频
        word_counts = Counter(all_words)
        
        # 构建词汇表
        vocab = {'<PAD>': 0, '<UNK>': 1, '<SOS>': 2, '<EOS>': 3}
        
        for word, count in word_counts.items():
            if count >= min_freq:
                vocab[word] = len(vocab)
        
        self.vocab = vocab
        self.vocab_size = len(vocab)
        self.max_len = max(len(s) for conv in conversations for s in conv) + 2  # 加上SOS和EOS
        
        print(f"词汇表大小: {self.vocab_size}")
        print(f"最大序列长度: {self.max_len}")
        
        return vocab
    
    def sentence_to_indices(self, sentence):
        """将句子转换为索引序列"""
        if self.vocab is None:
            raise ValueError("请先构建词汇表")
        
        # 添加SOS和EOS标记
        indices = [self.vocab['<SOS>']]
        
        # 转换字符为索引
        for char in sentence:
            indices.append(self.vocab.get(char, self.vocab['<UNK>']))
        
        indices.append(self.vocab['<EOS>'])
        
        # 填充到最大长度
        if len(indices) < self.max_len:
            indices.extend([self.vocab['<PAD>']] * (self.max_len - len(indices)))
        else:
            indices = indices[:self.max_len-1] + [self.vocab['<EOS>']]
        
        return indices
    
    def indices_to_sentence(self, indices):
        """将索引序列转换为句子"""
        if self.vocab is None:
            raise ValueError("请先构建词汇表")
        
        # 反转词汇表
        idx_to_word = {idx: word for word, idx in self.vocab.items()}
        
        # 转换索引为字符
        chars = []
        for idx in indices:
            if idx == self.vocab['<EOS>']:
                break
            if idx not in [self.vocab['<PAD>'], self.vocab['<SOS>']]:
                chars.append(idx_to_word.get(idx, '<UNK>'))
        
        return ''.join(chars)
    
    def create_dataset(self, conversations):
        """创建训练数据集"""
        print("创建训练数据集...")
        
        X = []
        y = []
        
        for conv in conversations:
            if len(conv) >= 2:
                input_sentence = conv[0]
                target_sentence = conv[1]
                
                input_indices = self.sentence_to_indices(input_sentence)
                target_indices = self.sentence_to_indices(target_sentence)
                
                X.append(input_indices)
                y.append(target_indices)
        
        X = torch.tensor(X, dtype=torch.long)
        y = torch.tensor(y, dtype=torch.long)
        
        print(f"数据集大小: {X.shape[0]}")
        
        return X, y
    
    class DialogDataset(Dataset):
        """对话数据集类"""
        def __init__(self, X, y):
            self.X = X
            self.y = y
        
        def __len__(self):
            return len(self.X)
        
        def __getitem__(self, idx):
            return self.X[idx], self.y[idx]
    
    class Seq2SeqModel(nn.Module):
        """序列到序列模型"""
        def __init__(self, vocab_size, embed_dim=128, hidden_dim=256, num_layers=2, dropout=0.2):
            super().__init__()
            
            # 编码器
            self.encoder_embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
            self.encoder_lstm = nn.LSTM(
                embed_dim, hidden_dim, num_layers,
                batch_first=True, dropout=dropout if num_layers > 1 else 0,
                bidirectional=True
            )
            
            # 解码器
            self.decoder_embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
            self.decoder_lstm = nn.LSTM(
                embed_dim, hidden_dim * 2, num_layers,  # 隐藏维度乘以2因为编码器是双向的
                batch_first=True, dropout=dropout if num_layers > 1 else 0
            )
            
            # 注意力机制
            self.attention = nn.MultiheadAttention(
                embed_dim=hidden_dim * 2, num_heads=8,
                dropout=dropout, batch_first=True
            )
            
            # 输出层
            self.fc = nn.Sequential(
                nn.Linear(hidden_dim * 4, hidden_dim * 2),
                nn.ReLU(),
                nn.Dropout(dropout),
                nn.Linear(hidden_dim * 2, vocab_size)
            )
            
            # 层归一化
            self.layer_norm = nn.LayerNorm(hidden_dim * 2)
            
            # 初始化权重
            self.init_weights()
        
        def init_weights(self):
            """初始化权重"""
            for name, param in self.named_parameters():
                if 'weight' in name:
                    if 'lstm' in name:
                        if 'weight_ih' in name:
                            nn.init.xavier_uniform_(param.data)
                        elif 'weight_hh' in name:
                            nn.init.orthogonal_(param.data)
                    else:
                        nn.init.xavier_uniform_(param.data)
                elif 'bias' in name:
                    nn.init.constant_(param.data, 0)
        
        def encode(self, x):
            """编码器"""
            embedded = self.encoder_embedding(x)
            outputs, (hidden, cell) = self.encoder_lstm(embedded)
            
            # 合并双向LSTM的输出
            hidden = hidden.view(self.encoder_lstm.num_layers, 2, -1, self.encoder_lstm.hidden_size)
            hidden = torch.cat([hidden[:, 0, :, :], hidden[:, 1, :, :]], dim=2)
            
            cell = cell.view(self.encoder_lstm.num_layers, 2, -1, self.encoder_lstm.hidden_size)
            cell = torch.cat([cell[:, 0, :, :], cell[:, 1, :, :]], dim=2)
            
            return outputs, hidden, cell
        
        def decode(self, x, encoder_outputs, hidden, cell):
            """解码器(带注意力)"""
            embedded = self.decoder_embedding(x)
            
            # LSTM解码
            lstm_out, (hidden, cell) = self.decoder_lstm(embedded, (hidden, cell))
            lstm_out = self.layer_norm(lstm_out)
            
            # 注意力机制
            attn_out, attn_weights = self.attention(
                lstm_out, encoder_outputs, encoder_outputs
            )
            
            # 合并LSTM输出和注意力输出
            combined = torch.cat([lstm_out, attn_out], dim=-1)
            
            # 输出层
            output = self.fc(combined)
            
            return output, hidden, cell, attn_weights
        
        def forward(self, src, tgt, teacher_forcing_ratio=0.5):
            """前向传播"""
            batch_size = src.size(0)
            tgt_len = tgt.size(1)
            vocab_size = self.fc[-1].out_features
            
            # 编码
            encoder_outputs, hidden, cell = self.encode(src)
            
            # 准备解码器输入和输出
            decoder_input = tgt[:, 0].unsqueeze(1)  # 第一个输入是SOS
            outputs = torch.zeros(batch_size, tgt_len, vocab_size).to(src.device)
            
            # 解码
            for t in range(1, tgt_len):
                decoder_output, hidden, cell, attn_weights = self.decode(
                    decoder_input, encoder_outputs, hidden, cell
                )
                
                outputs[:, t, :] = decoder_output.squeeze(1)
                
                # 教师强制
                teacher_force = torch.rand(1).item() < teacher_forcing_ratio
                
                if teacher_force and t < tgt_len - 1:
                    decoder_input = tgt[:, t].unsqueeze(1)
                else:
                    top1 = decoder_output.argmax(2)
                    decoder_input = top1
            
            return outputs
    
    def build_model(self):
        """构建模型"""
        print("构建对话模型...")
        
        self.model = self.Seq2SeqModel(
            vocab_size=self.vocab_size,
            embed_dim=128,
            hidden_dim=256,
            num_layers=2,
            dropout=0.2
        ).to(self.device)
        
        print("模型结构:")
        print(self.model)
        print(f"总参数: {sum(p.numel() for p in self.model.parameters()):,}")
        
        return self.model
    
    def train_model(self, X, y, epochs=100, batch_size=4):
        """训练模型"""
        print("训练模型...")
        
        # 创建数据集和数据加载器
        dataset = self.DialogDataset(X, y)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
        
        # 定义损失函数和优化器
        criterion = nn.CrossEntropyLoss(ignore_index=0)  # 忽略PAD标记
        optimizer = optim.AdamW(self.model.parameters(), lr=0.001, weight_decay=1e-4)
        
        # 学习率调度器
        scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
            optimizer, T_0=10, T_mult=2, eta_min=1e-6
        )
        
        # 训练循环
        train_losses = []
        
        for epoch in range(epochs):
            self.model.train()
            epoch_loss = 0
            
            pbar = tqdm(dataloader, desc=f'Epoch {epoch+1}/{epochs}')
            for batch_X, batch_y in pbar:
                batch_X, batch_y = batch_X.to(self.device), batch_y.to(self.device)
                
                # 前向传播
                outputs = self.model(batch_X, batch_y, teacher_forcing_ratio=0.5)
                
                # 计算损失
                outputs = outputs[:, 1:].reshape(-1, outputs.shape[-1])
                targets = batch_y[:, 1:].reshape(-1)
                loss = criterion(outputs, targets)
                
                # 反向传播
                optimizer.zero_grad()
                loss.backward()
                
                # 梯度裁剪
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
                
                optimizer.step()
                
                epoch_loss += loss.item()
                pbar.set_postfix({'loss': loss.item()})
            
            # 更新学习率
            scheduler.step()
            
            avg_loss = epoch_loss / len(dataloader)
            train_losses.append(avg_loss)
            
            print(f'Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}, LR: {scheduler.get_last_lr()[0]:.6f}')
            
            # 每隔10个epoch生成一个示例回复
            if (epoch + 1) % 10 == 0:
                self.generate_example_response("你好")
        
        print("训练完成!")
        
        return train_losses
    
    def generate_response(self, input_text, max_length=50):
        """生成回复"""
        if self.model is None:
            raise ValueError("请先训练模型")
        
        self.model.eval()
        
        # 将输入文本转换为索引
        input_indices = self.sentence_to_indices(input_text)
        input_tensor = torch.tensor([input_indices], dtype=torch.long).to(self.device)
        
        # 编码输入
        with torch.no_grad():
            encoder_outputs, hidden, cell = self.model.encode(input_tensor)
            
            # 准备解码
            decoder_input = torch.tensor([[self.vocab['<SOS>']]], device=self.device)
            output_indices = []
            attn_weights_list = []
            
            # 解码生成回复
            for _ in range(max_length):
                decoder_output, hidden, cell, attn_weights = self.model.decode(
                    decoder_input, encoder_outputs, hidden, cell
                )
                
                # 获取预测的词
                top1 = decoder_output.argmax(2)
                output_indices.append(top1.item())
                
                # 保存注意力权重
                attn_weights_list.append(attn_weights.squeeze().cpu().numpy())
                
                # 如果生成了EOS标记,停止生成
                if top1.item() == self.vocab['<EOS>']:
                    break
                
                # 更新解码器输入
                decoder_input = top1
            
            # 将索引转换为文本
            response = self.indices_to_sentence(output_indices)
        
        return response, attn_weights_list
    
    def generate_example_response(self, input_text):
        """生成示例回复并显示"""
        response, attn_weights = self.generate_response(input_text)
        print(f"输入: {input_text}")
        print(f"回复: {response}")
        print("-" * 50)
        
        return response
    
    def interactive_chat(self):
        """交互式聊天"""
        print("="*60)
        print("智能对话系统")
        print("输入 'quit' 或 '退出' 结束对话")
        print("="*60)
        
        while True:
            user_input = input("\n你: ")
            
            if user_input.lower() in ['quit', '退出', 'exit']:
                print("再见!")
                break
            
            response, attn_weights = self.generate_response(user_input)
            print(f"助手: {response}")
    
    def visualize_attention(self, input_text, response, attn_weights):
        """可视化注意力权重"""
        if not attn_weights:
            print("没有注意力权重数据")
            return
        
        # 准备数据
        input_chars = list(input_text)
        response_chars = list(response)
        
        # 创建注意力矩阵
        attn_matrix = np.array(attn_weights[:len(response_chars)])
        
        # 可视化
        plt.figure(figsize=(10, 8))
        plt.imshow(attn_matrix, cmap='hot', interpolation='nearest', aspect='auto')
        
        # 设置坐标轴
        plt.xticks(range(len(input_chars)), input_chars, rotation=45)
        plt.yticks(range(len(response_chars)), response_chars)
        
        plt.xlabel('输入文本')
        plt.ylabel('生成文本')
        plt.title('注意力权重可视化')
        plt.colorbar(label='注意力强度')
        plt.tight_layout()
        plt.show()
    
    def evaluate_model(self, test_conversations):
        """评估模型"""
        print("评估模型...")
        
        correct = 0
        total = len(test_conversations)
        
        for input_text, expected_response in test_conversations:
            generated_response, _ = self.generate_response(input_text)
            
            # 简单的评估:检查是否包含关键词
            common_words = set(list(input_text)) & set(list(generated_response))
            if len(common_words) >= 1:
                correct += 1
        
        accuracy = correct / total * 100
        print(f"准确率: {accuracy:.2f}%")
        
        return accuracy
    
    def save_model(self, path='dialog_model.pth'):
        """保存模型"""
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'vocab': self.vocab,
            'vocab_size': self.vocab_size,
            'max_len': self.max_len
        }, path)
        print(f"模型已保存到 {path}")
    
    def load_model(self, path='dialog_model.pth'):
        """加载模型"""
        checkpoint = torch.load(path, map_location=self.device)
        
        self.vocab = checkpoint['vocab']
        self.vocab_size = checkpoint['vocab_size']
        self.max_len = checkpoint['max_len']
        
        self.model = self.Seq2SeqModel(
            vocab_size=self.vocab_size,
            embed_dim=128,
            hidden_dim=256,
            num_layers=2,
            dropout=0.2
        ).to(self.device)
        
        self.model.load_state_dict(checkpoint['model_state_dict'])
        print(f"模型已从 {path} 加载")
    
    def run_complete_system(self):
        """运行完整的对话系统"""
        print("="*60)
        print("启动智能对话系统")
        print("="*60)
        
        # 1. 准备数据
        conversations = self.prepare_data()
        
        # 2. 构建词汇表
        self.build_vocabulary(conversations)
        
        # 3. 创建数据集
        X, y = self.create_dataset(conversations)
        
        # 4. 构建模型
        self.build_model()
        
        # 5. 训练模型
        train_losses = self.train_model(X, y, epochs=50, batch_size=4)
        
        # 6. 可视化训练损失
        plt.figure(figsize=(10, 6))
        plt.plot(train_losses)
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('训练损失曲线')
        plt.grid(True, alpha=0.3)
        plt.show()
        
        # 7. 生成示例回复
        print("\n示例对话:")
        test_inputs = ["你好", "今天天气怎么样", "你会做什么", "再见"]
        for input_text in test_inputs:
            self.generate_example_response(input_text)
        
        # 8. 评估模型
        test_data = [
            ["你好吗", "我很好,谢谢关心!"],
            ["你叫什么", "我是智能助手。"],
            ["今天心情如何", "作为一个AI,我没有情感,但我会尽力帮助你!"]
        ]
        self.evaluate_model(test_data)
        
        # 9. 保存模型
        self.save_model()
        
        # 10. 交互式聊天
        self.interactive_chat()
        
        return {
            'model': self.model,
            'vocab': self.vocab,
            'train_losses': train_losses
        }

# 运行对话系统
if __name__ == "__main__":
    dialog_system = IntelligentDialogSystem()
    results = dialog_system.run_complete_system()

PyTorch通过其动态计算图和直观的API设计,为深度学习研究和应用开发提供了前所未有的灵活性。它的真正价值在于将复杂的数学运算和模型构建抽象为简单易懂的Python代码,使得研究人员能够快速实验新想法,工程师能够高效部署生产系统。随着PyTorch 2.0的发布,其性能得到了显著提升,同时保持了向后兼容性和易用性。

随着AI技术的快速发展,PyTorch也在持续演进。从最初的动态图到现在的编译优化,从单机训练到大规模分布式,从模型训练到移动端部署,PyTorch始终致力于提供最佳的用户体验。同时,PyTorch生态系统(如TorchVision、TorchText、TorchAudio等)的不断完善,使得构建端到端的AI应用变得更加容易。

你在使用PyTorch时有什么特别的技巧或遇到过什么有趣的挑战?或者你有关于深度学习的独特应用场景想要分享吗?欢迎在评论区交流你的经验和想法!

Logo

免费领 200 小时云算力,进群参与显卡、AI PC 幸运抽奖

更多推荐