# -*- coding: utf-8 -*-
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm import tqdm
import sys
import numpy as np
class Encoder(nn.Module):
    """
    Encoder block of the PTPNet: 1-D convolution followed by ReLU,
    batch normalisation and dropout.

    :param in_features: Number of input channels, defaults to 3.
    :type in_features: int
    :param out_features: Number of output channels, defaults to 1.
    :type out_features: int
    :param kernel_size: Convolution kernel size, defaults to 3.
    :type kernel_size: int
    :param padding: Convolution padding, defaults to 1.
    :type padding: int
    :param stride: Convolution stride, defaults to 1.
    :type stride: int
    :param dropout: Dropout probability, defaults to 0.1.
    :type dropout: float
    """

    def __init__(
        self,
        in_features=3,
        out_features=1,
        kernel_size=3,
        padding=1,
        stride=1,
        dropout=0.1,
    ):
        super(Encoder, self).__init__()
        self.conv = nn.Conv1d(
            in_features,
            out_features,
            kernel_size=kernel_size,
            padding=padding,
            stride=stride,
            bias=False,  # BatchNorm makes the conv bias redundant
        )
        self.bn = nn.BatchNorm1d(out_features)
        self.drop = nn.Dropout(dropout)

    def forward(self, x):
        """Applies conv -> ReLU -> BatchNorm -> Dropout.

        Note the BatchNorm is applied *after* the activation, matching the
        original TPNILM implementation.

        :param x: Input of shape (batch, in_features, length).
        :type x: torch.Tensor
        :return: Output of shape (batch, out_features, length').
        :rtype: torch.Tensor
        """
        return self.drop(self.bn(F.relu(self.conv(x))))
class TemporalPooling(nn.Module):
    """
    Temporal Pooling mechanism that combines data with different scales.

    Average-pools the sequence by ``kernel_size``, mixes channels with a
    1x1 convolution, then linearly upsamples back to the input length.

    :param in_features: Number of input channels, defaults to 3.
    :type in_features: int
    :param out_features: Number of output channels, defaults to 1.
    :type out_features: int
    :param kernel_size: Pooling factor (also the upsampling scale), defaults to 2.
    :type kernel_size: int
    :param dropout: Dropout probability, defaults to 0.1.
    :type dropout: float
    """

    def __init__(self, in_features=3, out_features=1, kernel_size=2,
                 dropout=0.1):
        super(TemporalPooling, self).__init__()
        self.kernel_size = kernel_size
        self.pool = nn.AvgPool1d(kernel_size=self.kernel_size,
                                 stride=self.kernel_size, padding=0)
        self.conv = nn.Conv1d(in_features, out_features, kernel_size=1,
                              padding=0)
        self.bn = nn.BatchNorm1d(out_features)
        self.drop = nn.Dropout(dropout)

    def forward(self, x):
        """Pool, project, normalise, then upsample back to the input scale.

        :param x: Input of shape (batch, in_features, length); the length
            should be divisible by ``kernel_size`` so that the upsampled
            output matches the input length — TODO confirm with callers.
        :type x: torch.Tensor
        :return: Output of shape (batch, out_features, length).
        :rtype: torch.Tensor
        """
        x = self.pool(x)
        x = self.conv(x)
        # BatchNorm applied after the activation, as in the encoders.
        x = self.bn(F.relu(x))
        x = self.drop(F.interpolate(x, scale_factor=self.kernel_size,
                                    mode='linear', align_corners=True))
        return x
class Decoder(nn.Module):
    """
    Decoder block of the Temporal_pooling layer: a transposed 1-D
    convolution that upsamples the sequence by ``stride``.

    :param in_features: Number of input channels, defaults to 3.
    :type in_features: int
    :param out_features: Number of output channels, defaults to 1.
    :type out_features: int
    :param kernel_size: Transposed-convolution kernel size, defaults to 2.
    :type kernel_size: int
    :param stride: Upsampling factor, defaults to 2.
    :type stride: int
    """

    def __init__(self, in_features=3, out_features=1, kernel_size=2, stride=2):
        super(Decoder, self).__init__()
        self.conv = nn.ConvTranspose1d(in_features, out_features,
                                       kernel_size=kernel_size, stride=stride,
                                       bias=False)
        # NOTE(review): this BatchNorm is registered but never applied in
        # forward(); it is kept so existing checkpoints still load.
        self.bn = nn.BatchNorm1d(out_features)

    def forward(self, x):
        """Upsamples the input with the transposed convolution.

        :param x: Input of shape (batch, in_features, length).
        :type x: torch.Tensor
        :return: Output of shape (batch, out_features, length * stride).
        :rtype: torch.Tensor
        """
        return self.conv(x)
