In [1]:

```
from time import time
from operator import itemgetter
import numpy as np
```

Many scheduling problems are NP-hard, meaning among other things that the number of possible solutions grows exponentially with problem size. Searching efficiently through the solution space becomes very important for finding a high-quality solution in a reasonable amount of time, because otherwise we would be overwhelmed with candidate solutions and spend an inordinate amount of time collecting them all when we only need the best one.

Some problems exhibit what is called optimal substructure, meaning that the accumulation of greedy solutions of subproblems leads to the optimal solution of the full problem. The approach to solving problems through this lens is called dynamic programming.

This tutorial is based on a problem statement assigned to me during the hiring process at a certain company. I thought it makes a neat, encapsulated tutorial subject and put together this notebook to demonstrate the thought process of breaking down such a problem.

I find that there are many still wrapping their heads around what exactly is dynamic programming. Some think it's just memoization plus recursion -- but really, it's more than that. It's an approach that exploits properties of the problem that allow for an elegant method to finding the optimal solution.

The approach is a bit like induction: first we solve a base case, then we show that the solution method gives an optimal solution for a problem of size $n$, then we show that the solution also works for a problem of size $n+1$ using one more step of the solution method.

What differentiates dynamic programming from other recursion-based search methods is that we cache the results of the solutions to subproblems so that the computation need only occur once instead of $O(2^m)$ times, where $m$ is the level of the subproblem. We also exploit a recursive definition of the problem so that we end up with an elegant and compact solution method. Dynamic programming is thus the happiest marriage of induction, recursion, and greedy optimization.

The "dynamic" part of this approach is that we only have to apply one function repeatedly to the problem, and this function will return optimal values of the full problem as well as any sub- or superproblem.

You are an operations specialist in a company whose supply chain is undergoing restructuring. The machines you own are failing, so you've done your research and have collected a number of offers for buying and selling machines while your supply chain is brought back up to speed.

Only one machine may be used at a time, and you must sell the machine you own before buying another;

You can only buy machines with the cash you have in hand at that instant;

Each machine $i$ has:

a day of availability $D_i$,

purchase cost $P_i$,

resale value $R_i < P_i$, and

a certain amount of profit generated per day $G_i$;

A machine $i$ is

*only*available on one day, $D_i$;You can't earn any money on a day where you are buying a machine since the one you already have must be taken offline and switched out.

Your task is to determine exactly which machines to buy and sell to maximize profit during the restructuring period.

To really get a grip on this problem, it's vital to be able to define the problem statement in terms of the composition of subproblems. We must identify the atomic subproblem upon which problems of increasing size are built.

To figure out how to attack this problem, let's start off by listing some of its properties:

we only care about the moments when we have the opportunity to buy a new machine and sell the old one because we can easily compute the profit gained between and during purchases and the function for calculating the profit is deterministic;

state transitions only occur when machines are purchased;

the outcome of a given purchase is not constrained by any previous or future purchases besides the constraints on the evolving state of the system;

if the cash-only constraint is violated at any time, any solutions derived from the present partial solution are infeasible.

Take some time to fully comprehend these properties and their implications. These are important for moving forward confidently in our decision to apply dynamic programming to this problem.

One way to model this problem is as a graph, where nodes are machine-purchases and edges are the amount of profit gained between purchases. In this case, we can easily calculate the profit gained between the actions of buying machine $i$ and machine $j$ (and not paying for any machines between $i$ and $j$). Because the profit function is deterministic, we can store these computations and recall them instead of recomputing them many times over.

We include two additional nodes, one at the beginning and one at the end, to represent respectively the initial and final states of the system. The reason for this is to avoid having to deal with boundary cases in the `profit()`

function. Then we model our decision process as a path in this graph.

Ultimately we must find the path with the highest value whose running sum never drops below zero. However this constraint on the problem makes it very hard to solve with traditional graph methods like shortest-path algorithms. So we must use a more iterative approach, akin to a search for a highest-value path but with additional stopping conditions if the present path is infeasible according to the constraint. Because of the irreversibility of time, the graph is guaranteed acyclic.

All these properties reinforce the notion that treating this problem as the successive optimization of larger and larger subproblems will prove fruitful. So we recognize that *recursion* will play a large role. Recursion is nothing more than applying the same function to a smaller version of the original input until we reach a "base case". Our base case is just any path which begins at the initial state and ends at the final state.

First, we need to be able to generate and represent a problem case.

In [2]:

```
class Case:
def __init__(self, inits, specs):
self.inits = list(inits)
# store `specs` in the original format
self.orig_specs = specs
# create also a `specs` which has an initial and final state added to it, and is sorted for easy processing
self.specs = np.vstack([np.array([[0, 0, self.inits[1], 0]]),
np.array(sorted(specs, key=itemgetter(0))),
np.array([[self.inits[2]+1, 0, 0, 0]])])
def generate_case(N, C, D):
inits = (N,C,D)
specs = np.zeros((N, 4))
specs[:,3] = np.random.randint(1, np.sqrt(C), size=N) # Gi
specs[:,0] = np.random.randint(1, D+1, size=N) # Di
specs[:,1:3] = np.sort(np.random.randint(1, C, size=(N,2)), axis=1)[:,::-1] # Pi > Ri
return Case(inits, specs)
```

