nvflare.app_common.streamers.log_streamer module

LogStreamer — live-tailing streamer for growing log files

LogStreamer differs from FileStreamer in three fundamental ways:

  1. Growing file (live tail). FileStreamer treats a file as a static snapshot: it opens the file, reads it to the end, and sends EOF. LogStreamer instead tails the file, reading new bytes as they are appended and blocking between polls until the caller sets stop_event and all buffered bytes have been flushed. It also survives log rotation: when the inode changes (or the file shrinks below the current read position), it closes the stale handle and reopens the new file at the same path from the beginning.

  2. Dead-sender detection via liveness heartbeats. Because the stream may be idle for extended periods (no new log lines), the receiver cannot otherwise distinguish “nothing to send yet” from “sender has crashed”. LogStreamer solves this with periodic heartbeat messages: if no data chunk has been sent for liveness_interval seconds, the producer emits a zero-payload heartbeat. The consumer resets its idle clock on every message (data or heartbeat), so the idle timer counts only genuine silence on the network.

  3. Automatic stream closure on idle timeout. The receiver runs a background watchdog thread. If no message of any kind (data or heartbeat) arrives for idle_timeout seconds, the watchdog concludes that the sender is unreachable, closes the stream via the engine’s END_STREAM hook, and fires stream_done_cb with StreamContextKey.RC = ReturnCode.TIMEOUT. To avoid spurious timeouts, liveness_interval must be strictly less than idle_timeout. The sender can check this only when it is given the receiver’s idle_timeout (the optional idle_timeout argument of stream_log); otherwise, keep the two values consistent in the deployment configuration.
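Points 2 and 3 combine into a simple receiving-side contract: every message resets a clock, and silence longer than idle_timeout ends the stream. The sketch below models only that watchdog logic. IdleWatchdog is a hypothetical class, and its on_timeout callback stands in for closing the stream and firing stream_done_cb with ReturnCode.TIMEOUT. It is not LogStreamer's internal code.

```python
import threading
import time
from typing import Callable


class IdleWatchdog:
    """Hypothetical receiver-side idle watchdog; not LogStreamer's implementation.

    touch() must be called for every incoming message, data or heartbeat.
    If no message arrives for idle_timeout seconds, on_timeout() is called
    once from the watchdog thread and the thread exits.
    """

    def __init__(self, idle_timeout: float, on_timeout: Callable[[], None], check_interval: float = 0.05):
        self.idle_timeout = idle_timeout
        self.on_timeout = on_timeout
        self.check_interval = check_interval
        self._last_seen = time.monotonic()
        self._lock = threading.Lock()
        self._stopped = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def touch(self) -> None:
        # Reset the idle clock; heartbeats count exactly like data chunks.
        with self._lock:
            self._last_seen = time.monotonic()

    def stop(self) -> None:
        # Normal completion: the loop exits at its next check.
        self._stopped.set()

    def _run(self) -> None:
        # Event.wait() returns True once stop() has been called.
        while not self._stopped.wait(self.check_interval):
            with self._lock:
                idle = time.monotonic() - self._last_seen
            if idle >= self.idle_timeout:
                self._stopped.set()
                self.on_timeout()  # e.g. close the stream and report TIMEOUT
                return
```

In this sketch, stop() and a firing timeout can still race. The at-most-once handling of stream_done_cb therefore belongs in a separate guard.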

Relationship between the two parameters

  • liveness_interval: how often the sender emits a heartbeat when idle

  • idle_timeout: how long the receiver waits before declaring the sender dead

Rule: liveness_interval < idle_timeout. With the defaults (10 s and 30 s), a healthy but idle sender heartbeats every 10 s, so the receiver’s 30 s timer is reset well before it can fire.
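The following minimal sketch enforces this rule using a hypothetical check_liveness_config helper. It treats an idle_timeout of None or 0 as "unknown or disabled", which matches how the receiver's idle_timeout = 0 setting and stream_log's optional idle_timeout argument are described later on this page.

```python
from typing import Optional


def check_liveness_config(liveness_interval: float, idle_timeout: Optional[float]) -> None:
    """Hypothetical helper: raise if heartbeats could not outpace the idle timer."""
    # None means the receiver's value is unknown; <= 0 means its watchdog is disabled.
    if idle_timeout is None or idle_timeout <= 0:
        return
    if liveness_interval >= idle_timeout:
        raise ValueError(
            f"liveness_interval ({liveness_interval}) must be strictly less than "
            f"idle_timeout ({idle_timeout})"
        )
```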

class LogChunkConsumerFactory(chunk_received_cb, idle_timeout: float, stream_done_cb, cb_kwargs: dict)

Bases: ConsumerFactory

get_consumer(stream_ctx: dict, fl_ctx: FLContext) → ObjectConsumer

Called to get an ObjectConsumer to process a new stream on the receiving side. This is called only when the first streaming object of each stream is received.

Parameters:
  • stream_ctx – the context of the stream

  • fl_ctx – FLContext object

Returns: an ObjectConsumer
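The toy dispatcher below illustrates the "first object only" rule. ToyStreamDispatcher and the consumer's handle() method are invented for illustration. They do not reflect NVFlare's engine or the real ObjectConsumer interface, only the documented contract that get_consumer runs once per stream.

```python
class ToyStreamDispatcher:
    """Hypothetical receiving-side model; not NVFlare's streaming engine.

    factory.get_consumer() runs only when the first object of a stream
    arrives; later objects of the same stream reuse that consumer.
    """

    def __init__(self, factory):
        self.factory = factory
        self._consumers = {}  # stream id -> consumer returned by the factory

    def on_object(self, stream_id, stream_ctx: dict, fl_ctx, obj) -> None:
        consumer = self._consumers.get(stream_id)
        if consumer is None:
            consumer = self.factory.get_consumer(stream_ctx, fl_ctx)
            self._consumers[stream_id] = consumer
        consumer.handle(obj)  # 'handle' is a made-up method for this toy
```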

class LogStreamer

Bases: StreamerBase

static get_file_name(stream_ctx: dict)

Get the source log file’s base name from the stream context.

Intended for use inside chunk_received_cb or stream_done_cb on the receiving side.

Parameters:
  • stream_ctx – the stream context

Returns: the file’s base name as a string, or None if the stream context does not carry one
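Below is a sketch of a receiving-side chunk_received_cb that uses the base name to keep one local copy per source log. make_chunk_writer, its out_dir argument, and the "unknown.log" fallback are assumptions. The name lookup is injected so that the example runs without NVFlare; in a real job you would pass LogStreamer.get_file_name.

```python
import os
from typing import Callable, Optional


def make_chunk_writer(out_dir: str, get_file_name: Callable[[dict], Optional[str]]):
    """Hypothetical factory for a chunk_received_cb that appends chunks to local files."""

    def chunk_received_cb(data: bytes, stream_ctx, fl_ctx, **cb_kwargs):
        # Fall back to a placeholder name when the context carries none.
        name = get_file_name(stream_ctx) or "unknown.log"
        # basename() is defensive: it keeps the output inside out_dir.
        path = os.path.join(out_dir, os.path.basename(name))
        with open(path, "ab") as f:
            f.write(data)

    return chunk_received_cb
```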

static register_stream_processing(fl_ctx: FLContext, channel: str, topic: str, chunk_received_cb=None, stream_done_cb=None, idle_timeout: float = 30.0, **cb_kwargs)

Register for live log stream processing on the receiving side.

Parameters:
  • fl_ctx – the FLContext object

  • channel – the app channel

  • topic – the app topic

  • chunk_received_cb – called for each received data chunk (heartbeats are silently absorbed and never forwarded): chunk_received_cb(data: bytes, stream_ctx: StreamContext, fl_ctx: FLContext, **cb_kwargs)

  • stream_done_cb – called when the stream ends (normal EOF, engine abort, or idle timeout); follows stream_done_cb_signature in nvflare.apis.streaming. The stream_ctx passed to this callback will contain StreamContextKey.RC = ReturnCode.TIMEOUT when the call is triggered by the idle-timeout watchdog.

  • idle_timeout – seconds without any message (data or heartbeat) before the receiver declares the sender dead and closes the stream (default 30.0). Set to 0 to disable.

  • **cb_kwargs – extra keyword arguments forwarded to both callbacks

Returns: None

Notes

stream_done_cb is guaranteed to be called at most once per stream even when both the idle-timeout path and the normal engine completion path race.
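One way to implement an at-most-once guarantee is a lock-protected flag shared by both completion paths. The wrapper below is a generic sketch of that idea, and call_at_most_once is a hypothetical name. LogStreamer already provides the guarantee itself, and its actual mechanism may differ.

```python
import threading


def call_at_most_once(cb):
    """Hypothetical wrapper: only the first call, from any thread, runs cb."""
    lock = threading.Lock()
    done = False

    def wrapper(*args, **kwargs):
        nonlocal done
        # Claim the single slot under the lock, then run cb outside it.
        with lock:
            if done:
                return False
            done = True
        cb(*args, **kwargs)
        return True

    return wrapper
```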

static stream_log(channel: str, topic: str, stream_ctx: dict, targets: List[str], file_name: str, fl_ctx: FLContext, stop_event: Event | None = None, poll_interval: float = 0.5, liveness_interval: float = 10.0, idle_timeout: float | None = None, chunk_size: int | None = None, chunk_timeout: float | None = None, optional: bool = False, secure: bool = False)

Tail and stream a live log file to one or more targets.

Continuously reads new data appended to file_name (including across log rotations) and streams it to targets. Blocks until stop_event is set and all buffered data has been flushed, or until the run is aborted.

After each poll, log rotation is detected by comparing the inode currently at file_name with the inode of the open handle. When a rotation is found, the old file handle is closed and the new file is opened from the beginning. Truncation (the size drops below the current read position) is handled the same way.
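The following is a standalone sketch of these rotation and truncation checks. RotatingTail is a hypothetical class, not LogStreamer's code. It assumes a POSIX-style filesystem where st_ino identifies a file and an open handle keeps a renamed file readable. The 64 KB read size mirrors the documented chunk_size default.

```python
import os


class RotatingTail:
    """Hypothetical rotation- and truncation-aware tailer.

    poll() returns the bytes appended since the previous call. If the inode
    at path differs from the open handle's, or the file shrank below the
    read position, the stale handle is closed and the file reread from 0.
    """

    def __init__(self, path: str):
        self.path = path
        self._f = None
        self._ino = None
        self._pos = 0

    def _open(self) -> None:
        self._f = open(self.path, "rb")
        self._ino = os.fstat(self._f.fileno()).st_ino
        self._pos = 0

    def poll(self, max_bytes: int = 64 * 1024) -> bytes:
        try:
            st = os.stat(self.path)
        except FileNotFoundError:
            return b""  # not created yet, or mid-rotation: retry next poll
        if self._f is None:
            self._open()
        elif st.st_ino != self._ino or st.st_size < self._pos:
            # Rotation (new inode) or truncation: drop the stale handle.
            self._f.close()
            self._open()
        self._f.seek(self._pos)
        data = self._f.read(max_bytes)
        self._pos += len(data)
        return data

    def close(self) -> None:
        if self._f is not None:
            self._f.close()
            self._f = None
```

This sketch has two limits. It misses a truncation if the file has already grown back past the old read position by the next poll, and it does not read bytes written to the old file between the last poll and the rotation.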

Liveness heartbeats: when no new data has been sent for liveness_interval seconds, a lightweight heartbeat message (no payload) is sent to each target. This lets the receiver distinguish “log is quiet” from “sender process died”. The receiver’s idle-timeout clock is reset on every message, including heartbeats, so the timeout fires only when the sender is genuinely unreachable.
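The heartbeat rule and the stop_event drain described earlier can be sketched together as a polling loop. run_producer and its read_new, send, and send_eof callables are hypothetical stand-ins for the file reader and the engine transport. An empty bytes object represents the zero-payload heartbeat.

```python
import threading
import time
from typing import Callable


def run_producer(
    read_new: Callable[[], bytes],
    send: Callable[[bytes], None],
    send_eof: Callable[[], None],
    stop_event: threading.Event,
    poll_interval: float = 0.5,
    liveness_interval: float = 10.0,
) -> None:
    """Hypothetical producer loop: send data when available, heartbeat when quiet."""
    last_sent = time.monotonic()
    while not stop_event.is_set():
        data = read_new()
        if data:
            send(data)
            last_sent = time.monotonic()
            continue  # more may be buffered: poll again without sleeping
        if time.monotonic() - last_sent >= liveness_interval:
            send(b"")  # zero-payload heartbeat
            last_sent = time.monotonic()
        stop_event.wait(poll_interval)  # interruptible sleep
    # Drain: flush whatever was appended before the stop signal, then EOF.
    while True:
        data = read_new()
        if not data:
            break
        send(data)
    send_eof()
```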

If the file does not exist when stream_log is called, the producer waits, polling every poll_interval seconds until the file appears.
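Waiting for a missing file reduces to a polling loop, sketched here with a hypothetical wait_for_file helper. Giving up when stop_event is set is an added assumption. The paragraph above says only that the producer keeps polling until the file appears.

```python
import os
import threading


def wait_for_file(path: str, poll_interval: float, stop_event: threading.Event) -> bool:
    """Hypothetical helper: poll until path exists; return False if stopped first."""
    while not os.path.exists(path):
        # Event.wait doubles as an interruptible sleep between polls.
        if stop_event.wait(poll_interval):
            return False
    return True
```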

Parameters:
  • channel – the app channel

  • topic – the app topic

  • stream_ctx – context data for this stream

  • targets – receiving site names

  • file_name – full path of the log file to tail

  • fl_ctx – an FLContext object

  • stop_event – a threading.Event used to signal the streamer to finish. When set, the streamer drains any remaining unread bytes and then sends EOF. If None, a new Event is created, and the only way to stop the streamer is the run abort signal.

  • poll_interval – seconds to wait between polls when no new data is available (default 0.5)

  • liveness_interval – seconds of log silence before sending a heartbeat to receivers (default 10.0)

  • idle_timeout – optional receiver idle-timeout value used only for local validation. When provided and greater than zero, liveness_interval must be strictly less than idle_timeout.

  • chunk_size – bytes per chunk; defaults to 64 KB

  • chunk_timeout – per-chunk send timeout in seconds; defaults to 5.0

  • optional – whether the stream is optional

  • secure – whether P2P security is required

Returns: the result from engine.stream_objects, which has the same shape as the result of FileStreamer.stream_file()

dispatch_stream_done(stream_ctx: dict, fl_ctx: FLContext, **kwargs)