[docs]class PTPNet(nn.Module):
"""
.. _ptp:
Source: https://github.com/lmssdd/TPNILM
Check the paper
Non-Intrusive Load Disaggregation by Convolutional
Neural Network and Multilabel Classification
by Luca Massidda, Marino Marrocu and Simone Manca
The hyperparameter dictionnary is expected to include the following parameters
The hyperparameter dictionnary is expected to include the following parameters
:param in_size: The input sequence length, defaults to 99
:type in_size: int
:param border: The delay between the input and out sequence, defaults to 30.
:type border: int
:param appliances: List of appliances
:type appliances: list
:param feature_type: The type of input features generated during pre-processing, defaults to 'main'.
:type feature_type: str
:param init_features: The number of features in the first encoder layer, defaults to 32.
:type init_fetaure: int
:param dropout: Dropout
:type dropout: float
:param target_norm: The type of normalization of the target data, defeaults to 'z-norm'.
:type target_norm: str
:param mean: The mean consumption of the target power, defaults to 0
:type mean: float
:param std: The STD consumption of the target power, defaults to 1
:type std: float
It can be used as follows:
.. code-block::python
'tempPool': NILMExperiment({
"model_name": 'tempPool',
'experiment_label':'regression',
'in_size': 480,
'input_norm':'z-norm',
'target_norm':'z-norm',
'feature_type':'mains',
'max_nb_epochs':max_nb_epochs,
'task_type':'regression',
'hidden_dim':64,
}),
"""
def __init__(self, params):
super(PTPNet, self).__init__()
output_len= params['in_size'] if 'in_size' in params else 481
self.border = params["border"] if "border" in params else 30
out_channels= len(params['appliances']) if 'appliances' in params else 1
input_features= 4 if params['feature_type'] == 'combined' else 1
features = params['init_features'] if 'init_features' in params else 32
dropout=params['dropout'] if 'dropout' in params else 0.1
self.border = params['border'] if 'border' in params else 16
self.seq_len = output_len
self.target_norm = params['target_norm'] if 'target_norm' in params else 'z-norm'
self.mean = params['mean'] if 'mean' in params else 0
self.std = params['std'] if 'std' in params else 1
p = 2
k = 1
self.encoder1 = Encoder(input_features, features, kernel_size=3,
padding=0, dropout=dropout)
# (batch, input_len - 2, 32)
self.pool1 = nn.MaxPool1d(kernel_size=p, stride=p)
self.encoder2 = Encoder(features * 1 ** k, features * 2 ** k,
kernel_size=3, padding=0, dropout=dropout)
# (batch, [input_len - 6] / 2, 64)
self.pool2 = nn.MaxPool1d(kernel_size=p, stride=p)
self.encoder3 = Encoder(features * 2 ** k, features * 4 ** k,
kernel_size=3, padding=0, dropout=dropout)
# (batch, [input_len - 12] / 4, 128)
self.pool3 = nn.MaxPool1d(kernel_size=p, stride=p)
self.encoder4 = Encoder(features * 4 ** k, features * 8 ** k,
kernel_size=3, padding=0, dropout=dropout)
# (batch, [input_len - 30] / 8, 256)
# Compute the output size of the encoder4 layer
# (batch, S, 256)
s = output_len / 8
if int(s/ 12) == 0:
print(f"""
Warning !!! the sequence length should be larger than {8*12}...
Continuing with the current length could badly impact the performance :(
""")
self.tpool1 = TemporalPooling(features * 8 ** k, features * 2 ** k,
kernel_size=int(s / 12) if int(s/ 12) > 0 else 1,
dropout=dropout)
self.tpool2 = TemporalPooling(features * 8 ** k, features * 2 ** k,
kernel_size=int(s / 6) if int(s/ 6) > 0 else 1,
dropout=dropout)
self.tpool3 = TemporalPooling(features * 8 ** k, features * 2 ** k,
kernel_size=int(s / 3) - int(s / 3) % 2 if int(s/ 3) > 0 else 1,
dropout=dropout)
self.tpool4 = TemporalPooling(features * 8 ** k, features * 2 ** k,
kernel_size=int(s / 2) - int(s / 2) % 2 if int(s/ 2) > 0 else 1,
dropout=dropout)
self.decoder = Decoder(2 * features * 8 ** k, features * 1 ** k,
kernel_size=p ** 3, stride=p ** 3)
self.activation = nn.Conv1d(features * 1 ** k, out_channels,
kernel_size=1, padding=0)
self.power = nn.Conv1d(features * 1 ** k, out_channels,
kernel_size=1, padding=0)
self.pow_criterion = nn.MSELoss()
self.act_criterion = nn.BCEWithLogitsLoss()
self.pow_w = (params['task_type'] == 'regression')
self.act_w = (params['task_type'] == 'classification')
self.pow_loss_avg = 0.68
self.act_loss_avg = 0.0045
self.power_scale = params["power_scale"] if "power_scale" in params else 2000.0
[docs] def forward(self, x):
# TODO: verify that the shapes with the original paper using the same parameters
enc1 = self.encoder1(x)
enc2 = self.encoder2(self.pool1(enc1))
enc3 = self.encoder3(self.pool2(enc2))
enc4 = self.encoder4(self.pool3(enc3))
tp1 = self.tpool1(enc4)
tp2 = self.tpool2(enc4)
tp3 = self.tpool3(enc4)
tp4 = self.tpool4(enc4)
dec = self.decoder(torch.cat([enc4, tp1, tp2, tp3, tp4], dim=1))
pw = self.power(dec)
act = self.activation(F.relu(dec))
return pw.permute(0,2,1), act.permute(0,2,1)
[docs] def step(self,batch , sequence_type= None):
"""Disaggregates a batch of data
:param batch: A batch of data.
:type batch: Tensor
:return: loss function as returned form the model and MAE as returned from the model.
:rtype: tuple(float,float)
"""
data, target_power, target_status = batch
output_power, output_status = self(data)
pow_loss = self.pow_criterion(output_power, target_power)
act_loss = self.act_criterion(output_status, target_status)
loss = (self.pow_w * pow_loss/ self.pow_loss_avg + self.act_w * act_loss / self.act_loss_avg)
error = (output_power - target_power)
mae = error.abs().data.mean()
return loss, mae
[docs] def predict(self, model, test_dataloader):
"""Generates predictions for the test data loader
:param model: Pre-trained model
:type model: nn.Module
:param test_dataloader: The test data
:type test_dataloader: dataLoader
:return: Generated predictions
:rtype: dict
"""
net = model.model.eval()
num_batches = len(test_dataloader)
values = range(num_batches)
s_hat =[]
p_hat =[]
x_true = []
with tqdm(total = len(values), file=sys.stdout) as pbar:
with torch.no_grad():
for batch_idx, batch in enumerate(test_dataloader):
x = batch
pw, sh = self(x)
sh = torch.sigmoid(sh)
s_hat.append(sh)
p_hat.append(pw)
#x += x.detach().cpu().numpy().mean() #TODO: This was done in the original implementation.
#This is a strange normalisation method. Check how it works in an notebook
x_true.append(x[:, :,
self.border:-self.border].detach().cpu().numpy().flatten())
del batch
pbar.set_description('processed %d' % (1 + batch_idx))
pbar.update(1)
pbar.close()
p_hat = torch.cat(p_hat, 0).float()
s_hat = torch.cat(s_hat, 0).float()
pred, s_hat = self.aggregate_seqs(p_hat.squeeze(), s_hat.squeeze())
# Denormalise the output
if self.target_norm == 'z-norm':
# z-normalisation
pred = self.mean + self.std * pred
pred = torch.tensor(np.where(pred > 0, pred, 0))
elif self.target_norm =='min-max':
# min-max normalisation
pred = self.min + (self.max - self.min) * pred
pred = torch.tensor(np.where(pred > 0, pred, 0))
else:
# log normalisation was perfomed
pred = torch.tensor(pred).expm1()
results ={
# 'aggregates': torch.tensor(x_true),
'pred': pred[self.border//2:], # this done to remove the data that was added
# during padding, data at the end will automatically
# be removed by the API
'pred_states': torch.tensor(s_hat)[self.border//2:]
}
return results
[docs] def aggregate_seqs(self, prediction, states):
""" Aggregates the overleapping sequences using the mean
:param prediction: test predictions of the current model with shape (n_samples + window_size -1 ,window_size)
:type prediction: tensor
:return: Aggregted sequence
:rtype: tensor
"""
l = self.seq_len
n = prediction.shape[0] + l - 1
sum_arr = np.zeros((n))
sum_states = np.zeros((n))
counts_arr = np.zeros((n))
o = len(sum_arr)
for i in range(prediction.shape[0]):
sum_arr[i:i + l] += prediction[i].reshape(-1).numpy()
sum_states[i:i + l] += states[i].reshape(-1).numpy()
counts_arr[i:i + l] += 1
for i in range(len(sum_arr)):
sum_arr[i] = sum_arr[i] / counts_arr[i]
sum_states[i] = sum_states[i] / counts_arr[i]
return torch.tensor(sum_arr), torch.tensor(sum_states)