Source code for nvflare.app_common.logging.job_log_receiver

# Copyright (c) 2026, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import tempfile
import threading
from collections import OrderedDict

from nvflare.apis.event_type import EventType
from nvflare.apis.fl_constant import ReturnCode, SystemComponents, WorkspaceConstants
from nvflare.apis.fl_context import FLContext
from nvflare.apis.storage import DataTypes
from nvflare.apis.streaming import StreamContext
from nvflare.app_common.logging.constants import ALLOW_LOG_STREAMING_VAR, LIVE_LOG_TOPIC, Channels
from nvflare.app_common.streamers.log_streamer import LogStreamer
from nvflare.widgets.widget import Widget

# Keys for per-stream state stored in StreamContext:
#   _KEY_RECV_FILE -> the open file object the incoming chunks are written to
#   _KEY_RECV_PATH -> the filesystem path of that file
_KEY_RECV_FILE = "JobLogReceiver.recv_file"
_KEY_RECV_PATH = "JobLogReceiver.recv_path"

# Cap on the number of (client, job_id) pairs we remember for "log once"
# tracking. The receiver is a long-lived server-side widget; without a cap,
# the bookkeeping would grow without bound across job churn. When the cap is
# hit, the oldest entry is evicted (FIFO), at the cost of possibly re-logging
# a repeat violation from a long-quiescent job.
_UNAUTHORIZED_LOG_CAP = 4096


