
VAE (Variational Autoencoder)

bnode_core.nn.vae.vae_architecture

Variational Autoencoder (VAE) architecture for timeseries reconstruction.

This module implements a Variational Autoencoder with parameter conditioning for timeseries data (states and outputs). The architecture supports multiple modes:

  • Standard VAE: Encoder-Decoder with latent space
  • PELS-VAE: Parameter-conditioned VAE with Regressor for mu/logvar prediction
  • Feed-forward NN: Direct mapping from parameters to timeseries (bypasses latent space)

The model can reconstruct timeseries from either the encoder (during training) or from the regressor (during testing/prediction), enabling parameter-conditioned generation.

It is intended for tasks that map physical parameters → complete timeseries, e.g. the transient response of an RLC circuit with a fixed initial condition for different parameter values R, L, C.
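
The snippet below is a minimal shape-level sketch of the standard-VAE workflow. The import path follows the module name on this page, and all dimensions are illustrative assumptions; in practice the normalization layers are first initialized on the training dataset (see the training workflow in vae_train_test below).

import torch
import torch.nn as nn
from bnode_core.nn.vae.vae_architecture import VAE

# Toy dimensions: 2 states, 1 output, 100 time steps, 3 physical parameters (e.g. R, L, C)
model = VAE(n_states=2, n_outputs=1, seq_len=100, parameter_dim=3,
            hidden_dim=64, bottleneck_dim=8, activation=nn.ReLU, n_layers=3)

states = torch.randn(16, 2, 100)   # (batch, n_states, seq_len)
outputs = torch.randn(16, 1, 100)  # (batch, n_outputs, seq_len)
params = torch.rand(16, 3)         # (batch, parameter_dim)

# Training-mode pass: reconstruction uses the Encoder's latent distribution
(x, x_hat, states_hat, outputs_hat,
 mu_e, logvar_e, mu_r, logvar_r, retvals_norm) = model(states, outputs, params, train=True)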

Attention

This documentation is AI-generated. Be aware of possible inaccuracies.

Key components:

- Encoder: Maps timeseries (states + outputs) to latent distribution (mu, logvar)
- Decoder: Maps latent samples (and optionally parameters) to reconstructed timeseries
- Regressor: Maps parameters to latent distribution for parameter-conditioned generation
- Normalization: Time-series and parameter normalization layers

Loss function:

loss = mse_loss + beta * kl_loss + regressor_loss

or, with capacity scheduling:

loss = mse_loss + gamma * |kl_loss - capacity| + regressor_loss
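
For reference, kl_loss is the standard closed-form KL divergence between the diagonal Gaussian posterior N(mu, exp(logvar)) and the unit Gaussian prior (a general VAE identity; the exact reduction over batch and latent dimensions is handled by the module's kullback_leibler helper):

kl_loss = -0.5 * sum(1 + logvar - mu^2 - exp(logvar))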

Encoder

Bases: Module

Encoder network mapping timeseries to latent distribution parameters.

Maps concatenated states and outputs to mean (mu) and log-variance (logvar) of a multivariate Gaussian distribution in latent space. Uses a multi-layer perceptron (MLP) with configurable depth and hidden dimensions.

Architecture:

Flatten -> Linear(n_channels*seq_len, hidden_dim) -> Activation
-> [Linear(hidden_dim, hidden_dim) -> Activation] x (n_layers-2)
-> Linear(hidden_dim, 2*bottleneck_dim) -> Reshape to [mu, logvar]

Attributes:

  • bottleneck_dim: Dimensionality of the latent space.
  • flatten: Flattens input timeseries to 1D.
  • linear: Sequential MLP mapping flattened input to 2*bottleneck_dim outputs.

Source code in src/bnode_core/nn/vae/vae_architecture.py
class Encoder(nn.Module):
    """Encoder network mapping timeseries to latent distribution parameters.

    Maps concatenated states and outputs to mean (mu) and log-variance (logvar) 
    of a multivariate Gaussian distribution in latent space. Uses a multi-layer 
    perceptron (MLP) with configurable depth and hidden dimensions.

    Architecture:

        Flatten -> Linear(n_channels*seq_len, hidden_dim) -> Activation
        -> [Linear(hidden_dim, hidden_dim) -> Activation] x (n_layers-2)
        -> Linear(hidden_dim, 2*bottleneck_dim) -> Reshape to [mu, logvar]

    Attributes:
        bottleneck_dim: Dimensionality of the latent space.
        flatten: Flattens input timeseries to 1D.
        linear: Sequential MLP mapping flattened input to 2*bottleneck_dim outputs.
    """

    def __init__(self, n_channels: int, seq_len: int, hidden_dim: int, bottleneck_dim: int,
                 activation: nn.Module = nn.ReLU, n_layers: int = 3):
        """Initialize the Encoder network.

        Args:
            n_channels: Number of input channels (states + outputs concatenated).
            seq_len: Length of the timeseries sequence.
            hidden_dim: Number of hidden units in intermediate layers.
            bottleneck_dim: Dimensionality of latent space (output is 2*bottleneck_dim for mu and logvar).
            activation: Activation function class (default: nn.ReLU).
            n_layers: Total number of linear layers (minimum 2, includes input and output layers).
        """
        super().__init__()
        # save dimensions of output
        self.bottleneck_dim = bottleneck_dim

        self.flatten = nn.Flatten()

        # construct MLP
        modules = [
            nn.Linear(n_channels*seq_len, hidden_dim),
            activation(),
        ]
        if n_layers < 2:
            logging.warning('n_layers must be at least 2, setting n_layers to 2')
        for i in range(n_layers-2):
            modules.append(nn.Linear(hidden_dim, hidden_dim))
            modules.append(activation())
        modules.append(nn.Linear(hidden_dim, 2*bottleneck_dim))
        self.linear = nn.Sequential(*modules)  

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Encode timeseries to latent distribution parameters.

        Args:
            x: Input timeseries tensor of shape (batch, n_channels, seq_len).

        Returns:
            Tuple of (mu, logvar) where:

                - mu: Mean of latent distribution, shape (batch, bottleneck_dim)
                - logvar: Log-variance of latent distribution, shape (batch, bottleneck_dim)
        """
        x = self.flatten(x)
        latent = self.linear(x)
        latent = torch.reshape(latent, (-1, 2, self.bottleneck_dim))
        mu, logvar = latent[:,0], latent[:,1]
        return mu, logvar

__init__(n_channels: int, seq_len: int, hidden_dim: int, bottleneck_dim: int, activation: nn.Module = nn.ReLU, n_layers: int = 3)

Initialize the Encoder network.

Parameters:

  • n_channels (int, required): Number of input channels (states + outputs concatenated).
  • seq_len (int, required): Length of the timeseries sequence.
  • hidden_dim (int, required): Number of hidden units in intermediate layers.
  • bottleneck_dim (int, required): Dimensionality of latent space (output is 2*bottleneck_dim for mu and logvar).
  • activation (Module, default: ReLU): Activation function class.
  • n_layers (int, default: 3): Total number of linear layers (minimum 2, includes input and output layers).
Source code in src/bnode_core/nn/vae/vae_architecture.py
def __init__(self, n_channels: int, seq_len: int, hidden_dim: int, bottleneck_dim: int,
             activation: nn.Module = nn.ReLU, n_layers: int = 3):
    """Initialize the Encoder network.

    Args:
        n_channels: Number of input channels (states + outputs concatenated).
        seq_len: Length of the timeseries sequence.
        hidden_dim: Number of hidden units in intermediate layers.
        bottleneck_dim: Dimensionality of latent space (output is 2*bottleneck_dim for mu and logvar).
        activation: Activation function class (default: nn.ReLU).
        n_layers: Total number of linear layers (minimum 2, includes input and output layers).
    """
    super().__init__()
    # save dimensions of output
    self.bottleneck_dim = bottleneck_dim

    self.flatten = nn.Flatten()

    # construct MLP
    modules = [
        nn.Linear(n_channels*seq_len, hidden_dim),
        activation(),
    ]
    if n_layers < 2:
        logging.warning('n_layers must be at least 2, setting n_layers to 2')
    for i in range(n_layers-2):
        modules.append(nn.Linear(hidden_dim, hidden_dim))
        modules.append(activation())
    modules.append(nn.Linear(hidden_dim, 2*bottleneck_dim))
    self.linear = nn.Sequential(*modules)  

forward(x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]

Encode timeseries to latent distribution parameters.

Parameters:

  • x (Tensor, required): Input timeseries tensor of shape (batch, n_channels, seq_len).

Returns:

  Tuple[Tensor, Tensor], a tuple (mu, logvar) where:

  • mu: Mean of latent distribution, shape (batch, bottleneck_dim)
  • logvar: Log-variance of latent distribution, shape (batch, bottleneck_dim)
Source code in src/bnode_core/nn/vae/vae_architecture.py
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
    """Encode timeseries to latent distribution parameters.

    Args:
        x: Input timeseries tensor of shape (batch, n_channels, seq_len).

    Returns:
        Tuple of (mu, logvar) where:

            - mu: Mean of latent distribution, shape (batch, bottleneck_dim)
            - logvar: Log-variance of latent distribution, shape (batch, bottleneck_dim)
    """
    x = self.flatten(x)
    latent = self.linear(x)
    latent = torch.reshape(latent, (-1, 2, self.bottleneck_dim))
    mu, logvar = latent[:,0], latent[:,1]
    return mu, logvar
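
A shape-level usage sketch for the Encoder alone (import path and sizes are illustrative assumptions):

import torch
import torch.nn as nn
from bnode_core.nn.vae.vae_architecture import Encoder

enc = Encoder(n_channels=3, seq_len=100, hidden_dim=64, bottleneck_dim=8,
              activation=nn.ReLU, n_layers=3)
x = torch.randn(16, 3, 100)   # (batch, n_channels, seq_len)
mu, logvar = enc(x)           # each of shape (16, 8)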

Decoder

Bases: Module

Decoder network for VAE, generating timeseries from latent vectors.

The decoder maps latent vectors (and optionally system parameters) back to timeseries data. It supports three modes:

  • Standard VAE: z_latent → timeseries
  • PELS-VAE: (z_latent, parameters) → timeseries (params_to_decoder=True)
  • Feed-forward: parameters → timeseries (bottleneck_dim=0, params_to_decoder=True)

Architecture: Linear (latent+params → hidden) → MLP → Linear (hidden → n_channels*seq_len) → Reshape

Attributes:

  • channels: Number of output channels in reconstructed timeseries.
  • seq_len: Length of output timeseries sequence.
  • params_to_decoder: If True, concatenate normalized parameters to latent vector as decoder input.
  • param_normalization: Normalization layer for parameters (if params_to_decoder=True).
  • feed_forward_nn: If True, decoder operates in feed-forward mode (no latent vector).
  • linear: Sequential MLP mapping latent (+ params) to flattened timeseries.

Source code in src/bnode_core/nn/vae/vae_architecture.py
class Decoder(nn.Module):
    """Decoder network for VAE, generating timeseries from latent vectors.

    The decoder maps latent vectors (and optionally system parameters) back to timeseries
    data. It supports three modes:

    - Standard VAE: z_latent → timeseries
    - PELS-VAE: (z_latent, parameters) → timeseries (params_to_decoder=True)
    - Feed-forward: parameters → timeseries (bottleneck_dim=0, params_to_decoder=True)

    Architecture: Linear (latent+params → hidden) → MLP → Linear (hidden → n_channels*seq_len) → Reshape

    Attributes:
        channels: Number of output channels in reconstructed timeseries.
        seq_len: Length of output timeseries sequence.
        params_to_decoder: If True, concatenate normalized parameters to latent vector as decoder input.
        param_normalization: Normalization layer for parameters (if params_to_decoder=True).
        feed_forward_nn: If True, decoder operates in feed-forward mode (no latent vector).
        linear: Sequential MLP mapping latent (+ params) to flattened timeseries.
    """

    def __init__(self, n_channels: int, seq_len: int, hidden_dim: int, bottleneck_dim: int,
                 activation: nn.Module = nn.ReLU, n_layers: int = 3, params_to_decoder: bool = False, 
                 param_dim: Optional[int] = None):
        """Initialize the Decoder network.

        Args:
            n_channels: Number of output channels in reconstructed timeseries.
            seq_len: Length of output timeseries sequence.
            hidden_dim: Number of hidden units in intermediate layers.
            bottleneck_dim: Dimensionality of latent space input (0 for feed-forward mode).
            activation: Activation function class (default: nn.ReLU).
            n_layers: Total number of linear layers (minimum 2).
            params_to_decoder: If True, concatenate system parameters to latent input (PELS-VAE mode).
            param_dim: Dimensionality of parameter vector (required if params_to_decoder=True).
        """
        super().__init__()

        # save dimensions of output
        self.channels = n_channels
        self.seq_len = seq_len
        self.params_to_decoder = params_to_decoder
        if params_to_decoder:
            assert param_dim is not None, 'param_dim must be specified if params_to_decoder is True'
            self.param_normalization = NormalizationLayer1D(num_features=param_dim)

        self.feed_forward_nn = True if bottleneck_dim == 0 else False
        if self.feed_forward_nn:
            assert params_to_decoder is True, 'params_to_decoder must be True if bottleneck_dim is 0'
        # construct MLP
        modules = [
            nn.Linear(bottleneck_dim if params_to_decoder is False else bottleneck_dim + param_dim, hidden_dim),
            activation(),
        ]
        if n_layers < 2:
            logging.warning('n_layers must be at least 2, setting n_layers to 2')
        for i in range(n_layers-2):
            modules.append(nn.Linear(hidden_dim, hidden_dim))
            modules.append(activation())
        modules.append(nn.Linear(hidden_dim, n_channels*seq_len))
        self.linear = nn.Sequential(*modules)  

    def forward(self, z_latent: torch.Tensor, param: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Decode latent vector (and optionally parameters) to timeseries.

        Args:
            z_latent: Latent vector of shape (batch, bottleneck_dim).
            param: System parameters of shape (batch, param_dim) (required if params_to_decoder=True).

        Returns:
            Reconstructed timeseries tensor of shape (batch, n_channels, seq_len).
        """
        if self.params_to_decoder:
            param = self.param_normalization(param)
            x = self.linear(torch.cat((z_latent, param), dim=1)) if not self.feed_forward_nn else self.linear(param)
        else:
            x = self.linear(z_latent)
        x = torch.reshape(x,(-1, self.channels, self.seq_len))
        return x

__init__(n_channels: int, seq_len: int, hidden_dim: int, bottleneck_dim: int, activation: nn.Module = nn.ReLU, n_layers: int = 3, params_to_decoder: bool = False, param_dim: Optional[int] = None)

Initialize the Decoder network.

Parameters:

  • n_channels (int, required): Number of output channels in reconstructed timeseries.
  • seq_len (int, required): Length of output timeseries sequence.
  • hidden_dim (int, required): Number of hidden units in intermediate layers.
  • bottleneck_dim (int, required): Dimensionality of latent space input (0 for feed-forward mode).
  • activation (Module, default: ReLU): Activation function class.
  • n_layers (int, default: 3): Total number of linear layers (minimum 2).
  • params_to_decoder (bool, default: False): If True, concatenate system parameters to latent input (PELS-VAE mode).
  • param_dim (Optional[int], default: None): Dimensionality of parameter vector (required if params_to_decoder=True).
Source code in src/bnode_core/nn/vae/vae_architecture.py
def __init__(self, n_channels: int, seq_len: int, hidden_dim: int, bottleneck_dim: int,
             activation: nn.Module = nn.ReLU, n_layers: int = 3, params_to_decoder: bool = False, 
             param_dim: Optional[int] = None):
    """Initialize the Decoder network.

    Args:
        n_channels: Number of output channels in reconstructed timeseries.
        seq_len: Length of output timeseries sequence.
        hidden_dim: Number of hidden units in intermediate layers.
        bottleneck_dim: Dimensionality of latent space input (0 for feed-forward mode).
        activation: Activation function class (default: nn.ReLU).
        n_layers: Total number of linear layers (minimum 2).
        params_to_decoder: If True, concatenate system parameters to latent input (PELS-VAE mode).
        param_dim: Dimensionality of parameter vector (required if params_to_decoder=True).
    """
    super().__init__()

    # save dimensions of output
    self.channels = n_channels
    self.seq_len = seq_len
    self.params_to_decoder = params_to_decoder
    if params_to_decoder:
        assert param_dim is not None, 'param_dim must be specified if params_to_decoder is True'
        self.param_normalization = NormalizationLayer1D(num_features=param_dim)

    self.feed_forward_nn = True if bottleneck_dim == 0 else False
    if self.feed_forward_nn:
        assert params_to_decoder is True, 'params_to_decoder must be True if bottleneck_dim is 0'
    # construct MLP
    modules = [
        nn.Linear(bottleneck_dim if params_to_decoder is False else bottleneck_dim + param_dim, hidden_dim),
        activation(),
    ]
    if n_layers < 2:
        logging.warning('n_layers must be at least 2, setting n_layers to 2')
    for i in range(n_layers-2):
        modules.append(nn.Linear(hidden_dim, hidden_dim))
        modules.append(activation())
    modules.append(nn.Linear(hidden_dim, n_channels*seq_len))
    self.linear = nn.Sequential(*modules)  

forward(z_latent: torch.Tensor, param: Optional[torch.Tensor] = None) -> torch.Tensor

Decode latent vector (and optionally parameters) to timeseries.

Parameters:

  • z_latent (Tensor, required): Latent vector of shape (batch, bottleneck_dim).
  • param (Optional[Tensor], default: None): System parameters of shape (batch, param_dim); required if params_to_decoder=True.

Returns:

  Tensor: Reconstructed timeseries of shape (batch, n_channels, seq_len).

Source code in src/bnode_core/nn/vae/vae_architecture.py
def forward(self, z_latent: torch.Tensor, param: Optional[torch.Tensor] = None) -> torch.Tensor:
    """Decode latent vector (and optionally parameters) to timeseries.

    Args:
        z_latent: Latent vector of shape (batch, bottleneck_dim).
        param: System parameters of shape (batch, param_dim) (required if params_to_decoder=True).

    Returns:
        Reconstructed timeseries tensor of shape (batch, n_channels, seq_len).
    """
    if self.params_to_decoder:
        param = self.param_normalization(param)
        x = self.linear(torch.cat((z_latent, param), dim=1)) if not self.feed_forward_nn else self.linear(param)
    else:
        x = self.linear(z_latent)
    x = torch.reshape(x,(-1, self.channels, self.seq_len))
    return x
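
A shape-level sketch of the two decoder input modes (import path and sizes are illustrative; the NormalizationLayer1D defaults are assumed usable for a pure shape check):

import torch
from bnode_core.nn.vae.vae_architecture import Decoder

# Standard VAE mode: latent vector only
dec = Decoder(n_channels=3, seq_len=100, hidden_dim=64, bottleneck_dim=8)
x_hat = dec(torch.randn(16, 8))                          # -> (16, 3, 100)

# PELS-VAE mode: latent vector concatenated with parameters
dec_p = Decoder(n_channels=3, seq_len=100, hidden_dim=64, bottleneck_dim=8,
                params_to_decoder=True, param_dim=3)
x_hat_p = dec_p(torch.randn(16, 8), torch.rand(16, 3))   # -> (16, 3, 100)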

Regressor

Bases: Module

Regressor network mapping system parameters to latent distribution.

Used in PELS-VAE mode to predict latent distribution parameters (mu, logvar) directly from system parameters, without requiring timeseries input. This allows the VAE to learn relationships between system parameters and latent representations.

Architecture:

Normalize params → Linear (params → hidden) → MLP → Linear (hidden → 2*bottleneck_dim) → Reshape to (mu, logvar)

Attributes:

  • bottleneck_dim: Dimensionality of the latent space.
  • normalization: Normalization layer for input parameters.
  • linear: Sequential MLP mapping parameters to 2*bottleneck_dim outputs.

Source code in src/bnode_core/nn/vae/vae_architecture.py
class Regressor(nn.Module):
    """Regressor network mapping system parameters to latent distribution.

    Used in PELS-VAE mode to predict latent distribution parameters (mu, logvar) 
    directly from system parameters, without requiring timeseries input. This allows
    the VAE to learn relationships between system parameters and latent representations.

    Architecture: 

        Normalize params → Linear (params → hidden) → MLP → Linear (hidden → 2*bottleneck_dim) → Reshape to (mu, logvar)

    Attributes:
        bottleneck_dim: Dimensionality of the latent space.
        normalization: Normalization layer for input parameters.
        linear: Sequential MLP mapping parameters to 2*bottleneck_dim outputs.
    """

    def __init__(self, parameter_dim: int, hidden_dim: int, 
                 bottleneck_dim: int, activation: nn.Module = nn.ReLU, 
                 n_layers: int = 3):
        """Initialize the Regressor network.

        Args:
            parameter_dim: Dimensionality of input parameter vector.
            hidden_dim: Number of hidden units in intermediate layers.
            bottleneck_dim: Dimensionality of latent space (output is 2*bottleneck_dim for mu and logvar).
            activation: Activation function class (default: nn.ReLU).
            n_layers: Total number of linear layers (minimum 2).
        """
        super().__init__()
        # save dimensions of output
        self.bottleneck_dim = bottleneck_dim

        self.normalization = NormalizationLayer1D(num_features=parameter_dim)

        # construct MLP
        modules = [
            nn.Linear(parameter_dim, hidden_dim),
            activation(),
        ]
        if n_layers < 2:
            logging.warning('n_layers must be at least 2, setting n_layers to 2')
        for i in range(n_layers-2):
            modules.append(nn.Linear(hidden_dim, hidden_dim))
            modules.append(activation())
        modules.append(nn.Linear(hidden_dim, 2*bottleneck_dim))
        self.linear = nn.Sequential(*modules)

    def forward(self, param: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Predict latent distribution parameters from system parameters.

        Args:
            param: System parameters of shape (batch, parameter_dim).

        Returns:
            Tuple of (mu, logvar) where:
                - mu: Predicted mean of latent distribution, shape (batch, bottleneck_dim)
                - logvar: Predicted log-variance of latent distribution, shape (batch, bottleneck_dim)
        """
        param = self.normalization(param)
        latent = self.linear(param)
        latent = torch.reshape(latent,(-1, 2, self.bottleneck_dim))
        mu, logvar = latent[:,0], latent[:,1]
        return mu, logvar

__init__(parameter_dim: int, hidden_dim: int, bottleneck_dim: int, activation: nn.Module = nn.ReLU, n_layers: int = 3)

Initialize the Regressor network.

Parameters:

  • parameter_dim (int, required): Dimensionality of input parameter vector.
  • hidden_dim (int, required): Number of hidden units in intermediate layers.
  • bottleneck_dim (int, required): Dimensionality of latent space (output is 2*bottleneck_dim for mu and logvar).
  • activation (Module, default: ReLU): Activation function class.
  • n_layers (int, default: 3): Total number of linear layers (minimum 2).
Source code in src/bnode_core/nn/vae/vae_architecture.py
def __init__(self, parameter_dim: int, hidden_dim: int, 
             bottleneck_dim: int, activation: nn.Module = nn.ReLU, 
             n_layers: int = 3):
    """Initialize the Regressor network.

    Args:
        parameter_dim: Dimensionality of input parameter vector.
        hidden_dim: Number of hidden units in intermediate layers.
        bottleneck_dim: Dimensionality of latent space (output is 2*bottleneck_dim for mu and logvar).
        activation: Activation function class (default: nn.ReLU).
        n_layers: Total number of linear layers (minimum 2).
    """
    super().__init__()
    # save dimensions of output
    self.bottleneck_dim = bottleneck_dim

    self.normalization = NormalizationLayer1D(num_features=parameter_dim)

    # construct MLP
    modules = [
        nn.Linear(parameter_dim, hidden_dim),
        activation(),
    ]
    if n_layers < 2:
        logging.warning('n_layers must be at least 2, setting n_layers to 2')
    for i in range(n_layers-2):
        modules.append(nn.Linear(hidden_dim, hidden_dim))
        modules.append(activation())
    modules.append(nn.Linear(hidden_dim, 2*bottleneck_dim))
    self.linear = nn.Sequential(*modules)

forward(param: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]

Predict latent distribution parameters from system parameters.

Parameters:

  • param (Tensor, required): System parameters of shape (batch, parameter_dim).

Returns:

  Tuple[Tensor, Tensor], a tuple (mu, logvar) where:

  • mu: Predicted mean of latent distribution, shape (batch, bottleneck_dim)
  • logvar: Predicted log-variance of latent distribution, shape (batch, bottleneck_dim)

Source code in src/bnode_core/nn/vae/vae_architecture.py
def forward(self, param: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
    """Predict latent distribution parameters from system parameters.

    Args:
        param: System parameters of shape (batch, parameter_dim).

    Returns:
        Tuple of (mu, logvar) where:
            - mu: Predicted mean of latent distribution, shape (batch, bottleneck_dim)
            - logvar: Predicted log-variance of latent distribution, shape (batch, bottleneck_dim)
    """
    param = self.normalization(param)
    latent = self.linear(param)
    latent = torch.reshape(latent,(-1, 2, self.bottleneck_dim))
    mu, logvar = latent[:,0], latent[:,1]
    return mu, logvar
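
A shape-level sketch (import path and sizes are illustrative assumptions; the internal parameter normalization is assumed usable with defaults for a shape check):

import torch
from bnode_core.nn.vae.vae_architecture import Regressor

reg = Regressor(parameter_dim=3, hidden_dim=64, bottleneck_dim=8)
mu, logvar = reg(torch.rand(16, 3))  # each of shape (16, 8)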

VAE

Bases: Module

Variational Autoencoder for timeseries modeling with parameter conditioning.

This class implements three operational modes:

  1. Standard VAE: Encodes timeseries to latent space, decodes back to timeseries. Uses both Encoder and Regressor to predict latent distributions.
  2. PELS-VAE (params_to_decoder=True): Decoder receives both latent vector and system parameters, allowing parameter-conditioned reconstruction.
  3. Feed-forward NN (feed_forward_nn=True): Bypasses latent space entirely, directly mapping parameters to timeseries outputs.

The model jointly trains:

  • Encoder: timeseries → (mu_encoder, logvar_encoder)
  • Regressor: parameters → (mu_regressor, logvar_regressor)
  • Decoder: latent vector (+ params) → timeseries

During training, reconstruction uses Encoder's latent distribution. During prediction, reconstruction uses Regressor's latent distribution.

Attributes:

  • n_channels: Total number of channels (n_states + n_outputs).
  • n_states: Number of state channels.
  • n_outputs: Number of output channels.
  • timeseries_normalization: Normalization layer for timeseries data.
  • feed_forward_nn: If True, operates in feed-forward mode (no latent space).
  • Regressor: Parameter-to-latent network (if not feed_forward_nn).
  • Encoder: Timeseries-to-latent network (if not feed_forward_nn).
  • Decoder: Latent-to-timeseries network.

Source code in src/bnode_core/nn/vae/vae_architecture.py
class VAE(nn.Module):
    """Variational Autoencoder for timeseries modeling with parameter conditioning.

    This class implements three operational modes:

    1. **Standard VAE**: Encodes timeseries to latent space, decodes back to timeseries.
       Uses both Encoder and Regressor to predict latent distributions.
    2. **PELS-VAE** (params_to_decoder=True): Decoder receives both latent vector and 
       system parameters, allowing parameter-conditioned reconstruction.
    3. **Feed-forward NN** (feed_forward_nn=True): Bypasses latent space entirely,
       directly mapping parameters to timeseries outputs.

    The model jointly trains:

    - Encoder: timeseries → (mu_encoder, logvar_encoder)
    - Regressor: parameters → (mu_regressor, logvar_regressor)
    - Decoder: latent vector (+ params) → timeseries

    During training, reconstruction uses Encoder's latent distribution.
    During prediction, reconstruction uses Regressor's latent distribution.

    Attributes:
        n_channels: Total number of channels (n_states + n_outputs).
        n_states: Number of state channels.
        n_outputs: Number of output channels.
        timeseries_normalization: Normalization layer for timeseries data.
        feed_forward_nn: If True, operates in feed-forward mode (no latent space).
        Regressor: Parameter-to-latent network (if not feed_forward_nn).
        Encoder: Timeseries-to-latent network (if not feed_forward_nn).
        Decoder: Latent-to-timeseries network.
    """

    def __init__(self, n_states: int, n_outputs: int, seq_len: int, parameter_dim: int, 
                 hidden_dim: int, bottleneck_dim: int, activation: nn.Module = nn.ReLU, 
                 n_layers: int = 3, params_to_decoder: bool = False, feed_forward_nn: bool = False):
        """Initialize the VAE model.

        Args:
            n_states: Number of state channels in timeseries.
            n_outputs: Number of output channels in timeseries.
            seq_len: Length of timeseries sequence.
            parameter_dim: Dimensionality of system parameters.
            hidden_dim: Number of hidden units in all sub-networks.
            bottleneck_dim: Dimensionality of latent space.
            activation: Activation function class (default: nn.ReLU).
            n_layers: Number of layers in all sub-networks (minimum 2).
            params_to_decoder: If True, decoder receives parameters as additional input (PELS-VAE mode).
            feed_forward_nn: If True, operate in feed-forward mode without latent space.
        """
        if feed_forward_nn is True:
            if params_to_decoder is False:
                Warning('params_to_decoder is set to False, but feed_forward_nn is set to True. Setting params_to_decoder to True')
        super().__init__()
        self.n_channels = n_states + n_outputs
        self.n_states = n_states
        self.n_outputs = n_outputs
        self.timeseries_normalization = NormalizationLayerTimeSeries(n_channels=self.n_channels)
        self.feed_forward_nn = feed_forward_nn

        if feed_forward_nn is False:
            self.Regressor = Regressor(parameter_dim, hidden_dim, bottleneck_dim, activation, n_layers)
            self.Encoder = Encoder(self.n_channels, seq_len, hidden_dim,
                                bottleneck_dim, activation, n_layers)
            self.Decoder = Decoder(self.n_channels, seq_len, hidden_dim,
                                bottleneck_dim, activation, n_layers,
                                params_to_decoder, parameter_dim)
        else:
            _bottleneck_dim = 0
            _params_to_decoder = True
            self.Decoder = Decoder(self.n_channels, seq_len, hidden_dim,
                                   _bottleneck_dim, activation, n_layers,
                                      _params_to_decoder, parameter_dim)
        logging.info('VAE with n_channels = {}, seq_len = {}, parameter_dim = {}, \
                     hidden_dim = {}, bottleneck_dim = {}, activation = {}, n_layers = {}, params to decoder: {}'.format(
                         self.n_channels, seq_len, parameter_dim, hidden_dim, bottleneck_dim, activation, n_layers, self.Decoder.params_to_decoder))
        logging.info('VAE initialized with {} parameters'.format(count_parameters(self)))

    def reparametrize(self, mu: torch.Tensor, logvar: torch.Tensor) -> torch.Tensor:
        """Apply reparametrization trick to sample from latent distribution.

        Samples z ~ N(mu, exp(0.5 * logvar)) using z = mu + eps * std, where eps ~ N(0, I).
        This allows backpropagation through the sampling operation.

        Args:
            mu: Mean of latent distribution, shape (batch, bottleneck_dim).
            logvar: Log-variance of latent distribution, shape (batch, bottleneck_dim).


        Returns:
            Sampled latent vector z of shape (batch, bottleneck_dim).
        """
        # if device.type == 'cuda':
        #     eps = torch.autograd.Variable(torch.cuda.FloatTensor(mu.shape).normal_())
        # else: 
        #     eps = torch.autograd.Variable(torch.FloatTensor(mu.shape).normal_())
        eps = torch.randn_like(mu, device=mu.device)
        std = logvar.mul(0.5).exp()
        z_latent = eps.mul(std).add_(mu)
        return z_latent

    def forward(
        self, 
        states: torch.Tensor, 
        outputs: torch.Tensor, 
        params: torch.Tensor, 
        train: bool = True, 
        predict: bool = False, 
        n_passes: int = 1, 
        test_with_zero_eps: bool = False, 
        device: Optional[torch.device] = None
    ) -> Tuple:
        """Perform forward pass through the VAE network.

        Three operational modes based on flags:

        1. Training (train=True, predict=False): Encode timeseries, reconstruct using Encoder's latent distribution
        2. Testing (train=False, predict=False): Encode timeseries, reconstruct using Regressor's latent distribution
        3. Prediction (predict=True, train=False): Skip Encoder, reconstruct using Regressor's latent distribution only

        Args:
            states: State timeseries of shape (batch, n_states, seq_len).
            outputs: Output timeseries of shape (batch, n_outputs, seq_len).
            params: System parameters of shape (batch, parameter_dim).
            train: If True, use Encoder's latent distribution for reconstruction.
            predict: If True, bypass Encoder and reconstruct from parameters only.
            n_passes: Number of decoder passes to average (for stochastic predictions).
            test_with_zero_eps: If True during testing, use mu directly (zero variance sampling).
            device: Device for tensor operations.

        Returns:
            Tuple of (x, x_hat, states_hat, outputs_hat, mu_encoder, logvar_encoder, 
                     mu_regressor, logvar_regressor, retvals_norm) where:

                - x: Concatenated input timeseries (states + outputs)
                - x_hat: Reconstructed timeseries
                - states_hat: Reconstructed states
                - outputs_hat: Reconstructed outputs
                - mu_encoder: Encoder's predicted latent mean
                - logvar_encoder: Encoder's predicted latent log-variance
                - mu_regressor: Regressor's predicted latent mean
                - logvar_regressor: Regressor's predicted latent log-variance
                - retvals_norm: Dictionary of normalized versions of above tensors
        """
        if self.feed_forward_nn is False:
            if predict:
                assert not train, 'predict and train cannot be true at the same time'
            else:
                # concatenate states and outputs
                x = torch.cat((states, outputs), dim=1)
                x_norm = self.timeseries_normalization(x)
                states_norm = x_norm[:,:self.n_states]
                outputs_norm = x_norm[:,self.n_states:]
                mu_encoder, logvar_encoder = self.Encoder(x_norm)
            mu_regressor, logvar_regressor = self.Regressor(params)
            # assign mu, logvar based on if train or not
            if train:
                mu = mu_encoder
                logvar = logvar_encoder
            else:
                mu = mu_regressor
                logvar = logvar_regressor
            # if predict, we need some dummy values for mu_encoder and logvar_encoder
            if predict:
                mu_encoder = torch.ones_like(mu_encoder, device=device) * np.inf
                logvar_encoder = torch.ones_like(logvar_encoder, device=device) * np.inf
            # perform multiple passes through decoder
            x_pass = []
            x_pass_norm = []
            for _ in range(n_passes):
                if train or not test_with_zero_eps:
                    z_latent = self.reparametrize(mu, logvar)
                else:
                    z_latent = mu
                if self.Decoder.params_to_decoder:
                    x_i_hat_norm = self.Decoder(z_latent, params)
                else:
                    x_i_hat_norm = self.Decoder(z_latent)
                x_i_hat = self.timeseries_normalization(x_i_hat_norm, denormalize = True)
                x_pass.append(x_i_hat)
                x_pass_norm.append(x_i_hat_norm)
            # stack along new dimension 1 and take mean along that dimension
            x_hat = torch.stack(x_pass, dim=0).mean(dim=0)
            x_hat_norm = torch.stack(x_pass_norm, dim=0).mean(dim=0)
            # unpack x
            states_hat, outputs_hat = torch.split(x_hat, [self.n_states, self.n_outputs], dim=1)
            # unpack x_norm
            states_hat_norm, outputs_hat_norm = torch.split(x_hat_norm, [self.n_states, self.n_outputs], dim=1)
            retvals_norm = {
                'x': x_norm,
                'x_hat': x_hat_norm,
                'states': states_norm,
                'outputs': outputs_norm,
                'states_hat': states_hat_norm,
                'outputs_hat': outputs_hat_norm,
            }
        else:
            x = torch.cat((states, outputs), dim=1)
            x_norm = self.timeseries_normalization(x)
            states_norm = x_norm[:,:self.n_states]
            outputs_norm = x_norm[:,self.n_states:]
            x_hat_norm = self.Decoder(None, params)
            x_hat = self.timeseries_normalization(x_hat_norm, denormalize = True)
             # unpack x
            states_hat, outputs_hat = torch.split(x_hat, [self.n_states, self.n_outputs], dim=1)
            # unpack x_norm
            states_hat_norm, outputs_hat_norm = torch.split(x_hat_norm, [self.n_states, self.n_outputs], dim=1)
            retvals_norm = {
                'x': x_norm,
                'x_hat': x_hat_norm,
                'states': states_norm,
                'outputs': outputs_norm,
                'states_hat': states_hat_norm,
                'outputs_hat': outputs_hat_norm,
            }
            mu_encoder = torch.ones_like(states_hat_norm, device=device) * np.inf
            logvar_encoder = torch.ones_like(states_hat_norm, device=device) * np.inf
            mu_regressor = torch.ones_like(states_hat_norm, device=device) * np.inf
            logvar_regressor = torch.ones_like(states_hat_norm, device=device) * np.inf

        return x, x_hat, states_hat, outputs_hat, mu_encoder, logvar_encoder, mu_regressor, logvar_regressor, retvals_norm

    def predict(self, param: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Generate timeseries predictions from system parameters only.

        Convenience method for inference mode. Bypasses Encoder and generates
        predictions using only Regressor and Decoder.

        Args:
            param: System parameters of shape (batch, parameter_dim).

        Returns:
            Same as forward() method with predict=True.
        """
        return self.forward(states=None, outputs=None, params=param, train=False, predict=True)

    def save(self, path: Path):
        """Save model state dictionary to disk.

        Args:
            path: Path to save the model weights. Parent directories are created if needed.
        """
        if not path.parent.exists():
            path.parent.mkdir(parents=True)
        torch.save(self.state_dict(), path)
        logging.info('\t \t \tSaved model to {}'.format(path))

    def load(self, path: Path, device: Optional[torch.device] = None):
        """Load model state dictionary from disk.

        Args:
            path: Path to the saved model weights.
            device: Device to map the loaded weights to (e.g., 'cpu', 'cuda').
        """
        self.load_state_dict(torch.load(path, map_location=device))
        logging.info('\tLoaded model from {}'.format(path))

__init__(n_states: int, n_outputs: int, seq_len: int, parameter_dim: int, hidden_dim: int, bottleneck_dim: int, activation: nn.Module = nn.ReLU, n_layers: int = 3, params_to_decoder: bool = False, feed_forward_nn: bool = False)

Initialize the VAE model.

Parameters:

  • n_states (int, required): Number of state channels in timeseries.
  • n_outputs (int, required): Number of output channels in timeseries.
  • seq_len (int, required): Length of timeseries sequence.
  • parameter_dim (int, required): Dimensionality of system parameters.
  • hidden_dim (int, required): Number of hidden units in all sub-networks.
  • bottleneck_dim (int, required): Dimensionality of latent space.
  • activation (Module, default: ReLU): Activation function class.
  • n_layers (int, default: 3): Number of layers in all sub-networks (minimum 2).
  • params_to_decoder (bool, default: False): If True, decoder receives parameters as additional input (PELS-VAE mode).
  • feed_forward_nn (bool, default: False): If True, operate in feed-forward mode without latent space.
Source code in src/bnode_core/nn/vae/vae_architecture.py
def __init__(self, n_states: int, n_outputs: int, seq_len: int, parameter_dim: int, 
             hidden_dim: int, bottleneck_dim: int, activation: nn.Module = nn.ReLU, 
             n_layers: int = 3, params_to_decoder: bool = False, feed_forward_nn: bool = False):
    """Initialize the VAE model.

    Args:
        n_states: Number of state channels in timeseries.
        n_outputs: Number of output channels in timeseries.
        seq_len: Length of timeseries sequence.
        parameter_dim: Dimensionality of system parameters.
        hidden_dim: Number of hidden units in all sub-networks.
        bottleneck_dim: Dimensionality of latent space.
        activation: Activation function class (default: nn.ReLU).
        n_layers: Number of layers in all sub-networks (minimum 2).
        params_to_decoder: If True, decoder receives parameters as additional input (PELS-VAE mode).
        feed_forward_nn: If True, operate in feed-forward mode without latent space.
    """
    if feed_forward_nn is True:
        if params_to_decoder is False:
            Warning('params_to_decoder is set to False, but feed_forward_nn is set to True. Setting params_to_decoder to True')
    super().__init__()
    self.n_channels = n_states + n_outputs
    self.n_states = n_states
    self.n_outputs = n_outputs
    self.timeseries_normalization = NormalizationLayerTimeSeries(n_channels=self.n_channels)
    self.feed_forward_nn = feed_forward_nn

    if feed_forward_nn is False:
        self.Regressor = Regressor(parameter_dim, hidden_dim, bottleneck_dim, activation, n_layers)
        self.Encoder = Encoder(self.n_channels, seq_len, hidden_dim,
                            bottleneck_dim, activation, n_layers)
        self.Decoder = Decoder(self.n_channels, seq_len, hidden_dim,
                            bottleneck_dim, activation, n_layers,
                            params_to_decoder, parameter_dim)
    else:
        _bottleneck_dim = 0
        _params_to_decoder = True
        self.Decoder = Decoder(self.n_channels, seq_len, hidden_dim,
                               _bottleneck_dim, activation, n_layers,
                                  _params_to_decoder, parameter_dim)
    logging.info('VAE with n_channels = {}, seq_len = {}, parameter_dim = {}, \
                 hidden_dim = {}, bottleneck_dim = {}, activation = {}, n_layers = {}, params to decoder: {}'.format(
                     self.n_channels, seq_len, parameter_dim, hidden_dim, bottleneck_dim, activation, n_layers, self.Decoder.params_to_decoder))
    logging.info('VAE initialized with {} parameters'.format(count_parameters(self)))

reparametrize(mu: torch.Tensor, logvar: torch.Tensor) -> torch.Tensor

Apply reparametrization trick to sample from latent distribution.

Samples z ~ N(mu, exp(0.5 * logvar)) using z = mu + eps * std, where eps ~ N(0, I). This allows backpropagation through the sampling operation.

Parameters:

  • mu (Tensor, required): Mean of latent distribution, shape (batch, bottleneck_dim).
  • logvar (Tensor, required): Log-variance of latent distribution, shape (batch, bottleneck_dim).

Returns:

  Tensor: Sampled latent vector z of shape (batch, bottleneck_dim).

Source code in src/bnode_core/nn/vae/vae_architecture.py
def reparametrize(self, mu: torch.Tensor, logvar: torch.Tensor) -> torch.Tensor:
    """Apply reparametrization trick to sample from latent distribution.

    Samples z ~ N(mu, exp(0.5 * logvar)) using z = mu + eps * std, where eps ~ N(0, I).
    This allows backpropagation through the sampling operation.

    Args:
        mu: Mean of latent distribution, shape (batch, bottleneck_dim).
        logvar: Log-variance of latent distribution, shape (batch, bottleneck_dim).


    Returns:
        Sampled latent vector z of shape (batch, bottleneck_dim).
    """
    # if device.type == 'cuda':
    #     eps = torch.autograd.Variable(torch.cuda.FloatTensor(mu.shape).normal_())
    # else: 
    #     eps = torch.autograd.Variable(torch.FloatTensor(mu.shape).normal_())
    eps = torch.randn_like(mu, device=mu.device)
    std = logvar.mul(0.5).exp()
    z_latent = eps.mul(std).add_(mu)
    return z_latent
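
The key property of the trick can be checked in isolation with plain PyTorch: gradients flow to mu and logvar while the randomness stays in eps.

import torch

mu = torch.zeros(4, 8, requires_grad=True)
logvar = torch.zeros(4, 8, requires_grad=True)
eps = torch.randn_like(mu)
z = mu + eps * (0.5 * logvar).exp()  # same computation as reparametrize(mu, logvar)
z.sum().backward()
assert mu.grad is not None and logvar.grad is not None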

forward(states: torch.Tensor, outputs: torch.Tensor, params: torch.Tensor, train: bool = True, predict: bool = False, n_passes: int = 1, test_with_zero_eps: bool = False, device: Optional[torch.device] = None) -> Tuple

Perform forward pass through the VAE network.

Three operational modes based on flags:

  1. Training (train=True, predict=False): Encode timeseries, reconstruct using Encoder's latent distribution
  2. Testing (train=False, predict=False): Encode timeseries, reconstruct using Regressor's latent distribution
  3. Prediction (predict=True, train=False): Skip Encoder, reconstruct using Regressor's latent distribution only

Parameters:

  • states (Tensor, required): State timeseries of shape (batch, n_states, seq_len).
  • outputs (Tensor, required): Output timeseries of shape (batch, n_outputs, seq_len).
  • params (Tensor, required): System parameters of shape (batch, parameter_dim).
  • train (bool, default: True): If True, use Encoder's latent distribution for reconstruction.
  • predict (bool, default: False): If True, bypass Encoder and reconstruct from parameters only.
  • n_passes (int, default: 1): Number of decoder passes to average (for stochastic predictions).
  • test_with_zero_eps (bool, default: False): If True during testing, use mu directly (zero-variance sampling).
  • device (Optional[device], default: None): Device for tensor operations.

Returns:

  Tuple of (x, x_hat, states_hat, outputs_hat, mu_encoder, logvar_encoder, mu_regressor, logvar_regressor, retvals_norm) where:

  • x: Concatenated input timeseries (states + outputs)
  • x_hat: Reconstructed timeseries
  • states_hat: Reconstructed states
  • outputs_hat: Reconstructed outputs
  • mu_encoder: Encoder's predicted latent mean
  • logvar_encoder: Encoder's predicted latent log-variance
  • mu_regressor: Regressor's predicted latent mean
  • logvar_regressor: Regressor's predicted latent log-variance
  • retvals_norm: Dictionary of normalized versions of the above tensors
Source code in src/bnode_core/nn/vae/vae_architecture.py
def forward(
    self, 
    states: torch.Tensor, 
    outputs: torch.Tensor, 
    params: torch.Tensor, 
    train: bool = True, 
    predict: bool = False, 
    n_passes: int = 1, 
    test_with_zero_eps: bool = False, 
    device: Optional[torch.device] = None
) -> Tuple:
    """Perform forward pass through the VAE network.

    Three operational modes based on flags:

    1. Training (train=True, predict=False): Encode timeseries, reconstruct using Encoder's latent distribution
    2. Testing (train=False, predict=False): Encode timeseries, reconstruct using Regressor's latent distribution
    3. Prediction (predict=True, train=False): Skip Encoder, reconstruct using Regressor's latent distribution only

    Args:
        states: State timeseries of shape (batch, n_states, seq_len).
        outputs: Output timeseries of shape (batch, n_outputs, seq_len).
        params: System parameters of shape (batch, parameter_dim).
        train: If True, use Encoder's latent distribution for reconstruction.
        predict: If True, bypass Encoder and reconstruct from parameters only.
        n_passes: Number of decoder passes to average (for stochastic predictions).
        test_with_zero_eps: If True during testing, use mu directly (zero variance sampling).
        device: Device for tensor operations.

    Returns:
        Tuple of (x, x_hat, states_hat, outputs_hat, mu_encoder, logvar_encoder, 
                 mu_regressor, logvar_regressor, retvals_norm) where:

            - x: Concatenated input timeseries (states + outputs)
            - x_hat: Reconstructed timeseries
            - states_hat: Reconstructed states
            - outputs_hat: Reconstructed outputs
            - mu_encoder: Encoder's predicted latent mean
            - logvar_encoder: Encoder's predicted latent log-variance
            - mu_regressor: Regressor's predicted latent mean
            - logvar_regressor: Regressor's predicted latent log-variance
            - retvals_norm: Dictionary of normalized versions of above tensors
    """
    if self.feed_forward_nn is False:
        if predict:
            assert not train, 'predict and train cannot be true at the same time'
        else:
            # concatenate states and outputs
            x = torch.cat((states, outputs), dim=1)
            x_norm = self.timeseries_normalization(x)
            states_norm = x_norm[:,:self.n_states]
            outputs_norm = x_norm[:,self.n_states:]
            mu_encoder, logvar_encoder = self.Encoder(x_norm)
        mu_regressor, logvar_regressor = self.Regressor(params)
        # assign mu, logvar based on if train or not
        if train:
            mu = mu_encoder
            logvar = logvar_encoder
        else:
            mu = mu_regressor
            logvar = logvar_regressor
        # if predict, we need some dummy values for mu_encoder and logvar_encoder
        if predict:
            mu_encoder = torch.ones_like(mu_encoder, device=device) * np.inf
            logvar_encoder = torch.ones_like(logvar_encoder, device=device) * np.inf
        # perform multiple passes through decoder
        x_pass = []
        x_pass_norm = []
        for _ in range(n_passes):
            if train or not test_with_zero_eps:
                z_latent = self.reparametrize(mu, logvar)
            else:
                z_latent = mu
            if self.Decoder.params_to_decoder:
                x_i_hat_norm = self.Decoder(z_latent, params)
            else:
                x_i_hat_norm = self.Decoder(z_latent)
            x_i_hat = self.timeseries_normalization(x_i_hat_norm, denormalize = True)
            x_pass.append(x_i_hat)
            x_pass_norm.append(x_i_hat_norm)
        # stack along new dimension 1 and take mean along that dimension
        x_hat = torch.stack(x_pass, dim=0).mean(dim=0)
        x_hat_norm = torch.stack(x_pass_norm, dim=0).mean(dim=0)
        # unpack x
        states_hat, outputs_hat = torch.split(x_hat, [self.n_states, self.n_outputs], dim=1)
        # unpack x_norm
        states_hat_norm, outputs_hat_norm = torch.split(x_hat_norm, [self.n_states, self.n_outputs], dim=1)
        retvals_norm = {
            'x': x_norm,
            'x_hat': x_hat_norm,
            'states': states_norm,
            'outputs': outputs_norm,
            'states_hat': states_hat_norm,
            'outputs_hat': outputs_hat_norm,
        }
    else:
        x = torch.cat((states, outputs), dim=1)
        x_norm = self.timeseries_normalization(x)
        states_norm = x_norm[:,:self.n_states]
        outputs_norm = x_norm[:,self.n_states:]
        x_hat_norm = self.Decoder(None, params)
        x_hat = self.timeseries_normalization(x_hat_norm, denormalize = True)
         # unpack x
        states_hat, outputs_hat = torch.split(x_hat, [self.n_states, self.n_outputs], dim=1)
        # unpack x_norm
        states_hat_norm, outputs_hat_norm = torch.split(x_hat_norm, [self.n_states, self.n_outputs], dim=1)
        retvals_norm = {
            'x': x_norm,
            'x_hat': x_hat_norm,
            'states': states_norm,
            'outputs': outputs_norm,
            'states_hat': states_hat_norm,
            'outputs_hat': outputs_hat_norm,
        }
        mu_encoder = torch.ones_like(states_hat_norm, device=device) * np.inf
        logvar_encoder = torch.ones_like(states_hat_norm, device=device) * np.inf
        mu_regressor = torch.ones_like(states_hat_norm, device=device) * np.inf
        logvar_regressor = torch.ones_like(states_hat_norm, device=device) * np.inf

    return x, x_hat, states_hat, outputs_hat, mu_encoder, logvar_encoder, mu_regressor, logvar_regressor, retvals_norm
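
A sketch of the training and testing flag combinations, reusing the model and tensors from the constructor example above (shapes and values are illustrative; the prediction mode, predict=True, is described in the mode list):

# 1. Training: reconstruct from the Encoder's latent distribution
rets_train = model(states, outputs, params, train=True)

# 2. Testing: encode, but reconstruct from the Regressor's latent
#    distribution, averaging 10 stochastic decoder passes
rets_test = model(states, outputs, params, train=False, n_passes=10)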

predict(param: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]

Generate timeseries predictions from system parameters only.

Convenience method for inference mode. Bypasses Encoder and generates predictions using only Regressor and Decoder.

Parameters:

  • param (Tensor, required): System parameters of shape (batch, parameter_dim).

Returns:

  Tuple[Tensor, Tensor]: Same as forward() with predict=True.

Source code in src/bnode_core/nn/vae/vae_architecture.py
def predict(self, param: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
    """Generate timeseries predictions from system parameters only.

    Convenience method for inference mode. Bypasses Encoder and generates
    predictions using only Regressor and Decoder.

    Args:
        param: System parameters of shape (batch, parameter_dim).

    Returns:
        Same as forward() method with predict=True.
    """
    return self.forward(states=None, outputs=None, params=param, train=False, predict=True)

save(path: Path)

Save model state dictionary to disk.

Parameters:

  • path (Path, required): Path to save the model weights. Parent directories are created if needed.
Source code in src/bnode_core/nn/vae/vae_architecture.py
def save(self, path: Path):
    """Save model state dictionary to disk.

    Args:
        path: Path to save the model weights. Parent directories are created if needed.
    """
    if not path.parent.exists():
        path.parent.mkdir(parents=True)
    torch.save(self.state_dict(), path)
    logging.info('\t \t \tSaved model to {}'.format(path))

load(path: Path, device: Optional[torch.device] = None)

Load model state dictionary from disk.

Parameters:

  • path (Path, required): Path to the saved model weights.
  • device (Optional[device], default: None): Device to map the loaded weights to (e.g., 'cpu', 'cuda').
Source code in src/bnode_core/nn/vae/vae_architecture.py
def load(self, path: Path, device: Optional[torch.device] = None):
    """Load model state dictionary from disk.

    Args:
        path: Path to the saved model weights.
        device: Device to map the loaded weights to (e.g., 'cpu', 'cuda').
    """
    self.load_state_dict(torch.load(path, map_location=device))
    logging.info('\tLoaded model from {}'.format(path))
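
A round-trip sketch (the checkpoint path is hypothetical; the second model must be constructed with the same architecture arguments):

from pathlib import Path
import torch

ckpt = Path('checkpoints/vae.pt')   # hypothetical location
model.save(ckpt)                    # parent directory is created if missing

model2 = VAE(n_states=2, n_outputs=1, seq_len=100, parameter_dim=3,
             hidden_dim=64, bottleneck_dim=8)
model2.load(ckpt, device=torch.device('cpu'))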

loss_function(x: torch.Tensor, x_hat: torch.Tensor, mu: torch.Tensor, mu_hat: torch.Tensor, logvar: torch.Tensor, logvar_hat: torch.Tensor, beta: float = 1.0, gamma: float = 1000.0, capacity: Optional[float] = None, reduce: bool = True, device: Optional[torch.device] = None) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]

Compute composite loss function for VAE training.

Implements the PELS-VAE loss combining reconstruction, KL divergence, and regressor losses. Supports two modes:

  1. Standard β-VAE: loss = mse_loss + β * kl_loss + regressor_loss
  2. Capacity-constrained: loss = mse_loss + γ * |kl_loss - capacity| + regressor_loss

The regressor loss ensures that the Regressor's predicted latent distribution matches the Encoder's latent distribution, enabling parameter-to-latent predictions.

Parameters:

  • x (Tensor, required): Original timeseries (normalized), shape (batch, n_channels, seq_len).
  • x_hat (Tensor, required): Reconstructed timeseries (normalized), shape (batch, n_channels, seq_len).
  • mu (Tensor, required): Encoder's latent mean, shape (batch, bottleneck_dim).
  • mu_hat (Tensor, required): Regressor's latent mean, shape (batch, bottleneck_dim).
  • logvar (Tensor, required): Encoder's latent log-variance, shape (batch, bottleneck_dim).
  • logvar_hat (Tensor, required): Regressor's latent log-variance, shape (batch, bottleneck_dim).
  • beta (float, default: 1.0): Weight for the KL divergence term (ignored if capacity is not None).
  • gamma (float, default: 1000.0): Weight for the capacity constraint term (used only if capacity is not None).
  • capacity (Optional[float], default: None): Target KL divergence capacity. If None, uses the standard β-VAE loss.
  • reduce (bool, default: True): If True, return scalar losses. If False, return per-sample losses.
  • device (Optional[device], default: None): Device for tensor operations.

Returns:

  Tuple[Tensor, Tensor, Tensor, Tensor], a tuple (loss, mse_loss, kl_loss, regressor_loss) where:

  • loss: Total loss (inf if reduce=False)
  • mse_loss: Mean squared error between x and x_hat
  • kl_loss: KL divergence KL(N(mu, exp(logvar)) || N(0, I))
  • regressor_loss: MSE between (mu, logvar) and (mu_hat, logvar_hat)

Notes

The capacity constraint encourages the model to use exactly 'capacity' nats of information in the latent space, preventing posterior collapse or over-regularization.

Source code in src/bnode_core/nn/vae/vae_architecture.py
def loss_function(
    x: torch.Tensor, 
    x_hat: torch.Tensor, 
    mu: torch.Tensor, 
    mu_hat: torch.Tensor, 
    logvar: torch.Tensor, 
    logvar_hat: torch.Tensor,
    beta: float = 1.0, 
    gamma: float = 1000.0, 
    capacity: Optional[float] = None,
    reduce: bool = True,
    device: Optional[torch.device] = None
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
    """Compute composite loss function for VAE training.

    Implements the PELS-VAE loss combining reconstruction, KL divergence, and regressor losses.
    Supports two modes:

    1. Standard β-VAE: loss = mse_loss + β * kl_loss + regressor_loss
    2. Capacity-constrained: loss = mse_loss + γ * |kl_loss - capacity| + regressor_loss

    The regressor loss ensures that the Regressor's predicted latent distribution
    matches the Encoder's latent distribution, enabling parameter-to-latent predictions.

    Args:
        x: Original timeseries (normalized), shape (batch, n_channels, seq_len).
        x_hat: Reconstructed timeseries (normalized), shape (batch, n_channels, seq_len).
        mu: Encoder's latent mean, shape (batch, bottleneck_dim).
        mu_hat: Regressor's latent mean, shape (batch, bottleneck_dim).
        logvar: Encoder's latent log-variance, shape (batch, bottleneck_dim).
        logvar_hat: Regressor's latent log-variance, shape (batch, bottleneck_dim).
        beta: Weight for KL divergence term (ignored if capacity is not None).
        gamma: Weight for capacity constraint term (used only if capacity is not None).
        capacity: Target KL divergence capacity. If None, uses standard β-VAE loss.
        reduce: If True, return scalar losses. If False, return per-sample losses.
        device: Device for tensor operations.

    Returns:
        Tuple of (loss, mse_loss, kl_loss, regressor_loss) where:

            - loss: Total loss (inf if reduce=False)
            - mse_loss: Mean squared error between x and x_hat
            - kl_loss: KL divergence KL(N(mu, exp(logvar)) || N(0, I))
            - regressor_loss: MSE between (mu, logvar) and (mu_hat, logvar_hat)

    Notes:
        The capacity constraint encourages the model to use exactly 'capacity' nats
        of information in the latent space, preventing posterior collapse or over-regularization.
    """
    mse = nn.MSELoss(reduction='mean' if reduce else 'none')
    mse_loss = mse(x_hat, x)
    kl_loss = kullback_leibler(mu, logvar, per_dimension=not reduce, reduce=reduce)
    regressor_loss = mse(mu_hat, mu) + mse(logvar_hat, logvar)
    if reduce:
        if capacity is None:
            loss = mse_loss + beta * kl_loss + regressor_loss
        else:
            if capacity < 0:
                raise ValueError('capacity must be non-negative')
            # kl_loss is always positive, so subtracting capacity and 
            # taking the absolute value sets a capacity
            loss = mse_loss + gamma * (kl_loss - capacity).abs() + regressor_loss
    else:
        loss = torch.tensor(np.inf, device=device)
    return loss, mse_loss, kl_loss, regressor_loss

bnode_core.nn.vae.vae_train_test

VAE training and testing pipeline for timeseries modeling.

This module implements the complete training pipeline for Variational Autoencoders (VAE) with parameter conditioning, supporting standard VAE, PELS-VAE, and feed-forward modes.

Attention

This documentation is AI-generated. Be aware of possible inaccuracies.

Command-line Usage

The module uses Hydra for configuration management and MLflow for experiment tracking. Training is launched from the command line:

uv run python -m bnode_core.nn.vae.vae_train_test

or, equivalently, by pointing uv at the script directly:

uv run <path to vae_train_test.py>

Configuration files are loaded from conf/train_test_vae.yaml (or the specified config path).

Configuration

Key configuration parameters (via the Hydra config; a hypothetical YAML sketch follows the list):

  • dataset_name: Name of HDF5 dataset to load
  • use_cuda: Enable CUDA acceleration
  • use_amp: Enable automatic mixed precision training
  • nn_model.network.*: Model architecture parameters (hidden_dim, n_latent, activation, etc.)
  • nn_model.training.*: Training hyperparameters (batch_size, lr, max_epochs, etc.)
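
A rough, hypothetical sketch of conf/train_test_vae.yaml — the key names appear in the training code below, but all values and the exact layout are assumptions:

dataset_name: my_dataset          # illustrative
use_cuda: true
use_amp: false
nn_model:
  network:
    linear_hidden_dim: 128
    n_latent: 8
    activation: nn.ReLU           # evaluated to the activation class
    n_linear_layers: 3
    params_to_decoder: false
    feed_forward_nn: false
  training:
    batch_size: 64
    lr_start: 1.0e-3
    max_epochs: 1000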

Training Workflow

  1. Load HDF5 dataset and create train/validation/test/common_test dataloaders
  2. Initialize VAE model with the specified architecture
  3. Initialize normalization layers on the full training dataset
  4. Train with:
     • Adam optimizer with learning rate scheduling (ReduceLROnPlateau)
     • Early stopping monitoring the validation loss
     • Capacity scheduling for controlled KL divergence growth
     • Automatic mixed precision (AMP) support
     • Gradient clipping for stability
  5. Save the best model checkpoint based on validation loss
  6. Evaluate on all dataset splits and save predictions to an HDF5 file
  7. Log all metrics and artifacts to MLflow

Output Files

  • model.pth: Best model checkpoint (state_dict)
  • dataset_with_predictions.h5: Copy of the input dataset with added model predictions
  • vae_train_test.py, vae_architecture.py: Copies of the source files for reproducibility

Key Features

  • Multi-pass prediction: Average multiple stochastic forward passes for robust predictions (see the sketch after this list)
  • Capacity scheduling: Gradually increase the KL divergence capacity to prevent posterior collapse
  • Early stopping: Monitor validation loss with configurable patience and threshold
  • MLflow integration: Automatic logging of metrics, parameters, and artifacts via decorator
  • Reproducibility: Saves source code and the full configuration to the output directory
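
A minimal sketch of the multi-pass averaging idea; average_passes and decode are hypothetical names, not the model's actual API:

import torch

def average_passes(decode, mu, logvar, n_passes=10):
    # each pass draws fresh noise and decodes z = mu + exp(0.5*logvar) * eps;
    # averaging the reconstructions reduces reparameterization sampling noise
    preds = [decode(mu + torch.exp(0.5 * logvar) * torch.randn_like(mu))
             for _ in range(n_passes)]
    return torch.stack(preds).mean(dim=0)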

train(cfg: train_test_config_class) -> float

Train VAE model on timeseries dataset with MLflow tracking.

Complete training pipeline including:

  • Dataset loading and preprocessing
  • Model initialization and normalization layer setup
  • Training loop with early stopping and capacity scheduling
  • Evaluation on all dataset splits
  • Model checkpoint saving and artifact logging

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| cfg | train_test_config_class | Hydra configuration object containing all training parameters. Key sections: dataset_name, use_cuda, use_amp, nn_model.network, nn_model.training | required |

Returns:

float: Final MSE loss on the test set.

Notes

  • Uses the @log_hydra_to_mlflow decorator for automatic MLflow experiment tracking
  • Saves the best model based on validation loss
  • Copies the dataset to the output directory with added model predictions
  • Logs metrics at each epoch: loss, mse_loss, kl_loss, regressor_loss, populated_dims
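
Besides the CLI entry point, the config can in principle be composed programmatically — a hypothetical sketch using Hydra's compose API (the config path and overrides are illustrative, and it is an assumption that the decorated train runs cleanly outside hydra.main):

from hydra import compose, initialize_config_dir

with initialize_config_dir(config_dir='/abs/path/to/conf', version_base=None):
    cfg = compose(config_name='train_test_vae',
                  overrides=['use_cuda=false', 'nn_model.training.max_epochs=10'])
final_mse = train(cfg)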
Source code in src/bnode_core/nn/vae/vae_train_test.py
@log_hydra_to_mlflow
def train(cfg: train_test_config_class) -> float:
    """Train VAE model on timeseries dataset with MLflow tracking.

    Complete training pipeline including:

    - Dataset loading and preprocessing
    - Model initialization and normalization layer setup
    - Training loop with early stopping and capacity scheduling
    - Evaluation on all dataset splits
    - Model checkpoint saving and artifact logging

    Args:
        cfg: Hydra configuration object containing all training parameters.
            Key sections: dataset_name, use_cuda, use_amp, nn_model.network, nn_model.training

    Returns:
        Final MSE loss on test set (float).

    Notes:
        - Uses @log_hydra_to_mlflow decorator for automatic MLflow experiment tracking
        - Saves best model based on validation loss
        - Copies dataset to output directory with added model predictions
        - Logs metrics at each epoch: loss, mse_loss, kl_loss, regressor_loss, populated_dims
    """
    device = torch.device('cuda' if torch.cuda.is_available() and cfg.use_cuda else 'cpu')

    # load dataset and config
    dataset, dataset_config = load_dataset_and_config(cfg.dataset_name, cfg.dataset_path)

    # make train and test torch tensor datasets
    train_dataset = make_stacked_dataset(dataset, 'train')
    test_dataset = make_stacked_dataset(dataset, 'test')
    validation_dataset = make_stacked_dataset(dataset, 'validation')
    common_test_dataset = make_stacked_dataset(dataset, 'common_test')

    # initialize data loaders
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=cfg.nn_model.training.batch_size, shuffle=True)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=cfg.nn_model.training.batch_size, shuffle=True)
    validation_loader = torch.utils.data.DataLoader(validation_dataset, batch_size=cfg.nn_model.training.batch_size, shuffle=True)
    common_test_loader = torch.utils.data.DataLoader(common_test_dataset, batch_size=cfg.nn_model.training.batch_size, shuffle=True)

    # initialize model
    model = VAE(
        n_states=dataset['train']['states'].shape[1],
        n_outputs=dataset['train']['outputs'].shape[1],
        seq_len=dataset['train']['states'].shape[2],
        parameter_dim=dataset['train']['parameters'].shape[1],
        hidden_dim=cfg.nn_model.network.linear_hidden_dim,
        bottleneck_dim=cfg.nn_model.network.n_latent,
        activation=eval(cfg.nn_model.network.activation),  # map the config string (e.g. 'nn.ReLU') to the class
        n_layers=cfg.nn_model.network.n_linear_layers,
        params_to_decoder=cfg.nn_model.network.params_to_decoder,
        feed_forward_nn=cfg.nn_model.network.feed_forward_nn,
    )
    model.to(device)

    # initialize timeseries_normalization layer on whole dataset
    _states = train_dataset.datasets['states'].to(device)
    _outputs = train_dataset.datasets['outputs'].to(device)
    _parameters = train_dataset.datasets['parameters'].to(device)
    _x = torch.cat((_states, _outputs), dim=1)
    model.timeseries_normalization.initialize_normalization(_x)
    logging.info('Initialized timeseries_normalization layer on whole dataset')
    if not model.feed_forward_nn:
        # Regressor normalization is only needed when a Regressor is used
        model.Regressor.normalization(_parameters)
        logging.info('Initialized Regressor normalization layer on whole dataset')
    del _states, _outputs, _parameters, _x

    # initialize optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=cfg.nn_model.training.lr_start)
    #optimizer = torch.optim.SGD(model.parameters(), lr=cfg.nn_model.training.lr_start)

    # initialize lr scheduler
    lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
                                                            mode='min',
                                                            factor=cfg.nn_model.training.lr_scheduler_plateau_gamma,
                                                            patience=cfg.nn_model.training.lr_scheduler_plateau_patience,
                                                            threshold=cfg.nn_model.training.lr_scheduler_threshold,
                                                            threshold_mode=cfg.nn_model.training.lr_scheduler_threshold_mode,
                                                            min_lr=cfg.nn_model.training.lr_min,
                                                            )
    # initialize early stopping
    early_stopping = EarlyStopping(patience=cfg.nn_model.training.early_stopping_patience,
                                      verbose=True,
                                      threshold=cfg.nn_model.training.early_stopping_threshold,
                                      threshold_mode=cfg.nn_model.training.early_stopping_threshold_mode,
                                      path=filepaths.filepath_model_current_hydra_output(),
                                      trace_func=logging.info)

    # initialize capacity scheduler
    capacity_scheduler = CapacityScheduler(
        patience = cfg.nn_model.training.capacity_patience,
        capacity_start = cfg.nn_model.training.capacity_start,
        capacity_max=cfg.nn_model.training.capacity_max,
        capacity_increment = cfg.nn_model.training.capacity_increment,
        capacity_increment_mode = cfg.nn_model.training.capacity_increment_mode,
        threshold = cfg.nn_model.training.capacity_threshold,
        threshold_mode = cfg.nn_model.training.capacity_threshold_mode,
        trace_func = logging.info,
        enabled=cfg.nn_model.training.use_capacity
    )
    # initialize gradient scaler
    scaler = torch.cuda.amp.GradScaler(enabled=cfg.use_amp)
    logging.info('Training with automatic mixed precision: {}'.format(cfg.use_amp))

    # define one model and loss evaluation
    def model_and_loss_evaluation(model, states, outputs, parameters, train=True, n_passes: int = 1, return_model_outputs: bool = False, test_from_regressor: bool = True):
        """Evaluate model forward pass and compute all loss components.

        Args:
            model: VAE model instance.
            states: State timeseries tensor, shape (batch, n_states, seq_len).
            outputs: Output timeseries tensor, shape (batch, n_outputs, seq_len).
            parameters: System parameters tensor, shape (batch, parameter_dim).
            train: If True, use Encoder's latent distribution. If False, controlled by test_from_regressor.
            n_passes: Number of stochastic forward passes to average.
            return_model_outputs: If True, return model outputs (predictions, latent variables, raw losses).
            test_from_regressor: If True during testing, use Regressor's latent distribution instead of Encoder's.

        Returns:
            Dictionary with keys: loss, mse_loss, kl_loss, regressor_loss, populated_dims.
            If return_model_outputs=True, returns tuple (ret_val, model_outputs) where model_outputs
            contains: mse_loss_raw, kl_loss_raw, regressor_loss_raw, states_hat, outputs_hat, 
            mu, logvar, mu_hat, logvar_hat.
        """
        _train = train if train is True else test_from_regressor # if not training, do the test with either mu, logvar from regressor or from encoder
        x, x_hat, states_hat, outputs_hat, mu, logvar, mu_hat, logvar_hat, normed_values = model(states, outputs, parameters, train=_train, 
                                                                                                 predict = False, n_passes=n_passes, 
                                                                                                 test_with_zero_eps=cfg.nn_model.training.test_with_zero_eps,
                                                                                                 device=device)
        loss, mse_loss, kl_loss, regressor_loss = loss_function(
                    normed_values['x'], normed_values['x_hat'], mu, mu_hat, 
                    logvar, logvar_hat, 
                    beta=cfg.nn_model.training.beta_start, 
                    gamma=cfg.nn_model.training.gamma,
                    capacity= None if cfg.nn_model.training.use_capacity is False
                        else capacity_scheduler.get_capacity(),
                    device=device,
        )
        _populated_dimensions, _ = count_populated_dimensions(mu, logvar, cfg.nn_model.training.count_populated_dimensions_threshold)
        ret_val = {
            'loss': loss,
            'mse_loss': mse_loss,
            'kl_loss': kl_loss,
            'regressor_loss': regressor_loss,
            'populated_dims': _populated_dimensions,
        }
        if return_model_outputs:
            # losses per dim
            _, mse_loss_raw, kl_loss_raw, regressor_loss_raw = loss_function(
                    x, x_hat, mu, mu_hat, 
                    logvar, logvar_hat, 
                    beta=cfg.nn_model.training.beta_start, 
                    gamma=cfg.nn_model.training.gamma,
                    capacity= None,
                    reduce=False
                    )   
            model_outputs = {
                'mse_loss_raw': mse_loss_raw,
                'kl_loss_raw': kl_loss_raw,
                'regressor_loss_raw': regressor_loss_raw,
                'states_hat': states_hat,
                'outputs_hat': outputs_hat,
                'mu': mu,
                'logvar': logvar,
                'mu_hat': mu_hat,
                'logvar_hat': logvar_hat,
            }
        if not train:
            # call value.item() for each value in return_value
            ret_val = dict({key: value.item() for key, value in ret_val.items()})
            if return_model_outputs:
                model_outputs = dict({key: value.cpu().detach().numpy() for key, value in model_outputs.items()})
        return ret_val if not return_model_outputs else (ret_val, model_outputs)

    def get_model_inputs(data_loader: torch.utils.data.DataLoader, data: dict = None):
        """Extract model inputs from data loader or data dictionary.

        Args:
            data_loader: PyTorch DataLoader (if provided, fetches next batch).
            data: Dictionary with keys 'states', 'outputs', 'parameters' (alternative to data_loader).

        Returns:
            Tuple of (states, outputs, parameters) tensors moved to device.
        """
        if data_loader is None:
            assert data is not None, 'Either data_loader or data must be not None'
        else:
            data = next(iter(data_loader))
        # get data from data loader
        states = data['states'].to(device)
        outputs = data['outputs'].to(device)
        parameters = data['parameters'].to(device)
        return states, outputs, parameters


    # define train loop for one epoch
    def train_one_epoch(model, train_loader, optimizer, scaler, epoch):
        """Execute one training epoch with gradient updates.

        Iterates through all batches in train_loader, computes losses, performs
        backpropagation with gradient clipping and AMP scaling.

        Args:
            model: VAE model instance.
            train_loader: PyTorch DataLoader for training data.
            optimizer: PyTorch optimizer.
            scaler: CUDA AMP gradient scaler.
            epoch: Current epoch number (for logging).

        Returns:
            Dictionary with training metrics: loss, mse_loss, kl_loss, regressor_loss, populated_dims.
        """
        # get data from train loader
        model.train()
        for batch_idx, data in enumerate(train_loader):
            # unpack the current batch; passing train_loader here would draw a
            # fresh batch via next(iter(...)) instead of using `data`
            states, outputs, parameters = get_model_inputs(data_loader=None, data=data)
            # zero the parameter gradients
            optimizer.zero_grad()
            # forward + backward + optimize
            with torch.cuda.amp.autocast(enabled=cfg.use_amp):
                ret_vals_train = model_and_loss_evaluation(model, states, outputs, parameters, n_passes=cfg.nn_model.training.n_passes_train, test_from_regressor=cfg.nn_model.training.test_from_regressor)
            loss = ret_vals_train['loss'] if cfg.nn_model.network.feed_forward_nn is False else ret_vals_train['mse_loss']
            scaler.scale(loss).backward()
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), cfg.nn_model.training.clip_grad_norm)
            scaler.step(optimizer)
            scaler.update()

        # call value.item() for each value in return_value
        ret_vals_train = dict({key: value.item() for key, value in ret_vals_train.items()})

        return ret_vals_train

    def test_or_validate_one_epoch(model, _data_loader, n_passes: int = 1, all_batches: bool = False,
                                   return_model_outputs: bool = False):
        """Evaluate model on validation or test set without gradient computation.

        Args:
            model: VAE model instance.
            _data_loader: PyTorch DataLoader for evaluation data.
            n_passes: Number of stochastic forward passes to average.
            all_batches: If True, evaluate on all batches and average metrics. If False, evaluate only first batch.
            return_model_outputs: If True, return model predictions and latent variables.

        Returns:
            Dictionary with evaluation metrics: loss, mse_loss, kl_loss, regressor_loss, populated_dims.
            If return_model_outputs=True, returns tuple (ret_vals, model_outputs) where model_outputs
            contains predictions and latent variables for all evaluated batches.
        """
        model.eval()
        # make sure that the data loader is not shuffled by initializing a new data loader
        if all_batches:
            data_loader = torch.utils.data.DataLoader(_data_loader.dataset, batch_size=_data_loader.batch_size, shuffle=False)
        else:
            data_loader = _data_loader
        _ret_vals = []
        _model_outputs = []
        for step, data in enumerate(data_loader):
            states, outputs, parameters = get_model_inputs(data_loader=None, data=data)
            # forward
            with torch.no_grad():
                ret_vals, model_outputs = model_and_loss_evaluation(model, states, outputs, parameters, train=False, n_passes=n_passes, return_model_outputs=True, test_from_regressor = cfg.nn_model.training.test_from_regressor)
            _ret_vals.append(ret_vals)
            _model_outputs.append(model_outputs)
            if all_batches is False:
                break
        # average over all calls
        if all_batches:
            ret_vals = {}
            for key in _ret_vals[0].keys():
                ret_vals[key] = sum([_ret_val[key] for _ret_val in _ret_vals]) / len(_ret_vals)
        else:
            ret_vals = _ret_vals[0]
        # make one tensor from all model outputs
        if return_model_outputs:
            model_outputs = {key: np.concatenate([_batch_output[key] for _batch_output in _model_outputs], axis=0) for key in _model_outputs[0].keys()}
        return ret_vals if not return_model_outputs else (ret_vals, model_outputs)

    def append_context_to_dict_keys(dictionary: dict, context: str):
        """Add context suffix to all dictionary keys for MLflow logging.

        Args:
            dictionary: Dictionary with metric names as keys.
            context: Suffix to append (e.g., 'train', 'validation', 'test').

        Returns:
            New dictionary with keys formatted as 'original_key_context'.
        """
        return dict({'{}_{}'.format(key, context): value for key, value in dictionary.items()})

    # training loop
    _flag_break_next_epoch = False
    for epoch in range(cfg.nn_model.training.max_epochs):
        # train one epoch
        if not _flag_break_next_epoch:
            ret_vals_train = train_one_epoch(model, train_loader, optimizer, scaler, epoch)
        else:
            ret_vals_train = test_or_validate_one_epoch(model, train_loader, n_passes=cfg.nn_model.training.n_passes_test)
        # validate one epoch
        ret_vals_validation = test_or_validate_one_epoch(model, validation_loader, n_passes=cfg.nn_model.training.n_passes_test)
        # test one epoch
        ret_vals_test = test_or_validate_one_epoch(model, test_loader, n_passes=cfg.nn_model.training.n_passes_test)
        # lr scheduler step
        if not _flag_break_next_epoch:
            lr_scheduler.step(ret_vals_validation['loss'] if cfg.nn_model.network.feed_forward_nn is False else ret_vals_validation['mse_loss'])
            # early stopping
            early_stopping(ret_vals_validation['loss'] if cfg.nn_model.network.feed_forward_nn is False else ret_vals_validation['mse_loss'],
                           model, epoch, corresponding_loss=ret_vals_train['loss'])
        # capacity scheduler
        capacity_scheduler.update(ret_vals_validation['mse_loss'])
        # log stats with logging
        string = 'Epoch: {}/{} | train/validate/test: {:.4f}/{:.4f}/{:.4f} | mse: {:.4f}/{:.4f}/{:.4f} | kl_loss: {:.4f}/{:.4f}/{:.4f} | regressor_loss: {:.4f}/{:.4f}/{:.4f} | pop. dim: {}/{}/{} | \
            \t\t\t| batches: {} | lr: {:.6f} |'.format(
            epoch, cfg.nn_model.training.max_epochs,
            ret_vals_train['loss'], ret_vals_validation['loss'], ret_vals_test['loss'],
            ret_vals_train['mse_loss'], ret_vals_validation['mse_loss'], ret_vals_test['mse_loss'],
            ret_vals_train['kl_loss'], ret_vals_validation['kl_loss'], ret_vals_test['kl_loss'],
            ret_vals_train['regressor_loss'], ret_vals_validation['regressor_loss'], ret_vals_test['regressor_loss'],
            ret_vals_train['populated_dims'], ret_vals_validation['populated_dims'], ret_vals_test['populated_dims'],
            len(train_loader),
            optimizer.param_groups[0]['lr'])
        string = string + ' capacity: {:.4f} |'.format(capacity_scheduler.get_capacity()) if cfg.nn_model.training.use_capacity else string
        string = string + ' EarlyStopping: {}/{} |'.format(early_stopping.counter, early_stopping.patience)
        logging.info(string)
        # log stats with mlflow
        mlflow.log_metrics(append_context_to_dict_keys(ret_vals_train, 'train'), step=epoch, )
        mlflow.log_metrics(append_context_to_dict_keys(ret_vals_validation, 'validation'), step=epoch)
        mlflow.log_metrics(append_context_to_dict_keys(ret_vals_test, 'test'), step=epoch)
        mlflow.log_metric('lr', optimizer.param_groups[0]['lr'], step=epoch)
        mlflow.log_metric('EarlyStopping_counter', early_stopping.counter, step=epoch)
        mlflow.log_metric('capacity', capacity_scheduler.get_capacity(), step=epoch) if cfg.nn_model.training.use_capacity else None

        # check early stopping: break only after the extra evaluation epoch,
        # so the best model's outputs are recorded before stopping
        if _flag_break_next_epoch:
            break
        if early_stopping.early_stop:
            logging.info("Early stopping")
            mlflow.log_param('ended_by', 'early_stopping')
            # let the evaluation run one more time to record the outputs of the best model
            _flag_break_next_epoch = True
            # load the best model
            model.load(filepaths.filepath_model_current_hydra_output(), device=device)

    # Check performance of model on all datasets

    # load best model
    model.load(filepaths.filepath_model_current_hydra_output(), device=device)

    # close initial dataset
    dataset.close()
    # copy dataset to hydra output directory
    _path = filepaths.filepath_dataset_current_hydra_output()
    shutil.copy(filepaths.filepath_dataset_from_config(cfg.dataset_name, cfg.dataset_path), _path)
    dataset = h5py.File(_path, 'r+')
    # add model outputs to dataset
    for context, dataloader in zip(['train', 'test', 'validation', 'common_test'], [train_loader, test_loader, validation_loader, common_test_loader]):
        ret_vals, model_outputs = test_or_validate_one_epoch(model, dataloader, n_passes=cfg.nn_model.training.n_passes_test, all_batches=True, return_model_outputs=True)

        # log stats with logging
        string = context
        string = string + ': loss: {:.4f} | mse: {:.4f} | kl_loss: {:.4f} | regressor_loss: {:.4f} | pop. dim: {} |'.format(
            ret_vals['loss'],
            ret_vals['mse_loss'],
            ret_vals['kl_loss'],
            ret_vals['regressor_loss'],
            ret_vals['populated_dims'],
        )
        logging.info(string)

        # save loss function values
        for key, value in ret_vals.items():
            dataset.create_dataset(context+'/'+key, data=value) 
        # save reconstructed timeseries and raw loss function values
        for key, value in model_outputs.items():
            dataset.create_dataset(context+'/'+key, data=value)
        # log to mlflow
        mlflow.log_metrics(append_context_to_dict_keys(ret_vals, context), step=epoch)
    dataset.close()

    # save this file and the vae_architecture.py file to the hydra output directory
    shutil.copy(Path(__file__), filepaths.dir_current_hydra_output())
    shutil.copy(Path(__file__).with_name('vae_architecture.py'), filepaths.dir_current_hydra_output())
    return ret_vals['mse_loss']

main()

Entry point for VAE training via Hydra CLI.

Initializes Hydra configuration system and launches train with validated config. Auto-detects config directory and uses 'train_test_vae' as the default config name.

This function can be registered in pyproject.toml, enabling command-line execution via a custom script name.

Examples:

Run from command line::

uv run python -m bnode_core.nn.vae.vae_train_test

With config overrides::

uv run python -m bnode_core.nn.vae.vae_train_test \
    nn_model.training.lr_start=0.0001 \
    dataset_name=my_dataset

Side Effects

  • Registers config store with Hydra
  • Auto-detects config directory from filepaths
  • Launches the Hydra-decorated train function
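
Hydra's standard --multirun flag should also work for parameter sweeps (values below are illustrative):

uv run python -m bnode_core.nn.vae.vae_train_test --multirun \
    nn_model.network.n_latent=4,8,16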
Source code in src/bnode_core/nn/vae/vae_train_test.py
def main():
    """Entry point for VAE training via Hydra CLI.

    Initializes Hydra configuration system and launches train with validated
    config. Auto-detects config directory and uses 'train_test_vae' as the
    default config name.

    This function can be registered in pyproject.toml, enabling command-line
    execution via a custom script name.

    Examples:
        Run from command line::

            uv run python -m bnode_core.nn.vae.vae_train_test

        With config overrides::

            uv run python -m bnode_core.nn.vae.vae_train_test \\
                nn_model.training.lr_start=0.0001 \\
                dataset_name=my_dataset


    Side Effects:
        - Registers config store with Hydra
        - Auto-detects config directory from filepaths
        - Launches Hydra-decorated train function
    """
    from bnode_core.config import get_config_store
    cs = get_config_store()
    config_dir = filepaths.config_dir_auto_recognize()
    config_name = 'train_test_vae'
    hydra.main(config_path=str(config_dir.absolute()), config_name=config_name, version_base=None)(train)()