Hello Cross-Site Validation¶
Before You Start¶
Before jumping into this guide, make sure you have an environment with NVIDIA FLARE installed.
You can follow the installation guide on the general concept of setting up a Python virtual environment (the recommended environment) and how to install NVIDIA FLARE.
Prerequisite¶
This example builds on the Hello Scatter and Gather example
based on the ScatterAndGather
workflow.
Please make sure you go through it completely, as the concepts in this example build directly on it.
Introduction¶
This tutorial is meant to solely demonstrate how the NVIDIA FLARE system works, without introducing any actual deep learning concepts.
Through this exercise, you will learn how to use NVIDIA FLARE with numpy to perform cross site validation after training.
The training process is explained in the Hello Scatter and Gather example.
Using simplified weights and metrics, you will be able to clearly see how NVIDIA FLARE performs validation across different sites with little extra work.
The setup of this exercise consists of one server and two clients.
The server-side model starts with the weights [[1, 2, 3], [4, 5, 6], [7, 8, 9]].
Cross site validation consists of the following steps:

1. During the initial phase of training with the ScatterAndGather workflow, NPTrainer saves the local model to disk for the clients.
2. The CrossSiteModelEval workflow gets the client models with the submit_model task.
3. The validate task is broadcast to all participating clients with the model shareable containing the model data, and the results from the validate task are saved.
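The all-pairs pattern behind these steps can be sketched with a toy, framework-free simulation. Everything below (the site names, the server model key, the dummy metric) is illustrative only, not FLARE APIs; in the real system the CrossSiteModelEval workflow orchestrates this flow:

```python
# Toy simulation of cross-site evaluation: every participating client
# validates every submitted model (client models plus the server model).
models = {
    "site-1": [1.0, 2.0, 3.0],
    "site-2": [2.0, 4.0, 6.0],
    "SRV_server": [1.5, 3.0, 4.5],  # illustrative server-model key, not FLARE's naming
}

def dummy_validate(model):
    # Same spirit as the example's metric: sum of the values scaled by the max.
    return sum(model) / max(model)

# Each client produces a result for every model owner.
results = {
    client: {owner: dummy_validate(weights) for owner, weights in models.items()}
    for client in ("site-1", "site-2")
}
print(results["site-1"]["site-2"])  # 2.0
```

The nested structure of `results` (validating client → model owner → score) mirrors the shape of the results FLARE collects at the end of the workflow.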
During this exercise, we will see how NVIDIA FLARE takes care of most of the above steps with little work from the user.
We will be working with the hello-numpy-cross-val
application in the examples folder.
Custom FL applications can contain the folders:

- custom: contains the custom components (np_trainer.py, np_model_persistor.py, np_validator.py, np_model_locator, np_formatter)
- config: contains client and server configurations (config_fed_client.json, config_fed_server.json)
- resources: contains the logger config (log.config)
Let’s get started. First clone the repo, if you haven’t already:
$ git clone https://github.com/NVIDIA/NVFlare.git
Remember to activate your NVIDIA FLARE Python virtual environment from the installation guide. Ensure numpy is installed.
(nvflare-env) $ python3 -m pip install numpy
Now that you have all your dependencies installed, let’s implement the Federated Learning system.
Training¶
In the Hello Scatter and Gather example, we implemented the NPTrainer object.
In this example, we use the same NPTrainer but extend it to handle the submit_model task, which the CrossSiteModelEval workflow uses to collect the client models.
The code in np_trainer.py saves the model to disk after each step of training.
Note that the server also produces a global model.
The CrossSiteModelEval workflow submits the server model for evaluation after the client models.
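Conceptually, the extra work in the trainer is small: it writes its current weights to the job workspace during training, and for submit_model it reads them back. A minimal sketch of just that persistence step, using stand-in names (save_local_model and load_local_model are hypothetical helpers for illustration, not the actual NPTrainer methods):

```python
import os

import numpy as np

def save_local_model(run_dir: str, weights: np.ndarray) -> str:
    # Persist the local weights so a later submit_model task can return them.
    path = os.path.join(run_dir, "local_model.npy")
    np.save(path, weights)
    return path

def load_local_model(run_dir: str) -> np.ndarray:
    # Read the weights back when the workflow asks for the client's model.
    return np.load(os.path.join(run_dir, "local_model.npy"))
```

In the real example, the directory comes from the FLContext of the running job rather than being passed in directly.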
Implementing the Validator¶
The validator is an Executor that is called to validate the models received from the server during
the CrossSiteModelEval workflow.
These models could be from other clients or models generated on the server.
import logging
import time

import numpy as np

from nvflare.apis.dxo import DXO, DataKind, from_shareable
from nvflare.apis.executor import Executor
from nvflare.apis.fl_constant import ReturnCode
from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import Shareable, make_reply
from nvflare.apis.signal import Signal
from nvflare.app_common.app_constant import AppConstants

from .constants import NPConstants


class NPValidator(Executor):
    def __init__(
        self,
        epsilon=1,
        sleep_time=0,
        validate_task_name=AppConstants.TASK_VALIDATION,
    ):
        # Init functions of components should be very minimal. Init
        # is called when json is read. A big init will cause json loading to halt
        # for a long time.
        super().__init__()

        self.logger = logging.getLogger("NPValidator")
        self._random_epsilon = epsilon
        self._sleep_time = sleep_time
        self._validate_task_name = validate_task_name

    def handle_event(self, event_type: str, fl_ctx: FLContext):
        # if event_type == EventType.START_RUN:
        #     Create all major components here. This is a simple app that doesn't need any components.
        # elif event_type == EventType.END_RUN:
        #     Clean up resources (closing files, joining threads, removing dirs etc.)
        pass

    def execute(
        self,
        task_name: str,
        shareable: Shareable,
        fl_ctx: FLContext,
        abort_signal: Signal,
    ) -> Shareable:
        # Any long tasks should check abort_signal regularly.
        # Otherwise, abort client will not work.
        count, interval = 0, 0.5
        while count < self._sleep_time:
            if abort_signal.triggered:
                return make_reply(ReturnCode.TASK_ABORTED)
            time.sleep(interval)
            count += interval

        if task_name == self._validate_task_name:
            try:
                # First we extract DXO from the shareable.
                try:
                    model_dxo = from_shareable(shareable)
                except Exception as e:
                    self.log_error(fl_ctx, f"Unable to extract model dxo from shareable. Exception: {e.__str__()}")
                    return make_reply(ReturnCode.BAD_TASK_DATA)

                # Get model from shareable. data_kind must be WEIGHTS.
                if model_dxo.data and model_dxo.data_kind == DataKind.WEIGHTS:
                    model = model_dxo.data
                else:
                    self.log_error(
                        fl_ctx, "Model DXO doesn't have data or is not of type DataKind.WEIGHTS. Unable to validate."
                    )
                    return make_reply(ReturnCode.BAD_TASK_DATA)

                # Check if key exists in model
                if NPConstants.NUMPY_KEY not in model:
                    self.log_error(fl_ctx, "numpy_key not in model. Unable to validate.")
                    return make_reply(ReturnCode.BAD_TASK_DATA)

                # The workflow provides MODEL_OWNER information in the shareable header.
                model_name = shareable.get_header(AppConstants.MODEL_OWNER, "?")

                # Print properties.
                self.log_info(fl_ctx, f"Model: \n{model}")
                self.log_info(fl_ctx, f"Task name: {task_name}")
                self.log_info(fl_ctx, f"Client identity: {fl_ctx.get_identity_name()}")
                self.log_info(fl_ctx, f"Validating model from {model_name}.")

                # Check abort signal regularly.
                if abort_signal.triggered:
                    return make_reply(ReturnCode.TASK_ABORTED)

                # Do some dummy validation.
                random_epsilon = np.random.random()
                self.log_info(fl_ctx, f"Adding random epsilon {random_epsilon} in validation.")
                val_results = {}
                np_data = model[NPConstants.NUMPY_KEY]
                np_data = np.sum(np_data / np.max(np_data))
                val_results["accuracy"] = np_data + random_epsilon

                # Check abort signal regularly.
                if abort_signal.triggered:
                    return make_reply(ReturnCode.TASK_ABORTED)

                self.log_info(fl_ctx, f"Validation result: {val_results}")

                # Create DXO for metrics and return shareable.
                metric_dxo = DXO(data_kind=DataKind.METRICS, data=val_results)
                return metric_dxo.to_shareable()
            except Exception as e:
                self.log_exception(fl_ctx, f"Exception in NPValidator execute: {e}.")
                return make_reply(ReturnCode.EXECUTION_EXCEPTION)
        else:
            return make_reply(ReturnCode.TASK_UNKNOWN)
The validator is an Executor and implements the execute function which receives a Shareable.
It handles the validate
task by performing a calculation to find the sum divided by the max of the data
and adding a random_epsilon
before returning the results packaged with a DXO into a Shareable.
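On the initial server weights from this example, the dummy metric is easy to check by hand: every element is divided by the maximum (9) and the quotients are summed, giving 45/9 = 5.0, so the reported "accuracy" is 5.0 plus a random epsilon in [0, 1). A standalone check:

```python
import numpy as np

# The example's starting server weights.
weights = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])

# Dummy metric from the validator: sum of (value / max) over all elements.
score = np.sum(weights / np.max(weights))  # approximately 45 / 9 = 5.0

# The validator adds a uniform random epsilon to the score.
epsilon = np.random.random()
accuracy = score + epsilon
```

This also explains why two validations of the same model report slightly different "accuracy" values in the logs: the epsilon differs each time.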
Note
Note that in our hello-examples, we are demonstrating Federated Learning using data that is unrelated to deep learning.
NVIDIA FLARE can be used with any data packaged inside a Shareable object (which subclasses dict), and
DXO is the recommended way to manage that data in a standard way.
Application Configuration¶
Inside the config folder there are two files, config_fed_client.json and config_fed_server.json.
{
  "format_version": 2,
  "server": {
    "heart_beat_timeout": 600
  },
  "task_data_filters": [],
  "task_result_filters": [],
  "components": [
    {
      "id": "persistor",
      "path": "nvflare.app_common.np.np_model_persistor.NPModelPersistor",
      "args": {}
    },
    {
      "id": "shareable_generator",
      "path": "nvflare.app_common.shareablegenerators.full_model_shareable_generator.FullModelShareableGenerator",
      "args": {}
    },
    {
      "id": "aggregator",
      "path": "nvflare.app_common.aggregators.intime_accumulate_model_aggregator.InTimeAccumulateWeightedAggregator",
      "args": {
        "expected_data_kind": "WEIGHTS"
      }
    },
    {
      "id": "model_locator",
      "path": "nvflare.app_common.np.np_model_locator.NPModelLocator",
      "args": {}
    },
    {
      "id": "formatter",
      "path": "nvflare.app_common.np.np_formatter.NPFormatter",
      "args": {}
    },
    {
      "id": "json_generator",
      "path": "nvflare.app_common.widgets.validation_json_generator.ValidationJsonGenerator",
      "args": {}
    }
  ],
  "workflows": [
    {
      "id": "scatter_and_gather",
      "path": "nvflare.app_common.workflows.scatter_and_gather.ScatterAndGather",
      "args": {
        "min_clients": 2,
        "num_rounds": 3,
        "start_round": 0,
        "wait_time_after_min_received": 10,
        "aggregator_id": "aggregator",
        "persistor_id": "persistor",
        "shareable_generator_id": "shareable_generator",
        "train_task_name": "train",
        "train_timeout": 6000
      }
    },
    {
      "id": "cross_site_model_eval",
      "path": "nvflare.app_common.workflows.cross_site_model_eval.CrossSiteModelEval",
      "args": {
        "model_locator_id": "model_locator",
        "submit_model_timeout": 600,
        "validation_timeout": 6000,
        "cleanup_models": false
      }
    }
  ]
}
The server now has a second workflow, CrossSiteModelEval, configured after Scatter and Gather.
The components “model_locator” and “formatter” have been added to work with the cross site model evaluation workflow, and the rest is the same as in Hello Scatter and Gather.
{
  "format_version": 2,
  "executors": [
    {
      "tasks": [
        "train",
        "submit_model"
      ],
      "executor": {
        "path": "nvflare.app_common.np.np_trainer.NPTrainer",
        "args": {}
      }
    },
    {
      "tasks": [
        "validate"
      ],
      "executor": {
        "path": "nvflare.app_common.np.np_validator.NPValidator"
      }
    }
  ],
  "task_result_filters": [],
  "task_data_filters": [],
  "components": []
}
The client configuration now has more tasks and an additional Executor NPValidator
configured to handle the “validate” task.
The “submit_model” task has been added to the NPTrainer
Executor to work with the CrossSiteModelEval
workflow to get the client models.
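The effect of this configuration is simple task routing: tasks named train or submit_model go to NPTrainer, and validate goes to NPValidator. A framework-free sketch of that dispatch (the dict and function here are illustrative, not FLARE internals):

```python
# Illustrative task-to-executor routing, mirroring config_fed_client.json.
TASK_ROUTES = {
    "train": "NPTrainer",
    "submit_model": "NPTrainer",
    "validate": "NPValidator",
}

def route(task_name: str) -> str:
    # FLARE replies with TASK_UNKNOWN for tasks no executor claims;
    # here we mirror that with a sentinel string.
    return TASK_ROUTES.get(task_name, "TASK_UNKNOWN")
```

This is why the same client process can participate in both workflows: the executors are selected per task name, not per workflow.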
Cross site validation!¶
Now you can use admin command prompt to submit and start this example job. To do this on a proof of concept local FL system, follow the sections Setting Up the Application Environment in POC Mode and Starting the Application Environment in POC Mode if you have not already.
Running the FL System¶
With the admin client command prompt successfully connected and logged in, enter the command below.
> submit_job hello-numpy-cross-val
Pay close attention to what happens in each of the four terminals.
You can see how the admin submits the job to the server and how
the JobRunner
on the server
automatically picks up the job to deploy and start the run.
This command uploads the job configuration from the admin client to the server. A job id will be returned, and we can use that id to access job information.
Note
If we use submit_job [app] then that app will be treated as a single app job.
From time to time, you can issue check_status server
in the admin client to check the entire training progress.
You should now see how the training does in the very first terminal (the one that started the server).
During the first phase, the model will be trained.
During the second phase, cross site validation will happen.
The workflow on the client will change to CrossSiteModelEval
as it enters this second phase.
During cross site model evaluation, every client validates other clients’ models and server models (if present). This can produce a lot of results. All the results will be kept in the job’s workspace when it is completed.
Understanding the Output¶
After starting the server and clients, you should begin to see some outputs in each terminal tracking the progress of the FL run. As each client finishes training, it will start the cross site validation process. During this you'll see several important outputs that track the progress of cross site validation.
The server shows the log of each client requesting models, the models it sends and the results received. Since the server could be responding to many clients at the same time, it may require careful examination to make proper sense of events from the jumbled logs.
Accessing the results¶
The results of each job will usually be stored inside the server-side workspace.
Please refer to the documentation on accessing the server-side workspace.
Note
You can find the cross-site validation results
at [DOWNLOAD_DIR]/[JOB_ID]/workspace/cross_site_val/cross_val_results.json
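Once downloaded, the results file is plain JSON and can be inspected with a few lines of Python. The layout assumed below (validating client → model owner → metrics dict) is what ValidationJsonGenerator produces for this example, but treat the exact structure as an assumption and check your own file:

```python
import json

def summarize_cross_val(path: str) -> list:
    # Assumed layout: {validating_client: {model_owner: {metric: value}}}
    with open(path) as f:
        results = json.load(f)
    lines = []
    for client, per_model in results.items():
        for owner, metrics in per_model.items():
            lines.append(f"{client} validated {owner}: {metrics}")
    for line in lines:
        print(line)
    return lines
```

For example, `summarize_cross_val("cross_val_results.json")` would print one line per (validating client, model owner) pair.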
Shutdown FL system¶
Once the FL run is complete and the server has successfully aggregated the clients' results for all the rounds, and
cross site model evaluation is finished, run the following commands in the fl_admin to shut down the system (entering
admin when prompted for the password):
> shutdown client
> shutdown server
> bye
Congratulations!
You’ve successfully run your numpy federated learning system with cross site validation.
The full source code for this exercise can be found in examples/hello-numpy-cross-val.