FedJob API

The FLARE FedJob API allows users to Pythonically define and create job configurations.

Core Concepts

  • Use the to routine to assign objects (e.g. Controller, ScriptRunner, Executor, PTModel, Filters, Components, etc.) to the server or clients.

  • Objects can define how they are added to the job by implementing add_to_fed_job; otherwise they are added as components.

  • Export the job to a configuration with export_job.

  • Run the job in the simulator with simulator_run.

Table overview of the FedJob API:

FedJob API

| API | Description | API Doc Link |
|---|---|---|
| to | Assign object to target. | to |
| to_server | Assign object to server. | to_server |
| to_clients | Assign object to all clients. | to_clients |
| set_up_client | To be used in FedJob subclasses. Setup routine called by FedJob when first sending an object to a client target. | set_up_client |
| as_id | Return the generated uuid of an object. The object will be added as a component if referenced. | as_id |
| simulator_run | Run the job with the simulator. | simulator_run |
| export_job | Export the job configuration. | export_job |

Here is an example of how to create a simple cifar10_fedavg job using the FedJob API. We assign a FedAvg controller, the initial PyTorch model, and a model selection widget to the server, and assign a ScriptRunner for our training script to the clients. Then we use the simulator to run the job:

from src.net import Net

from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
from nvflare.app_common.workflows.fedavg import FedAvg
from nvflare.app_opt.pt.job_config.model import PTModel

from nvflare.job_config.api import FedJob
from nvflare.job_config.script_runner import ScriptRunner

if __name__ == "__main__":
    n_clients = 2
    num_rounds = 2
    train_script = "src/cifar10_fl.py"

    # Create the FedJob
    job = FedJob(name="cifar10_fedavg")

    # Define the FedAvg controller workflow and send to server
    controller = FedAvg(
        num_clients=n_clients,
        num_rounds=num_rounds,
    )
    job.to_server(controller)

    # Define the initial global model with PTModel wrapper and send to server
    job.to_server(PTModel(Net()))

    # Add model selection widget and send to server
    job.to_server(IntimeModelSelector(key_metric="accuracy"))

    # Send ScriptRunner to all clients
    runner = ScriptRunner(script=train_script, script_args="--batch_size 32")
    job.to_clients(runner)

    # job.export_job("/tmp/nvflare/jobs/job_config")
    job.simulator_run("/tmp/nvflare/jobs/workdir", n_clients=n_clients)

Initializing the FedJob

Initialize the FedJob object with the following arguments:

  • name (str): the job name.

  • min_clients (int): the minimum number of clients required for the job; set in meta.json.

  • mandatory_clients (List[str]): the clients that must be present to run the job; set in meta.json.

Example:

job = FedJob(name="cifar10_fedavg", min_clients=2, mandatory_clients=["site-1", "site-2"])

Assigning objects with to

Assign objects with to for a specific target, to_server for the server, and to_clients for all the clients.

These functions have the following parameters which are used depending on the type of object:

  • obj (any): The object to be assigned. If no id is provided, the obj is given a default id based on its type.

  • target (str): (For to) The target location of the object. Can be “server” or a client name, e.g. “site-1”.

  • **kwargs: if the object implements the add_to_fed_job method, kwargs are additional args to be passed to this function. See the specific object’s section for more details.

Note

In order for the FedJob to use the values of arguments passed into the obj, the arguments must be set as instance variables of the same name (or prefixed with “_”) in the constructor.
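To see why the note above matters, here is a minimal sketch of how constructor arguments might be recovered from an object by introspection. It is not FLARE's implementation: recover_init_args and both example classes are hypothetical names used only to illustrate the naming convention.

```python
import inspect


def recover_init_args(obj):
    """Collect constructor arguments from same-named (or "_"-prefixed) attributes.

    Hypothetical helper: it illustrates the naming convention only.
    """
    recovered = {}
    params = inspect.signature(type(obj).__init__).parameters
    for name in params:
        if name == "self":
            continue
        if hasattr(obj, name):
            recovered[name] = getattr(obj, name)
        elif hasattr(obj, "_" + name):
            recovered[name] = getattr(obj, "_" + name)
        # Otherwise the value passed to the constructor cannot be recovered.
    return recovered


class GoodComponent:
    def __init__(self, lr=0.01, epochs=1):
        self.lr = lr           # same name as the argument
        self._epochs = epochs  # "_"-prefixed name also works


class BadComponent:
    def __init__(self, lr=0.01):
        self.learning_rate = lr  # different name: the argument value is lost


print(recover_init_args(GoodComponent(lr=0.1, epochs=5)))  # {'lr': 0.1, 'epochs': 5}
print(recover_init_args(BadComponent(lr=0.1)))             # {}
```

In this sketch, BadComponent would silently fall back to its default lr in an exported configuration, which is the failure mode the naming rule is meant to prevent.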

Below we cover in-depth how different types of objects are handled when using to:

Controller

If the object is a Controller sent to the server, the controller is added to the server app workflows.

Example:

controller = FedAvg(
    num_clients=n_clients,
    num_rounds=num_rounds,
)
job.to(controller, "server")

If the object is a Controller sent to a client, the controller is added to the client app components as a client-side controller. The controller can then be used by the ClientControllerExecutor.

ScriptRunner

The ScriptRunner can be added to clients and is used to run or launch a script. The tasks parameter specifies the tasks that the script is defined to handle (defaults to “[*]” for all tasks).

ScriptRunner args:

  • script: the script to run, will automatically be added to the custom folder.

  • script_args: arguments appended to the end of script.

  • launch_external_process: selects one of two modes, in-process (launch_external_process=False, the default) or ex-process (launch_external_process=True).

  • command: in the ex-process mode, command is prepended to the script (defaults to “python3”).

  • framework: determines what FrameworkType to use for the script.

Example:

# in-process: runs `__main__` of "src/cifar10_fl.py" with argv "--batch_size 32"
in_process_runner = ScriptRunner(
    script="src/cifar10_fl.py",
    script_args="--batch_size 32"
)
job.to(in_process_runner, "site-1", tasks=["train"])

# subprocess: runs `python3 -u custom/src/cifar10_fl.py --batch_size 32`
external_process_runner = ScriptRunner(
    script="src/cifar10_fl.py",
    script_args="--batch_size 32",
    launch_external_process=True,
    command="python3 -u"
)
job.to(external_process_runner, "site-2", tasks=["train"])
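The way command, script, and script_args combine in the ex-process mode can be sketched as below. build_launch_command is a hypothetical helper, not part of FLARE, and the "custom" prefix is an assumption taken from the comment in the example above.

```python
import shlex


def build_launch_command(script, script_args="", command="python3", custom_dir="custom"):
    """Compose an illustrative command line for the ex-process mode.

    Hypothetical helper: the "custom" prefix mirrors the comment in the
    example above and is an assumption about the deployed layout.
    """
    parts = shlex.split(command)             # command is prepended to the script
    parts.append(f"{custom_dir}/{script}")   # script lives in the custom folder
    parts.extend(shlex.split(script_args))   # script_args are appended at the end
    return " ".join(parts)


print(build_launch_command("src/cifar10_fl.py", "--batch_size 32", command="python3 -u"))
# python3 -u custom/src/cifar10_fl.py --batch_size 32
```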

For more details on how the ScriptRunner internally configures the InProcessClientAPIExecutor or ClientAPILauncherExecutor, refer to its add_to_fed_job implementation. It also returns a dictionary of the added component ids, which can be used if needed.

Executor

If the object is an Executor, it must be sent to a client. The executor is added to the client app executors. The tasks parameter specifies the tasks that the executor is defined to handle (defaults to “[*]” for all tasks).

Example:

executor = MyExecutor()
job.to(executor, "site-1", tasks=["train"])
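As a rough illustration of what the tasks parameter expresses, the sketch below looks up a handler for a task name, treating "*" as a wildcard for all tasks. The function find_executor and the string placeholders are hypothetical; the actual dispatch logic inside FLARE may differ.

```python
def find_executor(executors, task_name):
    """Return the first executor whose task list contains the task or "*".

    executors is a list of (tasks, executor) pairs in registration order.
    Hypothetical helper for illustration only.
    """
    for tasks, executor in executors:
        if task_name in tasks or "*" in tasks:
            return executor
    return None


# Strings stand in for executor objects here.
registered = [
    (["train"], "train_executor"),
    (["*"], "catch_all_executor"),
]
print(find_executor(registered, "train"))     # train_executor
print(find_executor(registered, "validate"))  # catch_all_executor
```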

Resource (str)

If the object is a str, it is treated as an external resource and will be included in the custom directory.

  • If the object is a script, it will be copied to the custom directory.

  • If the object is a directory, the directory will be copied flat to the custom directory.

Example:

job.to("src/cifar10_fl.py", "site-1") # script
job.to("content_dir", "site-1") # directory

Filter

If the object is a Filter:

  • Users must specify the filter_type as either FilterType.TASK_RESULT (flow from executor to controller) or FilterType.TASK_DATA (flow from controller to executor).

  • The filter will be added to task_data_filters or task_result_filters accordingly and applied to the specified tasks (defaults to “[*]” for all tasks).

Example:

pp_filter = PercentilePrivacy(percentile=10, gamma=0.01)
job.to(pp_filter, "site-1", tasks=["train"], filter_type=FilterType.TASK_RESULT)

Model Wrappers

The model wrappers PTModel and TFModel are used to add a model together with its persistor.

  • PTModel: for PyTorch models (torch.nn.Module) we add a PTFileModelPersistor and PTFileModelLocator, and return a dictionary for these added component ids.

  • TFModel: for TensorFlow models (tf.keras.Model) we add a TFModelPersistor and return the added persistor id.

Example:

component_ids = job.to(PTModel(Net()), "server")

For other types of models, the model and persistor can be added explicitly as components.

Components

Any object that does not fall under one of the previous types and does not implement add_to_fed_job is added as a component with an id.

  • The id can be either specified as a parameter, or it will be automatically assigned.

  • If adding a component with a previously used id, then the id will be incremented (e.g. “component_id1”, “component_id2”) and returned.

  • Components may reference other components by id.

Example:

job.to_server(IntimeModelSelector(key_metric="accuracy"))
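The id handling described above can be sketched with a small stand-alone registry. ComponentRegistry is a hypothetical class, and both the default id (the lowercased type name) and the numeric suffix format are assumptions made for illustration.

```python
class ComponentRegistry:
    """Hypothetical registry that hands out unique component ids."""

    def __init__(self):
        self.components = {}

    def add(self, obj, comp_id=None):
        # Default id derived from the object's type (assumed naming scheme).
        base_id = comp_id or type(obj).__name__.lower()
        final_id = base_id
        suffix = 0
        # On collision, append an increasing number until the id is free.
        while final_id in self.components:
            suffix += 1
            final_id = f"{base_id}{suffix}"
        self.components[final_id] = obj
        return final_id


registry = ComponentRegistry()
print(registry.add(object(), "persistor"))  # persistor
print(registry.add(object(), "persistor"))  # persistor1
print(registry.add(object(), "persistor"))  # persistor2
```

Returning the final id matters because the caller cannot otherwise know which id was assigned after a collision.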

If an id generated by as_id is referenced by another added object, the referenced object will also be added as a component. In the example below, comp2 is assigned to the server. Since comp1 was referenced in comp2 with as_id, comp1 will also be added as a component to the server.

Example:

comp1 = Component1()
comp2 = Component2(sub_component_id=job.as_id(comp1))
job.to(comp2, "server")
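One way such reference resolution could work is sketched below with a stand-in class. MiniJob is hypothetical and much simpler than FedJob: it remembers objects passed to as_id and adds them to a target when their generated id shows up in the attributes of an object sent there.

```python
import uuid


class MiniJob:
    """Hypothetical stand-in for a job that resolves ids produced by as_id."""

    def __init__(self):
        self._pending = {}    # generated id -> object not yet added
        self.components = {}  # target -> {component id: object}

    def as_id(self, obj):
        obj_id = str(uuid.uuid4())
        self._pending[obj_id] = obj
        return obj_id

    def to(self, obj, target, comp_id):
        site = self.components.setdefault(target, {})
        site[comp_id] = obj
        # Add every pending object whose generated id appears in obj's attributes.
        for value in vars(obj).values():
            if isinstance(value, str) and value in self._pending:
                site[value] = self._pending[value]


class Component1:
    pass


class Component2:
    def __init__(self, sub_component_id):
        self.sub_component_id = sub_component_id


mini_job = MiniJob()
comp1 = Component1()
comp2 = Component2(sub_component_id=mini_job.as_id(comp1))
mini_job.to(comp2, "server", comp_id="comp2")
print(sorted(type(c).__name__ for c in mini_job.components["server"].values()))
# ['Component1', 'Component2']
```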

add_to_fed_job

If the obj implements the add_to_fed_job method, it will be called with the kwargs. The implementation of add_to_fed_job is specific to the obj being added. This method must follow this signature:

add_to_fed_job(job, ctx, ...)

Many of the object types covered in the above sections have implemented add_to_fed_job, as they either have special cases or serve as wrappers to add additional related components.

As shown in the FedJob Object Developer API table below, the API provides functions to add components, Controllers, Executors, Filters, and resources. The Job Context ctx should simply be passed to these “add_xxx” methods, and does not need to be accessed. Additionally, the check_kwargs function can check and enforce required arguments in the kwargs.

Note

When adding other components, a good practice is to return the ids of the extra components added in case they might be needed elsewhere.

Example of TFModel add_to_fed_job:

def add_to_fed_job(self, job, ctx):
    """This method is used by Job API.

    Args:
        job: the Job object to add to
        ctx: Job Context

    Returns:
        id of the persistor component added
    """
    if isinstance(self.model, tf.keras.Model):  # if model, create a TF persistor
        persistor = TFModelPersistor(model=self.model)
        persistor_id = job.add_component(comp_id="persistor", obj=persistor, ctx=ctx)
        return persistor_id
    else:
        raise ValueError(
            f"Unable to add {self.model} to job with TFModelPersistor. Expected tf.keras.Model but got {type(self.model)}."
        )

FedJob Object Developer API

| API | Description | API Doc Link |
|---|---|---|
| add_component | Add a component to the job. | add_component |
| add_controller | Add a Controller object to the job. | add_controller |
| add_executor | Add an executor to the job. | add_executor |
| add_filter | Add a filter to the job. | add_filter |
| add_resources | Add resources to the job. | add_resources |
| check_kwargs | Check kwargs for arguments. Raise an error if a required arg is missing or an unexpected arg is given. | check_kwargs |
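The kind of validation check_kwargs is described as performing can be sketched as follows. check_kwargs_sketch is a hypothetical stand-alone function, and the spec format (argument name mapped to a required flag) and the ValueError type are assumptions, not FLARE's documented behavior.

```python
def check_kwargs_sketch(args_to_check, args_expected):
    """Validate keyword arguments against a spec.

    args_expected maps each allowed argument name to True (required)
    or False (optional). Hypothetical helper, not FLARE's check_kwargs.
    """
    for name, required in args_expected.items():
        if required and name not in args_to_check:
            raise ValueError(f"missing required argument: {name}")
    for name in args_to_check:
        if name not in args_expected:
            raise ValueError(f"unexpected argument: {name}")


spec = {"filter_type": True, "tasks": False}
check_kwargs_sketch({"filter_type": "task_result"}, spec)  # passes silently

try:
    check_kwargs_sketch({"tasks": ["train"]}, spec)
except ValueError as err:
    print(err)  # missing required argument: filter_type
```

An add_to_fed_job implementation could run a check like this first, so that a mistyped keyword in job.to fails early instead of being ignored.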

Job Pattern Inheritance

Job inheritance can be useful when there are common patterns that can be reused in many jobs.

When subclassing FedJob, any number of objects can be sent to the server in the __init__, and set_up_client can be implemented to send objects to clients. Because the specific client targets can vary, set_up_client is called by FedJob when an object is first sent to a client target.

As an example of a job pattern, we can use FedAvgJob to simplify the creation of a FedAvg job. The FedAvgJob automatically adds the FedAvg controller, PTFileModelPersistor and IntimeModelSelector, resulting in the following experience:

job = FedAvgJob(name="cifar10_fedavg", num_rounds=num_rounds, n_clients=n_clients, initial_model=Net())
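The subclassing pattern described above can be sketched without FLARE installed. MiniFedJob and PatternJob are hypothetical classes that only mimic the "server objects in __init__, client objects in set_up_client" flow; strings stand in for real controllers, filters, and runners.

```python
class MiniFedJob:
    """Hypothetical base class that mimics the first-use client hook."""

    def __init__(self, name):
        self.name = name
        self.assigned = {}  # target -> list of objects

    def to(self, obj, target):
        if target != "server" and target not in self.assigned:
            self.assigned[target] = []
            self.set_up_client(target)  # runs once per new client target
        self.assigned.setdefault(target, []).append(obj)

    def set_up_client(self, target):
        pass  # subclasses override this


class PatternJob(MiniFedJob):
    def __init__(self, name):
        super().__init__(name)
        self.to("controller", "server")  # common server-side objects go in __init__

    def set_up_client(self, target):
        self.to("default-filter", target)  # common client-side objects go here


pattern_job = PatternJob("demo")
pattern_job.to("runner", "site-1")
print(pattern_job.assigned)
# {'server': ['controller'], 'site-1': ['default-filter', 'runner']}
```

The hook is needed because a pattern cannot know the client names at construction time; it only learns them as the user calls to.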

For more examples of job patterns, see:

Note

Some of the default components included in these patterns differ. Always refer to the exported job configs for a full list of the components used at every site.

Running the Job

Simulator

Use simulator_run to run the FedJob in the simulator, specifying the workspace along with the n_clients, threads, and gpu assignments.

Note

Only set n_clients if you have not specified clients using to.

Example:

job.simulator_run(workspace="/tmp/nvflare/jobs/workdir", n_clients=2, threads=2, gpu="0,1")

Export Configuration

We can export the job configuration with export_job to the job_root directory to be used in other modes.

Example:

job.export_job(job_root="/tmp/nvflare/jobs/job_config")

Examples

To see examples of how the FedJob API can be used for different applications, refer to the Getting Started and Job API examples.