Chapter 16 Reinforcement Learning

O'Reilly. Hands-On Machine Learning with Scikit-Learn and TensorFlow: reading notes

This chapter has been substantially revised since the book was published, so see the Jupyter notebook version for updates.

16.1 Learning to Optimize Rewards

In Reinforcement Learning, a software agent makes observations and takes actions within an environment, and in return it receives rewards. Its objective is to learn to act in a way that will maximize its expected long-term rewards.

16.2 Policy Search

The algorithm used by the software agent to determine its action is called its policy.

One possible learning algorithm could be to try out many different values for policy parameters, and pick the combination that performs best.

Another way to explore the policy space is to use genetic algorithms. For example, you could randomly create a first generation of 100 policies and try them out, then “kill” the 80 worst policies and make the 20 survivors produce 4 offspring each. An offspring is just a copy of its parent plus some random variation. The surviving policies plus their offspring together constitute the second generation. You can continue to iterate through generations this way, until you find a good policy.
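
The generation loop described above can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions: the `evaluate()` function is a hypothetical stand-in for "run the policy in the environment and measure its reward", and a policy is reduced to a list of two numbers.

```python
import random

def evaluate(policy):
    # Hypothetical stand-in for running the policy in an environment and
    # returning its total reward: higher is better, best at [0.5, -0.3].
    target = [0.5, -0.3]
    return -sum((p - t) ** 2 for p, t in zip(policy, target))

def mutate(policy, rng, scale=0.1):
    # An offspring is a copy of its parent plus some random variation.
    return [p + rng.gauss(0.0, scale) for p in policy]

def genetic_search(n_generations=30, seed=42):
    rng = random.Random(seed)
    # First generation: 100 random policies.
    population = [[rng.uniform(-1, 1), rng.uniform(-1, 1)] for _ in range(100)]
    for _ in range(n_generations):
        # Keep the 20 best policies and "kill" the 80 worst.
        survivors = sorted(population, key=evaluate, reverse=True)[:20]
        # Each survivor produces 4 offspring: 20 + 80 = 100 policies again.
        offspring = [mutate(p, rng) for p in survivors for _ in range(4)]
        population = survivors + offspring
    return max(population, key=evaluate)

best = genetic_search()
print(best, evaluate(best))
```

Because the survivors are carried over unchanged, the best score never gets worse from one generation to the next.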

Another approach is to use optimization techniques, by evaluating the gradients of the rewards with regard to the policy parameters, then tweaking these parameters by following the gradient toward higher rewards (gradient ascent). This approach is called policy gradients (PG).

16.3 Introduction to OpenAI Gym

OpenAI Gym is a toolkit that provides a wide variety of simulated environments (Atari games, board games, 2D and 3D physical simulations, and so on), so you can train agents, compare them, or develop new RL algorithms.

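import gym
import numpy as np
import matplotlib.pyplot as plt

# note: this code targets the classic Gym API (before gym 0.26), where reset() returns only the observation and step() returns four values
# create an environment
env = gym.make("CartPole-v0")
# initialize the environment and obtain the first observation
obs = env.reset()
# each observation is a 1D NumPy array containing four floats: the cart's horizontal position (0.0 = center), its velocity, the angle of the pole (0.0 = vertical), and its angular velocity
obs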

from PIL import Image, ImageDraw

try:
    from pyglet.gl import gl_info
    openai_cart_pole_rendering = True   # no problem, let's use OpenAI gym's rendering function
except Exception:
    openai_cart_pole_rendering = False  # probably no X server available, let's use our own rendering function

def render_cart_pole(env, obs):
    if openai_cart_pole_rendering:
        # use OpenAI gym's rendering function
        return env.render(mode="rgb_array")
    else:
        # rendering for the cart pole environment (in case OpenAI gym can't do it)
        img_w = 600
        img_h = 400
        cart_w = img_w // 12
        cart_h = img_h // 15
        pole_len = img_h // 3.5
        pole_w = img_w // 80 + 1
        x_width = 2
        max_ang = 0.2
        bg_col = (255, 255, 255)
        cart_col = 0x000000 # Blue Green Red
        pole_col = 0x669acc # Blue Green Red

        pos, vel, ang, ang_vel = obs
        img = Image.new('RGB', (img_w, img_h), bg_col)
        draw = ImageDraw.Draw(img)
        cart_x = pos * img_w // x_width + img_w // x_width
        cart_y = img_h * 95 // 100
        top_pole_x = cart_x + pole_len * np.sin(ang)
        top_pole_y = cart_y - cart_h // 2 - pole_len * np.cos(ang)
        draw.line((0, cart_y, img_w, cart_y), fill=0)
        draw.rectangle((cart_x - cart_w // 2, cart_y - cart_h // 2, cart_x + cart_w // 2, cart_y + cart_h // 2), fill=cart_col) # draw cart
        draw.line((cart_x, cart_y - cart_h // 2, top_pole_x, top_pole_y), fill=pole_col, width=pole_w) # draw pole
        return np.array(img)

def plot_cart_pole(env, obs):
    plt.close()  # or else nbagg sometimes plots in the previous cell
    img = render_cart_pole(env, obs)
    plt.imshow(img)
    plt.axis("off")
    plt.show()
#show on the screen
plot_cart_pole(env, obs)
#what actions are possible, Discrete(2) means integers 0 and 1
env.action_space

obs = env.reset()
while True:
    # step() executes the given action and returns four values:
    obs, reward, done, info = env.step(0)
    if done:
        break
        
plt.close()  # or else nbagg sometimes plots in the previous cell
img = render_cart_pole(env, obs)
plt.imshow(img)
plt.axis("off")

img.shape

obs = env.reset()
while True:
    obs, reward, done, info = env.step(1)
    if done:
        break

plot_cart_pole(env, obs)
#hard code a simple strategy

frames = []

n_max_steps = 1000
n_change_steps = 10

obs = env.reset()
for step in range(n_max_steps):
    img = render_cart_pole(env, obs)
    frames.append(img)

    # hard-coded policy
    position, velocity, angle, angular_velocity = obs
    if angle < 0:
        action = 0
    else:
        action = 1

    obs, reward, done, info = env.step(action)
    if done:
        break
        
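# plot_animation() is a helper from the book's Jupyter notebook (it wraps matplotlib's FuncAnimation); it is not defined in these notes
video = plot_animation(frames)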
plt.show()

16.4 Neural Network Policies

Note that in this particular environment, the past actions and observations can safely be ignored, since each observation contains the environment’s full state. If there were some hidden state, then you may need to consider past actions and observations as well.

import tensorflow as tf

# 1. Specify the neural network architecture
n_inputs = 4 # == env.observation_space.shape[0]
n_hidden = 4 # it's a simple task, we don't need more hidden neurons
n_outputs = 1 # only outputs the probability of accelerating left
initializer = tf.variance_scaling_initializer()

# 2. Build the neural network
X = tf.placeholder(tf.float32, shape=[None, n_inputs])
hidden = tf.layers.dense(X, n_hidden, activation=tf.nn.elu,
                         kernel_initializer=initializer)
logits = tf.layers.dense(hidden, n_outputs,
                         kernel_initializer=initializer)
outputs = tf.nn.sigmoid(logits)

# 3. Select a random action based on the estimated probabilities
p_left_and_right = tf.concat(axis=1, values=[outputs, 1 - outputs])
action = tf.multinomial(tf.log(p_left_and_right), num_samples=1)
init = tf.global_variables_initializer()

Let’s go through this code:

  1. After the imports, we define the neural network architecture. The number of inputs is the size of the observation space (four in the case of CartPole). Four hidden units are enough for such a simple task, and there is just one output: the probability of going left.
  2. Next we build the neural network. The output layer uses the logistic (sigmoid) activation function in order to output a probability from 0.0 to 1.0. If there were more than two possible actions, there would be one output neuron per action, and you would use the softmax activation function instead.
  3. Lastly, we call the multinomial() function to pick a random action. This function independently samples one (or more) integers, given the log probability of each integer. For example, if you call it with the array [np.log(0.5), np.log(0.2), np.log(0.3)] and with num_samples=5, then it will output five integers, each of which will have a 50% probability of being 0, 20% of being 1, and 30% of being 2. In our case we just need one integer representing the action to take. Since the outputs tensor only contains the probability of going left, we must first concatenate 1-outputs to it to have a tensor containing the probability of both left and right actions. Note that if there were more than two possible actions, the neural network would have to output one probability per action so you would not need the concatenation step.
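
To make the sampling step concrete without TensorFlow, here is a minimal plain-Python sketch of drawing integers from log probabilities. The helper name `sample_from_log_probs` is hypothetical; it only mimics the behavior described above and is not the TensorFlow implementation.

```python
import math
import random
from collections import Counter

def sample_from_log_probs(log_probs, num_samples, rng):
    # Convert log probabilities back to probabilities, then draw
    # independent samples of the integer indices 0..len(log_probs)-1.
    probs = [math.exp(lp) for lp in log_probs]
    return rng.choices(range(len(probs)), weights=probs, k=num_samples)

rng = random.Random(0)
log_probs = [math.log(0.5), math.log(0.2), math.log(0.3)]
samples = sample_from_log_probs(log_probs, num_samples=10_000, rng=rng)
counts = Counter(samples)
frequencies = [counts[i] / len(samples) for i in range(3)]
print(frequencies)  # close to [0.5, 0.2, 0.3]

# Two-action case: the network only predicts p_left, so we append
# p_right = 1 - p_left before sampling a single action (0 = left, 1 = right).
p_left = 0.7
action = sample_from_log_probs([math.log(p_left), math.log(1 - p_left)], 1, rng)[0]
print(action)
```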

Let’s randomly initialize this policy neural network and use it to play one game:

n_max_steps = 1000
frames = []

with tf.Session() as sess:
    init.run()
    obs = env.reset()
    for step in range(n_max_steps):
        #save each rendered images as an animation
        img = render_cart_pole(env, obs)
        frames.append(img)
        action_val = action.eval(feed_dict={X: obs.reshape(1, n_inputs)})
        obs, reward, done, info = env.step(action_val[0][0])
        if done:
            break

env.close()

video = plot_animation(frames)
plt.show()

It played pretty badly. The neural network will have to learn to do better.

16.5 Evaluating Actions: The Credit Assignment Problem

If we knew what the best action was at each step, we could train the neural network as usual, by minimizing the cross entropy between the estimated probability and the target probability. It would just be regular supervised learning. However, in Reinforcement Learning the only guidance the agent gets is through rewards, and rewards are typically sparse and delayed.

The credit assignment problem: when the agent gets a reward, it is hard for it to know which actions should get credited (or blamed) for it.

To tackle this problem, a common strategy is to evaluate an action based on the sum of all the rewards that come after it, usually applying a discount rate $r$ at each step. For example (see Figure 16-6), if an agent decides to go right three times in a row and gets +10 reward after the first step, 0 after the second step, and finally –50 after the third step, then assuming we use a discount rate $r = 0.8$, the first action will have a total score of $10 + r \times 0 + r^2 \times (-50) = -22$. If the discount rate is close to 0, then future rewards won’t count for much compared to immediate rewards. Conversely, if the discount rate is close to 1, then rewards far into the future will count almost as much as immediate rewards. Typical discount rates are 0.95 or 0.99. With a discount rate of 0.95, rewards 13 steps into the future count roughly for half as much as immediate rewards (since $0.95^{13} \approx 0.5$), while with a discount rate of 0.99, rewards 69 steps into the future count for half as much as immediate rewards. In the CartPole environment, actions have fairly short-term effects, so choosing a discount rate of 0.95 seems reasonable.
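
The arithmetic in this paragraph can be checked with a short plain-Python sketch. The function names `discounted_return` and `half_life` are illustrative choices, not part of the book's code.

```python
import math

def discounted_return(rewards, discount_rate):
    # Sum of rewards, each weighted by discount_rate ** (steps into the future).
    return sum(r * discount_rate ** k for k, r in enumerate(rewards))

score = discounted_return([10, 0, -50], discount_rate=0.8)
print(score)  # approximately -22 (up to floating-point rounding)

def half_life(discount_rate):
    # Number of steps after which a reward counts for half as much,
    # i.e. the n that solves discount_rate ** n = 0.5.
    return math.log(0.5) / math.log(discount_rate)

print(half_life(0.95))  # between 13 and 14 steps
print(half_life(0.99))  # close to 69 steps
```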

To get fairly reliable action scores, we must run many episodes and normalize all the action scores (by subtracting the mean and dividing by the standard deviation).

16.6 Policy Gradients

PG algorithms optimize the parameters of a policy by following the gradients toward higher rewards.

  1. First, let the neural network policy play the game several times and at each step compute the gradients that would make the chosen action even more likely, but don’t apply these gradients yet.
  2. Once you have run several episodes, compute each action’s score (using the method described in the previous paragraph).
  3. If an action’s score is positive, it means that the action was good and you want to apply the gradients computed earlier to make the action even more likely to be chosen in the future. However, if the score is negative, it means the action was bad and you want to apply the opposite gradients to make this action slightly less likely in the future. The solution is simply to multiply each gradient vector by the corresponding action’s score.
  4. Finally, compute the mean of all the resulting gradient vectors, and use it to perform a Gradient Descent step.
import tensorflow as tf

tf.reset_default_graph()

n_inputs = 4
n_hidden = 4
n_outputs = 1

learning_rate = 0.01

initializer = tf.variance_scaling_initializer()

X = tf.placeholder(tf.float32, shape=[None, n_inputs])

hidden = tf.layers.dense(X, n_hidden, activation=tf.nn.elu, kernel_initializer=initializer)
logits = tf.layers.dense(hidden, n_outputs)
outputs = tf.nn.sigmoid(logits)  # probability of action 0 (left)
p_left_and_right = tf.concat(axis=1, values=[outputs, 1 - outputs])
action = tf.multinomial(tf.log(p_left_and_right), num_samples=1)

#the target probability must be 1.0 if the chosen action is action 0. 
y = 1. - tf.to_float(action)
cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits)
optimizer = tf.train.AdamOptimizer(learning_rate)
grads_and_vars = optimizer.compute_gradients(cross_entropy)
gradients = [grad for grad, variable in grads_and_vars]

gradient_placeholders = []
grads_and_vars_feed = []
for grad, variable in grads_and_vars:
    gradient_placeholder = tf.placeholder(tf.float32, shape=grad.get_shape())
    gradient_placeholders.append(gradient_placeholder)
    grads_and_vars_feed.append((gradient_placeholder, variable))
training_op = optimizer.apply_gradients(grads_and_vars_feed)

init = tf.global_variables_initializer()
saver = tf.train.Saver()
def discount_rewards(rewards, discount_rate):
    discounted_rewards = np.zeros(len(rewards))
    cumulative_rewards = 0
    for step in reversed(range(len(rewards))):
        cumulative_rewards = rewards[step] + cumulative_rewards * discount_rate
        discounted_rewards[step] = cumulative_rewards
    return discounted_rewards

def discount_and_normalize_rewards(all_rewards, discount_rate):
    all_discounted_rewards = [discount_rewards(rewards, discount_rate) for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    return [(discounted_rewards - reward_mean)/reward_std for discounted_rewards in all_discounted_rewards]
env = gym.make("CartPole-v0")

n_games_per_update = 10
n_max_steps = 1000
n_iterations = 250
save_iterations = 10
discount_rate = 0.95

with tf.Session() as sess:
    init.run()
    for iteration in range(n_iterations):
        print("\rIteration: {}".format(iteration), end="")
        all_rewards = []
        all_gradients = []
        for game in range(n_games_per_update):#episode
            current_rewards = []
            current_gradients = []
            obs = env.reset()
            for step in range(n_max_steps):
                action_val, gradients_val = sess.run([action, gradients], feed_dict={X: obs.reshape(1, n_inputs)})
                obs, reward, done, info = env.step(action_val[0][0])
                current_rewards.append(reward)
                current_gradients.append(gradients_val)
                if done:
                    break
            all_rewards.append(current_rewards)
            all_gradients.append(current_gradients)

        all_rewards = discount_and_normalize_rewards(all_rewards, discount_rate=discount_rate)
        feed_dict = {}
        for var_index, gradient_placeholder in enumerate(gradient_placeholders):
            mean_gradients = np.mean([reward * all_gradients[game_index][step][var_index]
                                      for game_index, rewards in enumerate(all_rewards)
                                          for step, reward in enumerate(rewards)], axis=0)
            feed_dict[gradient_placeholder] = mean_gradients
        sess.run(training_op, feed_dict=feed_dict)
        if iteration % save_iterations == 0:
            saver.save(sess, "./my_policy_net_pg.ckpt")

Each training iteration starts by running the policy for 10 episodes (with maximum 1,000 steps per episode, to avoid running forever). At each step, we also compute the gradients, pretending that the chosen action was the best. After these 10 episodes have been run, we compute the action scores using the discount_and_normalize_rewards() function; we go through each trainable variable, across all episodes and all steps, to multiply each gradient vector by its corresponding action score; and we compute the mean of the resulting gradients. Finally, we run the training operation, feeding it these mean gradients (one per trainable variable). We also save the model every 10 training operations.

We will now look at another popular family of algorithms. Whereas PG algorithms directly try to optimize the policy to increase rewards, the algorithms we will look at now are less direct: the agent learns to estimate the expected sum of discounted future rewards for each state, or the expected sum of discounted future rewards for each action in each state, then uses this knowledge to decide how to act. To understand these algorithms, we must first introduce Markov decision processes (MDP).

16.7 Markov Decision Processes

Andrey Markov studied stochastic processes with no memory, called Markov chains. Such a process has a fixed number of states, and it randomly evolves from one state to another at each step. The probability for it to evolve from a state $s$ to a state $s'$ is fixed, and it depends only on the pair $(s, s')$, not on past states (the system has no memory).

The optimal state value of any state $s$, noted $V^*(s)$, is the sum of all discounted future rewards the agent can expect on average after it reaches state $s$, assuming it acts optimally. If the agent acts optimally, then the Bellman Optimality Equation applies.

Equation 16-1. Bellman Optimality Equation
$$V^*(s) = \max_a \sum_{s'} T(s, a, s') \left[ R(s, a, s') + \gamma \cdot V^*(s') \right] \quad \text{for all } s$$

  • $T(s, a, s')$ is the transition probability from state $s$ to state $s'$, given that the agent chose action $a$.
  • $R(s, a, s')$ is the reward that the agent gets when it goes from state $s$ to state $s'$, given that the agent chose action $a$.
  • $\gamma$ is the discount rate.

This equation leads directly to an algorithm that can precisely estimate the optimal state value of every possible state: you first initialize all the state value estimates to zero, and then you iteratively update them using the Value Iteration algorithm (see Equation 16-2).

Equation 16-2. Value Iteration algorithm
$$V_{k+1}(s) \leftarrow \max_a \sum_{s'} T(s, a, s') \left[ R(s, a, s') + \gamma \cdot V_k(s') \right] \quad \text{for all } s$$

  • $V_k(s)$ is the estimated value of state $s$ at the $k$-th iteration of the algorithm.
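
As a rough sketch of Equation 16-2, the following plain-Python code runs Value Iteration on the small three-state MDP that these notes use further down for Q-Value Iteration. Using `None` for impossible actions and the helper name `backup` are choices made for this illustration only.

```python
# Transition probabilities T[s][a][s'] and rewards R[s][a][s'];
# None marks an action that is impossible in that state.
T = [
    [[0.7, 0.3, 0.0], [1.0, 0.0, 0.0], [0.8, 0.2, 0.0]],
    [[0.0, 1.0, 0.0], None, [0.0, 0.0, 1.0]],
    [None, [0.8, 0.1, 0.1], None],
]
R = [
    [[10.0, 0.0, 0.0], [0.0, 0.0, 0.0], [0.0, 0.0, 0.0]],
    [[10.0, 0.0, 0.0], None, [0.0, 0.0, -50.0]],
    [None, [40.0, 0.0, 0.0], None],
]

def backup(V, s, a, discount_rate):
    # Expected return of taking action a in state s, then following V.
    return sum(T[s][a][sp] * (R[s][a][sp] + discount_rate * V[sp])
               for sp in range(len(V)))

def value_iteration(discount_rate=0.95, n_iterations=1000):
    V = [0.0] * len(T)  # all state value estimates start at zero
    for _ in range(n_iterations):
        # Synchronous update: every new estimate is computed from the old V.
        V = [max(backup(V, s, a, discount_rate)
                 for a in range(len(T[s])) if T[s][a] is not None)
             for s in range(len(T))]
    return V

V = value_iteration()
print([round(v, 2) for v in V])
```

After enough iterations the estimates stop changing, which means they satisfy the Bellman Optimality Equation for this MDP.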

Knowing the optimal state values can be useful, in particular to evaluate a policy, but it does not tell the agent explicitly what to do. A very similar algorithm estimates the optimal state-action values, generally called Q-Values. The optimal Q-Value of the state-action pair $(s, a)$, noted $Q^*(s, a)$, is the sum of discounted future rewards the agent can expect on average after it reaches the state $s$ and chooses action $a$, but before it sees the outcome of this action, assuming it acts optimally after that action.

Here is how it works: once again, you start by initializing all the Q-Value estimates to zero, then you update them using the Q-Value Iteration algorithm (see Equation 16-3).

Equation 16-3. Q-Value Iteration algorithm
$$Q_{k+1}(s, a) \leftarrow \sum_{s'} T(s, a, s') \left[ R(s, a, s') + \gamma \cdot \max_{a'} Q_k(s', a') \right] \quad \text{for all } (s, a)$$

Once you have the optimal Q-Values, defining the optimal policy, noted $\pi^*(s)$, is trivial: when the agent is in state $s$, it should choose the action with the highest Q-Value for that state: $\pi^*(s) = \mathop{\textrm{argmax}}_a Q^*(s, a)$.

#Q-Value Iteration algorithm
nan=np.nan # represents impossible actions
T = np.array([ # shape=[s, a, s']
    [[0.7, 0.3, 0.0], [1.0, 0.0, 0.0], [0.8, 0.2, 0.0]],
    [[0.0, 1.0, 0.0], [nan, nan, nan], [0.0, 0.0, 1.0]],
    [[nan, nan, nan], [0.8, 0.1, 0.1], [nan, nan, nan]],
])
R = np.array([ # shape=[s, a, s']
    [[10., 0.0, 0.0], [0.0, 0.0, 0.0], [0.0, 0.0, 0.0]],
    [[10., 0.0, 0.0], [nan, nan, nan], [0.0, 0.0, -50.]],
    [[nan, nan, nan], [40., 0.0, 0.0], [nan, nan, nan]],
])
possible_actions = [[0, 1, 2], [0, 2], [1]]

Q = np.full((3, 3), -np.inf) # -inf for impossible actions
for state, actions in enumerate(possible_actions):
    Q[state, actions] = 0.0 # Initial value = 0.0, for all possible actions
learning_rate = 0.01
discount_rate = 0.95
n_iterations = 100

for iteration in range(n_iterations):
    Q_prev = Q.copy()
    for s in range(3):
        for a in possible_actions[s]:
            Q[s, a] = np.sum([T[s, a, sp] * (R[s, a, sp] + discount_rate * np.max(Q_prev[sp])) for sp in range(3)])

Q

16.8 Temporal Difference Learning and Q-Learning

Reinforcement Learning problems with discrete actions can often be modeled as Markov decision processes, but the agent initially has no idea what the transition probabilities are (it does not know $T(s, a, s')$), and it does not know what the rewards are going to be either (it does not know $R(s, a, s')$). It must experience each state and each transition at least once to know the rewards, and it must experience them multiple times if it is to have a reasonable estimate of the transition probabilities.

The Temporal Difference Learning (TD Learning) algorithm is very similar to the Value Iteration algorithm, but tweaked to take into account the fact that the agent has only partial knowledge of the MDP. In general we assume that the agent initially knows only the possible states and actions, and nothing more. The agent uses an exploration policy—for example, a purely random policy—to explore the MDP, and as it progresses the TD Learning algorithm updates the estimates of the state values based on the transitions and rewards that are actually observed.

Equation 16-4. TD Learning algorithm
$$V_{k+1}(s) \leftarrow (1 - \alpha) V_k(s) + \alpha \left( r + \gamma \cdot V_k(s') \right)$$

  • $\alpha$ is the learning rate (e.g., 0.01).

TD Learning has many similarities with Stochastic Gradient Descent, in particular the fact that it handles one sample at a time. Just like SGD, it can only truly converge if you gradually reduce the learning rate (otherwise it will keep bouncing around the optimum).

For each state $s$, this algorithm simply keeps track of a running average of the immediate rewards the agent gets upon leaving that state, plus the rewards it expects to get later (assuming it acts optimally).
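
Here is a minimal sketch of the TD Learning update (Equation 16-4) on a hypothetical two-state chain invented for this illustration. The learner never sees the transition probabilities; it only observes sampled transitions and rewards.

```python
import random

def td_learning(n_steps=200_000, alpha=0.01, discount_rate=0.9, seed=0):
    rng = random.Random(seed)
    V = {"A": 0.0, "B": 0.0}  # state value estimates start at zero
    state = "A"
    for _ in range(n_steps):
        # Hypothetical environment, unknown to the learner:
        # from A: reward 1, then move to B with probability 0.5 (else stay in A);
        # from B: reward 0, then always move back to A.
        if state == "A":
            reward, next_state = 1.0, ("B" if rng.random() < 0.5 else "A")
        else:
            reward, next_state = 0.0, "A"
        # Equation 16-4: running average of r + gamma * V(s').
        V[state] = (1 - alpha) * V[state] + alpha * (reward + discount_rate * V[next_state])
        state = next_state
    return V

V_td = td_learning()
print(V_td)
```

For this chain the exact values can be solved by hand: V(A) = 1 + 0.9 (0.5 V(A) + 0.5 V(B)) and V(B) = 0.9 V(A), which gives V(A) = 1 / 0.145 ≈ 6.90 and V(B) ≈ 6.21. With a constant learning rate the estimates keep fluctuating slightly around these values.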

Similarly, the Q-Learning algorithm is an adaptation of the Q-Value Iteration algorithm to the situation where the transition probabilities and the rewards are initially unknown (see Equation 16-5).

Equation 16-5. Q-Learning algorithm
$$Q_{k+1}(s, a) \leftarrow (1 - \alpha) Q_k(s, a) + \alpha \left( r + \gamma \cdot \max_{a'} Q_k(s', a') \right)$$

For each state-action pair $(s, a)$, this algorithm keeps track of a running average of the rewards $r$ the agent gets upon leaving the state $s$ with action $a$, plus the rewards it expects to get later. Since the target policy would act optimally, we take the maximum of the Q-Value estimates for the next state.

This is called an off-policy algorithm because the policy being trained is not the one being executed.
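
The following plain-Python sketch applies the Q-Learning update (Equation 16-5) to the three-state MDP from the Q-Value Iteration code earlier in these notes. The agent explores with a purely random policy and only sees sampled transitions. The constant learning rate, the number of steps, and the use of `None` for impossible actions are assumptions made for this illustration.

```python
import random

# Same MDP as in the Q-Value Iteration code; None marks impossible actions.
T = [
    [[0.7, 0.3, 0.0], [1.0, 0.0, 0.0], [0.8, 0.2, 0.0]],
    [[0.0, 1.0, 0.0], None, [0.0, 0.0, 1.0]],
    [None, [0.8, 0.1, 0.1], None],
]
R = [
    [[10.0, 0.0, 0.0], [0.0, 0.0, 0.0], [0.0, 0.0, 0.0]],
    [[10.0, 0.0, 0.0], None, [0.0, 0.0, -50.0]],
    [None, [40.0, 0.0, 0.0], None],
]
possible_actions = [[0, 1, 2], [0, 2], [1]]

def q_learning(n_steps=200_000, alpha=0.01, discount_rate=0.95, seed=0):
    rng = random.Random(seed)
    # -inf for impossible actions, 0.0 for all possible actions.
    Q = [[0.0 if a in possible_actions[s] else float("-inf") for a in range(3)]
         for s in range(3)]
    s = 0
    for _ in range(n_steps):
        a = rng.choice(possible_actions[s])             # random exploration policy
        sp = rng.choices(range(3), weights=T[s][a])[0]  # environment samples s'
        reward = R[s][a][sp]
        # The target assumes greedy behavior from s' onward (off-policy).
        target = reward + discount_rate * max(Q[sp])
        Q[s][a] = (1 - alpha) * Q[s][a] + alpha * target
        s = sp
    return Q

Q = q_learning()
# Greedy policy derived from the learned Q-Values.
policy = [max(possible_actions[s], key=lambda a: Q[s][a]) for s in range(3)]
print(Q)
print(policy)
```

The policy being executed (random) differs from the policy being learned (greedy), which is what makes the algorithm off-policy.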

16.8.1 Exploration Policies

A purely random exploration policy will eventually visit every state and every transition, but it may take an extremely long time to do so. A better option is to use the $\varepsilon$-greedy policy: at each step it acts randomly with probability $\varepsilon$, or greedily (choosing the action with the highest Q-Value) with probability $1 - \varepsilon$. The advantage of the $\varepsilon$-greedy policy (compared to a completely random policy) is that it will spend more and more time exploring the interesting parts of the environment, as the Q-Value estimates get better and better, while still spending some time visiting unknown regions of the MDP. It is quite common to start with a high value for $\varepsilon$ (e.g., 1.0) and then gradually reduce it (e.g., down to 0.05).
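
A minimal sketch of such a policy in plain Python follows. The linear decay schedule and the names `epsilon_at` and `epsilon_greedy` are assumptions for this illustration; other schedules would work too.

```python
import random

def epsilon_at(step, eps_max=1.0, eps_min=0.05, decay_steps=10_000):
    # Linear decay from eps_max to eps_min over decay_steps, then constant.
    fraction = min(step / decay_steps, 1.0)
    return eps_max - (eps_max - eps_min) * fraction

def epsilon_greedy(q_values, epsilon, rng):
    if rng.random() < epsilon:
        return rng.randrange(len(q_values))  # explore: random action
    # exploit: action with the highest Q-Value estimate
    return max(range(len(q_values)), key=lambda a: q_values[a])

rng = random.Random(0)
q_values = [0.1, 0.9, 0.3]
for step in (0, 5_000, 10_000):
    eps = epsilon_at(step)
    print(step, round(eps, 3), epsilon_greedy(q_values, eps, rng))
```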

Another approach is to encourage the exploration policy to try actions that it has not tried much before. This can be implemented as a bonus added to the Q-Value estimates, as shown in Equation 16-6.

Equation 16-6. Q-Learning using an exploration function
$$Q(s, a) \leftarrow (1 - \alpha) Q(s, a) + \alpha \left( r + \gamma \cdot \max_{a'} f\left( Q(s', a'), N(s', a') \right) \right)$$

  • $N(s', a')$ counts the number of times the action $a'$ was chosen in state $s'$.
  • $f(q, n)$ is an exploration function, such as $f(q, n) = q + K/(1 + n)$, where $K$ is a curiosity hyperparameter that measures how much the agent is attracted to the unknown.
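
Equation 16-6 can be sketched as a single update step in plain Python. The function names and the decision to increment the visit counter at the start of the update are assumptions of this sketch, not something the text specifies.

```python
def exploration_function(q, n, K=2.0):
    # f(q, n) = q + K / (1 + n): rarely tried actions get a larger bonus.
    return q + K / (1 + n)

def q_update_with_exploration(Q, N, s, a, reward, sp,
                              alpha=0.05, discount_rate=0.95, K=2.0):
    N[s][a] += 1  # assumption: count the action as soon as it is taken
    best_next = max(exploration_function(Q[sp][ap], N[sp][ap], K)
                    for ap in range(len(Q[sp])))
    Q[s][a] = (1 - alpha) * Q[s][a] + alpha * (reward + discount_rate * best_next)

# Two states, two actions; action 1 in state 1 was already tried 5 times.
Q_table = [[0.0, 0.0], [0.0, 0.0]]
N_table = [[0, 0], [0, 5]]
q_update_with_exploration(Q_table, N_table, s=0, a=0, reward=1.0, sp=1)
print(Q_table[0][0])  # 0.05 * (1 + 0.95 * 2.0), i.e. about 0.145
```

In the example, the untried action in state 1 contributes a bonus of K / (1 + 0) = 2.0, which dominates the 2.0 / 6 bonus of the action tried five times.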

16.8.2 Approximate Q-Learning

The main problem with Q-Learning is that it does not scale well to large (or even medium) MDPs with many states and actions.

The solution is to find a function that approximates the Q-Values using a manageable number of parameters. This is called Approximate Q-Learning. For years it was recommended to use linear combinations of hand-crafted features extracted from the state (e.g., distance of the closest ghosts, their directions, and so on) to estimate Q-Values, but DeepMind showed that using deep neural networks can work much better, especially for complex problems, and it does not require any feature engineering. A DNN used to estimate Q-Values is called a deep Q-network (DQN), and using a DQN for Approximate Q-Learning is called Deep Q-Learning.

16.9 Learning to Play Ms. Pac-Man Using Deep Q-Learning

$ pip3 install --upgrade gym[all]

Or

pip install --no-index -f https://github.com/Kojoley/atari-py/releases atari_py
# Source: https://blog.csdn.net/luolia233/article/details/82995328
import gym
env=gym.make("MsPacman-v0")
obs=env.reset()
obs.shape # (210, 160, 3)

env.action_space # Discrete(9)
#preprocessing
mspacman_color = 210 + 164 + 74

def preprocess_observation(obs):
    img = obs[1:176:2, ::2] # crop and downsize
    img = img.sum(axis=2) # to greyscale
    img[img==mspacman_color] = 0 # Improve contrast
    img = (img // 3 - 128).astype(np.int8) # normalize from -128 to 127
    return img.reshape(88, 80, 1)

img = preprocess_observation(obs)

import matplotlib.pyplot as plt
plt.figure(figsize=(11, 7))
plt.subplot(121)
plt.title("Original observation (160×210 RGB)")
plt.imshow(obs)
plt.axis("off")
plt.subplot(122)
plt.title("Preprocessed observation (88×80 greyscale)")
plt.imshow(img.reshape(88, 80), interpolation="nearest", cmap="gray")
plt.axis("off")
plt.show()

Next, let’s create the DQN. It is more convenient to use a neural network that takes only a state s as input and outputs one Q-Value estimate per action. The DQN will be composed of three convolutional layers, followed by two fully connected layers, including the output layer (see Figure 16-10).

Since we need two identical DQNs, we will create a q_network() function to build them:

tf.reset_default_graph()

input_height = 88
input_width = 80
input_channels = 1
conv_n_maps = [32, 64, 64]
conv_kernel_sizes = [(8,8), (4,4), (3,3)]
conv_strides = [4, 2, 1]
conv_paddings = ["SAME"] * 3 
conv_activation = [tf.nn.relu] * 3
n_hidden_in = 64 * 11 * 10  # conv3 has 64 maps of 11x10 each
n_hidden = 512
hidden_activation = tf.nn.relu
n_outputs = env.action_space.n  # 9 discrete actions are available
initializer = tf.variance_scaling_initializer()

def q_network(X_state, name):
    prev_layer = X_state / 128.0 # scale pixel intensities to the [-1.0, 1.0] range.
    with tf.variable_scope(name) as scope:
        for n_maps, kernel_size, strides, padding, activation in zip(
                conv_n_maps, conv_kernel_sizes, conv_strides,
                conv_paddings, conv_activation):
            prev_layer = tf.layers.conv2d(
                prev_layer, filters=n_maps, kernel_size=kernel_size,
                strides=strides, padding=padding, activation=activation,
                kernel_initializer=initializer)
        last_conv_layer_flat = tf.reshape(prev_layer, shape=[-1, n_hidden_in])
        hidden = tf.layers.dense(last_conv_layer_flat, n_hidden,
                                 activation=hidden_activation,
                                 kernel_initializer=initializer)
        outputs = tf.layers.dense(hidden, n_outputs,
                                  kernel_initializer=initializer)
    trainable_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
                                       scope=scope.name)
    trainable_vars_by_name = {var.name[len(scope.name):]: var
                              for var in trainable_vars}
    return outputs, trainable_vars_by_name

Now let’s create the input placeholder, the two DQNs, and the operation to copy the online DQN to the target DQN:

X_state = tf.placeholder(tf.float32, shape=[None, input_height, input_width,
                                            input_channels])
online_q_values, online_vars = q_network(X_state, name="q_networks/online")
target_q_values, target_vars = q_network(X_state, name="q_networks/target")

copy_ops = [target_var.assign(online_vars[var_name])
            for var_name, target_var in target_vars.items()]
copy_online_to_target = tf.group(*copy_ops)

We have an operation called copy_online_to_target to copy all the trainable variables of the online DQN to the target DQN. We use TensorFlow’s tf.group() function to group all the assignment operations into a single convenient operation.

As we will see, the training algorithm we will use requires two DQNs with the same architecture (but different parameters): the online DQN is the one that learns and is copied to the target DQN at regular intervals. The target DQN’s only role is to estimate the next state’s Q-Values for each possible action. This is needed to compute the target Q-Values for training the online DQN, as shown in this equation:
$$y(s, a) = r + \gamma \cdot \max_{a'} Q_{target}(s', a')$$

  • $y(s, a)$ is the target Q-Value to train the online DQN for the state-action pair $(s, a)$.
  • $r$ is the reward actually collected after playing action $a$ in state $s$.
  • $\gamma$ is the discount rate.
  • $s'$ is the state actually reached after playing action $a$ in state $s$.
  • $a'$ is one of the possible actions in state $s'$.
  • $Q_{target}(s', a')$ is the target DQN’s estimate of the Q-Value of playing action $a'$ while in state $s'$.
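
The target formula above can be sketched for a small batch in plain Python. The `continues` mask is an assumption borrowed from the replay-memory tuples used in this chapter (0.0 when the game is over, 1.0 otherwise): it drops the future term for terminal transitions. The function name is hypothetical.

```python
def target_q_values(rewards, continues, next_q_values, discount_rate=0.99):
    # next_q_values[i] holds the target DQN's Q-Value estimates for every
    # action in the next state of the i-th memory.
    return [r + c * discount_rate * max(q_next)
            for r, c, q_next in zip(rewards, continues, next_q_values)]

y_batch = target_q_values(
    rewards=[1.0, 0.0],
    continues=[1.0, 0.0],               # the second transition ended the game
    next_q_values=[[0.5, 2.0], [3.0, 1.0]],
    discount_rate=0.9,
)
print(y_batch)  # first target is 1.0 + 0.9 * 2.0, second is just the reward 0.0
```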

Because the experiences are very noisy, it is better to use a quadratic loss only for small errors (below 1.0) and a linear loss (twice the absolute error, minus 1) for larger errors, which is what the code below computes. This way large errors don’t push the model parameters around as much. Note that we also tweaked some hyperparameters (using a smaller learning rate, and using Nesterov Accelerated Gradients rather than Adam optimization, since adaptive gradient algorithms may sometimes be bad, according to this paper). We also tweaked a few other hyperparameters below (a larger replay memory, longer decay for the $\varepsilon$-greedy policy, larger discount rate, less frequent copies of the online DQN to the target DQN, etc.).

learning_rate = 0.001
momentum = 0.95

with tf.variable_scope("train"):
    X_action = tf.placeholder(tf.int32, shape=[None])
    y = tf.placeholder(tf.float32, shape=[None, 1])
    q_value = tf.reduce_sum(online_q_values * tf.one_hot(X_action, n_outputs),
                            axis=1, keepdims=True)
    error = tf.abs(y - q_value)
    clipped_error = tf.clip_by_value(error, 0.0, 1.0)
    linear_error = 2 * (error - clipped_error)
    loss = tf.reduce_mean(tf.square(clipped_error) + linear_error)

    global_step = tf.Variable(0, trainable=False, name='global_step')
    optimizer = tf.train.MomentumOptimizer(learning_rate, momentum, use_nesterov=True)
    training_op = optimizer.minimize(loss, global_step=global_step)

init = tf.global_variables_initializer()
saver = tf.train.Saver()

Since the DQN outputs one Q-Value for every possible action, we need to keep only the Q-Value that corresponds to the action that was actually chosen in this memory. For this, we will convert the action to a one-hot vector (recall that this is a vector full of 0s except for a 1 at the ith index), and multiply it by the Q-Values: this will zero out all Q-Values except for the one corresponding to the memorized action. Then just sum over axis 1 (the action axis) to obtain only the desired Q-Value prediction for each memory. We also create a nontrainable variable called global_step. The optimizer’s minimize() operation will take care of incrementing it.
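
To make the one-hot selection and the clipped loss concrete, here is a minimal NumPy sketch of the same arithmetic as the TensorFlow graph above. The Q-Values, actions and targets are invented for the example; it only mirrors the computation, it is not the training code itself.

```python
import numpy as np

n_outputs = 3  # number of possible actions (assumed for this example)

# Hypothetical online-DQN outputs for two memories, and the actions played.
q_values = np.array([[1.0, 2.0, 3.0],
                     [4.0, 5.0, 6.0]])
actions = np.array([2, 0])
y = np.array([[3.5], [0.0]])  # hypothetical target Q-Values

# One-hot mask: zero out every Q-Value except the one for the played action.
one_hot = np.eye(n_outputs)[actions]
q_value = np.sum(q_values * one_hot, axis=1, keepdims=True)  # [[3.], [4.]]

# Quadratic loss for errors below 1.0, linear (slope 2) beyond that.
error = np.abs(y - q_value)                                   # [[0.5], [4.]]
clipped_error = np.clip(error, 0.0, 1.0)                      # [[0.5], [1.]]
linear_error = 2 * (error - clipped_error)                    # [[0.], [6.]]
per_memory_loss = np.square(clipped_error) + linear_error     # [[0.25], [7.]]
loss = per_memory_loss.mean()
print(loss)
```

The second memory has an error of 4.0, so it contributes 1.0 + 2 × 3.0 = 7.0 rather than 16.0, which is the point of the linear tail.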

We will let the online DQN play for a while, storing all its experiences in a replay memory. Each memory will be a 5-tuple (state, action, reward, next state, continue), where the “continue” item will be equal to 0.0 when the game is over, or 1.0 otherwise. Next, at regular intervals we will sample a batch of memories from the replay memory, and we will use the target DQN to estimate the Q-Values from these memories. Finally, we will train the online DQN to predict these Q-Values using regular supervised learning techniques. Once every few training iterations, we will copy the online DQN to the target DQN. And that’s it! Equation 16-7 shows the cost function used to train the online DQN. The replay memory itself is implemented below as a circular buffer:

class ReplayMemory:
    def __init__(self, maxlen):
        self.maxlen = maxlen
        self.buf = np.empty(shape=maxlen, dtype=object)  # np.object was removed in NumPy 1.24
        self.index = 0
        self.length = 0
        
    def append(self, data):
        self.buf[self.index] = data
        self.length = min(self.length + 1, self.maxlen)
        self.index = (self.index + 1) % self.maxlen
    
    def sample(self, batch_size, with_replacement=True):
        if with_replacement:
            indices = np.random.randint(self.length, size=batch_size) # faster
        else:
            indices = np.random.permutation(self.length)[:batch_size]
        return self.buf[indices]

replay_memory_size = 500000
replay_memory = ReplayMemory(replay_memory_size)

def sample_memories(batch_size):
    cols = [[], [], [], [], []] # state, action, reward, next_state, continue
    for memory in replay_memory.sample(batch_size):
        for col, value in zip(cols, memory):
            col.append(value)
    cols = [np.array(col) for col in cols]
    return cols[0], cols[1], cols[2].reshape(-1, 1), cols[3], cols[4].reshape(-1, 1)
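
The circular-buffer behaviour can be checked on a tiny example. The sketch below uses a hypothetical `RingBuffer` class that copies only the `append` logic of `ReplayMemory`, with a capacity of 3 so that the overwrite is easy to see; the memories are dummy tuples.

```python
import numpy as np

class RingBuffer:
    """Hypothetical minimal stand-in for the ReplayMemory class above."""
    def __init__(self, maxlen):
        self.maxlen = maxlen
        self.buf = np.empty(shape=maxlen, dtype=object)
        self.index = 0   # slot that the next append will write to
        self.length = 0  # number of valid entries, capped at maxlen

    def append(self, data):
        self.buf[self.index] = data
        self.length = min(self.length + 1, self.maxlen)
        self.index = (self.index + 1) % self.maxlen  # wrap around when full

memory = RingBuffer(maxlen=3)
for step in range(5):
    # dummy memory: (state, action, reward, next_state, continue)
    memory.append((step, 0, 1.0, step + 1, 1.0))

# Only the three most recent memories survive; the two oldest were overwritten.
stored_states = sorted(item[0] for item in memory.buf)
print(stored_states, memory.length, memory.index)
```

Once the buffer is full, each new memory replaces the oldest one, so memory use stays constant during training.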

eps_min = 0.1
eps_max = 1.0
eps_decay_steps = 2000000

def epsilon_greedy(q_values, step):
    epsilon = max(eps_min, eps_max - (eps_max-eps_min) * step/eps_decay_steps)
    if np.random.rand() < epsilon:
        return np.random.randint(n_outputs) # random action
    else:
        return np.argmax(q_values) # optimal action
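
As a quick check of the exploration schedule, the sketch below evaluates the same linear decay formula at a few training steps. The helper name `epsilon_at` is introduced only for this illustration; the constants are the ones defined above.

```python
eps_min = 0.1
eps_max = 1.0
eps_decay_steps = 2000000

def epsilon_at(step):
    """Linear decay from eps_max to eps_min over eps_decay_steps, then flat."""
    return max(eps_min, eps_max - (eps_max - eps_min) * step / eps_decay_steps)

schedule = {step: epsilon_at(step) for step in (0, 1000000, 2000000, 3000000)}
for step, epsilon in schedule.items():
    print("step {:>7}: epsilon = {:.2f}".format(step, epsilon))
```

So the agent starts fully random, still explores about half the time after one million training steps, and settles at 10% random actions from two million steps onward.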

Training

import os
n_steps = 4000000  # total number of training steps
training_start = 10000  # start training after 10,000 game iterations
training_interval = 4  # run a training step every 4 game iterations
save_steps = 1000  # save the model every 1,000 training steps
copy_steps = 10000  # copy online DQN to target DQN every 10,000 training steps
discount_rate = 0.99
skip_start = 90  # Skip the start of every game (it's just waiting time).
batch_size = 50
iteration = 0  # game iterations
checkpoint_path = "./my_dqn.ckpt"
done = True # env needs to be reset

loss_val = np.inf  # np.infty was removed in NumPy 2.0
game_length = 0
total_max_q = 0
mean_max_q = 0.0

with tf.Session() as sess:
    if os.path.isfile(checkpoint_path + ".index"):
        saver.restore(sess, checkpoint_path)
    else:
        init.run()
        copy_online_to_target.run()
    while True:
        step = global_step.eval()
        if step >= n_steps:
            break
        iteration += 1
        print("\rIteration {}\tTraining step {}/{} ({:.1f}%)\tLoss {:5f}\tMean Max-Q {:5f}   ".format(
            iteration, step, n_steps, step * 100 / n_steps, loss_val, mean_max_q), end="")
        if done: # game over, start again
            obs = env.reset()
            for skip in range(skip_start): # skip the start of each game
                obs, reward, done, info = env.step(0)
            state = preprocess_observation(obs)

        # Online DQN evaluates what to do
        q_values = online_q_values.eval(feed_dict={X_state: [state]})
        action = epsilon_greedy(q_values, step)

        # Online DQN plays
        obs, reward, done, info = env.step(action)
        next_state = preprocess_observation(obs)

        # Let's memorize what happened
        replay_memory.append((state, action, reward, next_state, 1.0 - done))
        state = next_state

        # Compute statistics for tracking progress (not shown in the book)
        total_max_q += q_values.max()
        game_length += 1
        if done:
            mean_max_q = total_max_q / game_length
            total_max_q = 0.0
            game_length = 0

        if iteration < training_start or iteration % training_interval != 0:
            continue # only train after warmup period and at regular intervals
        
        # Sample memories and use the target DQN to produce the target Q-Value
        X_state_val, X_action_val, rewards, X_next_state_val, continues = (
            sample_memories(batch_size))
        next_q_values = target_q_values.eval(
            feed_dict={X_state: X_next_state_val})
        max_next_q_values = np.max(next_q_values, axis=1, keepdims=True)
        y_val = rewards + continues * discount_rate * max_next_q_values

        # Train the online DQN
        _, loss_val = sess.run([training_op, loss], feed_dict={
            X_state: X_state_val, X_action: X_action_val, y: y_val})

        # Regularly copy the online DQN to the target DQN
        if step % copy_steps == 0:
            copy_online_to_target.run()

        # And save regularly
        if step % save_steps == 0:
            saver.save(sess, checkpoint_path)

You can interrupt the cell above at any time to test your agent using the cell below. You can then run the cell above once again; it will load the last saved parameters and resume training.

frames = []
n_max_steps = 10000

with tf.Session() as sess:
    saver.restore(sess, checkpoint_path)

    obs = env.reset()
    for step in range(n_max_steps):
        state = preprocess_observation(obs)

        # Online DQN evaluates what to do
        q_values = online_q_values.eval(feed_dict={X_state: [state]})
        action = np.argmax(q_values)

        # Online DQN plays
        obs, reward, done, info = env.step(action)

        img = env.render(mode="rgb_array")
        frames.append(img)

        if done:
            break
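
%matplotlib nbagg
import matplotlib.pyplot as plt
import matplotlib.animation as animation

# update_scene(num, frames, patch) is assumed to be defined earlier in these notes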
def plot_animation(frames, repeat=False, interval=40):
    plt.close()  # or else nbagg sometimes plots in the previous cell
    fig = plt.figure()
    patch = plt.imshow(frames[0])
    plt.axis('off')
    return animation.FuncAnimation(fig, update_scene, fargs=(frames, patch), frames=len(frames), repeat=repeat, interval=interval)

plot_animation(frames)