从TRPO到PPO:用PyTorch实现策略优化的进化之路

在强化学习领域,策略优化算法的发展经历了从复杂到简洁、从理论到实用的演变过程。当我们已经掌握了策略梯度(Policy Gradient)的基础知识后,往往会遇到一个关键瓶颈:如何在保证策略稳定更新的同时,实现高效的学习?这正是TRPO(Trust Region Policy Optimization)和PPO(Proximal Policy Optimization)试图解决的问题。

1. 策略优化的演进:为什么需要PPO?

强化学习中的策略优化本质上是在寻找一种平衡——既要充分利用当前数据快速改进策略,又要避免因更新过大导致策略崩溃。早期的TRPO通过数学上的信任区域约束实现了这一目标,但其复杂的二阶优化过程让许多实践者望而却步。

TRPO的核心约束可以表示为:

\text{maximize } \mathbb{E} \left[ \frac{\pi_\theta(a|s)}{\pi_{\theta_{\text{old}}}(a|s)} A_t \right], \quad \text{s.t. } \mathbb{E} \left[ \text{KL}(\pi_{\theta_{\text{old}}} \| \pi_\theta) \right] \leq \delta

这种方法的局限性显而易见:

  • 计算复杂度高 :需要计算和逆Fisher信息矩阵
  • 实现难度大 :约束条件需要严格满足
  • 参数敏感 :KL散度阈值δ的选择对性能影响显著

PPO通过两种创新方式解决了这些问题:

  1. Clipped Surrogate Objective :用简单的剪切操作替代复杂的约束优化
  2. Adaptive KL Penalty :动态调整的KL惩罚项,避免硬性约束

2. PPO的核心机制解析

2.1 Clipped Surrogate Objective

PPO最核心的创新在于其目标函数设计。与TRPO的硬约束不同,PPO通过剪切比例来隐式限制策略更新幅度:

ratio = torch.exp(log_probs - old_log_probs)
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1 - eps, 1 + eps) * advantages
policy_loss = -torch.min(surr1, surr2).mean()

这个看似简单的min操作实际上包含了精妙的设计逻辑:

情况 A > 0 (鼓励动作) A < 0 (抑制动作)
ratio > 1 + ε 使用裁剪后的(1+ε)·A 使用未裁剪的ratio·A
ratio < 1 - ε 使用未裁剪的ratio·A 使用裁剪后的(1-ε)·A
1-ε ≤ ratio ≤ 1+ε 两者相等 两者相等

这种设计确保了:

  • 对好动作(A>0)的更新不会过度激进
  • 对坏动作(A<0)的惩罚力度足够强
  • 整体更新幅度始终控制在合理范围内

2.2 广义优势估计(GAE)

PPO通常结合GAE(Generalized Advantage Estimation)来更准确地估计优势函数。GAE通过平衡偏差和方差,提供了更稳定的学习信号:

# GAE计算实现
advantages = []
advantage = 0
for delta in reversed(td_deltas):
    advantage = delta + gamma * lambda_ * advantage
    advantages.insert(0, advantage)

其中,δ_t是TD误差:

\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)

GAE参数λ提供了对偏差-方差权衡的控制:

  • λ=0:完全依赖一步TD估计(低偏差,高方差)
  • λ=1:等同于蒙特卡洛估计(高偏差,低方差)

3. PPO的PyTorch实现详解

让我们从零开始构建一个完整的PPO实现。以下代码针对离散动作空间设计,但原理同样适用于连续控制。

3.1 网络结构设计

首先定义策略网络(actor)和价值网络(critic):

class PolicyNet(nn.Module):
    def __init__(self, n_states, n_hiddens, n_actions):
        super().__init__()
        self.fc1 = nn.Linear(n_states, n_hiddens)
        self.fc2 = nn.Linear(n_hiddens, n_actions)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)

class ValueNet(nn.Module):
    def __init__(self, n_states, n_hiddens):
        super().__init__()
        self.fc1 = nn.Linear(n_states, n_hiddens)
        self.fc2 = nn.Linear(n_hiddens, 1)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)

3.2 PPO主体实现

完整的PPO算法类包含动作选择和学习两个核心方法:

