"""Top-level module for instantiating a CrabNet model to predict properties."""
import os
from os import PathLike
from os.path import dirname, join
from typing import Callable, List, Optional, Tuple, Union
from warnings import warn
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from sklearn.metrics import mean_absolute_error, roc_auc_score
from sklearn.model_selection import train_test_split
from torch import nn
from torch.optim.lr_scheduler import CyclicLR
from crabnet.kingcrab import SubCrab
# for backwards compatibility of imports
from crabnet.utils.data import get_data, groupby_formula # noqa: F401
from crabnet.utils.optim import SWA
from crabnet.utils.get_compute_device import get_compute_device
from crabnet.utils.utils import (
BCEWithLogitsLoss,
DummyScaler,
EDM_CsvLoader,
Lamb,
Lookahead,
RobustL1,
RobustL2,
Scaler,
count_parameters,
)
# retrieve static file from package: https://stackoverflow.com/a/20885799/13697228
# %%
class CrabNet(nn.Module):
"""Model class for instantiating, training, and predicting with CrabNet models."""
    def __init__(
self,
model: Optional[Union[str, SubCrab]] = None,
model_name: str = "UnnamedModel",
n_elements: Union[str, int] = "infer",
classification: bool = False,
verbose: bool = True,
force_cpu: bool = False,
prefer_last: bool = True,
batch_size: Optional[int] = None,
epochs: Optional[int] = None,
epochs_step: int = 10,
checkin: Optional[int] = None,
fudge: float = 0.02,
out_dims: int = 3,
d_model: int = 512,
extend_features: Optional[List[str]] = None,
N: int = 3,
heads: int = 4,
elem_prop: str = "mat2vec",
compute_device: Optional[Union[str, torch.device]] = None,
out_hidden: List[int] = [1024, 512, 256, 128],
pe_resolution: int = 5000,
ple_resolution: int = 5000,
        bias: bool = False,
emb_scaler: float = 1.0,
pos_scaler: float = 1.0,
pos_scaler_log: float = 1.0,
dim_feedforward: int = 2048,
dropout: float = 0.1,
val_size: float = 0.2,
criterion: Optional[Union[str, Callable]] = None,
lr: float = 1e-3,
betas: Tuple[float, float] = (0.9, 0.999),
eps: float = 1e-6,
weight_decay: float = 0,
adam: bool = False,
min_trust: Optional[float] = None,
alpha: float = 0.5,
k: int = 6,
base_lr: float = 1e-4,
max_lr: float = 6e-3,
random_state: Optional[int] = None,
mat_prop: Optional[Union[str, PathLike]] = None,
losscurve: bool = True,
learningcurve: bool = True,
save: bool = True,
):
"""
Instantiate a CrabNet model.
Parameters
----------
        model : Union[str, SubCrab], optional
            An instantiated SubCrab model or the filename of a saved network, by
            default None.
model_name : str, optional
The name of your model, by default "UnnamedModel"
        n_elements : Union[str, int], optional
The maximum number of elements to consider during featurization, by default
"infer"
classification : bool, optional
Whether to perform classification. If False, then assume regression. By
default, False
verbose : bool, optional
Whether model information and progress should be printed, by default True
force_cpu : bool, optional
            Put all models on the CPU regardless of other available devices, by default False
prefer_last : bool, optional
Whether to prefer last used compute_device, by default True
        batch_size : int, optional
            The batch size to use during training. If not None, then used as-is. If
            None, then a default is computed from the dataset size and clamped to the
            range 2 ** 7 == 128 to 2 ** 12 == 4096. By default None
        epochs : int, optional
            How many epochs (# of passes through the entire dataset). If None, then
            this is automatically assigned via `_get_epochs_checkin_stepsize` and, if
            necessary, rounded up to a multiple of ``2 * epochs_step``. By default None
        epochs_step : int, optional
            The number of epochs in each half cycle of the cyclic learning rate
            scheduler, by default 10
        checkin : int, optional
            When to do the checkin step. If None, then automatically assigned as
            ``2 * epochs_step`` to match the learning rate scheduler, by default None
fudge : float, optional
The "fudge" (i.e. noise) applied to the fractional encodings, by default 0.02
out_dims : int, optional
Output dimensions for Residual Network, by default 3
d_model : int, optional
Size of the Model, see paper, by default 512
        extend_features : List[str], optional
            Names of columns to use as extra features, by default None
N : int, optional
Number of attention layers, by default 3
heads : int, optional
Number of attention heads to use, by default 4
elem_prop : str, optional
Which elemental feature vector to use. Possible values are "jarvis", "magpie",
"mat2vec", "oliynyk", "onehot", "ptable", and "random_200", by default
"mat2vec"
        compute_device : Union[str, torch.device], optional
Computing device to run model on, by default None
out_hidden : list(int), optional
Architecture of hidden layers in the Residual Network, by default [1024, 512, 256, 128]
pe_resolution : int, optional
Number of discretizations for the prevalence encoding, by default 5000
ple_resolution : int, optional
Number of discretizations for the prevalence log encoding, by default 5000
bias : bool, optional
Whether to bias the Residual Network, by default False
emb_scaler : float, optional
Float value by which to scale the elemental embeddings, by default 1.0
pos_scaler : float, optional
Float value by which to scale the fractional encodings, by default 1.0
pos_scaler_log : float, optional
Float value by which to scale the log fractional encodings, by default 1.0
dim_feedforward : int, optional
            Dimensions of the feed forward network following the transformer, by default 2048
dropout : float, optional
Percent dropout in the feed forward network following the transformer, by
default 0.1
val_size : float, optional
            Fraction of the training data to split off as validation data, used only
            if `val_df` is None. By default, 0.2
        criterion : Union[str, Callable], optional
Or in other words the loss function (e.g. BCEWithLogitsLoss for classification
or RobustL1 for regression), by default None. Possible values are
`BCEWithLogitsLoss`, `RobustL1`, and `RobustL2`.
lr : float, optional
Learning rate, by default 1e-3
betas : tuple, optional
Coefficients on gradient and squared gradient during ``Lamb`` optimization, by default (0.9, 0.999)
eps : float, optional
Value added to the denominator during ``Lamb`` optimization, by default 1e-6
weight_decay : float, optional
L2 penalty in ``Lamb``, by default 0
adam : bool, optional
            Whether to constrain the ``Lamb`` optimizer to behave as the Adam
            optimizer, by default False
min_trust : float, optional
            Minimum trust ratio for the ``Lamb`` optimizer, by default None
alpha : float, optional
``Lookahead`` "slow update" rate, by default 0.5
k : int, optional
Number of ``Lookahead`` steps, by default 6
base_lr : float, optional
Base learning rate, by default 1e-4
max_lr : float, optional
Max learning rate, by default 6e-3
random_state : int, optional
The seed to use for both `torch` and `numpy` random number generators. If
None, then this has no effect. By default None.
mat_prop : str, optional
            Name of the material property (doesn't affect computation), by default None
        losscurve : bool, optional
            Whether to plot a loss curve, by default True
learningcurve : bool, optional
Whether to plot a learning curve, by default True
save : bool, optional
Whether to save the weights of the model, by default True
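
        Examples
        --------
        A minimal sketch (``train_df`` is an assumed DataFrame with "formula" and
        "target" columns; the property name is hypothetical):

        >>> cb = CrabNet(mat_prop="hypothetical_property", verbose=False)  # doctest: +SKIP
        >>> cb.fit(train_df)  # doctest: +SKIP
        >>> pred = cb.predict(train_df)  # doctest: +SKIP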
"""
super().__init__()
if compute_device is None:
compute_device = get_compute_device(
force_cpu=force_cpu, prefer_last=prefer_last
)
elif compute_device == "cpu":
compute_device = torch.device("cpu")
self.compute_device = compute_device
self.avg = True
self.out_dims = out_dims
self.d_model = d_model
self.extend_features = extend_features
self.N = N
self.heads = heads
self.bias = bias
self.out_hidden = out_hidden
self.batch_size = batch_size
self.epochs = epochs
self.epochs_step = epochs_step
self.checkin = checkin
self.pe_resolution = pe_resolution
self.ple_resolution = ple_resolution
self.emb_scaler = emb_scaler
self.pos_scaler = pos_scaler
self.pos_scaler_log = pos_scaler_log
self.dim_feedforward = dim_feedforward
self.dropout = dropout
self.criterion = criterion
self.lr = lr
self.betas = betas
self.eps = eps
self.weight_decay = weight_decay
self.adam = adam
self.min_trust = min_trust
self.alpha = alpha
self.k = k
self.base_lr = base_lr
self.max_lr = max_lr
        # Apply BCEWithLogitsLoss to model output if binary classification is True
        self.classification = classification
        self.model_name = model_name
        self.mat_prop = mat_prop
        self.data_loader = None
        self.train_loader = None
self.n_elements = n_elements
self.fudge = fudge # expected fractional tolerance (std. dev) ~= 2%
self.verbose = verbose
self.elem_prop = elem_prop
self.losscurve = losscurve
self.learningcurve = learningcurve
self.losscurve_fig = None
self.learningcurve_fig = None
self.val_size = val_size
self.model = model
self.save = save
self.criterion_lookup = {
"RobustL1": RobustL1,
"RobustL2": RobustL2,
"BCEWithLogitsLoss": BCEWithLogitsLoss,
}
if random_state is not None:
torch.manual_seed(random_state)
np.random.seed(random_state)
self.data_type_torch = torch.float32
if self.verbose:
print("\nModel architecture: out_dims, d_model, N, heads")
print(f"{self.out_dims}, {self.d_model}, " f"{self.N}, {self.heads}")
print(f"Running on compute device: {self.compute_device}")
    def fit(
        self,
        train_df: Optional[pd.DataFrame] = None,
        val_df: Optional[pd.DataFrame] = None,
        extend_features: Optional[List[str]] = None,
        data_dir: Union[str, PathLike] = join(
            dirname(__file__), "data", "materials_data"
        ),
        transfer: Optional[str] = None,
):
"""Fit CrabNet to training data and update hyperparams with validation data.
Parameters
----------
train_df, val_df : pd.DataFrame, optional
Training and validation data with at minimum, "formula" and "target" columns
and optionally, "extra features" (based on names in `extend_features`). If
            `val_df` is None, then `val_size` determines the amount of training data to
be split into `val_df`. By default None
extend_features : List[str], optional
Names of columns to use as extra features from `train_df` and `val_df`, by default None
data_dir : str, optional
The directory from which to load data if loading from a file rather than
a DataFrame. `data_dir` is only used if both `train_df` and `val_df` are
None. It is assumed that the files in the data directory will be named
`train.csv`, `val.csv`, and `test.csv`. By default join(dirname(__file__), "data", "materials_data")
transfer : str, optional
Path to the saved weights to use for transfer learning. If None, then no
transfer learning is performed. By default None
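
        Examples
        --------
        A hedged sketch; ``df`` is an assumed DataFrame with "formula" and "target"
        columns:

        >>> train_df, val_df = train_test_split(df, test_size=0.2)  # doctest: +SKIP
        >>> CrabNet(losscurve=False, learningcurve=False).fit(train_df, val_df)  # doctest: +SKIP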
"""
self.d_extend = 0 if extend_features is None else len(extend_features)
if self.model is None:
self.model = SubCrab(
compute_device=self.compute_device,
out_dims=self.out_dims,
d_model=self.d_model,
d_extend=self.d_extend,
N=self.N,
heads=self.heads,
pe_resolution=self.pe_resolution,
ple_resolution=self.ple_resolution,
emb_scaler=self.emb_scaler,
pos_scaler=self.pos_scaler,
pos_scaler_log=self.pos_scaler_log,
dim_feedforward=self.dim_feedforward,
dropout=self.dropout,
).to(self.compute_device)
if self.verbose:
print(f"Model size: {count_parameters(self.model)} parameters\n")
# self.transfer_nn = TransferNetwork(512, 512)
# Train network starting at pretrained weights
if transfer is not None:
self.load_network(f"{transfer}.pth")
self.model_name = f"{self.mat_prop}"
(
train_data,
val_data,
data_size,
extra_train_data,
extra_val_data,
) = self._separate_extended_features(train_df, val_df, data_dir)
self.batch_size = self._default_batch_size(self.batch_size, data_size)
assert isinstance(self.batch_size, int)
self._load_trainval_data(
self.batch_size, train_data, val_data, extra_train_data, extra_val_data
)
self.epochs, self.checkin, self.stepsize = self._get_epochs_checkin_stepsize(
self.epochs, self.checkin
)
assert isinstance(self.epochs, int)
assert isinstance(self.checkin, int)
self.step_count = 0
self._select_criterion(self.criterion)
assert self.criterion is not None
assert isinstance(self.model, SubCrab)
base_optim = Lamb(
params=self.model.parameters(),
lr=self.lr,
betas=self.betas,
eps=self.eps,
weight_decay=self.weight_decay,
adam=self.adam,
min_trust=self.min_trust,
)
optimizer = Lookahead(base_optimizer=base_optim, alpha=self.alpha, k=self.k)
self.optimizer = SWA(optimizer)
lr_scheduler = CyclicLR(
self.optimizer,
base_lr=self.base_lr,
max_lr=self.max_lr,
cycle_momentum=False,
step_size_up=self.stepsize,
)
self.lr_scheduler = lr_scheduler
self.loss_curve: dict = {"train": [], "val": []}
self.stepping = True
self.swa_start = 2 # start at (n/2) cycle (lr minimum)
self.xswa: List[int] = []
self.yswa: List[float] = []
self.lr_list: List[float] = []
self.discard_n = 3
assert isinstance(self.epochs, int)
assert isinstance(self.checkin, int)
for epoch in range(self.epochs):
self.epoch = epoch
self._train()
self.lr_list.append(self.optimizer.param_groups[0]["lr"])
if (
(epoch + 1) % self.checkin == 0
or epoch == self.epochs - 1
or epoch == 0
):
self._losscurve_stats(self.epochs, epoch)
if self.losscurve:
self._plot_losscurve(self.checkin)
self._track_stats(self.epochs, self.checkin, self.learningcurve, epoch)
if self.optimizer.discard_count >= self.discard_n:
if self.verbose:
print(
f"Discarded: {self.optimizer.discard_count}/{self.discard_n}weight updates, early-stopping now"
)
self.optimizer.swap_swa_sgd()
break
if not (self.optimizer.discard_count >= self.discard_n):
self.optimizer.swap_swa_sgd()
if self.save:
self.save_network()
    def predict(
        self,
        test_df: Optional[pd.DataFrame] = None,
        loader=None,
        return_uncertainty: bool = False,
        return_true: bool = False,
):
"""Predict on new data using a fitted CrabNet model.
Parameters
----------
test_df : pd.DataFrame, optional
            Test data containing at minimum "formula" and "target" columns, by default
            None
loader : torch.Dataloader, optional
The Dataloader corresponding to the test data, by default None
return_uncertainty : bool, optional
Whether to return standard deviation uncertainties. If `return_true`, then
            `return_uncertainty` takes precedence and is returned as the second output. By default False
return_true : bool, optional
Whether to return the true values (used for comparison with the predicted
values). If `return_uncertainty` is also specified, then the uncertainties
appear before the true values (i.e. pred, std, true), by default False
Returns
-------
pred : np.array
Predicted values. Always returned.
uncert : np.array
Standard deviation uncertainty. Returned if `return_uncertainty`. Precedes
`act` if `act` is also returned.
act : np.array
True values. Returned if `return_true`. `uncert` precedes `act` if both
`uncert` and `act` are returned.
Raises
------
SyntaxError
"Specify either data *or* loader, not neither."
SyntaxError
"Specify either data *or* loader, not both."
"""
if test_df is None and loader is None:
raise SyntaxError("Specify either data *or* loader, not neither.")
elif test_df is not None and loader is None:
if self.extend_features is not None:
extra_features = test_df[self.extend_features]
else:
extra_features = None
self.load_data(test_df, extra_features=extra_features)
loader = self.data_loader
elif test_df is not None and loader is not None:
raise SyntaxError("Specify either data *or* loader, not both.")
len_dataset = len(loader.dataset)
n_atoms = int(len(loader.dataset[0][0]) / 2)
act = np.zeros(len_dataset)
pred = np.zeros(len_dataset)
uncert = np.zeros(len_dataset)
formulae = np.empty(len_dataset, dtype=list)
atoms = np.empty((len_dataset, n_atoms))
fractions = np.empty((len_dataset, n_atoms))
assert isinstance(self.model, SubCrab)
self.model.eval()
with torch.no_grad():
for i, batch_df in enumerate(loader):
# extract data
X, y, formula, extra_features = batch_df
src, frac = X.squeeze(-1).chunk(2, dim=1)
# send to device
src = src.to(self.compute_device, dtype=torch.long, non_blocking=True)
frac = frac.to(
self.compute_device, dtype=self.data_type_torch, non_blocking=True
)
y = y.to(
self.compute_device, dtype=self.data_type_torch, non_blocking=True
)
extra_features = extra_features.to(
self.compute_device, dtype=self.data_type_torch, non_blocking=True
)
# predict
output = self.model.forward(src, frac, extra_features=extra_features)
prediction, uncertainty = output.chunk(2, dim=-1)
uncertainty = torch.exp(uncertainty) * self.scaler.std
prediction = self.scaler.unscale(prediction)
if self.classification:
prediction = torch.sigmoid(prediction)
assert self.batch_size is not None
data_loc = slice(i * self.batch_size, i * self.batch_size + len(y), 1)
atoms[data_loc, :] = src.cpu().numpy()
fractions[data_loc, :] = frac.cpu().numpy()
act[data_loc] = y.view(-1).cpu().numpy()
pred[data_loc] = prediction.view(-1).cpu().detach().numpy()
uncert[data_loc] = uncertainty.view(-1).cpu().detach().numpy()
formulae[data_loc] = formula
if return_uncertainty and return_true:
return pred, uncert, act
elif return_uncertainty and not return_true:
return pred, uncert
elif not return_uncertainty and return_true:
return pred, act
else:
return pred
    def load_data(
        self,
        data: Union[str, pd.DataFrame],
        extra_features: Optional[pd.DataFrame] = None,
        batch_size: int = 2**9,
        train: bool = False,
):
"""Load data using PyTorch Dataloader.
Parameters
----------
data : Union[str, pd.DataFrame]
The data to load, which can be the folder in which the ``.csv`` file resides
or a pandas DataFrame.
extra_features : pd.DataFrame, optional
DataFrame containing the extra features that will be used for training (e.g.
state variables) that were extracted based on the column names in `CrabNet().extend_features`, by default None
batch_size : int, optional
The batch size to use during training. By default 2 ** 9
train : bool, optional
Whether this is the training data, by default False
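
        Examples
        --------
        A sketch, assuming DataFrames ``train_df`` and ``val_df`` with "formula"
        and "target" columns:

        >>> cb.load_data(train_df, train=True)  # sets self.train_loader  # doctest: +SKIP
        >>> cb.load_data(val_df)  # sets self.data_loader  # doctest: +SKIP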
"""
if self.batch_size is None:
self.batch_size = batch_size
inference = not train
data_loaders = EDM_CsvLoader(
data=data,
extra_features=extra_features,
batch_size=self.batch_size,
n_elements=self.n_elements,
inference=inference,
verbose=self.verbose,
elem_prop=self.elem_prop,
)
if self.verbose:
print(
f"loading data with up to {data_loaders.n_elements:0.0f} elements in the formula"
)
# update n_elements after loading dataset
self.n_elements = data_loaders.n_elements
data_loader = data_loaders.get_data_loaders(inference=inference)
y = data_loader.dataset.data[1]
if train:
self.train_len = len(y)
if self.classification:
self.scaler: Union[Scaler, DummyScaler] = DummyScaler(y)
else:
self.scaler = Scaler(y)
self.train_loader = data_loader
self.data_loader = data_loader
def _train(self):
"""Train the SubCrab PyTorch model using backpropagation."""
minima = []
for data in self.train_loader:
# separate into src and frac
X, y, _, extra_features = data
y = self.scaler.scale(y)
src, frac = X.squeeze(-1).chunk(2, dim=1)
frac = self._add_jitter(src, frac)
# send to PyTorch device
src = src.to(self.compute_device, dtype=torch.long, non_blocking=True)
frac = frac.to(
self.compute_device, dtype=self.data_type_torch, non_blocking=True
)
y = y.to(self.compute_device, dtype=self.data_type_torch, non_blocking=True)
extra_features = extra_features.to(
self.compute_device, dtype=self.data_type_torch, non_blocking=True
)
# train
output = self.model.forward(src, frac, extra_features=extra_features)
prediction, uncertainty = output.chunk(2, dim=-1)
loss = self.criterion(prediction.view(-1), uncertainty.view(-1), y.view(-1))
# backpropagation
loss.backward()
self.optimizer.step()
self.optimizer.zero_grad()
if self.stepping:
self.lr_scheduler.step()
# hyperparameter updates
swa_check = self.epochs_step * self.swa_start - 1
epoch_check = (self.epoch + 1) % (2 * self.epochs_step) == 0
learning_time = epoch_check and self.epoch >= swa_check
if learning_time:
pred_v, true_v = self.predict(loader=self.data_loader, return_true=True)
if np.any(np.isnan(pred_v)):
warn(
"NaN values found in `pred_v`. Replacing with DummyRegressor() values (i.e. mean of training targets)."
)
pred_v = np.nan_to_num(pred_v)
mae_v = mean_absolute_error(true_v, pred_v)
# https://github.com/pytorch/contrib/blob/master/torchcontrib/optim/swa.py
# https://pytorch.org/blog/pytorch-1.6-now-includes-stochastic-weight-averaging/
self.optimizer.update_swa(mae_v)
minima.append(self.optimizer.minimum_found)
if learning_time and not any(minima):
self.optimizer.discard_count += 1
if self.verbose:
print(f"Epoch {self.epoch} failed to improve.")
print(
f"Discarded: {self.optimizer.discard_count}/"
f"{self.discard_n} weight updates"
)
def _add_jitter(self, src, frac, type="normal"):
"""Add a small jitter to the input fractions.
This improves model robustness and increases stability.
Parameters
----------
src : torch.tensor
Tensor containing integers corresponding to elements in compound
frac : torch.tensor
Tensor containing the fractions of each element in compound
type : str, optional
How to add the jitter. Possible options are "normal" and "uniform". By
default, "normal"
Returns
-------
frac : torch.tensor
Tensor containing the fractions of each element in compound with added jitter.
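
        Notes
        -----
        With the default ``fudge = 0.02`` and ``type="normal"``, each fraction is
        scaled by ``1 + 0.02 * N(0, 1)``, clamped to [0, 1], zeroed wherever
        ``src == 0`` (i.e. padding), and renormalized so each row sums to 1.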
"""
if type == "normal":
frac = frac * (1 + (torch.randn_like(frac)) * self.fudge) # normal
elif type == "uniform":
frac = frac * (1 + (torch.rand_like(frac) - 0.5) * self.fudge) # uniform
else:
raise NotImplementedError(f"{type} not supported as jitter type.")
frac = torch.clamp(frac, 0, 1)
frac[src == 0] = 0
frac = frac / frac.sum(dim=1).unsqueeze(1).repeat(1, frac.shape[-1])
return frac
def _get_epochs_checkin_stepsize(
self, epochs: Optional[int], checkin: Optional[int]
):
"""Automatically assign epochs, checkin point, and stepsize.
Parameters
----------
        epochs : int, optional
            How many epochs (# of passes through the entire dataset). If None, then
            this is assigned a default value (300) and, if necessary, rounded up to a
            multiple of ``2 * epochs_step``. By default None
        checkin : int, optional
            When to do the checkin step. If None, then automatically assigned as
            ``2 * epochs_step`` to match the learning rate scheduler, by default None
Returns
-------
        epochs : int
            How many epochs (# of passes through the entire dataset); guaranteed to be
            a multiple of ``2 * epochs_step``.
        checkin : int
            When to do the checkin step.
        stepsize : int
            This is equal to `self.epochs_step * len(self.train_loader)`, or in other
            words, the number of batches that are processed within each set of
            `epochs_step` epochs.
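
        Examples
        --------
        With ``epochs_step = 10`` and ``epochs = 345``: ``345 % 20 == 5``, so
        `epochs` is rounded up to ``345 + 20 - 5 == 360``, which is divisible by
        ``2 * epochs_step``.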
"""
assert self.train_loader is not None
stepsize = self.epochs_step * len(self.train_loader)
if self.verbose:
print(
f"stepping every {stepsize} training passes, cycling lr every {self.epochs_step} epochs"
)
if epochs is None:
# n_iterations = 1e4
# epochs = max(int(n_iterations / len(self.data_loader)), 40)
epochs = 300
if self.verbose:
print(f"running for {epochs} epochs, unless early stopping occurs")
if checkin is None:
checkin = self.epochs_step * 2
if self.verbose:
print(
f"checkin at {self.epochs_step*2} " f"epochs to match lr scheduler"
)
assert isinstance(epochs, int)
mod = epochs % (self.epochs_step * 2)
if mod != 0:
updated_epochs = epochs + (self.epochs_step * 2) - mod
if self.verbose:
print(
f"{epochs} epochs not divisible by {self.epochs_step * 2} (2*epochs_step), "
f"updating epochs to {updated_epochs} for learning"
)
epochs = updated_epochs
return epochs, checkin, stepsize
def _losscurve_stats(self, epochs, epoch):
"""Compute loss curve statistics.
Parameters
----------
        epochs : int
            The total number of epochs.
epoch : int
The current epoch.
"""
pred_t, true_t = self.predict(
loader=self.train_loader,
return_true=True,
)
mae_t = mean_absolute_error(true_t, pred_t)
self.loss_curve["train"].append(mae_t)
pred_v, true_v = self.predict(loader=self.data_loader, return_true=True)
mae_v = mean_absolute_error(true_v, pred_v)
self.loss_curve["val"].append(mae_v)
epoch_str = f"Epoch: {epoch}/{epochs} ---"
train_str = f'train mae: {self.loss_curve["train"][-1]:0.3g}'
val_str = f'val mae: {self.loss_curve["val"][-1]:0.3g}'
if self.classification:
train_auc = roc_auc_score(true_t, pred_t)
val_auc = roc_auc_score(true_v, pred_v)
train_str = f"train auc: {train_auc:0.3f}"
val_str = f"val auc: {val_auc:0.3f}"
if self.verbose:
print(epoch_str, train_str, val_str)
if self.epoch >= (self.epochs_step * self.swa_start - 1):
if (self.epoch + 1) % (self.epochs_step * 2) == 0:
self.xswa.append(self.epoch)
self.yswa.append(mae_v)
def _track_stats(self, epochs, checkin, learningcurve, epoch):
"""Track performance statistics of the learning process.
Parameters
----------
        epochs : int
            The total number of epochs.
        checkin : int
            The interval (in epochs) at which check-in steps occur.
        learningcurve : bool
            Whether to plot the learning curve.
epoch : int
The current epoch.
"""
if epoch == epochs - 1 or self.optimizer.discard_count >= self.discard_n:
# save output df for stats tracking
xval = np.arange(len(self.loss_curve["val"])) * checkin - 1
xval[0] = 0
tval = self.loss_curve["train"]
vval = self.loss_curve["val"]
os.makedirs("figures/lc_data", exist_ok=True)
df_loss = pd.DataFrame([xval, tval, vval]).T
df_loss.columns = ["epoch", "train loss", "val loss"]
df_loss["swa"] = ["n"] * len(xval)
df_loss.loc[df_loss["epoch"].isin(self.xswa), "swa"] = "y"
df_loss.to_csv(f"figures/lc_data/{self.model_name}_lc.csv", index=False)
if learningcurve:
self._plot_learningcurve(checkin)
def _plot_learningcurve(self, checkin: int):
"""Plot the learning curve periodically (beginning, checkin, end).
checkin : int
When to do the checkin step. If None, then automatically assigned as half
the number of epochs, by default None
"""
if self.learningcurve_fig is None:
self.learningcurve_fig = plt.figure(figsize=(8, 5))
else:
plt.cla()
xval = np.arange(len(self.loss_curve["val"])) * checkin - 1
xval[0] = 0
plt.plot(
xval,
self.loss_curve["train"],
"o-",
label=r"""$\mathrm{MAE}_\mathrm{train}$""",
)
plt.plot(
xval,
self.loss_curve["val"],
"s--",
label=r"""$\mathrm{MAE}_\mathrm{val}$""",
)
if self.epoch >= (self.epochs_step * self.swa_start - 1):
plt.plot(
self.xswa,
self.yswa,
"o",
ms=12,
mfc="none",
label="SWA point",
)
plt.ylim(0, 2 * np.mean(self.loss_curve["val"]))
plt.title(f"{self.model_name}")
plt.xlabel("epochs")
plt.ylabel("MAE")
plt.legend()
plt.savefig(f"figures/lc_data/{self.model_name}_lc.png")
# plt.show()
# https://stackoverflow.com/a/56119926/13697228
assert self.learningcurve_fig is not None
self.learningcurve_fig.canvas.draw()
plt.pause(0.01)
def _plot_losscurve(self, checkin):
"""Plot the loss curve periodically (beginning, checkin, end).
Parameters
----------
        checkin : int
            The interval (in epochs) at which check-in steps occur.
"""
if self.losscurve_fig is None:
self.losscurve_fig = plt.figure(figsize=(8, 5))
else:
plt.cla()
xval = np.arange(len(self.loss_curve["val"])) * checkin - 1
xval[0] = 0
plt.plot(xval, self.loss_curve["train"], "o-", label="train_mae")
plt.plot(xval, self.loss_curve["val"], "s--", label="val_mae")
plt.plot(self.xswa, self.yswa, "o", ms=12, mfc="none", label="SWA point")
plt.ylim(0, 2 * np.mean(self.loss_curve["val"]))
plt.title(f"{self.model_name}")
plt.xlabel("epochs")
plt.ylabel("MAE")
plt.legend()
# https://stackoverflow.com/a/56119926/13697228
assert self.losscurve_fig is not None
self.losscurve_fig.canvas.draw()
plt.pause(0.01)
def _select_criterion(self, criterion: Optional[Union[str, Callable]]):
"""Automatically select a criterion if None was specified.
Parameters
----------
criterion : Union[str, Callable]
If a str, then must be one of "RobustL1", "RobustL2", or
"BCEWithLogitsLoss". If None and classification, then
`BCEWithLogitsLoss` is used. If None and not `classification`, then
`RobustL1` is used. If a Callable, then it must follow a similar API to
e.g. `RobustL1`.
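
        Examples
        --------
        A sketch of the equivalent public usage (resolution happens at fit time):

        >>> cb = CrabNet(criterion="RobustL1")  # looked up in criterion_lookup  # doctest: +SKIP
        >>> cb = CrabNet(criterion=RobustL2)  # a Callable is used as-is  # doctest: +SKIP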
"""
if criterion is None:
if self.classification:
if self.verbose:
print("Using BCE loss for classification task")
self.criterion = BCEWithLogitsLoss
else:
self.criterion = RobustL1
elif type(criterion) is str:
self.criterion = self.criterion_lookup[criterion]
else:
self.criterion = criterion
def _load_trainval_data(
self,
batch_size: int,
train_data: Union[str, pd.DataFrame],
val_data: Union[str, pd.DataFrame],
extra_train_data: pd.DataFrame,
extra_val_data: pd.DataFrame,
):
"""Load both the training and validation data via PyTorch Dataloaders.
Parameters
----------
        batch_size : int
            The batch size to use during training.
        train_data, val_data : Union[str, pd.DataFrame]
            Either a path to the data file or a DataFrame containing at minimum
            "formula" and "target" columns, for training and validation data,
            respectively.
        extra_train_data, extra_val_data : pd.DataFrame
            DataFrames containing the feature data for the columns in `train_data`
            and `val_data` specified by `extend_features`, for training and
            validation data, respectively.
"""
self.load_data(
train_data,
batch_size=batch_size,
train=True,
extra_features=extra_train_data,
)
if self.verbose:
print(
f"training with batchsize {batch_size} "
f"(2**{np.log2(batch_size):0.3f})"
)
if val_data is not None:
self.load_data(
val_data, batch_size=batch_size, extra_features=extra_val_data
)
assert_train_str = "Please Load Training Data (self.train_loader)"
assert_val_str = "Please Load Validation Data (self.data_loader)"
assert self.train_loader is not None, assert_train_str
assert self.data_loader is not None, assert_val_str
def _default_batch_size(self, batch_size: Optional[int], data_size: int):
"""Assign a default batch size based on the size of the dataset.
Parameters
----------
        batch_size : int, optional
            The batch size to use during training. If not None, then used as-is. If
            None, then a default is computed from `data_size` and clamped to the
            range 2 ** 7 == 128 to 2 ** 12 == 4096.
        data_size : int
            The number of training datapoints.

        Returns
        -------
        batch_size : int
            The batch size to use during training.
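
        Examples
        --------
        With ``data_size = 10_000``: ``2 ** round(log2(10_000) - 4) == 2 ** 9 == 512``,
        which already lies within the clamp range [2 ** 7, 2 ** 12].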
"""
# Load the train and validation data before fitting the network
if batch_size is None:
batch_size = 2 ** round(np.log2(data_size) - 4)
if batch_size < 2**7:
batch_size = 2**7
if batch_size > 2**12:
batch_size = 2**12
return batch_size
def _separate_extended_features(
self,
train_df: pd.DataFrame,
val_df: pd.DataFrame,
data_dir: Union[str, PathLike],
):
"""Extract extra features specified by `extend_features` from data.
Because `data_dir` can be specified instead of `train_df` and `val_df`, the data
will be read twice if using a file: once to pull out the extra features here,
and once when the dataset is loaded into the Dataloader.
Parameters
----------
train_df, val_df : pd.DataFrame, optional
Training and validation data with at minimum, "formula" and "target" columns
and optionally, "extra features" (based on names in `extend_features`). If
            `val_df` is None, then `val_size` determines the amount of training data to
be split into `val_df`. By default None
data_dir : str, optional
The directory from which to load data if loading from a file rather than
a DataFrame. `data_dir` is only used if both `train_df` and `val_df` are
None. It is assumed that the files in the data directory will be named
`train.csv`, `val.csv`, and `test.csv`. By default join(dirname(__file__), "data", "materials_data")
Returns
-------
train_data, val_data : pd.DataFrame
Training and validation data, respectively. Either the original DataFrame
or, if `train_df` and `val_df` were both None, the path to the training and
validation data. If paths are returned, the training and validation data is
assumed to be located in the following two directories:
``join(data_dir, self.mat_prop, "train.csv")``
``join(data_dir, self.mat_prop, "val.csv")``
data_size : int
            The number of training datapoints. While not strictly necessary, this is
            the main reason the data is read twice: it's difficult to know the dataset
            size without first reading the data.
extra_train_data, extra_val_data : pd.DataFrame
Extra training and validation data, respectively. These are the feature data
corresponding to the column names in `extend_features`, such as state
variables (e.g. applied load or temperature).
"""
        use_path = train_df is None and val_df is None
if val_df is None:
# val_df gets used for hyperparameter optimization to improve generalizability
train_df, val_df = train_test_split(train_df, test_size=self.val_size)
if use_path:
# Get the datafiles you will learn from
assert self.mat_prop is not None
            train_data = join(data_dir, self.mat_prop, "train.csv")
            val_data = join(data_dir, self.mat_prop, "val.csv")
            try:
                train_df_tmp = pd.read_csv(train_data)
                val_df_tmp = pd.read_csv(val_data)
            except IOError:
                print(
                    "Please ensure you have train (train.csv) and validation data",
                    f'(val.csv) in folder "data/materials_data/{self.mat_prop}"',
                )
                raise
            data_size = train_df_tmp.shape[0]
if self.extend_features is not None:
extra_train_data = train_df_tmp[self.extend_features]
extra_val_data = val_df_tmp[self.extend_features]
else:
extra_train_data = None
extra_val_data = None
else:
train_data = train_df
val_data = val_df
if self.extend_features is not None:
extra_train_data = train_df[self.extend_features]
extra_val_data = val_df[self.extend_features]
else:
extra_train_data = None
extra_val_data = None
assert isinstance(train_data, pd.DataFrame)
data_size = train_data.shape[0]
return train_data, val_data, data_size, extra_train_data, extra_val_data
    def save_network(self, model_name: Optional[str] = None):
"""Save network weights to a ``.pth`` file.
Parameters
----------
model_name : str, optional
The name of the `.pth` file. If None, then use `self.model_name`. By default None
"""
if model_name is None:
model_name = self.model_name
os.makedirs(join("models", "trained_models"), exist_ok=True)
path = join("models", "trained_models", f"{model_name}.pth")
if self.verbose:
print(f"Saving network ({model_name}) to {path}")
assert isinstance(self.model, SubCrab)
self.network = {
"weights": self.model.state_dict(),
"scaler_state": self.scaler.state_dict(),
"model_name": model_name,
}
torch.save(self.network, path)
    def load_network(self, model_data: Union[str, dict]):
"""Load network weights from a ``.pth`` file.
Parameters
----------
        model_data : Union[str, dict]
Either the filename of the saved model or the network (see `self.network`)
as a dictionary of the form:
{
"weights": self.model.state_dict(),
"scaler_state": self.scaler.state_dict(),
"model_name": model_name,
}
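
        Examples
        --------
        A sketch of a save/load round trip (``cb`` is an assumed fitted model;
        `save_network` writes to ``models/trained_models/<model_name>.pth``):

        >>> cb.save_network("MyModel")  # doctest: +SKIP
        >>> cb.load_network("MyModel.pth")  # doctest: +SKIP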
"""
if type(model_data) is str:
path = join("models", "trained_models", model_data)
network = torch.load(path, map_location=self.compute_device)
else:
network = model_data
assert isinstance(self.model, SubCrab)
base_optim = Lamb(params=self.model.parameters())
optimizer = Lookahead(base_optimizer=base_optim)
self.optimizer = SWA(optimizer)
self.scaler = Scaler(torch.zeros(3))
self.model.load_state_dict(network["weights"])
self.scaler.load_state_dict(network["scaler_state"])
self.model_name = network["model_name"]
    def load_model(
self,
model: Union[str, SubCrab],
data: Union[str, pd.DataFrame],
classification: bool = False,
verbose: bool = True,
):
"""Load a _CrabNet model.
Parameters
----------
        model : Union[str, SubCrab]
The CrabNet model to load or the filename of the saved network.
data : Union[str, pd.DataFrame]
The data to load, which can be the folder in which the ``.csv`` file resides
or a pandas DataFrame.
classification : bool, optional
Whether to perform classification. If False, then assume regression. By
default, False
verbose : bool, optional
Whether model information and progress should be printed, by default True
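
        Examples
        --------
        A sketch, assuming a saved network and a ``test_df`` with "formula" and
        "target" columns:

        >>> cb.load_model("MyModel.pth", data=test_df)  # doctest: +SKIP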
"""
# Load up a saved network.
if type(model) is str:
self.load_network(model)
        # Check if classification task
if classification:
self.classification = True
# data is reloaded to self.data_loader
self.load_data(data, batch_size=2**9, train=False)
# %%
if __name__ == "__main__":
pass