Homework 04: Metropolis-Hastings and Cross Validation#
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns; sns.set_theme()
import numpy as np
import pandas as pd
import os.path
import subprocess
from sklearn import model_selection, ensemble
Helpers for Getting, Loading and Locating Data
def wget_data(url: str):
local_path = './tmp_data'
p = subprocess.Popen(["wget", "-nc", "-P", local_path, url], stderr=subprocess.PIPE, encoding='UTF-8')
rc = None
while rc is None:
line = p.stderr.readline().strip('\n')
if len(line) > 0:
print(line)
rc = p.poll()
def locate_data(name, check_exists=True):
local_path='./tmp_data'
path = os.path.join(local_path, name)
if check_exists and not os.path.exists(path):
raise RuxntimeError('No such data file: {}'.format(path))
return path
Problem 1#
In this problem, you will implement a Metropolis update rule to sample a Markov chain for the un-normalized probability density:
with hyperparameter \(s > 0\).
def P(x,y,s):
return 0.5 * (np.exp(-0.5 * ((x / s) ** 2 + (y * s) ** 2)) + np.exp(-0.5 * ((x * s) ** 2 + (y / s) ** 2)))
def plot_P(s=3, lim=5):
fig = plt.figure(figsize=(6, 6))
xy = np.linspace(-lim, +lim, 250)
Pxy = P(xy, xy[:, np.newaxis], s)
plt.contour(xy, xy, Pxy, [0.1, 0.2, 0.3], colors='r')
plot_P()
Implement the function below to perform a Metropolis update starting from \((x,y)\) and using a Gaussian proposal distribution \(Q(x,y)\) centered at \((0,0)\) with standard deviation \(\sigma\) along both coordinates. Use the “random walk” mode for your proposed updates. (Recall that Metropolis updates are a special case of Metropolis-Hastings updates where the ratio of \(Q\) factors cancels in the Hastings ratio.)
def metropolis_update(x, y, s, gen, sigma=1):
"""Perform a single Metropolis update.
Parameters
----------
x : float
Value of x from the previous step.
y : float
Value of y from the previous step.
s : float
Value of the hyperparameter s.
gen : np.random.RandomState
Random state to use for reproducible random samples.
sigma : float
Standard deviation of the Gaussian proposal distribution Q(x,y).
Returns
-------
tuple
Tuple (x,y) of the position after the update.
"""
# YOUR CODE HERE
raise NotImplementedError()
# A correct solution should pass these tests.
s, n = 3, 1000
gen = np.random.RandomState(seed=123)
# Generate steps from (0, 0) with sigma=1
xy = np.array([metropolis_update(0, 0, s, gen, sigma=1) for i in range(n)])
nrepeat = np.count_nonzero(np.all(xy == [0, 0], axis=1))
assert np.allclose(nrepeat / n, 0.69967, atol=0.1, rtol=0.1)
assert np.allclose(np.mean(xy, axis=0), [0, 0], atol=0.1, rtol=0.1)
# Generate steps from (5, 0) with sigma=1
xy = np.array([metropolis_update(5, 0, s, gen, sigma=1) for i in range(n)])
nrepeat = np.count_nonzero(np.all(xy == [5, 0], axis=1))
assert np.allclose(nrepeat / n, 0.70136, atol=0.1, rtol=0.1)
assert np.allclose(np.mean(xy, axis=0), [4.893, 0], atol=0.1, rtol=0.1)
# Generate steps from (1, -1) with sigma=1
xy = np.array([metropolis_update(1, -1, s, gen, sigma=1) for i in range(n)])
nrepeat = np.count_nonzero(np.all(xy == [1, -1], axis=1))
assert np.allclose(nrepeat / n, 0.26665, atol=0.1, rtol=0.1)
assert np.allclose(np.mean(xy, axis=0), [0.822, -0.822], atol=0.1, rtol=0.1)
# Generate steps from (1, -1) with sigma=2
xy = np.array([metropolis_update(1, -1, s, gen, sigma=2) for i in range(n)])
nrepeat = np.count_nonzero(np.all(xy == [1, -1], axis=1))
assert np.allclose(nrepeat / n, 0.43847, atol=0.1, rtol=0.1)
assert np.allclose(np.mean(xy, axis=0), [0.781, -0.781], atol=0.1, rtol=0.1)
Test your solution with the following visualization:
def plot_chain(update_rule, x0=0, y0=0, s=3, n_updates=200, lim=5, seed=123, **kwargs):
gen = np.random.RandomState(seed=seed)
path = [(x0, y0)]
for i in range(n_updates):
path.append(update_rule(*path[-1], s, gen, **kwargs))
plot_P(s, lim)
x, y = np.array(path).T
plt.scatter(x, y, s=10, c='k')
plt.plot(x, y, 'k-', lw=0.5, alpha=0.3)
plt.xlim(-lim, +lim)
plt.ylim(-lim, +lim)
plt.xlabel('$x$')
plt.ylabel('$y$')
plot_chain(metropolis_update, sigma=2.0)
plot_chain(metropolis_update, sigma=1.0)
plot_chain(metropolis_update, sigma=0.5)
Problem 2#
In this problem you will implement a Gibbs update rule for the same probability density.
Implement the function below to sample from the conditional distribution \(P(x\mid y)\). Hint: each sample can be drawn from a single Gaussian with \(\sigma = s\) or \(\sigma = 1/s\) as long as you weight the contributions from each Gaussian correctly given \(y\).
def sample_conditional(y, s, gen):
"""Sample from the conditional distribution P(x | y).
Parameters
----------
y : float
Fixed value of y to use.
s : float
Value of the hyperparameter s.
gen : np.random.RandomState
Random state to use for reproducible random samples.
Returns
-------
float
Random value of x sampled from P(x | y).
"""
# YOUR CODE HERE
raise NotImplementedError()
# A correct solution should pass these tests.
s, n = 3, 10000
gen = np.random.RandomState(seed=123)
# With y=+/-4, the distribution of x should be narrow.
x = [sample_conditional(+4, s, gen) for i in range(n)]
assert np.allclose(np.percentile(x, (5, 50, 95)), [-0.549, 0, +0.549], atol=0.1, rtol=0.1)
x = [sample_conditional(-4, s, gen) for i in range(n)]
assert np.allclose(np.percentile(x, (5, 50, 95)), [-0.549, 0, +0.549], atol=0.1, rtol=0.1)
# With y=0, the distribution of x should have a narrow core and a wide tail.
x = [sample_conditional(0, s, gen) for i in range(n)]
assert np.allclose(np.percentile(x, (5, 25, 50, 75, 95)), [-3.84, -0.50, 0, +0.50, +3.84], atol=0.1, rtol=0.1)
Implement the function below to perform a Gibbs update consisting of:
Sample \(x_{n+1}\) from \(P_X(x\mid y_n)\)
Sample \(y_{n+1}\) from \(P_Y(y\mid x_{n+1})\)
Note that you can use sample_conditional()
for both steps by noticing that \(P_Y(y\mid x)\) equals \(P_X(x\mid y)\) when \(x\) and \(y\) are swapped.
def gibbs_update(x, y, s, gen):
"""Perform a single Gibbs update.
Parameters
----------
x : float
Value of x from the previous step.
y : float
Value of y from the previous step.
s : float
Value of the hyperparameter s.
gen : np.random.RandomState
Random state to use for reproducible random samples.
Returns
-------
tuple
Tuple (x,y) of the position after the update.
"""
# YOUR CODE HERE
raise NotImplementedError()
# A correct solution should pass these tests.
s, n = 3, 10000
gen = np.random.RandomState(seed=123)
# Generate steps from (0,0).
xy = np.array([gibbs_update(0, 0, s, gen) for i in range(n)])
assert np.allclose(
np.percentile(xy[:, 0], (5, 25, 50, 75, 95)),
[-3.849, -0.502, 0, +0.502, +3.849], atol=0.1, rtol=0.1)
assert np.allclose(
np.percentile(xy[:, 1], (5, 25, 50, 75, 95)),
[-2.36, -0.297, 0, +0.297, +2.36], atol=0.1, rtol=0.1)
# Steps from (5,0) have the same distribution.
xy = np.array([gibbs_update(5, 0, s, gen) for i in range(n)])
assert np.allclose(
np.percentile(xy[:, 0], (5, 25, 50, 75, 95)),
[-3.849, -0.502, 0, +0.502, +3.849], atol=0.1, rtol=0.1)
assert np.allclose(
np.percentile(xy[:, 1], (5, 25, 50, 75, 95)),
[-2.36, -0.297, 0, +0.297, +2.36], atol=0.1, rtol=0.1)
# Steps from (0,-5) have a different distribution.
xy = np.array([gibbs_update(0, -5, s, gen) for i in range(n)])
assert np.allclose(
np.percentile(xy[:, 0], (5, 25, 50, 75, 95)),
[-0.548, -0.225, 0, +0.225, +0.548], atol=0.1, rtol=0.1)
assert np.allclose(
np.percentile(xy[:, 1], (5, 25, 50, 75, 95)),
[-3.42, -0.391, 0, +0.391, +3.42], atol=0.1, rtol=0.1)
Test your solution with the following visualization:
plot_chain(gibbs_update)
Problem 3#
If we define the potential energy for a “particle” as:
it has partial derivatives:
with
A Hamiltonian MC simulates the trajectory of a particle using the equations of motion:
where we have set all masses equal to 1 and the temperature \(k_B T = 1\).
Implement the function below to perform a single \(\Delta t\) step according to the equations above:
def HMC_step(x, y, px, py, s, dt):
"""Perform a single HMC dt step.
Parameters
----------
x : float
Current x position.
y : float
Current y position.
px : float
Current x momentum.
py : float
Current y momentum.
s : float
Value of the hyperparameter s.
dt : float
Step size to take.
Returns
-------
tuple
Tuple (x, y, px, py) with particle position and momentum after this step.
"""
# YOUR CODE HERE
raise NotImplementedError()
# A correct solution should pass these tests.
assert np.all(HMC_step(0., 0., 0., 0., 3, 1.) == np.array([0.,0.,0.,0.]))
assert np.all(HMC_step(0., 0., 1., 1., 3, 0.) == np.array([0.,0.,1.,1.]))
assert np.all(HMC_step(0., 0., 1., 1., 3, 1.) == np.array([1.,1.,1.,1.]))
assert np.all(HMC_step(0., 0., 1., 1., 3, 2.) == np.array([2.,2.,1.,1.]))
assert np.all(np.round(HMC_step(1., 1., -1., 1., 3, 1.), 3) == np.array([0.,2.,-5.556,-3.556]))
assert np.all(np.round(HMC_step(0., 1., -1., 1., 3, 1.), 3) == np.array([-1.,2.,-1.,0.786]))
assert np.all(np.round(HMC_step(1., 0., -1., 1., 3, 1.), 3) == np.array([0.,1.,-1.214,1.]))
In order to perform an HMC update, we first need to generate random values of the (nuisance) parameters \(p_x\) and \(p_y\), then we follow the resulting particle from its initial conditions through a fixed number of steps. The result of the update is wherever the particle ends up. Note that the only use of random numbers is to generate the particle’s initial momentum.
def HMC_update(x0, y0, s, gen, p_sigma, n_steps, dt):
"""Perform a single HMC update by following a single particle for a fixed time.
Parameters
----------
x0 : float
Initial x position.
y0 : float
Initial y position.
s : float
Value of the hyperparameter s.
gen : np.random.RandomState
Random state to use for reproducible random samples.
p_sigma : float
Gaussian sigma for generating random initial (px,py) values.
n_steps : int
Number of particle steps to simulate.
dt : float
Size of each step.
Returns
-------
array
Array of shape (n_steps + 1, 4) with the particle position and momentum after each step.
"""
px0, py0 = gen.normal(scale=p_sigma, size=2)
path = [(x0, y0, px0, py0)]
for i in range(n_steps):
path.append(HMC_step(*path[-1], s, dt))
return np.array(path)
Finally, use the following visualization to see the particle trajectories from different updates:
def plot_HMC(x0=0, y0=0, s=3, p_sigma=1., n_steps=500, dt=0.01, n_updates=50, lim=5, seed=3, **kwargs):
gen = np.random.RandomState(seed=seed)
plot_P(s, lim)
for i in range(n_updates):
path = HMC_update(x0, y0, s, gen, p_sigma, n_steps, dt)
x, y, px, py = np.array(path).T
plt.scatter(x[-1], y[-1], s=25, c='k')
plt.plot(x, y, 'b-', lw=1, alpha=0.3)
plt.xlim(-lim, +lim)
plt.ylim(-lim, +lim)
plt.xlabel('$x$')
plt.ylabel('$y$')
plot_HMC()
Problem 4#
The default score function used by sklearn to evaluate how well a regression model predicts data is the coefficient of determination \(R^2\). Implement the function below to calculate \(R^2\):
def calculate_R2(y_data, y_pred):
"""Calculate the coefficient of determination R2 for two arrays.
Parameters
----------
y_data : array
Array of data values, must have the same shape as y_pred.
y_pred : array
Array of predicted values, must have the same shape as y_data.
Returns
-------
float
Calculated coefficient of determination R2.
"""
assert y_data.shape == y_pred.shape
# YOUR CODE HERE
raise NotImplementedError()
# A correct solution should pass the tests below.
gen = np.random.RandomState(seed=123)
N = 100
x = gen.uniform(size=N)
y_pred = 2 * x - 1
y_data = y_pred + gen.normal(scale=0.1, size=N)
assert np.round(calculate_R2(y_data, y_pred), 3) == 0.961
assert np.round(calculate_R2(y_data, -y_pred), 3) == -2.935
assert np.round(calculate_R2(y_pred, y_pred), 3) == 1.000
assert np.round(calculate_R2(y_pred, -y_pred), 3) == -3.000
assert np.round(calculate_R2(y_data, np.full(N, np.mean(y_pred))), 3) == 0.000
Problem 5#
Implement the function below to perform a grid-search cross validation of a random forest regression over the following grid:
min_samples_leaf
= 1, 10, 20n_estimators
= 5, 10, 15
Hint: you will need to ensure reproducible “random” behavior in order to pass all the tests.
def cv_summary(cv):
"""Summarize the results from a GridSearchCV fit.
Summarize a cross-validation grid search in a pandas DataFrame with the
following transformations of the full results:
- Remove all columns with timing measurements.
- Remove the 'param_' prefix from column names.
- Remove the '_score' suffix from column names.
- Round scores to 3 decimal places.
If the parameter grid is 1D, then this function also plots the test
and training R2 scores versus the parameter.
Parameters
----------
cv : sklearn.model_selection.GridSearchCV
Instance of a GridSearchCV object that has been fit to some data.
Returns
-------
pandas.DataFrame
Summary table of cross-validation results.
"""
# Look up the list of parameters used in the grid.
params = list(cv.cv_results_['params'][0].keys())
# Index results by the test score rank.
index = cv.cv_results_['rank_test_score']
df = pd.DataFrame(cv.cv_results_, index=index).drop(columns=['params', 'rank_test_score'])
# Remove columns that measure running time.
df = df.drop(columns=[n for n in df.columns.values if n.endswith('_time')])
# Remove param_ prefix from column names.
df = df.rename(lambda n: n[6:] if n.startswith('param_') else n, axis='columns')
# Remove _score suffix from column names.
df = df.rename(lambda n: n[:-6] if n.endswith('_score') else n, axis='columns')
if len(params) == 1:
# Plot the test and training scores vs the grid parameter when there is only one.
plt.plot(df[params[0]], df['mean_train'], 'o:', label='train')
plt.plot(df[params[0]], df['mean_test'], 'o-', label='test')
plt.legend(fontsize='x-large')
plt.xlabel('Hyperparameter value')
plt.ylabel('Score $R^2$')
plt.ylim(max(-2, np.min(df['mean_test'])), 1)
return df.sort_index().round(3)
def cross_validate(X, Y, gen):
"""Perform cross validation of a random forest regression.
Parameters
----------
X : array
Array with shape (N, DX) of N samples with DX features.
Y : array
Array with shape (N, DY) of N samples with DY features.
gen : np.random.RandomState
Random state to use for reproducible random numbers.
Returns
-------
pandas.DataFrame
Cross-validation summary table produced by cv_summary().
"""
assert len(X) == len(Y)
# YOUR CODE HERE
raise NotImplementedError()
wget_data('https://raw.githubusercontent.com/illinois-mlp/MachineLearningForPhysics/main/data/spectra_data.hf5')
wget_data('https://raw.githubusercontent.com/illinois-mlp/MachineLearningForPhysics/main/data/spectra_targets.hf5')
X = pd.read_hdf(locate_data('spectra_data.hf5')).values
Y = pd.read_hdf(locate_data('spectra_targets.hf5')).values
# A correct solution should pass the tests below.
gen = np.random.RandomState(seed=123)
cvs = cross_validate(X, Y, gen)
assert np.all(cvs.columns.values == [
'min_samples_leaf', 'n_estimators', 'split0_test', 'split1_test',
'mean_test', 'std_test', 'split0_train', 'split1_train', 'mean_train',
'std_train'])
assert np.all(cvs['min_samples_leaf'].values == [1, 1, 1, 10, 10, 10, 20, 20, 20])
assert np.all(cvs['n_estimators'].values == [15, 10, 5, 15, 10, 5, 15, 10, 5])
# Works on Google Colab (Feb '25)
#
assert np.allclose(
cvs['mean_test'].values,
[0.961, 0.955, 0.942, 0.896, 0.891, 0.879, 0.496, 0.490, 0.480], atol=1e-3)
# Note: I have seen some platform dependence to the numerics in the above assess. Specifically, cvs['mean_test'] gives something different on my mac and I have to use the one below for the assert to pass.
#
#assert np.allclose(
# cvs['mean_test'].values,
# [0.960, 0.953, 0.938, 0.893, 0.886, 0.866, 0.493, 0.485, 0.464], atol=1e-3)
# If you run into trouble like this, include the line for the grader:
#print(cvs['mean_test'])
assert np.allclose(
cvs['mean_train'].values,
[0.993, 0.992, 0.990, 0.909, 0.908, 0.905, 0.512, 0.515, 0.507], atol=1e-3)
Acknowledgments#
Initial version: Mark Neubauer
© Copyright 2025