Source code for nvflare.job_config.fed_app_config

# Copyright (c) 2024, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

from nvflare.apis.executor import Executor
from nvflare.apis.impl.controller import Controller
from nvflare.apis.impl.wf_comm_server import WFCommServer
from nvflare.job_config.base_app_config import BaseAppConfig
from nvflare.private.fed.client.client_json_config import _ExecutorDef
from nvflare.private.fed.server.server_json_config import WorkFlow


[docs]class ClientAppConfig(BaseAppConfig): """ClientAppConfig represents the ClientApp inside the Job. It holds the BaseAppConfig components and the task executors components data for the ClientApp. """ def __init__(self) -> None: super().__init__() self.executors: [_ExecutorDef] = []
[docs] def add_executor(self, tasks: List[str], executor: Executor): if not isinstance(executor, Executor): raise RuntimeError(f"workflow must be type of Executor, but got {executor.__class__}") e = _ExecutorDef() e.tasks = tasks e.executor = executor self.executors.append(e)
[docs]class ServerAppConfig(BaseAppConfig): """ServerAppConfig represents the ServerApp inside the Job. it holds the BaseAppConfig components and the workflow components data for the ServerApp. """ def __init__(self) -> None: super().__init__() self.workflows: [Controller] = [] self.ids = []
[docs] def add_workflow(self, cid, controller: Controller): if not isinstance(controller, Controller): raise RuntimeError(f"workflow must be type of Controller, but got {controller.__class__}") # self.add_component(cid, controller) if cid in self.components.keys() or cid in self.ids: raise RuntimeError(f"Component with ID:{cid} already exist.") communicator = WFCommServer() self.handlers.append(communicator) controller.set_communicator(communicator) self.workflows.append(WorkFlow(cid, controller)) self.ids.append(cid)
[docs]class FedAppConfig: """FedAppConfig represents the App information inside the Job. It contains either a ServerApp, or a ClientApp, or both of them. """ def __init__(self, server_app: ServerAppConfig = None, client_app: ClientAppConfig = None) -> None: super().__init__() if server_app and not isinstance(server_app, ServerAppConfig): raise ValueError(f"server_app must be type of ServerAppConfig, but got {server_app.__class__}") if client_app and not isinstance(client_app, ClientAppConfig): raise ValueError(f"client_app must be type of ClientAppConfig, but got {client_app.__class__}") self.server_app: ServerAppConfig = server_app self.client_app: ClientAppConfig = client_app