nvflare.fuel.utils.pipe.pipe_handler module
- class PipeHandler(pipe: Pipe, read_interval=0.1, heartbeat_interval=5.0, heartbeat_timeout=30.0, resend_interval=2.0, max_resends=5, default_request_timeout=5.0)
Bases: object
Monitors a pipe for messages from the peer.
PipeHandler reads the pipe periodically and puts received data in a message queue in the order the data is received.
If the received data indicates a peer status change (END, ABORT, GONE):
- If no status callback is registered, the data is added to the message queue.
- Otherwise, the data is NOT added to the message queue; the status callback is called with the data instead.
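The routing rule above can be sketched with a small stand-in class. This is only an illustrative model, not the library's implementation: RoutingSketch, the plain-string topics, and the tuple-shaped messages are assumptions made for the example, and the message callback is ignored here.

```python
from collections import deque

# Stand-in topic strings for this sketch; the real values live in the library's Topic class.
STATUS_TOPICS = {"END", "ABORT", "GONE"}


class RoutingSketch:
    """Illustrative model of the status routing rule only; not the real PipeHandler."""

    def __init__(self):
        self.queue = deque()
        self.status_cb = None

    def on_received(self, topic, data=None):
        msg = (topic, data)
        if topic in STATUS_TOPICS and self.status_cb is not None:
            # Status change with a callback registered: bypass the queue.
            self.status_cb(msg)
        else:
            # Regular data, or a status change with no callback: enqueue in arrival order.
            self.queue.append(msg)


seen = []
router = RoutingSketch()
router.on_received("train", 1)
router.on_received("END")       # no callback yet, so the status is queued
router.status_cb = seen.append
router.on_received("ABORT")     # callback set, so the status is not queued
```

After this runs, the queue holds the regular message and the END status, while the ABORT status has gone only to the callback.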
- The PipeHandler should be used as follows:
1. The app creates a pipe and then creates the PipeHandler object for the pipe.
2. The app starts the PipeHandler. This step is required; otherwise data in the pipe won’t be read.
3. The app calls handler.get_next() periodically to process messages in the queue. This method returns None if the queue is empty. If the app does not set a status callback, it must also handle status change events from the peer, which carry the special topic value Topic.END or Topic.ABORT.
4. Optionally, the app sets a status callback to handle the peer’s status change immediately.
5. The app stops the handler when it is finished.
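The polling step in the usage pattern above might look roughly like the loop below. It is a sketch under stated assumptions: FakeHandler stands in for a started PipeHandler and exposes only get_next(), messages are modeled as (topic, data) tuples rather than Message objects, and the END/ABORT strings, poll_until_done, and its parameters are hypothetical names chosen for the example.

```python
import time
from collections import deque

# Hypothetical topic values used only in this sketch.
END, ABORT = "END", "ABORT"


class FakeHandler:
    """Stand-in exposing only get_next(); a real PipeHandler would be started first."""

    def __init__(self, messages):
        self._queue = deque(messages)

    def get_next(self):
        # Mirrors the documented contract: None when the queue is empty.
        return self._queue.popleft() if self._queue else None


def poll_until_done(handler, process, poll_interval=0.01, max_idle_polls=100):
    """Poll get_next() until a status topic arrives or the idle budget runs out."""
    idle = 0
    while idle < max_idle_polls:
        msg = handler.get_next()
        if msg is None:
            idle += 1
            time.sleep(poll_interval)
            continue
        idle = 0
        topic, data = msg
        if topic in (END, ABORT):
            # No status callback in this sketch, so the app handles the status itself.
            return topic
        process(topic, data)
    return None


processed = []
handler = FakeHandler([("task", 1), ("task", 2), (END, None)])
result = poll_until_done(handler, lambda topic, data: processed.append(data))
```

The idle budget is a design choice for the example so that the loop always terminates; a real app would more likely loop until it sees a status change or decides to stop the handler.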
Note
The handler uses a heartbeat mechanism to detect that the peer may be disconnected (gone). It sends a heartbeat message to the peer at the configured interval and expects heartbeats from the peer in return. If no heartbeat is received from the peer within the configured timeout, the peer is treated as disconnected and a GONE status is generated for the app to handle.
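The timing logic behind such a heartbeat mechanism can be sketched as follows. HeartbeatMonitor and its methods are hypothetical names for illustration and do not reflect how the library implements it internally; the injectable clock is an assumption added so the behavior can be checked without waiting. A timeout of 0 disables the check, matching the heartbeat_timeout parameter described for the constructor.

```python
import time


class HeartbeatMonitor:
    """Illustrative heartbeat bookkeeping; not the library's internal logic."""

    def __init__(self, heartbeat_interval=5.0, heartbeat_timeout=30.0, clock=time.monotonic):
        self.heartbeat_interval = heartbeat_interval
        self.heartbeat_timeout = heartbeat_timeout
        self._clock = clock
        now = clock()
        self._last_sent = now
        self._last_received = now

    def should_send(self):
        """Return True when a heartbeat is due, and record the send time."""
        now = self._clock()
        if now - self._last_sent >= self.heartbeat_interval:
            self._last_sent = now
            return True
        return False

    def on_peer_heartbeat(self):
        """Record that a heartbeat arrived from the peer."""
        self._last_received = self._clock()

    def peer_gone(self):
        """Return True when the peer has been silent longer than the timeout."""
        if not self.heartbeat_timeout:
            # A timeout of 0 means heartbeats are not checked.
            return False
        return self._clock() - self._last_received > self.heartbeat_timeout


# Drive the monitor with a fake clock so no real waiting is needed.
fake_now = [0.0]
monitor = HeartbeatMonitor(5.0, 30.0, clock=lambda: fake_now[0])
fake_now[0] = 31.0
gone_after_silence = monitor.peer_gone()
monitor.on_peer_heartbeat()
gone_after_heartbeat = monitor.peer_gone()
```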
Constructor of the PipeHandler.
- Parameters:
pipe (Pipe) – the pipe to be monitored.
read_interval (float) – how often to read from the pipe.
heartbeat_interval (float) – how often to send a heartbeat to the peer.
heartbeat_timeout (float) – how long to wait for a heartbeat from the peer before treating the peer as gone. 0 means do not check for heartbeats.
resend_interval (float) – how often to resend a message that failed to send. None means no resend. If the pipe does not support resending, no resend is attempted.
max_resends (int, optional) – maximum number of resends. None means no limit.
default_request_timeout (float) – default timeout for a request when no timeout is specified.
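One way to read how resend_interval and max_resends interact is the retry loop below. It is a sketch of the documented semantics only (None interval means no resend, None limit means unlimited resends); send_with_resend, try_send, and the injectable sleep are hypothetical names, and the real handler may schedule resends differently.

```python
import time


def send_with_resend(try_send, resend_interval=2.0, max_resends=5, sleep=time.sleep):
    """Call try_send() until it returns True or the resend budget is spent.

    Returns a (delivered, attempts) tuple.
    """
    attempts = 1
    if try_send():
        return True, attempts
    if resend_interval is None:
        # None means no resend: give up after the first failure.
        return False, attempts
    resends = 0
    # max_resends of None means there is no limit on resends.
    while max_resends is None or resends < max_resends:
        sleep(resend_interval)
        resends += 1
        attempts += 1
        if try_send():
            return True, attempts
    return False, attempts


# Simulate a send that fails twice and then succeeds, without real sleeping.
outcomes = iter([False, False, True])
delivered, attempts = send_with_resend(lambda: next(outcomes), sleep=lambda seconds: None)
```

With the simulated outcomes above, the message is delivered on the third attempt, which is the second resend.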
- get_next() -> Message | None
Gets the next message from the message queue.
- Returns:
The Message at the front of the message queue, or None if the queue is empty.
- send_to_peer(msg: Message, timeout=None, abort_signal: Signal | None = None) -> bool
Sends a message to the peer.
- Parameters:
msg – message to be sent
timeout – how long to wait for the peer to read the data. If not specified, self.default_request_timeout is used.
abort_signal (Signal | None) – optional abort signal.
- Returns:
Whether the peer has read the data.
- set_message_cb(cb, *args, **kwargs)
Sets a callback function for message handling.
When a regular message is received, this cb is called. If the cb is not set, the handler simply adds the received msg to the message queue. If the cb is set, the received msg will NOT be added to the message queue.
The cb must conform to this signature: cb(msg, *args, **kwargs), where the *args and **kwargs are the ones passed to this call.
The cb is called from the thread that reads from the pipe, so it should be short-lived. Do not put heavy processing logic in the cb.
- Parameters:
cb – the callback func
*args – the args to be passed to the cb
**kwargs – the kwargs to be passed to the cb
Returns: None
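Because the callback runs on the thread that reads from the pipe, one common pattern is to have it do nothing but enqueue the message and let a worker thread do the heavy processing. The sketch below assumes the callback is invoked with the message followed by the registered args and kwargs; make_enqueue_cb, worker, and the tag keyword are hypothetical names, and the callback is invoked by hand here in place of a real handler.

```python
import queue
import threading


def make_enqueue_cb(work_queue):
    """Return a callback that only enqueues, so the reader thread is never blocked."""

    def cb(msg, *args, **kwargs):
        work_queue.put((msg, args, kwargs))

    return cb


def worker(work_queue, results):
    """Drain the queue on a separate thread; None is the shutdown sentinel."""
    while True:
        item = work_queue.get()
        if item is None:
            break
        msg, args, kwargs = item
        # Heavy processing would go here, off the reader thread.
        results.append((msg, kwargs.get("tag")))


work_queue = queue.Queue()
results = []
worker_thread = threading.Thread(target=worker, args=(work_queue, results))
worker_thread.start()

cb = make_enqueue_cb(work_queue)
# With a real handler this would be registered as: handler.set_message_cb(cb, tag="demo")
cb("hello", tag="demo")  # simulates the reader thread invoking the callback
work_queue.put(None)     # ask the worker to stop
worker_thread.join()
```

The same hand-off shape would apply to a status callback, since it is called from the same reader thread.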
- set_status_cb(cb, *args, **kwargs)
Sets a callback function for status handling.
When the peer status changes (ABORT, END, GONE), this callback is called. If the callback is not set, the handler simply adds the status change event (topic) to the message queue.
The callback function must conform to this signature: cb(msg, *args, **kwargs), where the *args and **kwargs are the ones passed to this call.
The cb is called from the thread that reads from the pipe, so it should be short-lived. Do not put heavy processing logic in the cb.
- Parameters:
cb – the callback function.
*args – the args to be passed to the cb
**kwargs – the kwargs to be passed to the cb
Returns: None