Source code for nvflare.private.fed.client.client_engine_executor_spec

# Copyright (c) 2021, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
from abc import ABC, abstractmethod
from typing import List, Union

from nvflare.apis.client_engine_spec import ClientEngineSpec
from nvflare.apis.engine_spec import EngineSpec
from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import Shareable
from nvflare.apis.workspace import Workspace
from nvflare.widgets.widget import Widget


[docs]class TaskAssignment(object): def __init__(self, name: str, task_id: str, data: Shareable): """Init TaskAssignment. Keeps track of information about the assignment of a task, including the time that it was created after being fetched by the Client Run Manager. Args: name: task name task_id: task id data: the Shareable data for the task assignment """ self.name = name self.task_id = task_id self.data = data self.receive_time = time.time()
[docs]class ClientEngineExecutorSpec(ClientEngineSpec, EngineSpec, ABC): """The ClientEngineExecutorSpec defines the ClientEngine APIs running in the child process."""
[docs] @abstractmethod def get_task_assignment(self, fl_ctx: FLContext, timeout=None) -> TaskAssignment: pass
[docs] @abstractmethod def send_task_result(self, result: Shareable, fl_ctx: FLContext, timeout=None) -> bool: pass
[docs] @abstractmethod def get_workspace(self) -> Workspace: pass
[docs] @abstractmethod def get_widget(self, widget_id: str) -> Widget: pass
[docs] @abstractmethod def get_all_components(self) -> dict: pass
[docs] @abstractmethod def register_aux_message_handler(self, topic: str, message_handle_func): """Register aux message handling function with specified topics. Exception is raised when: a handler is already registered for the topic; bad topic - must be a non-empty string bad message_handle_func - must be callable Implementation Note: This method should simply call the ClientAuxRunner's register_aux_message_handler method. Args: topic: the topic to be handled by the func message_handle_func: the func to handle the message. Must follow aux_message_handle_func_signature. """ pass
[docs] @abstractmethod def send_aux_request( self, targets: Union[None, str, List[str]], topic: str, request: Shareable, timeout: float, fl_ctx: FLContext, optional=False, secure: bool = False, ) -> dict: """Send a request to Server via the aux channel. Implementation: simply calls the ClientAuxRunner's send_aux_request method. Args: targets: aux messages targets. None or empty list means the server. topic: topic of the request request: request to be sent timeout: number of secs to wait for replies. 0 means fire-and-forget. fl_ctx: FL context optional: whether the request is optional secure: should the request sent in the secure way Returns: a dict of reply Shareable in the format of: { site_name: reply_shareable } """ pass
[docs] @abstractmethod def fire_and_forget_aux_request( self, topic: str, request: Shareable, fl_ctx: FLContext, optional=False, secure=False ) -> Shareable: """Send an async request to Server via the aux channel. Args: topic: topic of the request request: request to be sent fl_ctx: FL context optional: whether the request is optional Returns: """ pass
[docs] @abstractmethod def build_component(self, config_dict): """Build a component from the config_dict. Args: config_dict: config dict """ pass
[docs] @abstractmethod def abort_app(self, job_id: str, fl_ctx: FLContext): """Abort the running FL App on the client. Args: job_id: current_job_id fl_ctx: FLContext """ pass