Homework 11: Unsupervised Learning and Anomaly Detection#

import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(7)
np.random.seed(7)

device = torch.device(
    "cuda:0" if torch.cuda.is_available()
    else "mps" if torch.backends.mps.is_available()
    else "cpu"
)
print("CUDA:", torch.cuda.is_available())
print("MPS:", torch.backends.mps.is_available())
print("Selected device:", device)

Synthetic Detector Data#

Imagine a simplified collider detector in which each event corresponds to a reconstructed particle shower or track-like object. In ordinary running conditions, most events come from a few well-understood Standard Model background processes. Those backgrounds populate characteristic regions of feature space because they have typical energy scales, spatial profiles, and timing signatures.

In contrast, a rare new-physics or detector-pathology event could look different: it might deposit unusually large energy, appear at larger radius, arrive with an unusual timing offset, or produce an atypical shower shape. We will model those unusual events as anomalies. The point of the exercise is not high-fidelity detector simulation, but a controlled setting where unsupervised methods can learn normal structure and then flag rare departures from it.

Each event has four features:

  • E: reconstructed energy

  • R: radial displacement from the beamline

  • T: arrival time offset

  • S: shower-shape observable

Most events come from two common detector populations, which you can think of as two different background channels. Rare anomalous events are generated from a different mechanism with shifted feature values.

def make_detector_data(n_normal=1200, n_anomaly=60, seed=7):
    rng = np.random.default_rng(seed)

    n1 = n_normal // 2
    n2 = n_normal - n1

    normal_1 = rng.multivariate_normal(
        mean=[4.8, 1.4, 0.1, 0.9],
        cov=[[0.30, 0.05, 0.00, 0.02],
             [0.05, 0.20, 0.01, 0.03],
             [0.00, 0.01, 0.15, 0.01],
             [0.02, 0.03, 0.01, 0.12]],
        size=n1,
    )
    normal_2 = rng.multivariate_normal(
        mean=[7.3, 2.7, -0.4, 1.9],
        cov=[[0.45, 0.02, 0.03, 0.04],
             [0.02, 0.28, 0.01, 0.05],
             [0.03, 0.01, 0.18, 0.00],
             [0.04, 0.05, 0.00, 0.20]],c
        size=n2,
    )
    anomalies = rng.multivariate_normal(
        mean=[10.5, 4.6, 2.0, 3.0],
        cov=[[0.55, 0.10, 0.05, 0.00],
             [0.10, 0.35, 0.02, 0.02],
             [0.05, 0.02, 0.25, 0.01],
             [0.00, 0.02, 0.01, 0.25]],
        size=n_anomaly,
    )

    X = np.vstack([normal_1, normal_2, anomalies]).astype(np.float32)
    y = np.concatenate([
        np.zeros(n_normal, dtype=np.int64),
        np.ones(n_anomaly, dtype=np.int64),
    ])
    return X, y

X, y = make_detector_data()
feature_names = ['E', 'R', 'T', 'S']
X_normal = X[y == 0]
X_anomaly = X[y == 1]
print(X.shape, y.shape)

Problem 1: Standardize Features and Compute PCA Projection#

Unsupervised methods are sensitive to feature scales. Before applying PCA or clustering, we usually standardize each feature so that it has comparable numerical scale.

Implement:

  • fit_standardizer(X) returning (mean, std)

  • apply_standardizer(X, mean, std)

  • pca_project_2d(X) using SVD or eigendecomposition to return a 2D projection of shape (n_samples, 2)

Use only the normal events to fit the standardizer and PCA directions. This is typical in anomaly detection, where the representation is learned from background-like data.

def fit_standardizer(X):
    # YOUR CODE HERE
    raise NotImplementedError('Implement fit_standardizer')

def apply_standardizer(X, mean, std):
    # YOUR CODE HERE
    raise NotImplementedError('Implement apply_standardizer')

def pca_project_2d(X):
    # YOUR CODE HERE
    raise NotImplementedError('Implement pca_project_2d')

# Local checks
mean, std = fit_standardizer(X_normal)
Xn = apply_standardizer(X_normal, mean, std)
Z = pca_project_2d(Xn)

