# Source code for nvflare.edge.widgets.etr

# Copyright (c) 2025, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from nvflare.apis.event_type import EventType
from nvflare.apis.fl_constant import FLContextKey
from nvflare.apis.fl_context import FLContext
from nvflare.edge.constants import EdgeContextKey, EdgeEventType, EdgeMsgTopic
from nvflare.fuel.f3.cellnet.cell import ReturnCode as CellReturnCode
from nvflare.fuel.f3.cellnet.defs import CellChannel
from nvflare.fuel.f3.cellnet.utils import make_reply
from nvflare.fuel.f3.message import Message as CellMessage
from nvflare.security.logging import secure_format_exception
from nvflare.widgets.widget import Widget


class EdgeTaskReceiver(Widget):
    """Edge Task Receiver (ETR) receives edge requests dispatched from the CP.

    ETR must be installed on the CJ as a component in config_fed_client.json.

    Note: ETR does not process edge requests directly. It is another
    component's responsibility (e.g. ETE) to process them. ETR interacts with
    the request-processing component (also installed on the CJ) indirectly,
    via Flare events. One event is fired per request topic received on
    ``CellChannel.EDGE_REQUEST``:

        - ``EdgeMsgTopic.TASK_REQUEST``      -> ``EdgeEventType.EDGE_TASK_REQUEST_RECEIVED``
        - ``EdgeMsgTopic.SELECTION_REQUEST`` -> ``EdgeEventType.EDGE_SELECTION_REQUEST_RECEIVED``
        - ``EdgeMsgTopic.RESULT_REPORT``     -> ``EdgeEventType.EDGE_RESULT_REPORT_RECEIVED``

    Before the event is fired, the request payload is stored in the FL context
    under ``EdgeContextKey.REQUEST_FROM_EDGE``. After the event returns, the
    reply is read from ``EdgeContextKey.REPLY_TO_EDGE``.
    """

    def __init__(self):
        super().__init__()
        # Set when the run starts; used to create contexts and fire events.
        self.engine = None
        self.register_event_handler(EventType.START_RUN, self._handle_start_run)

    def _handle_start_run(self, event_type: str, fl_ctx: FLContext):
        """Register the edge-request callbacks on the engine's cell.

        Args:
            event_type: the event being handled (START_RUN).
            fl_ctx: FL context of the event; the engine is taken from it.
        """
        self.engine = fl_ctx.get_engine()
        cell = self.engine.get_cell()

        # Every topic shares the same callback; only the fired event differs.
        topic_to_event = (
            (EdgeMsgTopic.TASK_REQUEST, EdgeEventType.EDGE_TASK_REQUEST_RECEIVED),
            (EdgeMsgTopic.SELECTION_REQUEST, EdgeEventType.EDGE_SELECTION_REQUEST_RECEIVED),
            (EdgeMsgTopic.RESULT_REPORT, EdgeEventType.EDGE_RESULT_REPORT_RECEIVED),
        )
        for topic, edge_event in topic_to_event:
            cell.register_request_cb(
                channel=CellChannel.EDGE_REQUEST,
                topic=topic,
                cb=self._receive_edge_request,
                event_type=edge_event,
            )

    def _receive_edge_request(self, request: CellMessage, event_type: str):
        """Hand an edge request to the event handlers and build the cell reply.

        Args:
            request: the incoming cell message; its payload is exposed to the
                event handlers through the FL context.
            event_type: the edge event to fire for this request (the value
                given as ``event_type`` when the callback was registered).

        Returns:
            A reply with ``CellReturnCode.OK`` and the handlers' reply as body,
            or a ``CellReturnCode.PROCESS_EXCEPTION`` reply when handling
            raised, handlers recorded exceptions in the context, or no
            (truthy) reply was produced.
        """
        with self.engine.new_context() as fl_ctx:
            assert isinstance(fl_ctx, FLContext)
            try:
                # Expose the request payload to whichever component handles the event.
                fl_ctx.set_prop(EdgeContextKey.REQUEST_FROM_EDGE, request.payload, private=True, sticky=False)
                self.engine.fire_event(event_type, fl_ctx)

                if fl_ctx.get_prop(FLContextKey.EXCEPTIONS):
                    # Log the failure so a PROCESS_EXCEPTION reply can be traced
                    # back to the event that caused it.
                    self.log_error(fl_ctx, f"exception(s) recorded while handling event {event_type}")
                    return make_reply(CellReturnCode.PROCESS_EXCEPTION)

                reply = fl_ctx.get_prop(EdgeContextKey.REPLY_TO_EDGE)
                if not reply:
                    # No handler produced a reply; treated as a processing failure.
                    self.logger.debug("no reply for edge request")
                    return make_reply(CellReturnCode.PROCESS_EXCEPTION)

                self.logger.debug("sending back edge result")
                return make_reply(CellReturnCode.OK, body=reply)
            except Exception as ex:
                self.log_error(fl_ctx, f"exception from receive_edge_request: {secure_format_exception(ex)}")
                return make_reply(CellReturnCode.PROCESS_EXCEPTION)