From 4c6cb58bd5501433e7fa0ba5bca954f25eacf762 Mon Sep 17 00:00:00 2001 From: fvazquez Date: Fri, 23 May 2025 16:43:28 +0200 Subject: [PATCH 1/4] Added gradient clipping and support for more layers --- dislib/pytorch/pytorch_distributed.py | 64 ++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 5 deletions(-) diff --git a/dislib/pytorch/pytorch_distributed.py b/dislib/pytorch/pytorch_distributed.py index 9e5cb0d8..f7846e08 100644 --- a/dislib/pytorch/pytorch_distributed.py +++ b/dislib/pytorch/pytorch_distributed.py @@ -75,11 +75,30 @@ def train_cnn_batch_GPU(self, model_parameters, x_train, {'processorType': 'GPU', 'computingUnits': '${ComputingUnitsGPUs}'}]) @task() def _train_cnn_batch_GPU(self, model_parameters, x_train, - y_train, num_batches, shuffle_block_data): + y_train, num_batches, shuffle_block_data, + transformations=None, + gradient_clipping=None): if shuffle_block_data: idx = torch.randperm(x_train.shape[0]) - x_train = x_train[idx].view(x_train.size()) - y_train = y_train[idx].view(y_train.size()) + if not isinstance(x_train.size, int): + x_train = x_train[idx].view(x_train.size()) + else: + if not torch.is_tensor(x_train): + x_train = x_train[idx] + else: + x_train = torch.from_numpy(x_train) + x_train = x_train[idx].view(x_train.size()) + if not isinstance(y_train.size, int): + if len(y_train.size()) > 1: + y_train = y_train[idx].view(y_train.size()) + else: + y_train = y_train[idx] + else: + if not torch.is_tensor(y_train): + y_train = y_train[idx] + else: + y_train = torch.from_numpy(y_train) + y_train = y_train[idx].view(y_train.size()) if hasattr(self.model, 'neural_network_layers'): len_nn = len(self.model.neural_network_layers) for i in range(len_nn): @@ -89,10 +108,17 @@ def _train_cnn_batch_GPU(self, model_parameters, x_train, nn.Parameter( model_parameters.neural_network_layers[i]. weight.float()) + if hasattr(model_parameters.neural_network_layers[i], + 'bias'): self.model.neural_network_layers[i].bias = \ nn.Parameter( model_parameters.neural_network_layers[i].bias. float()) + if hasattr(self.model.neural_network_layers[i], + 'alpha'): + self.model.neural_network_layers[i].alpha = \ + nn.Parameter(model_parameters. + neural_network_layers[i].alpha) if hasattr(self.model, 'dense_neural_network_layers'): len_nn = len(model_parameters.dense_neural_network_layers) for i in range(len_nn): @@ -103,23 +129,37 @@ def _train_cnn_batch_GPU(self, model_parameters, x_train, nn.Parameter( model_parameters.dense_neural_network_layers[i]. weight.float()) + if hasattr(model_parameters.dense_neural_network_layers[i], + 'bias'): self.model.dense_neural_network_layers[i].bias = \ nn.Parameter( model_parameters.dense_neural_network_layers[i]. bias.float()) + if hasattr(self.model.neural_network_layers[i], + 'alpha'): + self.model.neural_network_layers[i].alpha = \ + nn.Parameter(model_parameters. + neural_network_layers[i].alpha) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") self.model = self.model.to(device) optimizer = self.optimizer(self.model.parameters(), **self.optimizer_parameters) x_train = x_train.float().to(device) - true_labels = y_train.to(device) indexes = int(x_train.shape[0] / num_batches) + if isinstance(self.loss, nn.CrossEntropyLoss): + y_train = y_train.long() + true_labels = y_train.to(device) for idx in range(num_batches): optimizer.zero_grad() - outputs = self.model(x_train[idx*indexes:(idx+1)*indexes]) + inputs = x_train[idx*indexes:(idx + 1)*indexes] + if transformations is not None: + inputs = transformations(inputs) + outputs = self.model(inputs) loss = self.loss(outputs, true_labels[idx*indexes:(idx+1)*indexes]) loss.backward() + if gradient_clipping is not None: + gradient_clipping(self.model) optimizer.step() self.model = self.model.to("cpu") return self.model @@ -181,9 +221,16 @@ def _aggregate_parameters_async(self, model_params, model_params.neural_network_layers[i].weight = \ nn.Parameter(final_added_parameters[j].float()) j += 1 + if hasattr(model_params.neural_network_layers[i], + 'bias'): model_params.neural_network_layers[i].bias = \ nn.Parameter(final_added_parameters[j].float()) j += 1 + if hasattr(self.model.neural_network_layers[i], + 'alpha'): + self.model.neural_network_layers[i].alpha = \ + nn.Parameter(model_params. + neural_network_layers[i].alpha) if hasattr(model_params, 'dense_neural_network_layers'): len_nn = len(model_params.dense_neural_network_layers) aux_j = 0 @@ -193,7 +240,14 @@ def _aggregate_parameters_async(self, model_params, model_params.dense_neural_network_layers[i].weight = \ nn.Parameter(final_added_parameters[aux_j + j].float()) aux_j += 1 + if hasattr(model_params.dense_neural_network_layers[i], + 'bias'): model_params.dense_neural_network_layers[i].bias = \ nn.Parameter(final_added_parameters[aux_j + j].float()) aux_j += 1 + if hasattr(self.model.dense_neural_network_layers[i], + 'alpha'): + self.model.dense_neural_network_layers[i].alpha = \ + nn.Parameter(model_params. + neural_network_layers[i].alpha) return model_params From e83baf17887e6bc239dabb75e1bb8c0bbb070b81 Mon Sep 17 00:00:00 2001 From: fvazquez Date: Fri, 23 May 2025 16:43:48 +0200 Subject: [PATCH 2/4] Add scheduler --- .../encapsulated_functions_distributed.py | 87 ++++++++++++++++++- 1 file changed, 86 insertions(+), 1 deletion(-) diff --git a/dislib/pytorch/encapsulated_functions_distributed.py b/dislib/pytorch/encapsulated_functions_distributed.py index 44e6a7a1..acaf5fcf 100644 --- a/dislib/pytorch/encapsulated_functions_distributed.py +++ b/dislib/pytorch/encapsulated_functions_distributed.py @@ -84,7 +84,7 @@ def __init__(self, num_workers=10): self.num_workers = num_workers def build(self, net, optimizer, loss, optimizer_parameters, - num_gpu=0, num_nodes=0): + scheduler=None, T_max=1, eta_min=0.0, num_gpu=0, num_nodes=0): """ Builds the model to obtain the initial parameters of the training and it also builds the model in each worker in order to be ready @@ -114,9 +114,17 @@ def build(self, net, optimizer, loss, optimizer_parameters, copy.deepcopy(optimizer), optimizer_parameters) + self.optimizer_parameters = optimizer_parameters self.num_gpu = num_gpu self.num_gpus_per_worker = int(num_nodes*num_gpu/self.num_workers) self.model_parameters = net + self.optimizer = optimizer(self.model_parameters.parameters(), + **optimizer_parameters) + if scheduler is not None: + self.scheduler = scheduler(self.optimizer, + T_max=T_max, eta_min=eta_min) + else: + self.scheduler = None def get_parameters(self): """ @@ -181,6 +189,17 @@ def fit_synchronous_shuffle_every_n_epochs_with_GPU(self, x_train, parameters_for_workers = [copy.deepcopy(self.model_parameters) for _ in range(len(parameters_for_workers))] + if self.scheduler is not None: + self.scheduler.step() + self.optimizer_parameters = {} + self.optimizer_parameters["lr"] = \ + self.optimizer.param_groups[0]["lr"] + if "momentum" in self.optimizer.param_groups[0]: + self.optimizer_parameters["momentum"] = \ + self.optimizer.param_groups[0]["momentum"] + if "weight_decay" in self.optimizer.param_groups[0]: + self.optimizer_parameters["weight_decay"] = \ + self.optimizer.param_groups[0]["weight_decay"] parameters_for_workers = compss_wait_on(parameters_for_workers) self.model_parameters = pt_aggregateParameters( parameters_for_workers) @@ -249,6 +268,17 @@ def fit_synchronous_every_n_epochs_with_GPU(self, x_train, y_train, parameters_for_workers = [ copy.deepcopy(self.model_parameters) for _ in range(len(parameters_for_workers))] + if self.scheduler is not None: + self.scheduler.step() + self.optimizer_parameters = {} + self.optimizer_parameters["lr"] = \ + self.optimizer.param_groups[0]["lr"] + if "momentum" in self.optimizer.param_groups[0]: + self.optimizer_parameters["momentum"] = \ + self.optimizer.param_groups[0]["momentum"] + if "weight_decay" in self.optimizer.param_groups[0]: + self.optimizer_parameters["weight_decay"] = \ + self.optimizer.param_groups[0]["weight_decay"] parameters_for_workers = compss_wait_on(parameters_for_workers) self.model_parameters = pt_aggregateParameters( parameters_for_workers) @@ -305,6 +335,17 @@ def fit_synchronous_with_GPU(self, x_train, y_train, j = j + 1 if j == self.num_workers: j = 0 + if self.scheduler is not None: + self.scheduler.step() + self.optimizer_parameters = {} + self.optimizer_parameters["lr"] = \ + self.optimizer.param_groups[0]["lr"] + if "momentum" in self.optimizer.param_groups[0]: + self.optimizer_parameters["momentum"] = \ + self.optimizer.param_groups[0]["momentum"] + if "weight_decay" in self.optimizer.param_groups[0]: + self.optimizer_parameters["weight_decay"] = \ + self.optimizer.param_groups[0]["weight_decay"] parameters_for_workers = compss_wait_on(parameters_for_workers) self.model_parameters = pt_aggregateParameters( parameters_for_workers) @@ -356,6 +397,17 @@ def fit_synchronous_shuffle_with_GPU(self, x_train, y_train, j = j + 1 if j == self.num_workers: j = 0 + if self.scheduler is not None: + self.scheduler.step() + self.optimizer_parameters = {} + self.optimizer_parameters["lr"] = \ + self.optimizer.param_groups[0]["lr"] + if "momentum" in self.optimizer.param_groups[0]: + self.optimizer_parameters["momentum"] = \ + self.optimizer.param_groups[0]["momentum"] + if "weight_decay" in self.optimizer.param_groups[0]: + self.optimizer_parameters["weight_decay"] = \ + self.optimizer.param_groups[0]["weight_decay"] parameters_for_workers = compss_wait_on(parameters_for_workers) self.model_parameters = \ pt_aggregateParameters(parameters_for_workers) @@ -417,6 +469,17 @@ def fit_asynchronous_with_GPU(self, x_train, y_train, j = j + 1 if j == self.num_workers: j = 0 + if self.scheduler is not None: + self.scheduler.step() + self.optimizer_parameters = {} + self.optimizer_parameters["lr"] = \ + self.optimizer.param_groups[0]["lr"] + if "momentum" in self.optimizer.param_groups[0]: + self.optimizer_parameters["momentum"] = \ + self.optimizer.param_groups[0]["momentum"] + if "weight_decay" in self.optimizer.param_groups[0]: + self.optimizer_parameters["weight_decay"] = \ + self.optimizer.param_groups[0]["weight_decay"] for j in range(self.num_workers): parameters_for_workers[j] = \ self.compss_object[j].aggregate_parameters_async( @@ -468,6 +531,17 @@ def fit_asynchronous_shuffle_with_GPU(self, x_train, y_train, j = j + 1 if j == self.num_workers: j = 0 + if self.scheduler is not None: + self.scheduler.step() + self.optimizer_parameters = {} + self.optimizer_parameters["lr"] = \ + self.optimizer.param_groups[0]["lr"] + if "momentum" in self.optimizer.param_groups[0]: + self.optimizer_parameters["momentum"] = \ + self.optimizer.param_groups[0]["momentum"] + if "weight_decay" in self.optimizer.param_groups[0]: + self.optimizer_parameters["weight_decay"] = \ + self.optimizer.param_groups[0]["weight_decay"] for j in range(self.num_workers): parameters_for_workers[j] = \ self.compss_object[j].aggregate_parameters_async( @@ -532,6 +606,17 @@ def fit_asynchronous_n_epochs_with_GPU(self, x_train, y_train, j = j + 1 if j == self.num_workers: j = 0 + if self.scheduler is not None: + self.scheduler.step() + self.optimizer_parameters = {} + self.optimizer_parameters["lr"] = \ + self.optimizer.param_groups[0]["lr"] + if "momentum" in self.optimizer.param_groups[0]: + self.optimizer_parameters["momentum"] = \ + self.optimizer.param_groups[0]["momentum"] + if "weight_decay" in self.optimizer.param_groups[0]: + self.optimizer_parameters["weight_decay"] = \ + self.optimizer.param_groups[0]["weight_decay"] if (i + 1) % n_epocs_sync == 0: for j in range(self.num_workers): parameters_for_workers[j] = \ From ecf0c248c747b80aae4bda5be7cae0ae27df8cf8 Mon Sep 17 00:00:00 2001 From: fvazquez Date: Thu, 29 May 2025 09:54:03 +0200 Subject: [PATCH 3/4] Specified CMake version, before it was incompatible with pyeddl --- Dockerfile | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 3bf546c5..d5465aca 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM compss/compss-tutorial:latest +FROM compss/compss-tutorial:3.3.3 MAINTAINER COMPSs Support COPY . dislib/ @@ -14,7 +14,8 @@ RUN apt-get -o Acquire::Check-Valid-Until=false -o Acquire::Check-Date=false upd git clone https://github.com/Blosc/python-blosc2/ /python-blosc2 && cd /python-blosc2 && git checkout v2.5.1 && \ python3 -m pip install --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade -r /python-blosc2/requirements-build.txt && \ python3 -m pip install --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade -r /python-blosc2/requirements-runtime.txt && \ - cd /python-blosc2 && git submodule update --init --recursive && python3 setup.py build_ext --inplace -- -DDEACTIVATE_AVX2:STRING=ON && \ + python3 -m pip install "cmake==3.31.2" && \ + cd /python-blosc2 && git submodule update --init --recursive && export CXXFLAGS="-Wno-error=maybe-uninitialized" && python3 setup.py build_ext --inplace -- -DDEACTIVATE_AVX2:STRING=ON && \ git clone --recurse-submodules https://github.com/deephealthproject/pyeddl.git /pyeddl && cd /pyeddl && \ cd third_party/eddl && mkdir build && cd build && cmake .. -D CMAKE_INSTALL_PREFIX=/pyeddl/third_party/eddl -D BUILD_TARGET=CPU -D BUILD_SHARED_LIB=ON -D BUILD_SUPERBUILD=ON -D BUILD_PROTOBUF=ON -D BUILD_TESTS=OFF && \ make && make install && cd ../.. && \ From 2f2f303cc96702fd535b6417544684f3f1007434 Mon Sep 17 00:00:00 2001 From: fvazquez Date: Thu, 6 Nov 2025 17:18:11 +0100 Subject: [PATCH 4/4] Updated, corrected functions and added functions to obtain training curves --- dislib/data/tensor.py | 75 ++-- dislib/pytorch/auxiliar_functions.py | 267 ++++++++++++++ .../encapsulated_functions_distributed.py | 340 ++++++++++++++---- dislib/pytorch/pytorch_distributed.py | 199 +++++++--- tests/test_distributed_pytorch.py | 144 ++++++++ tests/test_tensor.py | 17 + 6 files changed, 914 insertions(+), 128 deletions(-) create mode 100644 dislib/pytorch/auxiliar_functions.py diff --git a/dislib/data/tensor.py b/dislib/data/tensor.py index f28dc5ca..85623078 100644 --- a/dislib/data/tensor.py +++ b/dislib/data/tensor.py @@ -72,6 +72,16 @@ def __str__(self): self.number_samples, self.shape) + def __repr__(self): + return "ds-tensor(tensors=(...), " \ + "tensors_shape=%r, " \ + "n_tensors=%r, " \ + "num_samples=%r, " \ + "shape=%r)" % (self.tensor_shape, + self.n_tensors, + self.number_samples, + self.shape) + def __del__(self): if self._delete: [compss_delete_object(tensor) for tensors in self.tensors for @@ -611,10 +621,10 @@ def from_pt_tensor(tensor, shape=None): dtype=tensor.dtype) -def from_ds_array(ds_array, shape=None): +def from_ds_array(ds_array, shape=None, one_column=False): """ Creates the Tensor object from a ds_array. - This method can't generated Tensors that have more than two dimensions, + This method can't generate Tensors that have more than two dimensions, thus the output of this method can't be used in a Convolutional Neural Network neither any Neural Network that requires data input with more than two dimensions. @@ -623,11 +633,15 @@ def from_ds_array(ds_array, shape=None): ---------- ds_array : ds-array The ds-array to transform into ds-tensor. - shape : tuple of two ints. + shape : tuple of two ints. Optional, if used with + one_column=True this will be ignored The organization of the number of tensors, how many will be on axis=0 and how many will be on axis=1. The total number of tensors should be the same as blocks are in the input ds-array. + one_column : boolean + If the ds-tensor will have one unique column of tensors or + as many as column blocks are in the ds-array Returns ------- @@ -645,24 +659,36 @@ def from_ds_array(ds_array, shape=None): raise ValueError("The number of tensors specified in the " "shape should be equal to the number of " "blocks in the input ds-array") - new_tensor = Tensor._get_out_tensors([ds_array._n_blocks[0], - ds_array._n_blocks[1]]) - for block_i, idx_i in zip(ds_array._blocks, range(len(new_tensor))): - for block_j, idx_j in zip(block_i, range(len(new_tensor[idx_i]))): - new_tensor[idx_i][idx_j] = _assign_blocks_to_tensors(block_j) - if shape is not None and (isinstance(shape, tuple) or - isinstance(shape, list)): - return change_shape(Tensor(tensors=new_tensor, - tensor_shape=(ds_array._reg_shape[0], - ds_array.shape[1]), - number_samples=ds_array.shape[0], - dtype=np.float64, delete=ds_array._delete), shape) + if not one_column: + new_tensor = Tensor._get_out_tensors([ds_array._n_blocks[0], + ds_array._n_blocks[1]]) + for block_i, idx_i in zip(ds_array._blocks, range(len(new_tensor))): + for block_j, idx_j in zip(block_i, range(len(new_tensor[idx_i]))): + new_tensor[idx_i][idx_j] = _assign_blocks_to_tensors(block_j) + if shape is not None and (isinstance(shape, tuple) or + isinstance(shape, list)): + return change_shape(Tensor(tensors=new_tensor, + tensor_shape=(ds_array._reg_shape[0], + ds_array._reg_shape[1]), + number_samples=ds_array.shape[0], + dtype=np.float64, + delete=ds_array._delete), shape) + else: + return Tensor(tensors=new_tensor, + tensor_shape=(ds_array._reg_shape[0], + ds_array._reg_shape[1]), + number_samples=ds_array.shape[0], + dtype=np.float64, delete=ds_array._delete) else: - return Tensor(tensors=new_tensor, - tensor_shape=(ds_array._reg_shape[0], - ds_array.shape[1]), - number_samples=ds_array.shape[0], - dtype=np.float64, delete=ds_array._delete) + new_tensor = Tensor._get_out_tensors( + [ds_array._n_blocks[0], 1]) + for block_i, tensor_i in zip(ds_array._blocks, new_tensor): + _assign_various_blocks_to_tensor(block_i, tensor_i) + return Tensor(tensors=new_tensor, tensor_shape=( + ds_array._reg_shape[0], ds_array.shape[1]), + dtype=np.float64, + number_samples=ds_array.shape[0], + delete=ds_array._delete) def cat(tensors, dimension): @@ -1292,6 +1318,15 @@ def _shuffle_last_tensor_xy(tensor, tensor_y, random_state=None): return tensor, tensor_y +@constraint(computing_units="${ComputingUnits}") +@task(blocks={Type: COLLECTION_IN, Depth: 1}, + out_tensors={Type: COLLECTION_OUT, Depth: 1}) +def _assign_various_blocks_to_tensor(blocks, out_tensors): + block = np.block(blocks) + out_tensor = torch.from_numpy(block) + out_tensors[0] = out_tensor + + @constraint(computing_units="${ComputingUnits}") @task(returns=1) def _assign_blocks_to_tensors(block): diff --git a/dislib/pytorch/auxiliar_functions.py b/dislib/pytorch/auxiliar_functions.py new file mode 100644 index 00000000..32af099c --- /dev/null +++ b/dislib/pytorch/auxiliar_functions.py @@ -0,0 +1,267 @@ +from pycompss.api.parameter import IN, COLLECTION_IN +from pycompss.api.constraint import constraint +from pycompss.api.task import task +import math +from sklearn.metrics import accuracy_score +import torch.nn as nn +import torch + + +@constraint(processors=[ + {'processorType': 'CPU', 'computingUnits': '1'}, + {'processorType': 'GPU', 'computingUnits': '1'}]) +@task(x_test=COLLECTION_IN, model=IN, + parameters_for_model=COLLECTION_IN, returns=2) +def evaluate_model_parameters(x_val, y_val, model, + parameters_for_model, + loss_function, + score_function=accuracy_score, + y_val_loss_function=None): + model = assign_parameters(model, parameters_for_model) + validation_loss = None + validation_acc = None + if x_val is not None: + torch_model = model.eval().to("cuda:0") + indexes = 128 + num_batches = math.ceil(x_val[0][0].shape[0]/indexes) + outputs = [] + for x_out_tens in x_val: + for x_in_tens in x_out_tens: + x_in_tens = x_in_tens.to("cuda:0") + for idx in range(num_batches): + with torch.no_grad(): + output = torch_model(x_in_tens + [idx * indexes: + (idx + 1) * indexes]. + float()) + output_cpu = output.cpu() + outputs.append(output_cpu) + del output + torch.cuda.empty_cache() + x_in_tens = x_in_tens.to("cpu") + del x_in_tens + torch.cuda.empty_cache() + outputs = torch.cat(outputs) + loss = loss_function() + if y_val_loss_function is not None: + validation_loss = loss(outputs, y_val_loss_function).item() + else: + validation_loss = loss(outputs, y_val).item() + outputs = process_outputs(outputs) + outputs = outputs.detach().cpu().numpy() + validation_acc = score_function(y_val, outputs) + del torch_model + return validation_loss, validation_acc + + +def pt_aggregate_parameters(workers_parameters): + NUM_WORKERS = len(workers_parameters) + + final_weights = [] + for i in range(NUM_WORKERS): + workers_weights = [] + for param in workers_parameters[i].parameters(): + workers_weights.append(param) + final_weights.append(workers_weights) + final_added_parameters = final_weights[0] + for i in range(len(final_weights[0])): + for j in range(1, len(final_weights)): + final_added_parameters[i] = final_added_parameters[i] + \ + final_weights[j][i] + + for i in range(len(final_weights[0])): + final_added_parameters[i] = final_added_parameters[i]/NUM_WORKERS + j = 0 + if hasattr(workers_parameters[0], 'neural_network_layers'): + len_nn = len(workers_parameters[0].neural_network_layers) + for i in range(len_nn): + if hasattr(workers_parameters[0].neural_network_layers[i], + 'weight'): + workers_parameters[0].neural_network_layers[i].weight = \ + nn.Parameter(final_added_parameters[j]) + j += 1 + workers_parameters[0].neural_network_layers[i].bias = \ + nn.Parameter(final_added_parameters[j]) + j += 1 + if hasattr(workers_parameters[0].neural_network_layers[i], + 'shortcut'): + len_shortcut = len(workers_parameters[0]. + neural_network_layers[i].shortcut) + for k in range(len_shortcut): + if hasattr(workers_parameters[0]. + neural_network_layers[i].shortcut[k], + 'weight'): + workers_parameters[0].\ + neural_network_layers[i].\ + shortcut[k].weight = \ + nn.Parameter(final_added_parameters[j]) + j += 1 + workers_parameters[0].\ + neural_network_layers[i].\ + shortcut[k].bias = \ + nn.Parameter(final_added_parameters[j]) + j += 1 + if hasattr(workers_parameters[0]. + neural_network_layers[i].shortcut[k], + 'alpha'): + workers_parameters[0].\ + neural_network_layers[i].\ + shortcut[k].alpha = \ + nn.Parameter(final_added_parameters[j]) + j += 1 + if hasattr(workers_parameters[0]. + neural_network_layers[i], + 'layers'): + len_layers = len(workers_parameters[0]. + neural_network_layers[i].layers) + for k in range(len_layers): + if hasattr(workers_parameters[0]. + neural_network_layers[i].layers[k], + 'weight'): + workers_parameters[0].\ + neural_network_layers[i].layers[k].weight = \ + nn.Parameter(final_added_parameters[j]) + j += 1 + workers_parameters[0].\ + neural_network_layers[i].layers[k].bias = \ + nn.Parameter(final_added_parameters[j]) + j += 1 + if hasattr(workers_parameters[0]. + neural_network_layers[i].layers[k], + 'alpha'): + workers_parameters[0].\ + neural_network_layers[i].layers[k].\ + alpha = \ + nn.Parameter(final_added_parameters[j]) + j += 1 + if hasattr(workers_parameters[0], 'dense_neural_network_layers'): + len_nn = len(workers_parameters[0].dense_neural_network_layers) + aux_j = 0 + for i in range(len_nn): + if hasattr(workers_parameters[0].dense_neural_network_layers[i], + 'weight'): + workers_parameters[0].dense_neural_network_layers[i].weight = \ + nn.Parameter(final_added_parameters[aux_j + j]) + aux_j += 1 + workers_parameters[0].dense_neural_network_layers[i].bias = \ + nn.Parameter(final_added_parameters[aux_j + j]) + aux_j += 1 + return workers_parameters[0] + + +def assign_parameters(model, trained_weights): + j = 0 + if hasattr(model, 'neural_network_layers'): + len_nn = len(model.neural_network_layers) + for i in range(len_nn): + if hasattr(model.neural_network_layers[i], 'weight'): + model.neural_network_layers[i].weight = \ + nn.Parameter(trained_weights. + neural_network_layers[i].weight) + j += 1 + model.neural_network_layers[i].bias = \ + nn.Parameter(trained_weights. + neural_network_layers[i].bias) + j += 1 + if hasattr(model.neural_network_layers[i], 'shortcut'): + len_shortcut = len(model.neural_network_layers[i].shortcut) + for k in range(len_shortcut): + if hasattr(model.neural_network_layers[i].shortcut[k], + 'weight'): + model.neural_network_layers[i].shortcut[k].weight = \ + nn.Parameter(trained_weights. + neural_network_layers[i]. + shortcut[k].weight) + j += 1 + model.neural_network_layers[i].shortcut[k].bias = \ + nn.Parameter(trained_weights. + neural_network_layers[i]. + shortcut[k].bias) + j += 1 + if hasattr(model.neural_network_layers[i].shortcut[k], + 'alpha'): + model.neural_network_layers[i].shortcut[k].alpha = \ + nn.Parameter(trained_weights. + neural_network_layers[i]. + shortcut[k].alpha) + j += 1 + if hasattr(model.neural_network_layers[i], + 'layers'): + len_layers = len(model.neural_network_layers[i].layers) + for k in range(len_layers): + if hasattr(model.neural_network_layers[i].layers[k], + 'weight'): + model.neural_network_layers[i].layers[k].weight = \ + nn.Parameter(trained_weights. + neural_network_layers[i]. + layers[k].weight) + j += 1 + model.neural_network_layers[i].layers[k].bias = \ + nn.Parameter(trained_weights. + neural_network_layers[i]. + layers[k].bias) + j += 1 + if hasattr(model.neural_network_layers[i].layers[k], + 'alpha'): + model.neural_network_layers[i].layers[k].alpha = \ + nn.Parameter(trained_weights. + neural_network_layers[i]. + layers[k].alpha) + j += 1 + if hasattr(model, 'dense_neural_network_layers'): + len_nn = len(model.dense_neural_network_layers) + aux_j = 0 + for i in range(len_nn): + if hasattr(model.dense_neural_network_layers[i], 'weight'): + model.dense_neural_network_layers[i].weight = \ + nn.Parameter(trained_weights. + dense_neural_network_layers[i]. + weight) + aux_j += 1 + model.dense_neural_network_layers[i].bias = \ + nn.Parameter(trained_weights. + dense_neural_network_layers[i]. + bias) + aux_j += 1 + return model + + +def process_outputs(output_nn): + _, indices = torch.max(output_nn, dim=1) + binary_output = torch.zeros_like(output_nn) + binary_output[torch.arange(output_nn.size(0)), indices] = 1 + return binary_output + + +def compute_validation_losses(torch_model, x_val, y_val_to_loss, + y_val, loss_function, + validation_score=accuracy_score): + validation_loss = [] + validation_acc = [] + torch_model = torch_model.eval().to("cuda:0") + indexes = 128 + num_batches = math.ceil(x_val[0][0].tensor_shape[0]/indexes) + outputs = [] + for x_out_tens in x_val: + for x_in_tens in x_out_tens: + x_in_tens = x_in_tens.to("cuda:0") + for idx in range(num_batches): + with torch.no_grad(): + output = torch_model(x_in_tens[idx * indexes: + (idx + 1) * indexes]. + float()) + output_cpu = output.cpu() + outputs.append(output_cpu) + del output + torch.cuda.empty_cache() + x_in_tens = x_in_tens.to("cpu") + del x_in_tens + torch.cuda.empty_cache() + outputs = torch.cat(outputs) + loss = loss_function() + validation_loss.append(loss(outputs, y_val_to_loss).item()) + if outputs.shape[-1] > 1: + outputs = process_outputs(outputs) + outputs = outputs.detach().cpu().numpy() + validation_acc.append(accuracy_score(y_val, outputs)) + return validation_loss, validation_acc diff --git a/dislib/pytorch/encapsulated_functions_distributed.py b/dislib/pytorch/encapsulated_functions_distributed.py index acaf5fcf..e8ffd024 100644 --- a/dislib/pytorch/encapsulated_functions_distributed.py +++ b/dislib/pytorch/encapsulated_functions_distributed.py @@ -1,54 +1,12 @@ import numpy as np - -import torch.nn as nn import copy from pycompss.api.api import compss_wait_on, compss_delete_object from dislib.data.tensor import shuffle from dislib.pytorch.pytorch_distributed import PytorchDistributed - - -def pt_aggregateParameters(workers_parameters): - NUM_WORKERS = len(workers_parameters) - - final_weights = [] - for i in range(NUM_WORKERS): - workers_weights = [] - for param in workers_parameters[i].parameters(): - workers_weights.append(param) - final_weights.append(workers_weights) - final_added_parameters = final_weights[0] - for i in range(len(final_weights[0])): - for j in range(1, len(final_weights)): - final_added_parameters[i] = final_added_parameters[i] + \ - final_weights[j][i] - - for i in range(len(final_weights[0])): - final_added_parameters[i] = final_added_parameters[i]/NUM_WORKERS - j = 0 - if hasattr(workers_parameters[0], 'neural_network_layers'): - len_nn = len(workers_parameters[0].neural_network_layers) - for i in range(len_nn): - if hasattr(workers_parameters[0].neural_network_layers[i], - 'weight'): - workers_parameters[0].neural_network_layers[i].weight = \ - nn.Parameter(final_added_parameters[j]) - j += 1 - workers_parameters[0].neural_network_layers[i].bias = \ - nn.Parameter(final_added_parameters[j]) - j += 1 - if hasattr(workers_parameters[0], 'dense_neural_network_layers'): - len_nn = len(workers_parameters[0].dense_neural_network_layers) - aux_j = 0 - for i in range(len_nn): - if hasattr(workers_parameters[0].dense_neural_network_layers[i], - 'weight'): - workers_parameters[0].dense_neural_network_layers[i].weight = \ - nn.Parameter(final_added_parameters[aux_j + j]) - aux_j += 1 - workers_parameters[0].dense_neural_network_layers[i].bias = \ - nn.Parameter(final_added_parameters[aux_j + j]) - aux_j += 1 - return workers_parameters[0] +from sklearn.metrics import accuracy_score +from auxiliar_functions import pt_aggregate_parameters, \ + compute_validation_losses, assign_parameters, \ + evaluate_model_parameters class EncapsulatedFunctionsDistributedPytorch(object): @@ -84,7 +42,8 @@ def __init__(self, num_workers=10): self.num_workers = num_workers def build(self, net, optimizer, loss, optimizer_parameters, - scheduler=None, T_max=1, eta_min=0.0, num_gpu=0, num_nodes=0): + gradient_clipping=None, scheduler=None, T_max=1, + eta_min=0.0, num_gpu=0, num_nodes=0): """ Builds the model to obtain the initial parameters of the training and it also builds the model in each worker in order to be ready @@ -120,6 +79,15 @@ def build(self, net, optimizer, loss, optimizer_parameters, self.model_parameters = net self.optimizer = optimizer(self.model_parameters.parameters(), **optimizer_parameters) + if gradient_clipping is not None: + if callable(gradient_clipping): + self.gradient_clipping = gradient_clipping + else: + self.gradient_clipping = None + raise Warning("Gradient clipping specified should" + "be a callable function. Set to None") + else: + self.gradient_clipping = None if scheduler is not None: self.scheduler = scheduler(self.optimizer, T_max=T_max, eta_min=eta_min) @@ -177,7 +145,8 @@ def fit_synchronous_shuffle_every_n_epochs_with_GPU(self, x_train, x_train.tensors[int(row)][int(col)], y_train.tensors[int(row)][int(col)], num_batches_per_worker, - shuffle_block_data=False) + shuffle_block_data=False, + gradient_clipping=self.gradient_clipping) j = j + 1 if j == self.num_workers: j = 0 @@ -185,7 +154,7 @@ def fit_synchronous_shuffle_every_n_epochs_with_GPU(self, x_train, parameters_for_workers = compss_wait_on( parameters_for_workers) self.model_parameters = \ - pt_aggregateParameters(parameters_for_workers) + pt_aggregate_parameters(parameters_for_workers) parameters_for_workers = [copy.deepcopy(self.model_parameters) for _ in range(len(parameters_for_workers))] @@ -201,7 +170,7 @@ def fit_synchronous_shuffle_every_n_epochs_with_GPU(self, x_train, self.optimizer_parameters["weight_decay"] = \ self.optimizer.param_groups[0]["weight_decay"] parameters_for_workers = compss_wait_on(parameters_for_workers) - self.model_parameters = pt_aggregateParameters( + self.model_parameters = pt_aggregate_parameters( parameters_for_workers) return self.model_parameters @@ -256,7 +225,8 @@ def fit_synchronous_every_n_epochs_with_GPU(self, x_train, y_train, x_train.tensors[int(row)][int(col)], y_train.tensors[int(row)][int(col)], num_batches_per_worker, - shuffle_block_data=shuffle_block_data) + shuffle_block_data=shuffle_block_data, + gradient_clipping=self.gradient_clipping) j = j + 1 if j == self.num_workers: j = 0 @@ -264,7 +234,7 @@ def fit_synchronous_every_n_epochs_with_GPU(self, x_train, y_train, parameters_for_workers = compss_wait_on( parameters_for_workers) self.model_parameters = \ - pt_aggregateParameters(parameters_for_workers) + pt_aggregate_parameters(parameters_for_workers) parameters_for_workers = [ copy.deepcopy(self.model_parameters) for _ in range(len(parameters_for_workers))] @@ -280,10 +250,127 @@ def fit_synchronous_every_n_epochs_with_GPU(self, x_train, y_train, self.optimizer_parameters["weight_decay"] = \ self.optimizer.param_groups[0]["weight_decay"] parameters_for_workers = compss_wait_on(parameters_for_workers) - self.model_parameters = pt_aggregateParameters( + self.model_parameters = pt_aggregate_parameters( parameters_for_workers) return self.model_parameters + def fit_synchronous_with_GPU_train_curves(self, x_train, + y_train, + num_batches_per_worker, + num_epochs, + shuffle_blocks=True, + shuffle_block_data=True, + complete_shuffle=False, + x_test=None, y_test=None, + y_test_to_loss=None, + function_score=accuracy_score): + """ + Training of the neural network performing a syncrhonization of the + weights at the end of each epoch, it performs a total shuffle of + the tensors on the ds_tensor and the elements inside each tensor + + Parameters + ---------- + x_train : ds_tensor + samples and features of the training dataset + y_train: ds_tensor + classes or values of the samples of the training dataset + num_batches_per_worker: int + Number of batches that each worker will be trained with every + piece of the dataset + num_epochs: int + Total number of epochs to train the model + shuffle_blocks: boolean + Variable specifying to shuffle the blocks of the ds_tensor or not + shuffle_block_data: boolean + Variable specifying whether to shuffle the elements inside each + tensor locally or not + x_test: list of tensors + samples and features of the validation dataset + y_test: list of tensors + classes or values of the samples of the validation dataset + y_test_to_loss: list of tensors + classes or values of the samples of the validation dataset + as the loss function expects + function_score: function + function to compute the score of the predictions during + training and validation + Returns + ------- + model_parameters: np.array + """ + parameters_for_workers = [copy.deepcopy(self.model_parameters) for + _ in range(self.num_workers)] + rows = np.arange(x_train.shape[0]) + cols = np.arange(x_train.shape[1]) + training_loss = [] + training_acc = [] + validation_acc = [] + validation_loss = [] + for i in range(num_epochs): + j = 0 + if complete_shuffle: + x_train, y_train = shuffle(x_train, y_train) + elif shuffle_blocks: + rows = np.random.permutation(x_train.shape[0]) + cols = np.random.permutation(x_train.shape[1]) + epoch_accuracy = [] + epoch_loss = [] + for row in rows: + for col in cols: + parameters_for_workers[j], train_loss, train_accuracy = \ + self.compss_object[j].train_cnn_batch_GPU_losses( + parameters_for_workers[j], + x_train.tensors[int(row)][int(col)], + y_train.tensors[int(row)][int(col)], + num_batches_per_worker, + shuffle_block_data=shuffle_block_data, + score_to_compute=function_score, + gradient_clipping=self.gradient_clipping) + j = j + 1 + if j == self.num_workers: + j = 0 + epoch_loss.append(train_loss) + epoch_accuracy.append(train_accuracy) + if self.scheduler is not None: + self.scheduler.step() + self.optimizer_parameters = {} + self.optimizer_parameters["lr"] = \ + self.optimizer.param_groups[0]["lr"] + if "momentum" in self.optimizer.param_groups[0]: + self.optimizer_parameters["momentum"] = \ + self.optimizer.param_groups[0]["momentum"] + if "weight_decay" in self.optimizer.param_groups[0]: + self.optimizer_parameters["weight_decay"] = \ + self.optimizer.param_groups[0]["weight_decay"] + parameters_for_workers = compss_wait_on(parameters_for_workers) + epoch_loss = compss_wait_on(epoch_loss) + epoch_accuracy = compss_wait_on(epoch_accuracy) + training_acc.append(np.array(epoch_accuracy).mean()) + training_loss.append(np.array(epoch_loss).mean()) + self.model_parameters = pt_aggregate_parameters( + parameters_for_workers) + assign_parameters(self.compss_object[0].model, + self.model_parameters) + torch_model = self.compss_object[0].model + if y_test_to_loss is not None: + val_loss, val_acc = compute_validation_losses( + torch_model, x_test, y_test_to_loss, y_test, + validation_score=function_score) + else: + val_loss, val_acc = compute_validation_losses( + torch_model, x_test, y_test, y_test, + validation_score=function_score) + validation_loss.extend(val_loss) + validation_acc.extend(val_acc) + [compss_delete_object(params) for params in parameters_for_workers] + del parameters_for_workers + parameters_for_workers = [copy.deepcopy(self.model_parameters) + for _ in range(self.num_workers)] + self.model_parameters = parameters_for_workers[0] + return self.model_parameters, training_loss, training_acc, \ + validation_loss, validation_acc + def fit_synchronous_with_GPU(self, x_train, y_train, num_batches_per_worker, num_epochs, @@ -331,7 +418,8 @@ def fit_synchronous_with_GPU(self, x_train, y_train, x_train.tensors[int(row)][int(col)], y_train.tensors[int(row)][int(col)], num_batches_per_worker, - shuffle_block_data=shuffle_block_data) + shuffle_block_data=shuffle_block_data, + gradient_clipping=self.gradient_clipping) j = j + 1 if j == self.num_workers: j = 0 @@ -347,7 +435,7 @@ def fit_synchronous_with_GPU(self, x_train, y_train, self.optimizer_parameters["weight_decay"] = \ self.optimizer.param_groups[0]["weight_decay"] parameters_for_workers = compss_wait_on(parameters_for_workers) - self.model_parameters = pt_aggregateParameters( + self.model_parameters = pt_aggregate_parameters( parameters_for_workers) [compss_delete_object(params) for params in parameters_for_workers] del parameters_for_workers @@ -393,7 +481,8 @@ def fit_synchronous_shuffle_with_GPU(self, x_train, y_train, x_train.tensors[int(row)][int(col)], y_train.tensors[int(row)][int(col)], num_batches_per_worker, - shuffle_block_data=False) + shuffle_block_data=False, + gradient_clipping=self.gradient_clipping) j = j + 1 if j == self.num_workers: j = 0 @@ -410,13 +499,126 @@ def fit_synchronous_shuffle_with_GPU(self, x_train, y_train, self.optimizer.param_groups[0]["weight_decay"] parameters_for_workers = compss_wait_on(parameters_for_workers) self.model_parameters = \ - pt_aggregateParameters(parameters_for_workers) + pt_aggregate_parameters(parameters_for_workers) parameters_for_workers = [copy.deepcopy(self.model_parameters) for _ in range(self.num_workers)] parameters_for_workers = compss_wait_on(parameters_for_workers) - self.model_parameters = pt_aggregateParameters(parameters_for_workers) + self.model_parameters = pt_aggregate_parameters(parameters_for_workers) return self.model_parameters + def fit_asynchronous_with_GPU_train_curves( + self, x_train, y_train, + num_batches_per_worker, + num_epochs, + shuffle_blocks=True, + shuffle_block_data=True, + complete_shuffle=False, + x_test=None, + y_test=None, + y_test_to_loss=None, + function_score=accuracy_score): + """ + Training of the neural network performing an asyncrhonous update + of the weights every epoch, it performs a shuffle of the tensors + on the ds_tensor and a local shuffle of the elements inside each + tensor + + Parameters + ---------- + x_train : ds_tensor + samples and features of the training dataset + y_train: ds_tensor + classes or values of the samples of the training dataset + num_batches_per_worker: int + Number of batches that each worker will be trained with every + piece of the dataset + num_epochs: int + Total number of epochs to train the model + shuffle_blocks: boolean + Variable specifying to shuffle the blocks of the ds_tensor or not + shuffle_block_data: boolean + Variable specifying whether to shuffle the elements inside each + tensor locally or not + Returns + ------- + model_parameters: pytorch tensor + """ + parameters_for_workers = [copy.deepcopy(self.model_parameters) + for _ in range(self.num_workers)] + rows = np.arange(x_train.shape[0]) + cols = np.arange(x_train.shape[1]) + training_loss = [] + training_acc = [] + validation_loss = [] + validation_acc = [] + for i in range(num_epochs): + j = 0 + if complete_shuffle: + pass + elif shuffle_blocks: + rows = np.random.permutation(x_train.shape[0]) + cols = np.random.permutation(x_train.shape[1]) + epoch_accuracy = [] + epoch_loss = [] + for row in rows: + for col in cols: + parameters_for_workers[j], train_loss, \ + train_accuracy = \ + self.compss_object[j].\ + train_cnn_batch_GPU( + parameters_for_workers[j], + x_train.tensors[int(row)] + [int(col)], + y_train.tensors[int(row)] + [int(col)], + num_batches_per_worker, + shuffle_block_data=shuffle_block_data, + gradient_clipping=self.gradient_clipping) + j = j + 1 + if j == self.num_workers: + j = 0 + epoch_loss.append(train_loss) + epoch_accuracy.append(train_accuracy) + training_loss.append(epoch_loss) + training_acc.append(epoch_accuracy) + if self.scheduler is not None: + self.scheduler.step() + self.optimizer_parameters = {} + self.optimizer_parameters["lr"] = \ + self.optimizer.param_groups[0]["lr"] + if "momentum" in self.optimizer.param_groups[0]: + self.optimizer_parameters["momentum"] = \ + self.optimizer.param_groups[0]["momentum"] + if "weight_decay" in self.optimizer.param_groups[0]: + self.optimizer_parameters["weight_decay"] = \ + self.optimizer.param_groups[0]["weight_decay"] + for j in range(self.num_workers): + parameters_for_workers[j] = \ + self.compss_object[j].aggregate_parameters_async( + self.model_parameters, + parameters_for_workers[j]) + val_loss, val_acc = evaluate_model_parameters( + x_test, y_test, + self.compss_object[0].model, + parameters_for_workers[-1], + self.loss, + score_function=function_score, + y_val_loss_function=y_test_to_loss) + validation_acc.append(val_acc) + validation_loss.append(val_loss) + parameters_for_workers = compss_wait_on(parameters_for_workers) + self.model_parameters = pt_aggregate_parameters(parameters_for_workers) + training_loss = compss_wait_on(training_loss) + training_acc = compss_wait_on(training_acc) + validation_loss = compss_wait_on(validation_loss) + validation_acc = compss_wait_on(validation_acc) + training_loss = [np.array(epoch_loss).mean() + for epoch_loss in training_loss] + training_acc = [np.array(epoch_accuracy).mean() + for epoch_accuracy in training_acc] + return self.model_parameters, training_loss, training_acc, \ + validation_loss, validation_acc + def fit_asynchronous_with_GPU(self, x_train, y_train, num_batches_per_worker, num_epochs, @@ -465,7 +667,8 @@ def fit_asynchronous_with_GPU(self, x_train, y_train, x_train.tensors[int(row)][int(col)], y_train.tensors[int(row)][int(col)], num_batches_per_worker, - shuffle_block_data=shuffle_block_data) + shuffle_block_data=shuffle_block_data, + gradient_clipping=self.gradient_clipping) j = j + 1 if j == self.num_workers: j = 0 @@ -486,7 +689,7 @@ def fit_asynchronous_with_GPU(self, x_train, y_train, self.model_parameters, parameters_for_workers[j]) parameters_for_workers = compss_wait_on(parameters_for_workers) - self.model_parameters = pt_aggregateParameters(parameters_for_workers) + self.model_parameters = pt_aggregate_parameters(parameters_for_workers) return self.model_parameters def fit_asynchronous_shuffle_with_GPU(self, x_train, y_train, @@ -527,7 +730,8 @@ def fit_asynchronous_shuffle_with_GPU(self, x_train, y_train, x_train.tensors[int(row)][int(col)], y_train.tensors[int(row)][int(col)], num_batches_per_worker, - shuffle_block_data=False) + shuffle_block_data=False, + gradient_clipping=self.gradient_clipping) j = j + 1 if j == self.num_workers: j = 0 @@ -548,7 +752,7 @@ def fit_asynchronous_shuffle_with_GPU(self, x_train, y_train, self.model_parameters, parameters_for_workers[j]) parameters_for_workers = compss_wait_on(parameters_for_workers) - self.model_parameters = pt_aggregateParameters(parameters_for_workers) + self.model_parameters = pt_aggregate_parameters(parameters_for_workers) return self.model_parameters def fit_asynchronous_n_epochs_with_GPU(self, x_train, y_train, @@ -602,7 +806,8 @@ def fit_asynchronous_n_epochs_with_GPU(self, x_train, y_train, x_train.tensors[int(row)][int(col)], y_train.tensors[int(row)][int(col)], num_batches_per_worker, - shuffle_block_data=shuffle_block_data) + shuffle_block_data=shuffle_block_data, + gradient_clipping=self.gradient_clipping) j = j + 1 if j == self.num_workers: j = 0 @@ -624,7 +829,7 @@ def fit_asynchronous_n_epochs_with_GPU(self, x_train, y_train, self.model_parameters, parameters_for_workers[j]) parameters_for_workers = compss_wait_on(parameters_for_workers) - self.model_parameters = pt_aggregateParameters(parameters_for_workers) + self.model_parameters = pt_aggregate_parameters(parameters_for_workers) return self.model_parameters def fit_asynchronous_shuffle_n_epochs_with_GPU(self, x_train, @@ -657,7 +862,7 @@ def fit_asynchronous_shuffle_n_epochs_with_GPU(self, x_train, """ parameters_for_workers = [copy.deepcopy(self.model_parameters) for _ in range(self.num_workers)] - pt_aggregateParameters(parameters_for_workers) + pt_aggregate_parameters(parameters_for_workers) for i in range(num_epochs): j = 0 x_train, y_train = shuffle(x_train, y_train) @@ -671,7 +876,8 @@ def fit_asynchronous_shuffle_n_epochs_with_GPU(self, x_train, x_train.tensors[int(row)][int(col)], y_train.tensors[int(row)][int(col)], num_batches_per_worker, - shuffle_block_data=False) + shuffle_block_data=False, + gradient_clipping=self.gradient_clipping) j = j + 1 if j == self.num_workers: j = 0 @@ -682,5 +888,5 @@ def fit_asynchronous_shuffle_n_epochs_with_GPU(self, x_train, self.model_parameters, parameters_for_workers[j]) parameters_for_workers = compss_wait_on(parameters_for_workers) - self.model_parameters = pt_aggregateParameters(parameters_for_workers) + self.model_parameters = pt_aggregate_parameters(parameters_for_workers) return self.model_parameters diff --git a/dislib/pytorch/pytorch_distributed.py b/dislib/pytorch/pytorch_distributed.py index f7846e08..0c327632 100644 --- a/dislib/pytorch/pytorch_distributed.py +++ b/dislib/pytorch/pytorch_distributed.py @@ -4,10 +4,99 @@ from pycompss.api.constraint import constraint import torch.nn as nn import torch +from sklearn.metrics import accuracy_score +import numpy as np os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" +def assign_parameters(model, model_parameters): + if hasattr(model, 'neural_network_layers'): + len_nn = len(model.neural_network_layers) + for i in range(len_nn): + if hasattr(model_parameters.neural_network_layers[i], + 'weight'): + model.neural_network_layers[i].weight = \ + nn.Parameter( + model_parameters.neural_network_layers[i]. + weight.float()) + if hasattr(model_parameters.neural_network_layers[i], + 'bias'): + model.neural_network_layers[i].bias = \ + nn.Parameter( + model_parameters.neural_network_layers[i].bias. + float()) + if hasattr(model.neural_network_layers[i], + 'alpha'): + model.neural_network_layers[i].alpha = \ + nn.Parameter(model_parameters. + neural_network_layers[i].alpha) + if hasattr(model.neural_network_layers[i], + 'shortcut'): + len_shortcut = len(model.neural_network_layers[i].shortcut) + for k in range(len_shortcut): + if hasattr(model.neural_network_layers[i].shortcut[k], + 'weight'): + model.neural_network_layers[i].shortcut[k].weight = \ + nn.Parameter(model_parameters. + neural_network_layers[i].shortcut[k]. + weight.float()) + model.neural_network_layers[i].shortcut[k].bias = \ + nn.Parameter(model_parameters. + neural_network_layers[i].shortcut[k]. + bias.float()) + if hasattr(model.neural_network_layers[i].shortcut[k], + 'alpha'): + model.neural_network_layers[i].shortcut[k].alpha = \ + nn.Parameter(model_parameters. + neural_network_layers[i]. + shortcut[k].alpha) + if hasattr(model.neural_network_layers[i], + 'layers'): + len_layers = len(model.neural_network_layers[i].layers) + for k in range(len_layers): + if hasattr(model.neural_network_layers[i].layers[k], + 'weight'): + model.neural_network_layers[i].layers[k].weight = \ + nn.Parameter(model_parameters. + neural_network_layers[i]. + layers[k].weight.float()) + if hasattr(model.neural_network_layers[i].layers[k], + 'bias'): + model.neural_network_layers[i].layers[k].bias = \ + nn.Parameter(model_parameters. + neural_network_layers[i]. + layers[k].bias.float()) + if hasattr(model.neural_network_layers[i].layers[k], + 'alpha'): + model.neural_network_layers[i].layers[k].alpha = \ + nn.Parameter(model_parameters. + neural_network_layers[i]. + layers[k].alpha) + if hasattr(model, 'dense_neural_network_layers'): + len_nn = len(model_parameters.dense_neural_network_layers) + for i in range(len_nn): + if hasattr( + model_parameters.dense_neural_network_layers[i], + 'weight'): + model.dense_neural_network_layers[i].weight = \ + nn.Parameter( + model_parameters.dense_neural_network_layers[i]. + weight.float()) + if hasattr(model_parameters.dense_neural_network_layers[i], + 'bias'): + model.dense_neural_network_layers[i].bias = \ + nn.Parameter( + model_parameters.dense_neural_network_layers[i]. + bias.float()) + if hasattr(model.neural_network_layers[i], + 'alpha'): + model.neural_network_layers[i].alpha = \ + nn.Parameter(model_parameters. + neural_network_layers[i].alpha) + return model + + class PytorchDistributed(object): """ PytorchDistributed object. It is in charge of executing in parallel the @@ -70,6 +159,74 @@ def train_cnn_batch_GPU(self, model_parameters, x_train, y_train, num_batches, shuffle_block_data) + @constraint(processors=[ + {'processorType': 'CPU', 'computingUnits': '${ComputingUnits}'}, + {'processorType': 'GPU', 'computingUnits': '${ComputingUnitsGPUs}'}]) + @task() + def _train_cnn_batch_GPU_losses(self, model_parameters, x_train, + y_train, num_batches, shuffle_block_data, + transformations=None, + gradient_clipping=None, + score_to_compute=accuracy_score): + if shuffle_block_data: + idx = torch.randperm(x_train.shape[0]) + if not isinstance(x_train.size, int): + x_train = x_train[idx].view(x_train.size()) + else: + if not torch.is_tensor(x_train): + x_train = x_train[idx] + else: + x_train = torch.from_numpy(x_train) + x_train = x_train[idx].view(x_train.size()) + if not isinstance(y_train.size, int): + if len(y_train.size()) > 1: + y_train = y_train[idx].view(y_train.size()) + else: + y_train = y_train[idx] + else: + if not torch.is_tensor(y_train): + y_train = y_train[idx] + else: + y_train = torch.from_numpy(y_train) + y_train = y_train[idx].view(y_train.size()) + self.model = assign_parameters(self.model, model_parameters) + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + self.model = self.model.to(device) + optimizer = self.optimizer(self.model.parameters(), + **self.optimizer_parameters) + x_train = x_train.float().to(device) + indexes = int(x_train.shape[0] / num_batches) + if len(y_train.shape) == 1: + y_train = y_train.float() + elif y_train.shape[-1] == 1: + y_train = y_train.float() + if isinstance(self.loss, nn.CrossEntropyLoss): + y_train = y_train.long() + true_labels = y_train.to(device) + output_list = [] + losses_epoch = [] + for idx in range(num_batches): + optimizer.zero_grad() + inputs = x_train[idx*indexes:(idx + 1)*indexes] + if transformations is not None: + inputs = transformations(inputs) + outputs = self.model(inputs) + loss = self.loss(outputs, + true_labels[idx*indexes:(idx+1)*indexes]) + loss.backward() + if gradient_clipping is not None: + gradient_clipping(self.model) + optimizer.step() + losses_epoch.append(loss.item()) + output_list.append(outputs.to("cpu")) + output_list = torch.cat(output_list) + _, output_list = torch.max(output_list, dim=1) + true_labels = true_labels.to("cpu") + self.model = self.model.to("cpu") + return self.model, np.array(losses_epoch).mean(), \ + score_to_compute(true_labels.detach().numpy(), + output_list.detach().numpy()) + @constraint(processors=[ {'processorType': 'CPU', 'computingUnits': '${ComputingUnits}'}, {'processorType': 'GPU', 'computingUnits': '${ComputingUnitsGPUs}'}]) @@ -99,47 +256,7 @@ def _train_cnn_batch_GPU(self, model_parameters, x_train, else: y_train = torch.from_numpy(y_train) y_train = y_train[idx].view(y_train.size()) - if hasattr(self.model, 'neural_network_layers'): - len_nn = len(self.model.neural_network_layers) - for i in range(len_nn): - if hasattr(model_parameters.neural_network_layers[i], - 'weight'): - self.model.neural_network_layers[i].weight = \ - nn.Parameter( - model_parameters.neural_network_layers[i]. - weight.float()) - if hasattr(model_parameters.neural_network_layers[i], - 'bias'): - self.model.neural_network_layers[i].bias = \ - nn.Parameter( - model_parameters.neural_network_layers[i].bias. - float()) - if hasattr(self.model.neural_network_layers[i], - 'alpha'): - self.model.neural_network_layers[i].alpha = \ - nn.Parameter(model_parameters. - neural_network_layers[i].alpha) - if hasattr(self.model, 'dense_neural_network_layers'): - len_nn = len(model_parameters.dense_neural_network_layers) - for i in range(len_nn): - if hasattr( - model_parameters.dense_neural_network_layers[i], - 'weight'): - self.model.dense_neural_network_layers[i].weight = \ - nn.Parameter( - model_parameters.dense_neural_network_layers[i]. - weight.float()) - if hasattr(model_parameters.dense_neural_network_layers[i], - 'bias'): - self.model.dense_neural_network_layers[i].bias = \ - nn.Parameter( - model_parameters.dense_neural_network_layers[i]. - bias.float()) - if hasattr(self.model.neural_network_layers[i], - 'alpha'): - self.model.neural_network_layers[i].alpha = \ - nn.Parameter(model_parameters. - neural_network_layers[i].alpha) + self.model = assign_parameters(self.model, model_parameters) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") self.model = self.model.to(device) optimizer = self.optimizer(self.model.parameters(), diff --git a/tests/test_distributed_pytorch.py b/tests/test_distributed_pytorch.py index e377b1bc..0cedaba4 100644 --- a/tests/test_distributed_pytorch.py +++ b/tests/test_distributed_pytorch.py @@ -51,6 +51,85 @@ def test_initialization(self): self.assertTrue(isinstance(model, MNIST_Network)) def test_synchronous_training(self): + x = torch.ones([1000, 1, 1, 1]) + y = torch.ones([1000, 1]) + x[:100] = torch.ones([100, 1, 1, 1]) + y[:100] = 0 + x[100:200] = torch.ones([100, 1, 1, 1]) * 2 + y[100:200] = 1 + x[200:300] = torch.ones([100, 1, 1, 1])*4 + y[200:300] = 2 + x[300:400] = torch.ones([100, 1, 1, 1])*8 + y[300:400] = 3 + x[400:500] = torch.ones([100, 1, 1, 1]) * 16 + y[400:500] = 4 + x[500:600] = torch.ones([100, 1, 1, 1]) * 32 + y[500:600] = 5 + x[600:700] = torch.ones([100, 1, 1, 1]) * 64 + y[600:700] = 6 + x[700:800] = torch.ones([100, 1, 1, 1]) * 128 + y[700:800] = 7 + x[800:900] = torch.ones([100, 1, 1, 1]) * 256 + y[800:900] = 8 + x[900:] = torch.ones([100, 1, 1, 1]) * 512 + y[900:] = 9 + indices = torch.randperm(x.size()[0]) + x = x[indices] + y = y[indices] + x_tensor = ds.from_pt_tensor(tensor=x, + shape=(2, 2)) + x_tensor_original = ds.from_pt_tensor(tensor=x, + shape=(2, 2)) + y = torch.nn.functional.one_hot(y.long(), num_classes=-1) + y = y[:, 0, :].double() + y_tensor = ds.from_pt_tensor(tensor=y, + shape=(2, 2)) + y_tensor_original = ds.from_pt_tensor(tensor=y, + shape=(2, 2)) + model = TestsNetwork(1, 10) + model.apply(init_weights) + encaps_function = EncapsulatedFunctionsDistributedPytorch( + num_workers=1) + criterion = nn.CrossEntropyLoss + optimizer = optim.SGD + optimizer_parameters = {"lr": 0.002, "momentum": 0.9} + encaps_function.build(model, optimizer, + criterion, optimizer_parameters, + num_gpu=1, num_nodes=1) + y_tensor = y_tensor_original.collect() + x_tensor = x_tensor_original.collect() + y_tensor_function = torch.cat([tens for tensor in y_tensor + for tens in tensor]) + trained_weights, train_loss, train_acc, validation_loss, \ + validation_acc = encaps_function.\ + fit_synchronous_with_GPU_train_curves(x_tensor, + y_tensor, 10, 64, + x_test=x_tensor, + y_test=y_tensor_function) + self.assertTrue(torch.is_tensor(train_loss[0])) + self.assertTrue(torch.is_tensor(train_acc[0])) + self.assertTrue(torch.is_tensor(validation_loss[0])) + self.assertTrue(torch.is_tensor(validation_acc[0])) + self.assertTrue(len(train_loss) == 10) + self.assertTrue(len(train_acc) == 10) + self.assertTrue(len(validation_loss) == 10) + self.assertTrue(len(validation_acc) == 10) + model = set_weights_neural_network(model, trained_weights) + model.eval() + total = 0 + running_accuracy = 0 + for tensor, labels in zip(x_tensor, y_tensor): + for images, in_labels in zip(tensor, labels): + outputs = in_labels + predicted_outputs = model(images.float()) + preds, predicted = torch.max(predicted_outputs, 1) + outs, outputs = torch.max(outputs, 1) + total += predicted_outputs.shape[0] + running_accuracy += (predicted == outputs).sum().item() + # Check predictions are better than random + self.assertTrue((running_accuracy / total) > 0.1) + + def test_synchronous_training_training_curves(self): x = torch.ones([1000, 1, 1, 1]) y = torch.ones([1000, 1]) x[:100] = torch.ones([100, 1, 1, 1]) @@ -350,6 +429,71 @@ def test_synchronous_with_GPU_cnn(self): running_accuracy += (predicted == outputs).sum().item() self.assertTrue((running_accuracy / total) >= 0.5) + def test_asynchronous_training_training_curves(self): + x = torch.ones([1000, 1, 1, 1]) + y = torch.ones([1000, 1]) + x[:500] = torch.ones([500, 1, 1, 1]) + y[:500] = 0 + x[500:] = torch.ones([500, 1, 1, 1]) * 32 + y[500:] = 1 + indices = torch.randperm(x.size()[0]) + x = x[indices] + y = y[indices] + x_tensor = ds.from_pt_tensor(tensor=x, + shape=(2, 2)) + x_tensor_original = ds.from_pt_tensor(tensor=x, + shape=(2, 2)) + y = torch.nn.functional.one_hot(y.long(), num_classes=-1) + y = y[:, 0, :].double() + y_tensor = ds.from_pt_tensor(tensor=y, + shape=(2, 2)) + y_tensor_original = ds.from_pt_tensor(tensor=y, + shape=(2, 2)) + model = TestsNetwork(1, 10) + model.apply(init_weights) + encaps_function = EncapsulatedFunctionsDistributedPytorch( + num_workers=1) + criterion = nn.CrossEntropyLoss + optimizer = optim.SGD + x_tensor_original = x_tensor_original.collect() + y_tensor_original = y_tensor_original.collect() + y_test_local = torch.cat([tens for tensor in + y_tensor_original for tens in + tensor]) + optimizer_parameters = {"lr": 0.002, "momentum": 0.9} + encaps_function.build(model, optimizer, + criterion, optimizer_parameters, + num_gpu=1, num_nodes=1) + trained_weights, train_loss, train_acc, validation_loss, \ + validation_acc = encaps_function.\ + fit_asynchronous_with_GPU_train_curves(x_tensor, + y_tensor, 10, 64, + x_test=x_tensor_original, + y_test=y_test_local) + model = set_weights_neural_network(model, trained_weights) + self.assertTrue(torch.is_tensor(train_loss[0])) + self.assertTrue(torch.is_tensor(train_acc[0])) + self.assertTrue(torch.is_tensor(validation_loss[0])) + self.assertTrue(torch.is_tensor(validation_acc[0])) + self.assertTrue(len(train_loss) == 10) + self.assertTrue(len(train_acc) == 10) + self.assertTrue(len(validation_loss) == 10) + self.assertTrue(len(validation_acc) == 10) + model.eval() + total = 0 + running_accuracy = 0 + for tensor, labels in zip(x_tensor_original, + y_tensor_original): + for images, in_labels in zip(tensor, labels): + outputs = in_labels + predicted_outputs = model(images.float()) + preds, predicted = torch.max(predicted_outputs, 1) + outs, outputs = torch.max(outputs, 1) + total += predicted_outputs.shape[0] + running_accuracy += (predicted == outputs).sum().item() + # Check predictions are better than random + self.assertTrue((running_accuracy / total) >= 0.5) + def test_asynchronous_shuffle_with_GPU_cnn(self): x = torch.ones([1000, 1, 1, 1]) y = torch.ones([1000, 1]) diff --git a/tests/test_tensor.py b/tests/test_tensor.py index d3cccebe..23a6b37d 100644 --- a/tests/test_tensor.py +++ b/tests/test_tensor.py @@ -611,6 +611,7 @@ def test_from_ds_array(self): i_tensor): self.assertTrue(np.all(compss_wait_on(j_block) == np.array(compss_wait_on(j_tensor)))) + self.assertTrue(ds_tensor.tensor_shape == (2, 2)) with self.assertRaises(TypeError): ds.from_ds_array(np_array, (2, 2)) ds_array = ds.array([[1, 2, 3, 4], @@ -622,6 +623,22 @@ def test_from_ds_array(self): with self.assertRaises(ValueError): ds.from_ds_array(ds_array, (6, 2)) + def test_from_ds_array_single_column(self): + ds_array = ds.array([[1, 2, 3, 4], + [5, 6, 7, 8], + [9, 10, 11, 12], + [13, 14, 15, 16]], (2, 2)) + ds_tensor = ds.from_ds_array(ds_array, shape=(2, 2), one_column=True) + first_tensor = ds_tensor.tensors[0][0] + second_tensor = ds_tensor.tensors[1][0] + self.assertTrue(np.all(compss_wait_on(first_tensor) == + np.block(compss_wait_on( + ds_array._blocks[0])))) + self.assertTrue(np.all(compss_wait_on(second_tensor) == + np.block(compss_wait_on( + ds_array._blocks[1])))) + self.assertTrue(ds_tensor.tensor_shape == (2, 4)) + def test_load_dataset(self): path = "tests/datasets/npy" x_train_tensor = ds.data.tensor.load_dataset(2, path)