assert mean.shape == (4,)
assert std.shape == (4,)
assert Xn.shape == X_normal.shape
assert Z.shape == (X_normal.shape[0], 2)
assert np.allclose(Xn.mean(axis=0), 0.0, atol=1e-1)
print('Problem 1 checks passed.')

Figure for Problem 1#

This figure helps you inspect two things:

  • whether standardization places features on a comparable scale

  • whether a 2D PCA view begins to separate the dominant detector populations

If your PCA implementation is working, the normal events should organize into visible low-dimensional structure, even before any labels are used.

mean_demo, std_demo = fit_standardizer(X_normal)
Xn_demo = apply_standardizer(X_normal, mean_demo, std_demo)
Za_demo = pca_project_2d(apply_standardizer(X, mean_demo, std_demo))

fig, ax = plt.subplots(1, 2, figsize=(10, 4))
ax[0].boxplot(Xn_demo, tick_labels=feature_names)
ax[0].set_title('Standardized normal features')
ax[0].set_ylabel('z-score')

ax[1].scatter(Za_demo[y == 0, 0], Za_demo[y == 0, 1], s=10, alpha=0.5, label='normal')
ax[1].scatter(Za_demo[y == 1, 0], Za_demo[y == 1, 1], s=18, alpha=0.9, label='anomaly')
ax[1].set_title('PCA projection of detector events')
ax[1].set_xlabel('PC1')
ax[1].set_ylabel('PC2')
ax[1].legend(frameon=False)
plt.tight_layout()
plt.show()

Problem 2: K-means Clustering on Normal Events#

K-means is a simple and widely used unsupervised learning algorithm. Here we will use it to find two common event populations in the normal detector sample.

Implement:

  • assign_clusters(X, centers) returning the nearest-center assignment for each point

  • update_centers(X, labels, k) returning updated cluster centers

  • run_kmeans(X, init_centers, n_steps=10) returning (labels, centers)

Use Euclidean distance and fit only on the standardized normal data.

def assign_clusters(X, centers):
    # YOUR CODE HERE
    raise NotImplementedError('Implement assign_clusters')

def update_centers(X, labels, k):
    # YOUR CODE HERE
    raise NotImplementedError('Implement update_centers')

def run_kmeans(X, init_centers, n_steps=10):
    # YOUR CODE HERE
    raise NotImplementedError('Implement run_kmeans')

# Local checks
Xn = apply_standardizer(X_normal, *fit_standardizer(X_normal))
init_centers = Xn[[0, 10]].copy()
labels, centers = run_kmeans(Xn, init_centers, n_steps=6)

assert labels.shape == (Xn.shape[0],)
assert centers.shape == (2, Xn.shape[1])
assert set(np.unique(labels)).issubset({0, 1})
print('Problem 2 checks passed.')

Figure for Problem 2#

This plot shows the learned K-means partition in the 2D PCA plane. The cluster colors come entirely from unsupervised fitting on the normal sample.

You should see that K-means is capturing the two dominant normal event populations, even though it has no access to labels or anomaly information.

mean_demo, std_demo = fit_standardizer(X_normal)
Xn_demo = apply_standardizer(X_normal, mean_demo, std_demo)
Z_demo = pca_project_2d(Xn_demo)
labels_demo, centers_demo = run_kmeans(Xn_demo, Xn_demo[[0, 10]].copy(), n_steps=8)
center_proj = pca_project_2d(np.vstack([Xn_demo, centers_demo]))[-2:]

plt.figure(figsize=(5.5, 4.5))
plt.scatter(Z_demo[:, 0], Z_demo[:, 1], c=labels_demo, s=10, cmap='coolwarm', alpha=0.6)
plt.scatter(center_proj[:, 0], center_proj[:, 1], s=180, c='k', marker='x', label='cluster centers')
plt.xlabel('PC1')
plt.ylabel('PC2')
plt.title('K-means clustering of normal detector events')
plt.legend(frameon=False)
plt.tight_layout()
plt.show()

Problem 3: Distance-based Anomaly Score#

Once we have a model of normal structure, we can score new events by how far they lie from that structure. A simple approach is to assign each event the squared distance to its nearest K-means center.

Implement:

  • nearest_center_distance2(X, centers) returning the squared distance to the nearest center

  • anomaly_threshold(scores, frac=0.05) returning a score threshold based on the top frac fraction of normal scores

  • predict_anomaly(scores, threshold) returning a binary anomaly mask

