# Source code for nvflare.apis.impl.seq_relay_manager

# Copyright (c) 2021, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
from typing import Tuple

from nvflare.apis.controller_spec import ClientTask, Task, TaskCompletionStatus
from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import ReservedHeaderKey, Shareable

from .task_manager import TaskCheckStatus, TaskManager

# Keys under which SequentialRelayTaskManager stores its per-task state in task.props.
# True if clients that request the task may be appended to task.targets on the fly.
_KEY_DYNAMIC_TARGETS = "__dynamic_targets"
# Time allotted to each target to request the task; 0 disables window growth.
_KEY_TASK_ASSIGN_TIMEOUT = "__task_assignment_timeout"
# Time to wait for the result of the client the task was last sent to; 0 means wait indefinitely.
_KEY_TASK_RESULT_TIMEOUT = "__task_result_timeout"
# Index into task.targets of the client the task was last sent to; -1 until the first send.
_KEY_LAST_SEND_IDX = "__last_send_idx"
# Initialized to None by the manager's constructor; not otherwise used in this module.
_PENDING_CLIENT_TASK = "__pending_client_task"


class SequentialRelayTaskManager(TaskManager):
    """Task manager for a relay controller that sends a task to its targets one at a time.

    All bookkeeping is kept in ``task.props`` (see the ``_KEY_*`` constants of this
    module), so the manager itself holds no per-task state.
    """

    def __init__(self, task: Task, task_assignment_timeout, task_result_timeout, dynamic_targets: bool):
        """Task manager for relay controller on SendOrder.SEQUENTIAL.

        Args:
            task (Task): an instance of Task
            task_assignment_timeout (int): timeout value on a client requesting its task
            task_result_timeout (int): timeout value on reply of one client
            dynamic_targets (bool): allow clients to join after this task starts
        """
        TaskManager.__init__(self)

        # A missing timeout is stored as 0, which the window logic treats as "no timeout".
        if task_assignment_timeout is None:
            task_assignment_timeout = 0

        if task_result_timeout is None:
            task_result_timeout = 0

        task.props[_KEY_DYNAMIC_TARGETS] = dynamic_targets
        task.props[_KEY_TASK_ASSIGN_TIMEOUT] = task_assignment_timeout
        task.props[_KEY_TASK_RESULT_TIMEOUT] = task_result_timeout
        task.props[_KEY_LAST_SEND_IDX] = -1  # client index of last send
        task.props[_PENDING_CLIENT_TASK] = None

    def check_task_send(self, client_task: ClientTask, fl_ctx: FLContext) -> TaskCheckStatus:
        """Determine whether the task should be sent to the client.

        Args:
            client_task (ClientTask): the task processing state of the client
            fl_ctx (FLContext): fl context that comes with the task request

        Returns:
            TaskCheckStatus: NO_BLOCK for not sending the task, BLOCK for waiting, SEND for OK to send
        """
        client_name = client_task.client.name
        task = client_task.task

        if task.props[_KEY_DYNAMIC_TARGETS]:
            # With dynamic targets, any requesting client becomes a target on first contact.
            if task.targets is None:
                task.targets = []
            if client_name not in task.targets:
                self.logger.debug("client_name: {} added to task.targets".format(client_name))
                task.targets.append(client_name)

        # is this client eligible?
        if client_name not in task.targets:
            # this client is not a target
            return TaskCheckStatus.NO_BLOCK

        # adjust client window
        win_start_idx, win_end_idx = self._determine_window(task)
        self.logger.debug("win_start_idx={}, win_end_idx={}".format(win_start_idx, win_end_idx))
        if win_start_idx < 0:
            # wait for this task to end by the monitor
            return TaskCheckStatus.BLOCK

        # see whether this client is in the window
        for i in range(win_start_idx, win_end_idx):
            if client_name == task.targets[i]:
                # this client is in the window!
                self.logger.debug("last_send_idx={}".format(i))
                task.props[_KEY_LAST_SEND_IDX] = i
                return TaskCheckStatus.SEND

        # this client is not in the window
        return TaskCheckStatus.NO_BLOCK

    @staticmethod
    def _last_sent_client_task(task: Task):
        """Return the ClientTask of the target the task was last sent to, if any.

        The target list is only indexed after confirming that something has been
        sent and that the list is non-empty. Indexing unconditionally with the
        initial index of -1 would raise on a ``None`` or empty target list.

        Args:
            task (Task): an instance of Task

        Returns:
            The entry of ``task.last_client_task_map`` for the last-sent target, or
            None if nothing has been sent yet or that target has no entry in the map.
        """
        last_send_idx = task.props[_KEY_LAST_SEND_IDX]
        if last_send_idx < 0 or not task.targets:
            return None

        last_send_target = task.targets[last_send_idx]
        if last_send_target not in task.last_client_task_map:
            return None
        return task.last_client_task_map[last_send_target]

    def _determine_window(self, task: Task) -> Tuple[int, int]:
        """Returns two indexes (starting/ending) of a window of client candidates.

        When starting is negative and ending is 0, the window is closed and the task should exit.
        When both starting and ending are negative, there is no client candidate as the
        current client task has not returned.

        Args:
            task (Task): an instance of Task

        Returns:
            Tuple[int, int]: starting and ending indices of a window of client candidates.
        """
        # adjust client window
        task_result_timeout = task.props[_KEY_TASK_RESULT_TIMEOUT]
        last_send_idx = task.props[_KEY_LAST_SEND_IDX]
        last_task = self._last_sent_client_task(task)

        if last_task is not None:
            # see whether the result has been received
            self.logger.debug("last_task={}".format(last_task))

            if last_task.result_received_time is None:
                # result has not been received
                # should this client timeout?
                if task_result_timeout and time.time() - last_task.task_sent_time > task_result_timeout:
                    # timeout!
                    # we give up on this client and move to the next target
                    win_start_idx = last_send_idx + 1
                    win_start_time = last_task.task_sent_time + task_result_timeout
                    self.logger.debug(
                        "client task result timed out. win_start_idx={}, win_start_time={}".format(
                            win_start_idx, win_start_time
                        )
                    )
                else:
                    # continue to wait
                    self.logger.debug("keep waiting on task={}".format(task))
                    return -1, -1
            else:
                # result has been received!
                win_start_idx = last_send_idx + 1
                win_start_time = last_task.result_received_time
                self.logger.debug(
                    "result received. win_start_idx={}, win_start_time={}".format(win_start_idx, win_start_time)
                )
        else:
            # nothing has been sent
            win_start_idx = 0
            win_start_time = task.schedule_time
            self.logger.debug(
                "nothing has been sent. win_start_idx={}, win_start_time={}".format(win_start_idx, win_start_time)
            )

        num_targets = 0 if task.targets is None else len(task.targets)
        if num_targets and win_start_idx >= num_targets:
            # we reached the end of targets
            # so task should exit
            return -1, 0

        # The window grows by one target for every assignment timeout that has elapsed
        # since the window opened, so a target that never asks for the task is skipped.
        task_assignment_timeout = task.props[_KEY_TASK_ASSIGN_TIMEOUT]
        if task_assignment_timeout:
            win_size = int((time.time() - win_start_time) / task_assignment_timeout) + 1
        else:
            win_size = 1

        self.logger.debug("win_size={}".format(win_size))
        win_end_idx = win_start_idx + win_size

        # Should exit if win extends past the entire target list + 1
        if task_assignment_timeout and win_end_idx > num_targets + 1:
            return -1, 0

        if win_end_idx > num_targets:
            win_end_idx = num_targets

        self.logger.debug("win_end_idx={}".format(win_end_idx))
        return win_start_idx, win_end_idx

    def check_task_exit(self, task: Task) -> Tuple[bool, TaskCompletionStatus]:
        """Determine whether the task should exit.

        Args:
            task (Task): an instance of Task

        Returns:
            Tuple[bool, TaskCompletionStatus]:
                first entry in the tuple means whether to exit the task or not. If it's True, the task should exit.
                second entry in the tuple indicates the TaskCompletionStatus.
        """
        # are we waiting for any client?
        win_start_idx, win_end_idx = self._determine_window(task)
        self.logger.debug("check_task_exit: win_start_idx={}, win_end_idx={}".format(win_start_idx, win_end_idx))

        if win_start_idx < 0 and win_end_idx == 0:
            # The window is closed: the task is OK only if the last client it was
            # sent to actually returned a result; otherwise it counts as timed out.
            last_client_task = self._last_sent_client_task(task)
            if last_client_task is not None and last_client_task.result_received_time is not None:
                return True, TaskCompletionStatus.OK
            return True, TaskCompletionStatus.TIMEOUT

        return False, TaskCompletionStatus.IGNORED

    def check_task_result(self, result: Shareable, client_task: ClientTask, fl_ctx: FLContext):
        """Check the result received from the client.

        See whether the client_task is the last one in the task's list.
        If not, then it is a late response and ReservedHeaderKey.REPLY_IS_LATE is
        set to True in result's header.

        Args:
            result (Shareable): an instance of Shareable
            client_task (ClientTask): the task processing state of the client
            fl_ctx (FLContext): fl context that comes with the task request
        """
        # see whether the client_task is the last one in the task's list
        # If not, then it is a late response
        task = client_task.task
        if client_task != task.client_tasks[-1]:
            result.set_header(key=ReservedHeaderKey.REPLY_IS_LATE, value=True)