nvflare.fuel.utils.pipe.pipe module
- class Message(msg_type: str, topic: str, data: Any, msg_id=None, req_id=None)
Bases: object
- REPLY = 'REP'
- REQUEST = 'REQ'
- static new_reply(topic: str, data: Any, req_msg_id, msg_id=None)
Creates a new Message object representing a reply to a previous request.
- Parameters:
topic (str) – The topic of the reply message: a string that identifies its subject or category.
data (Any) – The data associated with the reply message. This can be any type of data relevant to the reply.
req_msg_id (int) – The identifier of the request message being answered. This ID links the reply to the original request.
msg_id (Optional[int]) – An optional identifier for the reply message. If not provided, a UUID is generated to uniquely identify the message.
- Returns:
A Message object with its type set to Message.REPLY and the provided topic, data, msg_id, and req_msg_id.
- Return type:
Message
- static new_request(topic: str, data: Any, msg_id=None)
Creates a new Message object representing a request.
- Parameters:
topic (str) – The topic of the request message: a string that identifies its subject or category.
data (Any) – The data associated with the request message. This can be any type of data relevant to the request.
msg_id (Optional[Any]) – An optional identifier for the message. If not provided, a UUID is generated to uniquely identify the message.
- Returns:
A Message object with its type set to Message.REQUEST and the provided topic, data, and msg_id.
- Return type:
Message
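A minimal sketch of how a reply is tied back to its request. To stay runnable without NVFlare installed, it defines a small stand-in Message class that mirrors the documented signatures; the attribute names (msg_type, msg_id, req_id) and the string form of the generated UUID are assumptions, not confirmed details of the library.

```python
import uuid
from typing import Any


class Message:
    """Stand-in that mirrors the documented signatures (not the NVFlare source)."""

    REQUEST = "REQ"
    REPLY = "REP"

    def __init__(self, msg_type: str, topic: str, data: Any, msg_id=None, req_id=None):
        # Attribute names are assumed to match the constructor arguments.
        self.msg_type = msg_type
        self.topic = topic
        self.data = data
        self.msg_id = msg_id
        self.req_id = req_id

    @staticmethod
    def new_request(topic: str, data: Any, msg_id=None):
        # Documented behavior: a UUID is generated when no msg_id is given.
        # Storing it as a string is an assumption of this sketch.
        if msg_id is None:
            msg_id = str(uuid.uuid4())
        return Message(Message.REQUEST, topic, data, msg_id)

    @staticmethod
    def new_reply(topic: str, data: Any, req_msg_id, msg_id=None):
        if msg_id is None:
            msg_id = str(uuid.uuid4())
        # The request's ID is carried along so the reply can be matched to it.
        return Message(Message.REPLY, topic, data, msg_id, req_id=req_msg_id)


# Build a request, then a reply that points back at it.
request = Message.new_request(topic="train", data={"round": 1})
reply = Message.new_reply(topic=request.topic, data="ok", req_msg_id=request.msg_id)
```

With NVFlare installed, the stand-in class would presumably be replaced by importing Message from nvflare.fuel.utils.pipe.pipe; the two factory calls at the bottom would stay the same.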
- class Pipe(mode: Mode)
Bases: AttributesExportable, ABC
Creates the pipe.
- Parameters:
mode (Mode) – Mode of the endpoint. A pipe has two endpoints; an endpoint is either the one that initiates communication or the one that listens.
- export(export_mode: str) → Tuple[str, dict]
Exports attributes.
- Parameters:
export_mode (str) – Whether to export to the peer (ExportMode.PEER) or to self (ExportMode.SELF).
- Returns:
A tuple of (export section name, arguments to be exported).
- get_last_peer_active_time()
Gets the last time that the peer was known to be active.
- Returns:
The last time that the peer was known to be active, or 0 if this information is not available.
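As an illustration, a caller might use this value to decide whether the peer has gone quiet. The helper below is hypothetical (peer_seems_idle, max_idle, and now are not part of this module), and it assumes the returned time is a timestamp in seconds comparable to time.time(), which the documentation above does not state.

```python
import time
from typing import Optional


def peer_seems_idle(last_active: float, max_idle: float, now: Optional[float] = None) -> bool:
    """Hypothetical helper: True if the peer has been silent longer than max_idle seconds."""
    # A value of 0 is documented as "information not available",
    # so it is treated as unknown rather than as "idle since the epoch".
    if last_active == 0:
        return False
    if now is None:
        # Assumption: last_active uses the same clock as time.time().
        now = time.time()
    return (now - last_active) > max_idle


# Example with fixed timestamps; in practice last_active would come from
# pipe.get_last_peer_active_time().
idle = peer_seems_idle(last_active=900.0, max_idle=10.0, now=1000.0)
unknown = peer_seems_idle(last_active=0, max_idle=10.0, now=1000.0)
```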
- abstract receive(timeout=None) → None | Message
Tries to receive a message from the peer.
- Parameters:
timeout – How long, in seconds, to keep trying. If not specified, return right away.
- Returns:
The message received, or None if there is no message.
- abstract send(msg: Message, timeout=None) → bool
Sends the specified message to the peer.
- Parameters:
msg – The message to be sent.
timeout – If specified, the number of seconds to wait for the peer to read the message. If not specified, wait indefinitely.
- Returns:
Whether the message was read by the peer.
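The timeout semantics of send and receive can be sketched with an in-memory endpoint. LoopbackPipe below is a hypothetical class for illustration only: it does not subclass the real Pipe, takes no Mode, and uses standard-library queues and events in place of whatever transport an actual implementation uses. Messages here are plain strings rather than Message objects.

```python
import queue
import threading


class LoopbackPipe:
    """Hypothetical in-memory endpoint; one endpoint's outbox is the other's inbox."""

    def __init__(self, inbox: queue.Queue, outbox: queue.Queue):
        self.inbox = inbox
        self.outbox = outbox

    def send(self, msg, timeout=None) -> bool:
        # Pair the message with an event that the receiver sets on read.
        was_read = threading.Event()
        self.outbox.put((msg, was_read))
        # wait(None) blocks indefinitely; otherwise it returns False on timeout.
        return was_read.wait(timeout)

    def receive(self, timeout=None):
        try:
            if timeout is None:
                # No timeout: return right away.
                msg, was_read = self.inbox.get_nowait()
            else:
                msg, was_read = self.inbox.get(timeout=timeout)
        except queue.Empty:
            return None
        was_read.set()  # tell the sender the message was read
        return msg


a_to_b, b_to_a = queue.Queue(), queue.Queue()
active = LoopbackPipe(inbox=b_to_a, outbox=a_to_b)
passive = LoopbackPipe(inbox=a_to_b, outbox=b_to_a)

nothing_yet = passive.receive()               # nothing queued, returns None at once
unread = active.send("hello", timeout=0.05)   # nobody reads within 50 ms
received = passive.receive(timeout=1.0)       # message is still queued in this sketch

# With a concurrent reader, send reports that the message was read.
results = []
reader = threading.Thread(target=lambda: results.append(passive.receive(timeout=1.0)))
reader.start()
was_read = active.send("ping", timeout=1.0)
reader.join()
```

One simplification to note: a message whose send timed out stays in the queue here and can still be received later. A real pipe implementation may handle an unread message differently.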