Source code for nvflare.app_common.tracking.log_writer

# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Optional

from nvflare.apis.analytix import ANALYTIC_EVENT_TYPE, AnalyticsDataType, LogWriterName
from nvflare.apis.event_type import EventType
from nvflare.apis.fl_component import FLComponent
from nvflare.apis.fl_context import FLContext
from nvflare.app_common.widgets.streaming import AnalyticsSender


class LogWriter(FLComponent, ABC):
    """Abstract base class for writers that forward analytics records to a sender.

    The sender is resolved when the ``ABOUT_TO_START_RUN`` event is handled:
    either a component looked up from the engine by ``metrics_sender_id`` or a
    newly created ``AnalyticsSender``. Subclasses must implement
    :meth:`get_writer_name`.
    """

    def __init__(self, event_type: str = ANALYTIC_EVENT_TYPE, metrics_sender_id: Optional[str] = None):
        """Stores the configuration; the sender itself is created lazily.

        Args:
            event_type (str): Event type passed to the ``AnalyticsSender`` that is
                created when no ``metrics_sender_id`` is configured.
            metrics_sender_id (optional, str): ID of a component to fetch from the
                engine and use as the sender. When falsy, an ``AnalyticsSender``
                is created instead.
        """
        super().__init__()
        self.event_type = event_type
        self.metrics_sender_id = metrics_sender_id
        # Resolved in handle_event() on ABOUT_TO_START_RUN; None until then.
        self.sender = None
        # NOTE(review): never assigned within this class; kept so existing
        # subclasses/callers that read the attribute keep working.
        self.engine = None

    def handle_event(self, event_type: str, fl_ctx: FLContext):
        """Resolves ``self.sender`` when the run is about to start.

        Args:
            event_type (str): Type of the fired event; only
                ``EventType.ABOUT_TO_START_RUN`` is acted upon.
            fl_ctx (FLContext): Context used to obtain the engine.
        """
        if event_type != EventType.ABOUT_TO_START_RUN:
            return

        engine = fl_ctx.get_engine()
        if self.metrics_sender_id:
            self.sender = engine.get_component(self.metrics_sender_id)
            if self.sender is None:
                self.system_panic("Cannot load MetricsSender!", fl_ctx=fl_ctx)
                # Do not rely on system_panic() to stop execution: without this
                # return the attribute assignments below would be attempted on
                # None and fail with an AttributeError.
                return
            self.sender.writer = self.get_writer_name()
        else:
            self.sender = AnalyticsSender(self.event_type, self.get_writer_name())
        self.sender.engine = engine

    def write(self, tag: str, value, data_type: AnalyticsDataType, global_step: Optional[int] = None, **kwargs):
        """Writes a record by delegating to the sender's ``add`` method.

        Args:
            tag (str): Tag name
            value: Value to send
            data_type (AnalyticsDataType): Data type of the value being sent
            global_step (optional, int): Global step value.
            **kwargs: Additional keyword arguments forwarded unchanged to the sender.

        Raises:
            TypeError: global_step must be an int (the check is not performed in
                this method; presumably it is enforced by the sender).
        """
        self.sender.add(tag=tag, value=value, data_type=data_type, global_step=global_step, **kwargs)

    @abstractmethod
    def get_writer_name(self) -> LogWriterName:
        """Returns the name identifying this writer; must be implemented by subclasses."""
        pass

    def get_default_metric_data_type(self) -> AnalyticsDataType:
        """Returns the default data type for metrics: ``AnalyticsDataType.METRICS``."""
        return AnalyticsDataType.METRICS