Chapter 6 Temporal-Difference Learning

“Reinforcement Learning: An Introduction” by Richard Sutton & Andrew Barto, 2nd Ed

Author: Charles Zhang
All Notes Catalog for Reinforcement Learning: An Introduction. This post is shared under the CC BY-NC-ND 4.0 license; please follow its terms when sharing.

Implementation for Exercises and Figures in the Book

import numpy as np
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
from tqdm import tqdm
import copy
# 0 is the left terminal state
# 6 is the right terminal state
# 1 ... 5 represents A ... E
VALUES = np.zeros(7)
VALUES[1:6] = 0.5
# For convenience, we assume all rewards are 0
# and the left terminal state has value 0, the right terminal state has value 1
# This trick has been used in Gambler's Problem
VALUES[6] = 1

# set up true state values
TRUE_VALUE = np.zeros(7)
TRUE_VALUE[1:6] = np.arange(1, 6) / 6.0
TRUE_VALUE[6] = 1

ACTION_LEFT = 0
ACTION_RIGHT = 1
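
Since all rewards are zero and the right terminal value is pinned at 1, the return of an episode is simply 1 if it terminates on the right and 0 otherwise, so the true value of each interior state is its probability of exiting to the right, i.e. $s/6$ for states A through E. A one-line check (added here, not part of the original code) confirms the constants above:

# quick sanity check (added): true values for A..E should be 1/6, 2/6, ..., 5/6
print(np.round(TRUE_VALUE[1:6], 4))  # approximately [0.1667 0.3333 0.5 0.6667 0.8333]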

# @values: current state values; updated in place unless @batch is True
# @alpha: step size
# @batch: if True, do not update @values here (the batch update is performed by the caller)
def temporal_difference(values, alpha=0.1, batch=False):
    state = 3
    trajectory = [state]
    rewards = [0]
    while True:
        old_state = state
        if np.random.binomial(1, 0.5) == ACTION_LEFT:
            state -= 1
        else:
            state += 1
        # Assume all rewards are 0
        reward = 0
        trajectory.append(state)
        # TD update
        if not batch:
            values[old_state] += alpha * (reward + values[state] - values[old_state])
        if state == 6 or state == 0:
            break
        rewards.append(reward)
    return trajectory, rewards

# @values: current state values; updated in place unless @batch is True
# @alpha: step size
# @batch: if True, do not update @values here (the batch update is performed by the caller)
def monte_carlo(values, alpha=0.1, batch=False):
    state = 3
    trajectory = [3]

    # if the episode ends at the left terminal state, every return is 0
    # if it ends at the right terminal state, every return is 1
    while True:
        if np.random.binomial(1, 0.5) == ACTION_LEFT:
            state -= 1
        else:
            state += 1
        trajectory.append(state)
        if state == 6:
            returns = 1.0
            break
        elif state == 0:
            returns = 0.0
            break

    if not batch:
        for state_ in trajectory[:-1]:
            # MC update
            values[state_] += alpha * (returns - values[state_])
    return trajectory, [returns] * (len(trajectory) - 1)

# Example 6.2 left
def compute_state_value():
    episodes = [0, 1, 10, 100]
    current_values = np.copy(VALUES)
    plt.figure(1)
    for i in range(episodes[-1] + 1):
        if i in episodes:
            plt.plot(current_values, label=str(i) + ' episodes')
        temporal_difference(current_values)
    plt.plot(TRUE_VALUE, label='true values')
    plt.xlabel('state')
    plt.ylabel('estimated value')
    plt.legend()

# Example 6.2 right
def rms_error():
    # Same alpha value can appear in both arrays
    td_alphas = [0.15, 0.1, 0.05]
    mc_alphas = [0.01, 0.02, 0.03, 0.04]
    episodes = 100 + 1
    runs = 100
    for i, alpha in enumerate(td_alphas + mc_alphas):
        total_errors = np.zeros(episodes)
        if i < len(td_alphas):
            method = 'TD'
            linestyle = 'solid'
        else:
            method = 'MC'
            linestyle = 'dashdot'
        for r in tqdm(range(runs)):
            errors = []
            current_values = np.copy(VALUES)
            for ep in range(episodes):
                errors.append(np.sqrt(np.sum(np.power(TRUE_VALUE - current_values, 2)) / 5.0))
                if method == 'TD':
                    temporal_difference(current_values, alpha=alpha)
                else:
                    monte_carlo(current_values, alpha=alpha)
            total_errors += np.asarray(errors)
        total_errors /= runs
        plt.plot(total_errors, linestyle=linestyle, label=method + ', alpha = %.02f' % (alpha))
    plt.xlabel('episodes')
    plt.ylabel('RMS')
    plt.legend()

# Figure 6.2
# @method: 'TD' or 'MC'
def batch_updating(method, episodes, alpha=0.001):
    # perform 100 independent runs
    runs = 100
    total_errors = np.zeros(episodes)
    for r in tqdm(range(0, runs)):
        current_values = np.copy(VALUES)
        errors = []
        # track trajectories and reward/return sequences seen so far
        trajectories = []
        rewards = []
        for ep in range(episodes):
            if method == 'TD':
                trajectory_, rewards_ = temporal_difference(current_values, batch=True)
            else:
                trajectory_, rewards_ = monte_carlo(current_values, batch=True)
            trajectories.append(trajectory_)
            rewards.append(rewards_)
            while True:
                # keep feeding our algorithm with trajectories seen so far until state value function converges
                updates = np.zeros(7)
                for trajectory_, rewards_ in zip(trajectories, rewards):
                    for i in range(0, len(trajectory_) - 1):
                        if method == 'TD':
                            updates[trajectory_[i]] += rewards_[i] + current_values[trajectory_[i + 1]] - current_values[trajectory_[i]]
                        else:
                            updates[trajectory_[i]] += rewards_[i] - current_values[trajectory_[i]]
                updates *= alpha
                if np.sum(np.abs(updates)) < 1e-3:
                    break
                # perform batch updating
                current_values += updates
            # calculate rms error
            errors.append(np.sqrt(np.sum(np.power(current_values - TRUE_VALUE, 2)) / 5.0))
        total_errors += np.asarray(errors)
    total_errors /= runs
    return total_errors

def example_6_2():
    plt.figure(figsize=(6, 9))
    plt.subplot(2, 1, 1)
    compute_state_value()

    plt.subplot(2, 1, 2)
    rms_error()
    plt.tight_layout()

    plt.plot()
    
example_6_2()
def figure_6_2():
    episodes = 100 + 1
    td_erros = batch_updating('TD', episodes)
    mc_erros = batch_updating('MC', episodes)

    plt.plot(td_erros, label='TD')
    plt.plot(mc_erros, label='MC')
    plt.xlabel('episodes')
    plt.ylabel('RMS error')
    plt.legend()

    plt.plot()

figure_6_2()

Example 6.5: Windy Gridworld Shown inset below is a standard gridworld, with start and goal states, but with one difference: there is a crosswind running upward through the middle of the grid. The actions are the standard four: up, down, right, and left—but in the middle region the resultant next states are shifted upward by a “wind,” the strength of which varies from column to column. The strength of the wind is given below each column, in number of cells shifted upward. For example, if you are one cell to the right of the goal, then the action left takes you to the cell just above the goal. This is an undiscounted episodic task, with constant rewards of $-1$ until the goal state is reached.

# world height
WORLD_HEIGHT = 7

# world width
WORLD_WIDTH = 10

# wind strength for each column
WIND = [0, 0, 0, 1, 1, 1, 2, 2, 1, 0]

# possible actions
ACTION_UP = 0
ACTION_DOWN = 1
ACTION_LEFT = 2
ACTION_RIGHT = 3

# probability for exploration
EPSILON = 0.1

# Sarsa step size
ALPHA = 0.5

# reward for each step
REWARD = -1.0

START = [3, 0]
GOAL = [3, 7]
ACTIONS = [ACTION_UP, ACTION_DOWN, ACTION_LEFT, ACTION_RIGHT]

def step(state, action):
    i, j = state
    if action == ACTION_UP:
        return [max(i - 1 - WIND[j], 0), j]
    elif action == ACTION_DOWN:
        return [max(min(i + 1 - WIND[j], WORLD_HEIGHT - 1), 0), j]
    elif action == ACTION_LEFT:
        return [max(i - WIND[j], 0), max(j - 1, 0)]
    elif action == ACTION_RIGHT:
        return [max(i - WIND[j], 0), min(j + 1, WORLD_WIDTH - 1)]
    else:
        assert False
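
As a quick sanity check of the wind dynamics (an added snippet, not part of the original notebook), the concrete case described in Example 6.5 can be reproduced: from the cell one column to the right of the goal, where the wind strength is 1, the left action lands on the cell just above the goal.

# added check: from [3, 8] (one cell to the right of the goal), action left should reach [2, 7],
# the cell just above the goal, because the wind in column 8 pushes the agent up one row
assert step([3, 8], ACTION_LEFT) == [2, 7]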

# play for an episode
def episode(q_value):
    # track the total time steps in this episode
    time = 0

    # initialize state
    state = START

    # choose an action based on epsilon-greedy algorithm
    if np.random.binomial(1, EPSILON) == 1:
        action = np.random.choice(ACTIONS)
    else:
        values_ = q_value[state[0], state[1], :]
        action = np.random.choice([action_ for action_, value_ in enumerate(values_) if value_ == np.max(values_)])

    # keep going until get to the goal state
    while state != GOAL:
        next_state = step(state, action)
        if np.random.binomial(1, EPSILON) == 1:
            next_action = np.random.choice(ACTIONS)
        else:
            values_ = q_value[next_state[0], next_state[1], :]
            next_action = np.random.choice([action_ for action_, value_ in enumerate(values_) if value_ == np.max(values_)])

        # Sarsa update
        q_value[state[0], state[1], action] += \
            ALPHA * (REWARD + q_value[next_state[0], next_state[1], next_action] -
                     q_value[state[0], state[1], action])
        state = next_state
        action = next_action
        time += 1
    return time

def example_6_5():
    q_value = np.zeros((WORLD_HEIGHT, WORLD_WIDTH, 4))
    episode_limit = 500

    steps = []
    ep = 0
    while ep < episode_limit:
        steps.append(episode(q_value))
        # time = episode(q_value)
        # episodes.extend([ep] * time)
        ep += 1

    steps = np.add.accumulate(steps)

    plt.plot(steps, np.arange(1, len(steps) + 1))
    plt.xlabel('Time steps')
    plt.ylabel('Episodes')

    plt.plot()

    # display the optimal policy
    optimal_policy = []
    for i in range(0, WORLD_HEIGHT):
        optimal_policy.append([])
        for j in range(0, WORLD_WIDTH):
            if [i, j] == GOAL:
                optimal_policy[-1].append('G')
                continue
            bestAction = np.argmax(q_value[i, j, :])
            if bestAction == ACTION_UP:
                optimal_policy[-1].append('U')
            elif bestAction == ACTION_DOWN:
                optimal_policy[-1].append('D')
            elif bestAction == ACTION_LEFT:
                optimal_policy[-1].append('L')
            elif bestAction == ACTION_RIGHT:
                optimal_policy[-1].append('R')
    print('Optimal policy is:')
    for row in optimal_policy:
        print(row)
    print('Wind strength for each column:\n{}'.format([str(w) for w in WIND]))
    
example_6_5()
Optimal policy is:
['R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'D']
['L', 'R', 'U', 'R', 'R', 'R', 'R', 'R', 'R', 'D']
['L', 'R', 'R', 'R', 'R', 'R', 'R', 'L', 'R', 'D']
['R', 'R', 'R', 'R', 'R', 'R', 'R', 'G', 'U', 'D']
['R', 'R', 'R', 'R', 'R', 'R', 'U', 'D', 'L', 'L']
['D', 'R', 'R', 'R', 'R', 'U', 'U', 'D', 'L', 'L']
['R', 'R', 'R', 'R', 'U', 'U', 'U', 'U', 'D', 'L']
Wind strength for each column:
['0', '0', '0', '1', '1', '1', '2', '2', '1', '0']

Example 6.6: Cliff Walking This gridworld example compares Sarsa and Q-learning, highlighting the difference between on-policy (Sarsa) and off-policy (Q-learning) methods. Consider the gridworld shown to the right. This is a standard undiscounted, episodic task, with start and goal states, and the usual actions causing movement up, down, right, and left. Reward is $-1$ on all transitions except those into the region marked “The Cliff.” Stepping into this region incurs a reward of $-100$ and sends the agent instantly back to the start.

# world height
WORLD_HEIGHT = 4

# world width
WORLD_WIDTH = 12

# probability for exploration
EPSILON = 0.1

# step size
ALPHA = 0.5

# gamma for Q-Learning and Expected Sarsa
GAMMA = 1

# all possible actions
ACTION_UP = 0
ACTION_DOWN = 1
ACTION_LEFT = 2
ACTION_RIGHT = 3
ACTIONS = [ACTION_UP, ACTION_DOWN, ACTION_LEFT, ACTION_RIGHT]

# initial state action pair values
START = [3, 0]
GOAL = [3, 11]

def step(state, action):
    i, j = state
    if action == ACTION_UP:
        next_state = [max(i - 1, 0), j]
    elif action == ACTION_LEFT:
        next_state = [i, max(j - 1, 0)]
    elif action == ACTION_RIGHT:
        next_state = [i, min(j + 1, WORLD_WIDTH - 1)]
    elif action == ACTION_DOWN:
        next_state = [min(i + 1, WORLD_HEIGHT - 1), j]
    else:
        assert False

    reward = -1
    if (action == ACTION_DOWN and i == 2 and 1 <= j <= 10) or (
        action == ACTION_RIGHT and state == START):
        reward = -100
        next_state = START

    return next_state, reward
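
A few added assertions (not in the original) make the cliff dynamics explicit: stepping down into the cliff region, or right from the start, costs $-100$ and resets the agent to the start, while entering the goal costs the usual $-1$.

# added sanity checks for the cliff transitions
assert step([2, 5], ACTION_DOWN) == (START, -100)    # fall into the cliff from above
assert step(START, ACTION_RIGHT) == (START, -100)    # step right from the start into the cliff
assert step([2, 11], ACTION_DOWN) == (GOAL, -1)      # entering the goal costs the usual -1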

# choose an action based on epsilon greedy algorithm
def choose_action(state, q_value):
    if np.random.binomial(1, EPSILON) == 1:
        return np.random.choice(ACTIONS)
    else:
        values_ = q_value[state[0], state[1], :]
        return np.random.choice([action_ for action_, value_ in enumerate(values_) if value_ == np.max(values_)])

# an episode with Sarsa
# @q_value: values for state action pair, will be updated
# @expected: if True, will use expected Sarsa algorithm
# @step_size: step size for updating
# @return: total rewards within this episode
def sarsa(q_value, expected=False, step_size=ALPHA):
    state = START
    action = choose_action(state, q_value)
    rewards = 0.0
    while state != GOAL:
        next_state, reward = step(state, action)
        next_action = choose_action(next_state, q_value)
        rewards += reward
        if not expected:
            target = q_value[next_state[0], next_state[1], next_action]
        else:
            # calculate the expected value of new state
            target = 0.0
            q_next = q_value[next_state[0], next_state[1], :]
            best_actions = np.argwhere(q_next == np.max(q_next))
            for action_ in ACTIONS:
                if action_ in best_actions:
                    target += ((1.0 - EPSILON) / len(best_actions) + EPSILON / len(ACTIONS)) * q_value[next_state[0], next_state[1], action_]
                else:
                    target += EPSILON / len(ACTIONS) * q_value[next_state[0], next_state[1], action_]
        target *= GAMMA
        q_value[state[0], state[1], action] += step_size * (
                reward + target - q_value[state[0], state[1], action])
        state = next_state
        action = next_action
    return rewards
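
When expected=True, the target computed above is the Expected Sarsa target, which replaces the sampled next action with an expectation under the current $\varepsilon$-greedy policy:

$$Q(S_t, A_t) \leftarrow Q(S_t, A_t) + \alpha \Big[ R_{t+1} + \gamma \sum_a \pi(a \mid S_{t+1})\, Q(S_{t+1}, a) - Q(S_t, A_t) \Big]$$

where $\pi(a \mid S_{t+1})$ is $\varepsilon / |\mathcal{A}|$ for non-greedy actions and $(1 - \varepsilon) / n_{\text{greedy}} + \varepsilon / |\mathcal{A}|$ for greedy ones, which is exactly how the else branch above weights the action values.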

# an episode with Q-Learning
# @q_value: values for state action pair, will be updated
# @step_size: step size for updating
# @return: total rewards within this episode
def q_learning(q_value, step_size=ALPHA):
    state = START
    rewards = 0.0
    while state != GOAL:
        action = choose_action(state, q_value)
        next_state, reward = step(state, action)
        rewards += reward
        # Q-Learning update
        q_value[state[0], state[1], action] += step_size * (
                reward + GAMMA * np.max(q_value[next_state[0], next_state[1], :]) -
                q_value[state[0], state[1], action])
        state = next_state
    return rewards

# print optimal policy
def print_optimal_policy(q_value):
    optimal_policy = []
    for i in range(0, WORLD_HEIGHT):
        optimal_policy.append([])
        for j in range(0, WORLD_WIDTH):
            if [i, j] == GOAL:
                optimal_policy[-1].append('G')
                continue
            bestAction = np.argmax(q_value[i, j, :])
            if bestAction == ACTION_UP:
                optimal_policy[-1].append('U')
            elif bestAction == ACTION_DOWN:
                optimal_policy[-1].append('D')
            elif bestAction == ACTION_LEFT:
                optimal_policy[-1].append('L')
            elif bestAction == ACTION_RIGHT:
                optimal_policy[-1].append('R')
    for row in optimal_policy:
        print(row)
# Use multiple runs instead of a single run with a sliding window:
# a single run did not give a smooth curve, although the optimal policy still converges well with one run.
# Sarsa converges to the safe path, while Q-Learning converges to the optimal path.
def example_6_6():
    # episodes of each run
    episodes = 500

    # perform 50 independent runs
    runs = 50

    rewards_sarsa = np.zeros(episodes)
    rewards_q_learning = np.zeros(episodes)
    for r in tqdm(range(runs)):
        q_sarsa = np.zeros((WORLD_HEIGHT, WORLD_WIDTH, 4))
        q_q_learning = np.copy(q_sarsa)
        for i in range(0, episodes):
            # cut off the value by -100 to draw the figure more elegantly
            # rewards_sarsa[i] += max(sarsa(q_sarsa), -100)
            # rewards_q_learning[i] += max(q_learning(q_q_learning), -100)
            rewards_sarsa[i] += sarsa(q_sarsa)
            rewards_q_learning[i] += q_learning(q_q_learning)

    # average over independent runs
    rewards_sarsa /= runs
    rewards_q_learning /= runs

    # draw reward curves
    plt.plot(rewards_sarsa, label='Sarsa')
    plt.plot(rewards_q_learning, label='Q-Learning')
    plt.xlabel('Episodes')
    plt.ylabel('Sum of rewards during episode')
    plt.ylim([-100, 0])
    plt.legend()

    plt.plot()

    # display optimal policy
    print('Sarsa Optimal Policy:')
    print_optimal_policy(q_sarsa)
    print('Q-Learning Optimal Policy:')
    print_optimal_policy(q_q_learning)
example_6_6()
Sarsa Optimal Policy:
['R', 'R', 'R', 'R', 'R', 'R', 'R', 'D', 'R', 'R', 'D', 'D']
['R', 'R', 'U', 'U', 'R', 'U', 'R', 'R', 'R', 'R', 'R', 'D']
['U', 'R', 'U', 'U', 'U', 'L', 'U', 'U', 'U', 'U', 'R', 'D']
['U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'G']
Q-Learning Optimal Policy:
['U', 'R', 'R', 'R', 'D', 'R', 'R', 'R', 'R', 'R', 'D', 'D']
['U', 'D', 'D', 'R', 'R', 'R', 'R', 'D', 'D', 'R', 'R', 'D']
['R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'D']
['U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'G']

Interim and asymptotic performance of TD control methods on the cliff-walking task as a function of $\alpha$.

# Due to the limited compute on my machine, I can't run the full experiment
# with 100,000 episodes and 50,000 runs to get the fully averaged performance.
# However, even with only 1,000 episodes and 10 runs, the curves still look reasonable.
def figure_6_3():
    step_sizes = np.arange(0.1, 1.1, 0.1)
    episodes = 1000
    runs = 10

    ASY_SARSA = 0
    ASY_EXPECTED_SARSA = 1
    ASY_QLEARNING = 2
    INT_SARSA = 3
    INT_EXPECTED_SARSA = 4
    INT_QLEARNING = 5
    methods = range(0, 6)

    performance = np.zeros((6, len(step_sizes)))
    for run in range(runs):
        for ind, step_size in tqdm(list(zip(range(0, len(step_sizes)), step_sizes))):
            q_sarsa = np.zeros((WORLD_HEIGHT, WORLD_WIDTH, 4))
            q_expected_sarsa = np.copy(q_sarsa)
            q_q_learning = np.copy(q_sarsa)
            for ep in range(episodes):
                sarsa_reward = sarsa(q_sarsa, expected=False, step_size=step_size)
                expected_sarsa_reward = sarsa(q_expected_sarsa, expected=True, step_size=step_size)
                q_learning_reward = q_learning(q_q_learning, step_size=step_size)
                performance[ASY_SARSA, ind] += sarsa_reward
                performance[ASY_EXPECTED_SARSA, ind] += expected_sarsa_reward
                performance[ASY_QLEARNING, ind] += q_learning_reward

                if ep < 100:
                    performance[INT_SARSA, ind] += sarsa_reward
                    performance[INT_EXPECTED_SARSA, ind] += expected_sarsa_reward
                    performance[INT_QLEARNING, ind] += q_learning_reward

    performance[:3, :] /= episodes * runs
    performance[3:, :] /= 100 * runs
    labels = ['Asymptotic Sarsa', 'Asymptotic Expected Sarsa', 'Asymptotic Q-Learning',
              'Interim Sarsa', 'Interim Expected Sarsa', 'Interim Q-Learning']

    for method, label in zip(methods, labels):
        plt.plot(step_sizes, performance[method, :], label=label)
    plt.xlabel('alpha')
    plt.ylabel('reward per episode')
    plt.legend()

    plt.plot()
figure_6_3()

Example 6.7: Maximization Bias Example The small MDP shown inset in the figure below provides a simple example of how maximization bias can harm the performance of TD control algorithms. The MDP has two non-terminal states $A$ and $B$. Episodes always start in $A$ with a choice between two actions, left and right. The right action transitions immediately to the terminal state with a reward and return of zero. The left action transitions to $B$, also with a reward of zero, from which there are many possible actions, all of which cause immediate termination with a reward drawn from a normal distribution with mean $-0.1$ and variance $1.0$. Thus, the expected return for any trajectory starting with left is $-0.1$, and taking left in state $A$ is always a mistake. Nevertheless, our control methods may favor left because of maximization bias making $B$ appear to have a positive value.

# state A
STATE_A = 0

# state B
STATE_B = 1

# use one terminal state
STATE_TERMINAL = 2

# starts from state A
STATE_START = STATE_A

# possible actions in A
ACTION_A_RIGHT = 0
ACTION_A_LEFT = 1

# probability for exploration
EPSILON = 0.1

# step size
ALPHA = 0.1

# discount for max value
GAMMA = 1.0

# possible actions in B (the book says "many"; 10 are used here)
ACTIONS_B = range(0, 10)

# all possible actions
STATE_ACTIONS = [[ACTION_A_RIGHT, ACTION_A_LEFT], ACTIONS_B]

# state action pair values, if a state is a terminal state, then the value is always 0
INITIAL_Q = [np.zeros(2), np.zeros(len(ACTIONS_B)), np.zeros(1)]

# set up destination for each state and each action
TRANSITION = [[STATE_TERMINAL, STATE_B], [STATE_TERMINAL] * len(ACTIONS_B)]

# choose an action based on epsilon greedy algorithm
def choose_action(state, q_value):
    if np.random.binomial(1, EPSILON) == 1:
        return np.random.choice(STATE_ACTIONS[state])
    else:
        values_ = q_value[state]
        return np.random.choice([action_ for action_, value_ in enumerate(values_) if value_ == np.max(values_)])

# take @action in @state, return the reward
def take_action(state, action):
    if state == STATE_A:
        return 0
    return np.random.normal(-0.1, 1)
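
As a quick empirical check (added, not in the original), the average reward for any action taken in B should be close to $-0.1$, which is why choosing left in A is a mistake in expectation:

# added check: rewards out of state B average roughly -0.1
samples = [take_action(STATE_B, 0) for _ in range(10000)]
print(round(float(np.mean(samples)), 3))  # roughly -0.1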

# if there are two state action pair value array, use double Q-Learning
# otherwise use normal Q-Learning
def q_learning(q1, q2=None):
    state = STATE_START
    # track the number of times the left action is taken in state A
    left_count = 0
    while state != STATE_TERMINAL:
        if q2 is None:
            action = choose_action(state, q1)
        else:
            # derive an action from Q1 + Q2
            action = choose_action(state, [item1 + item2 for item1, item2 in zip(q1, q2)])
        if state == STATE_A and action == ACTION_A_LEFT:
            left_count += 1
        reward = take_action(state, action)
        next_state = TRANSITION[state][action]
        if q2 is None:
            active_q = q1
            target = np.max(active_q[next_state])
        else:
            if np.random.binomial(1, 0.5) == 1:
                active_q = q1
                target_q = q2
            else:
                active_q = q2
                target_q = q1
            best_action = np.random.choice([action_ for action_, value_ in enumerate(active_q[next_state]) if value_ == np.max(active_q[next_state])])
            target = target_q[next_state][best_action]

        # Q-Learning update
        active_q[state][action] += ALPHA * (
            reward + GAMMA * target - active_q[state][action])
        state = next_state
    return left_count
# Figure 6.5: 1,000 runs may be enough; the number of actions in state B also affects the curves
def figure_6_5():
    # each independent run has 300 episodes
    episodes = 300
    runs = 1000
    left_counts_q = np.zeros((runs, episodes))
    left_counts_double_q = np.zeros((runs, episodes))
    for run in tqdm(range(runs)):
        q = copy.deepcopy(INITIAL_Q)
        q1 = copy.deepcopy(INITIAL_Q)
        q2 = copy.deepcopy(INITIAL_Q)
        for ep in range(0, episodes):
            left_counts_q[run, ep] = q_learning(q)
            left_counts_double_q[run, ep] = q_learning(q1, q2)
    left_counts_q = left_counts_q.mean(axis=0)
    left_counts_double_q = left_counts_double_q.mean(axis=0)

    plt.plot(left_counts_q, label='Q-Learning')
    plt.plot(left_counts_double_q, label='Double Q-Learning')
    plt.plot(np.ones(episodes) * 0.05, label='Optimal')
    plt.xlabel('episodes')
    plt.ylabel('% left actions from A')
    plt.legend()
    
    plt.plot()

figure_6_5()

Test Every TD Method in a Stochastic Gridworld

Based on course tutorials and homework I designed as a TA at Macalester College.

import random
import numpy as np
from matplotlib.table import Table
import matplotlib.pyplot as plt
from collections import defaultdict
from collections import Counter
import tqdm
"""
--------------------------
Environment

 c   0    1    2    3
r  - - - - - - - - - -
0 |    |    |    | +1 |
   - - - - - - - - - -
1 |    |WALL|    | -1 |
   - - - - - - - - - -
2 |    |    |    |    |
   - - - - - - - - - -

For Visualization:

   - - - - - - - - - -
3 |    |    |    | +1 |
   - - - - - - - - - -
2 |    |WALL|    | -1 |
   - - - - - - - - - -
1 |    |    |    |    |
r  - - - - - - - - - -
  c  1    2    3    4

@author: Charles Zhang
@date Mar 28, 2021
---------------------------
"""

# Grid Environment for MDP Value Iteration
class GridWorldEnv:
    START = (0, 0)
    EXIT = (float("inf"), float("inf"))
    # actions
    NORTH = (-1, 0)
    EAST = (0, 1)
    SOUTH = (1, 0)
    WEST = (0, -1)
    ACTIONS = [NORTH, EAST, SOUTH, WEST]
    index = {NORTH: 0, EAST: 1, SOUTH: 2, WEST: 3, EXIT: -1}
    GAMEOVER = (-1, -1)  # by convention, the next state of a terminal state is (-1, -1)

    def __init__(self, shape, prob, walls, terminals, alive_reward=0.0):
        """
        :param shape: shape of gridworld: (row, col)
        :param prob: probability to go north(for stochastic move)
        :param walls: list of walls: [(1, 1)] if only one wall at (1, 1)
        :param terminals: dictionary of goal and death terminal states with reward
               {(0, 3): +1, (1, 3): -1} in this problem
        :param alive_reward: alive reward
        """
        self.rows, self.cols = shape
        p_not_north = (1 - prob) / 2
        self.turns = {-1: p_not_north, 0: prob, 1: p_not_north}  # turn west, north, or east
        self.walls = set(walls)
        self.terminals = terminals
        self.alive_reward = alive_reward
        self.state = self.START

    def reset(self):
        self.state = self.START

    def step(self, action):
        state = self.state
        if action == self.EXIT:
            self.state = self.GAMEOVER
            return self.getReward(state, action, state), self.state
        direction = action
        prob, accident = self.turns[0], self.turns[1]
        # draw a single random number so the three outcomes occur with the intended probabilities
        r = random.random()
        if prob <= r < prob + accident:
            direction = self.ACTIONS[(self.index[action] - 1) % len(self.ACTIONS)]
        elif r >= prob + accident:
            direction = self.ACTIONS[(self.index[action] + 1) % len(self.ACTIONS)]
        row = state[0] + direction[0]
        col = state[1] + direction[1]
        next_state = (row if 0 <= row < self.rows else state[0],
                      col if 0 <= col < self.cols else state[1])
        if next_state in self.walls:
            next_state = state
        self.state = next_state
        return self.getReward(state, action, self.state), self.state

    def getStates(self):
        """
        All available states(all states except wall(s))
        """
        return [(i, j) for i in range(self.rows)
                for j in range(self.cols) if (i, j) not in self.walls]

    def getTransitionStatesAndProbs(self, state, action):
        """
        get all (next state, p(s' | s, a)), in terms of stochastic action
        :return: list of (next_state, probability to the next state given current state and action)
        """
        if state in self.terminals:
            # if the state is terminal state, the probability of game over is 1
            return [(GridWorldEnv.GAMEOVER, 1.0)]
        result = []
        for turn in self.turns:  # loop over all turns(west, north, east)
            # turn west, north, or east of the "planned" action for the "real" action
            # -1, 0, 1, or 2 % 4 to get the "real" tuple of direction(action), e.g.:
            # planned: EAST, index 1 => turn is WEST(of EAST), index -1 => 1 + -1 = 0 => mod 4 => 0: real action NORTH
            #                        => turn is NORTH(of EAST), index 0 => 1 + 0 = 1 => mod 4 => 1: real action EAST
            direction = GridWorldEnv.ACTIONS[(GridWorldEnv.index[action] + turn) % len(GridWorldEnv.ACTIONS)]
            row = state[0] + direction[0]  # x coordinate of next state
            col = state[1] + direction[1]  # y coordinate of next state
            next_state = (row if 0 <= row < self.rows else state[0],
                          col if 0 <= col < self.cols else state[1])  # possible next state considered wall(s)
            if next_state in self.walls:  # stay if is wall
                next_state = state
            prob = self.turns[turn]
            result.append((next_state, prob))
        return result

    def getReward(self, state, action, nextState):
        """
        r(s, a, s')
        """
        if state in self.terminals:
            return self.terminals[state]  # get reward(penalty) for terminal states
        else:
            return self.alive_reward  # alive exploration rewards

    @staticmethod
    def isTerminal(next_state):
        """
        check whether the given state is the game-over state; by convention, terminal states transition to (-1, -1)
        """
        return next_state == GridWorldEnv.GAMEOVER

    def getLegalActions(self, state):
        """
        get all legal actions; if the state is terminal, by convention the only action is EXIT, i.e. (inf, inf)
        """
        if state in self.terminals:
            return [GridWorldEnv.EXIT]
        else:
            return GridWorldEnv.ACTIONS

    def to_2d_array(self, in_list, add=None):
        """
        reshape the input list into a 2D array (matrix) based on rows and columns
        (note: the offset logic assumes at most one wall cell)
        """
        mat = []
        k = False
        for i in range(self.rows):
            temp = []
            for j in range(self.cols):
                if add is not None:
                    if (i, j) in self.walls:
                        temp.append(add)
                        k = True
                        continue
                if k:
                    temp.append(in_list[i * self.cols + j - 1])
                else:
                    temp.append(in_list[i * self.cols + j])
            mat.append(temp)
        return mat

    def printValues(self, values):
        """
        visualize values in a table
        """
        fig, ax = plt.subplots()
        ax.set_axis_off()
        tb = Table(ax, bbox=[0, 0, 1, 1])
        values = list(values.values())
        values = np.round(np.array(self.to_2d_array(values, add=0)), decimals=2)
        width, height = 1.0 / self.rows, 1.0 / self.cols
        # Add cells
        for (i, j), val in np.ndenumerate(values):
            tb.add_cell(i, j, width, height, text=val,
                        loc='center', facecolor='white')
        # Row and column labels...
        n = max(self.rows, self.cols)
        for i in range(n):
            if i < self.rows:
                tb.add_cell(i, -1, width, height, text=self.rows - i, loc='right',
                            edgecolor='none', facecolor='none')
            if i < self.cols:
                tb.add_cell(self.rows, i, width, height / 2, text=i + 1, loc='center',
                            edgecolor='none', facecolor='none')
        tb.set_fontsize(13)
        ax.add_table(tb)
        plt.plot()

    def printQValues(self, q_values):
        fig, ax = plt.subplots()
        ax.set_axis_off()
        tb = Table(ax, bbox=[0, 0, 1, 1])
        width, height = 1.0 / self.rows, 1.0 / self.cols
        qs = []
        for i in range(self.rows):
            for j in range(self.cols):
                q = ''
                if (i, j) in self.walls or (i, j) in list(self.terminals.keys()):
                    q += "           \n"
                else:
                    q += "   %.2f   \n" % (q_values[(i, j), GridWorldEnv.NORTH])
                if (i, j) in self.walls:
                    q += "           \n"
                elif (i, j) in self.terminals:
                    q += "   %.2f   \n" % q_values[(i, j), GridWorldEnv.EXIT]
                else:
                    q += "%.2f    %.2f\n" % (q_values[(i, j), GridWorldEnv.WEST], q_values[(i, j), GridWorldEnv.EAST])
                if (i, j) in self.walls or (i, j) in list(self.terminals.keys()):
                    q += "           "
                else:
                    q += "   %.2f   " % (q_values[(i, j), GridWorldEnv.SOUTH])
                qs.append(q)
        qs = self.to_2d_array(qs)
        qs = np.array(qs)
        for (i, j), q in np.ndenumerate(qs):
            tb.add_cell(i, j, width, height, text=q,
                        loc='center', facecolor='white')
        n = max(self.rows, self.cols)
        for i in range(n):
            if i < self.rows:
                tb.add_cell(i, -1, width, height, text=self.rows - i, loc='right',
                            edgecolor='none', facecolor='none')
            if i < self.cols:
                tb.add_cell(self.rows, i, width, height / 2, text=i + 1, loc='center',
                            edgecolor='none', facecolor='none')
        tb.set_fontsize(15)
        ax.add_table(tb)
        plt.plot()

    def printPolicy(self, policy):
        action_map = {0: '↑', 1: '→',
                      2: '↓', 3: '←',
                      -1: ' '}
        policy = list(policy.values())
        policy = self.to_2d_array(policy, add=-1)
        policy = np.array(policy, dtype=object)
        fig, ax = plt.subplots()
        ax.set_axis_off()
        tb = Table(ax, bbox=[0, 0, 1, 1])
        width, height = 1.0 / self.rows, 1.0 / self.cols
        for (i, j), action in np.ndenumerate(policy):
            if (i, j) in self.walls:
                tb.add_cell(i, j, width, height, text='',
                            loc='center', facecolor='gray')
            elif (i, j) == list(self.terminals.keys())[0]:
                tb.add_cell(i, j, width, height, text='+1',
                            loc='center', facecolor='blue')
            elif (i, j) == list(self.terminals.keys())[1]:
                tb.add_cell(i, j, width, height, text='-1',
                            loc='center', facecolor='red')
            else:
                tb.add_cell(i, j, width, height, text=action_map[action],
                            loc='center', facecolor='white')

        n = max(self.rows, self.cols)
        for i in range(n):
            if i < self.rows:
                tb.add_cell(i, -1, width, height, text=self.rows - i, loc='right',
                            edgecolor='none', facecolor='none')
            if i < self.cols:
                tb.add_cell(self.rows, i, width, height / 2, text=i + 1, loc='center',
                            edgecolor='none', facecolor='none')
        tb.set_fontsize(13)
        ax.add_table(tb)
        plt.plot()

Value Iteration Method for $V_*$

"""
--------------------------------------------------------------------------------------
Bellman Equation:
v(s) = Σ_{a} π(a | s) Σ_{s', r} p(s', r | s, a) * [r + γ * v(s')]
Bellman Optimality Equation:
v*(s) = max_{a} { Σ_{s', r} p(s', r | s, a) * [r + γ * v*(s')] }
      = max_{a} q_{π*}(s, a) = max_{a} q*(s, a), since the optimal policy is greedy with respect to q*
@author: Charles Zhang
@date Mar 28, 2021
--------------------------------------------------------------------------------------
"""


# MDP Value Iteration with Bellman update
class ValueIteration:
    """
    Class to Find Optimal Value Functions
    """

    def valueIteration(self, GridWorld, discount=0.9, iterations=100):
        """
        Implement the value iteration algorithm by the optimal value function derived by Bellman equation
        v*(s) = max q(s, a)
        :param GridWorld: gridworld environment
        :param discount: discount gamma
        :param iterations: iteration times
        :return dictionary of values of all states
        """
        values = defaultdict(lambda: 0)  # initialize values with all 0 of states
        for i in range(iterations):  # alternatively, `while True:` with a convergence check
            vnext = defaultdict(lambda: 0)  # initialize values for each iteration with all 0 of states
            for state in GridWorld.getStates():  # loop over all possible state(all states except wall(s))
                if not GridWorld.isTerminal(state):  # check if the state is terminal
                    maximum = float("-inf")
                    for action in GridWorld.getLegalActions(state):  # loop over all possible actions
                        q_value = self.get_q_from_v(GridWorld, state, action, values, discount)
                        maximum = max(maximum, q_value)  # update the max q value among actions of current state
                    vnext[state] = maximum  # optimal v*(s) = max q(s, a)
            values = vnext  # update the new value table
            # if using `while True:` above, compare values and vnext here and
            # stop the loop once the change is below a tolerance
        return values

    @staticmethod
    def get_q_from_v(GridWorld, state, action, values, discount=0.9):
        """
        get q(s, a) from values v(s) and action a by
        Bellman Equation: q(s, a) = Σ p(s', r | s, a) * [r + γ * v(s')]
        :param GridWorld: gridworld environment
        :param state: current state s
        :param action: current selected action a
        :param values: current values table
        :param discount: discount gamma
        :return: float q(s, a)
        """
        q_val = 0
        # implement Bellman equation: q(s, a) = Σ p(s', r | s, a) * [r + γ * v(s')]
        for next_state, prob in GridWorld.getTransitionStatesAndProbs(state, action):
            q_val += prob * (GridWorld.getReward(state, action, next_state) + discount * values[next_state])
        return q_val

    def getQValues(self, GridWorld, values, discount=0.9):
        """
        Get q values by Values
        :return dictionary of q values of all states
        """
        q_values = {}
        for state in GridWorld.getStates():
            if not GridWorld.isTerminal(state):
                for action in GridWorld.getLegalActions(state):
                    q_values[state, action] = self.get_q_from_v(GridWorld, state, action, values, discount)
        return q_values

    def getPolicy(self, GridWorld, values, discount=0.9):
        """
        get policy by values
        π*(s) ≈ π(s) = argmax_a Σ p(s', r | s, a) * [r + γ * v(s')] = argmax_a q(s, a)
        :return dictionary of optimal actions of all states
        """
        policy = {}
        for state in GridWorld.getStates():
            if not GridWorld.isTerminal(state):
                maximum = -float("inf")
                best_action = None
                # Choose the action for the policy based on the max q value among actions of state s
                for action in GridWorld.getLegalActions(state):
                    q_value = self.get_q_from_v(GridWorld, state, action, values, discount)
                    if q_value > maximum:
                        maximum = q_value
                        best_action = action
                best_action = GridWorld.index[best_action]
                policy[state] = best_action
        return policy

Solve GridWorld

gamma = 0.9
alive_reward = 0
print("GridWorld Value Iteration with alive reward = %.2f, discount gamma  = %.2f\n" % (alive_reward, gamma))
terminals = {(0, 3): 1, (1, 3): -1}
gridworld0 = GridWorldEnv(shape=(3, 4), prob=0.8, walls=[(1, 1)], terminals=terminals, alive_reward=alive_reward)
vi = ValueIteration()
values = vi.valueIteration(GridWorld=gridworld0, discount=gamma, iterations=500)
gridworld0.printValues(values)
GridWorld Value Iteration with alive reward = 0.00, discount gamma  = 0.90

q_values = vi.getQValues(GridWorld=gridworld0, values=values, discount=gamma)
gridworld0.printQValues(q_values)
policy = vi.getPolicy(GridWorld=gridworld0, values=values, discount=gamma)
gridworld0.printPolicy(policy)

TD Learning

"""
--------------------------------------------------------------------------------------
Temporal Difference methods including TD(0), SARSA, Q-Learning, and Double Q-Learning,
approximating v_π (for TD(0) prediction) and q* / π* (for the control methods).
Results are verified by the sum of rewards per episode and by the RMS error against the
optimal Q values derived by value iteration using the Bellman optimality equation.
@author: Charles Zhang
@date Apr 14, 2021
--------------------------------------------------------------------------------------
"""

class TD:
    """
    Base Temporal Difference Learning Agent
    """
    def __init__(self, env, alpha=0.1, gamma=0.9, epsilon=0.3):
        self.env = env
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.qvalues = defaultdict(lambda: 0)
        self.rmse = []

    def getAction(self, state):
        """
        Implement the epsilon-greedy method
        """
        if np.random.binomial(1, self.epsilon) == 1:
            return random.choice(self.env.getLegalActions(state))
        # if random.random() < self.epsilon:
        #     return random.choice(self.env.getLegalActions(state))
        else:
            return self.getPolicy(state)

    def getPolicy(self, state):
        """
        choose the action of the max_a q(s, a)
        """
        actions = self.env.getLegalActions(state)
        return max(actions, key=lambda action: self.qvalues[state, action])

    def get_expected_q(self):
        """
        approximated expected q derived by value iteration algorithm
        """
        vi = ValueIteration()
        values = vi.valueIteration(self.env, discount=self.gamma, iterations=5000)
        return vi.getQValues(GridWorld=self.env, values=values, discount=self.gamma)

    @staticmethod
    def getRMSError(expect, result):
        """
        Root Mean Square Error
        """
        sumSE = 0
        for key in expect:
            sumSE += (expect[key] - result[key]) ** 2
        MSE = sumSE / len(expect)
        return MSE ** 0.5

TD(0)
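
The class below implements tabular TD(0) prediction for a fixed input policy, using the one-step update

$$V(S_t) \leftarrow V(S_t) + \alpha \big[ R_{t+1} + \gamma V(S_{t+1}) - V(S_t) \big].$$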

class TDZero:

    def __init__(self, env, policy, alpha=0.3, gamma=0.9):
        self.env = env
        self.policy = policy
        self.alpha = alpha
        self.gamma = gamma
        self.v = {s: 0 for s in self.env.getStates()}
        self.v[(-1, -1)] = 0
        self.rmse = []

    @staticmethod
    def getAction(state, policy):
        """
        get the action based on input policy
        """
        prob_a = policy[state]
        actions = list(prob_a.keys())
        return actions[np.random.choice(a=len(actions), p=list(prob_a.values()))]

    def td0(self, episodes=1000):
        expected_v = ValueIteration().valueIteration(self.env, discount=self.gamma, iterations=1000)
        for _ in tqdm.tqdm(range(episodes)):
            self.env.reset()
            while not self.env.isTerminal(self.env.state):
                state = self.env.state
                action = self.getAction(state, self.policy)
                reward, next_state = self.env.step(action)
                self.v[state] += self.alpha * (reward + self.gamma * self.v[next_state] - self.v[state])
            error = self.getRMSError(expected_v, self.v)
            self.rmse.append(error)
        return self.v

    @staticmethod
    def getRMSError(expect, result):
        sumSE = 0
        ev = list(expect.values())
        er = list(result.values())
        for i in range(len(ev)):
            sumSE += (ev[i] - er[i]) ** 2
        MSE = sumSE / len(ev)
        return MSE ** 0.5
# A policy to be evaluated: probability 0.7 for the action given by the optimal policy, 0.1 for each of the other actions
input_policy = {s: {a: 0.1 for a in gridworld0.getLegalActions(s)} for s in
                       gridworld0.getStates() if s not in gridworld0.terminals.keys()}
for s in gridworld0.terminals.keys():
    input_policy[s] = {(float('inf'), float('inf')): 1}
vi = ValueIteration()
for s in policy.keys():
    if s not in gridworld0.terminals.keys():
        expected_a_index = policy[s]
        expected_a = list(gridworld0.index.keys())[list(gridworld0.index.values()).index(expected_a_index)]
        input_policy[s][expected_a] = 0.7

td0 = TDZero(gridworld0, policy=input_policy, alpha=0.1, gamma=gamma)
values = td0.td0(episodes=2000)
print("Values derived by TD(0) Method")
gridworld0.printValues(values)
Values derived by TD(0) Method
values = vi.valueIteration(GridWorld=gridworld0, discount=gamma, iterations=500)
print("Values derived by Value Iteration Algorithm")
gridworld0.printValues(values)
Values derived by Value Iteration Algorithm
rmse = td0.rmse
plt.plot(rmse)
plt.xlabel("Episodes")
plt.ylabel("RMS Error")
plt.show()
print(rmse[-1])
0.2198434580272065
print('TD(0) Policy: ')
# derive a greedy policy from the TD(0) state values via a one-step lookahead with the model
gridworld0.printPolicy(vi.getPolicy(gridworld0, td0.v, discount=gamma))
TD(0) Policy: 

SARSA
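
Sarsa is the on-policy TD control method: its target uses the action actually chosen in the next state by the same $\varepsilon$-greedy behavior policy,

$$Q(S_t, A_t) \leftarrow Q(S_t, A_t) + \alpha \big[ R_{t+1} + \gamma Q(S_{t+1}, A_{t+1}) - Q(S_t, A_t) \big].$$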

class SARSA(TD):
    def sarsa(self, episodes=5000):
        expected_q = self.get_expected_q()
        for _ in tqdm.tqdm(range(episodes)):
            self.env.reset()        # Initialize S
            state = self.env.state
            action = self.getAction(state)   # Choose A from S using policy derived from Q (e.g., eps greedy)
            while not self.env.isTerminal(state):
                reward, next_state = self.env.step(action)      # Take action A, observe R, S'
                next_action = self.getAction(next_state)   # Choose A' from S' using policy derived from Q (e.g., epsilon-greedy)
                diff = self.alpha * (reward +
                                     self.gamma * self.qvalues[next_state, next_action] -
                                     self.qvalues[state, action])
                self.qvalues[state, action] += diff        # SARSA TD update
                state = next_state        # S <- S'
                action = next_action      # A <- A'
            error = self.getRMSError(expected_q, self.qvalues)
            self.rmse.append(error)
        policy = {s: self.env.index[self.getPolicy(s)] for s in self.env.getStates()}
        return policy
sarsa = SARSA(gridworld0, alpha=0.1, gamma=gamma, epsilon=0.3)
policy = sarsa.sarsa(episodes=2000)
gridworld0.printPolicy(policy)
print('SARSA Policy: ')
SARSA Policy: 
rmse = sarsa.rmse
plt.plot(rmse)
plt.xlabel("Episodes")
plt.ylabel("RMS Error")
plt.show()
print('The Q values derived by value iteration')
gridworld0.printQValues(q_values)  
The Q values derived by value iteration
print('The predicted optimal Q values by SARSA')
gridworld0.printQValues(sarsa.qvalues)  
The predicted optimal Q values by SARSA

Q Learning
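
Q-learning is off-policy: whatever action the behavior policy takes next, the target bootstraps from the greedy action in the next state,

$$Q(S_t, A_t) \leftarrow Q(S_t, A_t) + \alpha \big[ R_{t+1} + \gamma \max_a Q(S_{t+1}, a) - Q(S_t, A_t) \big].$$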

class QLearning(TD):
    def q_learning(self, episodes=500):
        expected_q = self.get_expected_q()
        for _ in tqdm.tqdm(range(episodes)):                 # loop for each episode
            self.env.reset()                                 # Initialize S
            while not self.env.isTerminal(self.env.state):   # loop for each step of episode(can also use trajectories)
                state = self.env.state      # current state
                action = self.getAction(state)      # Choose A from S using policy derived from Q (e.g., epsilon-greedy)
                reward, next_state = self.env.step(action)              # Take action A, observe R, S'
                next_actions = self.env.getLegalActions(next_state)     # get legal next actions to compute max_a
                diff = self.alpha * (reward +
                                     self.gamma * max(self.qvalues[next_state, a] for a in next_actions) -
                                     self.qvalues[state, action])
                self.qvalues[state, action] += diff                      # temporal difference update
            error = self.getRMSError(expected_q, self.qvalues)           # store the RMS error per episode
            self.rmse.append(error)
        policy = {s: self.env.index[self.getPolicy(s)] for s in self.env.getStates()}
        return policy
q = QLearning(gridworld0, alpha=0.2, gamma=gamma, epsilon=0.8)
policy = q.q_learning(episodes=1000)
print('Q Learning Policy: ')
gridworld0.printPolicy(policy)
Q Learning Policy: 
rmse = q.rmse
plt.plot(rmse)
plt.xlabel("Episodes")
plt.ylabel("RMS Error")
plt.show()
print("The Last RMSE: %f" % (rmse[-1]))
The Last RMSE: 0.068089
print('The Q values derived by value iteration')
gridworld0.printQValues(q_values)  
The Q values derived by value iteration
print('The predicted Q values by Q-learning')
gridworld0.printQValues(q.qvalues)   
The predicted Q values by Q-learning

Double Q Learning
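
Double Q-learning removes maximization bias by decoupling action selection from action evaluation: with probability 0.5 it updates $Q_1$, using $Q_1$ to pick the greedy next action but $Q_2$ to evaluate it (and symmetrically for the other half of the updates),

$$Q_1(S_t, A_t) \leftarrow Q_1(S_t, A_t) + \alpha \Big[ R_{t+1} + \gamma\, Q_2\big(S_{t+1}, \operatorname{arg\,max}_a Q_1(S_{t+1}, a)\big) - Q_1(S_t, A_t) \Big].$$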

class DoubleQLearning(TD):
    """
    Double Q-learning, for estimating Q1 ≈ Q2 ≈ q*
    """
    def __init__(self, env, alpha=0.1, gamma=0.9, epsilon=0.3):
        super().__init__(env, alpha, gamma, epsilon)
        self.q1 = defaultdict(lambda: 0)
        self.q2 = defaultdict(lambda: 0)
        self.rmse1 = []
        self.rmse2 = []

    def getDoubleQPolicy(self, state):
        """
        Choose A from S using greedy policy in Q1+Q2
        """
        actions = self.env.getLegalActions(state)
        return max(actions, key=lambda action: self.q1[state, action] + self.q2[state, action])

    def double_q_learning(self, episodes=5000):
        expected_q = self.get_expected_q()
        for _ in tqdm.tqdm(range(episodes)):  # loop for each episode
            self.env.reset()  # Initialize S
            while not self.env.isTerminal(self.env.state):  # loop for each step of episode
                state = self.env.state  # current state
                # Choose A from S using an epsilon-greedy policy in Q1 + Q2
                if np.random.binomial(1, self.epsilon) == 1:
                    action = random.choice(self.env.getLegalActions(state))
                else:
                    action = self.getDoubleQPolicy(state)
                reward, next_state = self.env.step(action)  # Take action A, observe R, S'
                if random.random() < 0.5:   # With 0.5 probability
                    self.qvalues = self.q1
                    argmax_action = self.getPolicy(next_state)    # best action for next state
                    diff = self.alpha * (reward +
                                         self.gamma * self.q2[next_state, argmax_action] -
                                         self.q1[state, action])
                    self.q1[state, action] += diff
                else:
                    self.qvalues = self.q2
                    argmax_action = self.getPolicy(next_state)    # argmax over Q2 for the next state
                    diff = self.alpha * (reward +
                                         self.gamma * self.q1[next_state, argmax_action] -
                                         self.q2[state, action])
                    self.q2[state, action] += diff  # temporal difference update
            error1 = self.getRMSError(expected_q, self.q1)  # store the Q1 RMS error per episode
            self.rmse1.append(error1)
            error2 = self.getRMSError(expected_q, self.q2)  # store the Q2 RMS error per episode
            self.rmse2.append(error2)
        self.qvalues = self.q1
        q1_policy = {s: self.env.index[self.getPolicy(s)] for s in self.env.getStates()}
        self.qvalues = self.q2
        q2_policy = {s: self.env.index[self.getPolicy(s)] for s in self.env.getStates()}
        return q1_policy, q2_policy
dq = DoubleQLearning(gridworld0, alpha=0.1, gamma=gamma, epsilon=1)
q1_policy, q2_policy = dq.double_q_learning(episodes=2000)
print('Double Q Learning Policy derived by Q1: ')
gridworld0.printPolicy(q1_policy)
Double Q Learning Policy derived by Q1: 
print('Double Q Learning Policy derived by Q2: ')
gridworld0.printPolicy(q2_policy)
Double Q Learning Policy derived by Q2: 
rmse1 = dq.rmse1
plt.plot(rmse1)
plt.xlabel("Episodes")
plt.ylabel("RMS Error of Q1")
plt.show()
rmse2 = dq.rmse2
plt.plot(rmse2)
plt.xlabel("Episodes")
plt.ylabel("RMS Error of Q2")
Text(0, 0.5, 'RMS Error of Q2')
print("The Last RMSE of Q1: %f" % (rmse1[-1]))
print("The Last RMSE of Q2: %f" % (rmse2[-1]))
The Last RMSE of Q1: 0.037021
The Last RMSE of Q2: 0.064164
print('The Q1 values derived by Double Q Learning')
gridworld0.printQValues(dq.q1)  
The Q1 values derived by Double Q Learning
print('The Q2 values derived by Double Q Learning')
gridworld0.printQValues(dq.q2)  
The Q2 values derived by Double Q Learning
print('The Q values derived by value iteration')
gridworld0.printQValues(q_values)
The Q values derived by value iteration