FedJob API¶
The FLARE FedJob
API allows users to Pythonically define and create job configurations.
Core Concepts¶
Use the
to
routine to assign objects (e.g. Controller, ScriptRunner, Executor, PTModel, Filters, Components etc.) to the server or clients.Objects can define how they are added to the job by implementing
add_to_fed_job
, otherwise they are added as components.Export the job to a configuration with
export_job
.Run the job in the simulator with
simulator_run
.
Table overview of the FedJob
API:
API |
Description |
API Doc Link |
---|---|---|
to |
Assign object to target. |
|
to_server |
Assign object to server. |
|
to_clients |
Assign object to all clients. |
|
set_up_client |
To be used in FedJob subclasses. Setup routine called by FedJob when first sending object to a client target. |
|
as_id |
Return generated uuid of object. Object will be added as component if referenced. |
|
simulator_run |
Run the job with the simulator. |
|
export_job |
Export the job configuration. |
Here is an example of how to create a simple cifar10_fedavg job using the FedJob
API.
We assign a FedAvg controller and the initial PyTorch model to the server, and assign a ScriptExecutor for our training script to the clients.
Then we use the simulator to run the job:
from src.net import Net
from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
from nvflare.app_common.workflows.fedavg import FedAvg
from nvflare.app_opt.pt.job_config.model import PTModel
from nvflare.job_config.api import FedJob
from nvflare.job_config.script_runner import ScriptRunner
if __name__ == "__main__":
n_clients = 2
num_rounds = 2
train_script = "src/cifar10_fl.py"
# Create the FedJob
job = FedJob(name="cifar10_fedavg")
# Define the FedAvg controller workflow and send to server
controller = FedAvg(
num_clients=n_clients,
num_rounds=num_rounds,
)
job.to_server(controller)
# Define the initial global model with PTModel wrapper and send to server
job.to_server(PTModel(Net()))
# Add model selection widget and send to server
job.to_server(IntimeModelSelector(key_metric="accuracy"))
# Send ScriptRunner to all clients
runner = ScriptRunner(
script=train_script, script_args="f--batch_size 32 --data_path /tmp/data/site-{i}"
)
job.to_clients(runner)
# job.export_job("/tmp/nvflare/jobs/job_config")
job.simulator_run("/tmp/nvflare/jobs/workdir", n_clients=n_clients)
Initializing the FedJob¶
Initialize the FedJob
object with the following arguments:
name
(str): for job name.min_clients
(int): required for the job, will be set in the meta.json.mandatory_clients
(List[str]): to run the job, will be set in the meta.json.
Example:
job = FedJob(name="cifar10_fedavg", min_clients=2, mandatory_clients=["site-1", "site-2"])
Assigning objects with to
¶
Assign objects with to
for a specific target
,
to_server
for the server, and
to_clients
for all the clients.
These functions have the following parameters which are used depending on the type of object:
obj
(any): The object to be assigned. The obj will be given a default id if none is provided based on its type.target
(str): (Forto
) The target location of the object. Can be “server” or a client name, e.g. “site-1”.**kwargs
: if the object implements theadd_to_fed_job
method,kwargs
are additional args to be passed to this function. See the specific object’s section for more details.
Note
In order for the FedJob to use the values of arguments passed into the obj
, the arguments must be set as instance variables of the same name (or prefixed with “_”) in the constructor.
Below we cover in-depth how different types of objects are handled when using to
:
Controller¶
If the object is a Controller
sent to the server, the controller is added to the server app workflows.
Example:
controller = FedAvg(
num_clients=n_clients,
num_rounds=num_rounds,
)
job.to(controller, "server")
If the object is a Controller
sent to a client, the controller is added to the client app components as a client-side controller.
The controller can then be used by the ClientControllerExecutor
.
ScriptRunner¶
The ScriptRunner
can be added to clients and is used to run or launch a script.
The tasks
parameter specifies the tasks the script is defined the handle (defaults to “[*]” for all tasks).
ScriptRunner args:
script
: the script to run, will automatically be added to the custom folder.script_args
: arguments appended to the end of script.launch_external_process
: two modes, default in-process (launch_external_process=False) and ex-process (launch_external_process=True).command
: in the ex-process mode, command is prepended to the script (defaults to “python3”).framework
: determines whatFrameworkType
to use for the script.
Example:
# in-process: runs `__main__` of "src/cifar10_fl.py" with argv "--batch_size 32"
in_process_runner = ScriptRunner(
script="src/cifar10_fl.py",
script_args="--batch_size 32"
)
job.to(in_process_runner, "site-1", tasks=["train"])
# subprocess: runs `python3 -u custom/src/cifar10_fl.py --batch_size 32`
external_process_runner = ScriptRunner(
script="src/cifar10_fl.py",
script_args="--batch_size 32",
launch_external_process=True,
command="python3 -u"
)
job.to(external_process_runner, "site-2", tasks=["train"])
For more details on how the ScriptRunner internally configures the InProcessClientAPIExecutor
or ClientAPILauncherExecutor
, refer to its
add_to_fed_job
implementation.
A dictionary of component ids added is also returned to be used if needed.
Executor¶
If the object is an Executor
, it must be sent to a client. The executor is added to the client app executors.
The tasks
parameter specifies the tasks that the executor is defined the handle (defaults to “[*]” for all tasks).
Example:
executor = MyExecutor()
job.to(executor, "site-1", tasks=["train"])
Resource (str)¶
If the object is a str, it is treated as an external resource and will be included in the custom directory.
If the object is a script, it will be copied to the custom directory.
If the object is a directory, the directory will be copied flat to the custom directory.
Example:
job.to("src/cifar10_fl.py", "site-1") # script
job.to("content_dir", "site-1") # directory
Filter¶
If the object is a Filter
,
Users must specify the
filter_type
as either FilterType.TASK_RESULT (flow from executor to controller) or FilterType.TASK_DATA (flow from controller to executor).The filter will be added task_data_filters and task_result_filters accordingly and be applied to the specified
tasks
(defaults to “[*]” for all tasks).
Example:
pp_filter = PercentilePrivacy(percentile=10, gamma=0.01)
job.to(pp_filter, "site-1", tasks=["train"], filter_type=FilterType.TASK_RESULT)
Model Wrappers¶
Model Wrappers PTModel
and TFModel
are used for adding a model with persistor.
PTModel
: for PyTorch models (torch.nn.Module) we add aPTFileModelPersistor
andPTFileModelLocator
, and return a dictionary for these added component ids.TFModel
: for TensorFlow models (tf.keras.Model) we add aTFModelPersistor
and return the added persistor id.
Example:
component_ids = job.to(PTModel(Net()), "server")
For other types of models, the model and persistor can be added explicitly as components.
Components¶
For any object that does not fall under any of the previous types and does not implement add_to_fed_job
, then it is added as a component with id
.
The
id
can be either specified as a parameter, or it will be automatically assigned.If adding a component with a previously used id, then the id will be incremented (e.g. “component_id1”, “component_id2”) and returned.
Components may reference other components by id.
Example:
job.to_server(IntimeModelSelector(key_metric="accuracy"))
In the case that an id generated by as_id
, is referenced by another added object, this the referenced object will also be added as a component.
In the example below, comp2 is assigned to the server. Since comp1 was referenced in comp2 with as_id
, comp1 will also be added as a component to the server.
Example:
comp1 = Component1()
comp2 = Component2(sub_component_id=job.as_id(comp1))
job.to(comp2, "server")
add_to_fed_job¶
If the obj implements the add_to_fed_job
method, it will be called with the kwargs. The implementation of add_to_fed_job is specific to the obj being added.
This method must follow this signature:
add_to_fed_job(job, ctx, ...)
Many of the object types covered in the above sections have implemented add_to_fed_job as they either have special cases or server as wrappers to add additional related components.
As shown in the table below, the Object Developer FedJob API provides functions to add components, Controllers, Executors, Filters, and resources.
The Job Context ctx
should simply be passed to these “add_xxx” methods, and does need to be accessed.
Additionally, the check_kwargs function can check and enforce required arguments in the kwargs.
Note
When adding other components, a good practice is to return the ids of the extra components added in case they might be needed elsewhere.
Example of TFModel
add_to_fed_job
:
def add_to_fed_job(self, job, ctx):
"""This method is used by Job API.
Args:
job: the Job object to add to
ctx: Job Context
Returns:
dictionary of ids of component added
"""
if isinstance(self.model, tf.keras.Model): # if model, create a TF persistor
persistor = TFModelPersistor(model=self.model)
persistor_id = job.add_component(comp_id="persistor", obj=persistor, ctx=ctx)
return persistor_id
else:
raise ValueError(
f"Unable to add {self.model} to job with TFModelPersistor. Expected tf.keras.Model but got {type(self.model)}."
)
API |
Description |
API Doc Link |
---|---|---|
add_component |
Add a component to the job. |
|
add_controller |
Add a Controller object to the job. |
|
add_executor |
Add an executor to the job. |
|
add_filter |
Add a filter to the job. |
|
add_resources |
Add resources to the job. |
|
check_kwargs |
Check kwargs for arguments. Raise Error if required arg is missing, or unexpected arg is given. |
Job Pattern Inheritance¶
Job inheritance can be useful when there are common patterns that can be reused in many jobs.
When subclassing FedJob, any number of objects can be sent to the server in the __init__,
and set_up_client
can be implemented to send objects to clients.
set_up_client
is called by FedJob when first sending object to a client target, as the specific client targets can vary.
For example of a Job pattern, we can use FedAvgJob
to simplify the creation of a FedAvg job.
The FedAvgJob automatically adds the FedAvg controller, PTFileModelPersistor and IntimeModelSelector, resulting in the following experience:
job = FedAvgJob(name="cifar10_fedavg", num_rounds=num_rounds, n_clients=n_clients, initial_model=Net())
For more examples of job patterns, see:
FedAvgJob
(pytorch)FedAvgJob
(tensorflow)FlowerJob
Note
Some of the default components included in these patterns are different, always refer to the exported job configs for a full list of components used at every site.
Running the Job¶
Simulator¶
Run the FedJob with the simulator with simulator_run
in the workspace
, with n_clients
, threads
, and gpu
assignments.
Note
Only set n_clients
if you have not specified clients using to
.
Example:
job.simulator_run(workspace="/tmp/nvflare/jobs/workdir", n_clients=2, threads=2, gpu="0,1")
Export Configuration¶
We can export the job configuration with export_job
to the job_root
directory to be used in other modes.
Example:
job.export_job(job_root="/tmp/nvflare/jobs/job_config")
Examples¶
To see examples of how the FedJob API can be used for different applications, refer the Getting Started and Job API examples.