一、前言

前面两篇文章【强化学习的数学原理 —— 值迭代(Value Iteration)算法推导与 Python 从零实现【附源码】】&【强化学习的数学原理 —— 策略迭代(Policy Iteration)算法推导与 Python 从零实现【附源码】】介绍了基于系统模型求解最优策略的两种算法。本篇博文将继续基于赵世钰老师《强化学习的数学原理》课程的网格世界例子,介绍第一种 无需模型(model free)蒙特卡罗(Monte Carlo,MC) 强化学习算法 —— MC Basic 算法,并从零完成该算法的 Python 实现。

二、问题建模

为了方便读者阅读,本节将再次介绍问题有关背景。如已知悉,可跳过本节。

2.1 问题背景

在如图 2.1 所示的 网格世界(grid world) 中,智能体(agent) 需要从一个初始区域出发,最终到达目标区域。其中,白色单元格代表可以进入的区域,黄色单元格代表禁止进入的区域,蓝色单元格代表目标区域。


图 2.1 5×5 网格世界:出自赵世钰《强化学习的数学原理》

2.2 状态和动作空间

状态对应智能体所在单元格的位置,如图 2.1 中共有从左上到右下 25 个状态,表示为状态空间(state space) S = { s 1 , s 2 , … , s 25 } \mathcal{S} = \{s_1,s_2,\dots , s_{25}\} S={s1,s2,,s25}

在网格世界中,智能体在每一个状态有 5 个可选的动作:向上移动、向右移动、向下移动、向左移动、保持不动。所有动作的集合被称为动作空间(action space) A = { a 1 , a 2 , … , a 5 } \mathcal{A} = \{a_1,a_2,\dots , a_5\} A={a1,a2,,a5},具体如图 2.2 所示。


图 2.2 动作空间:出自赵世钰《强化学习的数学原理》原图 1.3 (b)

2.3 奖励

在一个状态执行一个动作后,智能体会得到奖励(reward) ,可以写成 r ( s , a ) r(s,a) r(s,a)。一般来说,正的奖励表示我们鼓励智能体采取相应的动作;负的奖励表示我们不鼓励智能体采取该动作。在图 2.1 的网格世界中,我们设置如下奖励:

  • 如果智能体越过四周边界,设 r b o u n d a r y = − 1 r_{boundary}=-1 rboundary=1
  • 如果智能体试图进入禁止区域,设 r f r o b i d d e n = − 10 r_{frobidden}=-10 rfrobidden=10
  • 如果智能体到达了目标区域,设 r t a r g e t = + 1 r_{target}=+1 rtarget=+1
  • 在其他情况下,智能体获得的奖励为 r o t h e r = 0 r_{other}=0 rother=0

三、MC Basic算法

3.1 无需模型的转换

蒙特卡罗方法是以策略迭代算法为基础的,因此也分为 “策略评估”“策略改进” 两个步骤。其中,策略改进步骤的元素展开形式是

