# Mini Project: Dynamic Programming¶

In this notebook, you will write your own implementations of many classical dynamic programming algorithms.

While we have provided some starter code, you are welcome to erase these hints and write your code from scratch.

### Part 0: Explore FrozenLakeEnv¶

Use the code cell below to create an instance of the FrozenLake environment.

In [1]:
from frozenlake import FrozenLakeEnv

env = FrozenLakeEnv()


The agent moves through a $4 \times 4$ gridworld, with states numbered as follows:

[[ 0  1  2  3]
[ 4  5  6  7]
[ 8  9 10 11]
[12 13 14 15]]

and the agent has 4 potential actions:

LEFT = 0
DOWN = 1
RIGHT = 2
UP = 3

Thus, $\mathcal{S}^+ = \{0, 1, \ldots, 15\}$, and $\mathcal{A} = \{0, 1, 2, 3\}$. Verify this by running the code cell below.

In [2]:
# print the state space and action space
print(env.observation_space)
print(env.action_space)

# print the total number of states and actions
print(env.nS)
print(env.nA)

Discrete(16)
Discrete(4)
16
4


Dynamic programming assumes that the agent has full knowledge of the MDP. We have already amended the frozenlake.py file to make the one-step dynamics accessible to the agent.

Execute the code cell below to return the one-step dynamics corresponding to a particular state and action. In particular, env.P[1][0] returns the the probability of each possible reward and next state, if the agent is in state 1 of the gridworld and decides to go left.

In [3]:
# the frozen lake is slippery, so every intended action has a variety of outcomes:
# here the intention is to move down (1), but the outcomes are that you stay put,
# move left or move right, which are all equal likely.
env.P[14][1]

Out[3]:
[(0.3333333333333333, 13, 0.0, False),
(0.3333333333333333, 14, 0.0, False),
(0.3333333333333333, 15, 1.0, True)]
In [4]:
# so for every state you have all moving intentions as possibilities,
# which produce different outcomes
env.P[14]

Out[4]:
{0: [(0.3333333333333333, 10, 0.0, False),
(0.3333333333333333, 13, 0.0, False),
(0.3333333333333333, 14, 0.0, False)],
1: [(0.3333333333333333, 13, 0.0, False),
(0.3333333333333333, 14, 0.0, False),
(0.3333333333333333, 15, 1.0, True)],
2: [(0.3333333333333333, 14, 0.0, False),
(0.3333333333333333, 15, 1.0, True),
(0.3333333333333333, 10, 0.0, False)],
3: [(0.3333333333333333, 15, 1.0, True),
(0.3333333333333333, 10, 0.0, False),
(0.3333333333333333, 13, 0.0, False)]}
In [5]:
# here you can see that in every state all actions as moving intentions are possible
[len(env.P[s]) for s in range(env.nS)]

Out[5]:
[4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4]

Each entry takes the form

prob, next_state, reward, done

where:

• prob details the conditional probability of the corresponding (next_state, reward) pair, and
• done is True if the next_state is a terminal state, and otherwise False.

