nvflare.app_common.streamers.log_streamer module
LogStreamer — live-tailing streamer for growing log files
LogStreamer differs from FileStreamer in three fundamental ways:
Growing file (live tail)
FileStreamer treats a file as a static snapshot: it opens the file, reads it to the end, and sends EOF. LogStreamer instead tails the file. It keeps reading as new bytes are appended, blocking between polls until the caller sets stop_event and all buffered bytes have been flushed. It also survives log rotation. When the inode changes (or the file shrinks below the current read position), it closes the stale handle, reopens the path, and reads the new file from the beginning.
Dead-sender detection via liveness heartbeats
Because the stream may be idle for extended periods (no new log lines), the receiver cannot distinguish “nothing to send yet” from “sender has crashed”. LogStreamer solves this with periodic heartbeat messages: if no data chunk has been sent for liveness_interval seconds, the producer emits a zero-payload heartbeat. The consumer resets its idle clock on every message, data or heartbeat, so the idle timer only counts genuine silence on the network.
Automatic stream closure on idle timeout
The receiver runs a background watchdog thread. If no message of any kind (data or heartbeat) arrives for idle_timeout seconds, the watchdog concludes the sender is unreachable. It closes the stream via the engine’s END_STREAM hook and fires stream_done_cb with StreamContextKey.RC = ReturnCode.TIMEOUT. To avoid spurious timeouts, liveness_interval must be strictly less than idle_timeout. The sender can check this locally only when the caller also passes idle_timeout to stream_log. Otherwise, make sure the sender and receiver settings agree at deployment.
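The receiver-side watchdog, together with the at-most-once completion guarantee stated for register_stream_processing, can be sketched in plain Python. This is a minimal illustration under stated assumptions, not NVFlare's implementation. IdleWatchdog, touch(), finish(), and check_interval are hypothetical names, and the strings "TIMEOUT" and "OK" stand in for ReturnCode values. Closing the stream through END_STREAM is not modeled.

```python
import threading
import time
from typing import Callable


class IdleWatchdog:
    """Hypothetical receiver-side idle watchdog; not NVFlare's actual class.

    touch() is called for every incoming message (data or heartbeat).
    If nothing arrives for idle_timeout seconds, on_done("TIMEOUT") fires.
    Normal completion calls finish("OK"). on_done runs at most once.
    """

    def __init__(self, idle_timeout: float, on_done: Callable[[str], None], check_interval: float = 0.01):
        self._idle_timeout = idle_timeout
        self._on_done = on_done
        self._check_interval = check_interval
        self._lock = threading.Lock()
        self._last_seen = time.monotonic()
        self._done = False
        self._thread = threading.Thread(target=self._watch, daemon=True)
        if idle_timeout > 0:  # idle_timeout=0 disables the watchdog
            self._thread.start()

    def touch(self) -> None:
        # Any message, heartbeat included, resets the idle clock.
        with self._lock:
            self._last_seen = time.monotonic()

    def finish(self, rc: str) -> bool:
        # Shared by the normal-completion and timeout paths; only the first
        # caller gets through, so on_done is invoked at most once.
        with self._lock:
            if self._done:
                return False
            self._done = True
        self._on_done(rc)
        return True

    def _watch(self) -> None:
        while True:
            with self._lock:
                if self._done:
                    return
                idle = time.monotonic() - self._last_seen
            if idle >= self._idle_timeout:
                self.finish("TIMEOUT")
                return
            time.sleep(self._check_interval)
```

The lock around the `_done` flag makes the two completion paths race safely. Whichever path arrives second sees `_done` already set and returns without calling the callback.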
Relationship between the two parameters
liveness_interval: how often the sender emits a heartbeat when idle
idle_timeout: how long the receiver waits before declaring the sender dead
Rule: liveness_interval < idle_timeout. With the defaults (10 s and
30 s) a healthy sender heartbeats every 10 s, so the receiver’s 30 s timer
is always reset well before it fires.
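A small validation helper can check this rule before deployment. Both functions below are hypothetical and not part of the LogStreamer API. Following the parameter docs, they treat an idle_timeout of zero or less as a disabled watchdog, which needs no check.

```python
def check_liveness_config(liveness_interval: float, idle_timeout: float) -> None:
    """Hypothetical helper mirroring the documented rule.

    Raises ValueError unless liveness_interval < idle_timeout. An idle_timeout
    of 0 (or less) means the receiver watchdog is disabled, so nothing is checked.
    """
    if idle_timeout > 0 and not liveness_interval < idle_timeout:
        raise ValueError(
            f"liveness_interval ({liveness_interval}) must be strictly less than "
            f"idle_timeout ({idle_timeout})"
        )


def heartbeats_per_idle_window(liveness_interval: float, idle_timeout: float) -> int:
    """Heartbeats a quiet but healthy sender emits within one idle_timeout window."""
    return int(idle_timeout // liveness_interval)


check_liveness_config(10.0, 30.0)               # defaults: passes
print(heartbeats_per_idle_window(10.0, 30.0))    # 3
```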
- class LogChunkConsumerFactory(chunk_received_cb, idle_timeout: float, stream_done_cb, cb_kwargs: dict)
Bases: ConsumerFactory
- get_consumer(stream_ctx: dict, fl_ctx: FLContext) → ObjectConsumer
Called to get an ObjectConsumer to process a new stream on the receiving side. This is called only when the first streaming object of each stream is received.
- Parameters:
stream_ctx – the context of the stream
fl_ctx – FLContext object
Returns: an ObjectConsumer
- class LogStreamer
Bases: StreamerBase
- static get_file_name(stream_ctx: dict)
Get the source log file’s base name from the stream context.
Intended for use inside chunk_received_cb or stream_done_cb on the receiving side.
- Parameters:
stream_ctx – the stream context
Returns: file base name string, or None
- static register_stream_processing(fl_ctx: FLContext, channel: str, topic: str, chunk_received_cb=None, stream_done_cb=None, idle_timeout: float = 30.0, **cb_kwargs)
Register for live log stream processing on the receiving side.
- Parameters:
fl_ctx – the FLContext object
channel – the app channel
topic – the app topic
chunk_received_cb – called for each received data chunk (heartbeats are silently absorbed and never forwarded), with the signature chunk_received_cb(data: bytes, stream_ctx: StreamContext, fl_ctx: FLContext, **cb_kwargs)
stream_done_cb – called when the stream ends (normal EOF, engine abort, or idle timeout); follows stream_done_cb_signature in nvflare.apis.streaming. When the call is triggered by the idle-timeout watchdog, the stream_ctx passed to this callback contains StreamContextKey.RC = ReturnCode.TIMEOUT.
idle_timeout – seconds without any message (data or heartbeat) before the receiver declares the sender dead and closes the stream (default 30.0). Set to 0 to disable.
**cb_kwargs – kwargs forwarded to both callbacks
Returns: None
Notes
stream_done_cb is guaranteed to be called at most once per stream, even when the idle-timeout path and the normal engine completion path race.
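A pair of receiving-side callbacks that append each chunk to a local file might look like the sketch below. It assumes stream_done_cb takes (stream_ctx, fl_ctx, **kwargs), which is how stream_done_cb_signature is understood to be defined. out_path and finished are hypothetical user values passed through **cb_kwargs. Because cb_kwargs is forwarded to both callbacks, each callback accepts both values. NVFlare types are annotated as Any so the sketch runs without NVFlare installed.

```python
import os
from typing import Any, List, Tuple


def on_log_chunk(data: bytes, stream_ctx: Any, fl_ctx: Any, out_path: str,
                 finished: List[Tuple[str, int]], **kwargs) -> None:
    # chunk_received_cb: called once per data chunk; heartbeats never reach it.
    # out_path and finished are user values delivered through **cb_kwargs.
    with open(out_path, "ab") as f:
        f.write(data)


def on_log_done(stream_ctx: Any, fl_ctx: Any, out_path: str,
                finished: List[Tuple[str, int]], **kwargs) -> None:
    # stream_done_cb: called at most once per stream (EOF, abort, or idle timeout).
    # A real handler could read StreamContextKey.RC from stream_ctx here to tell
    # an idle timeout (ReturnCode.TIMEOUT) apart from a normal finish.
    size = os.path.getsize(out_path) if os.path.exists(out_path) else 0
    finished.append((out_path, size))
```

Registration could then look like LogStreamer.register_stream_processing(fl_ctx, channel, topic, chunk_received_cb=on_log_chunk, stream_done_cb=on_log_done, idle_timeout=30.0, out_path=local_path, finished=done_list). Here channel, topic, local_path, and done_list are placeholders for your own values.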
- static stream_log(channel: str, topic: str, stream_ctx: dict, targets: List[str], file_name: str, fl_ctx: FLContext, stop_event: Event | None = None, poll_interval: float = 0.5, liveness_interval: float = 10.0, idle_timeout: float | None = None, chunk_size: int | None = None, chunk_timeout: float | None = None, optional: bool = False, secure: bool = False)
Tail and stream a live log file to one or more targets.
Continuously reads new data appended to file_name (including across log rotations) and streams it to targets. Blocks until stop_event is set and all buffered data has been flushed, or until the run is aborted.
Log rotation is detected by comparing the file’s inode after each poll. When a rotation is detected, the old file handle is closed and the new file is opened from the beginning. Truncation (the file size dropping below the current read position) is handled the same way.
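The inode-and-size check can be illustrated with a small standalone tail reader. RotatingTail is a hypothetical class, not NVFlare's code. It follows the behavior described above (close the stale handle, reopen from the beginning) and relies on POSIX-style st_ino values from os.stat.

```python
import os
from typing import BinaryIO, Optional


class RotatingTail:
    """Hypothetical tail reader (not NVFlare's code) that follows a path
    across rotation (inode change) and truncation (size < read position)."""

    def __init__(self, path: str):
        self.path = path
        self._fh: Optional[BinaryIO] = None
        self._inode: Optional[int] = None

    def _open(self) -> bool:
        try:
            self._fh = open(self.path, "rb")
        except FileNotFoundError:
            return False  # not created yet; the caller polls again later
        self._inode = os.fstat(self._fh.fileno()).st_ino
        return True

    def poll(self) -> bytes:
        """Return the bytes appended since the previous poll (b"" if none)."""
        if self._fh is None and not self._open():
            return b""
        try:
            st = os.stat(self.path)
        except FileNotFoundError:
            st = None  # rotated away and the new file does not exist yet
        if st is not None and (st.st_ino != self._inode or st.st_size < self._fh.tell()):
            # Rotation or truncation: drop the stale handle, restart at offset 0.
            self._fh.close()
            self._fh = None
            if not self._open():
                return b""
        return self._fh.read()

    def close(self) -> None:
        if self._fh is not None:
            self._fh.close()
            self._fh = None
```

This sketch has one limitation. If a file is truncated in place and then refilled past the old read position between two polls, neither the inode check nor the size check notices.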
Liveness heartbeats: when no new data has been sent for liveness_interval seconds, a lightweight heartbeat message (no payload) is sent to each target. This lets the receiver distinguish “log is quiet” from “sender process died”. The receiver’s idle-timeout clock is reset on every message, including heartbeats, so the timeout fires only when the sender is genuinely unreachable.
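The heartbeat timing can be sketched with an injectable clock, which keeps the behavior easy to reason about. HeartbeatScheduler and simulate_quiet_log are hypothetical names. The sketch assumes a heartbeat restarts the silence timer just as a data chunk does. That matches the documented outcome of one heartbeat every liveness_interval on a quiet log.

```python
from typing import Callable, List


class HeartbeatScheduler:
    """Hypothetical producer-side timer: is a heartbeat due yet?"""

    def __init__(self, liveness_interval: float, now: Callable[[], float]):
        self.liveness_interval = liveness_interval
        self._now = now  # injectable clock, e.g. time.monotonic
        self._last_sent = now()

    def record_send(self) -> None:
        # Any send (data chunk or heartbeat) restarts the silence timer.
        self._last_sent = self._now()

    def heartbeat_due(self) -> bool:
        return self._now() - self._last_sent >= self.liveness_interval


def simulate_quiet_log(duration: float, poll_interval: float, liveness_interval: float) -> List[float]:
    """Times at which heartbeats go out while the log stays silent."""
    clock = [0.0]
    hb = HeartbeatScheduler(liveness_interval, lambda: clock[0])
    sent = []
    while clock[0] < duration:
        clock[0] += poll_interval  # one empty poll
        if hb.heartbeat_due():
            hb.record_send()
            sent.append(clock[0])
    return sent


print(simulate_quiet_log(30.0, 0.5, 10.0))  # [10.0, 20.0, 30.0]
```

In this sketch the producer checks only between polls. A heartbeat can therefore go out up to one poll_interval late when poll_interval does not evenly divide liveness_interval.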
If the file does not exist when stream_log is called, the producer waits, polling every poll_interval seconds until it appears.
- Parameters:
channel – the app channel
topic – the app topic
stream_ctx – context data for this stream
targets – receiving site names
file_name – full path of the log file to tail
fl_ctx – a FLContext object
stop_event – a threading.Event used to signal the streamer to finish. When set, the streamer drains any remaining unread bytes and then sends EOF. If None, a new Event is created; in that case, the only way to stop is via the run abort signal.
poll_interval – seconds to wait between polls when no new data is available (default 0.5)
liveness_interval – seconds of log silence before sending a heartbeat to receivers (default 10.0)
idle_timeout – optional receiver idle-timeout value, used only for local validation. When provided and greater than zero, liveness_interval must be strictly less than idle_timeout.
chunk_size – bytes per chunk; defaults to 64 KB
chunk_timeout – per-chunk send timeout in seconds; defaults to 5.0
optional – whether the stream is optional
secure – whether P2P security is required
- Returns: the result from engine.stream_objects, with the same shape as the result of FileStreamer.stream_file()
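The stop_event contract can be sketched as a simple loop: wait for the file, keep reading, and on stop drain the remaining bytes before sending EOF. tail_until_stopped, its send callback, and the use of None as the EOF marker are hypothetical stand-ins for the engine's streaming call. Rotation handling, heartbeats, and chunk_timeout are left out. The sketch also assumes that 64 KB means 64 * 1024 bytes.

```python
import os
import threading
from typing import Callable, Optional

EOF: Optional[bytes] = None  # hypothetical end-of-stream marker handed to send()


def tail_until_stopped(
    path: str,
    send: Callable[[Optional[bytes]], None],
    stop_event: threading.Event,
    poll_interval: float = 0.5,
    chunk_size: int = 64 * 1024,
) -> None:
    # Wait for the file to appear, polling every poll_interval seconds.
    while not os.path.exists(path):
        if stop_event.wait(poll_interval):  # stop requested before the file appeared
            send(EOF)
            return
    with open(path, "rb") as f:
        while True:
            stopping = stop_event.is_set()  # sample BEFORE reading
            data = f.read(chunk_size)
            if data:
                send(data)
                continue
            if stopping:
                break  # stop was requested and everything written so far is drained
            stop_event.wait(poll_interval)  # idle: sleep, but wake early on stop
    send(EOF)
```

The key detail is sampling stop_event before each read. Any byte written before the event was set is read before the loop can exit.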