π k + 1 ( s ) = arg ⁡ max ⁡ π ∑ a π ( a ∣ s ) ( ∑ r p ( r ∣ s , a ) r + γ ∑ s ′ p ( s ′ ∣ s , a ) v π k ( s ′ ) ) = arg ⁡ max ⁡ π ∑ a π ( a ∣ s ) q π k ( s , a ) , s ∈ S \begin{align*} \pi_{k+1}(s) &= \arg \max_{\pi} \sum_{a} \pi(a|s) {\left( \sum_{r} p(r|s,a)r + \gamma \sum_{s'} p(s'|s,a)v_{\pi_k}(s') \right)}\\ &= \arg \max_{\pi} \sum_a \pi(a|s)q_{\pi_k}(s,a), \quad s \in \mathcal{S} \end{align*} πk+1(s)=argπmaxaπ(as)(rp(rs,a)r+γsp(ss,a)vπk(s))=argπmaxaπ(as)qπk(s,a),sS

无需模型的方法,即不需要知道模型 { p ( r ∣ s , a ) , p ( s ′ ∣ s , a ) } \{p(r|s,a),p(s'|s,a)\} {p(rs,a),p(ss,a)} ,通过蒙特卡罗方法用数据来直接估计 q π k ( s , a ) q_{\pi_k}(s,a) qπk(s,a) 。具体做法是,从 ( s , a ) (s,a) (s,a) 开始,智能体可以执行策略 π k \pi_k πk ,进而获得 n n n 个回合,假设第 i i i 个回合的回报是 g π k ( i ) ( s , a ) g_{\pi_k}^{(i)}(s,a) gπk(i)(s,a) 。则这些回合的回报的平均值可以用来近似 q π k ( s , a ) q_{\pi_k}(s,a) qπk(s,a) ,即

q π k ( s , a ) = E [ G t ∣ S t = s , A t = a ] ≈ 1 n ∑ i = 1 n g π k ( i ) ( s , a ) q_{\pi_k}(s,a)=\mathbb{E} [G_t|S_t=s,A_t=a]\approx \frac{1}{n}\sum_{i=1}^ng_{\pi_k}^{(i)}(s,a) qπk(s,a)=E[GtSt=s,At=a]n1i=1ngπk(i)(s,a)

根据大数定律,如果 n n n 足够大,上式的近似将会足够精确。

3.2 MC Basic 算法

从初始策略 π 0 \pi_0 π0 开始,该算法在第 k k k 次迭代( k = 0 , 1 , 2 , … k=0,1,2,\dots k=0,1,2,)中有两个步骤。

  • 步骤 1:策略评估。这一步用于估算所有 ( s , a ) (s,a) (s,a) q π k ( s , a ) q_{\pi_k}(s,a) qπk(s,a)。具体来说,对于每个 ( s , a ) (s,a) (s,a),收集足够多的回合进而求其回报的平均值(记作 q k ( s , a ) q_k(s,a) qk(s,a))来近似 q π k ( s , a ) q_{\pi_k}(s,a) qπk(s,a)
  • 步骤 2:策略改进。这一步通过 π k + 1 ( s ) = arg ⁡ max ⁡ π ∑ a π ( a ∣ s ) q k ( s , a ) \pi_{k+1}(s)=\arg \max_{\pi}\sum_a \pi(a|s)q_k(s,a) πk+1(s)=argmaxπaπ(as)qk(s,a) 得到所有 s ∈ S s \in \mathcal{S} sS 的新策略,即 π k + 1 ( a k ∗ ∣ s ) = 1 \pi_{k+1}(a_k^*|s)=1 πk+1(aks)=1,其中 a k ∗ = arg ⁡ max ⁡ a q k ( s , a ) a_k^*=\arg \max_aq_k(s,a) ak=argmaxaqk(s,a)

3.3 伪代码

总结上述数学原理,该算法的伪代码如图 3.1 所示:


图 3.1 策略迭代伪代码:出自赵世钰《强化学习的数学原理》原算法 5.1

四、Python 代码实现

整套代码开发思路分成:环境定义、交互逻辑、可视化、策略迭代算法四部分,本篇挑选决定算法逻辑的四个核心函数细说,绘图工具类代码不再拆分讲解,最后附上完整源码方便读者一键运行。

4.1 get_reward 函数

get_reward 用来判定当前格子的即时奖励,对照奖励规则:到达目标 + 1、踏入禁区 - 10、普通格子收益 0。

def get_reward(state):
    if state == TARGET_STATE:
        return R_TARGET
    if state in FORBIDDEN_STATES:
        return R_FORBIDDEN
    return R_NORMAL

4.2 step 函数

step 是智能体执行动作后的状态转移规则,整个网格世界使用 5×5 矩阵表示,i 为行,j 为列。

def step(state, action):
    i, j = state
    di, dj = action
    ni, nj = i + di, j + dj
    if ni < 0 or ni >= GRID_SIZE or nj < 0 or nj >= GRID_SIZE:
        return (i, j), R_BOUNDARY
    next_state = (ni, nj)
    return next_state, get_reward(next_state)
  1. 向网格外侧移动算作越界,原地停留并扣除边界惩罚 -1;
  2. 在网格内正常移动,落地后调用 get_reward 获取对应奖励。

4.3 sample_episode 函数

sample_episod 函数是蒙特卡罗算法中最核心的轨迹采样与回报计算工具,用于评估某个初始状态 - 动作对 ( s , a ) (s,a) (s,a) 的好坏。它的作用是从指定起点 start_s、指定初始动作 start_a 出发,按照给定策略 policy 走一条完整轨迹(episode),并返回这条轨迹从起点到终点的最终累计回报 G

# 采样轨迹
def sample_episode(self, start_s, start_a, policy, max_step=100):
    episode = []
    s = start_s
    a = start_a
    for _ in range(max_step):
        ns, r = self.env.step(s, a)
        episode.append((s, a, r))
        a = policy[ns[0], ns[1]]
        s = ns
        if s == self.env.TARGET_STATE:
            break

    G = 0
    ret_list = []
    for _, _, r in reversed(episode):
        G = r + self.env.GAMMA * G
        ret_list.append(G)
    ret_list.reverse()
    return ret_list[0]

读者可以更改 max_step 的值来限制每回合的长度,探究回合长度对 Monte Carlo 算法收敛性的影响。

4.4 蒙特卡罗训练主循环

train 函数是基于蒙特卡洛采样的策略迭代算法的核心训练函数。它会自动交替执行「策略评估」和「策略改进」,直到策略收敛到最优。
该函数的整体流程概括如下:用 MC 采样估计 Q 值 → 更新价值函数 V → 改进策略 → 重复直到收敛。

def train(self, save_img=False):
    iter_cnt = 0
    while True:
        V_old = self.V.copy()
        Q = np.zeros((self.env.GRID_SIZE, self.env.GRID_SIZE, self.env.n_actions))

        # 策略评估
        for i in range(self.env.GRID_SIZE):
            for j in range(self.env.GRID_SIZE):
                for a in range(self.env.n_actions):
                    total = 0.0
                    for _ in range(self.EPISODE_PER_SA):
                        total += self.sample_episode((i, j), a, self.policy)
                    Q[i, j, a] = total / self.EPISODE_PER_SA

        # 价值更新
        self.V = np.max(Q, axis=-1)
        delta = np.max(np.abs(self.V - V_old))

        # 策略改进:平局时随机选择一个最优动作
        old_policy = self.policy.copy()
        for i in range(self.env.GRID_SIZE):
            for j in range(self.env.GRID_SIZE):
                q_vals = Q[i, j, :]
                max_q = np.max(q_vals)
                best_actions = np.where(q_vals == max_q)[0]
                self.policy[i, j] = np.random.choice(best_actions)

        iter_cnt += 1

        if delta < self.env.THETA:
            print(f"✅ 收敛!迭代次数:{iter_cnt}")
            break

    return self.V, old_policy, Q

4.4 完整源码

import os
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle, Circle

# ===================== 环境类:地图 + 绘图 =====================
class GridWorld:
    def __init__(self):
        # 基础配置
        self.GRID_SIZE = 5
        self.GAMMA = 0.9
        self.THETA = 1e-6

        # 奖励
        self.R_TARGET = 1
        self.R_FORBIDDEN = -10
        self.R_BOUNDARY = -1
        self.R_NORMAL = 0

        # 动作:原地、上、右、下、左
        self.ACTIONS = [(0, 0), (-1, 0), (0, 1), (1, 0), (0, -1)]
        self.n_actions = len(self.ACTIONS)

        # 地图
        self.FORBIDDEN_STATES = [(1, 1), (1, 2), (2, 2), (3, 1), (3, 3), (4, 1)]
        self.TARGET_STATE = (3, 2)

    # 奖励函数
    def get_reward(self, state):
        if state == self.TARGET_STATE:
            return self.R_TARGET
        if state in self.FORBIDDEN_STATES:
            return self.R_FORBIDDEN
        return self.R_NORMAL

    # 环境步进
    def step(self, state, action_idx):
        i, j = state
        di, dj = self.ACTIONS[action_idx]
        ni, nj = i + di, j + dj
        if 0 <= ni < self.GRID_SIZE and 0 <= nj < self.GRID_SIZE:
            next_state = (ni, nj)
        else:
            return (i, j), self.R_BOUNDARY
        return next_state, self.get_reward(next_state)

    # ===================== 绘图 =====================
    def draw(self, V, policy, iter_num, save_flag=False):
        plt.figure(figsize=(12, 5))
        ax1 = plt.subplot(1, 2, 1)
        self.plot_policy(policy, ax1)

        ax2 = plt.subplot(1, 2, 2)
        self.plot_value(V, ax2)

        plt.suptitle(f'Monte Carlo Iter: {iter_num}', fontsize=14)
        plt.tight_layout()

        if save_flag:
            save_dir = "Monte Carlo/Basic/iter_img"
            os.makedirs(save_dir, exist_ok=True)  # 自动创建文件夹
            plt.savefig(f"{save_dir}/iter_{iter_num:04d}.png", dpi=150)
            plt.close()
        else:
            plt.show()

    def plot_policy(self, policy, ax):
        ax.set_xlim(0, self.GRID_SIZE)
        ax.set_ylim(0, self.GRID_SIZE)
        ax.invert_yaxis()
        ax.set_xticks(range(self.GRID_SIZE))
        ax.set_yticks(range(self.GRID_SIZE))
        ax.grid(True, color='black', linewidth=1)
        ax.set_title('Optimal Policy (MC)', fontsize=12)

        for i in range(self.GRID_SIZE):
            for j in range(self.GRID_SIZE):
                rect = Rectangle((j, i), 1, 1, ec='black', lw=1)
                ax.add_patch(rect)
                if (i, j) in self.FORBIDDEN_STATES:
                    rect.set_facecolor('gold')
                elif (i, j) == self.TARGET_STATE:
                    rect.set_facecolor('cornflowerblue')
                else:
                    rect.set_facecolor('white')

        arrow_scale = 0.3
        circle_r = 0.15
        for i in range(self.GRID_SIZE):
            for j in range(self.GRID_SIZE):
                act = policy[i, j]
                xc, yc = j + 0.5, i + 0.5
                dx, dy = 0, 0
                if act == 1:
                    dy = -arrow_scale
                elif act == 2:
                    dx = arrow_scale
                elif act == 3:
                    dy = arrow_scale
                elif act == 4:
                    dx = -arrow_scale
                elif act == 0:
                    ax.add_patch(Circle((xc, yc), circle_r, color='red', fill=False, lw=2))
                    continue
                ax.arrow(xc, yc, dx, dy, head_width=0.12, head_length=0.12, fc='red', ec='red')

    def plot_value(self, V, ax):
        im = ax.imshow(V, cmap='coolwarm', origin='upper')
        ax.set_xticks(range(self.GRID_SIZE))
        ax.set_yticks(range(self.GRID_SIZE))
        ax.set_title('State Value V(s)', fontsize=12)
        for i in range(self.GRID_SIZE):
            for j in range(self.GRID_SIZE):
                ax.text(j, i, f'{V[i, j]:.2f}', ha='center', va='center', fontweight='bold')
        plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)

# ===================== 算法类:蒙特卡洛策略迭代 =====================
class MonteCarlo_Basic:
    def __init__(self, env):
        self.env = env
        self.policy = np.random.randint(0, self.env.n_actions, size=(env.GRID_SIZE, env.GRID_SIZE))
        self.V = np.zeros((env.GRID_SIZE, env.GRID_SIZE))
        self.EPISODE_PER_SA = 30

    # 采样轨迹
    def sample_episode(self, start_s, start_a, policy, max_step=100):
        episode = []
        s = start_s
        a = start_a
        for _ in range(max_step):
            ns, r = self.env.step(s, a)
            episode.append((s, a, r))
            a = policy[ns[0], ns[1]]
            s = ns
            if s == self.env.TARGET_STATE:
                break

        G = 0
        ret_list = []
        for _, _, r in reversed(episode):
            G = r + self.env.GAMMA * G
            ret_list.append(G)
        ret_list.reverse()
        return ret_list[0]

    # 训练主循环
    def train(self, save_img=False):
        iter_cnt = 0
        while True:
            V_old = self.V.copy()
            Q = np.zeros((self.env.GRID_SIZE, self.env.GRID_SIZE, self.env.n_actions))

            # 策略评估
            for i in range(self.env.GRID_SIZE):
                for j in range(self.env.GRID_SIZE):
                    for a in range(self.env.n_actions):
                        total = 0.0
                        for _ in range(self.EPISODE_PER_SA):
                            total += self.sample_episode((i, j), a, self.policy)
                        Q[i, j, a] = total / self.EPISODE_PER_SA

            # 价值更新
            self.V = np.max(Q, axis=-1)
            delta = np.max(np.abs(self.V - V_old))

            # 绘图(环境类负责)
            if save_img:
                self.env.draw(self.V, self.policy, iter_cnt, save_flag=True)

            # 策略改进:平局时随机选择一个最优动作
            old_policy = self.policy.copy()
            for i in range(self.env.GRID_SIZE):
                for j in range(self.env.GRID_SIZE):
                    q_vals = Q[i, j, :]
                    max_q = np.max(q_vals)
                    best_actions = np.where(q_vals == max_q)[0]
                    self.policy[i, j] = np.random.choice(best_actions)

            iter_cnt += 1

            if delta < self.env.THETA:
                print(f"✅ 收敛!迭代次数:{iter_cnt}")
                break

        if not save_img:
            self.env.draw(self.V, old_policy, iter_cnt - 1, save_flag=False)

        return self.V, old_policy, Q

# ===================== 主程序 =====================
if __name__ == '__main__':
    env = GridWorld()
    mc = MonteCarlo_Basic(env)
    V_opt, policy_opt, Q_opt = mc.train(save_img=True)

另外,代码增加可视化控制:save_img = True 自动保存每一轮迭代图片到 \Monte Carlo\Basic\iter_img 目录下,方便观察策略与价值的收敛变化;save_img = False 仅在算法收敛后弹出最终效果图,适合日常调试运行。

五、运行结果

初始状态,随机生成所有格子的初始策略,根据初始策略计算 V π 0 V_{\pi_0} Vπ0 ,策略及状态值如图 5.1 所示。


图 5.1 左:策略图;右:状态值图

第 1 轮迭代后,得到图 5.2 结果。可以看到最靠近目标区域的格子策略得到了很好的改善,但其它区域内的策略还是随机生成的,并且它们的 state value 也不是很好。


图 5.2 左:策略图;右:状态值图

第 2 轮迭代后,得到图 5.3 结果。可以看到最后一行第四列的格子,策略不再是随机生成的,而是变成了向左移动,策略得到提升。


图 5.3 左:策略图;右:状态值图

按照上述过程,策略逐步提升。最终,经历 12 次迭代收敛后,得到最优策略及状态值热力图如图 5.4 所示。


图 5.4 左:策略图;右:状态值图

六、其他说明

本篇博文重在代码实际实现,对于强化学习的基本概念和数学公式推导(如马尔可夫决策、贝尔曼方程等),还需读者自行查阅相关资料。这里附上赵世钰老师的b站讲解视频链接及Github,供读者参考。

更多推荐