Homework 02: Probability Theory and Density Estimation#

%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns; sns.set()
import numpy as np
import pandas as pd
import os.path
import subprocess
import scipy.stats
from sklearn import neighbors, cluster

Helpers for Getting, Loading and Locating Data

def wget_data(url: str):
    local_path = './tmp_data'
    p = subprocess.Popen(["wget", "-nc", "-P", local_path, url], stderr=subprocess.PIPE, encoding='UTF-8')
    rc = None
    while rc is None:
      line = p.stderr.readline().strip('\n')
      if len(line) > 0:
        print(line)
      rc = p.poll()

def locate_data(name, check_exists=True):
    local_path='./tmp_data'
    path = os.path.join(local_path, name)
    if check_exists and not os.path.exists(path):
        raise RuxntimeError('No such data file: {}'.format(path))
    return path

Problem 1#

Implement the function below to calculate the event probabilities \(P(A)\), \(P(B)\), \(P(A \cap B)\) and the conditional probabilities \(P(A\mid B)\), \(P(B\mid A)\) for an arbitrary (finite) probability space specified by each outcome’s probability. Hint: the probability of an event containing a set of outcomes is just the sum of the individual outcome probabilities.

def calculate_probabilities(p, A, B):
    """Calculate probabilities for an arbitrary probability space.
    
    Parameters
    ----------
    p : float array of shape (N,)
        Probabilities for each of the N possible outcomes in the probability space.
    A : boolean array of shape (N,)
        Identifies members of event set A in the probability space.
    B : boolean array of shape (N,)
        Identifies members of event set B in the probability space.
        
    Returns
    -------
    tuple
        Tuple of five probabilities values:
        P(A), P(B), P(A instersect B), P(A | B), P(B | A).
    """
    assert np.all((p >= 0) & (p <= 1))
    assert np.sum(p) == 1

    # YOUR CODE HERE
    raise NotImplementedError()
# A correction solution should pass the tests below.
gen = np.random.RandomState(seed=123)
N = 100
p = gen.uniform(size=(4, N))
p = (p / p.sum(axis=1).reshape(-1, 1)).reshape(-1) / 4.

# Test when A and B are "independent" events, i.e., P(A interset B) = P(A) P(B).
A = np.arange(4 * N) < 2 * N
B = (np.arange(4 * N) >= N) & (np.arange(4 * N) < 3 * N)
assert np.allclose(
    np.round(calculate_probabilities(p, A, B), 3),
    [0.5, 0.5, 0.25, 0.5, 0.5])

# Test with randomly generated events A, B.
A = gen.uniform(size=4*N) < 0.3
B = gen.uniform(size=4*N) > 0.6
#print(np.round(event_probabilities(p, A, B), 3))
assert np.allclose(
    np.round(calculate_probabilities(p, A, B), 3),
    [0.278, 0.33, 0.076, 0.23, 0.273])

Problem 2#

The cumulative distribution function (CDF) is the fundamental representation of a random variable, rather than the probability density function (PDF) which might not be defined, is not a probability and generally has dimensions. In this problem, you will explore a practical application of the CDF for generating random numbers.

Since the CDF \(y = F_X(x)\) maps from random variable values to the range \([0,1]\), its inverse \(x = F_X^{-1}(y)\) maps from \([0,1]\) back to the random variable. What distribution of \(y\) values would generate values according to the PDF \(f_X(x)\) when transformed by the inverse \(F_X^{-1}(y)\)? The answer is a uniform distribution, as we can demonstrate numerically for an arbitrary random variable:

def cdf_hist(X, n=10000, seed=123):
    gen = np.random.RandomState(seed=seed)
    # Generate n random value from the scipy.stats distribution X.
    x = X.rvs(n, random_state=gen)
    plt.hist(x, bins=50, label='$f_X(x)$', histtype='step', lw=2, density=True)
    # Histogram the corresponding CDF values.
    y = X.cdf(x)
    plt.hist(y, bins=20, label='$F_X(x)$', alpha=0.5, density=True)
    plt.xlabel('x')
    plt.legend(loc='upper center', ncol=2)
    
cdf_hist(scipy.stats.beta(0.9, 1.5))

When the function \(F_X(x)\) can be inverted analytically, you can use it to transform uniformly generated random values into a random sampling of the PDF \(f_X(x)\).

For example, consider random outcomes consisting of \((x,y)\) points uniformly distributed on the disk, $\( 0 \le r_1 \le \sqrt{x^2 + y^2} \le r_2 \; . \)\( The CDF of the random variable \)r \equiv \sqrt{x^2 + y^2}\( is then \)\( F_R(r) = \begin{cases} 1 & r > r_2 \\ \frac{r^2 - r_1^2}{r_2^2 - r_1^2} & r_1 \le r \le r_2 \\ 0 & r < r_1 \end{cases}\; . \)\( Implement the function below to apply \)F_R^{-1}(y)\( to uniformly distributed random values in order to sample \)f_R(x)$:

