Source code for deep_nilmtk.loader.wavenetdataset

import torch
import numpy as np
import random



def pad_data(data, context_size):
    """Performs data padding for both the target and the aggregate consumption.

    :param data: The aggregate power
    :type data: np.array
    :param context_size: The length of the sequence.
    :type context_size: int
    :return: The padded aggregate power.
    :rtype: np.array
    """
    sequence_length = context_size
    units_to_pad = sequence_length // 2
    padding = (units_to_pad, units_to_pad)
    if data.ndim == 1:
        new_mains = np.pad(data, padding, 'constant', constant_values=(0, 0))
        return new_mains
    else:
        # Pad each column (e.g. each appliance) independently.
        new_mains = []
        for i in range(data.shape[-1]):
            new_mains.append(np.pad(data[:, i], padding, 'constant', constant_values=(0, 0)))
        return np.stack(new_mains).T
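# Illustrative sketch (not part of the original module): with context_size=6,
# pad_data adds context_size // 2 = 3 zeros on each side, and 2-D inputs are
# padded column by column:
#
#   pad_data(np.array([1., 2., 3., 4.]), 6)
#   # -> array([0., 0., 0., 1., 2., 3., 4., 0., 0., 0.])
#   pad_data(np.ones((4, 2)), 6).shape
#   # -> (10, 2)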
class WaveNetDataLoader(torch.utils.data.Dataset):
    """
    .. _wavenetdataset:

    This class is the data loader for the WaveNILM NILM model. The original
    code can be found here: https://github.com/jiejiang-jojo/fast-seq2point/

    :param inputs: The aggregate power.
    :type inputs: np.array
    :param targets: The target appliance(s) power consumption, defaults to None
    :type targets: np.array, optional
    :param params: Hyper-parameter values, defaults to {}
    :type params: dict, optional

    The hyper-parameter dictionary is expected to include the following parameters:

    :param in_size: The input sequence length, defaults to 99
    :type in_size: int
    :param kernel_size: The size of the kernel, defaults to 3.
    :type kernel_size: int
    :param layers: The number of layers of the model, defaults to 6.
    :type layers: int

    .. note::
        This data loader generates a target sequence centred in the input
        sequence, with a length difference L between the input and output
        sequences of L = (2 ** layers - 1) * (kernel_size - 1) + 1.
    """

    def __init__(self, inputs, targets=None, params={}):
        self.context_size = params['in_size'] if 'in_size' in params else 99
        # kernel_size has to be an odd integer, since an even value may break
        # the dilated convolution output size.
        self.kernel_size = params['kernel_size'] if 'kernel_size' in params else 3
        self.layers = params['layers'] if 'layers' in params else 6
        self.seq_len = (2 ** self.layers - 1) * (self.kernel_size - 1) + 1
        self.num_points = self.context_size + self.seq_len

        # Pad the sequence with zeros at the beginning and at the end.
        inputs = pad_data(inputs, self.num_points)
        self.inputs = torch.tensor(inputs).float()
        if targets is not None:
            targets = pad_data(targets, self.num_points)
            # .get() avoids a KeyError when 'target_norm' is absent from params.
            if params.get('target_norm') == 'z-norm':
                self.mean = np.mean(targets)
                self.std = np.std(targets)
                self.targets = torch.tensor((targets - self.mean) / self.std).float()
            else:
                self.targets = torch.tensor(targets).float().log1p()
            print(f"inputs {self.inputs.shape}, targets {self.targets.shape}")
        else:
            self.targets = None
            print(f"inputs {self.inputs.shape}")
        self.len = self.inputs.shape[0] - self.num_points
        self.indices = np.arange(self.inputs.shape[0])

    def __len__(self):
        """Denotes the total number of samples."""
        return self.len
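    # Worked example (sketch, using the defaults above): with in_size=99,
    # kernel_size=3 and layers=6,
    #   seq_len    = (2 ** 6 - 1) * (3 - 1) + 1 = 127
    #   num_points = 99 + 127 = 226
    # so each sample spans 226 input points. pad_data adds num_points points
    # in total, which the subtraction in __init__ cancels out, so a mains
    # series of length N yields exactly N samples.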
    def get_sample(self, index):
        """Generate a sample of power sequence.

        :param index: The start index of the first sequence
        :type index: int
        :return: Aggregate power and target consumption during training, and
            only aggregate power during testing
        :rtype: np.array
        """
        indices = self.indices[index: index + self.num_points]
        # Keep only the context_size target points centred in the input window.
        # For odd seq_len, -self.seq_len // 2 floors to -(seq_len // 2 + 1),
        # so seq_len points are trimmed in total.
        indices_target = indices[self.seq_len // 2: -self.seq_len // 2]
        inputs = self.inputs[sorted(indices)]
        if self.targets is not None:
            targets = self.targets[sorted(indices_target)]
            return inputs, targets
        else:
            return inputs
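    # Continuing the worked example: get_sample(0) reads input indices
    # [0, 226) and target indices [63, 162), i.e. the 99 target points
    # centred in the 226-point input window.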
    def __getitem__(self, index):
        return self.get_sample(index)
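# Minimal usage sketch (an assumption, not from the original module): the
# synthetic arrays, the 'target_norm' choice and the batch size below are
# placeholders for illustration only.
if __name__ == "__main__":
    mains = np.random.rand(1000)       # placeholder aggregate power
    appliance = np.random.rand(1000)   # placeholder appliance power
    dataset = WaveNetDataLoader(mains, appliance, params={'target_norm': 'z-norm'})
    loader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)
    x, y = next(iter(loader))
    print(x.shape, y.shape)  # torch.Size([32, 226]) inputs, torch.Size([32, 99]) targets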