You will notice above that `specs`

is saved in a form which makes traversal easier. In particular, we treat the initial state as being represented by another machine which makes no profit and has a resale value equivalent to the starting cash amount. Further, the final state is assigned to a particular day but neither requires any money to "buy" nor generates any profit. This way we can define a `profit()`

function that handles the boundary conditions naturally.

It's useful to be able to calculate how much money we earn between purchases of machines. This amount will be constant over the course of the problem and will need to be recalled many times.

In [3]:

```
def profit(spec1, spec2):
# Two lines of `specs` which describe two possible states
d1, p1, r1, g1 = spec1
d2, p2, r2, g2 = spec2
# we cannot travel back in time
if d1 >= d2:
return -np.inf
# the first term is the profit generated from running machine 1
# the second term is the resale value of machine 1
# the third term is the purchase cost of machine 2
return g1 * (d2 - d1 - 1) + r1 - p2
```

The definition of `profit()`

above implies that the moment in time represented by a state-node in the graph is immediately following the purchase of the new machine.

So now we have our subproblem. It can be summarized as the question, "Given a certain initial cash amount and a time horizon, how much can I profit if there is one machine offered and the cash constraint is enforced?" The answer to this question is zero if the `profit()`

function returns a negative number with magnitude greater than the original cash amount, and positive if it turns out buying the machine is beneficial in the long run.

The beauty of dynamic programming comes in its exploitation of the memoizability of the problem's components. In our case, we know we can cache the amount of profit gained between machines $i$ and $j$ and recall this number when computing the path value. As we walk recursively through the graph, we only need to track our current cash amount by adding these precomputed values and follow the paths which satisfy the constraint.

The man who developed dynamic programming, Richard Bellman, did so on the coattails of his principle of optimality, from which the optimal substructure property is derived and which makes it possible to create elegant solutions to combinatorial problems such as this one.

At its most basic, the Bellman equation describes the iterative accumulation of value as actions are taken toward a final solution,

$$ V(x_i) = \max_{a_i} \left\{ F(x_i, a_i) + V(x_{i+1}) \right\}, \quad x_{i+1} = T(x_i, a_i) $$

where $x_i$ is the state, $a_i$ is an action, $F$ is the atomic value function (in our case, `profit()`

), $T$ is the state transition function from $i$ to $i+1$, and $V$ is of course the value of the solution. For our purposes, $a_i \in \{0,1\}$ represents the choice to purchase machine $i$: 1 for yes, 0 for no. $x_i$ stores the machines' properties as well as the profit accumulated so far. Thus to maximize this, we must first recursively dive toward the end of the solution and return back out with the maximum value by greedily maximizing each subproblem, starting with the smallest ones.

In [4]:

```
cache = {}
# We store the solution and step lengths in terms of the graph's nodes and edges, respectively,
# rather than the specs originally fed to the Case object.
def dp1(specs, solution=[0], step_lengths=[0]):
# We have a global variable which caches oft-needed values.
global cache
# The machine we start at for this subproblem.
source = solution[-1]
# If we haven't stored these values yet, do it now.
# We need them later in this function call and many more times down the line.
if cache[source] == -1:
cache[source] = {option : profit(specs[source], spec) for option,spec in enumerate(specs[source+1:], start=source+1)}
# We get the list of possible next steps based on satisfaction of the cash constraint.
cash_so_far = sum(step_lengths)
next_options = [option for option in cache[source] if cash_so_far + cache[source][option] >= 0]
# If we reach this condition, there is no way to move forward so we return what we have.
if len(next_options) == 0:
return np.array(step_lengths), np.array(solution)
max_step_lengths, max_solution = np.array([0]), []
for option in next_options:
# Recursive call: start solving each of the next smallest subproblems
# and return the solution which represents the optimal policy.
this_step_lengths, this_solution = dp1(specs, solution+[option], step_lengths+[cache[source][option]])
# Record the result of the optimal subproblem and return at the end.
if max_step_lengths.sum() < this_step_lengths.sum():
max_step_lengths = this_step_lengths.copy()
max_solution = this_solution.copy()
return max_step_lengths, max_solution
```

We can apply additional constraints, but only if we are capable of proving that solutions meet or violate those constraints. For the case of the cash constraint, we simply neglect searching further beyond an action which results in a net cash amount less than zero.

In addition to the cash constraint, we can neglect solution paths which are provably incapable of reaching a score above the best among all solutions found so far. This is called branch and bound and is the basis for many efficient combinatorial search techniques, most notably in powerful (hard-coded) chess engines today as *alpha-beta pruning*. Note that branch and bound is not essential to dynamic programming *per se*, but does help to remove low-quality paths from consideration and speed up convergence considerably.

