nvflare.app_common.widgets.streaming module

class AnalyticsReceiver(events: List[str] | None = None)[source]

Bases: Widget, ABC

Receives analytic data.

Parameters:

events (optional, List[str]) – A list of event that this receiver will handle.

abstract finalize(fl_ctx: FLContext)[source]

Finalizes the receiver.

Called after EventType.END_RUN.

Parameters:

fl_ctx (FLContext) – fl context.

handle_event(event_type: str, fl_ctx: FLContext)[source]

Handles events.

Parameters:
  • event_type (str) – event type fired by workflow.

  • fl_ctx (FLContext) – FLContext information.

abstract initialize(fl_ctx: FLContext)[source]

Initializes the receiver.

Called after EventType.START_RUN.

Parameters:

fl_ctx (FLContext) – fl context.

abstract save(fl_ctx: FLContext, shareable: Shareable, record_origin: str)[source]

Saves the received data.

Specific implementations of AnalyticsReceiver will implement save in their own way.

Parameters:
  • fl_ctx (FLContext) – fl context.

  • shareable (Shareable) – the received message.

  • record_origin (str) – the sender of this message / record.

class AnalyticsSender(event_type='analytix_log_stats', writer_name=LogWriterName.TORCH_TB)[source]

Bases: Widget

Sender for analytics data.

This class has some legacy methods that implement some common methods following signatures from PyTorch SummaryWriter. New code should use TBWriter instead, which contains an AnalyticsSender.

Parameters:
  • event_type (str) – event type to fire (defaults to “analytix_log_stats”).

  • writer_name – the log writer for syntax information (defaults to LogWriterName.TORCH_TB)

add(tag: str, value, data_type: AnalyticsDataType, global_step: int | None = None, **kwargs)[source]

Create and send a DXO by firing an event.

Parameters:
  • tag (str) – Tag name

  • value (_type_) – Value to send

  • data_type (AnalyticsDataType) – Data type of the value being sent

  • global_step (optional, int) – Global step value.

Raises:

TypeError – global_step must be an int

add_scalar(tag: str, scalar: float, global_step: int | None = None, **kwargs)[source]

Legacy method to send a scalar.

This follows the signature from PyTorch SummaryWriter and is here in case it is used in previous code. If you are writing new code, use TBWriter instead.

Parameters:
  • tag (str) – Data identifier.

  • scalar (float) – Value to send.

  • global_step (optional, int) – Global step value.

  • **kwargs – Additional arguments to pass to the receiver side.

add_scalars(tag: str, scalars: dict, global_step: int | None = None, **kwargs)[source]

Legacy method to send scalars.

This follows the signature from PyTorch SummaryWriter and is here in case it is used in previous code. If you are writing new code, use TBWriter instead.

Parameters:
  • tag (str) – The parent name for the tags.

  • scalars (dict) – Key-value pair storing the tag and corresponding values.

  • global_step (optional, int) – Global step value.

  • **kwargs – Additional arguments to pass to the receiver side.

close()[source]

Close resources.

flush()[source]

Legacy method to flush out the message.

This follows the signature from PyTorch SummaryWriter and is here in case it is used in previous code. If you are writing new code, use TBWriter instead.

This does nothing, it is defined to mimic the PyTorch SummaryWriter.

get_writer_name() LogWriterName[source]
handle_event(event_type: str, fl_ctx: FLContext)[source]

Handles events.

Parameters:
  • event_type (str) – event type fired by workflow.

  • fl_ctx (FLContext) – FLContext information.