nvflare.recipe.spec module

class ExecEnv(extra: dict | None = None)

Bases: ABC

Constructor of ExecEnv

Parameters:

extra – a dict of extra properties

abstract abort_job(job_id: str) → None

Abort a running job.

Parameters:

job_id – The job ID to abort.

abstract deploy(job: FedJob) → str

Deploy a FedJob and return its job ID.

Parameters:

job – The FedJob to deploy.

Returns:

The job ID.

Return type:

str

get_extra_prop(prop_name: str, default=None)

Get the specified extra property.

Parameters:
  • prop_name – name of the property

  • default – the default value to return if the named property does not exist.

Returns: value of the property or the default

abstract get_job_result(job_id: str, timeout: float = 0.0) → str | None

Get the result workspace of a job.

Parameters:
  • job_id – The job ID to get results for.

  • timeout – The timeout for the job to complete. Defaults to 0.0 (no timeout).

Returns:

The result workspace path if job completed, None if still running or stopped early.

Return type:

Optional[str]

abstract get_job_status(job_id: str) → str | None

Get the status of a job.

Parameters:

job_id – The job ID to check status for.

Returns:

The status of the job, or None if not supported.

Return type:

Optional[str]

stop(clean_up: bool = False) → None

Stop the execution environment and optionally clean up resources.

This method is called after job execution to ensure proper cleanup. The default implementation is a no-op; override it in subclasses that need cleanup.

Parameters:

clean_up – If True, remove workspace and temporary files after stopping. If False, only stop running processes but preserve workspace. Defaults to False.
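The following is a minimal sketch of how a concrete environment might satisfy the interface described above. It does not import NVFlare: ExecEnvSketch is a local stand-in that mirrors the documented signatures, and InMemoryEnv, the status strings ("RUNNING", "ABORTED", "FINISHED") and the result path are all hypothetical names chosen for illustration.

```python
import uuid
from abc import ABC, abstractmethod
from typing import Optional


class ExecEnvSketch(ABC):
    """Local stand-in mirroring the documented ExecEnv interface (not the NVFlare class)."""

    def __init__(self, extra: Optional[dict] = None):
        self._extra = extra or {}

    def get_extra_prop(self, prop_name: str, default=None):
        # Return the named extra property, or the default if it is absent.
        return self._extra.get(prop_name, default)

    @abstractmethod
    def deploy(self, job) -> str: ...

    @abstractmethod
    def get_job_status(self, job_id: str) -> Optional[str]: ...

    @abstractmethod
    def abort_job(self, job_id: str) -> None: ...

    @abstractmethod
    def get_job_result(self, job_id: str, timeout: float = 0.0) -> Optional[str]: ...

    def stop(self, clean_up: bool = False) -> None:
        # No-op by default, as the documentation describes.
        pass


class InMemoryEnv(ExecEnvSketch):
    """Hypothetical environment that only tracks job state in a dict."""

    def __init__(self, extra: Optional[dict] = None):
        super().__init__(extra)
        self._jobs = {}  # job_id -> status string (hypothetical values)

    def deploy(self, job) -> str:
        job_id = uuid.uuid4().hex
        self._jobs[job_id] = "RUNNING"
        return job_id

    def get_job_status(self, job_id: str) -> Optional[str]:
        return self._jobs.get(job_id)

    def abort_job(self, job_id: str) -> None:
        if self._jobs.get(job_id) == "RUNNING":
            self._jobs[job_id] = "ABORTED"

    def get_job_result(self, job_id: str, timeout: float = 0.0) -> Optional[str]:
        # Only a finished job has a result workspace; otherwise return None.
        if self._jobs.get(job_id) == "FINISHED":
            return f"/tmp/results/{job_id}"  # hypothetical path
        return None
```

Because stop() has a default implementation, a subclass like this one only needs to override it when it owns processes or workspace files that must be released.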

class Recipe(job: FedJob)

Bases: ABC

This is the base class for recipes. A recipe is implemented by a job, so a concrete recipe must provide the job that implements it.

Parameters:

job – the job that implements the recipe.

add_client_config(config: Dict, clients: List[str] | None = None)

Add top-level configuration parameters to config_fed_client.json.

Parameters:
  • config – Dictionary of configuration parameters to add.

  • clients – Optional list of specific client names. If None, applies to all clients.

Raises:

TypeError – If config is not a dictionary.
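The contract described above (reject non-dict input, apply to all clients when clients is None) can be illustrated with a small standalone sketch. The function apply_client_config and the configs mapping are hypothetical stand-ins; they are not part of NVFlare and make no claim about how Recipe stores configuration internally.

```python
from typing import Dict, List, Optional


def apply_client_config(
    configs: Dict[str, dict],
    config: dict,
    clients: Optional[List[str]] = None,
) -> None:
    """Merge top-level keys into each targeted client's config (illustrative only)."""
    if not isinstance(config, dict):
        raise TypeError(f"config must be a dict, got {type(config).__name__}")
    # None means "every known client", mirroring the documented default.
    targets = list(configs) if clients is None else clients
    for name in targets:
        configs.setdefault(name, {}).update(config)


# Usage: one setting for everyone, one setting for a single site.
site_configs = {"site1": {}, "site2": {}}
apply_client_config(site_configs, {"timeout": 600})
apply_client_config(site_configs, {"debug": True}, clients=["site1"])
```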

add_client_file(file_path: str, clients: List[str] | None = None)

Add a file or directory to client apps.

The file will be added to the client’s custom directory and bundled with the job. Can be a script, configuration file, or any resource needed by clients.

Parameters:
  • file_path – Path to the file or directory to add to clients.

  • clients – Optional list of specific client names. If None, applies to all clients.

Raises:

TypeError – If file_path is not a string.

Example

# Add a wrapper script to all clients
recipe.add_client_file("client_wrapper.sh")

# Add a script to specific clients
recipe.add_client_file("custom_script.py", clients=["site1", "site2"])

add_client_input_filter(filter: Filter, tasks: List[str] | None = None, clients: List[str] | None = None)

Add a filter to clients for incoming tasks from the server.

Parameters:
  • filter – the filter to be added

  • tasks – tasks that the filter applies to

  • clients – names of the clients to add the filter to. If None, the filter is added to all clients.

Returns: None

add_client_output_filter(filter: Filter, tasks: List[str] | None = None, clients: List[str] | None = None)

Add a filter to clients for outgoing results to the server.

Parameters:
  • filter – the filter to be added

  • tasks – tasks that the filter applies to

  • clients – names of the clients to add the filter to. If None, the filter is added to all clients.

Returns: None

add_decomposers(decomposers: List[str | Decomposer])

Add decomposers to the job.

Parameters:

decomposers – spec of decomposers. Can be class names or Decomposer objects

Returns: None

add_server_config(config: Dict)

Add top-level configuration parameters to config_fed_server.json.

Parameters:

config – Dictionary of configuration parameters to add.

Raises:

TypeError – If config is not a dictionary.

add_server_file(file_path: str)

Add a file or directory to the server app.

The file will be added to the server’s custom directory and bundled with the job. Can be a script, configuration file, or any resource needed by the server.

Parameters:

file_path – Path to the file or directory to add to the server.

Raises:

TypeError – If file_path is not a string.

Example

# Add a wrapper script to server
recipe.add_server_file("server_wrapper.sh")

add_server_input_filter(filter: Filter, tasks: List[str] | None = None)

Add a filter to the server for incoming task results from clients.

Parameters:
  • filter – the filter to be added

  • tasks – tasks that the filter applies to

Returns: None

add_server_output_filter(filter: Filter, tasks: List[str] | None = None)

Add a filter to the server for outgoing tasks to clients.

Parameters:
  • filter – the filter to be added

  • tasks – tasks that the filter applies to

Returns: None
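The four filter methods differ only in the direction of traffic they apply to: client input (tasks arriving from the server), client output (results sent to the server), server input (results arriving from clients) and server output (tasks sent to clients). The toy registry below sketches that idea with plain callables. FilterTable, the direction strings and the dict payload are hypothetical and are not NVFlare types, and the rule that tasks=None matches every task is an assumption the documentation above does not confirm.

```python
from typing import Callable, Dict, List, Optional, Tuple

Payload = Dict[str, object]
FilterFn = Callable[[Payload], Payload]  # stand-in for a Filter object


class FilterTable:
    """Toy registry: filters keyed by direction, optionally scoped to task names."""

    def __init__(self) -> None:
        self._entries: List[Tuple[str, Optional[List[str]], FilterFn]] = []

    def add(self, direction: str, fn: FilterFn, tasks: Optional[List[str]] = None) -> None:
        self._entries.append((direction, tasks, fn))

    def apply(self, direction: str, task: str, payload: Payload) -> Payload:
        for entry_direction, tasks, fn in self._entries:
            # Assumption: tasks=None means the filter applies to every task.
            if entry_direction == direction and (tasks is None or task in tasks):
                payload = fn(payload)
        return payload


# Usage: mark outgoing "train" results on the client side only.
table = FilterTable()
table.add("client_output", lambda p: {**p, "masked": True}, tasks=["train"])
```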

enable_log_streaming(*file_names: str) → None

Enable live log streaming from clients to the server while the job runs.

Adds one JobLogStreamer per file name to clients and a single JobLogReceiver to the server. Streaming is still gated per-site by allow_log_streaming in resources.json.

Parameters:

*file_names – log file base names to stream. If omitted, defaults to "log.json".

Example:

recipe.enable_log_streaming()                     # streams log.json
recipe.enable_log_streaming("log.txt")            # streams a single file
recipe.enable_log_streaming("log.json", "log.txt")  # streams both

execute(env: ExecEnv, server_exec_params: dict | None = None, client_exec_params: dict | None = None) → Run | None

Execute or export the recipe based on command-line flags.

Transparently checks sys.argv for --export / --export-dir without interfering with the caller’s own argument parser.

  • python job.py → run the job

  • python job.py --export → export to ./fl_job

  • python job.py --export --export-dir X → export to X

Parameters:
  • env – the execution environment

  • server_exec_params – execution params for the server

  • client_exec_params – execution params for clients

Returns:

Run when executing; raises SystemExit(0) when exporting so callers need not guard against a None return value.
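One way to check for --export and --export-dir without disturbing the caller's own argument parser is argparse's parse_known_args, which leaves unrecognized arguments alone. The sketch below shows that technique only; peek_export_flags is a hypothetical helper, and nothing here claims to be how execute() is actually implemented. The ./fl_job default is taken from the list above.

```python
import argparse
import sys
from typing import List, Optional, Tuple


def peek_export_flags(argv: Optional[List[str]] = None) -> Tuple[bool, str]:
    """Return (export_requested, export_dir) without consuming other arguments."""
    # add_help=False keeps -h/--help for the caller's parser;
    # allow_abbrev=False stops a caller flag such as --ex from matching ours.
    parser = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    parser.add_argument("--export", action="store_true")
    parser.add_argument("--export-dir", default="./fl_job")
    # parse_known_args returns unrecognized arguments instead of failing on them.
    known, _unknown = parser.parse_known_args(sys.argv[1:] if argv is None else argv)
    return known.export, known.export_dir
```

Called with ["--lr", "0.1", "--export", "--export-dir", "X"], the helper reports an export to X and leaves --lr 0.1 for the caller's parser to handle.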

export(job_dir: str, server_exec_params: dict | None = None, client_exec_params: dict | None = None, env: ExecEnv | None = None)

Export the recipe to a job definition.

Parameters:
  • job_dir – directory where the job will be exported to.

  • server_exec_params – execution params for the server

  • client_exec_params – execution params for clients

  • env – the environment that the exported job will be running in

Returns: None

process_env(env: ExecEnv)

Process environment-specific configuration.

Subclasses can override to add environment-specific processing. Script validation is handled by each ExecEnv subclass in deploy().

run(env: ExecEnv, server_exec_params: dict | None = None, client_exec_params: dict | None = None) → Run

Run the recipe in a specified execution environment.

Parameters:
  • env – the execution environment

  • server_exec_params – execution params for the server

  • client_exec_params – execution params for clients

Returns: a Run object for retrieving the job ID and execution results