Source code for nvflare.app_opt.pt.job_config.fed_sag_mlflow

# Copyright (c) 2024, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Optional

import torch.nn as nn

from nvflare.apis.analytix import ANALYTIC_EVENT_TYPE
from nvflare.apis.dxo import DataKind
from nvflare.app_common.aggregators import InTimeAccumulateWeightedAggregator
from nvflare.app_common.shareablegenerators import FullModelShareableGenerator
from nvflare.app_common.workflows.scatter_and_gather import ScatterAndGather
from nvflare.app_opt.pt.job_config.base_fed_job import BaseFedJob
from nvflare.app_opt.tracking.mlflow.mlflow_receiver import MLflowReceiver
from nvflare.app_opt.tracking.mlflow.mlflow_writer import MLflowWriter


class SAGMLFlowJob(BaseFedJob):
    def __init__(
        self,
        initial_model: nn.Module,
        n_clients: int,
        num_rounds: int,
        name: str = "fed_job",
        min_clients: int = 1,
        mandatory_clients: Optional[List[str]] = None,
        key_metric: str = "accuracy",
        tracking_uri: Optional[str] = None,
        kwargs: Optional[dict] = None,
        artifact_location: Optional[str] = None,
    ):
        """PyTorch ScatterAndGather with MLflow Job.

        Configures the server side: a ScatterAndGather controller, a full-model
        shareable generator, an in-time weighted aggregator, and an MLflowReceiver.
        The persistor and base widgets come from ``BaseFedJob``.
        ``set_up_client`` adds an MLflowWriter to every client target.
        The user must add executors.

        Args:
            initial_model (nn.Module): initial PyTorch model.
            n_clients (int): number of clients for this job. Passed to the
                ScatterAndGather controller as its ``min_clients``.
            num_rounds (int): number of ScatterAndGather rounds.
            name (str, optional): name of the job. Defaults to "fed_job".
            min_clients (int, optional): the minimum number of clients for the job
                (forwarded to ``BaseFedJob``). Defaults to 1.
            mandatory_clients (List[str], optional): mandatory clients to run the job.
                Defaults to None.
            key_metric (str, optional): Metric used to determine if the model is globally best.
                If metrics are a `dict`, `key_metric` can select the metric used for global
                model selection. Defaults to "accuracy".
            tracking_uri (str, optional): forwarded to ``MLflowReceiver`` as ``tracking_uri``.
                Defaults to None.
            kwargs (dict, optional): forwarded to ``MLflowReceiver`` as ``kw_args``.
                Defaults to None.
            artifact_location (str, optional): forwarded to ``MLflowReceiver`` as
                ``artifact_location``. Defaults to None.
        """
        super().__init__(initial_model, name, min_clients, mandatory_clients, key_metric)

        shareable_generator = FullModelShareableGenerator()
        shareable_generator_id = self.to_server(shareable_generator, id="shareable_generator")
        aggregator_id = self.to_server(
            InTimeAccumulateWeightedAggregator(expected_data_kind=DataKind.WEIGHTS), id="aggregator"
        )

        mlflow_receiver = MLflowReceiver(
            tracking_uri=tracking_uri, kw_args=kwargs, artifact_location=artifact_location
        )
        self.to_server(id="mlflow_receiver_with_tracking_uri", obj=mlflow_receiver)

        controller = ScatterAndGather(
            # The controller uses n_clients, not the job-level min_clients.
            min_clients=n_clients,
            num_rounds=num_rounds,
            # Hard-coded value; not exposed as a constructor argument.
            wait_time_after_min_received=10,
            aggregator_id=aggregator_id,
            # "persistor_id" is expected to have been registered by BaseFedJob.__init__.
            persistor_id=self.comp_ids["persistor_id"],
            shareable_generator_id=shareable_generator_id,
        )
        self.to_server(controller)

    def set_up_client(self, target: str):
        """Configure a client target with the base components plus an MLflow log writer.

        Args:
            target (str): the client target to configure.
        """
        super().set_up_client(target)
        # Fire events with the standard analytics event type. The placeholder
        # string "event_type" would not match the analytics event name that the
        # server-side MLflowReceiver expects, so the writer's metrics would be dropped.
        # NOTE(review): assumes BaseFedJob's client-side event converter forwards
        # ANALYTIC_EVENT_TYPE; confirm.
        ml_flow_writer = MLflowWriter(event_type=ANALYTIC_EVENT_TYPE)
        self.to(id="log_writer", obj=ml_flow_writer, target=target)