nvflare.fuel.f3.streaming.download_service module
- class Consumer[source]
Bases:
ABC- abstract consume(ref_id: str, state: dict, data: Any) dict[source]
Called to process the received data.
- Parameters:
ref_id – ref id of the object being downloaded
state – current state of downloading
data – data to be processed
Returns: new state to be sent back to the data owner.
- class DownloadService[source]
Bases:
object- classmethod add_object(transaction_id: str, obj: Downloadable, ref_id=None) str[source]
- classmethod get_transaction_info(transaction_id: str) TransactionInfo | None[source]
- class DownloadStatus[source]
Bases:
objectConstants for object download status.
- FAILED = 'failed'
- SUCCESS = 'success'
- class Downloadable(obj: Any)[source]
Bases:
ABC- downloaded_to_one(to_receiver: str, status: str)[source]
Called when an object is downloaded to a receiver.
- Parameters:
to_receiver – name of the receiver that the object has been completely downloaded to.
status – the download status: DownloadStatus.SUCCESS or DownloadStatus.FAILED.
Returns: None
- abstract produce(state: dict, requester: str) Tuple[str, Any, dict][source]
Produce a small object to be sent (on object sender side).
- Parameters:
state – current state of downloading, received from the downloading receiver
requester – the FQCN of the receiver that is downloading
Returns: a tuple of (return code, a small object to be sent, new state to be sent).
- release()[source]
Drop the infrastructure reference to the source object.
Called by _Transaction.transaction_done() AFTER the transaction_done_cb fires. Subclasses should override this to null their base_obj (or any other large reference) so the GC can reclaim the memory immediately. The default implementation is a no-op.
- set_transaction(tx_id: str, ref_id: str)[source]
This method is called when the object is added to a transaction. You can use this method to keep transaction ID and/or ref ID for your own purpose.
- Parameters:
tx_id – the ID of the transaction that the object has been added to.
ref_id – ref ID generated for the object.
Returns: None
- OBJ_DOWNLOADER_TOPIC = 'download_service__download'
This package provides a framework for building object downloading capability (file download, tensor download, etc.).
A large object takes a lot of memory space. Sending a large object in one message needs even more memory space since the object needs to be serialized into large number of bytes. Additional memory space may still be needed for the transport layer to send the message. If the message is to be sent to multiple endpoints, even more memory is needed.
Object Downloading can drastically reduce memory consumption: - Instead of sending the large object in one message, it is divided into many smaller objects; - Instead of pushing the message to the endpoints, each endpoint will come to request. This makes it more reliable when different endpoints have different speed.
Object Downloading works as follows: - The sender prepares the object(s) for downloading. It first creates a transaction to get a tx_id. It then adds each object (called Downloadable) to be downloaded to the transaction, and get a reference id (ref_id). - The sender sends the ref_id(s) to all recipients through a separate message. - Each recipient then calls the download_object function to download each referenced large object.
Note that the endpoint that received object refs may forward the refs to another endpoint, which then downloads the referenced object(s).
To develop the downloading capability for a type of object (e.g. a file, a tensor state dict, etc.), you need to provide the implementation of a Downloadable and a Consumer. - On the sending side, the Downloadable is responsible for producing the next small object to be sent (a chunk of bytes; a small subset of the large dict; etc.). - On the receiving side, the Consumer is responsible for processing the received small objects (writing the received bytes to a temp file; putting the received small dict to the end result; etc.).
One issue with object downloading is object life cycle management. Since the large objects to be downloaded are usually temporary, you need to remove them when they are downloaded by all receivers. But the problem is that you don’t know how quickly each receiver can finish downloading these large objects. When a transaction contains multiple objects to be downloaded, it’s even harder to know it.
There are two ways to handle this issue: object downloaded callback, and transaction timeout.
You can implement the downloaded_to_one method for the Downloadable object. This method is called when the object is downloaded to one receiver.
You can also implement the downloaded_to_all method for the Downloadable object. This method is called when the object is downloaded to all receivers.
Note that the downloaded_to_all method only works if you know how many receivers the object will be downloaded to!
You can always implement the transaction_done method for the Downloadable object. This method is called when the transaction is done for some reason (normal completion or timeout).
Transaction timeout is the amount of time after the last downloading activity on any object in the transaction from any receiver. For example, suppose you want to send 2 large files to 3 receivers, each time a download request is received on any file from any of the 3 receivers, the last activity time of the transaction is updated to now. If no downloading activity is received from any receiver on any objects in the transaction for the specified timeout, the transaction is considered “timed out”, and the transaction_done method is called for each Downloadable object added to the transaction.
Unlike with Object Streamer that the object owner pushes small objects to the recipients; with Object Downloader, each recipient pulls the data from the object owner.
- class ProduceRC[source]
Bases:
objectDefines return code for the Downloadable object’s ‘produce’ method.
- EOF = 'eof'
- ERROR = 'error'
- OK = 'ok'
- class TransactionDoneStatus[source]
Bases:
objectConstants for transaction completion status.
- DELETED = 'deleted'
- FINISHED = 'finished'
- TIMEOUT = 'timeout'
- class TransactionInfo(tx: _Transaction)[source]
Bases:
objectThis structure contains public info of a transaction: timeout value of the transaction; number of receivers that objects in the transaction will be downloaded to. 0 means unknown. objects that are added to the transaction.
- download_object(from_fqcn: str, ref_id: str, per_request_timeout: float, cell: Cell, consumer: Consumer, secure=False, optional=False, abort_signal: Signal | None = None, max_retries: int = 3)[source]
Download a large object from the object owner.
- Parameters:
from_fqcn – the FQCN of the object owner
ref_id – reference id of the object to be downloaded
per_request_timeout – timeout for each request to the object owner.
cell – the cell to be used for communication with the object owner.
consumer – the Consumer object used for processing received data
secure – use P2P private communication with the data owner
optional – suppress log messages
abort_signal – for signaling abort
max_retries – max number of retries per request on TIMEOUT (default 3). Resending the same state causes the producer to re-generate the same chunk, so retry is data-safe. Note: CacheableObject’s _adjust_cache may run twice for the same state on retry, which can prematurely evict cache entries in multi-receiver scenarios but does not affect data correctness.
Returns: None