class PPO:
    def __init__(self, n_states, n_hiddens, n_actions, actor_lr, critic_lr, 
                 lmbda, epochs, eps, gamma, device):
        self.actor = PolicyNet(n_states, n_hiddens, n_actions).to(device)
        self.critic = ValueNet(n_states, n_hiddens).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.epochs = epochs
        self.eps = eps
        self.device = device
    
    def take_action(self, state):
        state = torch.tensor(state[np.newaxis, :]).to(self.device)
        probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)
        return action_dist.sample().item()
    
    def learn(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).to(self.device).view(-1, 1)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).to(self.device).view(-1, 1)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).to(self.device).view(-1, 1)
        
        # 计算TD目标和优势函数
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        td_delta = td_delta.cpu().detach().numpy()
        
        advantage = 0
        advantage_list = []
        for delta in td_delta[::-1]:
            advantage = self.gamma * self.lmbda * advantage + delta
            advantage_list.append(advantage)
        advantage_list.reverse()
        advantage = torch.tensor(advantage_list, dtype=torch.float).to(self.device)
        
        # 多轮次更新
        old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach()
        for _ in range(self.epochs):
            log_probs = torch.log(self.actor(states).gather(1, actions))
            ratio = torch.exp(log_probs - old_log_probs)
            
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1-self.eps, 1+self.eps) * advantage
            actor_loss = -torch.min(surr1, surr2).mean()
            
            critic_loss = F.mse_loss(self.critic(states), td_target.detach())
            
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()

3.3 训练循环与超参数设置

在实际应用中,我们需要合理设置超参数并组织训练流程:

# 超参数设置
num_episodes = 300
gamma = 0.9
actor_lr = 1e-3
critic_lr = 1e-2
n_hiddens = 16
env_name = 'CartPole-v0'

# 环境初始化
env = gym.make(env_name)
n_states = env.observation_space.shape[0]
n_actions = env.action_space.n

# 创建PPO智能体
agent = PPO(n_states=n_states,
            n_hiddens=n_hiddens,
            n_actions=n_actions,
            actor_lr=actor_lr,
            critic_lr=critic_lr,
            lmbda=0.95,
            epochs=10,
            eps=0.2,
            gamma=gamma,
            device=device)

# 训练循环
for episode in range(num_episodes):
    state = env.reset()
    done = False
    episode_return = 0
    transition_dict = {'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': []}
    
    while not done:
        action = agent.take_action(state)
        next_state, reward, done, _ = env.step(action)
        transition_dict['states'].append(state)
        transition_dict['actions'].append(action)
        transition_dict['next_states'].append(next_state)
        transition_dict['rewards'].append(reward)
        transition_dict['dones'].append(done)
        state = next_state
        episode_return += reward
    
    agent.learn(transition_dict)
    print(f'Episode: {episode}, Return: {episode_return}')

4. PPO的实战技巧与调优策略

4.1 关键参数的影响与调优

PPO的性能很大程度上依赖于几个关键参数的设置:

参数 典型值 影响 调优建议
ε (clip范围) 0.1-0.3 控制策略更新幅度 环境复杂度越高,ε应越小
λ (GAE参数) 0.9-0.99 平衡偏差与方差 环境随机性大时降低λ
γ (折扣因子) 0.9-0.999 未来奖励的重要性 长周期任务需要更大的γ
训练轮次(epochs) 3-10 数据重用效率 样本效率与过拟合的权衡
批量大小 64-2048 梯度估计稳定性 资源允许下尽可能增大

4.2 常见问题与解决方案

问题1:训练初期性能下降

  • 原因 :初始探索不足,策略过早收敛到次优解
  • 解决 :增加熵正则项,鼓励探索
entropy = -torch.sum(probs * torch.log(probs), dim=1).mean()
policy_loss = -torch.min(surr1, surr2).mean() - 0.01 * entropy

问题2:价值函数估计不准确

  • 原因 :价值网络学习过快或过慢
  • 解决 :调整critic学习率,或使用单独的价值函数训练步数

问题3:高方差导致训练不稳定

  • 解决 :采用以下技术组合:
    1. 状态归一化
    2. 奖励缩放
    3. 梯度裁剪
    4. 并行环境采样

4.3 进阶改进方向

对于追求更高性能的实现,可以考虑以下扩展:

  1. 混合目标函数 :结合clip和KL散度惩罚
kl_div = torch.distributions.kl_divergence(old_dist, new_dist).mean()
if kl_div > 1.5 * target_kl:
    # 提前停止更新
    break
  1. 自适应clip范围 :根据KL散度动态调整ε
if kl_div.mean() > target_kl:
    self.eps *= 0.9
else:
    self.eps *= 1.1
  1. 优势函数归一化
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
  1. 多步回报与TD(λ)结合
# 使用n步回报计算TD目标
td_target = rewards + gamma * (1 - dones) * (
    (1 - lambda_) * self.critic(next_states) + 
    lambda_ * next_values)

在实际项目中,我发现PPO对超参数的选择相当敏感,特别是在连续控制任务中。一个实用的技巧是先在简单环境(如CartPole)上验证基本实现,再逐步应用到复杂场景。另一个经验是,适当增加并行环境数量往往比调参更能提升训练效率。

Logo

免费领 100 小时云算力,进群参与显卡、AI PC 幸运抽奖

更多推荐