nvflare.fuel.utils.pipe.pipe module

class Message(msg_type: str, topic: str, data: Any, msg_id=None, req_id=None)[source]

Bases: object

REPLY = 'REP'
REQUEST = 'REQ'
static new_reply(topic: str, data: Any, req_msg_id, msg_id=None)[source]

Creates a new reply message in response to a request.

This static method creates a new Message object representing a reply to a previous request.

Parameters:
  • topic (str) – The topic of the reply message. This is a string that identifies the subject or category of the reply.

  • data (Any) – The data associated with the reply message. This can be any type of data relevant to the reply.

  • req_msg_id (Any) – The identifier of the request message that this reply responds to. This ID links the reply to the original request.

  • msg_id (Optional[Any]) – An optional identifier for the reply message. If not provided, a UUID is generated to uniquely identify the message.

Returns:

A Message object with the type set to Message.REPLY, and the provided topic, data, msg_id, and req_msg_id.

Return type:

Message

static new_request(topic: str, data: Any, msg_id=None)[source]

Creates a new request message.

This static method creates a new Message object representing a request.

Parameters:
  • topic (str) – The topic of the request message. This is a string that identifies the subject or category of the request.

  • data (Any) – The data associated with the request message. This can be any type of data that is relevant to the request.

  • msg_id (Optional[Any]) – An optional identifier for the message. If provided, this ID is used to uniquely identify the message. If not provided, a UUID will be generated to uniquely identify the message.

Returns:

A Message object with the type set to Message.REQUEST, and the provided topic, data, and msg_id.

Return type:

Message
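The request/reply pairing described above can be sketched without NVFlare installed. The snippet below is an illustration only: SketchMessage is a hypothetical stand-in written for this example and modeled on the signatures documented here; it is not the library's source. It assumes that generated IDs are UUID strings and that a reply stores the request's ID in req_id, as the parameter descriptions suggest. The topic name "train" and the payloads are arbitrary.

```python
import uuid
from typing import Any


class SketchMessage:
    """Hypothetical stand-in mirroring the documented Message signature."""

    REPLY = "REP"
    REQUEST = "REQ"

    def __init__(self, msg_type: str, topic: str, data: Any, msg_id=None, req_id=None):
        self.msg_type = msg_type
        self.topic = topic
        self.data = data
        self.msg_id = msg_id
        self.req_id = req_id

    @staticmethod
    def new_request(topic: str, data: Any, msg_id=None) -> "SketchMessage":
        # Assumption: a missing msg_id is replaced by a UUID string.
        if msg_id is None:
            msg_id = str(uuid.uuid4())
        return SketchMessage(SketchMessage.REQUEST, topic, data, msg_id)

    @staticmethod
    def new_reply(topic: str, data: Any, req_msg_id, msg_id=None) -> "SketchMessage":
        # The reply gets its own ID and records which request it answers.
        if msg_id is None:
            msg_id = str(uuid.uuid4())
        return SketchMessage(SketchMessage.REPLY, topic, data, msg_id, req_id=req_msg_id)


request = SketchMessage.new_request("train", {"round": 1})
reply = SketchMessage.new_reply(request.topic, {"status": "ok"}, req_msg_id=request.msg_id)
```

With the real class, the equivalent calls would be Message.new_request(...) and Message.new_reply(...), using the signatures listed above.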

class Pipe(mode: Mode)[source]

Bases: AttributesExportable, ABC

Creates the pipe.

Parameters:

mode (Mode) – Mode of the endpoint. A pipe has two endpoints. An endpoint can be either the one that initiates communication or the one listening.

abstract can_resend() → bool[source]

Whether the pipe is able to resend a message.

abstract clear()[source]

Clear the pipe.

abstract close()[source]

Close the pipe.

Returns: None

export(export_mode: str) → Tuple[str, dict][source]

Exports attributes.

Parameters:

export_mode (str) – export to peer (ExportMode.PEER) or to self (ExportMode.SELF).

Returns:

A tuple of (export section name, arguments to be exported)

get_last_peer_active_time()[source]

Get the last time that the peer is known to be active

Returns: the last time that the peer is known to be active; or 0 if this info is not available
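A caller might turn this value into an idle duration. The helper below is a minimal sketch, not part of NVFlare: peer_idle_seconds is a hypothetical name, and it assumes the returned time is in epoch seconds (comparable with time.time()), which this page does not state. It treats 0 as "unknown", as documented above.

```python
import time
from typing import Optional


def peer_idle_seconds(last_active: float, now: Optional[float] = None) -> Optional[float]:
    """Return seconds since the peer was last active, or None if unknown."""
    if last_active == 0:
        # Documented sentinel: 0 means the information is not available.
        return None
    if now is None:
        now = time.time()  # assumption: timestamps are epoch seconds
    # Clamp at zero in case the two clocks readings are slightly out of order.
    return max(0.0, now - last_active)
```

Given some pipe object, it could be called as peer_idle_seconds(pipe.get_last_peer_active_time()).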

abstract open(name: str)[source]

Open the pipe.

Parameters:

name – name of the pipe

abstract receive(timeout=None) → None | Message[source]

Try to receive a message from the peer.

Parameters:

timeout – how long (number of seconds) to try. If not specified, return right away.

Returns:

the message received; or None if no message

abstract send(msg: Message, timeout=None) → bool[source]

Sends the specified message to the peer.

Parameters:
  • msg – the message to be sent

  • timeout – if specified, number of seconds to wait for the peer to read the message. If not specified, wait indefinitely.

Returns:

Whether the message is read by the peer.
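To make the method contract above concrete, here is a small in-process sketch built on queue.Queue. QueuePipeSketch is a hypothetical class written for this example; it does not subclass the real Pipe and takes no Mode argument. It simplifies in two labeled ways: send() reports success once the message is queued rather than once the peer has read it, and plain dicts stand in for Message objects.

```python
import queue
from typing import Any, Optional


class QueuePipeSketch:
    """Hypothetical in-process endpoint; not a subclass of the real Pipe."""

    def __init__(self, outbox: queue.Queue, inbox: queue.Queue):
        self.outbox = outbox  # messages this endpoint sends
        self.inbox = inbox  # messages this endpoint receives
        self.name = None
        self.is_open = False

    def open(self, name: str) -> None:
        self.name = name
        self.is_open = True

    def close(self) -> None:
        self.is_open = False

    def clear(self) -> None:
        # Drop anything still waiting to be received.
        while True:
            try:
                self.inbox.get_nowait()
            except queue.Empty:
                return

    def can_resend(self) -> bool:
        # Simplification: this sketch keeps no copy of sent messages.
        return False

    def send(self, msg: Any, timeout: Optional[float] = None) -> bool:
        if not self.is_open:
            return False
        # Simplification: report success once queued, not once the peer has read it.
        self.outbox.put(msg)
        return True

    def receive(self, timeout: Optional[float] = None) -> Optional[Any]:
        if not self.is_open:
            return None
        try:
            if timeout is None:
                return self.inbox.get_nowait()  # documented: return right away
            return self.inbox.get(timeout=timeout)
        except queue.Empty:
            return None


# Two endpoints share a pair of queues, one per direction.
a_to_b, b_to_a = queue.Queue(), queue.Queue()
side_a = QueuePipeSketch(outbox=a_to_b, inbox=b_to_a)
side_b = QueuePipeSketch(outbox=b_to_a, inbox=a_to_b)
side_a.open("demo")
side_b.open("demo")

sent = side_a.send({"topic": "hello", "data": 1})  # plain dict stands in for a Message
received = side_b.receive(timeout=0.1)
nothing = side_b.receive()  # no timeout: returns None immediately when empty
```

A real implementation would derive from Pipe and would need send() to honor the "read by the peer" return semantics, for example by waiting for an acknowledgment within the timeout.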

class Topic[source]

Bases: object

ABORT = '_ABORT_'
END = '_END_'
HEARTBEAT = '_HEARTBEAT_'
PEER_GONE = '_PEER_GONE_'