Source code for nvflare.fuel.hci.server.binary_transfer

# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import shutil

from nvflare.fuel.f3.streaming.file_downloader import ObjectDownloader, add_file
from nvflare.fuel.hci.conn import Connection
from nvflare.fuel.hci.proto import MetaKey, MetaStatusValue, make_meta
from nvflare.fuel.hci.server.constants import ConnProps
from nvflare.fuel.utils.log_utils import get_obj_logger


[docs] class BinaryTransfer: def __init__(self): self.logger = get_obj_logger(self)
[docs] @staticmethod def tx_path(conn: Connection, tx_id: str, folder_name=None): download_dir = conn.get_prop(ConnProps.DOWNLOAD_DIR) if not folder_name: return os.path.join(download_dir, tx_id) else: return os.path.join(download_dir, tx_id, folder_name)
[docs] def download_folder(self, conn: Connection, tx_id: str, folder_name: str): self.logger.debug(f"download_folder called for {folder_name}") tx_path = self.tx_path(conn, tx_id, folder_name) engine = conn.get_prop(ConnProps.ENGINE) admin = conn.get_prop(ConnProps.ADMIN_SERVER) timeout = admin.timeout if admin else 5 cell = engine.get_cell() source_fqcn = cell.get_fqcn() downloader = ObjectDownloader( num_receivers=1, cell=engine.get_cell(), timeout=timeout, transaction_done_cb=self._cleanup_tx, tx_path=tx_path, ) # return list of the files files = [] for dir_path, dir_names, file_names in os.walk(tx_path): for f in file_names: full_path = os.path.join(dir_path, f) ref_id = add_file(downloader, file_name=full_path) p = os.path.relpath(full_path, tx_path) files.append([p, ref_id]) self.logger.debug(f"files of the folder to download: {files}") if len(files) > 0: conn.append_string( "OK", meta=make_meta( MetaStatusValue.OK, extra={ MetaKey.SOURCE_FQCN: source_fqcn, MetaKey.FILES: files, MetaKey.TX_ID: tx_id, MetaKey.FOLDER_NAME: folder_name, }, ), ) else: conn.append_error( "No data to download", meta=make_meta( MetaStatusValue.ERROR, info="No data to download", ), )
def _cleanup_tx(self, tx_id: str, status, files, tx_path): """ Remove the job download folder """ shutil.rmtree(tx_path, ignore_errors=True) self.logger.debug(f"deleted download path: {tx_id=} {status=} {tx_path=} {files=}")