Source code for nvflare.fuel.f3.streaming.tools.file_sender

# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys
import threading
import time

from nvflare.fuel.f3.cellnet.core_cell import CellAgent, CoreCell
from nvflare.fuel.f3.message import Message
from nvflare.fuel.f3.stream_cell import StreamCell
from nvflare.fuel.f3.streaming.tools.utils import RX_CELL, TEST_CHANNEL, TEST_TOPIC, TX_CELL, setup_log


[docs]class FileSender: """Utility to send a file to another cell""" def __init__(self, url: str): core_cell = CoreCell(TX_CELL, url, secure=False, credentials={}) self.stream_cell = StreamCell(core_cell) core_cell.set_cell_connected_cb(self.cell_connected) core_cell.start() self.cell = core_cell self.ready = threading.Event()
[docs] def stop(self): self.cell.stop()
[docs] def wait(self): self.ready.wait()
[docs] def send(self, file_to_send: str): future = self.stream_cell.send_file(TEST_CHANNEL, TEST_TOPIC, RX_CELL, Message(None, file_to_send)) while True: if future.done(): break time.sleep(1) progress = future.get_progress() percent = progress * 100.0 / future.get_size() print(f"Sent {progress} bytes {percent:.2f}% done") size = future.result() print(f"Total {size} bytes sent for file {file_to_send}")
[docs] def cell_connected(self, agent: CellAgent): if agent.get_fqcn() == RX_CELL: self.ready.set()
if __name__ == "__main__": setup_log(logging.INFO) if len(sys.argv) != 3: print(f"Usage: {sys.argv[0]} connect_url file_name") sys.exit(1) connect_url = sys.argv[1] file_name = sys.argv[2] sender = FileSender(connect_url) print("Waiting for receiver to be online ...") sender.wait() print(f"Sending file {file_name} ...") start = time.time() sender.send(file_name) print(f"Time elapsed: {(time.time()-start):.3f} seconds") sender.stop() print("Done")