class JobLogReceiver(Widget):
    """Receives live log data streamed by :class:`JobLogStreamer`.

    ``JobLogReceiver`` accepts a live stream: each chunk is written directly to
    its final file as it arrives so that the log can be followed with
    ``tail -f`` on the server while the job runs. When the stream closes
    (normal EOF, job abort, or idle timeout) the file is handed to the job
    manager for storage.

    The destination file is written to
    ``{dest_dir}/{job_id}/{client_name}/{log_file_name}``, making it easy to
    locate and tail during a run.

    This widget can be placed in either of two locations:

    **Job-level configuration** (``config_fed_server.json``)
        Add it via ``job.to_server(JobLogReceiver())`` in the Job API, or
        declare it in the job's server config. In this mode the handler is
        registered on ``START_RUN`` (and on ``ABOUT_TO_START_RUN`` where
        available) so that the stream handler is wired up before any client
        chunk can arrive. The widget is only active for that specific job.

    **System-level resources** (``resources.json`` on the server)
        Declare it as a system component so it is instantiated when the server
        process starts. In the long-lived parent process the handler is
        registered on ``SYSTEM_START`` and remains active across jobs. In
        per-job server subprocesses (where ``SYSTEM_START`` does not fire) the
        same instance re-registers on every ``START_RUN`` against the new
        job-scoped ``ObjectStreamer``.

    The stream handler may be (re-)registered on every triggering event;
    ``registry.set`` is idempotent for the same channel/topic pair.

    Args:
        dest_dir: directory where incoming log files are written.
            Defaults to the system temporary directory.
        idle_timeout: seconds without any message (data or heartbeat) before
            the receiver declares the sender dead and closes the stream
            (default 30.0). Set to 0 to disable.
    """

    def __init__(self, dest_dir: str = None, idle_timeout: float = 30.0):
        super().__init__()
        self._dest_dir = dest_dir
        self._idle_timeout = idle_timeout

        # Tracks (client, job_id) pairs we've already logged a "site does not allow
        # streaming" error for, so the warning is emitted at most once per job.
        # Bounded with FIFO eviction (see _UNAUTHORIZED_LOG_CAP) and guarded by a
        # lock so the check-then-add is atomic across concurrent stream chunks.
        self._unauthorized_logged: OrderedDict = OrderedDict()
        self._unauthorized_lock = threading.Lock()

        # Trigger on every event that may bring up a fresh ObjectStreamer:
        #   - SYSTEM_START fires once in the long-lived server parent process.
        #   - ABOUT_TO_START_RUN fires only on the client side, but listing it
        #     keeps the receiver usable if it is ever placed there.
        #   - START_RUN is the only one that fires in the server-job subprocess
        #     (see ServerRunner.run) and is also the point at which an in-process
        #     deployment swaps engine.run_manager to the job's RunManager.
        # Re-registering on each trigger is safe: the underlying registry.set
        # is idempotent for the same channel/topic, and registering against a
        # new ObjectStreamer is exactly what we want when the run manager has
        # been replaced.
        self.register_event_handler(
            [EventType.SYSTEM_START, EventType.ABOUT_TO_START_RUN, EventType.START_RUN],
            self._register,
        )

    def _effective_dest_dir(self) -> str:
        """Return the configured destination directory, or the system temp dir if none was given."""
        return self._dest_dir or tempfile.gettempdir()

    @staticmethod
    def _sanitize_path_component(name: str) -> str:
        """Reduce ``name`` to a single path component that cannot climb out of its parent.

        Args:
            name: untrusted text intended to become one directory or file name.

        Returns:
            ``"unknown"`` when ``name`` is falsy or resolves to the current/parent
            directory entry (``"."`` / ``".."``); otherwise the base name of
            ``name``. The result may be an empty string when ``name`` ends with a
            path separator, so callers apply their own fallback for that case.
        """
        if not name:
            return "unknown"
        # Keep only the last component so embedded separators cannot redirect the path.
        base = os.path.basename(name)
        # basename() leaves "." and ".." untouched; joined into a path they would
        # resolve to the parent directory, so they must be rejected explicitly.
        if base in (os.curdir, os.pardir):
            return "unknown"
        return base

    @classmethod
    def _storage_data_type(cls, log_file_name: str) -> str:
        """Map a log file name to the data-type label used when storing it with the job manager.

        The error log file maps to ``DataTypes.ERRORLOG``; every other file maps to
        ``"<DataTypes.LOG>_<file name>"``.
        """
        log_file_name = cls._sanitize_path_component(log_file_name) or WorkspaceConstants.LOG_FILE_NAME
        if log_file_name == WorkspaceConstants.ERROR_LOG_FILE_NAME:
            return DataTypes.ERRORLOG.value
        return f"{DataTypes.LOG.value}_{log_file_name}"

    def _get_trusted_stream_identity(self, fl_ctx: FLContext):
        """Return ``(client_name, job_id)`` taken from the peer context, sanitized for use in paths.

        Both values fall back to ``"unknown"`` when the peer context is missing or
        when sanitizing leaves nothing usable.
        """
        peer_ctx = fl_ctx.get_peer_context()
        if peer_ctx is None:
            return "unknown", "unknown"
        # The trailing fallback covers names that sanitize to an empty string,
        # which would otherwise silently drop a level from the destination path.
        client = self._sanitize_path_component(peer_ctx.get_identity_name(default="unknown")) or "unknown"
        job_id = self._sanitize_path_component(peer_ctx.get_job_id(default="unknown")) or "unknown"
        return client, job_id

    def _is_site_allowed_to_stream(self, client_name: str, fl_ctx: FLContext) -> bool:
        """Look up the registered Client and check its site_config (forwarded
        from the client's resources.json during registration).

        Default-allow: only an explicit ``allow_log_streaming=False`` in the
        site_config disables streaming. Missing fields, unknown clients, and
        unavailable engine all resolve to allowed.
        """
        engine = fl_ctx.get_engine()
        if engine is None:
            return True
        getter = getattr(engine, "get_client_from_name", None)
        if getter is None:
            return True
        client = getter(client_name)
        if client is None:
            return True
        site_config = client.get_site_config() or {}
        return bool(site_config.get(ALLOW_LOG_STREAMING_VAR, True))

    def _record_unauthorized(self, key: tuple) -> bool:
        """Atomically record an unauthorized (client, job_id) pair.

        Returns True if this is the first time we've seen this key (caller
        should emit the once-per-job log line). Bounds the bookkeeping with
        FIFO eviction so the receiver can run for the lifetime of the server
        without unbounded memory growth.
        """
        with self._unauthorized_lock:
            if key in self._unauthorized_logged:
                return False
            if len(self._unauthorized_logged) >= _UNAUTHORIZED_LOG_CAP:
                # last=False pops the oldest insertion, i.e. FIFO eviction.
                self._unauthorized_logged.popitem(last=False)
            self._unauthorized_logged[key] = None
            return True

    def _on_chunk_received(self, data: bytes, stream_ctx: StreamContext, fl_ctx: FLContext):
        """Write one incoming chunk to the stream's destination file.

        On the first accepted chunk of a stream the destination file is created
        (truncating any existing file at that path) and remembered in
        ``stream_ctx``. Chunks from a site that has disabled log streaming are
        dropped.
        """
        f = stream_ctx.get(_KEY_RECV_FILE)
        if f is None:
            client, job_id = self._get_trusted_stream_identity(fl_ctx)
            if not self._is_site_allowed_to_stream(client, fl_ctx):
                # Drop the chunk: the site has explicitly disabled streaming.
                # Log once per (client, job_id) so the operator can see it
                # without flooding the server log.
                if self._record_unauthorized((client, job_id)):
                    self.log_error(
                        fl_ctx,
                        f"Dropping live log chunk from {client} for job {job_id}: site has "
                        f"'{ALLOW_LOG_STREAMING_VAR}' disabled in its resources.json",
                    )
                return

            raw_name = LogStreamer.get_file_name(stream_ctx) or "log.txt"
            # Fall back again after sanitizing: a name ending in a separator
            # sanitizes to "", which would make the path point at a directory.
            log_file_name = self._sanitize_path_component(raw_name) or "log.txt"
            path = os.path.join(self._effective_dest_dir(), job_id, client, log_file_name)
            os.makedirs(os.path.dirname(path), exist_ok=True)
            self.log_debug(fl_ctx, f"Opening log file for {client} job {job_id}: {path}")
            f = open(path, "wb")
            stream_ctx[_KEY_RECV_FILE] = f
            stream_ctx[_KEY_RECV_PATH] = path

        f.write(data)
        # Flush per chunk so the file can be followed while the job runs.
        f.flush()

    def _on_stream_done(self, stream_ctx: StreamContext, fl_ctx: FLContext):
        """Close the stream's file and hand the finished log over for storage.

        A stream that ended with a non-OK return code, or that never produced a
        file, is only reported. Otherwise the file is passed to the job manager,
        or retained locally when no job manager is available.
        """
        f = stream_ctx.get(_KEY_RECV_FILE)
        if f is not None:
            f.close()
            stream_ctx[_KEY_RECV_FILE] = None

        rc = LogStreamer.get_rc(stream_ctx)
        client, job_id = self._get_trusted_stream_identity(fl_ctx)
        file_path = stream_ctx.get(_KEY_RECV_PATH)

        if rc != ReturnCode.OK:
            if file_path:
                self.log_warning(
                    fl_ctx,
                    f"Live log stream from {client} job {job_id} ended with rc={rc}; "
                    f"partial log retained at {file_path}",
                )
            else:
                # Nothing was ever written, so there is no partial file to point at.
                self.log_warning(
                    fl_ctx,
                    f"Live log stream from {client} job {job_id} ended with rc={rc}; no log data was received",
                )
            return

        if not file_path:
            self.log_warning(fl_ctx, f"No log data received from {client} for job {job_id}")
            return

        # Use the name the file was actually written under. It was sanitized when
        # the file was opened, whereas the raw stream-supplied name is untrusted
        # and may be missing or contain path separators.
        log_type = os.path.basename(file_path)

        engine = fl_ctx.get_engine()
        job_manager = engine.get_component(SystemComponents.JOB_MANAGER) if engine is not None else None
        if job_manager is None:
            self._retain_without_job_manager(engine, file_path, log_type, client, job_id, fl_ctx)
            return

        data_type = self._storage_data_type(log_type)
        self.log_info(
            fl_ctx, f"Saving live log '{log_type}' as '{data_type}' from {client} for job {job_id}: {file_path}"
        )
        job_manager.set_client_data(job_id, file_path, client, data_type, fl_ctx)

    def _retain_without_job_manager(self, engine, file_path: str, log_type: str, client: str, job_id: str, fl_ctx):
        """Keep a finished log on local disk when there is no job manager to store it.

        When no ``dest_dir`` was configured and the engine exposes a workspace,
        the file is moved from the temp staging dir into the job's run directory
        so it lives alongside other job artifacts; otherwise it stays where it
        was written. Failures are logged, never raised.
        """
        workspace = None
        if self._dest_dir is None and engine is not None:
            workspace = getattr(engine, "get_workspace", lambda: None)()

        if workspace is None:
            self.log_info(fl_ctx, f"Live log '{log_type}' from {client} retained at {file_path}")
            return

        try:
            dest_path = os.path.join(workspace.get_run_dir(job_id), client, log_type)
            os.makedirs(os.path.dirname(dest_path), exist_ok=True)
            shutil.move(file_path, dest_path)
        except Exception:
            # Best effort: the log is still available at its original location.
            self.log_exception(fl_ctx, f"Failed to move live log to workspace; retained at {file_path}")
            return
        self.log_info(fl_ctx, f"Saved live log '{log_type}' from {client} to {dest_path}")

    def _register(self, event_type: str, fl_ctx: FLContext):
        """Register the chunk and stream-done callbacks for the live-log channel/topic."""
        # Re-register on every triggering event. The active ObjectStreamer is
        # owned by the current run_manager and is replaced whenever the engine
        # transitions to a new run, so caching a "registered once" flag would
        # leave the new run_manager's registry empty and produce
        # "no stream processing info registered for log_streaming:live_log"
        # on the first incoming chunk.
        LogStreamer.register_stream_processing(
            fl_ctx,
            channel=Channels.LOG_STREAMING_CHANNEL,
            topic=LIVE_LOG_TOPIC,
            chunk_received_cb=self._on_chunk_received,
            stream_done_cb=self._on_stream_done,
            idle_timeout=self._idle_timeout,
        )