# Source code for nvflare.app_common.logging.site_log_streamer

# Copyright (c) 2026, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import threading

from nvflare.apis.event_type import EventType
from nvflare.apis.fl_constant import (
    FLContextKey,
    JobConstants,
    ProcessType,
    ReservedKey,
    StreamCtxKey,
    WorkspaceConstants,
)
from nvflare.apis.fl_context import FLContext
from nvflare.apis.job_def import JobMetaKey
from nvflare.apis.workspace import Workspace
from nvflare.app_common.logging.constants import LIVE_LOG_TOPIC, Channels, is_log_streaming_allowed
from nvflare.app_common.streamers.log_streamer import LogStreamer
from nvflare.security.logging import secure_format_exception
from nvflare.widgets.widget import Widget

# Fully qualified class path used as the "path" value of the component entry
# that SiteLogStreamer appends to a job's deployed client config.
_LOG_STREAMER_PATH = "nvflare.app_common.logging.job_log_streamer.JobLogStreamer"


class SiteLogStreamer(Widget):
    """Site-level widget that injects a :class:`JobLogStreamer` into every job
    that does not already declare one.

    Place this in the client's ``resources.json`` so that live log streaming is
    provided automatically for every job — without requiring each job to
    include a ``JobLogStreamer`` in its own configuration.

    On ``BEFORE_JOB_LAUNCH`` (after the job config is deployed to disk but
    before the job subprocess starts) ``SiteLogStreamer`` reads the deployed
    ``config_fed_client.json``. If no ``JobLogStreamer`` component is found, it
    appends one with the configured parameters and writes the file back. The
    job subprocess then picks up the modified config and ``JobLogStreamer``
    runs inside the job as if the user had declared it explicitly.

    When configured for ``error_log.txt``, ``SiteLogStreamer`` also uploads a
    post-run snapshot from ``CLIENT_PARENT`` on ``JOB_COMPLETED``. This
    preserves error-log delivery for launch/config/bootstrap failures where the
    job subprocess never reaches ``START_RUN`` and therefore never loads the
    injected ``JobLogStreamer``.

    The server side must have a
    :class:`~nvflare.app_common.logging.job_log_receiver.JobLogReceiver` in its
    ``resources.json`` (or job config) to receive and store the stream.

    Args:
        log_file_name: base name of the log file to stream. Defaults to
            ``WorkspaceConstants.LOG_FILE_NAME`` (``"log.txt"``).
        liveness_interval: seconds between heartbeat messages when no new log
            bytes have been written (default 10.0). Must be strictly less than
            the receiver's ``idle_timeout``.
        poll_interval: seconds between polls when no new data has been written
            to the log (default 0.5).
    """

    # Constructor defaults. A value equal to its default is omitted from the
    # injected component's args, so the comparison and the signature must agree.
    _DEFAULT_LIVENESS_INTERVAL = 10.0
    _DEFAULT_POLL_INTERVAL = 0.5

    def __init__(
        self,
        log_file_name: str = WorkspaceConstants.LOG_FILE_NAME,
        liveness_interval: float = _DEFAULT_LIVENESS_INTERVAL,
        poll_interval: float = _DEFAULT_POLL_INTERVAL,
    ):
        super().__init__()
        self._log_file_name = log_file_name
        self._liveness_interval = liveness_interval
        self._poll_interval = poll_interval
        self.register_event_handler(EventType.BEFORE_JOB_LAUNCH, self._on_before_job_launch)
        self.register_event_handler(EventType.JOB_COMPLETED, self._on_job_completed)

    @staticmethod
    def _is_job_log_streamer(component) -> bool:
        """Return True when ``component`` is a config entry whose "path" names a JobLogStreamer.

        Entries that are not dicts, or whose "path" is missing or not a string,
        are reported as non-matching instead of raising.
        """
        # NOTE(review): only the "path" key is inspected. If a component can be
        # declared through another key, confirm it cannot bypass this check.
        if not isinstance(component, dict):
            return False
        path = component.get("path")
        return isinstance(path, str) and "JobLogStreamer" in path

    def _on_before_job_launch(self, event_type: str, fl_ctx: FLContext):
        """Inject (or, when streaming is not allowed, strip) JobLogStreamer in the deployed job config.

        Args:
            event_type: the fired event type (unused).
            fl_ctx: FL context carrying the job meta, workspace root and identity.

        Raises:
            Exception: propagated from :meth:`_atomic_write_json` when stripping a
                pre-declared JobLogStreamer fails (see the comment at the call site).
        """
        job_meta = fl_ctx.get_prop(FLContextKey.JOB_META)
        if not job_meta:
            return

        job_id = job_meta.get(JobMetaKey.JOB_ID)
        if not job_id:
            return

        workspace_root = fl_ctx.get_prop(FLContextKey.WORKSPACE_ROOT)
        client_name = fl_ctx.get_identity_name()
        if not workspace_root or not client_name:
            return

        workspace = Workspace(root_dir=workspace_root, site_name=client_name)
        config_path = os.path.join(workspace.get_app_config_dir(job_id), JobConstants.CLIENT_JOB_CONFIG)
        if not os.path.exists(config_path):
            return

        try:
            # JSON text is UTF-8; do not depend on the platform's locale encoding.
            with open(config_path, encoding="utf-8") as f:
                cfg = json.load(f)
        except Exception as ex:
            self.log_exception(
                fl_ctx,
                f"Failed to read {config_path}; skipping log streamer injection: {secure_format_exception(ex)}",
            )
            return

        # Guard the shape before touching it so a malformed config yields a
        # clear log message rather than an AttributeError further down.
        if not isinstance(cfg, dict):
            self.log_warning(
                fl_ctx,
                f"Unexpected content in {config_path} (top level is not an object); "
                "skipping log streamer injection",
            )
            return

        components = cfg.get("components")
        if components is None:
            components = []
            cfg["components"] = components
        elif not isinstance(components, list):
            self.log_warning(
                fl_ctx,
                f"Unexpected content in {config_path} ('components' is not a list); "
                "skipping log streamer injection",
            )
            return

        # Site-level kill switch. When the site's resources.json doesn't enable
        # log streaming, strip any pre-declared JobLogStreamer from the deployed
        # job config and skip injection.
        if not is_log_streaming_allowed(fl_ctx):
            filtered = [c for c in components if not self._is_job_log_streamer(c)]
            if len(filtered) != len(components):
                cfg["components"] = filtered
                # Persist atomically so the on-disk config is never partially
                # written. If the write fails, raise so the failure is captured
                # in FLContextKey.EXCEPTIONS rather than silently leaving the
                # original (un-stripped) JobLogStreamer config on disk.
                self._atomic_write_json(config_path, cfg)
                self.log_warning(
                    fl_ctx,
                    f"Removed JobLogStreamer from job {job_id}: site does not allow log streaming",
                )
            else:
                self.log_debug(
                    fl_ctx,
                    f"Job {job_id}: site does not allow log streaming; not injecting JobLogStreamer",
                )
            return

        if any(self._is_job_log_streamer(c) for c in components):
            self.log_debug(fl_ctx, f"Job {job_id} already has JobLogStreamer; skipping injection")
            return

        # Build the component entry.
        # This streamer is injected by a site-level component, and its receiver
        # lives in the server parent resources. Keep explicit job-level
        # JobLogStreamer instances scoped to server.<job_id>, but send this
        # injected stream to the parent JobLogReceiver.
        args = {"target_parent_server": True}
        if self._log_file_name != WorkspaceConstants.LOG_FILE_NAME:
            args["log_file_name"] = self._log_file_name
        if self._liveness_interval != self._DEFAULT_LIVENESS_INTERVAL:
            args["liveness_interval"] = self._liveness_interval
        if self._poll_interval != self._DEFAULT_POLL_INTERVAL:
            args["poll_interval"] = self._poll_interval

        components.append(
            {
                "id": "auto_log_streamer",
                "path": _LOG_STREAMER_PATH,
                "args": args,
            }
        )

        try:
            self._atomic_write_json(config_path, cfg)
        except Exception as ex:
            self.log_exception(
                fl_ctx,
                f"Failed to write {config_path}; log streamer not injected: {secure_format_exception(ex)}",
            )
            return

        self.log_info(fl_ctx, f"Injected JobLogStreamer into job {job_id}")

    @staticmethod
    def _atomic_write_json(path: str, data: dict) -> None:
        """Write JSON to ``path`` atomically via a temporary file + ``os.replace``.

        The original file is left untouched until the rename succeeds, so a
        partial write cannot corrupt it.

        Args:
            path: destination file path.
            data: JSON-serializable object to write.

        Raises:
            Exception: any failure from writing or renaming; the temporary file
                is removed (best effort) before the exception is re-raised.
        """
        tmp_path = f"{path}.{os.getpid()}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_path, path)
        except Exception:
            # Best-effort cleanup; the original error is what the caller needs.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    def _stream_completed_log(self, fl_ctx: FLContext, log_path: str, client_name: str, job_id: str):
        """Upload ``log_path`` to the server through ``LogStreamer.stream_log``.

        Runs on a background thread started by :meth:`_on_job_completed`. Any
        exception is logged and swallowed.

        Args:
            fl_ctx: context used for the streaming call.
            log_path: path of the log file to upload.
            client_name: client name placed in the stream context.
            job_id: job id placed in the stream context.
        """
        # The stop event is already set when handed to the streamer.
        # NOTE(review): presumably this makes LogStreamer send the existing
        # file content once instead of tailing it — confirm against LogStreamer.
        stop_event = threading.Event()
        stop_event.set()
        try:
            LogStreamer.stream_log(
                channel=Channels.LOG_STREAMING_CHANNEL,
                topic=LIVE_LOG_TOPIC,
                stream_ctx={
                    StreamCtxKey.CLIENT_NAME: client_name,
                    StreamCtxKey.JOB_ID: job_id,
                },
                targets=["server"],
                file_name=log_path,
                fl_ctx=fl_ctx,
                stop_event=stop_event,
                poll_interval=self._poll_interval,
                liveness_interval=self._liveness_interval,
            )
        except Exception as ex:
            self.log_exception(
                fl_ctx,
                f"Failed to upload completed log '{self._log_file_name}' for job {job_id}: "
                f"{secure_format_exception(ex)}",
            )

    def _on_job_completed(self, event_type: str, fl_ctx: FLContext):
        """Start a background upload of the job's error log after the job completes.

        Does nothing unless this widget is configured for the error log file,
        the process type is ``CLIENT_PARENT``, log streaming is allowed, and the
        error log file exists.

        Args:
            event_type: the fired event type (unused).
            fl_ctx: FL context carrying workspace root, identity and job id.
        """
        if self._log_file_name != WorkspaceConstants.ERROR_LOG_FILE_NAME:
            return
        if fl_ctx.get_process_type() != ProcessType.CLIENT_PARENT:
            return
        if not is_log_streaming_allowed(fl_ctx):
            return

        workspace_root = fl_ctx.get_prop(FLContextKey.WORKSPACE_ROOT)
        client_name = fl_ctx.get_identity_name(default="") or fl_ctx.get_prop(FLContextKey.CLIENT_NAME)
        job_id = fl_ctx.get_prop(FLContextKey.CURRENT_JOB_ID) or fl_ctx.get_job_id()
        if not workspace_root or not client_name or not job_id:
            return

        workspace = Workspace(root_dir=workspace_root, site_name=client_name)
        log_path = workspace.get_app_error_log_file_path(job_id)
        if not os.path.exists(log_path):
            self.log_info(fl_ctx, f"No error log file found for {client_name} job {job_id}")
            return

        # The event FLContext can be reused after the handler returns, so hand
        # the background uploader its own fresh context.
        #
        # CLIENT_PARENT serves multiple jobs over its lifetime, so the parent
        # engine's ctx_manager does NOT carry per-job RUN_NUM / IDENTITY_NAME
        # as sticky props. Without explicitly seeding them here, the fresh
        # context produced by engine.new_context() arrives at the receiver
        # with empty peer_ctx.get_job_id(), and JobLogReceiver routes the
        # uploaded error log under the literal "unknown" job_id — which then
        # fails with `StorageException: path .../jobs-storage/unknown is not
        # a valid directory`. Seeding these reserved keys preserves the
        # receiver's existing peer-context-as-trust-source security model.
        engine = fl_ctx.get_engine()
        stream_fl_ctx = engine.new_context() if engine else fl_ctx
        if stream_fl_ctx is not fl_ctx:
            stream_fl_ctx.put(key=ReservedKey.RUN_NUM, value=job_id, private=False, sticky=True)
            stream_fl_ctx.put(key=ReservedKey.IDENTITY_NAME, value=client_name, private=False, sticky=False)

        threading.Thread(
            target=self._stream_completed_log,
            args=(stream_fl_ctx, log_path, client_name, job_id),
            daemon=True,
        ).start()
        self.log_info(fl_ctx, f"Started completed error log upload for {client_name} job {job_id}: {log_path}")