# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Federated Averaging for Logistic Regression with Newton-Raphson method
using Numpy
"""
from typing import List, Optional
import numpy as np
from nvflare.apis.fl_constant import FLMetaKey
from nvflare.app_common.abstract.fl_model import FLModel
from nvflare.app_common.aggregators.weighted_aggregation_helper import WeightedAggregationHelper
from nvflare.app_common.app_constant import AppConstants
from nvflare.app_common.np.constants import NPConstants
from nvflare.app_common.np.np_model_persistor import NPModelPersistor
from nvflare.app_common.workflows.base_fedavg import BaseFedAvg
from nvflare.app_common.workflows.lr.np_persistor import LRModelPersistor
[docs]
class FedAvgLR(BaseFedAvg):
def __init__(
self,
damping_factor: float,
epsilon: float = 1.0,
model_dir: str = "models",
model_name: str = "weights.npy",
n_features: int = 13,
aggregator: WeightedAggregationHelper = WeightedAggregationHelper(),
persistor: Optional[NPModelPersistor] = None,
*args,
**kwargs,
):
"""
Initialize the FedAvgLR class for Federated Averaging with Newton-Raphson optimization.
Args:
damping_factor (float): Damping factor for Newton-Raphson updates, used to control the step size.
epsilon (float, optional): Regularization factor to avoid empty Hessian matrix inversion. Defaults to 1.0.
model_dir (str, optional): Directory to save and load the model. Defaults to "models".
model_name (str, optional): Name of the model file. Defaults to "weights.npy".
n_features (int, optional): Number of features in the dataset. Defaults to 13.
aggregator (WeightedAggregationHelper, optional): Helper for weighted aggregation of model updates.
persistor (Optional[NPModelPersistor], optional): Custom persistor for model saving and loading. If not provided, a default LRModelPersistor is used.
*args: Additional positional arguments passed to the base class.
**kwargs: Additional keyword arguments passed to the base class.
This class implements the Federated Averaging algorithm using the Newton-Raphson method for optimization.
It supports flexible model persistence through customizable persistors, allowing integration with different
storage backends or model formats.
"""
super().__init__(*args, **kwargs)
self.damping_factor = damping_factor
self.epsilon = epsilon
self.model_dir = model_dir
self.model_name = model_name
self.n_features = n_features
self.aggregator = aggregator
self._default_persistor = LRModelPersistor(
model_dir=self.model_dir, model_name=self.model_name, n_features=self.n_features
)
self.persistor = persistor
[docs]
def run(self) -> None:
"""
The run function executes the logic of federated
second order Newton-Raphson optimization.
"""
self.info("starting Federated Averaging Newton Raphson ...")
# First load the model and set up some training params.
# A `persisitor` (NewtonRaphsonModelPersistor) will load
# the model in `ModelLearnable` format, then will be
# converted `FLModel` by `ModelController`.
#
model = self.load_model()
model.start_round = self.start_round
model.total_rounds = self.num_rounds
self.info("Server side model loader: {}".format(model))
for self.current_round in range(self.start_round, self.start_round + self.num_rounds):
self.info(f"Round {self.current_round} started.")
# Get the list of clients.
clients = self.sample_clients(self.num_clients)
model.current_round = self.current_round
# Send training task and current global model to clients.
#
# A `task` instance will be created, and sent
# to clients, the model is first converted to a shareable
# and is attached to the task.
#
# After the task is finished, the result (shareable) received
# from the task is converted to FLModel, and is returned to the
# server. The `results` below is a list with result (FLModel)
# from all clients.
#
# The full logic of `task` is implemented in:
# https://github.com/NVIDIA/NVFlare/blob/d6827bca96d332adb3402ceceb4b67e876146067/nvflare/app_common/workflows/model_controller.py#L178
#
self.info("sending server side global model to clients")
results = self.send_model_and_wait(targets=clients, data=model)
# Aggregate results received from clients.
aggregate_results = self.aggregate(results, aggregate_fn=self.newton_raphson_aggregator_fn)
# Update global model based on the following formula:
# weights = weights + updates, where
# updates = -damping_factor * Hessian^{-1} . Gradient
self.update_model(model, aggregate_results)
# Save global model.
self.save_model(model)
self.info("Finished FedAvg.")
[docs]
def newton_raphson_aggregator_fn(self, results: List[FLModel]):
"""
Custom aggregator function for second order Newton-Raphson
optimization.
This uses the default thread-safe WeightedAggregationHelper,
which implement a weighted average of all values received from
a `result` dictionary.
Args:
results: a list of `FLModel`s. Each `FLModel` is received
from a client. The field `params` is a dictionary that
contains values to be aggregated: the gradient and hessian.
"""
self.info("received results from clients: {}".format(results))
# On client side the `NUM_STEPS_CURRENT_ROUND` key
# is used to track the number of samples for each client.
for curr_result in results:
self.aggregator.add(
data=curr_result.params,
weight=curr_result.meta.get(FLMetaKey.NUM_STEPS_CURRENT_ROUND, 1.0),
contributor_name=curr_result.meta.get("client_name", AppConstants.CLIENT_UNKNOWN),
contribution_round=curr_result.current_round,
)
aggregated_dict = self.aggregator.get_result()
# self.info("aggregated result: {}".format(aggregated_dict))
# Compute global model update:
# update = - damping_factor * Hessian^{-1} . Gradient
# A regularization is added to avoid empty hessian.
#
reg = self.epsilon * np.eye(aggregated_dict["hessian"].shape[0])
newton_raphson_updates = self.damping_factor * np.linalg.solve(
aggregated_dict["hessian"] + reg, aggregated_dict["gradient"]
)
self.info("Newton-Raphson updates: {}".format(newton_raphson_updates))
# Convert the aggregated result to `FLModel`, this `FLModel`
# will then be used by `update_model` method from the base class,
# to update the global model weights.
#
aggr_result = FLModel(
params={"newton_raphson_updates": newton_raphson_updates},
params_type=results[0].params_type,
meta={
"nr_aggregated": len(results),
AppConstants.CURRENT_ROUND: results[0].current_round,
AppConstants.NUM_ROUNDS: self.num_rounds,
},
)
return aggr_result
[docs]
def update_model(self, model, model_update, replace_meta=True) -> FLModel:
"""
Update logistic regression parameters based on
aggregated gradient and hessian.
"""
if model_update is None:
return model
if model_update.params is None:
raise ValueError("model params is None")
if replace_meta:
model.meta = model_update.meta
else:
model.meta.update(model_update.meta)
model.metrics = model_update.metrics
model.params[NPConstants.NUMPY_KEY] += model_update.params["newton_raphson_updates"]
return model
[docs]
def load_model(self) -> FLModel:
if self.persistor is None:
self.persistor = self._default_persistor
model = super().load_model()
if model is None:
raise ValueError("model can't be None")
if model.params is None:
raise ValueError("model params is None")
return model
[docs]
def save_model(self, model: FLModel) -> None:
if self.persistor is None:
self.persistor = self._default_persistor
super().save_model(model)