def sample_disk(r1, r2, n, gen):
    """Sample random radii for points uniformly distributed on a disk.
    
    Parameters
    ----------
    r1 : float
        Inner radius of disk.
    r2 : float
        Outer radius of disk.
    n : int
        Number of random samples to generate.
    gen : np.random.RandomState
        Random state for reproducible random numbers.
        Uses gen.uniform() internally, not gen.rand().
        
    Returns
    -------
    array
        Array of n randomly generated r values.
    """
    assert (r1 >= 0) and (r1 < r2)

    # YOUR CODE HERE
    raise NotImplementedError()
# A correct solution should pass these tests.
r1, r2, n = 1., 2., 1000
gen = np.random.RandomState(seed=123)
r = sample_disk(r1, r2, n, gen)
assert np.all((r >= r1) & (r <= r2))
assert np.allclose(np.round(np.mean(r), 3), 1.556)
assert np.allclose(np.round(np.std(r), 3), 0.279)

r1, r2, n = 0., 2., 1000
r = sample_disk(r1, r2, n, gen)
assert np.all((r >= r1) & (r <= r2))
assert np.allclose(np.round(np.mean(r), 3), 1.325)
assert np.allclose(np.round(np.std(r), 3), 0.494)

Test your implementation by plotting some \((x,y)\) points with uniformly random \(0 \le \theta < 2\pi\):

gen = np.random.RandomState(seed=123)
r = sample_disk(0.8, 1.2, 1000, gen)
theta = gen.uniform(0, 2 * np.pi, size=len(r))
plt.scatter(r * np.cos(theta), r * np.sin(theta), s=5)
plt.gca().set_aspect(1)

Sometimes \(F_X(x)\) cannot be inverted explicitly, either because the inverse has no closed form or because the underlying distribution is arbitrary. In these cases, we can still apply the same method numerically.

