nvflare.fuel.f3.streaming.download_service module

class Consumer[source]

Bases: ABC

abstract consume(ref_id: str, state: dict, data: Any) → dict[source]

Called to process the received data.

Parameters:
  • ref_id – ref id of the object being downloaded

  • state – current state of downloading

  • data – data to be processed

Returns: new state to be sent back to the data owner.

abstract download_completed(ref_id: str)[source]

Called when the downloading is finished successfully.

Parameters:

ref_id – ref id of the object being downloaded

Returns: None

abstract download_failed(ref_id: str, reason: str)[source]

Called when the downloading is finished unsuccessfully.

Parameters:
  • ref_id – ref id of the object being downloaded

  • reason – the reason for the failure

Returns: None
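
As an illustration of the three callbacks above, here is a minimal sketch of a consumer that concatenates received byte chunks. The `Consumer` class in the block is a local stand-in that only mirrors the documented signatures so the example runs without NVFlare installed. `BytesCollector` and the `"received"` state key are hypothetical names chosen for this sketch, not part of the library.

```python
from abc import ABC, abstractmethod
from typing import Any


class Consumer(ABC):
    """Local stand-in that mirrors the documented Consumer signatures."""

    @abstractmethod
    def consume(self, ref_id: str, state: dict, data: Any) -> dict: ...

    @abstractmethod
    def download_completed(self, ref_id: str): ...

    @abstractmethod
    def download_failed(self, ref_id: str, reason: str): ...


class BytesCollector(Consumer):
    """Hypothetical consumer that appends each received chunk to a buffer."""

    def __init__(self):
        self.buffer = bytearray()
        self.done = False
        self.error = None

    def consume(self, ref_id: str, state: dict, data: Any) -> dict:
        self.buffer.extend(data)
        # The returned dict is the new state sent back to the data owner.
        # Reporting the byte count is an assumed convention for this sketch.
        return {"received": len(self.buffer)}

    def download_completed(self, ref_id: str):
        self.done = True

    def download_failed(self, ref_id: str, reason: str):
        # Discard partial data and remember why the download failed.
        self.error = reason
        self.buffer.clear()


collector = BytesCollector()
state = collector.consume("ref-1", {}, b"hello ")
state = collector.consume("ref-1", state, b"world")
collector.download_completed("ref-1")
```

In real use the subclass would derive from the `Consumer` class in this module and be passed to `download_object`.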

class DownloadService[source]

Bases: object

classmethod add_object(transaction_id: str, obj: Downloadable, ref_id=None) → str[source]

classmethod delete_transaction(transaction_id: str)[source]

classmethod get_transaction_id(ref_id: str) → str | None[source]

classmethod get_transaction_info(transaction_id: str) → TransactionInfo | None[source]

classmethod new_transaction(cell: Cell, timeout: float, num_receivers: int = 0, tx_id=None, transaction_done_cb=None, **cb_kwargs)[source]

classmethod shutdown()[source]

Shutdown and clean up resources.

Returns: None

class DownloadStatus[source]

Bases: object

Constants for object download status.

FAILED = 'failed'
SUCCESS = 'success'

class Downloadable(obj: Any)[source]

Bases: ABC

downloaded_to_all()[source]

Called when the object is fully downloaded to all receivers.

downloaded_to_one(to_receiver: str, status: str)[source]

Called when an object is downloaded to a receiver.

Parameters:
  • to_receiver – name of the receiver that the object has been completely downloaded to.

  • status – the download status: DownloadStatus.SUCCESS or DownloadStatus.FAILED.

Returns: None

abstract produce(state: dict, requester: str) → Tuple[str, Any, dict][source]

Produce a small object to be sent (on object sender side).

Parameters:
  • state – current state of downloading, received from the downloading receiver

  • requester – the FQCN of the receiver that is downloading

Returns: a tuple of (return code, a small object to be sent, new state to be sent).
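
The following is a minimal sketch of what a `produce` implementation might look like for a bytes object served in fixed-size chunks. It is not the library's implementation: `BytesDownloadable`, `chunk_size`, and the `"offset"` state key are hypothetical, and `ProduceRC` is redefined locally with the documented values so the block runs on its own.

```python
from typing import Any, Tuple


class ProduceRC:
    """Local copy of the documented return codes."""

    OK = "ok"
    EOF = "eof"
    ERROR = "error"


class BytesDownloadable:
    """Hypothetical sender-side object that serves bytes in chunks."""

    def __init__(self, obj: bytes, chunk_size: int = 4):
        self.base_obj = obj
        self.chunk_size = chunk_size

    def produce(self, state: dict, requester: str) -> Tuple[str, Any, dict]:
        # The receiver's state tells us where to resume (assumed key name).
        offset = (state or {}).get("offset", 0)
        if not isinstance(offset, int) or offset < 0:
            return ProduceRC.ERROR, None, {}
        if offset >= len(self.base_obj):
            return ProduceRC.EOF, None, {}
        chunk = self.base_obj[offset:offset + self.chunk_size]
        return ProduceRC.OK, chunk, {"offset": offset + len(chunk)}


source = BytesDownloadable(b"abcdefghij", chunk_size=4)
first = source.produce({}, "site-1")
repeat = source.produce({}, "site-1")  # same state yields the same chunk
last = source.produce({"offset": 8}, "site-1")
end = source.produce({"offset": 10}, "site-1")
```

Because the chunk is derived only from the incoming state, a repeated request with the same state returns the same data.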

release()[source]

Drop the infrastructure reference to the source object.

Called by _Transaction.transaction_done() after the transaction_done_cb fires. Subclasses should override this method to set their base_obj (or any other large reference) to None so that the garbage collector can reclaim the memory immediately. The default implementation is a no-op.

set_transaction(tx_id: str, ref_id: str)[source]

This method is called when the object is added to a transaction. You can use this method to keep transaction ID and/or ref ID for your own purpose.

Parameters:
  • tx_id – the ID of the transaction that the object has been added to.

  • ref_id – ref ID generated for the object.

Returns: None

transaction_done(transaction_id: str, status: str)[source]

Called when the transaction is finished.

Parameters:
  • transaction_id – ID of the transaction.

  • status – completion status, a value defined in TransactionDoneStatus.

Returns: None

OBJ_DOWNLOADER_TOPIC = 'download_service__download'

This package provides a framework for building object downloading capability (file download, tensor download, etc.).

A large object takes a lot of memory. Sending it in one message needs even more, since the object must first be serialized into a large number of bytes, and the transport layer may need additional memory to send the message. If the message is sent to multiple endpoints, memory use grows further.

Object Downloading can drastically reduce memory consumption:
  • Instead of sending the large object in one message, it is divided into many smaller objects.

  • Instead of pushing the message to the endpoints, each endpoint requests the data itself. This is more reliable when endpoints run at different speeds.

Object Downloading works as follows:
  • The sender prepares the object(s) for downloading. It first creates a transaction to get a tx_id. It then adds each object to be downloaded (called a Downloadable) to the transaction and gets a reference id (ref_id) for it.

  • The sender sends the ref_id(s) to all recipients through a separate message.

  • Each recipient then calls the download_object function to download each referenced large object.

Note that the endpoint that received object refs may forward the refs to another endpoint, which then downloads the referenced object(s).

To develop the downloading capability for a type of object (e.g. a file, a tensor state dict, etc.), you need to implement a Downloadable and a Consumer:
  • On the sending side, the Downloadable is responsible for producing the next small object to be sent (a chunk of bytes, a small subset of the large dict, etc.).

  • On the receiving side, the Consumer is responsible for processing the received small objects (writing the received bytes to a temp file, merging the received small dict into the end result, etc.).
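
The division of labor between the two sides can be sketched in-process for the "small subset of a large dict" case. This is a simplified model under stated assumptions: the `pull` function stands in for the request/response exchange that `download_object` performs over a cell, and `DictDownloadable`, `DictConsumer`, `keys_per_chunk`, and the `"next_index"` state key are hypothetical names.

```python
from typing import Any, Tuple

OK, EOF = "ok", "eof"  # documented values of ProduceRC.OK and ProduceRC.EOF


class DictDownloadable:
    """Hypothetical sender side: serves a large dict a few keys at a time."""

    def __init__(self, obj: dict, keys_per_chunk: int = 2):
        self.base_obj = obj
        self.keys = sorted(obj)
        self.keys_per_chunk = keys_per_chunk

    def produce(self, state: dict, requester: str) -> Tuple[str, Any, dict]:
        start = (state or {}).get("next_index", 0)
        if start >= len(self.keys):
            return EOF, None, {}
        batch = self.keys[start:start + self.keys_per_chunk]
        small = {k: self.base_obj[k] for k in batch}
        return OK, small, {"next_index": start + len(batch)}


class DictConsumer:
    """Hypothetical receiver side: merges each small dict into the result."""

    def __init__(self):
        self.result = {}
        self.completed = False
        self.failure = None

    def consume(self, ref_id: str, state: dict, data: Any) -> dict:
        self.result.update(data)
        return state  # echo the state so the sender knows where to resume

    def download_completed(self, ref_id: str):
        self.completed = True

    def download_failed(self, ref_id: str, reason: str):
        self.failure = reason


def pull(source, consumer, ref_id: str, requester: str) -> None:
    """In-process stand-in for the pull loop; no networking involved."""
    state = {}
    while True:
        rc, data, new_state = source.produce(state, requester)
        if rc == EOF:
            consumer.download_completed(ref_id)
            return
        if rc != OK:
            consumer.download_failed(ref_id, f"producer returned {rc}")
            return
        state = consumer.consume(ref_id, new_state, data)


weights = {"a": 1, "b": 2, "c": 3, "d": 4, "e": 5}
consumer = DictConsumer()
pull(DictDownloadable(weights), consumer, "ref-1", "site-1")
```

Only one small dict is in flight at a time, which is the memory saving the design aims for.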

One issue with object downloading is object life cycle management. Since the large objects to be downloaded are usually temporary, you need to remove them once all receivers have downloaded them. The problem is that you don’t know how quickly each receiver will finish downloading. When a transaction contains multiple objects, it is even harder to know when everything is done.

There are two ways to handle this issue: object-downloaded callbacks and transaction timeout.

You can implement the downloaded_to_one method for the Downloadable object. This method is called when the object is downloaded to one receiver.

You can also implement the downloaded_to_all method for the Downloadable object. This method is called when the object is downloaded to all receivers.

Note that the downloaded_to_all method only works if the number of receivers is known, that is, if num_receivers is set to a non-zero value when the transaction is created (0 means unknown).
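
To see why the receiver count matters, consider one plausible way to decide when an object has reached every receiver. This sketch is not the library's actual bookkeeping; `ReceiverTracker` and its `record` method are hypothetical.

```python
class ReceiverTracker:
    """Hypothetical bookkeeping for deciding when all receivers are done."""

    def __init__(self, num_receivers: int = 0):
        self.num_receivers = num_receivers  # 0 means unknown
        self.finished = set()

    def record(self, receiver: str) -> bool:
        """Record one finished receiver; return True once all are done."""
        self.finished.add(receiver)
        if self.num_receivers <= 0:
            # Without a known count, "all receivers" can never be detected.
            return False
        return len(self.finished) >= self.num_receivers


known = ReceiverTracker(num_receivers=2)
known_results = [known.record("site-1"), known.record("site-2")]

unknown = ReceiverTracker()
unknown_results = [unknown.record("site-1"), unknown.record("site-2")]
```

With an unknown count, cleanup has to rely on transaction_done instead.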

You can always implement the transaction_done method for the Downloadable object. This method is called when the transaction is done for any reason (normal completion, timeout, or deletion).

Transaction timeout is the amount of time allowed to pass after the last downloading activity on any object in the transaction from any receiver. For example, suppose you want to send 2 large files to 3 receivers. Each time a download request is received on any file from any of the 3 receivers, the last activity time of the transaction is updated to now. If no download request is received from any receiver on any object in the transaction for the specified timeout, the transaction is considered “timed out”, and the transaction_done method is called for each Downloadable object added to the transaction.
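
The inactivity rule described above can be modeled with a small timer that is reset on every request. This is only a sketch of the idea, not the library's code: `ActivityTimer`, `touch`, and `timed_out` are hypothetical names, and the injectable clock exists just to make the example deterministic.

```python
import time
from typing import Callable


class ActivityTimer:
    """Hypothetical model of a transaction's last-activity timeout."""

    def __init__(self, timeout: float, clock: Callable[[], float] = time.monotonic):
        self.timeout = timeout
        self.clock = clock
        self.last_active = clock()

    def touch(self) -> None:
        """Call on every download request, from any receiver, on any object."""
        self.last_active = self.clock()

    def timed_out(self) -> bool:
        return self.clock() - self.last_active > self.timeout


now = [0.0]  # fake clock so the example does not need to sleep
timer = ActivityTimer(timeout=10.0, clock=lambda: now[0])

now[0] = 8.0
timer.touch()                 # a receiver requested a chunk at t=8
now[0] = 15.0
still_alive = not timer.timed_out()   # only 7s since the last request
now[0] = 18.5
expired = timer.timed_out()           # 10.5s of silence exceeds the timeout
```

Note that the timeout measures silence, not total transfer time, so a slow but active download does not expire.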

With Object Streamer, the object owner pushes small objects to the recipients. With Object Downloader, by contrast, each recipient pulls the data from the object owner.

class ProduceRC[source]

Bases: object

Defines return code for the Downloadable object’s ‘produce’ method.

EOF = 'eof'
ERROR = 'error'
OK = 'ok'

class TransactionDoneStatus[source]

Bases: object

Constants for transaction completion status.

DELETED = 'deleted'
FINISHED = 'finished'
TIMEOUT = 'timeout'

class TransactionInfo(tx: _Transaction)[source]

Bases: object

This structure contains public info of a transaction:
  • timeout value of the transaction

  • number of receivers that objects in the transaction will be downloaded to (0 means unknown)

  • objects that are added to the transaction

download_object(from_fqcn: str, ref_id: str, per_request_timeout: float, cell: Cell, consumer: Consumer, secure=False, optional=False, abort_signal: Signal | None = None, max_retries: int = 3)[source]

Download a large object from the object owner.

Parameters:
  • from_fqcn – the FQCN of the object owner

  • ref_id – reference id of the object to be downloaded

  • per_request_timeout – timeout for each request to the object owner.

  • cell – the cell to be used for communication with the object owner.

  • consumer – the Consumer object used for processing received data

  • secure – use P2P private communication with the data owner

  • optional – suppress log messages

  • abort_signal – for signaling abort

  • max_retries – max number of retries per request on TIMEOUT (default 3). Resending the same state causes the producer to re-generate the same chunk, so retry is data-safe. Note: CacheableObject’s _adjust_cache may run twice for the same state on retry, which can prematurely evict cache entries in multi-receiver scenarios but does not affect data correctness.

Returns: None
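
The retry behavior described for max_retries can be illustrated with a small, self-contained sketch. It assumes a hypothetical `send(state)` callable that raises `TimeoutError` when no reply arrives; `request_with_retries` and `flaky_send` are illustrative names and do not reflect how download_object is implemented internally.

```python
from typing import Any, Callable


def request_with_retries(send: Callable[[dict], Any], state: dict, max_retries: int = 3) -> Any:
    """Resend the same state on timeout, up to max_retries extra attempts."""
    retries = 0
    while True:
        try:
            return send(state)
        except TimeoutError:
            if retries >= max_retries:
                raise
            retries += 1


payload = b"abcdefgh"
calls = {"count": 0}


def flaky_send(state: dict) -> bytes:
    """Stand-in for one request: times out twice, then answers."""
    calls["count"] += 1
    if calls["count"] <= 2:
        raise TimeoutError("no reply from object owner")
    offset = state.get("offset", 0)
    # The chunk depends only on the state, so a resend gets the same data.
    return payload[offset:offset + 4]


chunk = request_with_retries(flaky_send, {"offset": 4})
```

Since the state is unchanged between attempts, the retried request yields the same chunk the lost reply would have carried.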