Source code for nvflare.fuel.utils.pipe.pipe

# Copyright (c) 2021, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
import uuid
from abc import ABC, abstractmethod
from typing import Any, Tuple, Union

from nvflare.fuel.utils.attributes_exportable import AttributesExportable, ExportMode
from nvflare.fuel.utils.constants import Mode
from nvflare.fuel.utils.validation_utils import check_str


[docs]class Topic(object): ABORT = "_ABORT_" END = "_END_" HEARTBEAT = "_HEARTBEAT_" PEER_GONE = "_PEER_GONE_"
[docs]class Message: REQUEST = "REQ" REPLY = "REP" def __init__(self, msg_type: str, topic: str, data: Any, msg_id=None, req_id=None): check_str("msg_type", msg_type) if msg_type not in [Message.REPLY, Message.REQUEST]: raise ValueError(f"invalid note_type '{msg_type}': must be one of {[Message.REPLY, Message.REQUEST]}") self.msg_type = msg_type check_str("topic", topic) if not topic: raise ValueError("topic must not be empty") if not re.match("[a-zA-Z0-9_]+$", topic): raise ValueError("topic contains invalid char - only alphanumeric and underscore are allowed") self.topic = topic if not msg_id: msg_id = str(uuid.uuid4()) self.data = data self.msg_id = msg_id self.req_id = req_id self.sent_time = None self.received_time = None
[docs] @staticmethod def new_request(topic: str, data: Any, msg_id=None): return Message(Message.REQUEST, topic, data, msg_id)
[docs] @staticmethod def new_reply(topic: str, data: Any, req_msg_id, msg_id=None): return Message(Message.REPLY, topic, data, msg_id, req_id=req_msg_id)
def __str__(self): return f"Message(topic={self.topic}, msg_id={self.msg_id}, req_id={self.req_id}, msg_type={self.msg_type})"
[docs]class Pipe(AttributesExportable, ABC): def __init__(self, mode: Mode): """Creates the pipe. Args: mode (Mode): Mode of the endpoint. A pipe has two endpoints. An endpoint can be either the one that initiates communication or the one listening. """ if mode != Mode.ACTIVE and mode != Mode.PASSIVE: raise ValueError(f"mode must be '{Mode.ACTIVE}' or '{Mode.PASSIVE}' but got {mode}") self.mode = mode
[docs] @abstractmethod def open(self, name: str): """Open the pipe Args: name: name of the pipe """ pass
[docs] @abstractmethod def clear(self): """Clear the pipe""" pass
[docs] @abstractmethod def send(self, msg: Message, timeout=None) -> bool: """Sends the specified message to the peer. Args: msg: the message to be sent timeout: if specified, number of secs to wait for the peer to read the message. If not specified, wait indefinitely. Returns: Whether the message is read by the peer. """ pass
[docs] @abstractmethod def receive(self, timeout=None) -> Union[None, Message]: """Try to receive message from peer. Args: timeout: how long (number of seconds) to try If not specified, return right away. Returns: the message received; or None if no message """ pass
[docs] @abstractmethod def close(self): """Close the pipe Returns: None """ pass
[docs] @abstractmethod def can_resend(self) -> bool: """Whether the pipe is able to resend a message.""" pass
[docs] def get_last_peer_active_time(self): """Get the last time that the peer is known to be active Returns: the last time that the peer is known to be active; or 0 if this info is not available """ return 0
[docs] def export(self, export_mode: str) -> Tuple[str, dict]: if export_mode == ExportMode.SELF: mode = self.mode else: mode = Mode.ACTIVE if self.mode == Mode.PASSIVE else Mode.PASSIVE return f"{self.__module__}.{self.__class__.__name__}", {"mode": mode}