nvflare.edge.simulation.et_task_processor module

class ETTaskProcessor(data_path: str, training_config: Dict | None = None)[source]

Bases: DeviceTaskProcessor, ABC

Base ExecutorTorch task processor.

Initialize the task processor.

Parameters:
  • data_path – Path to the dataset

  • training_config – Configuration for training including: - batch_size (int): Size of each training batch (default: 32) - shuffle (bool): Whether to shuffle the dataset (default: True) - num_workers (int): Number of worker processes for data loading (default: 0) - learning_rate (float): Learning rate for optimization (default: 0.1) - momentum (float): Momentum factor (default: 0.0) - weight_decay (float): Weight decay factor (default: 0.0) - dampening (float): Dampening for momentum (default: 0.0) - nesterov (bool): Enables Nesterov momentum (default: False) - epoch (int): Number of training epochs (default: 1)

abstract create_dataset(data_path: str) Dataset[source]

Create dataset for training.

Note: This method may perform expensive I/O operations.

Parameters:

data_path – Path to dataset

Returns:

PyTorch dataset for training

Return type:

Dataset

get_dataset() Dataset[source]

Get the dataset, creating it if necessary (cached).

process_task(task: TaskResponse) dict[source]

Process received task and return results.

Parameters:

task – The task response containing model and instructions

Returns:

Results from training

Return type:

dict

Raises:
  • ValueError – If task data is invalid or protocol validation fails

  • RuntimeError – If training operations fail

run_training(et_model, total_epochs: int = 1) Dict[source]

Run training loop.

Parameters:
  • et_model – ExecutorTorch model

  • total_epochs – Number of epochs to train

Returns:

Training results with parameter differences

Return type:

dict

setup(job: JobResponse) None[source]

Set up the task processor for a new job.

Parameters:

job – Job response containing job information and configuration

shutdown() None[source]

Clean up resources when shutting down.

calc_params_diff(initial_p, last_p)[source]
clone_params(et_params)[source]
tensor_dict_to_json(d)[source]