Fit the score model using only normal events, then evaluate on the combined dataset.

def nearest_center_distance2(X, centers):
    # YOUR CODE HERE
    raise NotImplementedError('Implement nearest_center_distance2')


def anomaly_threshold(scores, frac=0.05):
    # YOUR CODE HERE
    raise NotImplementedError('Implement anomaly_threshold')


def predict_anomaly(scores, threshold):
    # YOUR CODE HERE
    raise NotImplementedError('Implement predict_anomaly')

# Local checks
mean, std = fit_standardizer(X_normal)
Xn_train = apply_standardizer(X_normal, mean, std)
labels_train, centers = run_kmeans(Xn_train, Xn_train[[0, 10]].copy(), n_steps=8)
scores_train = nearest_center_distance2(Xn_train, centers)
thr = anomaly_threshold(scores_train, frac=0.05)
pred = predict_anomaly(scores_train, thr)

assert scores_train.shape == (Xn_train.shape[0],)
assert np.isscalar(thr)
assert pred.shape == (Xn_train.shape[0],)
print('Problem 3 checks passed.')

Figure for Problem 3#

This figure compares anomaly scores for normal and anomalous events using the nearest-cluster-center distance.

A useful anomaly score should push most normal events toward lower values and assign larger values to the rare, structurally unusual events.

mean_demo, std_demo = fit_standardizer(X_normal)
X_all_demo = apply_standardizer(X, mean_demo, std_demo)
Xn_demo = apply_standardizer(X_normal, mean_demo, std_demo)
_, centers_demo = run_kmeans(Xn_demo, Xn_demo[[0, 10]].copy(), n_steps=8)
scores_demo = nearest_center_distance2(X_all_demo, centers_demo)
thr_demo = anomaly_threshold(nearest_center_distance2(Xn_demo, centers_demo), frac=0.05)

plt.figure(figsize=(6.5, 4))
plt.hist(scores_demo[y == 0], bins=30, alpha=0.7, label='normal')
plt.hist(scores_demo[y == 1], bins=20, alpha=0.7, label='anomaly')
plt.axvline(thr_demo, color='k', ls='--', label='threshold')
plt.xlabel('nearest-center squared distance')
plt.ylabel('count')
plt.title('Distance-based anomaly score')
plt.legend(frameon=False)
plt.tight_layout()
plt.show()

Problem 4: Autoencoder Reconstruction Error#

Autoencoders learn to compress and reconstruct normal data. If they are trained only on normal events, unusual events often reconstruct poorly and receive larger reconstruction errors.

Implement:

  • AutoEncoder.forward(x)

  • reconstruction_error(x, xhat) returning mean squared error per event

  • the training loop for the autoencoder on standardized normal data only

This is a classic unsupervised anomaly-detection approach and connects directly to the lecture discussion of representation learning.

class AutoEncoder(nn.Module):
    def __init__(self, input_dim=4, latent_dim=2):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 16),
            nn.ReLU(),
            nn.Linear(16, latent_dim)
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 16),
            nn.ReLU(),
            nn.Linear(16, input_dim)
        )

    def forward(self, x):
        # YOUR CODE HERE
        raise NotImplementedError('Implement AutoEncoder.forward')

def reconstruction_error(x, xhat):
    # YOUR CODE HERE
    raise NotImplementedError('Implement reconstruction_error')

mean, std = fit_standardizer(X_normal)
Xn_train = apply_standardizer(X_normal, mean, std)
train_tensor = torch.tensor(Xn_train, dtype=torch.float32).to(device)

ae = AutoEncoder().to(device)
opt = torch.optim.Adam(ae.parameters(), lr=1e-2)

for step in range(300):
    # YOUR CODE HERE. TO DO:
    # 1) reconstruct the normal training data
    # 2) compute mean reconstruction loss
    # 3) backprop and update
    pass

# Local checks
with torch.no_grad():
    xhat = ae(train_tensor[:5])
    err = reconstruction_error(train_tensor[:5], xhat)

assert xhat.shape == train_tensor[:5].shape
assert err.shape == (5,)
print('Problem 4 checks passed.')

