# The original code was taken from
# the repo: https://github.com/Yueeeeeeee/BERT4NILM/
# Paper Link: https://dl.acm.org/doi/pdf/10.1145/3427771.3429390
import math
import torch
from torch import nn
import torch.nn.functional as F
from tqdm import tqdm
import sys
from .layers import create_linear, create_conv1, create_deconv1
from torch.nn import TransformerEncoderLayer
import numpy as np
import mlflow
class GELU(nn.Module):
    """
    Gaussian Error Linear Unit activation, tanh approximation.

    See https://arxiv.org/abs/1606.08415.
    """

    def forward(self, x):
        """Apply the tanh-based GELU approximation element-wise."""
        inner = math.sqrt(2 / math.pi) * (x + 0.044715 * torch.pow(x, 3))
        return 0.5 * x * (1 + torch.tanh(inner))
class PositionalEmbedding(nn.Module):
    """
    Learnable positional embedding, tiled across the batch.

    :param max_len: maximum length of the input
    :type max_len: int
    :param d_model: dimension of the model
    :type d_model: int
    """

    def __init__(self, max_len, d_model):
        super().__init__()
        self.pe = nn.Embedding(max_len, d_model)

    def forward(self, x):
        """Return the embedding table repeated for each batch element.

        Output shape is (batch, max_len, d_model); only x's batch size
        is used — its values are ignored.
        """
        n = x.size(0)
        return self.pe.weight.unsqueeze(0).repeat(n, 1, 1)
class LayerNorm(nn.Module):
    """
    Layer normalisation over the last dimension, with learnable affine
    parameters.

    :param features: The number of input features
    :type features: int
    :param eps: Regularization factor added to the std, defaults to 1e-6
    :type eps: float, optional
    """

    def __init__(self, features, eps=1e-6):
        super(LayerNorm, self).__init__()
        self.weight = nn.Parameter(torch.ones(features))
        self.bias = nn.Parameter(torch.zeros(features))
        self.eps = eps

    def forward(self, x):
        """Normalise x along its last dimension, then scale and shift."""
        mu = x.mean(-1, keepdim=True)
        # NOTE: torch.std defaults to the unbiased (n-1) estimator.
        sigma = x.std(-1, keepdim=True)
        return self.weight * (x - mu) / (sigma + self.eps) + self.bias
class Attention(nn.Module):
    """
    Scaled dot-product attention.

    :param query: Query values
    :type query: tensor
    :param key: Key values
    :type key: tensor
    :param value: Values
    :type value: tensor
    :param mask: Mask for a causal model, defaults to None
    :type mask: tensor, optional
    :param dropout: Dropout module applied to the weights, defaults to None
    :type dropout: nn.Dropout, optional
    :return: output of the attention layer and attention weights
    :rtype: tuple(tensor, tensor)
    """

    def forward(self, query, key, value, mask=None, dropout=None):
        d_k = query.size(-1)
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            # Masked positions get a large negative score so softmax -> ~0.
            scores = scores.masked_fill(mask == 0, -1e9)
        weights = F.softmax(scores, dim=-1)
        if dropout is not None:
            weights = dropout(weights)
        return torch.matmul(weights, value), weights
class MultiHeadedAttention(nn.Module):
    """
    Multi-head attention built on top of ``Attention``.

    :param h: The number of heads
    :type h: int
    :param d_model: The dimension of the model
    :type d_model: int
    :param dropout: Dropout, defaults to 0.1
    :type dropout: float, optional
    .. note:
        d_model should be a multiple of h.
    """

    def __init__(self, h, d_model, dropout=0.1):
        super().__init__()
        assert d_model % h == 0
        self.d_k = d_model // h
        self.h = h
        # One projection each for query, key and value.
        self.linear_layers = nn.ModuleList([create_linear(d_model, d_model) for _ in range(3)])
        self.output_linear = create_linear(d_model, d_model)
        self.attention = Attention()
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):
        """Project inputs per head, attend, then merge heads back to d_model."""
        n = query.size(0)
        projected = []
        for proj, tensor in zip(self.linear_layers, (query, key, value)):
            projected.append(proj(tensor).view(n, -1, self.h, self.d_k).transpose(1, 2))
        query, key, value = projected
        context, _ = self.attention(
            query, key, value, mask=mask, dropout=self.dropout)
        context = context.transpose(1, 2).contiguous().view(
            n, -1, self.h * self.d_k)
        return self.output_linear(context)