Implement the function below to tabulate an empirical estimate of the CDF for an arbitrary random variable, as: $\( x_{CDF} = x_{\text{lo}}, x_0, x_1, \ldots, x_{N-1}, x_{\text{hi}} \; , \)\( where the \)x_i\( are [sorted](https://docs.scipy.org/doc/numpy/reference/generated/numpy.interp.html), \)x_0 \le x_1 \le \ldots \le x_{N-1}\(, and corresponding CDF values: \)\( y_{CDF} = 0, \frac{1}{N+1}, \frac{2}{N+1}, \ldots, \frac{N}{N+1}, 1 \; . \)$

def empirical_cdf(x, xlo, xhi):
    """Tabulate the empirical CDF from samples of an arbitrary random variable.
    
    Parameters
    ----------
    x : array of shape (N,)
        Array of input random variable values to use.
    xlo : float
        Low limit for the random variable x.
    xhi : float
        High limit for the random variable x.
        
    Returns
    -------
    tuple
        Tuple (x_cdf, y_cdf) of arrays both of shape (N+2,), padded at each end
        as described above.
    """
    assert xlo < xhi
    x = np.asarray(x)
    assert np.all((x >= xlo) & (x <= xhi))

    # YOUR CODE HERE
    raise NotImplementedError()
# A correct solution should pass these tests.
x_cdf, y_cdf = empirical_cdf([1, 2, 3, 4], 0, 5)
assert np.array_equal(x_cdf, [0, 1, 2, 3, 4, 5])
assert np.allclose(y_cdf, [0., .2, .4, .6, .8, 1.])

x_cdf, y_cdf = empirical_cdf([4, 2, 1, 3], 0, 5)
assert np.array_equal(x_cdf, [0, 1, 2, 3, 4, 5])
assert np.allclose(y_cdf, [0., .2, .4, .6, .8, 1.])

gen = np.random.RandomState(seed=123)
x = scipy.stats.beta(0.9, 1.5).rvs(size=4, random_state=gen)
x_cdf, y_cdf = empirical_cdf(x, 0., 1.)
assert np.allclose(
    np.round(x_cdf, 3),
    [ 0.   ,  0.087,  0.152,  0.42 ,  0.721,  1.   ])
assert np.allclose(y_cdf, [0., .2, .4, .6, .8, 1.])

Test your implementation by generating CDF samples matched to an unknown distribution. Note that we use linear interpolation to numerically invert the empirical CDF in this approach, which is a useful trick to remember:

n = 5000
gen = np.random.RandomState(seed=123)
X = scipy.stats.beta(0.9, 1.5)
# Generate samples using scipy.stats
x_in = X.rvs(n, random_state=gen)
plt.hist(x_in, bins=50, label='Input data', alpha=0.5, density=True)
# Generate samples using the empirical CDF of x_in
x_cdf, y_cdf = empirical_cdf(x_in, 0., 1.)
y = gen.uniform(size=n)
x = np.interp(y, y_cdf, x_cdf)
plt.hist(x, bins=50, label='CDF samples', histtype='step', lw=2, density=True)
plt.legend();

Problem 3#

Consider a sequence of \(n\) Bernoulli trials with success probabilty \(p\) per trial. A string of consecutive successes is known as a success run. Write a function that returns the counts for runs of length \(k\) for each \(k\) observed in a python dictionary.

For example: if the trials were [0, 1, 0, 1, 1, 0, 0, 0, 0, 1], the function should return {1: 2, 2: 1}

from collections import Counter

def count_runs(xs):
    """Count number of success runs of length k.
    
    Parameters
    ----------
    xs : array of shape (1, nx)
        Vector of Bernoulli trials
        
    Returns
    -------
    dict
        Returns a dictionary the counts (val) for runs of length k (key) for each k observed
    """

    # YOUR CODE HERE
    raise NotImplementedError()
cnt = count_runs([0, 1, 0, 1, 1, 0, 0, 0, 0, 1],)
assert [cnt[1],cnt[2]] == [2,1]

cnt = count_runs([0, 1, 0, 1, 1, 0, 0, 0, 0, 1, 1, 1],)
assert [cnt[1],cnt[2],cnt[3]] == [1,1,1]

rng = np.random.default_rng(seed=1234)
cnt = count_runs(rng.integers(2,size=1000))
assert [cnt[1],cnt[2],cnt[3],cnt[4],cnt[5],cnt[6],cnt[7],cnt[8],cnt[9]] == [127,60,26,24,6,4,1,1,1]

Using count_runs above, write a method run_prob that returns the probability of observing at least one run of length k or more from n trials with success probabilty p per trial. This probability is estimated from expts simulated experinents.

def run_prob(expts, n, k, p, seed=1234):
    """Calculate the probability of observing at least one run of length `k` or more from `n` trials with success probabilty `p` per trial. This probability is estimated from `expts` simulated experinents.
    
    Parameters
    ----------
    expts : int
        Number of simulated experiments
    k: int
        Number of consecutive successes defining a success run
    n: int
        Number of trials per experiment
    p: float
        Success probability per trial
    seed : int
        Random number seed to use.    
        
    Returns
    -------
    float
        Returns the probability of observing at least one run of length `k` or more
    """

    # Initialize random generator. Use this generator in your code below
    rng = np.random.default_rng(seed=seed)

    # YOUR CODE HERE
    raise NotImplementedError()

Determine the probability of observing at least one run of length \(k\)=5 or more when \(n\)=100 and \(p\)=0.5. Estimate this probability from 100,000 simulated experiments.

prob = run_prob(expts=100000, n=100, k=5, p=0.5)
print (np.round(prob*100,1),'%')
assert np.allclose(prob, 0.81044, rtol=0.001, atol=0.1)

Determine the probability of observing at least one run of length \(k\)=7 or more when \(n\)=100 and \(p\)=0.7. Estimate this probability from 100,000 simulated experiments.

prob = run_prob(expts=100000, n=100, k=7, p=0.7)
print (np.round(prob*100,1),'%')
assert np.allclose(prob, 0.9489, rtol=0.001, atol=0.1)

Problem 4#

In this problem you will implement the core of the E- and M-steps for the Gaussian mixture model (GMM) method. Note the similarities with the E- and M-steps of the K-means method.

First, implement the function below to evaluate the multidimensional Gaussian probability density for arbitrary mean \(\vec{\mu}\) and covariance matrix \(C\) (refer to the lecture for more details on the notation used here): $\( G(\vec{x} ; \vec{\mu}, C) = \left(2\pi\right)^{-D/2}\,\left| C\right|^{-1/2}\, \exp\left[ -\frac{1}{2} \left(\vec{x} - \vec{\mu}\right)^T C^{-1} \left(\vec{x} - \vec{\mu}\right) \right] \)$

def Gaussian_pdf(x, mu, C):
    """Evaluate the Gaussian probability density.
    
    Parameters
    ----------
    x : array
        1D array of D feature values for a single sample
    mu : array
        1D array of D mean feature values for this component.
    C : array
        2D array with shape (D, D) of covariance matrix elements for this component.
        Must be positive definite (and therefore symmetric).
        
    Returns
    -------
    float
        Probability density.
    """
    x = np.asarray(x)
    mu = np.asarray(mu)
    C = np.asarray(C)
    D = len(x)
    assert x.shape == (D,) and mu.shape == (D,)
    assert C.shape == (D, D)
    assert np.allclose(C.T, C)

    # YOUR CODE HERE
    raise NotImplementedError()
# A correct solution should pass these tests.
assert np.allclose(Gaussian_pdf([0], [0], [[1]]), 1 / np.sqrt(2 * np.pi))
assert np.allclose(Gaussian_pdf([1], [1], [[1]]), 1 / np.sqrt(2 * np.pi))
assert np.allclose(Gaussian_pdf([0], [0], [[2]]), 1 / np.sqrt(4 * np.pi))
assert np.allclose(Gaussian_pdf([1], [0], [[1]]), np.exp(-0.5) / np.sqrt(2 * np.pi))

assert np.allclose(Gaussian_pdf([0, 0], [0, 0], [[1, 0], [0, 1]]), 1 / (2 * np.pi))
assert np.allclose(Gaussian_pdf([1, 0], [1, 0], [[1, 0], [0, 1]]), 1 / (2 * np.pi))
assert np.allclose(Gaussian_pdf([1, -1], [1, -1], [[1, 0], [0, 1]]), 1 / (2 * np.pi))
assert np.allclose(Gaussian_pdf([1, 0], [1, 0], [[4, 0], [0, 1]]), 1 / (4 * np.pi))

assert np.round(Gaussian_pdf([0, 0], [1, 0], [[4, +1], [+1, 1]]), 5) == 0.07778
assert np.round(Gaussian_pdf([0, 0], [1, 0], [[4, -1], [-1, 1]]), 5) == 0.07778
assert np.round(np.log(Gaussian_pdf([1, 0, -1], [1, 2, 3], [[4, -1, 0], [-1, 1, 0], [0, 0, 2]])), 5) == -10.31936

Next, implement the E-step in the function below. This consists of calculating the relative probability that each sample \(\vec{x}_n\) (\(n\)-th row of \(X\)) belongs to each component \(k\): $\( p_{nk} = \frac{\omega_k G(\vec{x}_n; \vec{\mu}_k, C_k)} {\sum_{j=1}^K\, \omega_j G(\vec{x}_n; \vec{\mu}_j, C_j)} \)\( Note that these relative probabilities (also called *responsibilities*) sum to one over components \)k\( for each sample \)n\(. Also note that we consider the parameters (\)\omega_k\(, \)\vec{\mu}_k\(, \)C_k$) of each component fixed during this step. Hint: use your Gaussian_pdf function here.

def E_step(X, w, mu, C):
    """Perform a GMM E-step.
    
    Parameters
    ----------
    X : array with shape (N, D)
        Input data consisting of N samples in D dimensions.
    w : array with shape (K,)
        Per-component weights.
    mu : array with shape (K, D)
        Array of mean vectors for each component.
    C : array with shape (K, D, D).
        Array of covariance matrices for each component.
    
    Returns
    -------
    array with shape (K, N)
        Array of relative probabilities that each sample belongs to
        each component, normalized so that the per-component probabilities
        for each sample sum to one.
    """
    N, D = X.shape
    K = len(w)
    assert w.shape == (K,)
    assert mu.shape == (K, D)
    assert C.shape == (K, D, D)

    # YOUR CODE HERE
    raise NotImplementedError()
# A correct solution should pass these tests.
X = np.linspace(-1, 1, 5).reshape(-1, 1)
w = np.full(4, 0.25)
mu = np.array([[-2], [-1], [0], [1]])
C = np.ones((4, 1, 1))
#print(repr(np.round(E_step(X, w, mu, C), 3)))
assert np.all(
    np.round(E_step(X, w, mu, C), 3) ==
    [[ 0.258,  0.134,  0.058,  0.021,  0.006],
     [ 0.426,  0.366,  0.258,  0.152,  0.077],
     [ 0.258,  0.366,  0.426,  0.414,  0.346],
     [ 0.058,  0.134,  0.258,  0.414,  0.57 ]])

X = np.zeros((1, 3))
w = np.ones((2,))
mu = np.zeros((2, 3))
C = np.zeros((2, 3, 3))
diag = range(3)
C[:, diag, diag] = 1
#print(repr(np.round(E_step(X, w, mu, C), 3)))
assert np.all(
    np.round(E_step(X, w, mu, C), 3) ==
    [[ 0.5], [ 0.5]])

X = np.array([[0,0,0], [1,0,0]])
mu = np.array([[0,0,0], [1,0,0]])
#print(repr(np.round(E_step(X, w, mu, C), 3)))
assert np.all(
    np.round(E_step(X, w, mu, C), 3) ==
    [[ 0.622,  0.378], [ 0.378,  0.622]])

gen = np.random.RandomState(seed=123)
K, N, D = 4, 1000, 5
X = gen.normal(size=(N, D))
subsample = X.reshape(K, (N//K), D)
mu = subsample.mean(axis=1)
C = np.empty((K, D, D))
w = gen.uniform(size=K)
w /= w.sum()
for k in range(K):
    C[k] = np.cov(subsample[k], rowvar=False)
#print(repr(np.round(E_step(X, w, mu, C)[:, :5], 3)))
assert np.all(
    np.round(E_step(X, w, mu, C)[:, :5], 3) ==
    [[ 0.422,  0.587,  0.344,  0.279,  0.19 ],
     [ 0.234,  0.11 ,  0.269,  0.187,  0.415],
     [ 0.291,  0.194,  0.309,  0.414,  0.279],
     [ 0.053,  0.109,  0.077,  0.12 ,  0.116]])

Finally, implement the M-step in the function below. During this step, we consider the relative weights \(p_{nk}\) from the previous step fixed and instead update the parameters of each component (which were fixed in the previous step), using: $\( \begin{aligned} \omega_k &= \frac{1}{N}\, \sum_{n=1}^N\, p_{nk} \\ \vec{\mu}_k &= \frac{\sum_{n=1}^N\, p_{nk} \vec{x}_n}{\sum_{n=1}^N\, p_{nk}} \\ C_k &= \frac{\sum_{n=1}^N\, p_{nk} \left( \vec{x}_n - \vec{\mu}_k\right) \left( \vec{x}_n - \vec{\mu}_k\right)^T} {\sum_{n=1}^N\, p_{nk}} \end{aligned} \)$ Make sure you understand why the last expression yields a matrix rather than a scalar dot product before jumping into the code. (If you would like a numpy challenge, try implementing this function without any loops, e.g., with np.einsum)

def M_step(X, p):
    """Perform a GMM M-step.
    
    Parameters
    ----------
    X : array with shape (N, D)
        Input data consisting of N samples in D dimensions.
    p : array with shape (K, N)
        Array of relative probabilities that each sample belongs to
        each component, normalized so that the per-component probabilities
        for each sample sum to one.
    
    Returns
    -------
    tuple
        Tuple w, mu, C of arrays with shapes (K,), (K, D) and (K, D, D) giving
        the updated component parameters.
    """
    N, D = X.shape
    K = len(p)
    assert p.shape == (K, N)
    assert np.allclose(p.sum(axis=0), 1)

    # YOUR CODE HERE
    raise NotImplementedError()
# A correct solution should pass these tests.
X = np.linspace(-1, 1, 5).reshape(-1, 1)
p = np.full(20, 0.25).reshape(4, 5)
w, mu, C = M_step(X, p)
#print(repr(np.round(w, 5)))
#print(repr(np.round(mu, 5)))
#print(repr(np.round(C, 5)))
assert np.all(np.round(w, 5) == 0.25)
assert np.all(np.round(mu, 5) == 0.0)
assert np.all(np.round(C, 5) == 0.5)

gen = np.random.RandomState(seed=123)
K, N, D = 4, 1000, 5
X = gen.normal(size=(N, D))
p = gen.uniform(size=(K, N))
p /= p.sum(axis=0)
w, mu, C = M_step(X, p)
#print(repr(np.round(w, 5)))
#print(repr(np.round(mu, 5)))
#print(repr(np.round(C[0], 5)))
assert np.all(
    np.round(w, 5) == [ 0.25216,  0.24961,  0.24595,  0.25229])
assert np.all(
    np.round(mu, 5) ==
    [[ 0.06606,  0.06   , -0.00413,  0.01562,  0.00258],
     [ 0.02838,  0.01299,  0.01286,  0.03068, -0.01714],
     [ 0.03157,  0.04558, -0.01206,  0.03493, -0.0326 ],
     [ 0.05467,  0.06293, -0.01779,  0.04454,  0.00065]])
assert np.all(
    np.round(C[0], 5) ==
    [[ 0.98578,  0.01419, -0.03717,  0.01403,  0.0085 ],
     [ 0.01419,  0.95534, -0.02724,  0.03201, -0.00648],
     [-0.03717, -0.02724,  0.90722,  0.00313,  0.0299 ],
     [ 0.01403,  0.03201,  0.00313,  1.02891,  0.0813 ],
     [ 0.0085 , -0.00648,  0.0299 ,  0.0813 ,  0.922  ]])

You have now implemented the core of the GMM algorithm. Next you will use KMeans as a means of seeding the GMM model fit. First we include two helpful functions draw_ellipses and GMM_parplot to help display results. The details of these methods are not important.

from matplotlib.colors import colorConverter, ListedColormap
from matplotlib.collections import EllipseCollection

def draw_ellipses(w, mu, C, nsigmas=2, color='red', outline=None, filled=True, axis=None):
    """Draw a collection of ellipses.

    Uses the low-level EllipseCollection to efficiently draw a large number
    of ellipses. Useful to visualize the results of a GMM fit via
    GMM_pairplot() defined below.

    Parameters
    ----------
    w : array
        1D array of K relative weights for each ellipse. Must sum to one.
        Ellipses with smaller weights are rendered with greater transparency
        when filled is True.
    mu : array
        Array of shape (K, 2) giving the 2-dimensional centroids of
        each ellipse.
    C : array
        Array of shape (K, 2, 2) giving the 2 x 2 covariance matrix for
        each ellipse.
    nsigmas : float
        Number of sigmas to use for scaling ellipse area to a confidence level.
    color : matplotlib color spec
        Color to use for the ellipse edge (and fill when filled is True).
    outline : None or matplotlib color spec
        Color to use to outline the ellipse edge, or no outline when None.
    filled : bool
        Fill ellipses with color when True, adjusting transparency to
        indicate relative weights.
    axis : matplotlib axis or None
        Plot axis where the ellipse collection should be drawn. Uses the
        current default axis when None.
    """
    # Calculate the ellipse angles and bounding boxes using SVD.
    U, s, _ = np.linalg.svd(C)
    angles = np.degrees(np.arctan2(U[:, 1, 0], U[:, 0, 0]))
    widths, heights = 2 * nsigmas * np.sqrt(s.T)
    # Initialize colors.
    color = colorConverter.to_rgba(color)
    if filled:
        # Use transparency to indicate relative weights.
        ec = np.tile([color], (len(w), 1))
        ec[:, -1] *= w
        fc = np.tile([color], (len(w), 1))
        fc[:, -1] *= w ** 2
    # Data limits must already be defined for axis.transData to be valid.
    axis = axis or plt.gca()
    if outline is not None:
        axis.add_collection(EllipseCollection(
            widths, heights, angles, units='xy', offsets=mu, linewidths=4,
            transOffset=axis.transData, facecolors='none', edgecolors=outline))
    if filled:
        axis.add_collection(EllipseCollection(
            widths, heights, angles, units='xy', offsets=mu, linewidths=2,
            transOffset=axis.transData, facecolors=fc, edgecolors=ec))
    else:
        axis.add_collection(EllipseCollection(
            widths, heights, angles, units='xy', offsets=mu, linewidths=2.5,
            transOffset=axis.transData, facecolors='none', edgecolors=color))
def GMM_pairplot(data, w, mu, C, limits=None, entropy=False):
    """Display 2D projections of a Gaussian mixture model fit.

    Parameters
    ----------
    data : pandas DataFrame
        N samples of D-dimensional data.
    w : array
        1D array of K relative weights for each ellipse. Must sum to one.
    mu : array
        Array of shape (K, 2) giving the 2-dimensional centroids of
        each ellipse.
    C : array
        Array of shape (K, 2, 2) giving the 2 x 2 covariance matrix for
        each ellipse.
    limits : array or None
        Array of shape (D, 2) giving [lo,hi] plot limits for each of the
        D dimensions. Limits are determined by the data scatter when None.
    """
    colnames = data.columns.values
    X = data.values
    N, D = X.shape
    if entropy:
        n_components = len(w)
        # Pick good colors to distinguish the different clusters.
        cmap = ListedColormap(
            sns.color_palette('husl', n_components).as_hex())
        # Calculate the relative probability that each sample belongs to each cluster.
        # This is equivalent to fit.predict_proba(X)
        lnprob = np.zeros((n_components, N))
        for k in range(n_components):
            lnprob[k] = scipy.stats.multivariate_normal.logpdf(X, mu[k], C[k])
        lnprob += np.log(w)[:, np.newaxis]
        prob = np.exp(lnprob)
        prob /= prob.sum(axis=0)
        prob = prob.T
        # Assign each sample to its most probable cluster.
        labels = np.argmax(prob, axis=1)
        color = cmap(labels)
        if n_components > 1:
            # Calculate the relative entropy (0-1) as a measure of cluster assignment ambiguity.
            relative_entropy = -np.sum(prob * np.log(prob), axis=1) / np.log(n_components)
            color[:, :3] *= (1 - relative_entropy).reshape(-1, 1)        
    # Build a pairplot of the results.
    fs = 5 * min(D - 1, 3)
    fig, axes = plt.subplots(D - 1, D - 1, sharex='col', sharey='row',
                             squeeze=False, figsize=(fs, fs))
    for i in range(1, D):
        for j in range(D - 1):
            ax = axes[i - 1, j]
            if j >= i:
                ax.axis('off')
                continue
            # Plot the data in this projection.
            if entropy:
                ax.scatter(X[:, j], X[:, i], s=5, c=color, cmap=cmap)
                draw_ellipses(
                    w, mu[:, [j, i]], C[:, [[j], [i]], [[j, i]]],
                    color='w', outline='k', filled=False, axis=ax)
            else:
                ax.scatter(X[:, j], X[:, i], s=10, alpha=0.3, c='k', lw=0)
                draw_ellipses(
                    w, mu[:, [j, i]], C[:, [[j], [i]], [[j, i]]],
                    color='red', outline=None, filled=True, axis=ax)
            # Overlay the fit components in this projection.
            # Add axis labels and optional limits.
            if j == 0:
                ax.set_ylabel(colnames[i])
                if limits: ax.set_ylim(limits[i])
            if i == D - 1:
                ax.set_xlabel(colnames[j])
                if limits: ax.set_xlim(limits[j])
    plt.subplots_adjust(hspace=0.02, wspace=0.02)

Here is a simple wrapper that uses KMeans to initialize the relative probabilities to all be either zero or one, based on each sample’s cluster assignment:

def GMM_fit(data, n_components, nsteps, init='random', seed=123):
    X = data.values
    N, D = X.shape
    gen = np.random.RandomState(seed=seed)
    p = np.zeros((n_components, N))
    if init == 'kmeans':
        # Use KMeans to divide the data into clusters.
        fit = cluster.KMeans(n_clusters=n_components, random_state=gen, n_init=10).fit(data)
        # Initialize the relative weights using cluster membership.
        # The initial weights are therefore all either 0 or 1.
        for k in range(n_components):
            p[k, fit.labels_ == k] = 1
    else:
        # Assign initial relative weights in quantiles of the first feature.
        # This is not a good initialization strategy, but shows how well
        # GMM converges from a poor starting point.
        x0 = X[:, 0]
        edges = np.percentile(x0, np.linspace(0, 100, n_components + 1))
        for k in range(n_components):
            quantile = (edges[k] <= x0) & (x0 <= edges[k + 1])
            p[k, quantile] = 1.
    # Normalize relative weights.
    p /= p.sum(axis=0)
    # Perform an initial M step to initialize the component params.
    w, mu, C = M_step(X, p)
    # Loop over iterations.
    for i in range(nsteps):
        p = E_step(X, w, mu, C)
        w, mu, C = M_step(X, p)
    # Plot the results.
    GMM_pairplot(data, w, mu, C)

Try this out on the 3D blobs_data and notice that it converges close to the correct solution after 8 iterations:

wget_data('https://raw.githubusercontent.com/illinois-mlp/MachineLearningForPhysics/main/data/blobs_data.hf5')
blobs_data = pd.read_hdf(locate_data('blobs_data.hf5'))
GMM_fit(blobs_data, 3, nsteps=0)
GMM_fit(blobs_data, 3, nsteps=4)
GMM_fit(blobs_data, 3, nsteps=8)

Convergence is even faster if you use KMeans to initialize the relative weights (which is why most implementations do this):

GMM_fit(blobs_data, 3, nsteps=0, init='kmeans')
GMM_fit(blobs_data, 3, nsteps=1, init='kmeans')

Problem 5#

A density estimator should provide a probability density function \(P(\vec{x})\) that is normalized over its feature space \(\vec{x}\) $\( \int d\vec{x}\, P(\vec{x}) = 1 \; . \)$ In this problem you will verify this normalization for KDE using two different numerical approaches for the integral.

First, implement the function below to accept a 1D KDE fit object and estimate its normalization integral using the trapezoid rule with the specified grid. Hint: the np.trapz function will be useful.

def check_grid_normalization(fit, xlo, xhi, ngrid):
    """Check 1D denstity estimator fit result normlization using grid quadrature.
    
    Parameters
    ----------
    fit : neighbors.KernelDensity fit object
        Result of fit to 1D dataset.
    xlo : float
        Low edge of 1D integration range.
    xhi : float
        High edge of 1D integration range.
    ngrid : int
        Number of equally spaced grid points covering [xlo, xhi],
        including both end points.
    """
    # YOUR CODE HERE
    raise NotImplementedError()
# A correct solution should pass these tests.
fit = neighbors.KernelDensity(kernel='gaussian', bandwidth=0.1).fit(blobs_data.drop(columns=['x1', 'x2']).values)
assert np.round(check_grid_normalization(fit, 0, 15, 5), 3) == 1.351
assert np.round(check_grid_normalization(fit, 0, 15, 10), 3) == 1.019
assert np.round(check_grid_normalization(fit, 0, 15, 20), 3) == 0.986
assert np.round(check_grid_normalization(fit, 0, 15, 100), 3) == 1.000

fit = neighbors.KernelDensity(kernel='gaussian', bandwidth=0.1).fit(blobs_data.drop(columns=['x0', 'x2']).values)
assert np.round(check_grid_normalization(fit, -4, 12, 5), 3) == 1.108
assert np.round(check_grid_normalization(fit, -4, 12, 10), 3) == 0.993
assert np.round(check_grid_normalization(fit, -4, 12, 20), 3) == 0.971
assert np.round(check_grid_normalization(fit, -4, 12, 100), 3) == 1.000

fit = neighbors.KernelDensity(kernel='gaussian', bandwidth=0.1).fit(blobs_data.drop(columns=['x0', 'x1']).values)
assert np.round(check_grid_normalization(fit, 2, 18, 5), 3) == 1.311
assert np.round(check_grid_normalization(fit, 2, 18, 10), 3) == 0.954
assert np.round(check_grid_normalization(fit, 2, 18, 20), 3) == 1.028
assert np.round(check_grid_normalization(fit, 2, 18, 100), 3) == 1.000

Next, implement the function below to estimate a multidimensional fit normalization using Monte Carlo integration: $\( \int d\vec{x}\, P(\vec{x}) \simeq \frac{V}{N_{mc}}\, \sum_{j=1}^{N_{mc}} P(\vec{x}_j) = V \langle P\rangle \; , \)\( where the \)\vec{x}_j\( are uniformly distributed over the integration domain and \)V\( is the integration domain volume. Note that `trapz` gives more accurate results for a fixed number of \)P(\vec{x})$ evaluations, but MC integration is much easier to generalize to higher dimensions.

def check_mc_normalization(fit, xlo, xhi, nmc, seed=123):
    """Check denstity estimator fit result normlization using MC integration.
    
    Parameters
    ----------
    fit : neighbors.KernelDensity fit object
        Result of fit to arbitrary dataset of dimension D.
    xlo : array
        1D array of length D with low limits of integration domain along each dimension.
    xhi : array
        1D array of length D with high limits of integration domain along each dimension.
    nmc : int
        Number of random MC integration points within the domain to use.
    """
    xlo = np.asarray(xlo)
    xhi = np.asarray(xhi)
    assert xlo.shape == xhi.shape
    assert np.all(xhi > xlo)
    D = len(xlo)
    gen = np.random.RandomState(seed=seed)
    # Use gen.uniform() in your solution, not gen.rand(), for consistent random numbers.

    # YOUR CODE HERE
    raise NotImplementedError()
##### A correct solution should pass these tests.
fit = neighbors.KernelDensity(kernel='gaussian', bandwidth=0.1).fit(blobs_data.drop(columns=['x1', 'x2']).values)
assert np.round(check_mc_normalization(fit, [0], [15], 10), 3) == 1.129
assert np.round(check_mc_normalization(fit, [0], [15], 100), 3) == 1.022
assert np.round(check_mc_normalization(fit, [0], [15], 1000), 3) == 1.010
assert np.round(check_mc_normalization(fit, [0], [15], 10000), 3) == 0.999

fit = neighbors.KernelDensity(kernel='gaussian', bandwidth=0.1).fit(blobs_data.drop(columns=['x2']).values)
assert np.round(check_mc_normalization(fit, [0, -4], [15, 12], 10), 3) == 1.754
assert np.round(check_mc_normalization(fit, [0, -4], [15, 12], 100), 3) == 1.393
assert np.round(check_mc_normalization(fit, [0, -4], [15, 12], 1000), 3) == 0.924
assert np.round(check_mc_normalization(fit, [0, -4], [15, 12], 10000), 3) == 1.019

fit = neighbors.KernelDensity(kernel='gaussian', bandwidth=0.1).fit(blobs_data.values)
assert np.round(check_mc_normalization(fit, [0, -4, 2], [15, 12, 18], 10), 3) == 2.797
assert np.round(check_mc_normalization(fit, [0, -4, 2], [15, 12, 18], 100), 3) == 0.613
assert np.round(check_mc_normalization(fit, [0, -4, 2], [15, 12, 18], 1000), 3) == 1.316
assert np.round(check_mc_normalization(fit, [0, -4, 2], [15, 12, 18], 10000), 3) == 1.139

Acknowledgments#

  • Initial version: Mark Neubauer

© Copyright 2024