# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
import time
from typing import Dict, List, Optional, Union
from nvflare.apis.client import Client
from nvflare.apis.controller_spec import ClientTask, Task
from nvflare.apis.dxo import DXO, from_shareable
from nvflare.apis.fl_component import FLComponent
from nvflare.apis.fl_constant import ReturnCode
from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import Shareable
from nvflare.apis.signal import Signal
from nvflare.app_common.workflows.error_handling_controller import ErrorHandlingController
class BroadcastAndWait(FLComponent):
    """Send tasks through a controller and collect the per-client DXO results.

    Results are gathered by ``results_cb`` (registered as the
    ``result_received_cb`` of every task created here) and stored in
    ``self.results`` keyed by client name. ``self.results`` is created once in
    ``__init__`` and never cleared by this class, so entries accumulate across
    calls made on the same instance.
    """

    def __init__(self, fl_ctx: FLContext, controller: ErrorHandlingController):
        """Store the context and controller used by later calls.

        Args:
            fl_ctx: FL context used when logging from ``update_result``.
            controller: Controller used to send tasks and to handle client errors.
        """
        super().__init__()
        # Guards concurrent writes to self.results from result callbacks.
        self.lock = threading.Lock()
        self.fl_ctx = fl_ctx
        self.controller = controller
        # Never assigned elsewhere in this class; only interpolated into a log message.
        self.task = None
        # [target, DXO]
        self.results: Dict[str, DXO] = {}

    def broadcast_and_wait(
        self,
        task_name: str,
        task_input: Shareable,
        fl_ctx: FLContext,
        targets: Union[List[Client], List[str], None] = None,
        task_props: Optional[Dict] = None,
        min_responses: int = 1,
        abort_signal: Optional[Signal] = None,
    ) -> Dict[str, DXO]:
        """Broadcast a single task via the controller and return the collected results.

        Args:
            task_name: Name given to the created task.
            task_input: Shareable used as the task data.
            fl_ctx: FL context forwarded to the controller.
            targets: Targets forwarded to the controller.
            task_props: Properties attached to the created task.
            min_responses: Minimum number of responses, forwarded to the controller.
            abort_signal: Abort signal forwarded to the controller.

        Returns:
            The instance's result dictionary (the same object as ``self.results``,
            not a copy), mapping client name to DXO.
        """
        task = Task(name=task_name, data=task_input, result_received_cb=self.results_cb, props=task_props)
        # NOTE(review): the fifth positional argument (0) is presumably the controller's
        # wait-time-after-min-received setting -- confirm against the controller signature.
        self.controller.broadcast_and_wait(task, fl_ctx, targets, min_responses, 0, abort_signal)
        return self.results

    def multicasts_and_wait(
        self,
        task_name: str,
        task_inputs: Dict[str, Shareable],
        fl_ctx: FLContext,
        abort_signal: Optional[Signal] = None,
        task_check_period: float = 0.5,
    ) -> Optional[Dict[str, DXO]]:
        """Send one task per client and poll until the controller has no standing tasks.

        Args:
            task_name: Name given to every created task.
            task_inputs: Mapping of client name to the Shareable sent to that client.
            fl_ctx: FL context used for sending and logging.
            abort_signal: Optional abort signal; when it is triggered the wait ends early.
            task_check_period: Seconds to sleep between checks of the standing tasks.

        Returns:
            The instance's result dictionary (client name to DXO), or ``None`` when
            the abort signal was triggered while waiting.
        """
        tasks: Dict[str, Task] = self.get_tasks(task_name, task_inputs)
        for client_name, task in tasks.items():
            self.controller.send(task=task, fl_ctx=fl_ctx, targets=[client_name])

        while self.controller.get_num_standing_tasks():
            # abort_signal defaults to None, so guard before reading .triggered.
            if abort_signal is not None and abort_signal.triggered:
                self.log_info(fl_ctx, "Abort signal triggered. Finishing multicasts_and_wait.")
                return None
            self.log_debug(fl_ctx, "Checking standing tasks to see if multicasts_and_wait finished.")
            time.sleep(task_check_period)

        return self.results

    def get_tasks(self, task_name: str, task_inputs: Dict[str, Shareable]) -> Dict[str, Task]:
        """Build one task per client, each reporting back through ``results_cb``.

        Args:
            task_name: Name given to every created task.
            task_inputs: Mapping of client name to the Shareable used as that task's data.

        Returns:
            Mapping of client name to the task created for it.
        """
        return {
            client_name: Task(name=task_name, data=task_input, result_received_cb=self.results_cb)
            for client_name, task_input in task_inputs.items()
        }

    def update_result(self, client_name: str, dxo: DXO):
        """Store ``dxo`` as the result for ``client_name`` while holding the lock.

        Args:
            client_name: Key under which the result is stored.
            dxo: Result to store; replaces any existing entry for the same client.
        """
        # The context manager releases the lock even if the body raises, and
        # never releases a lock that was not acquired.
        with self.lock:
            self.log_debug(self.fl_ctx, "Acquired a lock")
            self.results[client_name] = dxo
            self.log_debug(self.fl_ctx, "Released a lock")

    def results_cb(self, client_task: ClientTask, fl_ctx: FLContext):
        """Process a client's task result; registered as the tasks' ``result_received_cb``.

        A result with return code OK is converted to a DXO and stored. A return
        code listed in the controller's ``abort_job_in_error`` is passed to the
        controller's ``handle_client_errors``. Any other return code is dropped
        without further action. In every case the result on ``client_task`` is
        cleared afterwards.

        Args:
            client_task: Client task carrying the client, the task and its result.
            fl_ctx: FL context used for logging and error handling.
        """
        client_name = client_task.client.name
        task_name = client_task.task.name
        self.log_info(fl_ctx, f"Processing {task_name}, {self.task} result from client {client_name}")

        result = client_task.result
        rc = result.get_return_code()

        if rc == ReturnCode.OK:
            self.log_info(fl_ctx, f"Received result from client:{client_name} for task {task_name} ")
            dxo = from_shareable(result)
            self.update_result(client_name, dxo)
        elif rc in self.controller.abort_job_in_error:
            self.controller.handle_client_errors(rc, client_task, fl_ctx)

        # Cleanup task result
        client_task.result = None