Source code for nvflare.fuel.utils.pipe.file_pipe

# Copyright (c) 2021, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import shutil
import time
from typing import Tuple

from nvflare.fuel.utils.attributes_exportable import ExportMode
from nvflare.fuel.utils.constants import Mode
from nvflare.fuel.utils.pipe.file_accessor import FileAccessor
from nvflare.fuel.utils.pipe.file_name_utils import file_name_to_message, message_to_file_name
from nvflare.fuel.utils.pipe.fobs_file_accessor import FobsFileAccessor
from nvflare.fuel.utils.pipe.pipe import Message, Pipe, Topic
from nvflare.fuel.utils.validation_utils import check_object_type, check_positive_number, check_str


[docs]class FilePipe(Pipe): def __init__(self, mode: Mode, root_path: str, file_check_interval=0.1): """Implementation of communication through the file system. Args: mode (Mode): Mode of the endpoint. A pipe has two endpoints. An endpoint can be either the one that initiates communication or the one listening. root_path (str): root path for this file pipe, folders and files will be created under this root_path for communication. file_check_interval (float): how often should to check the file exists. """ super().__init__(mode=mode) check_positive_number("file_check_interval", file_check_interval) check_str("root_path", root_path) self._remove_root = False if not os.path.exists(root_path): try: # create the root path os.makedirs(root_path) self._remove_root = True except Exception: pass self.root_path = root_path self.file_check_interval = file_check_interval self.pipe_path = None self.x_path = None self.y_path = None self.t_path = None if self.mode == Mode.ACTIVE: self.get_f = self.x_get self.put_f = self.x_put elif self.mode == Mode.PASSIVE: self.get_f = self.y_get self.put_f = self.y_put self.accessor = FobsFileAccessor() # default
[docs] def set_file_accessor(self, accessor: FileAccessor): """Sets the file accessor to be used by the pipe. The default file accessor is FobsFileAccessor. Args: accessor: the accessor to be used. """ check_object_type("accessor", accessor, FileAccessor) self.accessor = accessor
@staticmethod def _make_dir(path): try: os.mkdir(path) except FileExistsError: # this is okay pass
[docs] def open(self, name: str): if not self.accessor: raise RuntimeError("File accessor is not set. Make sure to set a FileAccessor before opening the pipe") pipe_path = os.path.join(self.root_path, name) if not os.path.exists(pipe_path): self._make_dir(pipe_path) x_path = os.path.join(pipe_path, "x") if not os.path.exists(x_path): self._make_dir(x_path) y_path = os.path.join(pipe_path, "y") if not os.path.exists(y_path): self._make_dir(y_path) t_path = os.path.join(pipe_path, "t") if not os.path.exists(t_path): self._make_dir(t_path) self.pipe_path = pipe_path self.x_path = x_path self.y_path = y_path self.t_path = t_path
@staticmethod def _clear_dir(p: str): file_list = os.listdir(p) if file_list: for f in file_list: try: os.remove(os.path.join(p, f)) except FileNotFoundError: pass def _create_file(self, to_dir: str, msg: Message) -> str: file_name = message_to_file_name(msg) file_path = os.path.join(to_dir, file_name) tmp_path = os.path.join(self.t_path, file_name) if not self.pipe_path: raise BrokenPipeError("pipe broken") try: self.accessor.write(msg.data, tmp_path) os.rename(tmp_path, file_path) except FileNotFoundError: raise BrokenPipeError("pipe closed") return file_path
[docs] def clear(self): self._clear_dir(self.x_path) self._clear_dir(self.y_path) self._clear_dir(self.t_path)
def _monitor_file(self, file_path: str, timeout=None) -> bool: """Monitors the file until it's read-and-removed by peer, or timed out. Args: file_path: the path to be monitored timeout: how long to wait for timeout Returns: whether the file has been read and removed """ start = time.time() while True: if not self.pipe_path: raise BrokenPipeError("pipe broken") if not os.path.exists(file_path): return True if timeout and time.time() - start > timeout: # timed out - try to delete the file try: os.remove(file_path) except FileNotFoundError: # the file is read by the peer! return True return False time.sleep(self.file_check_interval)
[docs] def x_put(self, msg: Message, timeout) -> bool: """ Args: msg: timeout: Returns: whether file is read by the peer """ # put it in Y's queue file_path = self._create_file(self.y_path, msg) return self._monitor_file(file_path, timeout)
def _read_file(self, file_path: str): # since reading file may take time and another process may try to delete the file # we move the file to a temp name before reading it file_name = os.path.basename(file_path) msg = file_name_to_message(file_name) tmp_path = os.path.join(self.t_path, file_name) try: create_time = os.path.getctime(file_path) os.rename(file_path, tmp_path) data = self.accessor.read(tmp_path) if os.path.isfile(tmp_path): os.remove(tmp_path) # remove this file elif os.path.isdir(tmp_path): shutil.rmtree(tmp_path) else: raise RuntimeError(f"cannot remove unsupported path: '{tmp_path}'") msg.data = data msg.sent_time = create_time msg.received_time = time.time() return msg except FileNotFoundError: raise BrokenPipeError("pipe closed") def _get_next(self, from_dir: str): try: files = os.listdir(from_dir) except Exception: raise BrokenPipeError(f"error reading from {from_dir}") if files: files = [os.path.join(from_dir, f) for f in files] files.sort(key=os.path.getmtime, reverse=False) file_path = files[0] return self._read_file(file_path) else: return None def _get_from_dir(self, from_dir: str, timeout=None): if not timeout or timeout <= 0: return self._get_next(from_dir) start = time.time() while True: msg = self._get_next(from_dir) if msg: return msg if time.time() - start >= timeout: break time.sleep(self.file_check_interval) return None
[docs] def x_get(self, timeout=None): # read from X's queue return self._get_from_dir(self.x_path, timeout)
[docs] def y_put(self, msg: Message, timeout) -> bool: # put it in X's queue file_path = self._create_file(self.x_path, msg) return self._monitor_file(file_path, timeout)
[docs] def y_get(self, timeout=None): # read from Y's queue return self._get_from_dir(self.y_path, timeout)
[docs] def send(self, msg: Message, timeout=None) -> bool: """Sends the specified message to the peer. Args: msg: the message to be sent timeout: if specified, number of secs to wait for the peer to read the message. If not specified, wait indefinitely. Returns: Whether the message is read by the peer. """ if not self.pipe_path: raise BrokenPipeError("pipe is not open") if not timeout and msg.topic in [Topic.END, Topic.ABORT, Topic.HEARTBEAT]: timeout = 5.0 return self.put_f(msg, timeout)
[docs] def receive(self, timeout=None): if not self.pipe_path: raise BrokenPipeError("pipe is not open") return self.get_f(timeout)
[docs] def close(self): pipe_path = self.pipe_path self.pipe_path = None if self.mode == Mode.PASSIVE: if pipe_path and os.path.exists(pipe_path): shutil.rmtree(pipe_path, ignore_errors=True) if self._remove_root and os.path.exists(self.root_path): shutil.rmtree(self.root_path, ignore_errors=True)
[docs] def can_resend(self) -> bool: return False
[docs] def export(self, export_mode: str) -> Tuple[str, dict]: if export_mode == ExportMode.SELF: mode = self.mode else: mode = Mode.ACTIVE if self.mode == Mode.PASSIVE else Mode.PASSIVE export_args = {"mode": mode, "root_path": self.root_path} return f"{self.__module__}.{self.__class__.__name__}", export_args