Source code for nvflare.private.fed.server.message_send

# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

from nvflare.fuel.f3.cellnet.core_cell import FQCN
from nvflare.fuel.f3.cellnet.core_cell import Message as CellMessage
from nvflare.fuel.f3.cellnet.core_cell import TargetMessage
from nvflare.private.admin_defs import Message
from nvflare.private.defs import CellChannel, new_cell_message

[docs]class ClientReply(object): def __init__(self, client_token: str, client_name: str, req: Message, reply: Message): """Client reply. Args: client_token (str): client token client_name (str): name of the client req (Message): request reply (Message): reply """ self.client_token = client_token self.client_name = client_name self.request = req self.reply = reply
[docs]def send_requests( cell, command: str, requests: dict, clients, job_id=None, timeout_secs=2.0, optional=False ) -> [ClientReply]: """Send requests to clients. NOTE:: This method is to be used by a Command Handler to send requests to Clients. Hence, it is run in the Command Handler's handling thread. This is a blocking call - returned only after all responses are received or timeout. Args: cell: the source cell command: the command to be sent clients: the clients the command will be sent to requests: A dict of requests: {client token: request or list of requests} job_id: id of the job that the command is applied to timeout_secs: how long to wait for reply before timeout optional: whether the message is optional Returns: A list of ClientReply """ if not isinstance(requests, dict): raise TypeError("requests must be a dict but got {}".format(type(requests))) if len(requests) == 0: return [] target_msgs = {} name_to_token = {} name_to_req = {} for token, req in requests.items(): client = clients.get(token) if not client: continue if job_id: fqcn = FQCN.join([, job_id]) channel = CellChannel.CLIENT_COMMAND optional = True else: fqcn = channel = CellChannel.CLIENT_MAIN target_msgs[] = TargetMessage( target=fqcn, channel=channel, topic=command, message=new_cell_message({}, req) ) name_to_token[] = token name_to_req[] = req if not target_msgs: return [] if timeout_secs <= 0.0: # this is fire-and-forget! cell.fire_multi_requests_and_forget(target_msgs, optional=optional) return [] else: result = [] replies = cell.broadcast_multi_requests(target_msgs, timeout_secs, optional=optional) for name, reply in replies.items(): assert isinstance(reply, CellMessage) result.append( ClientReply( client_token=name_to_token[name], client_name=name, req=name_to_req[name], reply=reply.payload ) ) return result