class PositionwiseFeedForward(nn.Module):
    """
    Two-layer position-wise feed-forward network with GELU activation.

    :param d_model: The dimension of the model (input and output size)
    :type d_model: int
    :param d_ff: size of hidden layer
    :type d_ff: int
    """

    def __init__(self, d_model, d_ff):
        super(PositionwiseFeedForward, self).__init__()
        self.w_1 = create_linear(d_model, d_ff)
        self.w_2 = create_linear(d_ff, d_model)
        self.activation = GELU()

    def forward(self, x):
        """Return w_2(GELU(w_1(x))), applied independently at each position."""
        hidden = self.activation(self.w_1(x))
        return self.w_2(hidden)
class SublayerConnection(nn.Module):
    """
    Residual connection followed by layer normalisation (post-norm).

    More details can be found at https://arxiv.org/pdf/1706.03762.pdf

    :param size: the feature size of the input
    :type size: int
    :param dropout: Dropout
    :type dropout: float
    """

    def __init__(self, size, dropout):
        super(SublayerConnection, self).__init__()
        self.layer_norm = LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        """Return LayerNorm(x + dropout(sublayer(x)))."""
        residual = x + self.dropout(sublayer(x))
        return self.layer_norm(residual)
class BERT4NILM(nn.Module):
    """
    .. _bert:

    BERT4NILM implementation.

    Original paper can be found here: https://dl.acm.org/doi/pdf/10.1145/3427771.3429390
    Original code can be found here: https://github.com/Yueeeeeeee/BERT4NILM

    The hyperparameter dictionary is expected to include the following parameters

    :param threshold: The threshold for states generation in the target power consumption, defaults to None
    :type threshold: List of floats
    :param cutoff: The cutoff for states generation in the target power consumption, defaults to None
    :type cutoff: List of floats
    :param min_on: The min on duration for states generation in the target power consumption, defaults to None
    :type min_on: List of floats
    :param min_off: The min off duration for states generation in the target power consumption, defaults to None
    :type min_off: List of floats
    :param in_size: The length of the input sequence, defaults to 488.
    :type in_size: int
    :param stride: The distance between two consecutive sequences, defaults to 1.
    :type stride: int
    :param hidden: The hidden size, defaults to 256
    :type hidden: int
    :param heads: The number of attention heads in each transformer block, defaults to 2
    :type heads: int
    :param n_layers: the number of transformer blocks in the model, defaults to 2
    :type n_layers: int
    :params dropout: The dropout, defaults to 0.2
    :type dropout: float

    It can be used as follows:

    .. code-block:: python

        'Bert4NILM': NILMExperiment({
            "model_name": 'BERT4NILM',
            'in_size': 480,
            'feature_type':'main',
            'stride':10,
            'max_nb_epochs':1,
            'cutoff':{
                'aggregate': 6000,
                'kettle': 3100,
                'fridge': 300,
                'washing machine': 2500,
                'microwave': 3000,
                'dishwasher': 2500
            },
            'threshold':{
                'kettle': 2000,
                'fridge': 50,
                'washing machine': 20,
                'microwave': 200,
                'dishwasher': 10
            },
            'min_on':{
                'kettle': 2,
                'fridge': 10,
                'washing machine': 300,
                'microwave': 2,
                'dishwasher': 300
            },
            'min_off':{
                'kettle': 0,
                'fridge': 2,
                'washing machine': 26,
                'microwave': 5,
                'dishwasher': 300
            },
        })
    """

    def __init__(self, params):
        super().__init__()
        self.original_len = params['in_size'] if 'in_size' in params else 99
        self.output_size = len(params['appliances']) if 'appliances' in params else 1
        self.stride = params['stride'] if 'stride' in params else 1
        # This is only valid for single-appliance usage:
        # the original model was proposed for several appliances, but here
        # only the first appliance's thresholds are kept (wrapped in a list).
        self.cutoff = [params['cutoff'][params['appliances'][0]]] if 'cutoff' in params else None
        self.threshold = [params['threshold'][params['appliances'][0]]] if 'threshold' in params else None
        self.min_on = [params['min_on'][params['appliances'][0]]] if 'min_on' in params else None
        self.min_off = [params['min_off'][params['appliances'][0]]] if 'min_off' in params else None
        # self.C0 = [params['lambda'][params['appliances'][0]]] if 'lambda' in params else [1e-6]
        # Converts the lists above to tensors (leaves them untouched when None).
        self.set_hpramas(self.cutoff, self.threshold, self.min_on, self.min_off)
        # C0 weights the extra L1 term applied to "on" samples in step().
        self.C0 = torch.tensor([params['c0'] if 'c0' in params else 1.])
        # The conv+pool front-end halves the sequence length.
        self.latent_len = int(self.original_len / 2)
        self.dropout_rate = params['dropout'] if 'dropout' in params else 0.2
        self.hidden = params['hidden'] if 'hidden' in params else 256
        self.heads = params['heads'] if 'heads' in params else 2
        self.n_layers = params['n_layers'] if 'n_layers' in params else 2
        self.conv = create_conv1(in_channels=1, out_channels=self.hidden,
                                 kernel_size=5, stride=1, padding=2, padding_mode='replicate')
        # L2-norm pooling with stride 2: downsamples the sequence by 2.
        self.pool = nn.LPPool1d(norm_type=2, kernel_size=2, stride=2)
        self.position = PositionalEmbedding(
            max_len=self.latent_len, d_model=self.hidden)
        self.layer_norm = LayerNorm(self.hidden)
        self.dropout = nn.Dropout(p=self.dropout_rate)
        # self.transformer_blocks = nn.ModuleList([TransformerBlock(
        #     self.hidden, self.heads, self.hidden * 4, self.dropout_rate) for _ in range(self.n_layers)])
        self.transformer_blocks = nn.ModuleList([TransformerEncoderLayer(
            self.hidden, self.heads, self.hidden * 4, self.dropout_rate) for _ in range(self.n_layers)])
        # Transposed conv doubles the length back to original_len.
        self.deconv = create_deconv1(
            in_channels=self.hidden, out_channels=self.hidden, kernel_size=4, stride=2, padding=1)
        self.linear1 = create_linear(self.hidden, 128)
        self.linear2 = create_linear(128, self.output_size)
        self.activation = nn.Sigmoid()
        # Loss components combined in step().
        self.kl = nn.KLDivLoss(reduction='batchmean')
        self.mse = nn.MSELoss()
        self.mae = nn.L1Loss(reduction='mean')
        self.margin = nn.SoftMarginLoss()
        self.l1_on = nn.L1Loss(reduction='sum')

    def forward(self, sequence):
        """Map an aggregate power sequence to per-position appliance power.

        :param sequence: aggregate power, shape (batch, original_len)
            — assumed from the unsqueeze(1) below; TODO confirm against caller
        :return: predicted (normalised) power, shape (batch, original_len, output_size)
        """
        # (batch, len) -> conv over 1 channel -> pool -> (batch, len/2, hidden)
        x_token = self.pool(self.conv(sequence.unsqueeze(1))).permute(0, 2, 1)
        embedding = x_token + self.position(sequence)
        x = self.dropout(self.layer_norm(embedding))
        mask = None
        # NOTE(review): TransformerEncoderLayer defaults to batch_first=False
        # (expects (seq, batch, hidden)), but x is (batch, len/2, hidden) here
        # — confirm this is intended; the original repo used a custom block.
        for transformer in self.transformer_blocks:
            x = transformer.forward(x, mask)
        # Upsample back to the original sequence length.
        x = self.deconv(x.permute(0, 2, 1)).permute(0, 2, 1)
        x = torch.tanh(self.linear1(x))
        x = self.linear2(x)
        return x

    def step(self, batch, seq_type=None):
        """Disaggregates a batch of data

        :param batch: A batch of data: (aggregate seqs, target energy, target status).
        :type batch: Tensor
        :return: loss function as returned from the model and MAE as returned from the model.
        :rtype: tuple(float,float)
        """
        seqs, labels_energy, status = batch
        batch_shape = status.shape
        logits = self.forward(seqs)
        # Normalise targets to [0, 1]-ish range by the per-appliance cutoff.
        labels = labels_energy / self.cutoff.to(seqs.device)
        # Denormalise predictions, clip spikes, then derive on/off status.
        logits_energy = self.cutoff_energy(logits * self.cutoff.to(seqs.device))
        logits_status = self.compute_status(logits_energy)
        # status < 0 marks positions to ignore (e.g. padding/unknown).
        mask = (status >= 0).to(seqs.device)
        labels_masked = torch.masked_select(labels, mask).view((-1, batch_shape[-1])).float()
        logits_masked = torch.masked_select(logits, mask).view((-1, batch_shape[-1])).float()
        status_masked = torch.masked_select(status, mask).view((-1, batch_shape[-1])).float()
        logits_status_masked = torch.masked_select(logits_status, mask).view((-1, batch_shape[-1])).float()
        # Loss = KL (softened distributions, temperature 0.1)
        #      + MSE on power + soft-margin on status (+/-1 encoding).
        kl_loss = self.kl(torch.log(F.softmax(logits_masked.squeeze() / 0.1, dim=-1) + 1e-9), F.softmax(labels_masked.squeeze() / 0.1, dim=-1))
        mse_loss = self.mse(logits_masked.contiguous().view(-1),
                            labels_masked.contiguous().view(-1))
        margin_loss = self.margin((logits_status_masked * 2 - 1).contiguous().view(-1),
                                  (status_masked * 2 - 1).contiguous().view(-1))
        total_loss = kl_loss + mse_loss + margin_loss
        # Extra L1 penalty on positions that are truly on or misclassified.
        on_mask = (status >= 0) * (((status == 1) + (status != logits_status.reshape(status.shape))) >= 1)
        if on_mask.sum() > 0:
            total_size = torch.tensor(on_mask.shape).prod()
            logits_on = torch.masked_select(logits.reshape(on_mask.shape), on_mask)
            labels_on = torch.masked_select(labels.reshape(on_mask.shape), on_mask)
            loss_l1_on = self.l1_on(logits_on.contiguous().view(-1),
                                    labels_on.contiguous().view(-1))
            total_loss += self.C0.to(seqs.device)[0] * loss_l1_on / total_size
        # MAE is computed on normalised values (not watts).
        mae = self.mae(logits_masked.contiguous().view(-1),
                       labels_masked.contiguous().view(-1))
        return total_loss, mae

    def set_hpramas(self, cutoff, threshold, min_on, min_off):
        """
        Setter for the hyper-parameters related to appliance state generation.
        Arguments left as None keep the current attribute value unchanged.

        :param cutoff: The power cutoff
        :type cutoff: float
        :param threshold: Threshold of target power consumption
        :type threshold: float
        :param min_on: Minimum on duration
        :type min_on: float
        :param min_off: Minimum off duration
        :type min_off: float
        """
        if cutoff is not None:
            self.cutoff = torch.tensor(cutoff)
        if threshold is not None:
            self.threshold = torch.tensor(threshold)
        if min_on is not None:
            self.min_on = torch.tensor(min_on)
        if min_off is not None:
            self.min_off = torch.tensor(min_off)

    def cutoff_energy(self, data):
        """
        Removes the spikes from the data: values below 5 W are zeroed and
        values above the per-appliance cutoff are clipped to it.

        :param data: Power consumption
        :type data: tensor
        :return: Updated power consumption
        :rtype: tensor
        """
        columns = data.squeeze().shape[-1]
        # Fall back to a 3100 W cutoff per column when none was configured.
        if self.cutoff.size(0) == 0:
            self.cutoff = torch.tensor(
                [3100 for i in range(columns)])
        # NOTE: mutates `data` in place before clipping.
        data[data < 5] = 0
        data = torch.min(data, self.cutoff.to(data.device))
        return data

    def compute_status(self, data):
        """
        Calculates the states for the target data based on the threshold.

        :param data: The target data
        :type data: tensor
        :return: The operational states (1 = on, 0 = off)
        :rtype: tensor
        """
        data_shape = data.shape
        columns = data.shape[-1]
        # Fall back to a 10 W threshold per column when none was configured.
        if self.threshold.size(0) == 0:
            self.threshold = torch.tensor(
                [10 for i in range(columns)])
        # NOTE(review): min_on/min_off durations are not applied here,
        # unlike the original BERT4NILM status generation — confirm intended.
        status = (data >= self.threshold.to(data.device)) * 1
        return status

    def predict(self, model, test_dataloader):
        """Generates predictions for the test data loader

        :param model: Pre-trained model
        :type model: nn.Module
        :param test_dataloader: The test data
        :type test_dataloader: dataLoader
        :return: Generated predictions
        :rtype: dict
        """
        # NOTE(review): `net` is never used below — forward passes go through
        # self.forward, not the passed-in model. Confirm this is intended.
        net = model.model.eval()
        num_batches = len(test_dataloader)
        values = range(num_batches)
        pred = []
        e_pred_curve = []
        s_pred_curve = []
        with tqdm(total=len(values), file=sys.stdout) as pbar:
            with torch.no_grad():
                for batch_idx, batch in enumerate(test_dataloader):
                    # Assumes each batch is a bare tensor of aggregate
                    # sequences (no labels) — TODO confirm loader contract.
                    seqs = batch
                    logits = self.forward(seqs)
                    # Denormalisation by cutoff, then spike removal.
                    logits_energy = self.cutoff_energy(logits * self.cutoff.to(seqs.device))
                    logits_status = self.compute_status(logits_energy)
                    status = (logits_status > 0) * 1
                    s_pred_curve.append(status)
                    # Zero out predicted power wherever the status is off.
                    e_pred_curve.append(logits_energy * status)
                    del batch
                    pbar.set_description('processed: %d' % (1 + batch_idx))
                    pbar.update(1)
                pbar.close()
        # TODO: Denormalisation !!! Previously done ?
        e_pred_curve = torch.cat(e_pred_curve, 0)
        s_pred_curve = torch.cat(s_pred_curve, 0)
        # Merge overlapping windows back into a single time series.
        e_pred_curve = self.aggregate_seqs(e_pred_curve.squeeze())
        s_pred_curve = self.aggregate_seqs(s_pred_curve.squeeze())
        results = {
            "pred": e_pred_curve,
            "s_pred_curve": s_pred_curve
        }
        return results

    def aggregate_seqs(self, prediction):
        """Aggregate the overlapping sequences using the mean,
        taking into consideration the stride size.

        :param prediction: test predictions of the current model,
            shape (num_windows, original_len)
        :type prediction: tensor
        :return: Aggregated sequence
        :rtype: tensor
        """
        l = self.original_len
        s = self.stride
        # Total output length accounting for window overlap at stride s.
        n = (prediction.shape[0] - 1) * self.stride + l
        sum_arr = np.zeros((n))
        counts_arr = np.zeros((n))
        o = len(sum_arr)  # NOTE(review): unused variable.
        # Accumulate each window's contribution and how often each
        # position was covered, then average.
        for i in range(prediction.shape[0]):
            sum_arr[i * s:i * s + l] += prediction[i].reshape(-1).numpy()
            counts_arr[i * s:i * s + l] += 1
        for i in range(len(sum_arr)):
            sum_arr[i] = sum_arr[i] / counts_arr[i]
        return torch.tensor(sum_arr)