from collections import OrderedDict
import numpy as np
import pandas as pd
import torch
import sys
from pathlib import Path
import pytorch_lightning as pl
from nilmtk.disaggregate import Disaggregator
import mlflow
from functools import partial
from sklearn.model_selection import TimeSeriesSplit
import optuna
import os
import warnings
warnings.filterwarnings("ignore")
import joblib
import pickle
from ..config import get_exp_parameters
from ..preprocessing.pre_processing import data_preprocessing
from ..utils.utils import get_latest_checkpoint
from ..model.model_pil import pilModel
from ..config import NILM_MODELS
from ..utils import DictLogger
class NILMExperiment(Disaggregator):
    """
    This class defines a NILM experiment. It is compatible with both
    single and multi-appliance models and offers different advanced features
    like cross-validation and hyper-parameter optimization during the
    training phase. The class is independent of the deep model used for
    load disaggregation.

    .. note:: For a PyTorch model to be compatible with this class, an entry should be added for this model in the config module.
    """
    def __init__(self, params):
        """
        Initialise the environment parameters for the NILM experiment. For the
        hyper-parameters, it takes the default values defined in the config module
        and updates only the subset of values specified in params.

        :param params: Dictionary with different values of hyper-parameters.
        :type params: dict
        """
        super().__init__()
        # Start from the defaults declared in the config module and override
        # only the keys supplied by the caller.
        hparams = get_exp_parameters()
        hparams = vars(hparams.parse_args())
        hparams.update(params)
        # Seed python/numpy/torch RNGs for reproducibility.
        pl.seed_everything(hparams['seed'])
        self._data = None                   # cached pre-processed training data
        self.models = OrderedDict()         # appliance name -> trained model(s)
        self.data_loaders = OrderedDict()   # appliance name -> dataset factory
        self.MODEL_NAME = hparams['model_name']
        self.hparams = hparams
        # Parameter for MLflow
        self.run_id = OrderedDict()         # appliance name -> MLflow run id
        # Parameters for Optuna
        self.optuna_params = OrderedDict()
        self.best_trials = {}               # appliance name -> best trial number
        # Parameter for appliances
        self.appliance_params = {}          # appliance name -> target normalisation stats
