nvflare.recipe package

Module contents

class FedAvgRecipe(*, name: str = 'fedavg', model: Any | Dict[str, Any] | None = None, initial_ckpt: str | None = None, min_clients: int, num_rounds: int = 2, train_script: str, train_args: str = '', aggregator: Aggregator | None = None, aggregator_data_kind: DataKind | None = DataKind.WEIGHTS, launch_external_process: bool = False, command: str = 'python3 -u', framework: FrameworkType = FrameworkType.PYTORCH, server_expected_format: ExchangeFormat = ExchangeFormat.NUMPY, params_transfer_type: TransferType = TransferType.FULL, model_persistor: ModelPersistor | None = None, per_site_config: Dict[str, Dict] | None = None, launch_once: bool = True, shutdown_timeout: float = 0.0, key_metric: str = 'accuracy', stop_cond: str | None = None, patience: int | None = None, best_model_filename: str | None = None, save_filename: str | None = None, exclude_vars: str | None = None, aggregation_weights: Dict[str, float] | None = None, server_memory_gc_rounds: int = 0, enable_tensor_disk_offload: bool = False, client_memory_gc_rounds: int = 0, cuda_empty_cache: bool = False)

Bases: Recipe

Unified FedAvg recipe for PyTorch, TensorFlow, and Scikit-learn.

FedAvg is a fundamental federated learning algorithm that aggregates model updates from multiple clients by computing a weighted average based on the amount of local training data. This recipe sets up a complete federated learning workflow with memory-efficient InTime aggregation.

The recipe configures:

  • A federated job with an initial model (optional)

  • A FedAvg controller with InTime aggregation for memory efficiency

  • Optional early stopping and model selection

  • Script runners for client-side training execution

Parameters:
  • name – Name of the federated learning job. Defaults to “fedavg”.

  • model – Initial model to start federated training with. Can be:
      - Model instance (nn.Module, tf.keras.Model, etc.)
      - Dict config: {"class_path": "module.ClassName", "args": {"param": value}}
      - None: no initial model
    For framework-specific types (nn.Module, tf.keras.Model), use the corresponding framework recipe (e.g., nvflare.app_opt.pt.recipes.FedAvgRecipe).

  • initial_ckpt – Absolute path to a pre-trained checkpoint file. The file may not exist locally as it could be on the server. Used to load initial weights.

  • min_clients – Minimum number of clients required to start a training round.

  • num_rounds – Number of federated training rounds to execute. Defaults to 2.

  • train_script – Path to the training script that will be executed on each client.

  • train_args – Command line arguments to pass to the training script.

  • aggregator – Custom aggregator (ModelAggregator) for combining client model updates. Must implement accept_model(), aggregate_model(), reset_stats() methods. If None, uses built-in memory-efficient weighted averaging. Defaults to None.

  • aggregator_data_kind – Data kind for aggregation (DataKind.WEIGHTS or DataKind.WEIGHT_DIFF). Kept for backward compatibility. Defaults to DataKind.WEIGHTS.

  • launch_external_process – Whether to launch the script in an external process. Defaults to False.

  • command – If launch_external_process=True, command to run script (prepended to script). Defaults to “python3 -u”.

  • framework – The framework type. One of:
      - FrameworkType.PYTORCH (default)
      - FrameworkType.TENSORFLOW
      - FrameworkType.NUMPY
      - FrameworkType.RAW (for custom frameworks, e.g., sklearn, XGBoost)

  • server_expected_format – Format in which parameters are exchanged between server and client. Defaults to ExchangeFormat.NUMPY.

  • params_transfer_type – How to transfer the parameters. FULL means the whole model parameters are sent. DIFF means that only the difference is sent. Defaults to TransferType.FULL.

  • model_persistor – Custom model persistor for any framework. If None, uses the framework’s default persistor when one is available.

  • per_site_config – Per-site configuration for the federated learning job. Dictionary mapping site names to configuration dicts. Each config dict can contain optional overrides:
      - train_script (str): Training script path
      - train_args (str): Script arguments
      - launch_external_process (bool): Whether to launch external process
      - command (str): Command prefix for external process
      - framework (FrameworkType): Framework type
      - server_expected_format (ExchangeFormat): Exchange format
      - params_transfer_type (TransferType): Parameter transfer type
      - launch_once (bool): Whether to launch external process once or per task
      - shutdown_timeout (float): Shutdown timeout in seconds
    If not provided, the same configuration will be used for all clients.

  • launch_once – Whether the external process will be launched only once at the beginning or on each task. Only used if launch_external_process is True. Defaults to True.

  • shutdown_timeout – If provided, will wait for this number of seconds before shutdown. Only used if launch_external_process is True. Defaults to 0.0.

  • key_metric – Metric used to determine if the model is globally best. If validation metrics are a dict, key_metric selects the metric used for global model selection by the IntimeModelSelector. Defaults to “accuracy”.

  • stop_cond – Early stopping condition based on metric. String literal in the format of ‘<key> <op> <value>’ (e.g. “accuracy >= 80”). If None, early stopping is disabled.

  • patience – Number of rounds with no improvement after which FL will be stopped. Only applies if stop_cond is set. Defaults to None.

  • best_model_filename – Filename for saving the best model. If unset, framework persistors that expose a separate best-model filename use their own default, such as DefaultCheckpointFileName.BEST_GLOBAL_MODEL for the default PyTorch persistor.

  • save_filename – Deprecated alias for best_model_filename. If both are specified, they must match.

  • exclude_vars – Regex pattern for variables to exclude from aggregation.

  • aggregation_weights – Per-client aggregation weights dict. Defaults to equal weights.

  • server_memory_gc_rounds – Run memory cleanup (gc.collect + malloc_trim) every N rounds on server. Set to 0 to disable. Defaults to 0.

  • enable_tensor_disk_offload – Enable disk-backed tensor offload for incoming streamed payloads. When True, server receives tensor payloads via temp files and materializes lazily.
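The stop_cond format described above ('<key> <op> <value>') can be illustrated with a small parser. This is a minimal sketch, not NVFlare's implementation: the helper names parse_stop_cond and should_stop are hypothetical, and the set of supported operators is an assumption.

```python
import operator

# Assumption: the operators accepted here are illustrative; the real
# recipe may support a different set.
_OPS = {
    ">=": operator.ge,
    "<=": operator.le,
    ">": operator.gt,
    "<": operator.lt,
    "==": operator.eq,
}


def parse_stop_cond(stop_cond):
    """Split '<key> <op> <value>' into (key, comparison function, float target)."""
    parts = stop_cond.split()
    if len(parts) != 3:
        raise ValueError(f"expected '<key> <op> <value>', got {stop_cond!r}")
    key, op, value = parts
    if op not in _OPS:
        raise ValueError(f"unsupported operator {op!r}")
    return key, _OPS[op], float(value)


def should_stop(metrics, stop_cond):
    """Return True when the named metric is present and satisfies the condition."""
    key, compare, target = parse_stop_cond(stop_cond)
    return key in metrics and compare(metrics[key], target)
```

For example, should_stop({"accuracy": 82.5}, "accuracy >= 80") evaluates to True, while a metrics dict without the "accuracy" key never triggers the condition.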

Note

This recipe uses InTime (streaming) aggregation for memory efficiency - each client result is aggregated immediately upon receipt rather than collecting all results first. Memory usage is constant regardless of the number of clients.
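The idea behind streaming aggregation can be sketched in plain Python. The class below is a hypothetical illustration, not the recipe's aggregator: it assumes parameters arrive as a dict of name to list of floats and that each client's weight is its number of local training samples.

```python
class StreamingWeightedAverage:
    """Running weighted sum: one accumulator, however many clients report."""

    def __init__(self):
        self.total = None      # weighted sum of parameters seen so far
        self.weight_sum = 0.0  # sum of the weights seen so far

    def accept(self, params, weight):
        """Fold one client's parameters into the accumulator immediately."""
        if self.total is None:
            self.total = {name: [0.0] * len(values) for name, values in params.items()}
        for name, values in params.items():
            acc = self.total[name]
            for i, value in enumerate(values):
                acc[i] += weight * value
        self.weight_sum += weight

    def result(self):
        """Return the weighted average of everything accepted so far."""
        if self.total is None or self.weight_sum == 0:
            raise ValueError("no client results accepted")
        return {
            name: [value / self.weight_sum for value in acc]
            for name, acc in self.total.items()
        }


agg = StreamingWeightedAverage()
agg.accept({"w": [1.0, 2.0]}, weight=100)  # client with 100 samples
agg.accept({"w": [3.0, 6.0]}, weight=300)  # client with 300 samples
averaged = agg.result()
```

Because each client result is folded in and then discarded, the accumulator stays the size of one model, which is the property the note above describes.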

If you want to use a custom aggregator, you can pass it in the aggregator parameter. The custom aggregator must be a subclass of the Aggregator class.
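As a rough illustration of the three methods a custom aggregator is expected to provide (accept_model(), aggregate_model(), reset_stats()), the stand-in below computes an element-wise median. It is only a sketch: it does not subclass the real Aggregator base class, and the argument and return types (dicts of name to list of floats) are assumptions rather than the library's actual signatures.

```python
from statistics import median


class MedianAggregatorSketch:
    """Stand-in only; a real custom aggregator subclasses NVFlare's base class."""

    def __init__(self):
        self.received = []

    def accept_model(self, params):
        # Assumption: params is a dict of name -> list of floats.
        self.received.append(params)

    def aggregate_model(self):
        """Return the element-wise median over all accepted models."""
        if not self.received:
            raise ValueError("no models accepted")
        names = self.received[0].keys()
        return {
            name: [median(column) for column in zip(*(p[name] for p in self.received))]
            for name in names
        }

    def reset_stats(self):
        """Clear state so the next round starts empty."""
        self.received = []
```

Unlike the built-in weighted averaging, a median has to keep every client result until aggregation, so this kind of aggregator gives up the constant-memory property.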

This is the base class of a recipe. Recipes are implemented by jobs: a concrete recipe must provide the job that implements it.

Parameters:

job – the job that implements the recipe.

class PocEnv(*, num_clients: int | None = 2, clients: list[str] | None = None, gpu_ids: list[int] | None = None, use_he: bool = False, docker_image: str | None = None, project_conf_path: str = '', username: str = 'admin@nvidia.com', study: str = 'default', extra: dict | None = None)

Bases: ExecEnv

Proof of Concept execution environment for local testing and development.

This environment sets up a POC deployment on a single machine with multiple processes representing the server, clients, and admin console.

Initialize POC execution environment.

Parameters:
  • num_clients (int, optional) – Number of clients to use in POC mode. Defaults to 2.

  • clients (list[str], optional) – List of client names. If None, names are generated as site-1, site-2, etc. Defaults to None. If specified, the num_clients argument is ignored.

  • gpu_ids (list[int], optional) – List of GPU IDs to assign to clients. If None, uses CPU only. Defaults to None.

  • use_he (bool, optional) – Whether to use homomorphic encryption (HE). Defaults to False.

  • docker_image (str, optional) – Docker image to use for POC. Defaults to None.

  • project_conf_path (str, optional) – Path to the project configuration file. Defaults to “”. If specified, num_clients, clients, and the Docker-specific options are ignored.

  • username (str, optional) – Admin user. Defaults to “admin@nvidia.com”.

  • study (str, optional) – Study name to tag submitted jobs. Defaults to “default”.

  • extra – extra env info.
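The precedence between clients and num_clients described above can be sketched as follows. The helper resolve_client_names is hypothetical and assumes num_clients is an integer; it is not part of PocEnv.

```python
def resolve_client_names(num_clients=2, clients=None):
    """Return explicit names if given; otherwise generate site-1 .. site-N."""
    if clients:
        # An explicit list wins; num_clients is ignored.
        return list(clients)
    return [f"site-{i}" for i in range(1, num_clients + 1)]


default_names = resolve_client_names()
explicit_names = resolve_client_names(num_clients=5, clients=["hospital-a", "hospital-b"])
```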

abort_job(job_id: str) -> None

Abort a running job.

Parameters:

job_id – The job ID to abort.

deploy(job: FedJob) -> str

Deploy a FedJob to the POC environment.

Parameters:

job (FedJob) – The FedJob to deploy.

Returns:

Job ID.

Return type:

str

Raises:

ValueError – If scripts do not exist locally.

get_job_result(job_id: str, timeout: float = 0.0) -> str | None

Get the result workspace of a job.

Parameters:
  • job_id – The job ID to get results for.

  • timeout – The timeout for the job to complete. Defaults to 0.0 (no timeout).

Returns:

The result workspace path if job completed, None otherwise.

Return type:

Optional[str]
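The timeout semantics can be pictured with a generic polling loop. This sketch reads "0.0 (no timeout)" as "wait indefinitely", which is an assumption; wait_until_done and its poll_interval parameter are hypothetical names, not part of the environment API.

```python
import time


def wait_until_done(is_done, timeout=0.0, poll_interval=0.01):
    """Poll is_done() until it returns True.

    A timeout of 0.0 (or less) means no deadline. Returns False if the
    deadline passes before is_done() reports completion.
    """
    deadline = None if timeout <= 0 else time.monotonic() + timeout
    while not is_done():
        if deadline is not None and time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
    return True
```

A caller would treat a False return the same way as a None result from get_job_result: the job has not completed within the allotted time.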

get_job_status(job_id: str) -> str | None

Get the status of a job.

Parameters:

job_id – The job ID to check status for.

Returns:

The status of the job, or None if not available.

Return type:

Optional[str]

stop(clean_up: bool = False) -> None

Try to stop and clean existing POC.

This method is idempotent - safe to call multiple times.

Parameters:

clean_up (bool, optional) – Whether to clean the POC workspace. Defaults to False.

class ProdEnv(startup_kit_location: str, login_timeout: float = 5.0, username: str = 'admin@nvidia.com', study: str = 'default', extra: dict | None = None)

Bases: ExecEnv

Production execution environment for submitting and monitoring NVFlare jobs.

This environment uses the startup kit of an NVFlare deployment to submit jobs via the Flare API.

Parameters:
  • startup_kit_location (str) – Path to the admin’s startup kit directory.

  • login_timeout (float) – Timeout (in seconds) for logging into the Flare API session. Must be > 0.

  • username (str) – Username to log in with.

  • study (str) – Study name to tag submitted jobs. Defaults to “default”.

  • extra – extra env info.

abort_job(job_id: str) -> None

Abort a running job.

Parameters:

job_id – The job ID to abort.

deploy(job: FedJob) -> str

Deploy a job using SessionManager.

get_job_result(job_id: str, timeout: float = 0.0) -> str | None

Get the result workspace of a job.

Parameters:
  • job_id – The job ID to get results for.

  • timeout – The timeout for the job to complete. Defaults to 0.0 (no timeout).

Returns:

The result workspace path if job completed, None if still running or stopped early.

Return type:

Optional[str]

get_job_status(job_id: str) -> str | None

Get the status of a job.

Parameters:

job_id – The job ID to check status for.

Returns:

The status of the job, or None if not supported.

Return type:

Optional[str]

class Run(exec_env: ExecEnv, job_id: str)

Bases: object

Represents a running or completed job execution.

Provides methods to get job status, results, and abort the job. Caches status and result after the execution environment is stopped.

This class is thread-safe. All state-changing operations are protected by a lock.

Initialize a Run instance.

Parameters:
  • exec_env – The execution environment managing this job.

  • job_id – The unique identifier for the job.

Raises:

ValueError – If exec_env is None or job_id is empty.

abort() -> None

Abort the running job.

This is a no-op if the execution environment has already been stopped (e.g., after get_result() was called). Errors are logged but not raised.

get_job_id() -> str

Get the job ID.

Returns:

The job ID.

Return type:

str

get_result(timeout: float = 0.0, clean_up: bool = True) -> str | None

Get the result workspace of the run.

Waits for job to complete, caches status, then stops execution environment.

Parameters:
  • timeout (float, optional) – Timeout for job completion. Defaults to 0.0 (no timeout).

  • clean_up (bool, optional) – Whether to remove the execution-environment workspace (e.g. the POC workspace) when stopping. Defaults to True, preserving the existing “each run is independent” behavior. Pass clean_up=False to keep the workspace on disk after the run so server/client log files (including the per-service poc_console.log introduced in #4500) remain available for debugging or test assertions.

Returns:

Result workspace path, or None if job not finished or on error. The path may be removed by the time this method returns when clean_up=True.

Return type:

Optional[str]
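The documented behavior (wait for the result, cache the status, stop the environment, and treat later abort() calls as no-ops) can be sketched with stand-in classes. FakeEnv and RunSketch are hypothetical, and the "done" status string and workspace path are placeholders rather than real NVFlare values.

```python
import threading


class FakeEnv:
    """Hypothetical stand-in for an execution environment."""

    def __init__(self):
        self.stopped = False
        self.abort_calls = 0

    def get_job_status(self, job_id):
        return "done"  # placeholder status string

    def get_job_result(self, job_id, timeout=0.0):
        return f"/tmp/ws/{job_id}"  # placeholder workspace path

    def abort_job(self, job_id):
        self.abort_calls += 1

    def stop(self, clean_up=False):
        self.stopped = True


class RunSketch:
    """Caches status and result once the environment has been stopped."""

    def __init__(self, exec_env, job_id):
        if exec_env is None or not job_id:
            raise ValueError("exec_env and job_id are required")
        self._env = exec_env
        self._job_id = job_id
        self._lock = threading.Lock()  # guards all state changes
        self._stopped = False
        self._status = None
        self._result = None

    def get_status(self):
        with self._lock:
            if self._stopped:
                return self._status  # served from cache after stop
            return self._env.get_job_status(self._job_id)

    def get_result(self, timeout=0.0, clean_up=True):
        with self._lock:
            if self._stopped:
                return self._result
            self._result = self._env.get_job_result(self._job_id, timeout=timeout)
            self._status = self._env.get_job_status(self._job_id)
            self._env.stop(clean_up=clean_up)
            self._stopped = True
            return self._result

    def abort(self):
        with self._lock:
            if self._stopped:
                return  # no-op once the environment has been stopped
            self._env.abort_job(self._job_id)
```

With clean_up=True the returned path may already be gone on disk, so code that needs to read logs from the workspace would pass clean_up=False.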

get_status() -> str | None

Get the status of the run.

Returns:

The status of the run, or None if not available or on error.

Return type:

Optional[str]

class SimEnv(*, num_clients: int = 0, clients: list[str] | None = None, num_threads: int | None = None, gpu_config: str | None = None, log_config: str | None = None, workspace_root: str = '/tmp/nvflare/simulation', extra: dict | None = None)

Bases: ExecEnv

Initialize simulation execution environment.

Parameters:
  • num_clients (int, optional) – Number of simulated clients. Defaults to 0.

  • clients (list[str], optional) – List of client names. Defaults to None.

  • num_threads (int, optional) – Number of threads to run simulator. Defaults to None. If not provided, the number of threads will be set to the number of clients.

  • gpu_config (str, optional) – GPU configuration string. Defaults to None.

  • log_config (str, optional) – Log configuration string. Defaults to None.

  • workspace_root (str, optional) – Root directory for simulation workspace. Defaults to WORKSPACE_ROOT ('/tmp/nvflare/simulation').

  • extra – extra env config info

abort_job(job_id: str) -> None

Abort job - not supported in simulation environment.

deploy(job: FedJob)

Deploy a FedJob and return an execution response.

Parameters:

job – The FedJob to deploy.

Returns:

The job ID.

Return type:

str

get_job_result(job_id: str, timeout: float = 0.0) -> str | None

Get job result workspace path.

get_job_status(job_id: str) -> str | None

Get job status - not supported in simulation environment.

add_cross_site_evaluation(recipe: Recipe, submit_model_timeout: int = 600, validation_timeout: int = 6000, participating_clients: List[str] | None = None)

Add cross-site evaluation to an existing recipe.

This utility automatically configures cross-site evaluation by: - Auto-detecting the framework from the recipe - Adding the appropriate model locator - Adding the CrossSiteModelEval controller - Adding ValidationJsonGenerator for results - Auto-adding the appropriate validator to clients (for NumPy recipes)

For standalone CSE without training, use NumpyCrossSiteEvalRecipe instead.

Note: This utility is designed for adding CSE to training recipes. If you call it on a CSE-only recipe (e.g., NumpyCrossSiteEvalRecipe), it will detect this and skip adding duplicate validators automatically.

WARNING: Do not call this function multiple times on the same recipe instance. This function is not idempotent: it raises a RuntimeError if called more than once on the same recipe, to prevent duplicate component registration.
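A call-once guard of this kind can be sketched as below. The marker attribute _cse_added, the RecipeStub class, and the function add_feature_once are all hypothetical; how the real function tracks prior calls is not stated here.

```python
class RecipeStub:
    """Hypothetical placeholder for a recipe object."""


def add_feature_once(recipe):
    """Register a feature on the recipe, refusing a second registration."""
    # Assumption: a private marker attribute records that the feature was added.
    if getattr(recipe, "_cse_added", False):
        raise RuntimeError("cross-site evaluation already added to this recipe")
    recipe._cse_added = True


recipe = RecipeStub()
add_feature_once(recipe)  # first call succeeds
try:
    add_feature_once(recipe)  # second call on the same instance fails
    second_call_failed = False
except RuntimeError:
    second_call_failed = True
```

Raising instead of silently returning makes accidental double registration visible at job-construction time rather than at run time.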

IMPORTANT for PyTorch: Your client training script must handle validation tasks by checking flare.is_evaluate() and returning metrics without training. Example pattern:

```python
# In your client script:
import nvflare.client as flare

# model, evaluate, and test_loader are defined elsewhere in your script
while flare.is_running():
    input_model = flare.receive()
    model.load_state_dict(input_model.params)

    # Evaluate model (always required)
    metrics = evaluate(model, test_loader)

    # Handle CSE validation task
    if flare.is_evaluate():
        output_model = flare.FLModel(metrics=metrics)
        flare.send(output_model)
        continue  # Skip training for validation-only tasks

    # Normal training code here…
```

Example (NumPy - fully automatic):

```python
from nvflare.app_common.np.recipes import NumpyFedAvgRecipe
from nvflare.recipe.utils import add_cross_site_evaluation

recipe = NumpyFedAvgRecipe(
    name="my-job",
    model=[1.0, 2.0, 3.0],
    min_clients=2,
    num_rounds=3,
    train_script="client.py",
)

# That's it! Framework auto-detected, validator auto-added
add_cross_site_evaluation(recipe)
```

Example (PyTorch - requires client script support):

```python
from nvflare.app_opt.pt.recipes import FedAvgRecipe
from nvflare.recipe.utils import add_cross_site_evaluation

recipe = FedAvgRecipe(
    name="my-job",
    min_clients=2,
    num_rounds=3,
    model=MyModel(),
    train_script="client.py",
)

# Note: client.py must handle flare.is_evaluate() for validation
add_cross_site_evaluation(recipe)
```

Example (TensorFlow - Client API pattern, recommended):

```python
from nvflare.app_opt.tf.recipes import FedAvgRecipe
from nvflare.recipe.utils import add_cross_site_evaluation

recipe = FedAvgRecipe(
    name="my-job",
    min_clients=2,
    num_rounds=3,
    model=MyTFModel(),
    train_script="client.py",
)

# Note: client.py must handle flare.is_evaluate() for validation
add_cross_site_evaluation(recipe)
```

Example (TensorFlow - Component-based alternative):

```python
from nvflare.app_opt.tf.recipes import FedAvgRecipe
from nvflare.app_opt.tf.tf_validator import TFValidator
from nvflare.recipe.utils import add_cross_site_evaluation

recipe = FedAvgRecipe(
    name="my-job",
    min_clients=2,
    num_rounds=3,
    model=MyTFModel(),
    train_script="client.py",
)

add_cross_site_evaluation(recipe)

# Optional: manually add TFValidator for component-based validation
validator = TFValidator(model=my_model, data_loader=test_loader)
recipe.job.to_clients(validator, tasks=["validate"])
```

Parameters:
  • recipe – Recipe instance to augment with cross-site evaluation.

  • submit_model_timeout – Timeout (seconds) for submitting models to clients. Defaults to 600.

  • validation_timeout – Timeout (seconds) for validation tasks on clients. Defaults to 6000.

  • participating_clients – Optional list of client names to include in cross-site evaluation. If not provided, all clients connected at controller start are used.

Raises:
  • ValueError – If the recipe doesn’t have a framework attribute or uses an unsupported framework.

  • RuntimeError – If cross-site evaluation has already been added to this recipe.

Note

  • Currently supports PyTorch, NumPy, and TensorFlow frameworks.

  • NumPy recipes using `NumpyFedAvgRecipe`: Validators (NPValidator) are automatically added to clients to handle validation tasks. The function intelligently detects if validators are already configured by checking for executors handling TASK_VALIDATION, avoiding duplicates for CSE-only recipes (like NumpyCrossSiteEvalRecipe).

  • Unified `FedAvgRecipe` with `framework=FrameworkType.NUMPY`: Uses the same Client API validation pattern as PyTorch and TensorFlow. Your client script should handle flare.is_evaluate() and return metrics for validation tasks.

  • PyTorch recipes: No separate validator component is needed. The client training script handles validation tasks through the Client API’s flare.is_evaluate() check. See the hello-pt example for implementation pattern.

  • TensorFlow recipes: Similar to PyTorch, uses the Client API pattern. The client script should handle validation tasks via flare.is_evaluate() check.

add_experiment_tracking(recipe: Recipe, tracking_type: str, tracking_config: dict | None = None, client_side: bool = False, server_side: bool = True)

Add experiment tracking to a recipe.

Adds tracking receivers to the server and/or clients to collect and log metrics during training.

Parameters:
  • recipe – Recipe instance to augment with experiment tracking.

  • tracking_type – Type of tracking to enable (“mlflow”, “tensorboard”, or “wandb”).

  • tracking_config – Optional configuration dict for the tracking receiver.

  • client_side – If True, add tracking to all clients (each client tracks locally).

  • server_side – If True, add tracking to server (aggregates metrics from all clients). Default: True.

Examples

# Server-side tracking (default - federated metrics)
add_experiment_tracking(recipe, "mlflow", {"tracking_uri": "…"})

# Client-side tracking only (each client tracks independently)
add_experiment_tracking(recipe, "mlflow", {…}, client_side=True, server_side=False)

# Both server and client tracking
add_experiment_tracking(recipe, "mlflow", {…}, client_side=True, server_side=True)