Source code for nvflare.app_common.streamers.log_streamer

# Copyright (c) 2026, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
LogStreamer — live-tailing streamer for growing log files
=========================================================

LogStreamer differs from FileStreamer in three fundamental ways:

1. **Growing file (live tail)**
   FileStreamer treats a file as a static snapshot: it opens the file, reads
   it to the end, and sends EOF.  LogStreamer instead *tails* the file — it
   keeps reading as new bytes are appended, blocking between polls until the
   caller sets ``stop_event`` and all buffered bytes have been flushed.  It
   also survives log rotation: when the inode changes (or the file shrinks
   below the current read position) it closes the stale handle and reopens
   the file at the new path from the beginning.

2. **Dead-sender detection via liveness heartbeats**
   Because the stream may be idle for extended periods (no new log lines),
   the receiver cannot distinguish "nothing to send yet" from "sender has
   crashed".  LogStreamer solves this with periodic *heartbeat* messages: if
   no data chunk has been sent for ``liveness_interval`` seconds the producer
   emits a zero-payload heartbeat.  The consumer resets its idle clock on
   every message — data or heartbeat — so the idle timer only counts genuine
   silence on the network.

3. **Automatic stream closure on idle timeout**
   The receiver runs a background watchdog thread.  If no message of any
   kind (data or heartbeat) arrives for ``idle_timeout`` seconds the watchdog
   concludes the sender is unreachable, closes the stream via the engine's
   ``END_STREAM`` hook, and fires ``stream_done_cb`` with
   ``StreamContextKey.RC = ReturnCode.TIMEOUT``.  To avoid spurious timeouts,
   ``liveness_interval`` must be strictly less than ``idle_timeout``.  This
   can be validated only when the caller knows both values; otherwise ensure
   they are consistent at deployment.

Relationship between the two parameters
----------------------------------------
- ``liveness_interval``: how often the *sender* emits a heartbeat when idle
- ``idle_timeout``: how long the *receiver* waits before declaring the sender dead