Thus, we can interpret env.P[1][0] as follows: $$\mathbb{P}(S_{t+1}=s',R_{t+1}=r|S_t=1,A_t=0) = \begin{cases} \frac{1}{3} \text{ if } s'=1, r=0\\ \frac{1}{3} \text{ if } s'=0, r=0\\ \frac{1}{3} \text{ if } s'=5, r=0\\ 0 \text{ else} \end{cases}$$

Feel free to change the code cell above to explore how the environment behaves in response to other (state, action) pairs.

### Part 1: Iterative Policy Evaluation¶

In this section, you will write your own implementation of iterative policy evaluation.

Your algorithm should accept four arguments as input:

• env: This is an instance of an OpenAI Gym environment, where env.P returns the one-step dynamics.
• policy: This is a 2D numpy array with policy.shape[0] equal to the number of states (env.nS), and policy.shape[1] equal to the number of actions (env.nA). policy[s][a] returns the probability that the agent takes action a while in state s under the policy.
• gamma: This is the discount rate. It must be a value between 0 and 1, inclusive (default value: 1).
• theta: This is a very small positive number that is used to decide if the estimate has sufficiently converged to the true value function (default value: 1e-8).

The algorithm returns as output:

• V: This is a 1D numpy array with V.shape[0] equal to the number of states (env.nS). V[s] contains the estimated value of state s under the input policy.

Please complete the function in the code cell below.

In [6]:
import numpy as np

def policy_evaluation(env, policy, gamma=1, theta=1e-8):
"""if the policy is known the state value function can be calculated from it
in an iterative way"""

# the state value function is initialized with a zero guess
V = np.zeros(env.nS)

# now the iteration begins
while True:
# initialize the difference to the next value function
delta = 0

# for every state the the state value is calculated:
# in every calculation the latest values of the state function are
# used for other states
for s in range(env.nS):

# initialize the next calculation of V[s]
Vs_new = 0

# now the policy is a stochastic policy with actions that are
# executed with a probability
for a_intended, p_a_intented in enumerate(policy[s]):

# for intended action there is a list of possible outcomes
# the outcomes are tuples of probability, state, reward (and an indicator
# whether the state is a final state)
for p_outcome, s_outcome, r_outcome, _ in env.P[s][a_intended]:

# the statefunction sums up that outcomes weigthed by their probabilities
Vs_new += p_a_intented * p_outcome * (r_outcome + gamma * V[s_outcome])

# get the stop criteria for the iteration which is the maximal
# difference between the new and the old value of the state function in this
# iteration
delta = max(delta, abs(V[s] - Vs_new))

# the newly calculated value is immediately replaced in the value state function
# and used to calculate the next states
V[s] = Vs_new

# test the stop criteria for the iteration after every complete run through all states
if delta < theta:
break

# when the iteration has ended the state value function is returned
return V


We will evaluate the equiprobable random policy $\pi$, where $\pi(a|s) = \frac{1}{|\mathcal{A}(s)|}$ for all $s\in\mathcal{S}$ and $a\in\mathcal{A}(s)$.

Use the code cell below to specify this policy in the variable random_policy.

In [7]:
random_policy = np.ones([env.nS, env.nA]) / env.nA


Run the next code cell to evaluate the equiprobable random policy and visualize the output. The state-value function has been reshaped to match the shape of the gridworld.

In [8]:
from plot_utils import plot_values

# evaluate the policy
V = policy_evaluation(env, random_policy)

plot_values(V)

<matplotlib.figure.Figure at 0x7fb2921c7828>

Run the code cell below to test your function. If the code cell returns PASSED, then you have implemented the function correctly!

Note: In order to ensure accurate results, make sure that your policy_evaluation function satisfies the requirements outlined above (with four inputs, a single output, and with the default values of the input arguments unchanged).

In [9]:
import check_test

check_test.run_check('policy_evaluation_check', policy_evaluation)


PASSED

### Part 2: Obtain $q_\pi$ from $v_\pi$¶

In this section, you will write a function that takes the state-value function estimate as input, along with some state $s\in\mathcal{S}$. It returns the row in the action-value function corresponding to the input state $s\in\mathcal{S}$. That is, your function should accept as input both $v_\pi$ and $s$, and return $q_\pi(s,a)$ for all $a\in\mathcal{A}(s)$.

Your algorithm should accept four arguments as input:

• env: This is an instance of an OpenAI Gym environment, where env.P returns the one-step dynamics.
• V: This is a 1D numpy array with V.shape[0] equal to the number of states (env.nS). V[s] contains the estimated value of state s.
• s: This is an integer corresponding to a state in the environment. It should be a value between 0 and (env.nS)-1, inclusive.
• gamma: This is the discount rate. It must be a value between 0 and 1, inclusive (default value: 1).

The algorithm returns as output:

• q: This is a 1D numpy array with q.shape[0] equal to the number of actions (env.nA). q[a] contains the (estimated) value of state s and action a.

Please complete the function in the code cell below.

In [10]:
def q_from_v(env, V, s, gamma=1):
"""from the state function v the state action function q can be calculated for any state:
q(s, a) is the value that you get by performing action a in state s and then afterwards
following the policy for all subsequent steps.

the function computes q(s , a) for one s and all a's
"""
# initialize the q values
q = np.zeros(env.nA)

# for all possible actions in state s, which is actually all actions in env.nA as we saw
# earlier
for a_intended in env.P[s].keys():

# for all possible outcomes if action a_intended is taken
for p_outcome, s_outcome, r_outcome, _ in env.P[s][a_intended]:

# the new value is calculated
q[a_intended] += p_outcome * (r_outcome + gamma * V[s_outcome])

return q


Run the code cell below to print the action-value function corresponding to the above state-value function.

In [11]:
Q = np.zeros([env.nS, env.nA])
for s in range(env.nS):
Q[s] = q_from_v(env, V, s)
print("Action-Value Function:")
print(Q)

Action-Value Function:
[[ 0.0147094   0.01393978  0.01393978  0.01317015]
[ 0.00852356  0.01163091  0.0108613   0.01550788]
[ 0.02444514  0.02095298  0.02406033  0.01435346]
[ 0.01047649  0.01047649  0.00698432  0.01396865]
[ 0.02166487  0.01701828  0.01624865  0.01006281]
[ 0.          0.          0.          0.        ]
[ 0.05433538  0.04735105  0.05433538  0.00698432]
[ 0.          0.          0.          0.        ]
[ 0.01701828  0.04099204  0.03480619  0.04640826]
[ 0.07020885  0.11755991  0.10595784  0.05895312]
[ 0.18940421  0.17582037  0.16001424  0.04297382]
[ 0.          0.          0.          0.        ]
[ 0.          0.          0.          0.        ]
[ 0.08799677  0.20503718  0.23442716  0.17582037]
[ 0.25238823  0.53837051  0.52711478  0.43929118]
[ 0.          0.          0.          0.        ]]


Run the code cell below to test your function. If the code cell returns PASSED, then you have implemented the function correctly!

Note: In order to ensure accurate results, make sure that the q_from_v function satisfies the requirements outlined above (with four inputs, a single output, and with the default values of the input arguments unchanged).

In [12]:
check_test.run_check('q_from_v_check', q_from_v)


PASSED

### Part 3: Policy Improvement¶

In this section, you will write your own implementation of policy improvement.

Your algorithm should accept three arguments as input:

• env: This is an instance of an OpenAI Gym environment, where env.P returns the one-step dynamics.
• V: This is a 1D numpy array with V.shape[0] equal to the number of states (env.nS). V[s] contains the estimated value of state s.
• gamma: This is the discount rate. It must be a value between 0 and 1, inclusive (default value: 1).

The algorithm returns as output:

• policy: This is a 2D numpy array with policy.shape[0] equal to the number of states (env.nS), and policy.shape[1] equal to the number of actions (env.nA). policy[s][a] returns the probability that the agent takes action a while in state s under the policy.

Please complete the function in the code cell below. You are encouraged to use the q_from_v function you implemented above.

In [13]:
def policy_improvement(env, V, gamma=1):
"""this function improves a policy by building on a state function V:
in every state the improved policy takes the action with the highest state action function
value
"""
# initialize the new policy
policy = np.zeros([env.nS, env.nA])

# for all states s, define the next action
for s in range(env.nS):

# get the state action values
q_s = q_from_v(env, V, s, gamma=1)

# deterministic policy: chose the action with the highest state action value
policy[s][np.argmax(q_s)] = 1

# stochastic policy:
# we could also choose a random combination of the actions with the highest
# state action values, if several actions have the same value

return policy


Run the code cell below to test your function. If the code cell returns PASSED, then you have implemented the function correctly!

Note: In order to ensure accurate results, make sure that the policy_improvement function satisfies the requirements outlined above (with three inputs, a single output, and with the default values of the input arguments unchanged).

Before moving on to the next part of the notebook, you are strongly encouraged to check out the solution in Dynamic_Programming_Solution.ipynb. There are many correct ways to approach this function!

In [14]:
check_test.run_check('policy_improvement_check', policy_improvement)


PASSED

### Part 4: Policy Iteration¶

In this section, you will write your own implementation of policy iteration. The algorithm returns the optimal policy, along with its corresponding state-value function.

Your algorithm should accept three arguments as input:

• env: This is an instance of an OpenAI Gym environment, where env.P returns the one-step dynamics.
• gamma: This is the discount rate. It must be a value between 0 and 1, inclusive (default value: 1).
• theta: This is a very small positive number that is used to decide if the policy evaluation step has sufficiently converged to the true value function (default value: 1e-8).

The algorithm returns as output:

• policy: This is a 2D numpy array with policy.shape[0] equal to the number of states (env.nS), and policy.shape[1] equal to the number of actions (env.nA). policy[s][a] returns the probability that the agent takes action a while in state s under the policy.
• V: This is a 1D numpy array with V.shape[0] equal to the number of states (env.nS). V[s] contains the estimated value of state s.

Please complete the function in the code cell below. You are strongly encouraged to use the policy_evaluation and policy_improvement functions you implemented above.

In [15]:
import copy

def policy_iteration(env, gamma=1, theta=1e-8):
"""this function starts with a trivial guess for a policy
then it iterates over policy evaluation and policy improvement:
- from the policy get the state function V approximately through iteration
- with the state function given, we can then improve the policy (implemented above)
- we iterate these steps until the policy becomes stable (difference between the policy
and its next iteration smaller then theta)
"""

# initialize the policy
policy = np.ones([env.nS, env.nA]) / env.nA

while True:
# get state value function for policy
V = policy_evaluation(env, policy, gamma, theta)

# use the state function to improve the policy
next_policy = policy_improvement(env, V, gamma)

# test whether policy is stable
delta = policy - next_policy

# the calculated policy will be the next policy in the iteration
# or the policy that gets to be returned
policy = next_policy.copy()

# break the iteration if the stop criteria is reached
if delta.all() < theta:
break

# policy and state function are returned
return policy, V


Run the next code cell to solve the MDP and visualize the output. The optimal state-value function has been reshaped to match the shape of the gridworld.

Compare the optimal state-value function to the state-value function from Part 1 of this notebook. Is the optimal state-value function consistently greater than or equal to the state-value function for the equiprobable random policy?

In [16]:
# obtain the optimal policy and optimal state-value function
policy_pi, V_pi = policy_iteration(env)

# print the optimal policy
print("\nOptimal Policy (LEFT = 0, DOWN = 1, RIGHT = 2, UP = 3):")
print(policy_pi,"\n")

plot_values(V_pi)

Optimal Policy (LEFT = 0, DOWN = 1, RIGHT = 2, UP = 3):
[[ 1.  0.  0.  0.]
[ 0.  0.  0.  1.]
[ 0.  0.  0.  1.]
[ 0.  0.  0.  1.]
[ 1.  0.  0.  0.]
[ 1.  0.  0.  0.]
[ 1.  0.  0.  0.]
[ 1.  0.  0.  0.]
[ 0.  0.  0.  1.]
[ 0.  1.  0.  0.]
[ 1.  0.  0.  0.]
[ 1.  0.  0.  0.]
[ 1.  0.  0.  0.]
[ 0.  0.  1.  0.]
[ 0.  1.  0.  0.]
[ 1.  0.  0.  0.]]



Run the code cell below to test your function. If the code cell returns PASSED, then you have implemented the function correctly!

Note: In order to ensure accurate results, make sure that the policy_iteration function satisfies the requirements outlined above (with three inputs, two outputs, and with the default values of the input arguments unchanged).

In [17]:
check_test.run_check('policy_iteration_check', policy_iteration)


PASSED

### Part 5: Truncated Policy Iteration¶

In this section, you will write your own implementation of truncated policy iteration.

You will begin by implementing truncated policy evaluation. Your algorithm should accept five arguments as input:

• env: This is an instance of an OpenAI Gym environment, where env.P returns the one-step dynamics.
• policy: This is a 2D numpy array with policy.shape[0] equal to the number of states (env.nS), and policy.shape[1] equal to the number of actions (env.nA). policy[s][a] returns the probability that the agent takes action a while in state s under the policy.
• V: This is a 1D numpy array with V.shape[0] equal to the number of states (env.nS). V[s] contains the estimated value of state s.
• max_it: This is a positive integer that corresponds to the number of sweeps through the state space (default value: 1).
• gamma: This is the discount rate. It must be a value between 0 and 1, inclusive (default value: 1).

The algorithm returns as output:

• V: This is a 1D numpy array with V.shape[0] equal to the number of states (env.nS). V[s] contains the estimated value of state s.

Please complete the function in the code cell below.

In [18]:
def truncated_policy_evaluation(env, policy, V, max_it=1, gamma=1):
"""truncated policy iteration gets a policy and state function that is an initial guess
then it iterates a given number of times to get the approximate state function
for the policy

this time we do not iterate until the state function becomes stable, but just a fixed
number of times: because of this the method is called 'truncated'
"""

# iterate a fixed number of times
for i in range(max_it):

# get the state function for the policy

# for all states
for s in range(env.nS):

# calculate the next value for s and use all already calculated state values
v_s = 0

# get the state action values for that state
q_s = q_from_v(env, V, s, gamma)

# for all actions in the policy
for a, p_action in enumerate(policy[s]):

# the new state value is calculated as the sum of
# the products probability for an action * action state value for that action
v_s += p_action * q_s[a]

# the state value is replaced in the state function for immediate usage of
# calculation the next states
V[s] = v_s

# return the state value function
return V


Next, you will implement truncated policy iteration. Your algorithm should accept five arguments as input:

• env: This is an instance of an OpenAI Gym environment, where env.P returns the one-step dynamics.
• max_it: This is a positive integer that corresponds to the number of sweeps through the state space (default value: 1).
• gamma: This is the discount rate. It must be a value between 0 and 1, inclusive (default value: 1).
• theta: This is a very small positive number that is used for the stopping criterion (default value: 1e-8).

The algorithm returns as output:

• policy: This is a 2D numpy array with policy.shape[0] equal to the number of states (env.nS), and policy.shape[1] equal to the number of actions (env.nA). policy[s][a] returns the probability that the agent takes action a while in state s under the policy.
• V: This is a 1D numpy array with V.shape[0] equal to the number of states (env.nS). V[s] contains the estimated value of state s.

Please complete the function in the code cell below.

In [19]:
def truncated_policy_iteration(env, max_it=1, gamma=1, theta=1e-8):
"""truncated policy iteration iterates as before from a trivial policy via getting the
state function with the truncated method to improving the policy

the policy is improved until it becomes stable"""
V = np.zeros(env.nS)
policy = np.zeros([env.nS, env.nA])

while True:

# get policy improvement
policy = policy_improvement(env, V, gamma)

# copy V for comparision
V_last = V.copy()

# get state value function for policy
V = truncated_policy_evaluation(env, policy, V, max_it, gamma)

# get difference between value state functions
delta = max(abs(V - V_last))

if delta < theta:
break

return policy, V


Run the next code cell to solve the MDP and visualize the output. The state-value function has been reshaped to match the shape of the gridworld.

Play with the value of the max_it argument. Do you always end with the optimal state-value function?

In [20]:
policy_tpi, V_tpi = truncated_policy_iteration(env, max_it=2)

# print the optimal policy
print("\nOptimal Policy (LEFT = 0, DOWN = 1, RIGHT = 2, UP = 3):")
print(policy_tpi,"\n")

# plot the optimal state-value function
plot_values(V_tpi)

Optimal Policy (LEFT = 0, DOWN = 1, RIGHT = 2, UP = 3):
[[ 1.  0.  0.  0.]
[ 0.  0.  0.  1.]
[ 0.  0.  0.  1.]
[ 0.  0.  0.  1.]
[ 1.  0.  0.  0.]
[ 1.  0.  0.  0.]
[ 1.  0.  0.  0.]
[ 1.  0.  0.  0.]
[ 0.  0.  0.  1.]
[ 0.  1.  0.  0.]
[ 1.  0.  0.  0.]
[ 1.  0.  0.  0.]
[ 1.  0.  0.  0.]
[ 0.  0.  1.  0.]
[ 0.  1.  0.  0.]
[ 1.  0.  0.  0.]]



Run the code cell below to test your function. If the code cell returns PASSED, then you have implemented the function correctly!

Note: In order to ensure accurate results, make sure that the truncated_policy_iteration function satisfies the requirements outlined above (with four inputs, two outputs, and with the default values of the input arguments unchanged).

In [21]:
check_test.run_check('truncated_policy_iteration_check', truncated_policy_iteration)


PASSED

### Part 6: Value Iteration¶

In this section, you will write your own implementation of value iteration.

Your algorithm should accept three arguments as input:

• env: This is an instance of an OpenAI Gym environment, where env.P returns the one-step dynamics.
• gamma: This is the discount rate. It must be a value between 0 and 1, inclusive (default value: 1).
• theta: This is a very small positive number that is used for the stopping criterion (default value: 1e-8).

The algorithm returns as output:

• policy: This is a 2D numpy array with policy.shape[0] equal to the number of states (env.nS), and policy.shape[1] equal to the number of actions (env.nA). policy[s][a] returns the probability that the agent takes action a while in state s under the policy.
• V: This is a 1D numpy array with V.shape[0] equal to the number of states (env.nS). V[s] contains the estimated value of state s.
In [22]:
def value_iteration(env, gamma=1, theta=1e-8):
"""value iteration is the short cut to policy iteration: here the state function for
the next policy is just calculated in one step, that shortens many computations"""

# starting with an intial state value fnction
V = np.zeros(env.nS)

while True:

delta = 0

# for all states get the new value function
for s in range(env.nS):

# the old value is remembered
v_s = V[s]

# the action value function is calculated for all actions
q_s = q_from_v(env, V, s, gamma)

# the next value of the state function is the maximum of all action values
V[s] = max(q_s)

# the difference is recorded
delta = max(delta, abs(V[s] - v_s))

# the iteration ends when the differences for all states are small enough
if delta < theta:
break

# the policy is improved using the final state function
policy = policy_improvement(env, V, gamma)

# the policy and the state function are returned
return policy, V


Use the next code cell to solve the MDP and visualize the output. The state-value function has been reshaped to match the shape of the gridworld.

In [23]:
policy_vi, V_vi = value_iteration(env)

# print the optimal policy
print("\nOptimal Policy (LEFT = 0, DOWN = 1, RIGHT = 2, UP = 3):")
print(policy_vi,"\n")

# plot the optimal state-value function
plot_values(V_vi)

Optimal Policy (LEFT = 0, DOWN = 1, RIGHT = 2, UP = 3):
[[ 1.  0.  0.  0.]
[ 0.  0.  0.  1.]
[ 0.  0.  0.  1.]
[ 0.  0.  0.  1.]
[ 1.  0.  0.  0.]
[ 1.  0.  0.  0.]
[ 1.  0.  0.  0.]
[ 1.  0.  0.  0.]
[ 0.  0.  0.  1.]
[ 0.  1.  0.  0.]
[ 1.  0.  0.  0.]
[ 1.  0.  0.  0.]
[ 1.  0.  0.  0.]
[ 0.  0.  1.  0.]
[ 0.  1.  0.  0.]
[ 1.  0.  0.  0.]]



Run the code cell below to test your function. If the code cell returns PASSED, then you have implemented the function correctly!

Note: In order to ensure accurate results, make sure that the value_iteration function satisfies the requirements outlined above (with three inputs, two outputs, and with the default values of the input arguments unchanged).

In [24]:
check_test.run_check('value_iteration_check', value_iteration)


PASSED