nvflare.fuel.utils.pipe.pipe_handler module

class PipeHandler(pipe: Pipe, read_interval=0.1, heartbeat_interval=5.0, heartbeat_timeout=30.0, resend_interval=2.0, max_resends=5, default_request_timeout=5.0)[source]

Bases: object

PipeHandler monitors a pipe for messages from the peer. It reads the pipe periodically and puts received data in a message queue in the order the data is received.

If the received data indicates a peer status change (END, ABORT, GONE), the data is added the message queue if the status_cb is not registered. If the status_cb is registered, the data is NOT added the message queue. Instead, the status_cb is called with the status data.

The PipeHandler should be used as follows:
  • The app creates a pipe and then creates the PipeHandler object for the pipe;

  • The app starts the PipeHandler. This step must be performed, or data in the pipe won’t be read.

  • The app should call handler.get_next() periodically to process the message in the queue. This method may return None if there is no message in the queue. The app also must handle the status change event from the peer if it does not set the status_cb. The status change event has the special topic value of Topic.END or Topic.ABORT.

  • Optionally, the app can set a status_cb and handle the peer’s status change immediately.

  • Stop the handler when the app is finished.

NOTE: the handler uses a heartbeat mechanism to detect that the peer may be disconnected (gone). It sends a heartbeat message to the peer based on configured interval. It also expects heartbeats from the peer. If peer’s heartbeat is not received for configured time, it will be treated as disconnected, and a GONE status is generated for the app to handle.

Constructor of the PipeHandler.

Parameters:
  • pipe (Pipe) – the pipe to be monitored.

  • read_interval (float) – how often to read from the pipe.

  • heartbeat_interval (float) – how often to send a heartbeat to the peer.

  • heartbeat_timeout (float) – how long to wait for a heartbeat from the peer before treating the peer as gone, 0 means DO NOT check for heartbeat.

  • resend_interval (float) – how often to resend a message if failing to send. None means no resend. Note that if the pipe does not support resending, then no resend.

  • max_resends (int, optional) – max number of resends. None means no limit.

  • default_request_timeout (float) – default timeout for request if timeout not specified.

get_next() Message | None[source]

Gets the next message from the message queue.

Returns:

A Message at the top of the message queue. If the queue is empty, returns None.

notify_abort(data)[source]

Notifies the peer that the communication is aborted.

notify_end(data)[source]

Notifies the peer that the communication is ended normally.

pause()[source]

Stops heartbeat checking and sending.

resume()[source]

Resumes heartbeat checking and sending.

send_to_peer(msg: Message, timeout=None, abort_signal: Signal | None = None) bool[source]

Sends a message to peer.

Parameters:
  • msg – message to be sent

  • timeout – how long to wait for the peer to read the data. If not specified, will use self.default_request_timeout.

  • abort_signal

Returns:

Whether the peer has read the data.

set_message_cb(cb, *args, **kwargs)[source]

Set CB for message handling. When a regular message is received, this CB is called. If the msg CB is not set, the handler simply adds the received msg to the message queue. If the msg CB is set, the received msg will NOT be added to the message queue.

The CB must conform to this signature:

cb(msg, *args, **kwargs)

where the *args and *kwargs are ones passed to this call.

The CB is called from the thread that reads from the pipe, hence it should be short-lived. Do not put heavy processing logic in the CB.

Parameters:
  • cb – the callback func

  • *args – the args to be passed to the cb

  • **kwargs – the kwargs to be passed to the cb

Returns: None

set_status_cb(cb, *args, **kwargs)[source]

Set CB for status handling. When the peer status is changed (ABORT, END, GONE), this CB is called. If status CB is not set, the handler simply adds the status change event (topic) to the message queue.

The status_cb must conform to this signature:

cb(msg, *args, **kwargs)

where the *args and *kwargs are ones passed to this call. The status_cb is called from the thread that reads from the pipe, hence it should be short-lived. Do not put heavy processing logic in the status_cb.

Parameters:
  • cb – the callback func

  • *args – the args to be passed to the cb

  • **kwargs – the kwargs to be passed to the cb

Returns: None

start()[source]

Starts the PipeHandler. Note: before calling this method, the pipe managed by this PipeHandler must have been opened.

stop(close_pipe=True)[source]

Stops the handler and optionally close the monitored pipe.

Parameters:

close_pipe – whether to close the monitored pipe.