基于DQN的二维网格世界路径规划系统

目录

  1. 引言与问题描述
  2. 强化学习与DQN理论基础
    • 2.1 强化学习核心概念
    • 2.2 Q-Learning与贝尔曼方程
    • 2.3 深度Q网络(DQN)的引入与创新
    • 2.4 DQN的挑战与改进
  3. 系统设计与实现
    • 3.1 环境搭建:网格世界类 GridWorld
    • 3.2 智能体构建:DQNAgent 类
    • 3.3 经验回放缓冲区:ReplayBuffer
    • 3.4 神经网络模型
  4. 完整代码实现
  5. 训练流程与超参数调优
  6. 结果分析与可视化
  7. 总结与展望
    • 7.1 项目总结
    • 7.2 局限性
    • 7.3 未来改进方向

1. 引言与问题描述

路径规划是机器人学、自动驾驶和游戏AI等领域的一个核心问题。其目标是在一个存在障碍物的环境中,为自主智能体(如机器人)找到一条从起点到目标点的最优(或可行)路径。

传统的路径规划算法,如A*、Dijkstra等,在许多场景下非常有效。但它们通常需要完整的环境地图信息,并且难以处理动态变化的环境或复杂、非线性的奖励信号。

深度强化学习(Deep Reinforcement Learning, DRL) 提供了一种替代方案。智能体通过与环境反复交互来学习策略,无需先验的环境模型,直接从高维感官输入(如图像或状态向量)中学习。DQN(Deep Q-Network) 作为DRL的里程碑式算法,将Q-Learning与深度学习相结合,极大地扩展了强化学习的应用范围。

本项目旨在构建一个模拟系统:一个机器人(智能体)位于一个二维网格世界中。世界中有:

  • 空闲格子:机器人可以通行。
  • 障碍物格子:机器人不可通行,碰撞会导致负奖励。
  • 起点:机器人的初始位置。
  • 目标点:机器人需要到达的位置,成功到达会获得大量正奖励。

智能体不知道地图的全局信息,只能通过移动并接收环境的反馈(奖励和新的状态)来学习如何避开障碍物并高效地到达目标。

2. 强化学习与DQN理论基础

2.1 强化学习核心概念
  • 智能体(Agent):学习和做出决策的实体,本例中的机器人。
  • 环境(Environment):智能体之外的一切,即网格世界。
  • 状态(State, s):对当前环境的完整描述。在本项目中,状态是机器人的坐标 (x, y)
  • 动作(Action, a):智能体可以做出的行为。通常是离散的,如:上、下、左、右。
  • 奖励(Reward, r):环境在智能体执行一个动作后返回的标量反馈信号。是学习过程的指导信号。我们的奖励设计:
    • 到达目标: +100
    • 碰撞障碍物: -10
    • 其他移动: -1 (鼓励智能体寻找最短路径,避免无意义徘徊)
  • 策略(Policy, π):智能体的行为方式,是从状态到动作的映射函数。我们的目标就是学习一个最优策略π*。
  • 价值函数(Value Function):评估状态或状态-动作对的长期价值。
    • 状态价值函数 V(s):从状态s开始,遵循策略π所能获得的期望累积奖励。
    • 动作价值函数 Q(s, a):在状态s下执行动作a,之后遵循策略π所能获得的期望累积奖励。DQN的核心就是学习一个近似Q函数。
2.2 Q-Learning与贝尔曼方程

Q-Learning是一种离线策略(off-policy)的时序差分(Temporal Difference, TD)算法。它通过贝尔曼最优方程来迭代更新Q值:

