# Source code for deep_nilmtk.model.unet

import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import sys
from tqdm import tqdm
from pyro.contrib.forecast import eval_crps
from pyro.ops.stats import quantile
from .layers import  create_conv1, create_deconv1, create_linear, elu_plus_one_plus_epsilon, Up, MDGMM


import numpy as np


class UNETNILM(nn.Module):
    """
    .. _unet:

    UNET-NILM implementation.

    The original paper can be found here:
    https://dl.acm.org/doi/abs/10.1145/3427771.3427859

    The hyperparameter dictionary is expected to include the following
    parameters:

    :param appliances: List of appliances; its length gives the number of
        outputs when ``out_size`` is not provided.
    :type appliances: list
    :param out_size: Number of outputs of the final linear layer, used when
        ``appliances`` does not determine it.
    :type out_size: int
    :param feature_type: The type of input features generated in the
        pre-processing. ``"combined"`` selects 4 input channels, any other
        value selects 1.
    :type feature_type: str
    :param n_channels: The number of input channels, only read when
        ``feature_type`` is absent.
    :type n_channels: int
    :param pool_filter: Pooling filter, defaults to 8
    :type pool_filter: int
    :param latent_size: The latent size, defaults to 1024
    :type latent_size: int
    :param target_norm: Normalisation applied to the targets (``'z-norm'``,
        ``'min-max'``, anything else is treated as log normalisation),
        defaults to ``'z-norm'``
    :type target_norm: str
    :param mean: Target mean used for z-norm, defaults to 0
    :param std: Target standard deviation used for z-norm, defaults to 1
    :param min: Target minimum used for min-max, defaults to 0
    :param max: Target maximum used for min-max, defaults to 1
    """

    def __init__(self, params):
        super().__init__()

        # The original check looked for the key 'appliance' while reading
        # 'appliances'. Every combination that used to work keeps its result;
        # additionally, params that only carry 'appliances' no longer fail.
        if 'appliance' in params or 'out_size' not in params:
            out_size = len(params['appliances'])
        else:
            out_size = params['out_size']

        if 'feature_type' in params:
            n_channels = 4 if params['feature_type'] == "combined" else 1
        else:
            n_channels = params['n_channels']

        pool_filter = params.get('pool_filter', 8)
        latent_size = params.get('latent_size', 1024)

        # Encoder: five convolutional stages; every intermediate activation is
        # kept in ``forward`` so the decoder can use it as a skip connection.
        self.enc_layers = nn.ModuleList([
            nn.Sequential(create_conv1(n_channels, 30, kernel_size=10, stride=2), nn.ReLU()),
            nn.Sequential(create_conv1(30, 30, 8, bias=True, stride=2), nn.ReLU()),
            nn.Sequential(create_conv1(30, 40, 6, bias=True, stride=1), nn.ReLU()),
            nn.Sequential(create_conv1(40, 50, 5, bias=True, stride=1), nn.ReLU(), nn.Dropout(0.2)),
            nn.Sequential(create_conv1(50, 50, 5, bias=True, stride=1), nn.ReLU()),
        ])

        # Decoder: three up-sampling stages mirroring the last encoder stages.
        self.dec_layers = nn.ModuleList([
            Up(50, 40, 5, 1, nn.ReLU()),
            Up(40, 30, 5, 1, nn.ReLU()),
            Up(30, 30, 6, 1, nn.ReLU()),
        ])

        # Head: pool to a fixed length so the linear layers do not depend on
        # the input sequence length.
        self.fc = nn.Sequential(
            nn.AdaptiveAvgPool1d(pool_filter),
            nn.Flatten(),
            create_linear(30 * pool_filter, latent_size),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(latent_size, out_size),
        )

        # Statistics needed by ``predict`` to undo the target normalisation.
        # ``target_norm``, ``min`` and ``max`` were previously never set, which
        # made ``predict`` fail with AttributeError.
        self.target_norm = params.get('target_norm', 'z-norm')
        self.mean = params.get('mean', 0)
        self.std = params.get('std', 1)
        self.min = params.get('min', 0)
        self.max = params.get('max', 1)

    def forward(self, x):
        """Run the encoder, the decoder with skip connections and the head.

        :param x: Input batch. A tensor that is not 3-dimensional gets a
            channel axis inserted at position 1; a 3-dimensional tensor has
            its last two axes swapped (presumably from
            ``(batch, time, channels)`` to ``(batch, channels, time)`` -
            confirm against the data loader).
        :type x: Tensor
        :return: Output of the final linear layer.
        :rtype: Tensor
        """
        if x.ndim != 3:
            x = torch.unsqueeze(x, 1)
        else:
            x = x.permute(0, 2, 1)

        features = [self.enc_layers[0](x)]
        for layer in self.enc_layers[1:]:
            features.append(layer(features[-1]))

        # The running decoder state lives in features[-1]; stage ``i`` is
        # combined with the encoder activation ``i + 1`` steps before it.
        for i, layer in enumerate(self.dec_layers):
            features[-1] = layer(features[-1], features[-2 - i])

        return self.fc(features[-1])

    def step(self, batch):
        """Disaggregates a batch of data

        :param batch: A batch of data as an ``(input, target)`` pair.
        :type batch: tuple(Tensor, Tensor)
        :return: MSE loss and mean absolute error between prediction and
            target.
        :rtype: tuple(Tensor, Tensor)
        """
        x, y = batch
        out = self(x)
        loss = F.mse_loss(out, y)
        # The MAE is only reported, so it is detached from the graph.
        mae = (y - out).abs().detach().mean()
        return loss, mae

    def _denormalise(self, values, clip=True):
        """Undo the target normalisation selected by ``self.target_norm``.

        :param values: Normalised values.
        :type values: Tensor
        :param clip: Whether negative results of the z-norm / min-max inverse
            are replaced by 0, defaults to True
        :type clip: bool
        :return: De-normalised values.
        :rtype: Tensor
        """
        def as_like(stat):
            # Keep dtype and device of ``values`` whatever type the statistic
            # was stored as (python float, numpy value or tensor).
            return torch.as_tensor(stat, dtype=values.dtype, device=values.device)

        if self.target_norm == 'z-norm':
            values = as_like(self.mean) + as_like(self.std) * values
        elif self.target_norm == 'min-max':
            low = as_like(self.min)
            values = low + (as_like(self.max) - low) * values
        else:
            # log normalisation was performed
            return values.expm1()

        if clip:
            values = torch.where(values > 0, values, torch.zeros_like(values))
        return values

    def predict(self, model, test_dataloader):
        """Generate prediction during testing for the test_dataLoader

        :param model: pre-trained model; ``model.model`` is the network that
            is switched to evaluation mode and run.
        :param test_dataloader: data loader for the testing period. Batches
            are either ``(input, target)`` pairs or bare inputs.
        :type test_dataloader: dataLoader
        :return: Dictionary with the de-normalised predictions under
            ``"pred"`` and, when the loader provides targets, the
            de-normalised targets under ``"true"``.
        :rtype: dict
        """
        net = model.model.eval()
        pred = []
        true = []

        with tqdm(total=len(test_dataloader), file=sys.stdout) as pbar, torch.no_grad():
            for batch_idx, batch in enumerate(test_dataloader):
                # A bare tensor holding two samples also has len() == 2, so
                # the container type is checked before unpacking.
                if isinstance(batch, (list, tuple)) and len(batch) == 2:
                    x, y = batch
                    true.append(y)
                else:
                    x = batch
                pred.append(net(x))
                pbar.set_description('processed: %d' % (1 + batch_idx))
                pbar.update(1)

        # The per-batch outputs must be concatenated before any arithmetic.
        pred = self._denormalise(torch.cat(pred, 0))

        if true:
            # Targets are mapped back with the same transform as predictions.
            true = self._denormalise(torch.cat(true, 0), clip=False)
            return {"pred": pred, 'true': true}
        return {"pred": pred}