def _prepare_data(self, mains, sub_main):
"""
Performs data pre-processing and formating. By default, the default pre-processing
method in used. Neverthless, custom pre-processing methdos are also possible
to use and need only to be specified in the corresponding entry of the model
within the config module within the extra_params. For example:
NILM_MODELS = {
...
'NILMmodel': {
'model': modelClass,
'loader': dataLoaderClass,
'extra_params':{
'pre-process': preprocessingFunction
}
},
...
}
:param mains: aggregtae power consumption
:type mains: List of pd.DataFrame
:param sub_main: sub metered energy consumption
:type sub_main: List of pd.DataFrame
"""
# Check if the data is not already loaded
if self._data is None:
# Check if the specified model require a custom preprocessing function
preprocess_func = NILM_MODELS[self.hparams['model_name']]['extra_params']['pre-process'] if 'pre-process' in NILM_MODELS[self.hparams['model_name']]['extra_params'] else data_preprocessing
# Pre-processing
mains, multi_appliance_meters, single_appliance_meters = preprocess_func(mains,
sub_main,
self.hparams['feature_type'],
self.hparams['alpha'],
self.hparams['input_norm'],
self.hparams['main_mu'],
self.hparams['main_std'],
self.hparams['q_filter'])
# Data fromating according to the type of the model ['single-appliance', 'multi-appliance']
self._data = {
"features":mains,
"targets":multi_appliance_meters
} if self.hparams["multi_appliance"] else {
"features":mains,
"targets":single_appliance_meters
}
[docs] def partial_fit(self, mains, sub_main,do_preprocessing=True, **load_kwargs):
""" Trains the model for appliances according to the model name specified
in the experiment's definition. It starts with the data pre-processing and
formatting and then train the model based on the type of the model(single
or multi-task).
:param mains: Aggregate power measurements.
:type mains: Liste of pd.DataFrame
:param sub_main: Appliances power measurements.
:type sub_main: Liste of pd.DataFrame
:param do_preprocessing: Performs pre-processing or not. Defaults to True., defaults to True
:type do_preprocessing: bool, optional
"""
# Check if the fitting was not already done
if self._data is None:
# Data pre-processing
if do_preprocessing:
self._prepare_data(mains, sub_main)
# Preparing folders for saving results
logs = Path(self.hparams['logs_path']) # logs/ log files containing the details of the execution
results = Path(self.hparams['results_path']) # results/ csv and pickle files recording the testing results
figures = Path(self.hparams['figure_path']) # figures/ folder to save figures
logs.mkdir(parents=True, exist_ok=True)
logs.mkdir(parents=True, exist_ok=True)
results.mkdir(parents=True, exist_ok=True)
figures.mkdir(parents=True, exist_ok=True)
# Select a fitting strategy according to the model type
if not self.hparams['multi_appliance']:
self.single_appliance_fit()
else:
self.multi_appliance_fit()
[docs] def disaggregate_chunk(self,test_main_list,do_preprocessing=True):
"""
Uses trained models to disaggregate the test_main_list. It is compatible with both single and multi-appliance models.
:param test_main_list: Aggregate power measurements.
:type test_main_list: Liste of pd.DataFrame
:param do_preprocessing: Specify if pre-processing need to be done or not, defaults to True
:type do_preprocessing: bool, optional
:return: Appliances power measurements.
:rtype: list of pd.DataFrame
"""
if not self.hparams['multi_appliance']:
test_predictions = self.single_appliance_disaggregate(test_main_list, do_preprocessing = do_preprocessing)
return test_predictions
else:
test_predictions = self.multi_appliance_disaggregate(test_main_list, do_preprocessing = do_preprocessing)
return test_predictions
[docs] def single_appliance_disaggregate(self, test_main_list, model=None,do_preprocessing=True):
"""
Perfroms load disaggregtaion for single appliance models. If Optuna was used during the
training phase, it disaggregtaes the test_main_list using only the best trial.
If cross-validation is used during training, it returns the average of predictions
cross all folds for each applaince. In this later case, the predictions for each fold
are also logged in the results folder under the name
['model_name']_[appliance_name]_all_folds_predictions.p.
Alternatively, when both Optuna and cross-validation are used, it returns the average predictions
of all folds for only the best trial.
:param test_main_list: Aggregate power measurements
:type test_main_list: liste of pd.DataFrame
:param model: Pre-trained appliance's models. Defaults to None.
:type model: dict, optional
:param do_preprocessing: Specify if pre-processing need to be done or not, defaults to True
:type do_preprocessing: bool, optional
:return: estimated power consumption of the considered appliances.
:rtype: liste of dict
"""
if model is not None:
self.models = model
if do_preprocessing:
test_main_list = data_preprocessing(test_main_list, None,
self.hparams['feature_type'],
self.hparams['alpha'],
self.hparams['input_norm'],
self.hparams['main_mu'],
self.hparams['main_std'],
self.hparams['q_filter'])
test_predictions = []
test_results = []
for test_main in test_main_list:
test_main = test_main.values
disggregation_dict = {}
result_dict = {}
for appliance in self.models:
dataloader = self.data_loaders[appliance]
model = self.models[appliance]
data = dataloader(inputs=test_main,
targets=None,
params = self.hparams )
test_loader = torch.utils.data.DataLoader(data,
self.hparams['batch_size'],
collate_fn=
NILM_MODELS[self.hparams['model_name']]['extra_params']['collate_fns'](
self.hparams, sample=False) if 'collate_fns' in NILM_MODELS[self.hparams['model_name']]['extra_params'] else None,
shuffle=False,
num_workers=self.hparams['num_workers'])
exp_name = self.hparams['checkpoints_path']+f"{self.exp_name}_{appliance}"
if self.hparams['use_optuna']:
exp_name += f'/trial_{self.best_trials[appliance]}/'
if self.hparams['kfolds'] > 1:
# TODO: check if average cross all folds or only the best model
app_result_cross_fold =[]
dump_results ={}
for fold in model:
#load checkpoints
checkpoint_path = get_latest_checkpoint(exp_name+f'/{fold}')
chechpoint=torch.load(checkpoint_path)
model_fold = model[fold]
model_fold.load_state_dict(chechpoint['state_dict'])
model_fold.eval()
network = model_fold.model.eval()
if self.hparams['target_norm'] == 'z-norm' :
network.mean = self.appliance_params[appliance]['mean']
network.std = self.appliance_params[appliance]['std']
elif self.hparams['target_norm'] == 'min-max' :
network.min = self.appliance_params[appliance]['min']
network.max = self.appliance_params[appliance]['max']
results = network.predict(model_fold, test_loader)
df = results['pred'].cpu().numpy().flatten()
app_result_cross_fold.append(df)
dump_results [fold] = df
dump_results ['mean_preditions'] = pd.Series(np.mean(np.array(app_result_cross_fold), axis=0))
dump_results ['std_predictions'] = pd.Series(np.std(np.array(app_result_cross_fold), axis=0))
dump_results ['min_predictions'] = pd.Series(np.min(np.array(app_result_cross_fold), axis=0))
dump_results ['max_predictions'] = pd.Series(np.max(np.array(app_result_cross_fold), axis=0))
pickle.dump(dump_results, open(f"{self.hparams['results_path']}/{self.hparams['model_name']}_{appliance}_all_folds_predictions.p", "wb"))
df = pd.Series(np.mean(np.array(app_result_cross_fold), axis=0))
else:
#load checkpoints
checkpoint_path = get_latest_checkpoint(exp_name)
chechpoint=torch.load(checkpoint_path)
model.load_state_dict(chechpoint['state_dict'])
model.eval()
network = model.model.eval()
if self.hparams['target_norm'] == 'z-norm' :
network.mean = self.appliance_params[appliance]['mean']
network.std = self.appliance_params[appliance]['std']
elif self.hparams['target_norm'] == 'min-max' :
network.min = self.appliance_params[appliance]['min']
network.max = self.appliance_params[appliance]['max']
results = network.predict(model, test_loader)
df = pd.Series(results['pred'].cpu().numpy().flatten())
disggregation_dict[appliance] = df
result_dict[appliance]=results
# Tracking results of the current applaince
appliance_results = {}
for key in results:
appliance_results[key] = pd.Series(results[key].cpu().numpy().flatten())
# Saving files in the disk
appliance_results = pd.DataFrame(appliance_results)
# TODO:should be logged in a subdirectory
# Create couple of artifact files under the directory "data"
os.makedirs(self.hparams['results_path']+f'/{appliance}', exist_ok=True)
appliance_results.to_csv(self.hparams['results_path']+f'/{appliance}/{self.exp_name}.csv', index = False)
# Logging results relative the current appliance
mlflow.set_experiment(appliance)
if self.hparams['log_artificat']:
with mlflow.start_run(self.run_id[appliance]):
mlflow.log_artifacts(self.hparams['results_path']+f'/{appliance}', artifact_path="test_results")
results = pd.DataFrame(disggregation_dict, dtype='float32')
test_predictions.append(results)
test_results.append(result_dict)
np.save(self.hparams['results_path']+f"{self.exp_name}.npy", test_results)
return test_predictions
[docs] def objective(self, trial, train_loader=None, val_loader=None, fold_idx= None):
"""The objective function to be used with optuna. This function requires the model under study to
implement a static function called suggest_hparams() [see the model documentation for more informations]
:param trial: Optuna.trial
:param train_loader: training dataLoader for the current experiment. Defaults to None.
:type train_loader: DataLoader, optional
:param val_loader: validation dataLoader for the current experiment. Defaults to None.
:type val_loader: DataLoader, optional
:param fold_idx: Number of the fold of cross-validation is used. Defaults to None.
:type fold_idx: int, optional
:raises Exception: In case the model does not suggest any parameters.
:return: The best validation loss aschieved
:rtype: float
"""
#
# Initialize the best_val_loss value
best_val_loss = float('Inf')
mlflow.set_experiment(f'{self.optuna_params["appliance_name"]}')
# Start a new mlflow run
with mlflow.start_run():
# Get hyperparameter suggestions created by Optuna
# and log them as params using mlflow
suggested_params_func = NILM_MODELS[self.hparams['model_name']]['model'].suggest_hparams
if callable(suggested_params_func):
suggested_params = NILM_MODELS[self.hparams['model_name']]['model'].suggest_hparams(None,trial)
self.hparams.update(suggested_params)
mlflow.log_params(suggested_params)
else:
raise Exception('''
No params to optimise by optuna
A static function inside the NILM model should provide
a dictionnary of params suggested by optuna
see documentation for more details
''')
# Check if the appliance was already trained. If not then create a new model for it
print("First model training for", self.optuna_params["appliance_name"])
net, dataloader = self.get_net_and_loaders()
# To use only if the Cross validation is not used
if (train_loader is None) or (val_loader is None):
self.data_loaders[self.optuna_params["appliance_name"]]=dataloader
dataloader = self.data_loaders[self.optuna_params["appliance_name"]]
data = dataloader(inputs=self._data['features'], targets=self.optuna_params["power"])
train_data, val_data = torch.utils.data.random_split(data,
[int(data.len*(1-0.15)), data.len - int(data.len*(1-0.15))],
generator=torch.Generator().manual_seed(42))
train_loader = torch.utils.data.DataLoader(train_data, self.hparams['batch_size'], shuffle=True,
collate_fn=
NILM_MODELS[self.hparams['model_name']]['extra_params']['collate_fns'](
self.hparams, sample=True) if 'collate_fns' in NILM_MODELS[self.hparams['model_name']]['extra_params'] else None,
num_workers=self.hparams['num_workers'])
val_loader = torch.utils.data.DataLoader(val_data,
self.hparams['batch_size'],
shuffle=False,
collate_fn=
NILM_MODELS[self.hparams['model_name']]['extra_params']['collate_fns'](
self.hparams, sample=False) if 'collate_fns' in NILM_MODELS[self.hparams['model_name']]['extra_params'] else None,
num_workers=self.hparams['num_workers'])
# Auto log all MLflow from lightening
mlflow.pytorch.autolog()
# Model Training
if fold_idx is None:
self.models[self.optuna_params["appliance_name"]] = pilModel(net, self.hparams)
best_val_loss, path = self.train_model(
self.optuna_params["appliance_name"],
train_loader, val_loader,
self.optuna_params['exp_name'],
data.mean if self.hparams['target_norm'] == 'z-norm' else None,
data.std if self.hparams['target_norm'] == 'z-norm' else None,
trial_idx = trial.number)
else:
self.models[self.optuna_params["appliance_name"]][f'fold_{fold_idx}'] = pilModel(net, self.hparams)
best_val_loss, path = self.train_model(
self.optuna_params["appliance_name"],
train_loader, val_loader,
self.optuna_params['exp_name'],
data.mean if self.hparams['target_norm'] == 'z-norm' else None,
data.std if self.hparams['target_norm'] == 'z-norm' else None,
trial_idx = trial.number,
fold_idx= fold_idx,
model = self.models[self.optuna_params["appliance_name"]][f'fold_{fold_idx}'])
# saving the trained model
trial.set_user_attr(key='best_run_id', value = mlflow.active_run().info.run_id)
trial.set_user_attr(key="trial_ID", value= trial.number )
trial.set_user_attr(key="path", value= path )
return best_val_loss
[docs] def objective_cv(self, trial):
"""The objective function for Optuna when cross-validation is also used
:param trial: An optuna trial
:type trial: Optuna.Trial
:return: average of best loss validations for considered folds
:rtype: float
"""
fold = TimeSeriesSplit(n_splits=self.hparams['kfolds'], test_size=self.hparams['test_size'], gap = self.hparams['gap'])
scores = []
#select model and data loaders to use
_, dataloader = self.get_net_and_loaders()
self.models[self.optuna_params["appliance_name"]] = {}
self.data_loaders[self.optuna_params["appliance_name"]]=dataloader
dataloader = self.data_loaders[self.optuna_params["appliance_name"]]
dataset = dataloader(inputs=self._data['features'], targets=self.optuna_params["power"])
for fold_idx, (train_idx, valid_idx) in enumerate(fold.split(range(len(dataset)))):
print(f'started training for the fold {fold_idx}.')
train_data = torch.utils.data.Subset(dataset, train_idx)
val_data = torch.utils.data.Subset(dataset, valid_idx)
train_loader = torch.utils.data.DataLoader(train_data, self.hparams['batch_size'], shuffle=True,
collate_fn=
NILM_MODELS[self.hparams['model_name']]['extra_params']['collate_fns'](
self.hparams, sample=True) if 'collate_fns' in NILM_MODELS[self.hparams['model_name']]['extra_params'] else None,
num_workers=self.hparams['num_workers'])
val_loader = torch.utils.data.DataLoader(val_data,
self.hparams['batch_size'],
shuffle=False,
collate_fn=
NILM_MODELS[self.hparams['model_name']]['extra_params']['collate_fns'](
self.hparams, sample=False) if 'collate_fns' in NILM_MODELS[self.hparams['model_name']]['extra_params'] else None,
num_workers=self.hparams['num_workers'])
mae_loss = self.objective(trial, train_loader, val_loader, fold_idx)
scores.append(mae_loss)
return np.mean(scores)
[docs] def get_net_and_loaders(self):
"""Returns an instance of the specified model and the correspanding dataloader
:return: (model , dataloader)
:rtype: tuple(nn.Module, torch.utils.data.Dataset)
"""
# Get the class of the required model from the config file
net = NILM_MODELS[self.hparams['model_name']]['model'](self.hparams)
# Get the class of the related dataloader from the config file
data = partial(
NILM_MODELS[self.hparams['model_name']]['loader'],
params = self.hparams)
return net, data
[docs] def save_best_model(self, study, trial):
"""Keeps track of the trial giving best results
:param study: Optuna study
:param trial: Optuna trial
"""
if study.best_trial.number == trial.number:
study.set_user_attr(key="trial_ID", value=trial.number)
study.set_user_attr(key="best_run_id", value=trial.user_attrs["best_run_id"])
study.set_user_attr(key="path", value=trial.user_attrs["path"])
[docs] def single_appliance_fit(self):
"""
Train the specified models for each appliance separately taking into consideration
the use of cross-validation and hyper-parameters optimisation. The checkpoints for
each model are saved in the correspondng path.
"""
self.exp_name = f"{self.hparams['model_name']}_{self.hparams['data']}_single_appliance_{self.hparams['experiment_label']}"
original_checkpoint = self.hparams['checkpoints_path']
for appliance_name, power in self._data['targets']:
exp_name = f"{self.exp_name}_{appliance_name}"
checkpoints = Path(original_checkpoint +f"{exp_name}")
checkpoints.mkdir(parents=True, exist_ok=True)
#update checkpoint path
new_params = {"checkpoints_path": original_checkpoint +f"{exp_name}",
"appliances":[appliance_name]
}
self.hparams.update(new_params)
print(f"fit model for {exp_name}")
if self.hparams['use_optuna']:
# Use Optuna fot parameter optimisation of the model
study = optuna.create_study(study_name=exp_name, direction="minimize")
self.optuna_params ={
'power' :power,
'appliance_name':appliance_name,
'exp_name': exp_name
}
if self.hparams['kfolds'] <= 1:
study.optimize(self.objective, n_trials=self.hparams['n_trials'], callbacks=[self.save_best_model])
# Load weights of the model
app_model, _ = self.get_net_and_loaders()
#TODO: load checkpoints
chechpoint=torch.load(study.user_attrs["path"])
model = pilModel(app_model, self.hparams)
model.hparams['checkpoint_path'] = study.user_attrs["path"]
model.load_state_dict(chechpoint['state_dict'])
model.eval()
# Save best model for testing time
self.models[appliance_name] = model
else:
study.optimize(self.objective_cv, n_trials=self.hparams['n_trials'], callbacks=[self.save_best_model])
# Save figures
try:
fig1 = optuna.visualization.plot_param_importances(study)
fig2 = optuna.visualization.plot_parallel_coordinate(study)
fig2.write_image(self.hparams['checkpoints_path'] +'/_parallel_coordinate.pdf')
fig1.write_image(self.hparams['checkpoints_path'] +'/_param_importance.pdf')
except:
pass
results_df = study.trials_dataframe()
results_df.to_csv( f'{self.hparams["checkpoints_path"]}/Seq2Point_Study_{exp_name}_{appliance_name}.csv')
joblib.dump(study, f'{self.hparams["checkpoints_path"]}/Seq2Point_Study_{exp_name}_{appliance_name}.pkl')
# Restoring the best model and use it for the testing
self.best_trials[appliance_name] = study.best_trial.number
app_model, _ = self.get_net_and_loaders()
#TODO: load checkpoints
self.run_id[appliance_name] = study.user_attrs["best_run_id"]
else:
# Check if the appliance was already trained. If not then create a new model for it
if self.hparams['kfolds'] > 1:
self.models[appliance_name] ={}
# Getting the required data for the appliance
_, dataloader = self.get_net_and_loaders()
self.data_loaders[appliance_name]=dataloader
dataset = dataloader(inputs=self._data['features'], targets=power)
# Splitting the data into several folds
fold = TimeSeriesSplit(n_splits=self.hparams['kfolds'], test_size=self.hparams['test_size'], gap = self.hparams['gap'])
scores = []
# Using each fold as a validation data
for fold_idx, (train_idx, valid_idx) in enumerate(fold.split(range(len(dataset)))):
print(f'started training for the fold {fold_idx}.')
app_model, _ = self.get_net_and_loaders()
self.models[appliance_name][f'fold_{fold_idx}'] = pilModel(app_model, self.hparams)
train_data = torch.utils.data.Subset(dataset, train_idx)
val_data = torch.utils.data.Subset(dataset, valid_idx)
train_loader = torch.utils.data.DataLoader(train_data,
self.hparams['batch_size'],
shuffle=True,
collate_fn=
NILM_MODELS[self.hparams['model_name']]['extra_params']['collate_fns'](
self.hparams, sample=True) if 'collate_fns' in NILM_MODELS[self.hparams['model_name']]['extra_params'] else None,
num_workers=self.hparams['num_workers'])
val_loader = torch.utils.data.DataLoader(val_data,
self.hparams['batch_size'],
shuffle=False,
collate_fn=
NILM_MODELS[self.hparams['model_name']]['extra_params']['collate_fns'](
self.hparams, sample=False) if 'collate_fns' in NILM_MODELS[self.hparams['model_name']]['extra_params'] else None,
num_workers=self.hparams['num_workers'])
# select experiment if does not exist create it
# an experiment is created for each appliance
mlflow.set_experiment(f'{appliance_name}')
# Start a new for the current appliance
with mlflow.start_run(run_name=self.hparams['model_name']):
# Auto log all MLflow from lightening
mlflow.pytorch.autolog()
# Save the run ID to use in testing phase
self.run_id[appliance_name] = mlflow.active_run().info.run_id
# Log parameters of current run
mlflow.log_params(self.hparams)
# Model Training
mae_loss = self.train_model(appliance_name,
train_loader,
val_loader,
exp_name,
dataset.mean if self.hparams['target_norm'] == 'z-norm' else None,
dataset.std if self.hparams['target_norm'] == 'z-norm' else None,
fold_idx = fold_idx,
model=self.models[appliance_name][f'fold_{fold_idx}'])
scores.append(mae_loss)
else:
if appliance_name not in self.models:
print("First model training for", appliance_name)
#select model and data loaders to use
net, dataloader = self.get_net_and_loaders()
self.models[appliance_name] = pilModel(net, self.hparams)
self.data_loaders[appliance_name]=dataloader
# Retrain the particular appliance
else:
print("Started Retraining model for", appliance_name)
dataloader = self.data_loaders[appliance_name]
data = dataloader(inputs=self._data['features'], targets=power)
train_data, val_data=torch.utils.data.random_split(data,
[int(data.len*(1-0.15)), data.len - int(data.len*(1-0.15))],
generator=torch.Generator().manual_seed(42))
train_loader = torch.utils.data.DataLoader(train_data, self.hparams['batch_size'], shuffle=True,
collate_fn=
NILM_MODELS[self.hparams['model_name']]['extra_params']['collate_fns'](
self.hparams, sample=True) if 'collate_fns' in NILM_MODELS[self.hparams['model_name']]['extra_params'] else None,
num_workers=self.hparams['num_workers'])
val_loader = torch.utils.data.DataLoader(val_data,
self.hparams['batch_size'],
shuffle=False,
collate_fn=
NILM_MODELS[self.hparams['model_name']]['extra_params']['collate_fns'](
self.hparams, sample=False) if 'collate_fns' in NILM_MODELS[self.hparams['model_name']]['extra_params'] else None,
num_workers=self.hparams['num_workers'])
# select experiment if does not exist create it
# an experiment is created for each appliance
mlflow.set_experiment(f'{appliance_name}')
# Start a new for the current appliance
with mlflow.start_run():
# Auto log all MLflow from lightening
mlflow.pytorch.autolog()
# Save the run ID to use in testing phase
self.run_id[appliance_name] = mlflow.active_run().info.run_id
# Log parameters of current run
mlflow.log_params(self.hparams)
# Model Training
self.train_model(
appliance_name,
train_loader,
val_loader,
exp_name,
data.mean if self.hparams['target_norm'] == 'z-norm' else 0,
data.std if self.hparams['target_norm'] == 'z-norm' else 1)
new_params = {"checkpoints_path": original_checkpoint }
self.hparams.update(new_params)
[docs] def train_model(self,
appliance_name,
train_loader,
val_loader,
exp_name,
mean = None,
std = None,
trial_idx= None,
fold_idx= None,
model =None):
"""Trains a single PyTorch model.
:param appliance_name: Name of teh appliance to be modeled
:type appliance_name: str
:param train_loader: training dataLoader for the current appliance
:type train_loader: DataLoader
:param val_loader: validation dataLoader for the current appliance
:type val_loader: DataLoader
:param exp_name: the name of the experiment
:type exp_name: str
:param mean: mean value of the target appliance power. Defaults to None.
:type mean: float, optional
:param std: std value of the target applaince power. Defaults to None.
:type std: float, optional
:param trial_idx: ID of the current optuna trial if optuna is used. Defaults to None.
:type trial_idx: int, optional
:param fold_idx: the number of the fold if CV is used. Defaults to None.
:type fold_idx: int, optional
:param model: Lightning model of the current appliance. Defaults to None.
:return: in the case of using Optuna, it return the best validation loss and the path to the best checkpoint.
:rtype: tuple(int, str)
"""
chkpt_path = self.hparams['checkpoints_path']
version = ''
if trial_idx is not None:
chkpt_path += f"/trial_{trial_idx}"
version += f"/trial_{trial_idx}"
if fold_idx is not None:
chkpt_path += f"/fold_{fold_idx}"
version += f"/fold_{fold_idx}"
best_checkpoint=get_latest_checkpoint(chkpt_path)
model = model if model is not None else self.models[appliance_name]
if self.hparams['target_norm'] == 'z-norm':
self.appliance_params[appliance_name] = {
'mean': mean,
'std': std
}
checkpoint_callback = pl.callbacks.model_checkpoint.ModelCheckpoint(dirpath = chkpt_path,
monitor = 'val_mae',
mode="min",
save_top_k = 1)
early_stop_callback = pl.callbacks.EarlyStopping(monitor ='val_mae',
min_delta=1e-4,
patience = self.hparams['patience_check'],
mode="min")
logger = DictLogger(self.hparams['logs_path'],
name = exp_name,
version = "single_appliance_experiment" + version if version !='' else "single_appliance_experiment" )
trainer = pl.Trainer(logger = logger,
gradient_clip_val=self.hparams['clip_value'],
# checkpoint_callback=checkpoint_callback,
max_epochs = self.hparams['max_nb_epochs'],
callbacks=[early_stop_callback, checkpoint_callback],
gpus=-1 if torch.cuda.is_available() else None,
resume_from_checkpoint=best_checkpoint if not self.hparams['use_optuna'] else None)
if self.hparams['train']:
trainer.fit(model, train_loader, val_loader)
if len(logger.metrics) >= 2:
if self.hparams['use_optuna']:
# TODO: Get the minimal validation loss and not the last one
return logger.metrics[-2]["val_loss"], checkpoint_callback.best_model_path
[docs] def multi_appliance_disaggregate(self, test_main_list, model=None,do_preprocessing=True):
return None
[docs] def multi_appliance_fit(self):
"""
Train the specified models for each appliance separately taking into consideration
the use of cross-validation and hyper-parameters optimisation. The checkpoints for
each model are saved in the correspondng path.
"""
self.exp_name = f"{self.hparams['model_name']}_{self.hparams['data']}_single_appliance_{self.hparams['experiment_label']}"
original_checkpoint = self.hparams['checkpoints_path']
exp_name = f"{self.exp_name}_multi_app"
checkpoints = Path(original_checkpoint +f"{exp_name}")
checkpoints.mkdir(parents=True, exist_ok=True)
#update checkpoint path
new_params = {"checkpoints_path": original_checkpoint +f"{exp_name}"}
self.hparams.update(new_params)
print(f"fit model for {exp_name}")
if self.hparams['use_optuna']:
# Use Optuna fot parameter optimisation of the model
study = optuna.create_study(study_name=exp_name, direction="minimize")
self.optuna_params ={
'power' :power,
'appliance_name':appliance_name,
'exp_name': exp_name
}
if self.hparams['kfolds'] <= 1:
study.optimize(self.objective, n_trials=self.hparams['n_trials'], callbacks=[self.save_best_model])
# Load weights of the model
app_model, _ = self.get_net_and_loaders()
#TODO: load checkpoints
chechpoint=torch.load(study.user_attrs["path"])
model = pilModel(app_model, self.hparams)
model.hparams['checkpoint_path'] = study.user_attrs["path"]
model.load_state_dict(chechpoint['state_dict'])
model.eval()
# Save best model for testing time
self.models[appliance_name] = model
else:
study.optimize(self.objective_cv, n_trials=self.hparams['n_trials'], callbacks=[self.save_best_model])
# Save figures
try:
fig1 = optuna.visualization.plot_param_importances(study)
fig2 = optuna.visualization.plot_parallel_coordinate(study)
fig2.write_image(self.hparams['checkpoints_path'] +'/_parallel_coordinate.pdf')
fig1.write_image(self.hparams['checkpoints_path'] +'/_param_importance.pdf')
except:
pass
results_df = study.trials_dataframe()
results_df.to_csv( f'{self.hparams["checkpoints_path"]}/Seq2Point_Study_{exp_name}_{appliance_name}.csv')
joblib.dump(study, f'{self.hparams["checkpoints_path"]}/Seq2Point_Study_{exp_name}_{appliance_name}.pkl')
# Restoring the best model and use it for the testing
self.best_trials[appliance_name] = study.best_trial.number
app_model, _ = self.get_net_and_loaders()
#TODO: load checkpoints
self.run_id[appliance_name] = study.user_attrs["best_run_id"]
else:
# Check if the appliance was already trained. If not then create a new model for it
if self.hparams['kfolds'] > 1:
self.models[appliance_name] ={}
# Getting the required data for the appliance
_, dataloader = self.get_net_and_loaders()
self.data_loaders[appliance_name]=dataloader
dataset = dataloader(inputs=self._data['features'], targets=power)
# Splitting the data into several folds
fold = TimeSeriesSplit(n_splits=self.hparams['kfolds'], test_size=self.hparams['test_size'], gap = self.hparams['gap'])
scores = []
# Using each fold as a validation data
for fold_idx, (train_idx, valid_idx) in enumerate(fold.split(range(len(dataset)))):
print(f'started training for the fold {fold_idx}.')
app_model, _ = self.get_net_and_loaders()
self.models[appliance_name][f'fold_{fold_idx}'] = pilModel(app_model, self.hparams)
train_data = torch.utils.data.Subset(dataset, train_idx)
val_data = torch.utils.data.Subset(dataset, valid_idx)
train_loader = torch.utils.data.DataLoader(train_data,
self.hparams['batch_size'],
shuffle=True,
collate_fn=
NILM_MODELS[self.hparams['model_name']]['extra_params']['collate_fns'](
self.hparams, sample=True) if 'collate_fns' in NILM_MODELS[self.hparams['model_name']]['extra_params'] else None,
num_workers=self.hparams['num_workers'])
val_loader = torch.utils.data.DataLoader(val_data,
self.hparams['batch_size'],
shuffle=False,
collate_fn=
NILM_MODELS[self.hparams['model_name']]['extra_params']['collate_fns'](
self.hparams, sample=False) if 'collate_fns' in NILM_MODELS[self.hparams['model_name']]['extra_params'] else None,
num_workers=self.hparams['num_workers'])
# select experiment if does not exist create it
# an experiment is created for each appliance
mlflow.set_experiment(f'{appliance_name}')
# Start a new for the current appliance
with mlflow.start_run(run_name=self.hparams['model_name']):
# Auto log all MLflow from lightening
mlflow.pytorch.autolog()
# Save the run ID to use in testing phase
self.run_id[appliance_name] = mlflow.active_run().info.run_id
# Log parameters of current run
mlflow.log_params(self.hparams)
# Model Training
mae_loss = self.train_model(appliance_name,
train_loader,
val_loader,
exp_name,
dataset.mean if self.hparams['target_norm'] == 'z-norm' else None,
dataset.std if self.hparams['target_norm'] == 'z-norm' else None,
fold_idx = fold_idx,
model=self.models[appliance_name][f'fold_{fold_idx}'])
scores.append(mae_loss)
else:
if 'Multi-appliance' not in self.models:
print("First model training for Multi-appliance model")
#select model and data loaders to use
net, dataloader = self.get_net_and_loaders()
self.models['Multi-appliance'] = pilModel(net, self.hparams)
self.data_loaders['Multi-appliance'] = dataloader
# Retrain the particular appliance
else:
print("Started Retraining Muti-appliance model")
dataloader = self.data_loaders['Multi-appliance']
data = dataloader(inputs=self._data['features'], targets=self._data['targets'])
train_data, val_data=torch.utils.data.random_split(data,
[int(data.len*(1-0.15)), data.len - int(data.len*(1-0.15))],
generator=torch.Generator().manual_seed(42))
train_loader = torch.utils.data.DataLoader(train_data, self.hparams['batch_size'], shuffle=True,
collate_fn=
NILM_MODELS[self.hparams['model_name']]['extra_params']['collate_fns'](
self.hparams, sample=True) if 'collate_fns' in NILM_MODELS[self.hparams['model_name']]['extra_params'] else None,
num_workers=self.hparams['num_workers'])
val_loader = torch.utils.data.DataLoader(val_data,
self.hparams['batch_size'],
shuffle=False,
collate_fn=
NILM_MODELS[self.hparams['model_name']]['extra_params']['collate_fns'](
self.hparams, sample=False) if 'collate_fns' in NILM_MODELS[self.hparams['model_name']]['extra_params'] else None,
num_workers=self.hparams['num_workers'])
# select experiment if does not exist create it
# an experiment is created for each appliance
mlflow.set_experiment(f'Multi-appliance')
# Start a new for the current appliance
with mlflow.start_run():
# Auto log all MLflow from lightening
mlflow.pytorch.autolog()
# Save the run ID to use in testing phase
self.run_id['Multi-appliance'] = mlflow.active_run().info.run_id
# Log parameters of current run
mlflow.log_params(self.hparams)
# Model Training
self.train_model(
'Multi-appliance',
train_loader,
val_loader,
exp_name,
data.mean if self.hparams['target_norm'] == 'z-norm' else 0,
data.std if self.hparams['target_norm'] == 'z-norm' else 1)
new_params = {"checkpoints_path": original_checkpoint }
self.hparams.update(new_params)