We need to define an upper bound of a partial solution's eventual value. Let us imagine that we have just purchased machine $i$. Let us create a new machine which has the highest profit generation of any machines which come after $i$ as well as the highest resale value of any of them. Then we replace our current machine at no cost and begin running the new one immediately. How much profit is possible at the end of the restructuring period after doing this defines a hard upper bound of the partial solution's eventual value.

In [5]:

```
lower_bound = 0
def dp2(specs, solution=[0], step_lengths=[0]):
global cache
# Addition one: another global variable which stores a lower bound of the optimal solution
# which is used for comparison against the upper bound of partial solutions.
global lower_bound
source = solution[-1]
if cache[source] == -1:
cache[source] = {option : profit(specs[source], spec)
for option,spec in enumerate(specs[source+1:], start=source+1)}
cash_so_far = sum(step_lengths)
# Addition two: calculate the upper bound of this branch and compare against the global lower bound
profit_w_best = specs[source:,3].max()
remaining_time = (specs[-1,0] - 1) - specs[source,0]
resale_w_best = specs[source:,2].max()
if cash_so_far + (profit_w_best * remaining_time) + resale_w_best < lower_bound:
return np.array([-np.inf]), np.array([])
next_options = [option for option in cache[source] if cash_so_far + cache[source][option] >= 0]
if len(next_options) == 0:
# Addition three: tracking the lower bound as we find better and better solutions.
value = sum(step_lengths)
if lower_bound < value:
lower_bound = value
return np.array(step_lengths), np.array(solution)
max_step_lengths, max_solution = np.array([0]), []
# Addition four: we permute the list of next steps to mix up the traversal order.
# The intention is to introduce mixing and find higher-quality solutions faster,
# to make the pruning more effective.
for option in np.random.permutation(next_options):
this_step_lengths, this_solution = dp2(specs, solution+[option], step_lengths+[cache[source][option]])
if max_step_lengths.sum() < this_step_lengths.sum():
max_step_lengths = this_step_lengths.copy()
max_solution = this_solution.copy()
return max_step_lengths, max_solution
```

Now we slap together all the logic and create a singular function which solves every case in a list of cases using the recursive function of your choosing.

In [6]:

```
def solve(f, cases, verbose=False):
"""
Solve all "cases" using preferred recursive function.
"""
global cache
global lower_bound
for k,case in enumerate(cases):
N,C,D = case.inits
cache = {i:-1 for i in range(N+2)}
lower_bound = 0
best_profit, best_solution = f(case.specs)
if verbose:
case_str = "Case {}: {:d}".format(k+1, int(best_profit.sum()))
print(case_str)
```

And test everything:

In [7]:

```
cases = [generate_case(6, 11, 20)]
solve(dp1, cases, True)
solve(dp2, cases, True)
```

Here we can take a quick look at the cache:

In [8]:

```
def pprint_cache(cache):
N = max(cache.keys())+1
output = np.zeros((N,N)) - np.inf
for n1 in cache:
row = cache[n1]
for n2 in row:
output[n1,n2] = row[n2]
print(output)
```

In [9]:

```
solve(dp2, [generate_case(12, 25, 100)], True)
pprint_cache(cache)
```

Let's see how much branch and bound helps our search. We will run each recursive function on each generated case and get a sense of the time difference between them.

In [10]:

```
def compare(f1, f2, NCD, time_limit=60):
global cache
global lower_bound
times1 = 0
times2 = 0
t0 = time()
while time()-t0 <= time_limit:
case = generate_case(*NCD)
N,C,D = case.inits
tx = time()
cache = {i:-1 for i in range(N+2)}
lower_bound = 0
best_profit1, best_solution1 = f1(case.specs)
ty = time()
cache = {i:-1 for i in range(N+2)}
lower_bound = 0
best_profit2, best_solution2 = f2(case.specs)
tz = time()
time1 = ty - tx
times1 += time1
time2 = tz - ty
times2 += time2
print("Total time for DP1: {:.1f}s".format(times1))
print("Total time for DP2: {:.1f}s".format(times2))
if times1 > times2:
print("DP1 takes {:.1f}x longer on average".format(times1/times2-1))
else:
print("DP2 takes {:.1f}x longer on average".format(times2/times1-1))
```

In [11]:

```
compare(dp1, dp2, (20, 35, 50), time_limit=600)
```

Surprisingly, even though quite lenient, this upper bound proves very effective at speeding up convergence. The upper bound is stricter closer to the final state (there are fewer good machines available) and so prunes more aggressively there. As the graph gets deeper, too, the pruning happens high enough that the speedup over the vanilla implementation is remarkable.

In this tutorial you learned what dynamic programming really is, and how to properly approach a problem that warrants it. You also learned that dynamic programming is not incompatible with other methods and indeed benefits from ones which discard low-quality solutions early.

There are many other ways to solve this problem, for instance brute force search, tree search, or evolutionary algorithms, but none as thorough, direct, and efficient.