class UNETNILMSeq2Quantile(nn.Module):
    """UNET-NILM implementation with quantile regression

    The original paper can be found here:
    https://dl.acm.org/doi/abs/10.1145/3427771.3427859

    The hyperparameter dictionary is expected to include the following
    parameters:

    :param appliances: List of appliances (required); its length is the
        number of outputs per time step.
    :type appliances: list
    :param in_size: Input sequence length, defaults to 99
    :type in_size: int
    :param out_size: Added to ``in_size`` to obtain the sequence length,
        defaults to 0
    :type out_size: int
    :param feature_type: The type of input features generated in the
        pre-processing. ``"combined"`` selects 4 input channels, any other
        value (or a missing key) selects 1.
    :type feature_type: str
    :param pool_filter: Pooling filter, defaults to 8
    :type pool_filter: int
    :param latent_size: The latent size, defaults to 1024
    :type latent_size: int
    :param quantiles: The quantiles to use during prediction, defaults to
        [0.1, 0.25, 0.5, 0.75, 0.9]
    :type quantiles: list
    :param target_norm: Normalisation applied to the targets (``'z-norm'``,
        ``'min-max'``, anything else is treated as log normalisation),
        defaults to ``'z-norm'``
    :type target_norm: str
    :param mean: Target mean used for z-norm, defaults to 0
    :param std: Target standard deviation used for z-norm, defaults to 1
    :param min: Target minimum used for min-max, defaults to 0
    :param max: Target maximum used for min-max, defaults to 1

    It can be used as follows:

    .. code-block:: python

        'UNETNiLMSeq2Q':NILMExperiment({
            "model_name": 'UNETNiLMSeq2Quantile',
            'in_size': 480,
            'feature_type':'mains',
            'input_norm':'z-norm',
            'target_norm':'z-norm',
            'kfolds':3,
            'seq_type':'seq2quantile',
            'max_nb_epochs':1
        }),
    """

    def __init__(self, params):
        super().__init__()
        out_size = len(params['appliances'])

        seq_len = params.get('in_size', 99) + params.get('out_size', 0)
        self.seq_len = seq_len

        # A missing 'feature_type' is treated like any non-"combined" value
        # instead of raising KeyError.
        n_channels = 4 if params.get('feature_type') == "combined" else 1
        quantiles = params.get('quantiles', [0.1, 0.25, 0.5, 0.75, 0.9])
        pool_filter = params.get('pool_filter', 8)
        latent_size = params.get('latent_size', 1024)

        self.q = torch.tensor(quantiles)
        self.out_size = out_size

        # The backbone emits a flat vector that ``forward`` reshapes into
        # (batch, -1, out_size).
        self.unet = UNETNILM({
            'n_channels': n_channels,
            'out_size': out_size * seq_len // 2,
            'pool_filter': pool_filter,
            'latent_size': latent_size,
        })

        # Statistics needed by ``predict`` to undo the target normalisation.
        # ``min`` and ``max`` were previously never set, which broke the
        # min-max branch with AttributeError.
        self.target_norm = params.get('target_norm', 'z-norm')
        self.mean = params.get('mean', 0)
        self.std = params.get('std', 1)
        self.min = params.get('min', 0)
        self.max = params.get('max', 1)

    def forward(self, x):
        """Run the backbone and reshape its flat output.

        :param x: Input batch; the first dimension is the batch size.
        :type x: Tensor
        :return: Tensor of shape ``(batch, -1, out_size)``.
        :rtype: Tensor
        """
        return self.unet(x).reshape(x.size(0), -1, self.out_size)

    def smooth_pinball_loss(self, y, q, tau, alpha=1e-2, kappa=1e3, margin=1e-2):
        """
        The implementation of the Pinball loss for NILM, original code can be
        found in:
        https://github.com/hatalis/smooth-pinball-neural-network/blob/master/pinball_loss.py

        Hatalis, Kostas, et al. "A Novel Smoothed Loss and Penalty Function
        for Noncrossing Composite Quantile Estimation via Deep Neural
        Networks." arXiv preprint (2019).

        :param y: Targets, broadcastable against ``q``.
        :type y: Tensor
        :param q: Predicted quantiles, 3-dimensional with the quantile axis
            first.
        :type q: Tensor
        :param tau: Quantile levels, same shape as ``q``.
        :type tau: Tensor
        :param alpha: Smoothing coefficient of the pinball loss.
        :param kappa: Weight of the quantile cross-over penalty.
        :param margin: Minimum gap required between consecutive quantiles.
        :return: Smoothed pinball loss plus cross-over penalty.
        :rtype: Tensor
        """
        error = y - q
        q_loss = (tau * error + alpha * F.softplus(-error / alpha)).sum(0).mean()

        # calculate smooth cross-over penalty
        if q.size(0) > 1:
            diff = q[1:, :, :] - q[:-1, :, :]
            penalty = kappa * torch.square(F.relu(margin - diff)).mean()
        else:
            # With a single quantile nothing can cross; the mean over an
            # empty tensor would otherwise turn the whole loss into NaN.
            penalty = q.new_zeros(())

        return penalty + q_loss

    def step(self, batch):
        """Disaggregates a batch of data

        :param batch: A batch of data as an ``(input, target)`` pair.
        :type batch: tuple(Tensor, Tensor)
        :return: Smoothed pinball loss and absolute error summed over the
            quantile axis and averaged over the remaining axes.
        :rtype: tuple(Tensor, Tensor)
        """
        x, y = batch
        out = self(x)

        # ``self.q`` is a plain attribute, not a buffer, so it has to follow
        # the input device manually.
        self.q = self.q.to(x.device)
        q_pred = torch.quantile(out, q=self.q, dim=1)
        y_q = y.permute(1, 0, 2)
        tau = self.q[:, None, None].expand_as(q_pred)

        loss = self.smooth_pinball_loss(y_q, q_pred, tau)
        mae = (y_q - q_pred).abs().detach().sum(0).mean()
        return loss, mae

    def _denormalise(self, values, clip=True):
        """Undo the target normalisation selected by ``self.target_norm``.

        :param values: Normalised values.
        :type values: Tensor
        :param clip: Whether negative results of the z-norm / min-max inverse
            are replaced by 0, defaults to True
        :type clip: bool
        :return: De-normalised values.
        :rtype: Tensor
        """
        def as_like(stat):
            # Keep dtype and device of ``values`` whatever type the statistic
            # was stored as (python float, numpy value or tensor).
            return torch.as_tensor(stat, dtype=values.dtype, device=values.device)

        if self.target_norm == 'z-norm':
            values = as_like(self.mean) + as_like(self.std) * values
        elif self.target_norm == 'min-max':
            low = as_like(self.min)
            values = low + (as_like(self.max) - low) * values
        else:
            # log normalisation was performed
            return values.expm1()

        if clip:
            values = torch.where(values > 0, values, torch.zeros_like(values))
        return values

    def predict(self, model, test_dataloader):
        """Generate prediction during testing for the test_dataLoader

        :param model: pre-trained model; ``model.model`` is the network that
            is switched to evaluation mode and run.
        :param test_dataloader: data loader for the testing period. Batches
            are either ``(input, target)`` pairs or bare inputs; targets are
            not used.
        :type test_dataloader: dataLoader
        :return: Dictionary with the selected middle-quantile prediction
            under ``"pred"`` and ``"pred_quantile"`` and all quantiles under
            ``"q_pred"``.
        :rtype: dict
        """
        net = model.model.eval()
        pred = []

        with tqdm(total=len(test_dataloader), file=sys.stdout) as pbar, torch.no_grad():
            for batch_idx, batch in enumerate(test_dataloader):
                # A bare tensor holding two samples also has len() == 2, so
                # the container type is checked before unpacking.
                if isinstance(batch, (list, tuple)) and len(batch) == 2:
                    x, _ = batch
                else:
                    x = batch
                pred.append(net(x))
                pbar.set_description('processed: %d' % (1 + batch_idx))
                pbar.update(1)

        pred = self._denormalise(torch.cat(pred, 0))

        # torch.quantile needs ``q`` on the device and in the dtype of its
        # input, so it follows ``pred`` rather than the last input batch.
        q = self.q.to(device=pred.device, dtype=pred.dtype)
        q_pred = torch.quantile(pred, q=q, dim=1).permute(1, 0, 2)
        # Keep the middle quantile, dropping the first seq_len // 2 entries
        # along the first axis.
        pred = q_pred[self.seq_len // 2:, q_pred.size(1) // 2, :]

        return {"pred": pred, "q_pred": q_pred, "pred_quantile": pred}