Rule: ``liveness_interval < idle_timeout``.  With the defaults (10 s and
30 s) a healthy sender heartbeats every 10 s, so the receiver's 30 s timer
is always reset well before it fires.
"""
import os
import threading
import time
from typing import List, Tuple

from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import ReturnCode, Shareable, make_reply
from nvflare.apis.streaming import ConsumerFactory, ObjectConsumer, StreamableEngine, StreamContext, StreamContextKey
from nvflare.fuel.utils.validation_utils import check_non_negative_number, check_positive_int, check_positive_number

from .streamer_base import (  # noqa: F401
    KEY_DATA,
    KEY_DATA_SIZE,
    KEY_EOF,
    KEY_FILE_NAME,
    KEY_HEARTBEAT,
    KEY_STREAM_DONE_CB,
    BaseChunkConsumer,
    BaseChunkProducer,
    StreamerBase,
)


def _make_once(fn):
    """Return a thread-safe wrapper that calls *fn* at most once across all callers.

    Used so that ``stream_done_cb`` is executed exactly once regardless of whether
    the idle-timeout watchdog or the normal engine completion path fires first.
    """
    lock = threading.Lock()
    called = [False]

    def wrapper(*args, **kwargs):
        with lock:
            if called[0]:
                return
            called[0] = True
        return fn(*args, **kwargs)

    return wrapper


class _LogChunkConsumer(BaseChunkConsumer):
    def __init__(
        self,
        stream_ctx: StreamContext,
        chunk_received_cb,
        idle_timeout: float,
        cb_kwargs: dict,
        fl_ctx: FLContext = None,
    ):
        super().__init__()
        self._chunk_received_cb = chunk_received_cb
        self._idle_timeout = idle_timeout
        self._cb_kwargs = cb_kwargs
        self._stream_ctx = stream_ctx
        self._fl_ctx = fl_ctx  # updated on each consume(); seeded from get_consumer()
        self._last_received_time = time.time()
        self._done = threading.Event()

        if idle_timeout > 0:
            t = threading.Thread(target=self._watchdog, daemon=True)
            t.start()

    def _watchdog(self):
        """Background thread: end this stream when it goes idle."""
        poll = min(1.0, self._idle_timeout / 3)
        while not self._done.wait(timeout=poll):
            elapsed = time.time() - self._last_received_time
            if elapsed >= self._idle_timeout:
                self.logger.warning(f"log stream idle for {elapsed:.1f}s (threshold {self._idle_timeout}s) — closing")
                self._done.set()
                end_stream = self._stream_ctx.get(StreamContextKey.END_STREAM)
                if callable(end_stream):
                    # Use the latest FLContext we have seen. It may still be None if
                    # the sender dies immediately after opening the stream, but ending
                    # the transport still lets stream_runner clean up its tx_table.
                    end_stream(ReturnCode.TIMEOUT, self._fl_ctx)
                else:
                    self.logger.error("missing end_stream hook in stream context")
                return

    def consume(
        self,
        shareable: Shareable,
        stream_ctx: StreamContext,
        fl_ctx: FLContext,
    ) -> Tuple[bool, Shareable]:
        self._last_received_time = time.time()
        self._fl_ctx = fl_ctx

        # If the watchdog already fired, reject any late-arriving chunks from the
        # sender after this stream has already been closed locally.
        if self._done.is_set():
            return False, make_reply(ReturnCode.TASK_ABORTED)

        # Heartbeat: sender is alive but idle — update the liveness timestamp and continue.
        if shareable.get(KEY_HEARTBEAT):
            return True, make_reply(ReturnCode.OK)

        data = shareable.get(KEY_DATA)
        data_size = shareable.get(KEY_DATA_SIZE)
        self._validate_chunk(data, data_size)

        if data and self._chunk_received_cb:
            self._chunk_received_cb(data, stream_ctx, fl_ctx, **self._cb_kwargs)

        eof = shareable.get(KEY_EOF)
        if eof:
            return False, make_reply(ReturnCode.OK)
        return True, make_reply(ReturnCode.OK)

    def finalize(self, stream_ctx: StreamContext, fl_ctx: FLContext):
        # Stop the watchdog — the stream ended normally via EOF or engine abort.
        self._done.set()


[docs] class LogChunkConsumerFactory(ConsumerFactory): def __init__(self, chunk_received_cb, idle_timeout: float, stream_done_cb, cb_kwargs: dict): self._chunk_received_cb = chunk_received_cb self._idle_timeout = idle_timeout self._stream_done_cb = stream_done_cb self._cb_kwargs = cb_kwargs
[docs] def get_consumer(self, stream_ctx: StreamContext, fl_ctx: FLContext) -> ObjectConsumer: if self._stream_done_cb: stream_ctx[KEY_STREAM_DONE_CB] = _make_once(self._stream_done_cb) return _LogChunkConsumer( stream_ctx=stream_ctx, chunk_received_cb=self._chunk_received_cb, idle_timeout=self._idle_timeout, cb_kwargs=self._cb_kwargs, fl_ctx=fl_ctx, )
[docs] def dispatch_stream_done(stream_ctx: StreamContext, fl_ctx: FLContext, **kwargs): stream_done_cb = stream_ctx.get(KEY_STREAM_DONE_CB) if stream_done_cb: return stream_done_cb(stream_ctx, fl_ctx, **kwargs) return None
class _LogTailProducer(BaseChunkProducer): _MAX_BOOTSTRAP_MISSES = 10 def __init__( self, file_name: str, chunk_size: int, chunk_timeout: float, poll_interval: float, stop_event: threading.Event, liveness_interval: float, ): super().__init__() self.file_name = file_name self.chunk_size = chunk_size self.chunk_timeout = chunk_timeout self.poll_interval = poll_interval self.stop_event = stop_event self._liveness_interval = liveness_interval self._last_send_time = time.time() self.file = None self.inode = None self._draining = False # True while doing the post-stop drain retry # Bootstrap-tolerance bookkeeping: at job startup the receiver may not # have registered its handler yet when the first chunk arrives, in # which case the server replies EXECUTION_EXCEPTION and (without this # tolerance) the stream tears down with a misleading ERROR log line # and no further bytes are streamed. Until we see the first OK reply # we treat an EXECUTION_EXCEPTION as a transient miss, roll the file # offset back, and let produce() re-emit the same bytes. self._first_ok_received = False self._bootstrap_miss_count = 0 self._last_data_offset = None self._last_was_data = False self._open_file() def _open_file(self): try: self.file = open(self.file_name, "rb") self.inode = os.stat(self.file_name).st_ino self.logger.debug(f"opened log file {self.file_name} inode={self.inode}") except OSError as e: self.logger.warning(f"cannot open log file {self.file_name}: {e}") self.file = None self.inode = None def _check_rotation(self) -> bool: """Return True if the log file has been rotated (inode change or truncation).""" try: stat = os.stat(self.file_name) except OSError: # File temporarily absent during a rotation in progress — don't switch yet. return False if stat.st_ino != self.inode: self.logger.info(f"log rotation detected (inode change): {self.file_name}") return True if stat.st_size < self.file.tell(): self.logger.info(f"log rotation detected (truncation): {self.file_name}") return True return False def produce( self, stream_ctx: StreamContext, fl_ctx: FLContext, ) -> Tuple[Shareable, float]: abort_signal = fl_ctx.get_run_abort_signal() while True: # Detect and handle log rotation: close stale handle, reopen at new path. if self.file and self._check_rotation(): self.file.close() self.file = None self._open_file() elif not self.file and os.path.exists(self.file_name): # File appeared (or reappeared after rotation); open it now. self._open_file() # Read the next chunk from the current position. if self.file: # Record the pre-read offset so process_replies() can roll back # if the first send hits a bootstrap miss on the receiver. if not self._first_ok_received: try: self._last_data_offset = self.file.tell() except OSError: self._last_data_offset = None chunk = self.file.read(self.chunk_size) if chunk: self._draining = False # new data arrived — reset drain state self._last_send_time = time.time() self._last_was_data = True result = Shareable() result[KEY_DATA] = chunk result[KEY_DATA_SIZE] = len(chunk) result[KEY_EOF] = False return result, self.chunk_timeout # No new data — drain completely before honouring stop/abort so we # never drop bytes written just before the signal fires. # NOTE: abort_signal is intentionally checked here (after the read) # rather than at the top of the loop because in the simulator the # abort signal is triggered *before* ABOUT_TO_END_RUN fires, so # checking it first causes the producer to exit immediately and # lose any log bytes still buffered in the file. if self.stop_event.is_set() or (abort_signal and abort_signal.triggered): if not self._draining: # First empty read after stop — sleep one more interval and retry # to capture log bytes written by cleanup / END_RUN handlers that # run just after the stop signal fires. self._draining = True time.sleep(self.poll_interval) continue # Second consecutive empty read after stop — nothing more to drain. self.eof = True break # Send a heartbeat if we have been idle longer than liveness_interval. # This lets the receiver distinguish "no new logs yet" from "sender died". if time.time() - self._last_send_time >= self._liveness_interval: self._last_send_time = time.time() self._last_was_data = False hb = Shareable() hb[KEY_HEARTBEAT] = True hb[KEY_DATA] = None hb[KEY_DATA_SIZE] = 0 hb[KEY_EOF] = False return hb, self.chunk_timeout time.sleep(self.poll_interval) # Signal end-of-stream to receivers. result = Shareable() result[KEY_DATA] = None result[KEY_DATA_SIZE] = 0 result[KEY_EOF] = True return result, self.chunk_timeout def close(self): if self.file: self.file.close() self.file = None def process_replies(self, replies, stream_ctx, fl_ctx): # Tolerate a bounded number of bootstrap misses at job startup. The # receiver registers its handler on START_RUN in the server's job # subprocess; if the client's first chunk arrives before that handler is # wired up, the server replies EXECUTION_EXCEPTION. Without this guard # the base class logs an ERROR and tears the stream down for the rest of # the job. Instead, until we see the first OK reply, treat a small number # of uniform EXECUTION_EXCEPTION replies as transient: roll the file # offset back so the bytes are re-emitted on the next produce(), log at # debug, and keep the stream alive. After the retry budget is exhausted, # or after the first OK reply, fall through to the base behavior so # genuine receiver failures are still surfaced. if not self._first_ok_received and replies: transient = all( reply.get_return_code(ReturnCode.OK) == ReturnCode.EXECUTION_EXCEPTION for reply in replies.values() ) if transient and self._bootstrap_miss_count < self._MAX_BOOTSTRAP_MISSES: self._bootstrap_miss_count += 1 if self._last_was_data and self.file is not None and self._last_data_offset is not None: try: self.file.seek(self._last_data_offset) except OSError as e: self.logger.warning(f"could not seek back after bootstrap miss: {e}") self.logger.debug( f"transient bootstrap miss {self._bootstrap_miss_count}/{self._MAX_BOOTSTRAP_MISSES} " f"on first chunk to {list(replies)}; rolling back and retrying" ) return None # keep producing — receiver should be ready by next send if any(reply.get_return_code(ReturnCode.OK) == ReturnCode.OK for reply in replies.values()): self._first_ok_received = True self._bootstrap_miss_count = 0 return super().process_replies(replies, stream_ctx, fl_ctx)
[docs] class LogStreamer(StreamerBase):
[docs] @staticmethod def register_stream_processing( fl_ctx: FLContext, channel: str, topic: str, chunk_received_cb=None, stream_done_cb=None, idle_timeout: float = 30.0, **cb_kwargs, ): """Register for live log stream processing on the receiving side. Args: fl_ctx: the FLContext object channel: the app channel topic: the app topic chunk_received_cb: called for each received data chunk (heartbeats are silently absorbed and never forwarded): ``chunk_received_cb(data: bytes, stream_ctx: StreamContext, fl_ctx: FLContext, **cb_kwargs)`` stream_done_cb: called when the stream ends (normal EOF, engine abort, or idle timeout); follows ``stream_done_cb_signature`` in ``nvflare.apis.streaming``. The ``stream_ctx`` passed to this callback will contain ``StreamContextKey.RC = ReturnCode.TIMEOUT`` when the call is triggered by the idle-timeout watchdog. idle_timeout: seconds without any message (data or heartbeat) before the receiver declares the sender dead and closes the stream (default 30.0). Set to 0 to disable. **cb_kwargs: kwargs forwarded to both callbacks Returns: None Notes: ``stream_done_cb`` is guaranteed to be called at most once per stream even when both the idle-timeout path and the normal engine completion path race. """ engine = fl_ctx.get_engine() if not isinstance(engine, StreamableEngine): raise RuntimeError(f"engine must be StreamableEngine but got {type(engine)}") engine.register_stream_processing( channel=channel, topic=topic, factory=LogChunkConsumerFactory(chunk_received_cb, idle_timeout, stream_done_cb, cb_kwargs), stream_done_cb=dispatch_stream_done if stream_done_cb else None, **cb_kwargs, )
[docs] @staticmethod def stream_log( channel: str, topic: str, stream_ctx: StreamContext, targets: List[str], file_name: str, fl_ctx: FLContext, stop_event: threading.Event = None, poll_interval: float = 0.5, liveness_interval: float = 10.0, idle_timeout: float = None, chunk_size: int = None, chunk_timeout: float = None, optional: bool = False, secure: bool = False, ): """Tail and stream a live log file to one or more targets. Continuously reads new data appended to *file_name* (including across log rotations) and streams it to *targets*. Blocks until *stop_event* is set **and** all buffered data has been flushed, or until the run is aborted. **Log rotation** is detected by comparing the file's inode after each poll. When a rotation is found the old file handle is closed and the new file is opened from the beginning. Truncation (size decreased below the current read position) is treated the same way. **Liveness heartbeats** — when no new data has been sent for *liveness_interval* seconds a lightweight heartbeat message (no payload) is sent to each target. This lets the receiver distinguish "log is quiet" from "sender process died". The receiver's idle-timeout clock is reset on every message, including heartbeats, so the timeout only fires when the sender is genuinely unreachable. If the file does not exist when ``stream_log`` is called the producer waits, polling every *poll_interval* seconds until it appears. Args: channel: the app channel topic: the app topic stream_ctx: context data for this stream targets: receiving site names file_name: full path of the log file to tail fl_ctx: a FLContext object stop_event: a :class:`threading.Event` used to signal the streamer to finish. When set, the streamer drains any remaining unread bytes and then sends EOF. If *None* a new Event is created; the only way to stop in that case is via the run abort signal. poll_interval: seconds to wait between polls when no new data is available (default 0.5) liveness_interval: seconds of log silence before sending a heartbeat to receivers (default 10.0) idle_timeout: optional receiver idle-timeout value used only for local validation. When provided and greater than zero, ``liveness_interval`` must be strictly less than ``idle_timeout``. chunk_size: bytes per chunk; defaults to 64 KB chunk_timeout: per-chunk send timeout in seconds; defaults to 5.0 optional: whether the stream is optional secure: whether P2P security is required Returns: result from ``engine.stream_objects`` — same shape as :meth:`FileStreamer.stream_file` """ if not chunk_size: chunk_size = 64 * 1024 check_positive_int("chunk_size", chunk_size) if not chunk_timeout: chunk_timeout = 5.0 check_positive_number("chunk_timeout", chunk_timeout) check_positive_number("poll_interval", poll_interval) check_positive_number("liveness_interval", liveness_interval) if idle_timeout is not None: check_non_negative_number("idle_timeout", idle_timeout) if idle_timeout > 0 and liveness_interval >= idle_timeout: raise ValueError( f"liveness_interval ({liveness_interval}s) must be less than idle_timeout ({idle_timeout}s)" ) if stop_event is None: stop_event = threading.Event() engine = fl_ctx.get_engine() if not isinstance(engine, StreamableEngine): raise RuntimeError(f"engine must be StreamableEngine but got {type(engine)}") if not stream_ctx: stream_ctx = {} stream_ctx[KEY_FILE_NAME] = os.path.basename(file_name) producer = _LogTailProducer(file_name, chunk_size, chunk_timeout, poll_interval, stop_event, liveness_interval) try: return engine.stream_objects( channel=channel, topic=topic, stream_ctx=stream_ctx, targets=targets, producer=producer, fl_ctx=fl_ctx, optional=optional, secure=secure, ) finally: producer.close()
[docs] @staticmethod def get_file_name(stream_ctx: StreamContext): """Get the source log file's base name from the stream context. Intended for use inside ``chunk_received_cb`` or ``stream_done_cb`` on the receiving side. Args: stream_ctx: the stream context Returns: file base name string, or None """ return stream_ctx.get(KEY_FILE_NAME)