Figure for Problem 4#

This figure compares reconstruction-error distributions for normal and anomalous events after training on normal events only.

If the autoencoder learns the structure of the normal detector population, anomalous events should tend to reconstruct less accurately and shift toward larger errors.

mean_demo, std_demo = fit_standardizer(X_normal)
X_all_demo = apply_standardizer(X, mean_demo, std_demo)
X_all_tensor = torch.tensor(X_all_demo, dtype=torch.float32).to(device)

with torch.no_grad():
    xhat_demo = ae(X_all_tensor)
    err_demo = reconstruction_error(X_all_tensor, xhat_demo).detach().cpu().numpy()

plt.figure(figsize=(6.5, 4))
plt.hist(err_demo[y == 0], bins=30, alpha=0.7, label='normal')
plt.hist(err_demo[y == 1], bins=20, alpha=0.7, label='anomaly')
plt.xlabel('reconstruction error')
plt.ylabel('count')
plt.title('Autoencoder anomaly score')
plt.legend(frameon=False)
plt.tight_layout()
plt.show()

Problem 5: Thresholding and Basic Detection Metrics#

An anomaly score becomes operational only after we choose a threshold. In a realistic analysis, thresholds are often tuned to control false positives on well-understood background data.

Implement:

  • precision_recall(y_true, y_pred) returning (precision, recall)

  • f1_score(precision, recall)

  • scan_thresholds(scores_normal, scores_all, y_all, n_steps=50) returning the best threshold and best F1 score over a grid of thresholds based on the normal score range

Use the autoencoder reconstruction error as the anomaly score in this problem.

def precision_recall(y_true, y_pred):
    # YOUR CODE HERE
    raise NotImplementedError('Implement precision_recall')

def f1_score(precision, recall):
    # YOUR CODE HERE
    raise NotImplementedError('Implement f1_score')

def scan_thresholds(scores_normal, scores_all, y_all, n_steps=50):
    # YOUR CODE HERE
    raise NotImplementedError('Implement scan_thresholds')

# Local checks
y_true_test = np.array([0, 0, 1, 1, 1])
y_pred_test = np.array([0, 1, 1, 0, 1])
p, r = precision_recall(y_true_test, y_pred_test)
f1 = f1_score(p, r)

assert np.isclose(p, 2 / 3)
assert np.isclose(r, 2 / 3)
assert np.isclose(f1, 2 / 3)
print('Problem 5 checks passed.')

Figure for Problem 5#

This final plot scans thresholds on the autoencoder score and shows the resulting F1 value. It turns a continuous anomaly score into a concrete decision rule.

The best threshold depends on the tradeoff between rejecting rare anomalies and avoiding too many false positives among normal detector events.

mean_demo, std_demo = fit_standardizer(X_normal)
Xn_demo = apply_standardizer(X_normal, mean_demo, std_demo)
Xa_demo = apply_standardizer(X, mean_demo, std_demo)
Xn_tensor = torch.tensor(Xn_demo, dtype=torch.float32).to(device)
Xa_tensor = torch.tensor(Xa_demo, dtype=torch.float32).to(device)

with torch.no_grad():
    score_normal = reconstruction_error(Xn_tensor, ae(Xn_tensor)).detach().cpu().numpy()
    score_all = reconstruction_error(Xa_tensor, ae(Xa_tensor)).detach().cpu().numpy()

ths = np.linspace(score_normal.min(), max(score_all.max(), score_normal.max()), 60)
f1_vals = []
for th in ths:
    y_pred = (score_all > th).astype(np.int64)
    p, r = precision_recall(y, y_pred)
    f1_vals.append(f1_score(p, r))
best_th, best_f1 = scan_thresholds(score_normal, score_all, y, n_steps=60)

plt.figure(figsize=(6.5, 4))
plt.plot(ths, f1_vals, lw=2)
plt.axvline(best_th, color='k', ls='--', label='best threshold')
plt.xlabel('threshold on reconstruction error')
plt.ylabel('F1 score')
plt.title('Threshold scan for anomaly detection')
plt.legend(frameon=False)
plt.tight_layout()
plt.show()
print('best F1 =', best_f1)

Acknowledgments#

  • Initial version: Mark Neubauer

© Copyright 2026