nvflare.edge.simulation.et_task_processor module
- class ETTaskProcessor(data_path: str, training_config: Dict | None = None)
Bases: DeviceTaskProcessor, ABC
Base ExecutorTorch task processor.
Initialize the task processor.
- Parameters:
data_path – Path to the dataset
training_config – Configuration for training, including:
- batch_size (int): Size of each training batch (default: 32)
- shuffle (bool): Whether to shuffle the dataset (default: True)
- num_workers (int): Number of worker processes for data loading (default: 0)
- learning_rate (float): Learning rate for optimization (default: 0.1)
- momentum (float): Momentum factor (default: 0.0)
- weight_decay (float): Weight decay factor (default: 0.0)
- dampening (float): Dampening for momentum (default: 0.0)
- nesterov (bool): Enables Nesterov momentum (default: False)
- epoch (int): Number of training epochs (default: 1)
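The documented defaults can be pictured as a dictionary that user-supplied values are layered over. The sketch below is illustrative only: `DEFAULT_TRAINING_CONFIG` and `resolve_training_config` are hypothetical names, not part of the module, and rejecting unknown keys is an assumption of this sketch rather than documented behavior.

```python
from typing import Any, Dict, Optional

# Default values copied from the training_config parameter description.
DEFAULT_TRAINING_CONFIG: Dict[str, Any] = {
    "batch_size": 32,
    "shuffle": True,
    "num_workers": 0,
    "learning_rate": 0.1,
    "momentum": 0.0,
    "weight_decay": 0.0,
    "dampening": 0.0,
    "nesterov": False,
    "epoch": 1,
}


def resolve_training_config(overrides: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Hypothetical helper: overlay user overrides on the documented defaults."""
    overrides = overrides or {}
    # Assumption of this sketch: misspelled keys should fail loudly.
    unknown = set(overrides) - set(DEFAULT_TRAINING_CONFIG)
    if unknown:
        raise KeyError(f"Unknown training_config keys: {sorted(unknown)}")
    return {**DEFAULT_TRAINING_CONFIG, **overrides}


# Only the overridden keys change; everything else keeps its default.
config = resolve_training_config({"batch_size": 64, "momentum": 0.9})
```

A dictionary shaped like `config` is what would be passed as `training_config` when constructing a concrete subclass.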
- abstract create_dataset(data_path: str) → Dataset
Create dataset for training.
Note: This method may perform expensive I/O operations.
- Parameters:
data_path – Path to dataset
- Returns:
PyTorch dataset for training
- Return type:
Dataset
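Because create_dataset is abstract, every concrete processor must override it. The following is a minimal sketch of that pattern, not the library's implementation: `ProcessorSketch` is a stand-in that mirrors only the abstract hook so the example runs without NVFlare or PyTorch installed, and `CsvRows` and `CsvProcessor` are hypothetical names. A real subclass would derive from ETTaskProcessor and return a `torch.utils.data.Dataset`.

```python
import csv
import os
import tempfile
from abc import ABC, abstractmethod
from typing import List, Tuple


class ProcessorSketch(ABC):
    """Stand-in base class (assumption): mirrors only the abstract hook."""

    @abstractmethod
    def create_dataset(self, data_path: str):
        ...


class CsvRows:
    """Map-style dataset: exposes __len__ and __getitem__."""

    def __init__(self, rows: List[Tuple[List[float], int]]):
        self._rows = rows

    def __len__(self) -> int:
        return len(self._rows)

    def __getitem__(self, idx: int) -> Tuple[List[float], int]:
        return self._rows[idx]


class CsvProcessor(ProcessorSketch):
    def create_dataset(self, data_path: str) -> CsvRows:
        # All file I/O happens here, once, rather than per sample.
        with open(data_path, newline="") as f:
            rows = [([float(v) for v in r[:-1]], int(r[-1])) for r in csv.reader(f)]
        return CsvRows(rows)


# Usage with a throwaway file: two rows of (features..., label).
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "data.csv")
    with open(path, "w", newline="") as f:
        csv.writer(f).writerows([[0.1, 0.2, 0], [0.3, 0.4, 1]])
    dataset = CsvProcessor().create_dataset(path)
```

Loading everything inside create_dataset keeps the expensive I/O mentioned in the note in one place, so per-sample access stays cheap.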
- process_task(task: TaskResponse) → dict
Process received task and return results.
- Parameters:
task – The task response containing model and instructions
- Returns:
Results from training
- Return type:
dict
- Raises:
ValueError – If task data is invalid or protocol validation fails
RuntimeError – If training operations fail
- run_training(et_model, total_epochs: int = 1) → Dict
Run training loop.
- Parameters:
et_model – ExecutorTorch model
total_epochs – Number of epochs to train
- Returns:
Training results with parameter differences
- Return type:
dict
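One way to read "parameter differences" is as an element-wise delta between parameter values before and after local training. The sketch below illustrates that idea with plain Python lists. `parameter_diff` is a hypothetical helper, and both the sign convention (after minus before) and the dictionary layout are assumptions; the actual result format is defined by the implementation.

```python
from typing import Dict, List

Params = Dict[str, List[float]]


def parameter_diff(before: Params, after: Params) -> Params:
    """Hypothetical helper: element-wise (after - before) per named parameter."""
    if before.keys() != after.keys():
        raise ValueError("Parameter names differ between snapshots")
    for name in before:
        if len(before[name]) != len(after[name]):
            raise ValueError(f"Size mismatch for parameter {name!r}")
    return {
        name: [a - b for a, b in zip(after[name], before[name])]
        for name in before
    }


# Snapshots of flattened parameters before and after a training run.
before = {"linear.weight": [1.0, 2.0], "linear.bias": [0.5]}
after = {"linear.weight": [0.5, 2.5], "linear.bias": [0.5]}
diff = parameter_diff(before, after)
```

Sending a delta instead of full weights lets a server apply or aggregate updates relative to the model it distributed.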
- setup(job: JobResponse) → None
Set up the task processor for a new job.
- Parameters:
job – Job response containing job information and configuration