nvflare.recipe.spec module
- class ExecEnv(extra: dict | None = None)
Bases: ABC
Constructor of ExecEnv.
- Parameters:
extra – a dict of extra properties
- abstract abort_job(job_id: str) → None
Abort a running job.
- Parameters:
job_id – The job ID to abort.
- abstract deploy(job: FedJob) → str
Deploy a FedJob and return an execution response.
- Parameters:
job – The FedJob to deploy.
- Returns:
The job ID.
- Return type:
str
- get_extra_prop(prop_name: str, default=None)
Get the specified extra property.
- Parameters:
prop_name – name of the property
default – the default value to return if the named property does not exist.
Returns: value of the property or the default
- abstract get_job_result(job_id: str, timeout: float = 0.0) → str | None
Get the result workspace of a job.
- Parameters:
job_id – The job ID to get results for.
timeout – The timeout for the job to complete. Defaults to 0.0 (no timeout).
- Returns:
The result workspace path if job completed, None if still running or stopped early.
- Return type:
Optional[str]
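Because get_job_result returns None while a job is still running, callers may want to poll for the result. The following is a minimal sketch of such a loop and is not part of NVFlare: wait_for_result, FakeEnv, max_wait, and poll_interval are hypothetical names, and FakeEnv only stands in for a real ExecEnv so that the snippet runs on its own.

```python
import time
from typing import Optional


def wait_for_result(env, job_id: str, max_wait: float = 5.0, poll_interval: float = 0.01) -> Optional[str]:
    """Poll env.get_job_result until it returns a path or max_wait seconds elapse."""
    deadline = time.monotonic() + max_wait
    while True:
        result = env.get_job_result(job_id)
        if result is not None:
            return result
        if time.monotonic() >= deadline:
            return None  # still running (or stopped early) when we gave up
        time.sleep(poll_interval)


class FakeEnv:
    """Hypothetical stand-in for an ExecEnv: reports a result on the third call."""

    def __init__(self):
        self.calls = 0

    def get_job_result(self, job_id: str, timeout: float = 0.0) -> Optional[str]:
        self.calls += 1
        return f"/tmp/{job_id}" if self.calls >= 3 else None


fake = FakeEnv()
workspace = wait_for_result(fake, "job-1")
```

Since None is also returned when a job stopped early, real code would likely combine this loop with get_job_status to avoid waiting on a job that will never finish.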
- abstract get_job_status(job_id: str) → str | None
Get the status of a job.
- Parameters:
job_id – The job ID to check status for.
- Returns:
The status of the job, or None if not supported.
- Return type:
Optional[str]
- stop(clean_up: bool = False) → None
Stop the execution environment and optionally clean up resources.
This method is called after job execution to ensure proper cleanup. Default implementation is a no-op. Override in subclasses that need cleanup.
- Parameters:
clean_up – If True, remove workspace and temporary files after stopping. If False, only stop running processes but preserve workspace. Defaults to False.
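The interface above can be illustrated with a small in-memory environment. This is a sketch under stated assumptions: ExecEnvSketch is a local stand-in that mirrors the documented signatures so the snippet runs without NVFlare installed (real code would subclass nvflare.recipe.spec.ExecEnv instead), and InMemoryEnv, the status strings "FINISHED" and "ABORTED", and the workspace path are all hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Optional


class ExecEnvSketch(ABC):
    """Local stand-in mirroring the documented ExecEnv interface (not the NVFlare class)."""

    def __init__(self, extra: Optional[dict] = None):
        self._extra = extra or {}

    def get_extra_prop(self, prop_name: str, default=None):
        return self._extra.get(prop_name, default)

    @abstractmethod
    def deploy(self, job) -> str: ...

    @abstractmethod
    def get_job_status(self, job_id: str) -> Optional[str]: ...

    @abstractmethod
    def get_job_result(self, job_id: str, timeout: float = 0.0) -> Optional[str]: ...

    @abstractmethod
    def abort_job(self, job_id: str) -> None: ...

    def stop(self, clean_up: bool = False) -> None:
        pass  # documented default: no-op


class InMemoryEnv(ExecEnvSketch):
    """Hypothetical environment that marks every deployed job as finished immediately."""

    def __init__(self, extra: Optional[dict] = None):
        super().__init__(extra)
        self._jobs = {}  # job_id -> status string (illustrative values only)

    def deploy(self, job) -> str:
        job_id = f"job-{len(self._jobs) + 1}"
        self._jobs[job_id] = "FINISHED"
        return job_id

    def get_job_status(self, job_id: str) -> Optional[str]:
        return self._jobs.get(job_id)

    def get_job_result(self, job_id: str, timeout: float = 0.0) -> Optional[str]:
        # Return a workspace path only for completed jobs, None otherwise.
        if self._jobs.get(job_id) == "FINISHED":
            return f"/tmp/workspace/{job_id}"
        return None

    def abort_job(self, job_id: str) -> None:
        self._jobs[job_id] = "ABORTED"


env = InMemoryEnv(extra={"region": "test"})
job_id = env.deploy(job=object())  # any object works for this stand-in
```

The subclass overrides only the four abstract methods; stop and get_extra_prop are inherited, which matches the documented split between abstract and default behavior.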
- class Recipe(job: FedJob)
Bases: ABC
This is the base class of a recipe. Recipes are implemented by jobs: a concrete recipe must provide the job that implements it.
- Parameters:
job – the job that implements the recipe.
- add_client_config(config: Dict, clients: List[str] | None = None)
Add top-level configuration parameters to config_fed_client.json.
- Parameters:
config – Dictionary of configuration parameters to add.
clients – Optional list of specific client names. If None, applies to all clients.
- Raises:
TypeError – If config is not a dictionary.
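The documented contract (a TypeError for non-dict input, and clients=None meaning all clients) can be illustrated with a small stand-alone sketch. merge_client_config and the per-client dict layout are assumptions made purely for illustration; they do not describe how NVFlare stores or writes config_fed_client.json.

```python
from typing import Dict, List, Optional


def merge_client_config(
    per_client: Dict[str, dict],
    config: Dict,
    clients: Optional[List[str]] = None,
) -> Dict[str, dict]:
    """Merge top-level keys from `config` into the selected clients' configs."""
    if not isinstance(config, dict):
        raise TypeError(f"config must be a dict but got {type(config)}")
    # None selects every known client, mirroring the documented default.
    targets = list(per_client) if clients is None else clients
    for name in targets:
        per_client.setdefault(name, {}).update(config)
    return per_client


configs = {"site1": {}, "site2": {}}
merge_client_config(configs, {"timeout": 600})                       # all clients
merge_client_config(configs, {"use_gpu": True}, clients=["site1"])   # one client
```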
- add_client_file(file_path: str, clients: List[str] | None = None)
Add a file or directory to client apps.
The file will be added to the client’s custom directory and bundled with the job. Can be a script, configuration file, or any resource needed by clients.
- Parameters:
file_path – Path to the file or directory to add to clients.
clients – Optional list of specific client names. If None, applies to all clients.
- Raises:
TypeError – If file_path is not a string.
Example
# Add a wrapper script to all clients
recipe.add_client_file("client_wrapper.sh")
# Add a script to specific clients
recipe.add_client_file("custom_script.py", clients=["site1", "site2"])
- add_client_input_filter(filter: Filter, tasks: List[str] | None = None, clients: List[str] | None = None)
Add a filter to clients for incoming tasks from the server.
- Parameters:
filter – the filter to be added
tasks – tasks that the filter applies to
clients – names of the clients to add the filter to. If None, the filter is added to all clients.
Returns: None
- add_client_output_filter(filter: Filter, tasks: List[str] | None = None, clients: List[str] | None = None)
Add a filter to clients for outgoing results to the server.
- Parameters:
filter – the filter to be added
tasks – tasks that the filter applies to
clients – names of the clients to add the filter to. If None, the filter is added to all clients.
Returns: None
- add_decomposers(decomposers: List[str | Decomposer])
Add decomposers to the job.
- Parameters:
decomposers – spec of decomposers. Can be class names or Decomposer objects
Returns: None
- add_server_config(config: Dict)
Add top-level configuration parameters to config_fed_server.json.
- Parameters:
config – Dictionary of configuration parameters to add.
- Raises:
TypeError – If config is not a dictionary.
- add_server_file(file_path: str)
Add a file or directory to the server app.
The file will be added to the server’s custom directory and bundled with the job. Can be a script, configuration file, or any resource needed by the server.
- Parameters:
file_path – Path to the file or directory to add to server.
- Raises:
TypeError – If file_path is not a string.
Example
# Add a wrapper script to server
recipe.add_server_file("server_wrapper.sh")
- add_server_input_filter(filter: Filter, tasks: List[str] | None = None)
Add a filter to the server for incoming task results from clients.
- Parameters:
filter – the filter to be added
tasks – tasks that the filter applies to
Returns: None
- add_server_output_filter(filter: Filter, tasks: List[str] | None = None)
Add a filter to the server for outgoing tasks to clients.
- Parameters:
filter – the filter to be added
tasks – tasks that the filter applies to
Returns: None
- enable_log_streaming(*file_names: str) → None
Enable live log streaming from clients to the server while the job runs.
Adds one JobLogStreamer per file name to clients and a single JobLogReceiver to the server. Streaming is still gated per-site by allow_log_streaming in resources.json.
- Parameters:
*file_names – log file base names to stream. If omitted, defaults to "log.json".
Example:
recipe.enable_log_streaming()  # streams log.json
recipe.enable_log_streaming("log.txt")  # streams a single file
recipe.enable_log_streaming("log.json", "log.txt")  # streams both
- execute(env: ExecEnv, server_exec_params: dict | None = None, client_exec_params: dict | None = None) → Run | None
Execute or export the recipe based on command-line flags.
Transparently checks sys.argv for --export / --export-dir without interfering with the caller's own argument parser.
python job.py → run the job
python job.py --export → export to ./fl_job
python job.py --export --export-dir X → export to X
- Parameters:
env – the execution environment
server_exec_params – execution params for the server
client_exec_params – execution params for clients
- Returns:
Run when executing; raises SystemExit(0) when exporting so callers need not guard against a None return value.
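One way to check for such flags without disturbing the caller's own parser is argparse's parse_known_args, which leaves unrecognized arguments alone. The sketch below only illustrates that idea and is not NVFlare's implementation; peek_export_flags is a hypothetical helper, and the ./fl_job default is taken from the description above.

```python
import argparse
from typing import List, Tuple


def peek_export_flags(argv: List[str]) -> Tuple[bool, str]:
    """Return (export_requested, export_dir) while ignoring all other arguments."""
    # add_help=False keeps -h/--help for the caller's parser;
    # allow_abbrev=False avoids matching prefixes such as --exp.
    parser = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    parser.add_argument("--export", action="store_true")
    parser.add_argument("--export-dir", default="./fl_job")
    known, _unknown = parser.parse_known_args(argv)
    return known.export, known.export_dir


# Arguments that belong to the caller (here --lr) are simply skipped.
run_mode = peek_export_flags(["--lr", "0.1"])
export_default = peek_export_flags(["--lr", "0.1", "--export"])
export_custom = peek_export_flags(["--export", "--export-dir", "X"])
```

In a script, the list passed in would typically be sys.argv[1:], and the caller's own parser would still see the full, unmodified sys.argv.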
- export(job_dir: str, server_exec_params: dict | None = None, client_exec_params: dict | None = None, env: ExecEnv | None = None)
Export the recipe to a job definition.
- Parameters:
job_dir – directory where the job will be exported to.
server_exec_params – execution params for the server
client_exec_params – execution params for clients
env – the environment that the exported job will be running in
Returns: None
- process_env(env: ExecEnv)
Process environment-specific configuration.
Subclasses can override to add environment-specific processing. Script validation is handled by each ExecEnv subclass in deploy().
- run(env: ExecEnv, server_exec_params: dict | None = None, client_exec_params: dict | None = None) → Run
Run the recipe in a specified execution environment.
- Parameters:
env – the execution environment
server_exec_params – execution params for the server
client_exec_params – execution params for clients
Returns: Run to get job ID and execution results