Source code for deep_nilmtk.model.wavenet


import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from .baselines import S2S

# Original script https://github.com/jiejiang-jojo/fast-seq2point/


def init_layer_2(layer):
    """Initialize a Linear or Convolutional layer."""
    # nn.init.xavier_uniform_(layer.weight)
    layer.weight.data.uniform_(-0.1, 0.1)
    if layer.bias is not None:
        layer.bias.data.fill_(0.)

def init_layer(layer):
    """Initialize a Linear or Convolutional layer."""
    # nn.init.xavier_uniform_(layer.weight)
    if layer.weight.ndimension() == 4:
        (n_out, n_in, height, width) = layer.weight.size()
        n = n_in * height * width
    elif layer.weight.ndimension() == 3:
        (n_out, n_in, width) = layer.weight.size()
        n = n_in * width
    elif layer.weight.ndimension() == 2:
        (n_out, n) = layer.weight.size()
    std = math.sqrt(2. / n)
    scale = std * math.sqrt(3.)
    layer.weight.data.uniform_(-scale, scale)
    if layer.bias is not None:
        layer.bias.data.fill_(0.)

def init_bn(bn):
    """Initialize a Batchnorm layer."""
    bn.bias.data.fill_(0.)
    bn.running_mean.data.fill_(0.)
    bn.weight.data.fill_(1.)
    bn.running_var.data.fill_(1.)
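
# Illustration added to this listing (not part of the original script):
# init_layer applies a Kaiming-style uniform initialisation with bound
# sqrt(3) * sqrt(2 / fan_in). For a Conv1d(32, 64, kernel_size=3) the fan-in
# is 32 * 3 = 96, so weights are drawn from [-0.25, 0.25].
def _example_init_layer():
    """Hypothetical usage sketch, added for illustration only."""
    conv = nn.Conv1d(32, 64, kernel_size=3)
    init_layer(conv)
    return conv.weight.abs().max().item()  # <= sqrt(6 / 96) = 0.25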

class CNN3(nn.Module):

    def __init__(self, seq_len=41, to_binary=False):
        super(CNN3, self).__init__()
        assert (seq_len - 1) % 2 == 0, f'seq_len ({seq_len}) must be odd'
        self.seq_len = seq_len
        self.to_binary = to_binary
        self.kernel_size = (seq_len - 1) // 2 + 1

        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32,
                               kernel_size=(self.kernel_size, 1), stride=(1, 1),
                               padding=(0, 0), bias=True)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64,
                               kernel_size=(self.kernel_size, 1), stride=(1, 1),
                               padding=(0, 0), bias=True)
        self.conv_final = nn.Conv2d(in_channels=64, out_channels=1,
                                    kernel_size=(1, 1), stride=(1, 1),
                                    padding=(0, 0), bias=True)
        self.init_weights()

    def init_weights(self):
        init_layer(self.conv1)
        init_layer(self.conv2)
        init_layer(self.conv_final)

    def forward(self, input):
        x = input
        x = x.view(x.shape[0], 1, x.shape[1], 1)
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.conv_final(x)
        x = x.view(x.shape[0], x.shape[2])
        if self.to_binary:
            return torch.sigmoid(x)
        return x
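
# Usage sketch added to this listing (not part of the original script). CNN3 is
# a sequence-to-point network: with the default seq_len=41, the two stacked
# convolutions (kernel height 21 each) collapse the 41-sample window to a
# single prediction per example. CNN7 and CNN5 below follow the same pattern
# with deeper convolutional stacks.
def _example_cnn3():
    """Hypothetical usage sketch, added for illustration only."""
    model = CNN3(seq_len=41)
    x = torch.randn(8, 41)      # (batch_size, seq_len)
    y = model(x)                # (batch_size, 1): one value per input window
    return y.shape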

class CNN7(nn.Module):

    def __init__(self, seq_len=253, to_binary=False):
        super(CNN7, self).__init__()
        self.seq_len = seq_len
        self.to_binary = to_binary
        assert (seq_len - 1) % 6 == 0, f'seq_len ({seq_len}) - 1 must be divisible by 6'
        self.kernel_size = (seq_len - 1) // 6 + 1

        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32,
                               kernel_size=(self.kernel_size, 1), stride=(1, 1),
                               padding=(0, 0), bias=True)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64,
                               kernel_size=(self.kernel_size, 1), stride=(1, 1),
                               padding=(0, 0), bias=True)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128,
                               kernel_size=(self.kernel_size, 1), stride=(1, 1),
                               padding=(0, 0), bias=True)
        self.conv4 = nn.Conv2d(in_channels=128, out_channels=256,
                               kernel_size=(self.kernel_size, 1), stride=(1, 1),
                               padding=(0, 0), bias=True)
        self.conv5 = nn.Conv2d(in_channels=256, out_channels=256,
                               kernel_size=(self.kernel_size, 1), stride=(1, 1),
                               padding=(0, 0), bias=True)
        self.conv6 = nn.Conv2d(in_channels=256, out_channels=256,
                               kernel_size=(self.kernel_size, 1), stride=(1, 1),
                               padding=(0, 0), bias=True)
        self.conv_final = nn.Conv2d(in_channels=256, out_channels=1,
                                    kernel_size=(1, 1), stride=(1, 1),
                                    padding=(0, 0), bias=True)
        self.init_weights()

    def init_weights(self):
        init_layer(self.conv1)
        init_layer(self.conv2)
        init_layer(self.conv3)
        init_layer(self.conv4)
        init_layer(self.conv5)
        init_layer(self.conv6)
        init_layer(self.conv_final)

    def forward(self, input):
        x = input
        x = x.view(x.shape[0], 1, x.shape[1], 1)
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = F.relu(self.conv5(x))
        x = F.relu(self.conv6(x))
        x = self.conv_final(x)
        x = x.view(x.shape[0], x.shape[2])
        if self.to_binary:
            return torch.sigmoid(x)
        return x

class CNN5(nn.Module):

    def __init__(self, seq_len=15, to_binary=False):
        super(CNN5, self).__init__()
        self.seq_len = seq_len
        self.to_binary = to_binary
        assert (seq_len - 3) % 4 == 0, f'seq_len ({seq_len}) - 3 should be divisible by 4'
        self.kernel_size = (seq_len - 3) // 4

        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32,
                               kernel_size=(self.kernel_size, 1), stride=(1, 1),
                               padding=(0, 0), bias=True)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64,
                               kernel_size=(self.kernel_size, 1), stride=(1, 1),
                               padding=(0, 0), bias=True)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128,
                               kernel_size=(self.kernel_size, 1), stride=(1, 1),
                               padding=(0, 0), bias=True)
        self.conv4 = nn.Conv2d(in_channels=128, out_channels=256,
                               kernel_size=(self.kernel_size, 1), stride=(1, 1),
                               padding=(0, 0), bias=True)
        self.conv5 = nn.Conv2d(in_channels=256, out_channels=256,
                               kernel_size=(7, 1), stride=(1, 1),
                               padding=(0, 0), bias=True)
        self.conv_final = nn.Conv2d(in_channels=256, out_channels=1,
                                    kernel_size=(1, 1), stride=(1, 1),
                                    padding=(0, 0), bias=True)
        self.init_weights()

    def init_weights(self):
        init_layer(self.conv1)
        init_layer(self.conv2)
        init_layer(self.conv3)
        init_layer(self.conv4)
        init_layer(self.conv5)
        init_layer(self.conv_final)

    def forward(self, input):
        x = input
        x = x.view(x.shape[0], 1, x.shape[1], 1)
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = F.relu(self.conv5(x))
        x = self.conv_final(x)
        x = x.view(x.shape[0], x.shape[2])
        if self.to_binary:
            return torch.sigmoid(x)
        return x

class DilatedResidualBlock(nn.Module):

    def __init__(self, residual_channels, dilation_channels, skip_channels,
                 kernel_size, dilation, bias):
        super(DilatedResidualBlock, self).__init__()
        self.residual_channels = residual_channels
        self.dilation_channels = dilation_channels
        self.skip_channels = skip_channels
        self.dilation = dilation
        self.dilated_conv = nn.Conv1d(residual_channels, 2 * dilation_channels,
                                      kernel_size=kernel_size, dilation=dilation,
                                      padding=dilation)
        self.mixing_conv = nn.Conv1d(dilation_channels,
                                     residual_channels + skip_channels,
                                     kernel_size=1, bias=False)
        self.init_weights()

    def init_weights(self):
        init_layer(self.dilated_conv)
        init_layer(self.mixing_conv)

    def forward(self, data_in):
        out = self.dilated_conv(data_in)
        out1 = out.narrow(-2, 0, self.dilation_channels)
        out2 = out.narrow(-2, self.dilation_channels, self.dilation_channels)
        tanh_out = torch.tanh(out1)
        sigm_out = torch.sigmoid(out2)
        data = torch.mul(tanh_out, sigm_out)
        data = self.mixing_conv(data)
        res = data.narrow(-2, 0, self.residual_channels)
        skip = data.narrow(-2, self.residual_channels, self.skip_channels)
        res = res + data_in
        return res, skip
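
# Usage sketch added to this listing (not part of the original script). The
# block implements the WaveNet gated activation tanh(conv(x)) * sigmoid(conv(x))
# and returns a residual path (same shape as its input, since padding=dilation)
# together with a skip connection.
def _example_dilated_block():
    """Hypothetical usage sketch, added for illustration only."""
    block = DilatedResidualBlock(residual_channels=32, dilation_channels=32,
                                 skip_channels=32, kernel_size=3, dilation=4,
                                 bias=True)
    x = torch.randn(2, 32, 100)  # (batch_size, residual_channels, time_steps)
    res, skip = block(x)         # both (2, 32, 100): time length is preserved
    return res.shape, skip.shape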

class DilatedResidualBlock2(nn.Module):

    def __init__(self, residual_channels, dilation_channels, skip_channels,
                 kernel_size, dilation, bias):
        super(DilatedResidualBlock2, self).__init__()
        self.residual_channels = residual_channels
        self.dilation_channels = dilation_channels
        self.skip_channels = skip_channels
        self.dilated_conv_tanh = nn.Conv1d(residual_channels, dilation_channels,
                                           kernel_size=kernel_size, dilation=dilation,
                                           padding=dilation, bias=bias)
        self.dilated_conv_sigmoid = nn.Conv1d(residual_channels, dilation_channels,
                                              kernel_size=kernel_size, dilation=dilation,
                                              padding=dilation, bias=bias)
        self.res = nn.Conv1d(dilation_channels, residual_channels, kernel_size=1, bias=True)
        self.skip = nn.Conv1d(dilation_channels, skip_channels, kernel_size=1, bias=True)
        self.init_weights()

    def init_weights(self):
        init_layer(self.dilated_conv_tanh)
        init_layer(self.dilated_conv_sigmoid)
        init_layer(self.res)
        init_layer(self.skip)

    def forward(self, data_in):
        out1 = self.dilated_conv_tanh(data_in)
        out2 = self.dilated_conv_sigmoid(data_in)
        tanh_out = torch.tanh(out1)
        sigm_out = torch.sigmoid(out2)
        data = torch.mul(tanh_out, sigm_out)  # torch.mul (F.mul does not exist)
        skip = self.skip(data)
        res = self.res(data) + data_in
        return res, skip

class WaveNet(S2S):
    """
    .. _wavenilm:

    WaveNet model for load disaggregation using residual and dilated convolutions.
    This is a sequence-to-subsequence model where::

        Output_sequence_length = Input_sequence_length - L
        L = (2 ** layers - 1) * (kernel_size - 1) + 1
    """

    def __init__(self, params):
        super(WaveNet, self).__init__(params)
        # kernel_size has to be an odd integer, since an even integer may break
        # the dilated conv output size
        self.kernel_size = params['kernel_size'] if 'kernel_size' in params else 3
        self.layers = params['layers'] if 'layers' in params else 6
        assert self.kernel_size % 2 == 1, f'kernel_size ({self.kernel_size}) must be odd'
        self.to_binary = params['to_binary'] if 'to_binary' in params else False
        self.n_appliances = len(params['appliances']) if 'appliances' in params else 1
        self.in_features = 1  # TODO: set this according to the input features
        self.seq_len = (2 ** self.layers - 1) * (self.kernel_size - 1) + 1
        self.residual_channels = params['residual_channels'] if 'residual_channels' in params else 32
        self.dilation_channels = params['dilation_channels'] if 'dilation_channels' in params else 32
        self.skip_channels = params['skip_channels'] if 'skip_channels' in params else 32

        self.causal_conv = nn.Conv1d(1, self.residual_channels, kernel_size=1, bias=True)
        self.blocks = [DilatedResidualBlock(self.residual_channels, self.dilation_channels,
                                            self.skip_channels, self.kernel_size, 2 ** i, True)
                       for i in range(self.layers)]
        for i, block in enumerate(self.blocks):
            self.add_module(f"dilatedConv{i}", block)
        self.penultimate_conv = nn.Conv1d(self.skip_channels, self.skip_channels,
                                          kernel_size=self.kernel_size,
                                          padding=(self.kernel_size - 1) // 2, bias=True)
        self.final_conv = nn.Conv1d(self.skip_channels, self.n_appliances,
                                    kernel_size=self.kernel_size,
                                    padding=(self.kernel_size - 1) // 2, bias=True)
        self.init_weights()

    def init_weights(self):
        init_layer(self.causal_conv)
        init_layer(self.penultimate_conv)
        init_layer(self.final_conv)

    def forward(self, data_in):
        data_in = data_in.view(data_in.shape[0], self.in_features, data_in.shape[1])
        data_out = self.causal_conv(data_in)
        skip_connections = []
        for block in self.blocks:
            data_out, skip_out = block(data_out)
            skip_connections.append(skip_out)
        skip_out = skip_connections[0]
        for skip_other in skip_connections[1:]:
            skip_out = skip_out + skip_other
        data_out = F.relu(skip_out)
        data_out = self.penultimate_conv(data_out)
        # data_out = F.relu(data_out)
        data_out = self.final_conv(data_out)
        data_out = data_out.narrow(-1, self.seq_len // 2, data_out.size()[-1] - self.seq_len)
        # data_out = data_out.view(data_out.shape[0], data_out.shape[2])
        if self.to_binary:
            return torch.sigmoid(data_out)
        return data_out
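
# Illustration added to this listing (not part of the original script): the
# receptive field L of the stacked dilated convolutions, matching
# WaveNet.seq_len above. With the defaults layers=6 and kernel_size=3,
# L = (2 ** 6 - 1) * (3 - 1) + 1 = 127, so an input window of length T yields
# an output window of length T - 127.
def _wavenet_receptive_field(layers=6, kernel_size=3):
    """Hypothetical helper, added for illustration only."""
    return (2 ** layers - 1) * (kernel_size - 1) + 1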

class WaveNetBGRU(nn.Module):
    """
    WaveNet model followed by a bidirectional GRU.
    """

    def __init__(self, layers=6, kernel_size=3, residual_channels=32,
                 dilation_channels=32, skip_channels=32):
        super(WaveNetBGRU, self).__init__()
        assert kernel_size % 2 == 1, f'kernel_size ({kernel_size}) must be odd'
        # kernel_size has to be an odd integer, since an even integer may break
        # the dilated conv output size
        self.kernel_size = kernel_size
        self.seq_len = (2 ** layers - 1) * (kernel_size - 1) + 1
        self.residual_channels = residual_channels
        self.dilation_channels = dilation_channels
        self.skip_channels = skip_channels

        self.causal_conv = nn.Conv1d(1, residual_channels, kernel_size=1, bias=False)
        self.blocks = [DilatedResidualBlock(residual_channels, dilation_channels,
                                            skip_channels, kernel_size, 2 ** i, True)
                       for i in range(layers)]
        for i, block in enumerate(self.blocks):
            self.add_module(f"dilatedConv{i}", block)
        self.penultimate_conv = nn.Conv1d(skip_channels, skip_channels,
                                          kernel_size=kernel_size,
                                          padding=(kernel_size - 1) // 2, bias=True)
        self.final_conv = nn.Conv1d(skip_channels, 1, kernel_size=kernel_size,
                                    padding=(kernel_size - 1) // 2, bias=True)
        self.bgru = nn.GRU(input_size=skip_channels, hidden_size=skip_channels,
                           num_layers=2, bias=True, batch_first=True, dropout=0.,
                           bidirectional=True)
        self.fc_final = nn.Linear(2 * skip_channels, 1)
        self.init_weights()

    def _init_param(self, param):
        if param.ndimension() == 1:
            param.data.fill_(0.)
        elif param.ndimension() == 2:
            n = param.size(-1)
            std = math.sqrt(2. / n)
            scale = std * math.sqrt(3.)
            param.data.uniform_(-scale, scale)

    def init_weights(self):
        init_layer(self.causal_conv)
        init_layer(self.penultimate_conv)
        for param in self.bgru.parameters():
            self._init_param(param)
        init_layer(self.fc_final)

    def forward(self, data_in):
        data_in = data_in.view(data_in.shape[0], 1, data_in.shape[1])
        data_out = self.causal_conv(data_in)
        skip_connections = []
        for block in self.blocks:
            data_out, skip_out = block(data_out)
            skip_connections.append(skip_out)
        skip_out = skip_connections[0]
        for skip_other in skip_connections[1:]:
            skip_out = skip_out + skip_other
        data_out = F.relu(skip_out)
        data_out = self.penultimate_conv(data_out)
        # (batch_size, feature_maps, time_steps)
        data_out = data_out.transpose(1, 2)
        # (batch_size, time_steps, feature_maps)
        (data_out, h) = self.bgru(data_out)
        data_out = self.fc_final(data_out)
        # (batch_size, time_steps, 1)
        data_out = data_out.view(data_out.shape[0:2])
        # (batch_size, time_steps)
        seq_len = self.seq_len
        width = data_out.shape[1] - seq_len + 1
        output = data_out[:, seq_len // 2: seq_len // 2 + width]
        # (batch_size, width)
        return output
        # return data_out.view(data_out.shape[0], data_out.shape[2])
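
# Usage sketch added to this listing (not part of the original script), assuming
# an input window of 480 samples. WaveNetBGRU runs the bidirectional GRU over
# the full sequence and only crops the borders at the end.
def _example_wavenet_bgru():
    """Hypothetical usage sketch, added for illustration only."""
    model = WaveNetBGRU(layers=6, kernel_size=3)  # seq_len = 127
    x = torch.randn(4, 480)                       # (batch_size, time_steps)
    y = model(x)                                  # (4, 480 - 127 + 1) = (4, 354)
    return y.shape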

class WaveNetBGRU_speedup(nn.Module):
    """
    WaveNet model with a bidirectional GRU and faster generation of predictions.
    """

    def __init__(self, layers=6, kernel_size=3, residual_channels=32,
                 dilation_channels=32, skip_channels=32):
        super(WaveNetBGRU_speedup, self).__init__()
        assert kernel_size % 2 == 1, f'kernel_size ({kernel_size}) must be odd'
        # kernel_size has to be an odd integer, since an even integer may break
        # the dilated conv output size
        self.kernel_size = kernel_size
        self.seq_len = (2 ** layers - 1) * (kernel_size - 1) + 1
        self.residual_channels = residual_channels
        self.dilation_channels = dilation_channels
        self.skip_channels = skip_channels

        self.causal_conv = nn.Conv1d(1, residual_channels, kernel_size=1, bias=False)
        self.blocks = [DilatedResidualBlock(residual_channels, dilation_channels,
                                            skip_channels, kernel_size, 2 ** i, True)
                       for i in range(layers)]
        for i, block in enumerate(self.blocks):
            self.add_module(f"dilatedConv{i}", block)
        self.penultimate_conv = nn.Conv1d(skip_channels, skip_channels,
                                          kernel_size=kernel_size,
                                          padding=(kernel_size - 1) // 2, bias=True)
        self.final_conv = nn.Conv1d(skip_channels, 1, kernel_size=kernel_size,
                                    padding=(kernel_size - 1) // 2, bias=True)
        self.bgru = nn.GRU(input_size=skip_channels, hidden_size=skip_channels,
                           num_layers=2, bias=True, batch_first=True, dropout=0.,
                           bidirectional=True)
        self.fc_final = nn.Linear(2 * skip_channels, 1)
        self.init_weights()

    def _init_param(self, param):
        if param.ndimension() == 1:
            param.data.fill_(0.)
        elif param.ndimension() == 2:
            n = param.size(-1)
            std = math.sqrt(2. / n)
            scale = std * math.sqrt(3.)
            param.data.uniform_(-scale, scale)

    def init_weights(self):
        init_layer(self.causal_conv)
        init_layer(self.penultimate_conv)
        for param in self.bgru.parameters():
            self._init_param(param)
        init_layer(self.fc_final)

    def forward(self, data_in):
        data_in = data_in.view(data_in.shape[0], 1, data_in.shape[1])
        data_out = self.causal_conv(data_in)
        skip_connections = []
        for block in self.blocks:
            data_out, skip_out = block(data_out)
            skip_connections.append(skip_out)
        skip_out = skip_connections[0]
        for skip_other in skip_connections[1:]:
            skip_out = skip_out + skip_other
        data_out = F.relu(skip_out)
        data_out = self.penultimate_conv(data_out)
        # (batch_size, feature_maps, time_steps)
        data_out = data_out.transpose(1, 2)
        # (batch_size, time_steps, feature_maps)
        seq_len = self.seq_len
        width = data_out.shape[1] - seq_len + 1
        data_out = data_out[:, seq_len // 2: seq_len // 2 + width]
        # (batch_size, width, feature_maps)
        (data_out, h) = self.bgru(data_out)
        data_out = self.fc_final(data_out)
        # (batch_size, width, 1)
        output = data_out.view(data_out.shape[0:2])
        # (batch_size, width)
        return output
        # return data_out.view(data_out.shape[0], data_out.shape[2])
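
# Note added to this listing (not part of the original script): compared with
# WaveNetBGRU, the "_speedup" variant crops the sequence to its valid central
# window of width T - seq_len + 1 *before* the bidirectional GRU, so the
# recurrent layers process fewer time steps; this is what makes generating
# predictions faster while keeping the same output shape.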