Quickstart (Numpy - Cross Site Validation)

Before You Start

Before jumping into this QuickStart guide, make sure you have an environment with NVIDIA FLARE installed. You can follow the installation guide, which covers setting up a Python virtual environment (the recommended environment) and installing NVIDIA FLARE.

Prerequisite

This example builds on the Hello Numpy example, which is based on the ScatterAndGather workflow. Please make sure you complete that example first, as this one relies heavily on its concepts.

Introduction

This tutorial is meant to solely demonstrate how the NVIDIA FLARE system works, without introducing any actual deep learning concepts. Through this exercise, you will learn how to use NVIDIA FLARE with numpy to perform cross site validation after training. The training process is explained in the Hello Numpy example. Using simplified weights and metrics, you will be able to clearly see how NVIDIA FLARE performs validation across different sites with little extra work.

The design of this exercise follows the Hello Numpy example, which consists of one server and two clients starting with weights [[1, 2, 3], [4, 5, 6], [7, 8, 9]].

Cross site validation consists of the following steps:

  • During the initial phase of training with the ScatterAndGather workflow, NPTrainer saves the local model to disk for the clients.

  • The CrossSiteModelEval workflow gets the client models with the submit_model task.

  • The validate task is broadcast to all participating clients with the model shareable containing the model data, and the results from the validate task are saved.
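
Taken together, these steps produce an all-to-all evaluation matrix: every validating client scores every submitted model (the client models plus the global server model). A minimal sketch of who validates what, assuming two hypothetical clients named site-1 and site-2 and one server model (the names are illustrative, not fixed NVIDIA FLARE identifiers):

```python
# Sketch of the cross-site evaluation matrix. The site names and the
# "server" model label are illustrative placeholders.
clients = ["site-1", "site-2"]
models = clients + ["server"]  # client models plus the global server model

# Every client runs the validate task against every model.
eval_matrix = [(validator, model) for validator in clients for model in models]

for validator, model in eval_matrix:
    print(f"{validator} validates model from {model}")
```

With n clients and one server model, this yields n × (n + 1) validation results, which is why the result set grows quickly with the number of sites.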

During this exercise, we will see how NVIDIA FLARE takes care of most of the above steps with little work from the user. We will be working with the hello-numpy-cross-val application in the examples folder. Custom FL applications can contain the folders:

  1. custom: contains the custom components (np_trainer.py, np_model_persistor.py, np_validator.py, np_model_locator.py, np_formatter.py)

  2. config: contains client and server configurations (config_fed_client.json, config_fed_server.json)

  3. resources: contains the logger config (log.config)

Let’s get started. First clone the repo, if you haven’t already:

$ git clone https://github.com/NVIDIA/NVFlare.git

Remember to activate your NVIDIA FLARE Python virtual environment from the installation guide. Ensure numpy is installed.

(nvflare-env) $ python3 -m pip install numpy

Now that you have all your dependencies installed, let’s implement the Federated Learning system.

Training

In the Hello Numpy example, we implemented the NPTrainer object. In this example, we use the same NPTrainer but extend it to process the submit_model task to work with the CrossSiteModelEval workflow to get the client models.

The code in np_trainer.py saves the local model to disk after each step of training.

Note that the server also produces a global model. The CrossSiteModelEval workflow submits the server model for evaluation after the client models.
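
The persist-then-submit pattern can be sketched roughly as follows. This is a simplified stand-in, not the actual np_trainer.py implementation: the real NPTrainer exchanges Shareable/DXO objects with the server, and the file name model.npy and the helper names here are hypothetical.

```python
import os
import tempfile

import numpy as np

# Illustrative stand-ins; the real NPTrainer works with Shareable/DXO objects.
run_dir = tempfile.mkdtemp()
MODEL_FILE = "model.npy"  # hypothetical file name


def save_local_model(weights: np.ndarray) -> None:
    # Called during the ScatterAndGather training phase: persist the
    # local model so it survives until cross site validation.
    np.save(os.path.join(run_dir, MODEL_FILE), weights)


def handle_submit_model() -> np.ndarray:
    # Called when the CrossSiteModelEval workflow sends the submit_model
    # task: load the persisted local model and return it to the server.
    return np.load(os.path.join(run_dir, MODEL_FILE))


weights = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]], dtype=float)
save_local_model(weights)
submitted = handle_submit_model()
```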

Implementing the Validator

The validator is an Executor that is called to validate the models received from the server during the CrossSiteModelEval workflow. These models could be from other clients or models generated on the server.

np_validator.py
import logging
import time

import numpy as np

from constants import NPConstants
from nvflare.apis.dxo import from_shareable, DataKind, DXO
from nvflare.apis.executor import Executor
from nvflare.apis.fl_constant import ReturnCode
from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import Shareable, make_reply
from nvflare.apis.signal import Signal
from nvflare.app_common.app_constant import AppConstants


class NPValidator(Executor):
    def __init__(
        self,
        epsilon=1,
        sleep_time=0,
        validate_task_name=AppConstants.TASK_VALIDATION,
    ):
        # Init functions of components should be very minimal. Init
        # is called when the json is read. A big init will cause json
        # loading to halt for a long time.
        super().__init__()

        self.logger = logging.getLogger("NPValidator")
        self._random_epsilon = epsilon
        self._sleep_time = sleep_time
        self._validate_task_name = validate_task_name

    def handle_event(self, event_type: str, fl_ctx: FLContext):
        # if event_type == EventType.START_RUN:
        #     Create all major components here. This is a simple app that doesn't need any components.
        # elif event_type == EventType.END_RUN:
        #     # Clean up resources (closing files, joining threads, removing dirs etc.)
        pass

    def execute(
        self,
        task_name: str,
        shareable: Shareable,
        fl_ctx: FLContext,
        abort_signal: Signal,
    ) -> Shareable:
        # Any long-running task should check abort_signal regularly.
        # Otherwise, aborting the client will not work.
        count, interval = 0, 0.5
        while count < self._sleep_time:
            if abort_signal.triggered:
                return make_reply(ReturnCode.TASK_ABORTED)
            time.sleep(interval)
            count += interval

        if task_name == self._validate_task_name:
            try:
                # First we extract the DXO from the shareable.
                try:
                    model_dxo = from_shareable(shareable)
                except Exception as e:
                    self.log_error(fl_ctx, f"Unable to extract model dxo from shareable. Exception: {e.__str__()}")
                    return make_reply(ReturnCode.BAD_TASK_DATA)

                # Get the model from the shareable. data_kind must be WEIGHTS.
                if model_dxo.data and model_dxo.data_kind == DataKind.WEIGHTS:
                    model = model_dxo.data
                else:
                    self.log_error(
                        fl_ctx, "Model DXO doesn't have data or is not of type DataKind.WEIGHTS. Unable to validate."
                    )
                    return make_reply(ReturnCode.BAD_TASK_DATA)

                # Check if the key exists in the model.
                if NPConstants.NUMPY_KEY not in model:
                    self.log_error(fl_ctx, "numpy_key not in model. Unable to validate.")
                    return make_reply(ReturnCode.BAD_TASK_DATA)

                # The workflow provides MODEL_OWNER information in the shareable header.
                model_name = shareable.get_header(AppConstants.MODEL_OWNER, "?")

                # Print properties.
                self.log_info(fl_ctx, f"Model: \n{model}")
                self.log_info(fl_ctx, f"Task name: {task_name}")
                self.log_info(fl_ctx, f"Client identity: {fl_ctx.get_identity_name()}")
                self.log_info(fl_ctx, f"Validating model from {model_name}.")

                # Check the abort signal regularly.
                if abort_signal.triggered:
                    return make_reply(ReturnCode.TASK_ABORTED)

                # Do some dummy validation.
                random_epsilon = np.random.random()
                self.log_info(
                    fl_ctx, f"Adding random epsilon {random_epsilon} in validation."
                )
                val_results = {}
                np_data = model[NPConstants.NUMPY_KEY]
                np_data = np.sum(np_data / np.max(np_data))
                val_results["accuracy"] = np_data + random_epsilon

                # Check the abort signal regularly.
                if abort_signal.triggered:
                    return make_reply(ReturnCode.TASK_ABORTED)

                self.log_info(fl_ctx, f"Validation result: {val_results}")

                # Create a DXO for the metrics and return a shareable.
                metric_dxo = DXO(data_kind=DataKind.METRICS, data=val_results)
                return metric_dxo.to_shareable()
            except Exception:
                self.log_exception(fl_ctx, "Exception in NPValidator execute.")
                return make_reply(ReturnCode.EXECUTION_EXCEPTION)
        else:
            return make_reply(ReturnCode.TASK_UNKNOWN)

The validator is an Executor that implements the execute function, which receives a Shareable. It handles the validate task by computing the sum of the model data divided by its max and adding a random epsilon, then returns the results packaged with a DXO into a Shareable.
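
With the initial weights from this example, the dummy metric is easy to check by hand: the sum of [[1, 2, 3], [4, 5, 6], [7, 8, 9]] is 45 and the max is 9, so sum(data / max) = 5.0, and the reported "accuracy" is 5.0 plus the random epsilon:

```python
import numpy as np

# Initial weights from the Hello Numpy example.
data = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]], dtype=float)

# The same dummy metric NPValidator computes: sum(data / max(data)).
base = np.sum(data / np.max(data))  # 45 / 9 = 5.0

# np.random.random() returns a value in [0, 1), so the reported
# accuracy for these weights falls in [5.0, 6.0).
random_epsilon = np.random.random()
accuracy = base + random_epsilon
```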

Note

Note that in our hello-examples, we are demonstrating Federated Learning using data unrelated to deep learning. NVIDIA FLARE can be used with any data packaged inside a Shareable object (which subclasses dict), and DXO is the recommended way to manage that data in a standard fashion.

Application Configuration

Inside the config folder there are two files, config_fed_client.json and config_fed_server.json.

config_fed_server.json
{
  "format_version": 2,
  "server": {
    "heart_beat_timeout": 600
  },
  "task_data_filters": [],
  "task_result_filters": [],
  "components": [
    {
      "id": "persistor",
      "path": "np_model_persistor.NPModelPersistor",
      "args": {}
    },
    {
      "id": "shareable_generator",
      "path": "nvflare.app_common.shareablegenerators.full_model_shareable_generator.FullModelShareableGenerator",
      "args": {}
    },
    {
      "id": "aggregator",
      "path": "nvflare.app_common.aggregators.intime_accumulate_model_aggregator.InTimeAccumulateWeightedAggregator",
      "args": {
        "expected_data_kind": "WEIGHTS"
      }
    },
    {
      "id": "model_locator",
      "path": "np_model_locator.NPModelLocator",
      "args": {}
    },
    {
      "id": "formatter",
      "path": "np_formatter.NPFormatter",
      "args": {}
    }
  ],
  "workflows": [
    {
      "id": "scatter_and_gather",
      "path": "nvflare.app_common.workflows.scatter_and_gather.ScatterAndGather",
      "args": {
        "min_clients": 2,
        "num_rounds": 3,
        "start_round": 0,
        "wait_time_after_min_received": 10,
        "aggregator_id": "aggregator",
        "persistor_id": "persistor",
        "shareable_generator_id": "shareable_generator",
        "train_task_name": "train",
        "train_timeout": 6000
      }
    },
    {
      "id": "cross_site_model_eval",
      "path": "nvflare.app_common.workflows.cross_site_model_eval.CrossSiteModelEval",
      "args": {
        "model_locator_id": "model_locator",
        "submit_model_timeout": 600,
        "validation_timeout": 6000,
        "cleanup_models": false
      }
    }
  ]
}

The server now has a second workflow configured after Scatter and Gather, CrossSiteModelEval. The components “model_locator” and “formatter” have been added to work with the cross site model evaluation workflow, and the rest is the same as in Hello Numpy.

config_fed_client.json
{
  "format_version": 2,
  "executors": [
    {
      "tasks": [
        "train",
        "submit_model"
      ],
      "executor": {
        "path": "np_trainer.NPTrainer",
        "args": {}
      }
    },
    {
      "tasks": [
        "validate"
      ],
      "executor": {
        "path": "np_validator.NPValidator"
      }
    }
  ],
  "task_result_filters": [],
  "task_data_filters": [],
  "components": []
}

The client configuration now has more tasks and an additional Executor NPValidator configured to handle the “validate” task. The “submit_model” task has been added to the NPTrainer Executor to work with the CrossSiteModelEval workflow to get the client models.
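
The effect of this configuration is a simple task-name-to-executor routing table on the client. A sketch of how the executors list above maps each task name to an executor path (the routing dict here is an illustration of the mapping, not NVIDIA FLARE's internal dispatch code):

```python
import json

# The executors section of config_fed_client.json, as shown above.
config = json.loads("""
{
  "executors": [
    {"tasks": ["train", "submit_model"],
     "executor": {"path": "np_trainer.NPTrainer", "args": {}}},
    {"tasks": ["validate"],
     "executor": {"path": "np_validator.NPValidator"}}
  ]
}
""")

# Build the routing table: each listed task name resolves to one executor.
routing = {
    task: entry["executor"]["path"]
    for entry in config["executors"]
    for task in entry["tasks"]
}
```

So when the server broadcasts the validate task, the client hands it to NPValidator, while train and submit_model both go to NPTrainer.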

Cross site validation!

Now you can use admin commands to upload, deploy, and start this example app. To do this on a proof of concept local FL system, follow the sections Setting Up the Application Environment in POC Mode and Starting the Application Environment in POC Mode if you have not already.

Running the FL System

With the admin client command prompt successfully connected and logged in, enter the commands below in order. Pay close attention to what happens in each of the four terminals. You can see how the admin controls the server and clients with each command.

> upload_app hello-numpy-cross-val

Uploads the application from the admin client to the server’s staging area.

> set_run_number 1

Creates a run directory in the workspace for the run_number on the server and all clients. The run directory allows for the isolation of different runs so the information in one particular run does not interfere with other runs.

> deploy_app hello-numpy-cross-val all

This will make the hello-numpy-cross-val application the active one in the run_number workspace. After the above two commands, the server and all the clients know the hello-numpy-cross-val application will reside in the run_1 workspace.

> start_app all

This start_app command instructs the NVIDIA FLARE server and clients to start training with the hello-numpy-cross-val application in that run_1 workspace.

From time to time, you can issue check_status server in the admin client to check the entire training progress. During the first phase, the model will be trained. During the second phase, cross site validation will happen. The workflow on the client will change to CrossSiteModelEval as it enters this second phase.

Accessing the results

During cross site model evaluation, every client validates other clients’ models and server models (if present). This can produce a lot of results. All the results are kept on the server in <run_dir>/cross_val_results.json. All the models sent to the server are also present in the <run_dir>/<client_uid>/ directory.

The results will be in JSON format.
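
The results file can be inspected with any JSON tool once the run finishes. Here is a sketch using Python's json module; note that the site names and the nested validator → model-owner → metrics layout shown in the sample are assumptions for illustration, so check the structure of your own cross_val_results.json:

```python
import json

# Hypothetical contents mimicking <run_dir>/cross_val_results.json; the
# nested validator -> model-owner -> metrics layout is an assumption.
sample = """
{
  "site-1": {"site-2": {"accuracy": 5.41}, "server": {"accuracy": 5.87}},
  "site-2": {"site-1": {"accuracy": 5.12}, "server": {"accuracy": 5.93}}
}
"""
results = json.loads(sample)

# Flatten into (validator, model owner, accuracy) triples for easy reading.
rows = [
    (validator, owner, metrics["accuracy"])
    for validator, per_model in results.items()
    for owner, metrics in per_model.items()
]

for validator, owner, accuracy in rows:
    print(f"{validator} scored {owner}'s model: accuracy={accuracy}")
```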

Understanding the Output

After starting the server and clients, you should begin to see output in each terminal tracking the progress of the FL run. As each client finishes training, it will start the cross site validation process. During this phase, you'll see several important outputs that track the progress of cross site validation.

The server shows the log of each client requesting models, the models it sends and the results received. Since the server could be responding to many clients at the same time, it may require careful examination to make proper sense of events from the jumbled logs.

Once the FL run is complete, the server has successfully aggregated the clients' results after all the rounds, and cross site model evaluation is finished, run the following commands in the fl_admin to shut down the system (entering admin when prompted for the password):

> shutdown client
> shutdown server
> bye

In order to stop all processes, run ./stop_fl.sh.

Congratulations! You’ve successfully run your numpy federated learning system with cross site validation. The full source code for this exercise can be found in examples/hello-numpy-cross-val.