nvflare.fuel.utils.pipe.pipe module

class Message(msg_type: str, topic: str, data: Any, msg_id=None, req_id=None)[source]

Bases: object

REPLY = 'REP'
REQUEST = 'REQ'
static new_reply(topic: str, data: Any, req_msg_id, msg_id=None)[source]

Creates a new reply message in response to a request.

This static method creates a new Message object representing a reply to a previous request.

Parameters:
  • topic (str) – The topic of the reply message. This is a string that identifies the subject or category of the reply.

  • data (Any) – The data associated with the reply message. This can be any type of data relevant to the reply.

  • req_msg_id (int) – The identifier of the request message that this reply is responding to. This ID links the reply to the original request.

  • msg_id (Optional[int]) – An optional identifier for the reply message. If provided, this ID is used to uniquely identify the message. If not provided, a UUID will be generated to uniquely identify the message.

Returns:

A Message object with the type set to Message.REPLY and the provided topic, data, msg_id, and req_msg_id.

Return type:

Message

static new_request(topic: str, data: Any, msg_id=None)[source]

Creates a new request message.

This static method creates a new Message object representing a request.

Parameters:
  • topic (str) – The topic of the request message. This is a string that identifies the subject or category of the request.

  • data (Any) – The data associated with the request message. This can be any type of data that is relevant to the request.

  • msg_id (Optional[Any]) – An optional identifier for the message. If provided, this ID is used to uniquely identify the message. If not provided, a UUID will be generated to uniquely identify the message.

Returns:

A Message object with the type set to Message.REQUEST and the provided topic, data, and msg_id.

Return type:

Message
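The pairing of a reply with its request can be sketched as follows. The Message class below is a stand-in that mirrors the documented constructor and static methods so the example runs without NVFlare installed. The attribute names (msg_type, msg_id, req_id) are assumed from the constructor parameters, and the UUID is stored as a string for illustration only.

```python
import uuid
from typing import Any


class Message:
    """Stand-in mirroring the documented signature; not the NVFlare class itself."""

    REQUEST = "REQ"
    REPLY = "REP"

    def __init__(self, msg_type: str, topic: str, data: Any, msg_id=None, req_id=None):
        self.msg_type = msg_type
        self.topic = topic
        self.data = data
        # Documented behavior: a UUID is generated when no msg_id is supplied.
        # Storing it as a string is an assumption of this sketch.
        self.msg_id = msg_id if msg_id is not None else str(uuid.uuid4())
        self.req_id = req_id

    @staticmethod
    def new_request(topic: str, data: Any, msg_id=None) -> "Message":
        return Message(Message.REQUEST, topic, data, msg_id)

    @staticmethod
    def new_reply(topic: str, data: Any, req_msg_id, msg_id=None) -> "Message":
        # req_msg_id links the reply back to the original request.
        return Message(Message.REPLY, topic, data, msg_id, req_id=req_msg_id)


# Create a request, then a reply that refers back to it.
request = Message.new_request("train", {"round": 1})
reply = Message.new_reply("train", {"status": "ok"}, req_msg_id=request.msg_id)
```

With the real class, the equivalent calls would be Message.new_request(...) and Message.new_reply(...) after importing Message from nvflare.fuel.utils.pipe.pipe.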

class Pipe(mode: Mode)[source]

Bases: AttributesExportable, ABC

Creates the pipe.

Parameters:

mode (Mode) – Mode of the endpoint. A pipe has two endpoints. An endpoint can be either the one that initiates communication or the one listening.

abstract can_resend() → bool[source]

Whether the pipe is able to resend a message.

abstract clear()[source]

Clear the pipe.

abstract close()[source]

Close the pipe.

Returns: None

export(export_mode: str) → Tuple[str, dict][source]

Exports attributes.

Parameters:

export_mode (str) – export to peer (ExportMode.PEER) or to self (ExportMode.SELF).

Returns:

A tuple of (export section name, arguments to be exported)

get_last_peer_active_time()[source]

Get the last time that the peer is known to have been active.

Returns: the last time that the peer is known to have been active, or 0 if this information is not available.

abstract open(name: str)[source]

Open the pipe.

Parameters:

name – name of the pipe

abstract receive(timeout=None) → None | Message[source]

Try to receive a message from the peer.

Parameters:

timeout – how long to try, in seconds. If not specified, return right away.

Returns:

The message received, or None if no message is available.

release_send_cache(msg: Message)[source]

Release any per-message state cached by send().

Called by PipeHandler after the retry loop exits (success, max retries, or stop) to allow pipe implementations to free resources that were attached to the message during send(). The default is a no-op; pipes that cache state (e.g. CellPipe caches the serialized CellMessage) must override this method.

Parameters:

msg – the message that was being sent.
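The relationship between the retry loop and release_send_cache() can be sketched as below. This is a simplified illustration and not the actual PipeHandler logic: CachingPipe, send_with_retries, and the failures_before_success parameter are hypothetical names introduced here, and the "serialization" is just repr() for demonstration.

```python
from typing import Any, Dict, Optional


class CachingPipe:
    """Hypothetical pipe that caches a serialized payload per message."""

    def __init__(self, failures_before_success: int = 0):
        self._cache: Dict[int, bytes] = {}
        self._failures_left = failures_before_success
        self.serialize_count = 0

    def send(self, msg: Any, timeout: Optional[float] = None) -> bool:
        key = id(msg)
        if key not in self._cache:
            # Serialize once and reuse the bytes on every retry.
            self._cache[key] = repr(msg).encode("utf-8")
            self.serialize_count += 1
        if self._failures_left > 0:
            self._failures_left -= 1
            return False  # simulate "peer did not read the message"
        return True

    def release_send_cache(self, msg: Any) -> None:
        # Drop the per-message state attached during send().
        self._cache.pop(id(msg), None)

    def cached_count(self) -> int:
        return len(self._cache)


def send_with_retries(pipe: CachingPipe, msg: Any, max_retries: int = 3) -> bool:
    """Retry send() up to max_retries times, then always release the cache."""
    try:
        for _ in range(max_retries):
            if pipe.send(msg):
                return True
        return False
    finally:
        # Runs on success and on exhausted retries alike.
        pipe.release_send_cache(msg)


pipe = CachingPipe(failures_before_success=2)
delivered = send_with_retries(pipe, {"round": 1})
```

Placing the release in a finally clause ensures the cached payload is freed however the loop exits, which matches the "success, max retries, or stop" wording above.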

abstract send(msg: Message, timeout=None) → bool[source]

Sends the specified message to the peer.

Parameters:
  • msg – the message to be sent

  • timeout – if specified, the number of seconds to wait for the peer to read the message. If not specified, wait indefinitely.

Returns:

Whether the message is read by the peer.
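To make the open/send/receive/clear/close contract more concrete, here is a minimal in-process sketch built on queue.Queue. InMemoryPipe is a hypothetical class that only borrows the documented method names; it does not subclass the real Pipe, and its send() returns True as soon as the message is queued instead of waiting for the peer to read it, which is a simplification.

```python
import queue
from typing import Any, Optional


class InMemoryPipe:
    """Hypothetical in-process endpoint exposing the documented Pipe method names."""

    def __init__(self, inbox: queue.Queue, outbox: queue.Queue):
        self._inbox = inbox
        self._outbox = outbox
        self._opened = False
        self.name: Optional[str] = None

    def open(self, name: str) -> None:
        self.name = name
        self._opened = True

    def can_resend(self) -> bool:
        return True

    def send(self, msg: Any, timeout: Optional[float] = None) -> bool:
        if not self._opened:
            return False
        # Simplification: "queued" is treated as "read by the peer".
        self._outbox.put(msg)
        return True

    def receive(self, timeout: Optional[float] = None) -> Optional[Any]:
        try:
            if timeout is None:
                # No timeout given: return right away, as documented.
                return self._inbox.get_nowait()
            return self._inbox.get(timeout=timeout)
        except queue.Empty:
            return None

    def clear(self) -> None:
        # Discard any messages still waiting in the inbox.
        while True:
            try:
                self._inbox.get_nowait()
            except queue.Empty:
                break

    def close(self) -> None:
        self._opened = False


# Wire two endpoints together: each one's outbox is the other's inbox.
a_to_b: queue.Queue = queue.Queue()
b_to_a: queue.Queue = queue.Queue()
endpoint_a = InMemoryPipe(inbox=b_to_a, outbox=a_to_b)
endpoint_b = InMemoryPipe(inbox=a_to_b, outbox=b_to_a)
endpoint_a.open("demo")
endpoint_b.open("demo")

sent = endpoint_a.send("hello")
received = endpoint_b.receive()
nothing = endpoint_b.receive()  # queue is now empty
```

A real implementation would subclass Pipe, pass a Mode to its constructor, and decide how send() learns that the peer has actually read the message.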

class Topic[source]

Bases: object

ABORT = '_ABORT_'
END = '_END_'
HEARTBEAT = '_HEARTBEAT_'
PEER_GONE = '_PEER_GONE_'