In [None]:
import pickle
from pathlib import Path

In [None]:
# set the seeds to make the notebook reproducible
import lightning as L
import matplotlib.pyplot as plt

# Regression with MNIST-1D

In order to make our lives a bit easier, we will use a simplified dataset called [MNIST1D](https://github.com/greydanus/mnist1d). This dataset is small, yet it exhibits quite a few intricacies.

import pickle
from pathlib import Path

import lightning as L
import matplotlib.pyplot as plt

In [None]:
import numpy as np
import torch

# Fix the global seed once, up front, to make the notebook reproducible
# (the noise added to the regression targets further down is random).
L.seed_everything(41)

Download the MNIST1D demo dataset from [this URL](https://github.com/greydanus/mnist1d/raw/master/mnist1d_data.pkl) using your browser,
or download it directly from this notebook by uncommenting the following lines:

In [None]:
# import requests
# with open('mnist1d_data.pkl', 'wb') as out_file:
#    out_file.write(requests.get('https://github.com/greydanus/mnist1d/raw/master/mnist1d_data.pkl').content)

In [None]:
# load the dataset into this notebook
output = Path("./mnist1d_data.pkl")
# NOTE: unpickling can execute arbitrary code -- only load this file if it was
# obtained from a source you trust (e.g. the MNIST-1D repository linked above).
# The context manager makes sure the file handle is closed again after loading.
with output.open("rb") as in_file:
    data = pickle.load(in_file)

## Step 1: Visualize dataset

The dataset is inspired by the original MNIST dataset, but it is way smaller and 1D only.

In [None]:
# Pull the individual arrays out of the loaded dictionary.
x, y = data["x"], data["y"]
x_test, y_test = data["x_test"], data["y_test"]
t = data["t"]  # we will not need this variable

# Report the array shapes of both splits.
for split_name, signals, labels in (("training", x, y), ("test", x_test, y_test)):
    print(split_name, signals.shape, labels.shape)

## Step 2: Normalisation

The signal in the mnist1d dataset is not normalized. We need to perform normalisation before we can proceed. We choose a min-max normalisation.

In [None]:
# Global extrema of the training signals; they are used as the
# normalisation constants in the next cell.
xmin = x.min()
xmax = x.max()
print(f"min/max of x: {xmin, xmax}")

In [None]:
# Min-max scaling with the constants obtained from the training set.
x_range = xmax - xmin
x_ = (x - xmin) / x_range
# we need to apply the same normalisation constants to the test set
x_test_ = (x_test - xmin) / x_range

print(f"normalisation: raw min/max {x.min(), x.max()} -> normed min/max {x_.min(), x_.max()}")

## Step 3: prepare training

Torch is a bit peculiar with respect to the encoding of the signals. We need to inject an additional dimension into all data arrays.

In [None]:
# 1D convolutions need an explicit axis that represents the number of
# channels, so a singleton axis is inserted at position 1 of every array.
n_samples, signal_length = x.shape[0], x.shape[-1]

x_ = np.expand_dims(x_, axis=1)
# sanity check: (samples, signal) must have become (samples, 1, signal)
assert x_.shape == (n_samples, 1, signal_length), f"{x_.shape} does not match {x.shape[0], 1, x.shape[-1]}"

# the remaining arrays receive the same treatment
x_test_, y, y_test = (np.expand_dims(arr, axis=1) for arr in (x_test_, y, y_test))

print(x_.shape, y.shape)
print(x_test_.shape, y_test.shape)

Let's create a simple 1D regression training set by summing up all values of each input signal (i.e. along the last axis). Thus, we will have

- `x` - signals from mnist1d
- `y` - the sum over all values of a given signal `x`

In [None]:
# Let's create a 1D training set
y_ = np.expand_dims(np.sum(x_, axis=-1), axis=-1)
y_test_ = np.expand_dims(np.sum(x_test_, axis=-1), axis=-1)

# for the sake of demonstration, we add some noise to the data
y_ *= np.random.randn(*y_.shape) / 25.0 + 1.0
y_test_ *= np.random.randn(*y_test_.shape) / 25.0 + 1.0

print(y_.shape, y_test_.shape)
plt.scatter(np.squeeze(y), np.squeeze(y_))
plt.xlabel("labels")
plt.ylabel("target sum(x)")

In [None]:
# Check for duplicates among the regression targets: collect the distinct
# values of the training targets, of the test targets and of both combined.
# The printed shapes can then be compared with the number of samples.
unique_y_, unique_test_ = np.unique(y_), np.unique(y_test_)
unique_all = np.unique(np.concatenate((y_, y_test_)))

print(unique_y_.shape, unique_test_.shape, unique_all.shape)

In [None]:
# Per-label statistics of the regression targets: for every distinct label
# select the matching targets and compute their mean and standard deviation.
uy = np.unique(y)
label_masks = [y == int(label) for label in uy]
my_stat = [np.mean(y_[mask]) for mask in label_masks]  # mean of the targets per label
uy_stat = [np.std(y_[mask]) for mask in label_masks]  # std of the targets per label

fig, ax = plt.subplots(1, 2, figsize=(8, 4), tight_layout=True)
ax_mean, ax_std = ax

# left panel: mean target per label
ax_mean.plot(uy, my_stat)
ax_mean.set_ylabel("mean(target)")
ax_mean.set_xlabel("y label")

# right panel: spread of the targets per label
ax_std.plot(uy, uy_stat)
ax_std.set_ylabel("std(target)")
ax_std.set_xlabel("y label")

In [None]:
# Plot the first `n` training signals side by side; each panel title shows
# the label and the regression target of that sample.
n = 10
xaxis = np.arange(x_.shape[-1])
fig, ax = plt.subplots(1, n, figsize=(15, 3), sharey=True)
for i, panel in enumerate(ax):
    panel.plot(xaxis, x_[i, 0], label="original")
    panel.set_title(f"label y={y[i,0]}\ntarget={y_[i,0,0]:02.1f}")
    panel.set_xlabel("input x")

Convert everything to a `torch.Tensor` object.

In [None]:
# Convert the training data to float32 tensors.
# Note: `y` is rebound here -- it no longer holds the labels but the
# regression targets `y_`.
x_ = torch.tensor(x_, dtype=torch.float32)
y = torch.tensor(y_, dtype=torch.float32)

In [None]:
from sklearn.model_selection import train_test_split

# Split the test data once more: 500 samples are set aside as a holdout set,
# the remainder stays the test set.
split = train_test_split(x_test_, y_test_, test_size=500)
X_test, x_holdout, Y_test_, y_holdout = split

# the holdout arrays are cast to float32
x_holdout, y_holdout = (arr.astype(np.float32) for arr in (x_holdout, y_holdout))

# shapes first, dtypes second
parts = (X_test, x_holdout, Y_test_, y_holdout)
print(*(part.shape for part in parts))
print(*(part.dtype for part in parts))

In [None]:
# Convert the remaining test split to float32 tensors.
# Note: `y_test` is rebound here -- it now holds the regression targets of
# the test split (`Y_test_`).
x_test_ = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(Y_test_, dtype=torch.float32)

In [None]:
# Compare the distribution of the regression targets across the three splits.
fig, ax = plt.subplots(1, 3, figsize=(15, 3))
ax_train, ax_test, ax_holdout = ax

ax_train.hist(y.numpy().squeeze())
ax_train.set_title("training")

ax_test.hist(y_test.numpy().squeeze())
ax_test.set_title("test")

# the holdout targets are still a NumPy array
ax_holdout.hist(np.squeeze(y_holdout))
ax_holdout.set_title("holdout")