Source code for nvflare.app_common.widgets.streaming

# Copyright (c) 2021, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from threading import Lock
from typing import List, Optional

from nvflare.apis.analytix import ANALYTIC_EVENT_TYPE, AnalyticsDataType, LogWriterName, TrackConst
from nvflare.apis.event_type import EventType
from nvflare.apis.fl_constant import EventScope, FLContextKey, ReservedKey
from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import Shareable
from nvflare.apis.utils.analytix_utils import create_analytic_dxo, send_analytic_dxo
from nvflare.widgets.widget import Widget


[docs] class AnalyticsSender(Widget): def __init__(self, event_type=ANALYTIC_EVENT_TYPE, writer_name=LogWriterName.TORCH_TB): """Sender for analytics data. This class has some legacy methods that implement some common methods following signatures from PyTorch SummaryWriter. New code should use :py:class:`TBWriter <nvflare.app_opt.tracking.tb.tb_writer.TBWriter>` instead, which contains an AnalyticsSender. Args: event_type (str): event type to fire (defaults to "analytix_log_stats"). writer_name: the log writer for syntax information (defaults to LogWriterName.TORCH_TB) """ super().__init__() self.engine = None self.event_type = event_type self.writer = writer_name
[docs] def get_writer_name(self) -> LogWriterName: return self.writer
[docs] def handle_event(self, event_type: str, fl_ctx: FLContext): if event_type == EventType.ABOUT_TO_START_RUN: self.engine = fl_ctx.get_engine()
[docs] def add(self, tag: str, value, data_type: AnalyticsDataType, global_step: Optional[int] = None, **kwargs): """Create and send a DXO by firing an event. Args: tag (str): Tag name value (_type_): Value to send data_type (AnalyticsDataType): Data type of the value being sent global_step (optional, int): Global step value. Raises: TypeError: global_step must be an int """ kwargs = kwargs if kwargs else {} if global_step is not None: if not isinstance(global_step, int): raise TypeError(f"Expect global step to be an instance of int, but got {type(global_step)}") kwargs[TrackConst.GLOBAL_STEP_KEY] = global_step dxo = create_analytic_dxo(tag=tag, value=value, data_type=data_type, writer=self.get_writer_name(), **kwargs) with self.engine.new_context() as fl_ctx: send_analytic_dxo(self, dxo=dxo, fl_ctx=fl_ctx, event_type=self.event_type)
[docs] def close(self): """Close resources.""" if self.engine: self.engine = None
[docs] class AnalyticsReceiver(Widget, ABC): def __init__(self, events: Optional[List[str]] = None): """Receives analytic data. Args: events (optional, List[str]): A list of event that this receiver will handle. """ super().__init__() if events is None: events = [ANALYTIC_EVENT_TYPE, f"fed.{ANALYTIC_EVENT_TYPE}"] self.events = events self._initialized = False self._save_lock = Lock() self._end = False
[docs] @abstractmethod def initialize(self, fl_ctx: FLContext): """Initializes the receiver. Called after EventType.START_RUN. Args: fl_ctx (FLContext): fl context. """ pass
[docs] @abstractmethod def save(self, fl_ctx: FLContext, shareable: Shareable, record_origin: str): """Saves the received data. Specific implementations of AnalyticsReceiver will implement save in their own way. Args: fl_ctx (FLContext): fl context. shareable (Shareable): the received message. record_origin (str): the sender of this message / record. """ pass
[docs] @abstractmethod def finalize(self, fl_ctx: FLContext): """Finalizes the receiver. Called after EventType.END_RUN. Args: fl_ctx (FLContext): fl context. """ pass
[docs] def handle_event(self, event_type: str, fl_ctx: FLContext): if event_type == EventType.START_RUN: self._handle_start_run_event(fl_ctx) elif event_type in self.events: self._handle_data_event(event_type, fl_ctx) elif event_type == EventType.END_RUN: self._handle_end_run_event(fl_ctx)
def _handle_start_run_event(self, fl_ctx: FLContext): try: self.initialize(fl_ctx) except Exception as e: # catch the exception so the job can continue self.log_error(fl_ctx, f"Receiver initialize failed with {e}.", fire_event=False) return self._initialized = True def _handle_data_event(self, event_type: str, fl_ctx: FLContext): if self._initialized: if self._end: self.log_debug(fl_ctx, f"Already received end run event, drop event {event_type}.", fire_event=False) return data = fl_ctx.get_prop(FLContextKey.EVENT_DATA, None) if data is None: self.log_error(fl_ctx, "Missing event data.", fire_event=False) return if not isinstance(data, Shareable): self.log_error( fl_ctx, f"Expect data to be an instance of Shareable but got {type(data)}", fire_event=False ) return record_origin = self._get_record_origin(fl_ctx, data) if record_origin is None: self.log_error(fl_ctx, "record_origin can't be None.", fire_event=False) return try: with self._save_lock: self.save(shareable=data, fl_ctx=fl_ctx, record_origin=record_origin) except Exception as e: self.log_error(fl_ctx, f"Receiver save method failed with {e}.", fire_event=False) def _handle_end_run_event(self, fl_ctx: FLContext): if self._initialized: self._end = True try: with self._save_lock: self.finalize(fl_ctx) except Exception as e: # catch the exception so the job can continue self.log_error(fl_ctx, f"Receiver finalize failed with {e}.", fire_event=False) def _get_record_origin(self, fl_ctx: FLContext, data: Shareable) -> Optional[str]: if fl_ctx.get_prop(FLContextKey.EVENT_SCOPE) == EventScope.FEDERATION: return data.get_peer_prop(ReservedKey.IDENTITY_NAME, None) else: return fl_ctx.get_identity_name()