import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import random
from tqdm import tqdm
import sys
from .layers import create_conv1, create_linear, MLPLayer
import numpy as np
from collections import OrderedDict
# =============================================================================
# Sequence-to-Point models
# =============================================================================
class S2P(nn.Module):
"""
    This class is an abstract class for Sequence-to-Point models. By subclassing it,
    you avoid having to implement the ``step`` and ``predict`` functions.
    :param params: dictionary of values relative to hyper-parameters.
    :type params: dict
    The dictionary is expected to include the following keys:
    :param target_norm: the type of normalization of the target power, defaults to 'z-norm'.
    :type target_norm: str.
    :param mean: The mean consumption value of the target appliance, defaults to 0.
    :type mean: float.
    :param std: The std of the target appliance's power consumption, defaults to 1.
    :type std: float.
    :param min: The minimum consumption value of the target appliance, defaults to 0.
    :type min: float.
    :param max: The maximum consumption value of the target appliance, defaults to 1.
    :type max: float.
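
    Example (a minimal sketch; ``Seq2Point`` is one of the concrete subclasses,
    and the parameter values below are illustrative only)::

        params = {'target_norm': 'z-norm', 'mean': 350.0, 'std': 700.0,
                  'feature_type': 'power', 'appliances': ['kettle']}
        model = Seq2Point(params)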
"""
def __init__(self, params):
"""Initialise the Seq-to-Point model
        :param params: dictionary of values relative to hyper-parameters.
:type params: dict
"""
super(S2P, self).__init__()
        self.target_norm = params.get('target_norm', 'z-norm')
        self.mean = params.get('mean', 0)
        self.std = params.get('std', 1)
        self.min = params.get('min', 0)
        self.max = params.get('max', 1)
    @staticmethod
    def suggest_hparams(trial):
"""
        Return hyper-parameter values suggested by Optuna
:param trial: Optuna Trial.
:type trial: optuna.trial
:return: Parameters with values suggested by optuna
:rtype: dict
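
        Example (sketch; assumes an Optuna study created elsewhere)::

            import optuna
            study = optuna.create_study()
            hparams = S2P.suggest_hparams(study.ask())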
"""
        norm_ = trial.suggest_categorical('normalize', ['z-norm', 'lognorm'])
        window_length = trial.suggest_int('in_size', low=50, high=1800)
        # force an odd window length so that a centre point exists
        window_length += 1 if window_length % 2 == 0 else 0
        return {
            'input_norm': norm_,
            'output_norm': norm_,
            'in_size': window_length,
            'point_position': trial.suggest_categorical('point_position', ['median', 'last_point'])
        }
    def step(self, batch):
        """
        Disaggregates a batch of data
        :param batch: A batch of data
        :type batch: Tensor
        :return: the MSE loss and the MAE of the batch, as computed by the model.
        :rtype: tuple(float, float)
        """
x, y = batch
out = self(x) # BxCxT
error = (y - out)
loss = F.mse_loss(out, y)
mae = error.abs().data.mean()
return loss, mae
    def predict(self, model, test_dataloader):
        """Generate predictions for the testing period
        :param model: pre-trained model wrapper; the network is accessed as ``model.model``.
        :param test_dataloader: data loader for the testing period.
        :type test_dataloader: DataLoader
        :return: Disaggregated power consumption.
        :rtype: tensor
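
        Example (sketch; ``trainer`` stands for any wrapper object exposing the
        trained network as ``trainer.model``)::

            results = model.predict(trainer, test_dataloader)
            power = results['pred']  # denormalised, non-negative power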
"""
net = model.model.eval()
        num_batches = len(test_dataloader)
        pred = []
        true = []
        with tqdm(total=num_batches, file=sys.stdout) as pbar:
with torch.no_grad():
for batch_idx, batch in enumerate(test_dataloader):
if len(batch)==2:
x, y = batch
out = net(x)
pred.append(out)
true.append(y)
else:
x = batch
out = net(x)
pred.append(out)
del batch
pbar.set_description('processed %d' % (1 + batch_idx))
pbar.update(1)
pred = torch.cat(pred, 0)
        # Denormalise the network output back to power values
        if self.target_norm == 'z-norm':
            # z-normalisation
            pred = self.mean + self.std * pred
            pred = torch.clamp(pred, min=0)
        elif self.target_norm == 'min-max':
            # min-max normalisation
            pred = self.min + (self.max - self.min) * pred
            pred = torch.clamp(pred, min=0)
        else:
            # log normalisation was performed
            pred = pred.expm1()
        if len(true) != 0:
            true = torch.cat(true, 0)
            # denormalise the ground truth the same way as the predictions
            if self.target_norm == 'z-norm':
                true = self.mean + self.std * true
            elif self.target_norm == 'min-max':
                true = self.min + (self.max - self.min) * true
            else:
                true = true.expm1()
            results = {"pred": pred, 'true': true}
        else:
            results = {"pred": pred}
        return results
class Seq2Point(S2P):
"""
.. _seqpoint:
    PyTorch implementation of the Seq-to-Point
    NILM model as proposed in:
    https://dl.acm.org/doi/pdf/10.1145/3360322.3360844
    :param params: dictionary of values relative to hyper-parameters.
    :type params: dict
    In addition to the parameters of the parent class, the params dictionary is expected to include the following keys:
    :param feature_type: The type of input features; ``'combined'`` selects 4 input channels, any other value selects 1.
    :type feature_type: str
    :param appliances: A list of appliances.
    :type appliances: list of str
    :param pool_filter: The size of pooling filter, defaults to 50.
    :type pool_filter: int
    :param latent_size: The number of nodes in the last layer, defaults to 1024.
    :type latent_size: int
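
    Example (a minimal sketch; a batch of 16 aggregate windows of length 480)::

        model = Seq2Point({'feature_type': 'power', 'appliances': ['kettle']})
        out = model(torch.randn(16, 480))  # -> shape (16, 1)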
"""
    def __init__(self, params):
        super(Seq2Point, self).__init__(params)
        in_size = 4 if params.get('feature_type') == "combined" else 1
        output_size = len(params['appliances']) if 'appliances' in params else 1
        pool_filter = params.get('pool_filter', 50)
        latent_size = params.get('latent_size', 1024)
        self.pool_filter = pool_filter
self.enc_net = nn.Sequential(create_conv1(in_size, 30, 10, bias=True, stride=1),
nn.ReLU(),
create_conv1(30, 40, 8, bias=True, stride=1),
nn.ReLU(),
create_conv1(40, 50, 6, bias=True, stride=1),
nn.ReLU(),
create_conv1(50, 50, 5, bias=True, stride=1),
nn.ReLU(),
nn.Dropout(0.2),
create_conv1(50, 50, 5, bias=True, stride=1),
nn.ReLU(),
nn.AdaptiveAvgPool1d(self.pool_filter),
nn.Flatten())
self.fc = nn.Sequential(create_linear(50*pool_filter, latent_size),
nn.Dropout(0.2),
nn.Linear(latent_size, output_size))
    def forward(self, x):
        if x.ndim != 3:
            x = torch.unsqueeze(x, 1)
        else:
            x = x.permute(0, 2, 1)
        x = self.enc_net(x)
        x = self.fc(x)
        if self.target_norm == 'lognorm':
            x = F.softplus(x)
        elif self.target_norm == 'min-max':
            x = F.relu(x)
        return x
class RNN(S2P):
"""
.. _rnn:
    PyTorch implementation of the RNN
    NILM model as proposed in:
    https://dl.acm.org/doi/pdf/10.1145/3360322.3360844
    :param params: dictionary of values relative to hyper-parameters.
    :type params: dict
    In addition to the parameters of the parent class, the params dictionary is expected to include the following keys:
    :param feature_type: The type of input features; ``'combined'`` selects 4 input channels, any other value selects 1.
    :type feature_type: str
    :param appliances: A list of appliances.
    :type appliances: list of str
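
    Example (a minimal sketch; the input must be 3-D, shaped (batch, time, features))::

        model = RNN({'feature_type': 'power', 'appliances': ['kettle']})
        out = model(torch.randn(16, 480, 1))  # -> shape (16, 1)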
"""
def __init__(self, params):
super(RNN, self).__init__(params)
        in_size = 4 if params.get('feature_type') == "combined" else 1
output_size = len(params['appliances']) if 'appliances' in params else 1
self.model1 = nn.Sequential(OrderedDict([
('conv1', nn.Conv1d(
in_channels = in_size,
out_channels = 16,
kernel_size = 4,
stride = 1
))]))
self.lstm1 = nn.LSTM(
input_size = 16,
hidden_size = 128,
num_layers = 1,
bidirectional = True
)
self.lstm2 = nn.LSTM(
input_size = 256,
hidden_size = 256,
num_layers = 1,
bidirectional = True
)
        self.model2 = nn.Sequential(OrderedDict([
            ('Linear', nn.Linear(
                in_features = 2 * 256,  # 2 * hidden_size of the bidirectional LSTM
                out_features = 128
            )),
            ('tanh', nn.Tanh()),
            ('Linear2', nn.Linear(
                in_features = 128,
                out_features = output_size
            )),
        ]))
    def forward(self, x):
        y_pred = nn.functional.pad(x.permute(0, 2, 1), (1, 2))
        y_pred = self.model1(y_pred).permute(2, 0, 1)
        y_pred, _ = self.lstm1(y_pred)
        y_pred, _ = self.lstm2(y_pred)
        y_pred = torch.mean(y_pred, dim=0).squeeze()
        y_pred = self.model2(y_pred)
        if self.target_norm == 'lognorm':
            y_pred = F.softplus(y_pred)
        elif self.target_norm == 'min-max':
            y_pred = F.relu(y_pred)
        return y_pred
class WindowGRU(S2P):
"""
.. _gru:
    PyTorch implementation of the Window-GRU
    NILM model as proposed in:
    https://dl.acm.org/doi/pdf/10.1145/3360322.3360844
    :param params: dictionary of values relative to hyper-parameters.
    :type params: dict
    In addition to the parameters of the parent class, the params dictionary is expected to include the following keys:
    :param feature_type: The type of input features; ``'combined'`` selects 4 input channels, any other value selects 1.
    :type feature_type: str
    :param appliances: A list of appliances.
    :type appliances: list of str
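
    Example (a minimal sketch; the input must be 3-D, shaped (batch, time, features))::

        model = WindowGRU({'feature_type': 'power', 'appliances': ['kettle']})
        out = model(torch.randn(16, 480, 1))  # -> shape (16, 1)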
"""
def __init__(self, params):
super(WindowGRU, self).__init__(params)
        in_size = 4 if params.get('feature_type') == "combined" else 1
output_size = len(params['appliances']) if 'appliances' in params else 1
self.model1 = nn.Sequential(OrderedDict([
('conv1', nn.Conv1d(
in_channels = in_size,
out_channels = 16,
kernel_size = 4,
stride = 1
)),
('relu1', nn.ReLU())]))
self.gru1 = nn.GRU(
input_size = 16,
hidden_size = 64,
num_layers = 1,
bidirectional = True
)
self.dropout = nn.Dropout(.5)
self.gru2 = nn.GRU(
input_size = 128,
hidden_size = 128,
num_layers = 1,
bidirectional = True
)
        self.model2 = nn.Sequential(OrderedDict([
            ('relu3', nn.ReLU()),
            ('dropout3', nn.Dropout(.5)),
            ('Linear', nn.Linear(
                in_features = 256,  # 2 * hidden_size of the bidirectional GRU
                out_features = 128
            )),
            ('relu6', nn.ReLU()),
            ('dropout4', nn.Dropout(.2)),  # was a duplicate 'dropout3' key, which OrderedDict silently dropped
            ('Linear2', nn.Linear(
                in_features = 128,
                out_features = output_size
            )),
        ]))
    def forward(self, x):
        y_pred = nn.functional.pad(x.permute(0, 2, 1), (1, 2))
        y_pred = self.model1(y_pred).permute(2, 0, 1)
        y_pred, _ = self.gru1(y_pred)
        y_pred = self.dropout(y_pred)
        y_pred, _ = self.gru2(y_pred)
        y_pred = torch.mean(y_pred, dim=0).squeeze()
        y_pred = self.model2(y_pred)
        if self.target_norm == 'lognorm':
            y_pred = F.softplus(y_pred)
        elif self.target_norm == 'min-max':
            y_pred = F.relu(y_pred)
        return y_pred
# =============================================================================
# Sequence-to-Sequence models
# =============================================================================
class S2S(nn.Module):
"""
    This class is an abstract class for Sequence-to-Sequence models. By subclassing it,
    you avoid having to implement the ``step`` and ``predict`` functions.
    :param params: dictionary of values relative to hyper-parameters.
    :type params: dict
    The dictionary is expected to include the following keys:
    :param target_norm: the type of normalization of the target power, defaults to 'z-norm'.
    :type target_norm: str.
    :param mean: The mean consumption value of the target appliance, defaults to 0.
    :type mean: float.
    :param std: The std of the target appliance's power consumption, defaults to 1.
    :type std: float.
    :param min: The minimum consumption value of the target appliance, defaults to 0.
    :type min: float.
    :param max: The maximum consumption value of the target appliance, defaults to 1.
    :type max: float.
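
    Example (a minimal sketch; ``Seq2Seq`` is one of the concrete subclasses,
    and the parameter values below are illustrative only)::

        params = {'target_norm': 'z-norm', 'in_size': 99,
                  'feature_type': 'power', 'appliances': ['kettle']}
        model = Seq2Seq(params)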
"""
    def __init__(self, params):
        """Initialise the Seq-to-Seq model
        :param params: dictionary of values relative to hyper-parameters.
        :type params: dict
        """
super(S2S, self).__init__()
        self.original_len = params.get('in_size', 99)
        self.original_len += params.get('out_size', 0)
        self.target_norm = params.get('target_norm', 'z-norm')
        self.mean = params.get('mean', 0)
        self.std = params.get('std', 1)
        self.min = params.get('min', 0)
        self.max = params.get('max', 1)
    def step(self, batch):
        """Disaggregates a batch of data
        :param batch: A batch of data.
        :type batch: Tensor
        :return: the MSE loss and the MAE of the batch, as computed by the model.
        :rtype: tuple(float, float)
        """
x, y = batch
out = self(x) # BxCxT
error = (y - out)
loss = F.mse_loss(out, y)
mae = error.abs().data.mean()
return loss, mae
    def predict(self, model, test_dataloader):
        """Generate predictions for the testing period
        :param model: pre-trained model wrapper; the network is accessed as ``model.model``.
        :param test_dataloader: data loader for the testing period.
        :return: Disaggregated power consumption.
        :rtype: tensor
        """
net = model.model.eval()
        num_batches = len(test_dataloader)
        pred = []
        true = []
        with tqdm(total=num_batches, file=sys.stdout) as pbar:
with torch.no_grad():
for batch_idx, batch in enumerate(test_dataloader):
if len(batch)==2:
x, y = batch
out = net(x)
pred.append(out)
true.append(y)
else:
x = batch
out = net(x)
pred.append(out)
del batch
pbar.set_description('processed: %d' % (1 + batch_idx))
pbar.update(1)
pred = torch.cat(pred, 0)
pred = self.aggregate_seqs(pred)
        # Denormalise the network output back to power values
        if self.target_norm == 'z-norm':
            # z-normalisation
            pred = self.mean + self.std * pred
            pred = torch.clamp(pred, min=0)
        elif self.target_norm == 'min-max':
            # min-max normalisation
            pred = self.min + (self.max - self.min) * pred
            pred = torch.clamp(pred, min=0)
        else:
            # log normalisation was performed
            pred = pred.expm1()
        if len(true) != 0:
            true = torch.cat(true, 0)
            # denormalise the ground truth the same way as the predictions
            if self.target_norm == 'z-norm':
                true = self.mean + self.std * true
            elif self.target_norm == 'min-max':
                true = self.min + (self.max - self.min) * true
            else:
                true = true.expm1()
            true = self.aggregate_seqs(true)
            results = {"pred": pred[self.original_len // 2:], 'true': true}  # removing the padding added at the beginning
        else:
            results = {"pred": pred[self.original_len // 2:]}
        return results
    def aggregate_seqs(self, prediction):
        """Aggregate the overlapping sequences using the mean
        :param prediction: test predictions of the current model
        :type prediction: tensor
        :return: Aggregated sequence
        :rtype: tensor
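
        Example (sketch; assumes ``self.original_len == 3``)::

            # three overlapping length-3 windows of ones average back
            # to a length-5 vector of ones
            agg = self.aggregate_seqs(torch.ones(3, 3))  # -> shape (5,)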
"""
        l = self.original_len
        n = prediction.shape[0] + l - 1
        sum_arr = np.zeros(n)
        counts_arr = np.zeros(n)
        for i in range(prediction.shape[0]):
            sum_arr[i:i + l] += prediction[i].reshape(-1).numpy()
            counts_arr[i:i + l] += 1
        # each output point is the mean of all windows covering it
        sum_arr = sum_arr / counts_arr
        return torch.tensor(sum_arr)
class Seq2Seq(S2S):
"""
.. _seqseq:
    PyTorch implementation of the Seq-to-Seq
    NILM model as proposed in:
    https://dl.acm.org/doi/pdf/10.1145/3360322.3360844
    :param params: dictionary of values relative to hyper-parameters.
    :type params: dict
    In addition to the parameters of the parent class, the params dictionary is expected to include the following keys:
    :param feature_type: The type of input features; ``'combined'`` selects 4 input channels, any other value selects 1.
    :type feature_type: str
    :param appliances: A list of appliances.
    :type appliances: list of str
    :param pool_filter: The size of pooling filter, defaults to 8.
    :type pool_filter: int
    :param latent_size: The number of nodes in the last layer, defaults to 1024.
    :type latent_size: int
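
    Example (a minimal sketch; the window length must match ``in_size``)::

        model = Seq2Seq({'feature_type': 'power', 'in_size': 99,
                         'appliances': ['kettle']})
        out = model(torch.randn(4, 99))  # -> shape (4, 99, 1)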
"""
def __init__(self, params):
super(Seq2Seq, self).__init__(params)
        in_size = 4 if params.get('feature_type') == "combined" else 1
        output_size = len(params['appliances']) if 'appliances' in params else 1
        pool_filter = params.get('pool_filter', 8)
        latent_size = params.get('latent_size', 1024)
        seq_len = params.get('in_size', 99)
self.pool_filter = pool_filter
self.out_dim = output_size
self.enc_net = nn.Sequential(create_conv1(in_size, 30, 10, bias=True, stride=2),
nn.ReLU(),
create_conv1(30, 30, 8, bias=True, stride=2),
nn.ReLU(),
create_conv1(30, 40, 6, bias=True, stride=1),
nn.ReLU(),
create_conv1(40, 50, 5, bias=True, stride=1),
nn.ReLU(),
nn.Dropout(0.2),
create_conv1(50, 50, 5, bias=True, stride=1),
nn.ReLU(),
nn.AdaptiveAvgPool1d(self.pool_filter),
nn.Flatten()
)
self.fc = nn.Sequential(create_linear(50*pool_filter, latent_size),
nn.Dropout(0.2),
nn.Linear(latent_size, output_size*seq_len))
    def forward(self, x):
if x.ndim!=3:
x = torch.unsqueeze(x, 1)
else:
x = x.permute(0,2,1)
x = self.enc_net(x)
x = self.fc(x).reshape(x.size(0), -1, self.out_dim)
x = F.softplus(x) if self.target_norm =='lognorm' else F.relu(x)
return x
class DAE(S2S):
"""
.. _dae:
    PyTorch implementation of the DAE
    NILM model as proposed in:
    https://dl.acm.org/doi/pdf/10.1145/3360322.3360844
    :param params: dictionary of values relative to hyper-parameters.
    :type params: dict
    In addition to the parameters of the parent class, the params dictionary is expected to include the following keys:
    :param feature_type: The type of input features; ``'combined'`` selects 4 input channels, any other value selects 1.
    :type feature_type: str
    :param appliances: A list of appliances.
    :type appliances: list of str
    :param in_size: The length of the input sequence, defaults to 99.
    :type in_size: int
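
    Example (a minimal sketch; the input must be 3-D, shaped (batch, in_size, features))::

        model = DAE({'feature_type': 'power', 'in_size': 99,
                     'appliances': ['kettle']})
        out = model(torch.randn(4, 99, 1))  # -> shape (4, 1, 99)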
"""
def __init__(self, params):
super(DAE, self).__init__(params)
        self.input_features = 4 if params.get('feature_type') == "combined" else 1
        self.sequence_length = params.get('in_size', 99)
self.n_appliances = len(params['appliances']) if 'appliances' in params else 1
self.net = nn.Sequential(OrderedDict([
('conv1', nn.Conv1d(
in_channels = self.input_features,
out_channels = 8,
kernel_size = 4,
stride = 1
)),
('flatten', nn.Flatten()),
('Linear1', nn.Linear(
in_features = self.sequence_length * 8,
out_features= self.sequence_length * 8
)),
('relu1', nn.ReLU()),
('Linear2', nn.Linear(
in_features = self.sequence_length * 8,
out_features= 128
)),
('relu2', nn.ReLU()),
('Linear3', nn.Linear(
in_features = 128,
out_features= self.sequence_length * 8
)),
('relu3', nn.ReLU())]))
self.output_layer = nn.Conv1d(
in_channels = 8,
out_channels = self.n_appliances,
kernel_size = 4,
stride = 1
)
    def forward(self, x):
        y_pred = nn.functional.pad(x.permute(0, 2, 1), (2, 1))
        y_pred = self.net(y_pred)
        y_pred = torch.reshape(y_pred, (-1, 8, self.sequence_length))
        y_pred = self.output_layer(nn.functional.pad(y_pred, (2, 1)))
        if self.target_norm == 'lognorm':
            y_pred = F.softplus(y_pred)
        elif self.target_norm == 'min-max':
            y_pred = F.relu(y_pred)
        return y_pred