nvflare.fuel.utils.pipe.pipe module

class Message(msg_type: str, topic: str, data: Any, msg_id=None, req_id=None)

Bases: object

REPLY = 'REP'

REQUEST = 'REQ'

static new_reply(topic: str, data: Any, req_msg_id, msg_id=None)

static new_request(topic: str, data: Any, msg_id=None)
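
The two factory methods suggest a request/reply pairing in which a reply carries the ID of the request it answers. The sketch below illustrates that pairing with a local stand-in class that mirrors the documented signatures. It is not NVFlare's implementation: the attribute names and the generation of a fresh ID when msg_id is omitted are assumptions made for illustration.

```python
import uuid
from typing import Any


class Message:
    """Stand-in mirroring the documented constructor; not NVFlare's code."""

    REPLY = "REP"
    REQUEST = "REQ"

    def __init__(self, msg_type: str, topic: str, data: Any, msg_id=None, req_id=None):
        self.msg_type = msg_type
        self.topic = topic
        self.data = data
        # Assumption: a missing msg_id is replaced by a fresh unique ID.
        self.msg_id = msg_id if msg_id is not None else str(uuid.uuid4())
        # ID of the request being answered; None for requests.
        self.req_id = req_id

    @staticmethod
    def new_request(topic: str, data: Any, msg_id=None):
        return Message(Message.REQUEST, topic, data, msg_id)

    @staticmethod
    def new_reply(topic: str, data: Any, req_msg_id, msg_id=None):
        return Message(Message.REPLY, topic, data, msg_id, req_id=req_msg_id)


# A request and the reply that answers it share the request's ID.
request = Message.new_request("train", {"round": 1})
reply = Message.new_reply("train", {"loss": 0.42}, req_msg_id=request.msg_id)
```

With this pairing, a receiver can match an incoming reply to an outstanding request by comparing the reply's req_id with the stored request's msg_id.
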
class Pipe(mode: Mode)

Bases: AttributesExportable, ABC

Creates the pipe.

Parameters:

mode (Mode) – Mode of this endpoint. A pipe has two endpoints: one initiates communication and the other listens.

abstract can_resend() → bool

Whether the pipe is able to resend a message.

abstract clear()

Clears the pipe.

abstract close()

Closes the pipe.

Returns: None

export(export_mode: str) → Tuple[str, dict]

Exports attributes.

Parameters:

export_mode (str) – export to peer (ExportMode.PEER) or to self (ExportMode.SELF).

Returns:

A tuple of (export section name, arguments to be exported)

get_last_peer_active_time()

Gets the last time that the peer is known to have been active.

Returns: the last time that the peer is known to have been active, or 0 if this information is not available.
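
Because 0 means "unknown" rather than "active at time zero", callers need to treat it as a special case before computing idle time. The helper below is a minimal sketch of that check. The function name is hypothetical, and it assumes the returned value is an epoch timestamp in seconds comparable with time.time(), which this page does not state.

```python
import time
from typing import Optional


def peer_idle_seconds(last_active: float, now: Optional[float] = None) -> Optional[float]:
    """Return how long the peer has been idle, or None if that is unknown.

    `last_active` is the value obtained from get_last_peer_active_time().
    Assumption: it is an epoch timestamp in seconds, like time.time().
    """
    if last_active == 0:
        # Documented sentinel: activity information is not available.
        return None
    if now is None:
        now = time.time()
    # Clamp at zero in case the two clocks disagree slightly.
    return max(0.0, now - last_active)
```

A caller could compare the result against its own idle threshold and treat None as "no decision possible" instead of as a dead peer.
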

abstract open(name: str)

Opens the pipe.

Parameters:

name (str) – name of the pipe

abstract receive(timeout=None) → None | Message

Tries to receive a message from the peer.

Parameters:

timeout – how long (in seconds) to keep trying. If not specified, return right away.

Returns:

The message received, or None if there is no message.

abstract send(msg: Message, timeout=None) → bool

Sends the specified message to the peer.

Parameters:
  • msg – the message to be sent

  • timeout – if specified, the number of seconds to wait for the peer to read the message. If not specified, wait indefinitely.

Returns:

Whether the message was read by the peer.
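
The timeout rules for send and receive differ: receive returns immediately when no timeout is given, while send waits indefinitely. The following sketch shows one way those semantics could be realized in-process with a queue and a per-message event. LoopbackPipe is a hypothetical class written for illustration; it does not subclass NVFlare's Pipe, one object serves both endpoints, and open, close, clear and can_resend are omitted.

```python
import queue
import threading
from typing import Any, Optional


class LoopbackPipe:
    """Hypothetical in-process stand-in; not a subclass of NVFlare's Pipe."""

    def __init__(self):
        self._inbox = queue.Queue()

    def send(self, msg: Any, timeout: Optional[float] = None) -> bool:
        # Pair the message with an event that the reader sets on pickup.
        read = threading.Event()
        self._inbox.put((msg, read))
        # wait(None) blocks until the event is set; with a timeout it
        # returns False if the peer has not read the message in time.
        return read.wait(timeout)

    def receive(self, timeout: Optional[float] = None) -> Optional[Any]:
        try:
            if timeout is None:
                # No timeout: return right away if nothing is queued.
                msg, read = self._inbox.get_nowait()
            else:
                msg, read = self._inbox.get(timeout=timeout)
        except queue.Empty:
            return None
        read.set()  # tell the sender that the message was read
        return msg


pipe = LoopbackPipe()
nothing = pipe.receive()  # no timeout and nothing queued

# A reader thread plays the role of the peer endpoint.
received = []
reader = threading.Thread(target=lambda: received.append(pipe.receive(timeout=5.0)))
reader.start()
delivered = pipe.send("hello", timeout=5.0)
reader.join()
```

In this sketch a message that times out stays queued, so a later receive would still return it. A real pipe implementation may handle that case differently.
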

class Topic

Bases: object

ABORT = '_ABORT_'
END = '_END_'
HEARTBEAT = '_HEARTBEAT_'
PEER_GONE = '_PEER_GONE_'
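
These constants look like reserved control topics that travel alongside application topics. The sketch below shows one way a receiver might separate the two. The Topic class is a local copy of the documented values, split_control is a hypothetical helper, and the interpretation (heartbeats are skipped; ABORT, END and PEER_GONE stop processing) is an assumption, since this page does not describe what each topic means.

```python
from typing import Iterable, List, Optional, Tuple


class Topic:
    """Local copy of the documented constants, for a self-contained example."""

    ABORT = "_ABORT_"
    END = "_END_"
    HEARTBEAT = "_HEARTBEAT_"
    PEER_GONE = "_PEER_GONE_"


# Assumption: any of these topics means no further messages should be processed.
STOP_TOPICS = {Topic.ABORT, Topic.END, Topic.PEER_GONE}


def split_control(topics: Iterable[str]) -> Tuple[List[str], Optional[str]]:
    """Return the application topics seen before a stop topic, and that stop topic.

    The second element is None if the stream ends without a stop topic.
    """
    app_topics = []
    for topic in topics:
        if topic == Topic.HEARTBEAT:
            continue  # assumed to be a keep-alive signal with no payload to handle
        if topic in STOP_TOPICS:
            return app_topics, topic
        app_topics.append(topic)
    return app_topics, None
```
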