贝尔曼最优方程:
Q*(s, a) = E [ r + γ * max_{a'} Q*(s', a') | s, a ]

其中:

  • Q*(s, a) 是最优动作价值函数。
  • s' 是执行动作a后到达的新状态。
  • γ (gamma) 是折扣因子(0 ≤ γ ≤ 1),用于权衡即时奖励和未来奖励的重要性。

Q-Learning的更新公式为:
Q(s, a) ← Q(s, a) + α * [ r + γ * max_{a'} Q(s', a') - Q(s, a) ]
其中 α (alpha) 是学习率。

在简单问题中,Q表(一个状态数 x 动作数的表格)足以存储Q值。但对于状态空间巨大或连续的问题(如我们的网格世界变大时),Q表将变得不切实际。

2.3 深度Q网络(DQN)的引入与创新

DQN使用一个深度神经网络(DNN)来代替Q表,即 Q(s, a; θ),其中θ是神经网络的参数。网络的输入是状态s,输出是每个动作a对应的Q值。

直接使用神经网络拟合Q值面临两大挑战:1)样本间相关性高;2)不稳定的目标值。DQN通过两项关键技术解决了这些问题:

  1. 经验回放(Experience Replay)

    • 智能体将每一步的经历 (s, a, r, s', done) 存储在一个固定大小的回放缓冲区(Replay Buffer)中。
    • 训练时,从缓冲区中随机采样一小批(mini-batch)经历。
    • 好处:打破样本间的相关性,提高数据利用率,使训练过程更稳定。
  2. 目标网络(Target Network)

    • 使用两个结构相同的网络:主网络(Main Network) Q(θ)目标网络(Target Network) Q(θ⁻)
    • 主网络用于选择动作和更新参数。
    • 目标网络用于计算TD目标: y = r + γ * max_{a'} Q(s', a'; θ⁻)
    • 目标网络的参数θ⁻不会立即更新,而是定期(每隔C步)从主网络复制过来(θ⁻ ← θ)。
    • 好处:使TD目标在若干步内保持固定,大大提高了学习的稳定性。

损失函数(Loss Function)
DQN的优化目标是最小化主网络预测的Q值与目标网络给出的TD目标之间的均方误差(MSE)。
L(θ) = E_{(s,a,r,s')~U(D)} [ ( y - Q(s, a; θ) )² ]

2.4 DQN的挑战与改进

原始DQN之后又出现了许多改进算法,如Double DQN、Dueling DQN、Prioritized Experience Replay等,它们主要解决Q值过高估计等问题并提升采样效率。为保持项目简洁,我们实现最经典的DQN,但会指出可以扩展的地方。

3. 系统设计与实现

我们将系统分为三个主要部分:环境、智能体和经验回放缓冲区。

3.1 环境搭建:网格世界类 GridWorld

这个类负责模拟整个网格世界,与智能体进行交互。

属性:

  • size: 网格大小(例如10x10)。
  • grid: 一个二维数组,表示地图。0=空闲,1=障碍物,2=起点,3=目标。
  • agent_pos: 机器人当前的位置。
  • start_pos: 起点位置。
  • goal_pos: 目标位置。
  • actions: 可能的动作集合(通常是4个方向)。

方法:

  • __init__(self, size=10, obstacle_density=0.2): 初始化地图,随机生成障碍物、起点和目标。
  • reset(self): 重置环境,将机器人放回起点,返回初始状态。
  • step(self, action): 核心方法。执行动作,计算新状态、奖励,并判断回合是否结束。
  • is_valid_pos(self, pos): 检查一个位置是否在地图范围内且不是障碍物。
  • render(self): 可选方法,以字符形式打印当前地图,用于可视化。
3.2 智能体构建:DQNAgent 类

这个类封装了DQN算法的所有逻辑。

属性:

  • state_size: 状态维度(这里是2,因为状态是(x, y)坐标)。
  • action_size: 动作数量(4个)。
  • memory: 经验回放缓冲区对象。
  • gamma: 折扣因子。
  • epsilon: 探索率(ε-greedy策略)。
  • epsilon_min: 探索率的最小值。
  • epsilon_decay: 探索率的衰减率。
  • learning_rate: 神经网络的学习率。
  • model: 主Q网络(Keras模型)。
  • target_model: 目标Q网络。
  • update_target_network_frequency: 更新目标网络的频率(步数)。

方法:

  • __init__(...): 初始化所有参数,构建神经网络。
  • build_model(self): 使用Keras构建一个简单的全连接神经网络。
  • remember(self, state, action, reward, next_state, done): 将经验存储到回放缓冲区。
  • act(self, state): 根据ε-greedy策略选择动作。
  • replay(self, batch_size): 核心训练方法。从回放缓冲区采样,计算TD目标,并用这批数据训练主网络。
  • update_target_model(self): 将主网络的权重复制到目标网络。
  • load(self, name), save(self, name): 保存和加载模型权重。
3.3 经验回放缓冲区:ReplayBuffer

一个简单的数据结构,通常用dequelist实现。

属性:

  • buffer: 存储经验的列表。
  • max_size: 缓冲区的最大容量。

方法:

  • add(self, state, action, reward, next_state, done): 添加一条经验。如果缓冲区已满,则移除最早的经验。
  • sample(self, batch_size): 随机采样指定数量的经验。
  • __len__(self): 返回当前缓冲区中的经验数量。
3.4 神经网络模型

我们使用一个简单的多层感知机(MLP)。输入层接收状态(2个神经元),输出层有4个神经元,对应4个动作的Q值。
网络结构可以是:Input(2) -> Dense(24, activation='relu') -> Dense(24, activation='relu') -> Output(4)

对于更复杂的状态(例如局部视野图像),可以使用卷积神经网络(CNN)。

4. 完整代码实现

import numpy as np
import random
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import matplotlib.pyplot as plt

# ---------------------------- 环境定义 ----------------------------
class GridWorld:
    def __init__(self, size=10, obstacle_density=0.2):
        self.size = size
        self.grid = np.zeros((size, size))
        self.actions = [0, 1, 2, 3]  # 0:上, 1:右, 2:下, 3:左
        self.action_meaning = {0: 'UP', 1: 'RIGHT', 2: 'DOWN', 3: 'LEFT'}
        
        # 随机生成起点和终点(确保它们不同且在边缘)
        self.start_pos = (0, 0)
        self.goal_pos = (size-1, size-1)
        
        # 随机生成障碍物
        num_obstacles = int(obstacle_density * size * size)
        obstacles_set = set()
        while len(obstacles_set) < num_obstacles:
            pos = (random.randint(0, size-1), random.randint(0, size-1))
            if pos != self.start_pos and pos != self.goal_pos:
                obstacles_set.add(pos)
        for pos in obstacles_set:
            self.grid[pos[0], pos[1]] = 1  # 1代表障碍物
            
        self.grid[self.start_pos] = 2 # 起点
        self.grid[self.goal_pos] = 3  # 终点
        self.agent_pos = self.start_pos
        self.done = False

    def reset(self):
        self.agent_pos = self.start_pos
        self.done = False
        return self.agent_pos

    def step(self, action):
        x, y = self.agent_pos
        if action == 0:   # 上
            new_pos = (x-1, y)
        elif action == 1: # 右
            new_pos = (x, y+1)
        elif action == 2: # 下
            new_pos = (x+1, y)
        elif action == 3: # 左
            new_pos = (x, y-1)
        else:
            raise ValueError("Invalid action")
            
        reward = -1  # 默认移动奖励
        self.done = False
        
        # 检查新位置是否有效
        if not (0 <= new_pos[0] < self.size and 0 <= new_pos[1] < self.size):
            # 出界,留在原地,给予较大惩罚
            new_pos = (x, y)
            reward = -10
        elif self.grid[new_pos[0], new_pos[1]] == 1: # 障碍物
            new_pos = (x, y)
            reward = -10
        elif new_pos == self.goal_pos: # 到达目标
            reward = 100
            self.done = True
            
        self.agent_pos = new_pos
        return new_pos, reward, self.done

    def render(self):
        grid_to_print = self.grid.copy()
        x, y = self.agent_pos
        if (x, y) not in [self.start_pos, self.goal_pos]:
            grid_to_print[x, y] = 8 # 用8表示智能体当前位置
        print(grid_to_print)
        print("Agent is at:", self.agent_pos)
        print("-----------------------------")

# ---------------------------- 经验回放缓冲区 ----------------------------
class ReplayBuffer:
    def __init__(self, max_size=2000):
        self.buffer = deque(maxlen=max_size)
    
    def add(self, state, action, reward, next_state, done):
        experience = (state, action, reward, next_state, done)
        self.buffer.append(experience)
    
    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = map(np.array, zip(*batch))
        return states, actions, rewards, next_states, dones
    
    def __len__(self):
        return len(self.buffer)

# ---------------------------- DQN智能体 ----------------------------
class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = ReplayBuffer()
        self.gamma = 0.95    # 折扣因子
        self.epsilon = 1.0   # 探索率
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.update_target_network_frequency = 100
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()
        self.step_count = 0

    def _build_model(self):
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(lr=self.learning_rate))
        return model

    def update_target_model(self):
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        self.memory.add(state, action, reward, next_state, done)

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size) # 探索:随机选择动作
        state = np.reshape(state, [1, self.state_size])
        act_values = self.model.predict(state, verbose=0)
        return np.argmax(act_values[0]) # 利用:选择Q值最大的动作

    def replay(self, batch_size):
        if len(self.memory) < batch_size:
            return
        states, actions, rewards, next_states, dones = self.memory.sample(batch_size)
        
        # 使用主网络预测当前Q值
        current_q = self.model.predict(states, verbose=0)
        
        # 使用目标网络预测下一状态的Q值
        next_q = self.target_model.predict(next_states, verbose=0)
        max_next_q = np.amax(next_q, axis=1)
        
        # 计算TD目标
        target = current_q.copy()
        for i in range(batch_size):
            if dones[i]:
                target[i, actions[i]] = rewards[i]
            else:
                target[i, actions[i]] = rewards[i] + self.gamma * max_next_q[i]
                
        # 训练主网络
        self.model.fit(states, target, epochs=1, verbose=0)
        
        # 衰减探索率
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
            
    def load(self, name):
        self.model.load_weights(name)

    def save(self, name):
        self.model.save_weights(name)

# ---------------------------- 主训练循环 ----------------------------
if __name__ == "__main__":
    # 初始化环境和智能体
    env = GridWorld(size=10, obstacle_density=0.2)
    state_size = 2  # (x, y)坐标
    action_size = 4 # 4个方向
    agent = DQNAgent(state_size, action_size)
    
    # 训练参数
    episodes = 1000  # 训练回合数
    batch_size = 32
    max_steps_per_episode = 200
    rewards_history = [] # 记录每回合的总奖励
    success_history = [] # 记录是否成功到达目标
    
    for e in range(episodes):
        state = env.reset()
        state = np.array(state)
        total_reward = 0
        done = False
        step_in_episode = 0
        
        for step in range(max_steps_per_episode):
            step_in_episode += 1
            # 选择动作
            action = agent.act(state)
            
            # 执行动作
            next_state, reward, done = env.step(action)
            next_state = np.array(next_state)
            total_reward += reward
            
            # 记住这个经验
            agent.remember(state, action, reward, next_state, done)
            
            # 转移到下一个状态
            state = next_state
            
            # 进行经验回放训练
            agent.replay(batch_size)
            
            # 每隔一定步数更新目标网络
            agent.step_count += 1
            if agent.step_count % agent.update_target_network_frequency == 0:
                agent.update_target_model()
                
            if done:
                break
                
        rewards_history.append(total_reward)
        success = 1 if done and env.agent_pos == env.goal_pos else 0
        success_history.append(success)
        
        print(f"Episode: {e+1}/{episodes}, Total Reward: {total_reward}, Steps: {step_in_episode}, Epsilon: {agent.epsilon:.3f}, Success: {success}")
    
    # 训练结束后,保存模型
    agent.save("dqn_gridworld.h5")
    
    # 绘制训练结果
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.plot(rewards_history)
    plt.title('Total Reward per Episode')
    plt.xlabel('Episode')
    plt.ylabel('Reward')
    
    plt.subplot(1, 2, 2)
    # 计算成功率的移动平均(窗口大小为50)
    moving_avg_success = np.convolve(success_history, np.ones(50)/50, mode='valid')
    plt.plot(moving_avg_success)
    plt.title('Success Rate (Moving Average, window=50)')
    plt.xlabel('Episode')
    plt.ylabel('Success Rate')
    
    plt.tight_layout()
    plt.savefig('training_results.png')
    plt.show()

    # ---------------------------- 测试训练好的智能体 ----------------------------
    print("\nTesting the trained agent...")
    test_episodes = 5
    for e in range(test_episodes):
        state = env.reset()
        state = np.array(state)
        done = False
        print(f"Test Episode {e+1}")
        env.render()
        step_count = 0
        while not done and step_count < 50:
            # 测试时关闭探索,只利用学到的知识
            state_reshaped = np.reshape(state, [1, state_size])
            action = np.argmax(agent.model.predict(state_reshaped, verbose=0)[0])
            next_state, reward, done = env.step(action)
            state = np.array(next_state)
            step_count += 1
            print(f"Step {step_count}: Action {env.action_meaning[action]}")
            env.render()
            if done:
                if state[0] == env.goal_pos[0] and state[1] == env.goal_pos[1]:
                    print("Reached the goal!")
                else:
                    print("Failed or hit obstacle too many times.")
            if step_count >= 50:
                print("Max steps reached.")
        print("------------------------------------")

5. 训练流程与超参数调优

训练流程:

  1. 初始化环境、智能体和回放缓冲区。
  2. 对于每个训练回合(episode):
    a. 重置环境,获得初始状态。
    b. 在未到达终止状态且未超过最大步数内循环:
    i. 智能体根据当前状态和ε-greedy策略选择动作。
    ii. 环境执行该动作,返回奖励、新状态和终止标志。
    iii. 将该经验 (s, a, r, s', done) 存入回放缓冲区。
    iv. 从回放缓冲区中随机采样一个批次的经验,用这些数据训练主Q网络(计算损失并反向传播)。
    v. 每隔C步,将主网络的权重复制到目标网络。
    vi. 衰减探索率ε。
    c. 记录本回合的总奖励和是否成功。

关键超参数及其调优:

  • gamma (γ): 折扣因子,通常设为0.95-0.99。值越大,智能体越重视长期奖励。
  • learning_rate: 学习率,通常设为0.001或更小。太大会导致训练不稳定。
  • epsilon_decay: 探索率衰减率。需要平衡探索和利用。衰减太快可能导致陷入局部最优,太慢则学习效率低下。
  • replay_buffer_size: 回放缓冲区大小。越大越好,但受内存限制。
  • batch_size: 训练时采样的批次大小,通常为32、64或128。
  • update_target_network_frequency: 更新目标网络的频率。太低(如每步更新)会失去稳定目标的意义;太高(如每1000步更新)会使学习变慢。通常设为100-1000步。

调优是一个经验性过程,需要通过观察训练曲线(奖励和成功率的变化)来反复调整。

6. 结果分析与可视化

运行代码后,我们会得到两个主要的可视化图表:

  1. 每回合总奖励图:初期奖励会非常低(大量负值),因为智能体频繁撞墙。随着训练进行,奖励会逐渐上升并趋于稳定,最终在成功到达目标时获得较高的正奖励。曲线可能会波动,但整体趋势应是上升的。
  2. 成功率移动平均图:这是更关键的指标。它显示了智能体成功找到路径的概率。我们希望看到这个曲线从0逐渐上升并最终稳定在1(或接近1)附近,表明智能体已经可靠地学会了完成任务。

在测试阶段,关闭探索(ε=0),观察智能体的行为。它应该能展示出一条从起点到目标点的合理路径,有效避开所有障碍物。

7. 总结与展望

7.1 项目总结

本项目成功实现了一个基于经典DQN算法的网格世界路径规划系统。我们从头构建了环境模拟、智能体决策、神经网络函数近似和经验回放机制。智能体通过试错,从稀疏的奖励信号中学会了复杂的导航和避障策略,验证了深度强化学习在解决序列决策问题上的强大能力。

7.2 局限性
  1. 状态表示简单:当前状态仅是坐标,是低维且完全可观测的。真实世界的状态通常是部分可观测且更高维的(如摄像头图像)。
  2. 经典DQN的缺陷:原始DQN存在Q值过高估计的问题,可能导致策略不如预期。
  3. 训练效率:在更复杂的环境中,简单的MLP和基础DQN可能需要非常长的训练时间。
  4. 静态环境:当前环境是静态的,障碍物位置固定,无法处理动态障碍物。
7.3 未来改进方向
  1. 算法升级:实现更先进的算法,如 Double DQN(解决过高估计)、Dueling DQN(更好评估状态价值)、Prioritized Experience Replay(优先回放重要的经验)等。
  2. 状态表示:将状态从坐标改为局部视野的网格图像(使用CNN处理),使智能体更能适应未知环境。
  3. 课程学习(Curriculum Learning):从简单地图开始训练,逐步增加地图大小和障碍物密度,加速学习过程。
  4. 动态环境:引入移动的障碍物,训练智能体在动态环境中规划路径。
  5. 多智能体:扩展环境,包含多个智能体,研究它们之间的协作或竞争。

通过本项目,我们不仅实现了一个有趣的路径规划demo,更重要的是搭建了一个可以持续迭代和扩展的深度强化学习研究框架。

Logo

更多推荐