nvflare.fuel.f3.streaming.obj_downloader module

class Consumer

Bases: ABC

abstract consume(ref_id: str, state: dict, data: Any) -> dict

Called to process the received data.

Parameters:
  • ref_id – ref id of the object being downloaded

  • state – current state of downloading

  • data – data to be processed

Returns: new state to be sent back to the data owner.

abstract download_completed(ref_id: str)

Called when the downloading is finished successfully.

Parameters:
  • ref_id – ref id of the object being downloaded

Returns: None

abstract download_failed(ref_id: str, reason: str)

Called when the downloading is finished unsuccessfully.

Parameters:
  • ref_id – ref id of the object being downloaded

  • reason – the reason for the failure

Returns: None
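
As an illustration, here is a minimal sketch of a Consumer that collects received byte chunks in memory. The Consumer base class below is a local stand-in that mirrors the documented method signatures so the example runs without NVFlare; BytesCollector, its attributes, and the "received" state key are hypothetical names chosen for this example.

```python
from abc import ABC, abstractmethod
from typing import Any, Optional


class Consumer(ABC):
    """Local stand-in mirroring the documented interface (not the NVFlare class)."""

    @abstractmethod
    def consume(self, ref_id: str, state: dict, data: Any) -> dict: ...

    @abstractmethod
    def download_completed(self, ref_id: str): ...

    @abstractmethod
    def download_failed(self, ref_id: str, reason: str): ...


class BytesCollector(Consumer):
    """Hypothetical consumer that appends each received chunk to a buffer."""

    def __init__(self):
        self.buffer = bytearray()
        self.done = False
        self.error: Optional[str] = None

    def consume(self, ref_id: str, state: dict, data: Any) -> dict:
        self.buffer.extend(data)
        # Assumed state layout: report how many bytes have arrived so far.
        return {"received": len(self.buffer)}

    def download_completed(self, ref_id: str):
        self.done = True

    def download_failed(self, ref_id: str, reason: str):
        # Keep the reason and drop the partial data.
        self.error = reason
        self.buffer.clear()


# Simulate the framework delivering two chunks and then signalling success.
collector = BytesCollector()
state = collector.consume("ref-1", {}, b"hello ")
state = collector.consume("ref-1", state, b"world")
collector.download_completed("ref-1")
```

A real consumer for large files would more likely write each chunk to a temp file than hold everything in memory; the in-memory buffer only keeps the sketch short.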

class DownloadStatus

Bases: object

FAILED = 'failed'
SUCCESS = 'success'
OBJ_DOWNLOADER_TOPIC = 'obj_downloader__download'

This package provides a framework for building object downloading capability (e.g. file download).

A large object takes a lot of memory space. Sending a large object in one message needs even more memory space, since the object must be serialized into a large number of bytes. Additional memory space may still be needed for the transport layer to send the message. If the message is to be sent to multiple endpoints, even more memory is needed.

Object Downloading can drastically reduce memory consumption:
  • Instead of being sent in one message, the large object is divided into many smaller objects.

  • Instead of the message being pushed to the endpoints, each endpoint requests the data itself. This makes it more reliable when different endpoints have different speeds.

Object Downloading works as follows:
  • The sender prepares the object(s) for downloading. It first creates a transaction to get a tx_id. It then adds each object to be downloaded to the transaction, and gets a reference id (ref_id) for each.

  • The sender sends the ref_id(s) to all recipients through a separate message.

  • Each recipient then calls the download_object function to download each referenced large object.

To develop the downloading capability for a type of object (e.g. a file, a large dict, etc.), you need to provide the implementation of a Producer and a Consumer.
  • On the sending side, the Producer is responsible for producing the next small object to be sent (a chunk of bytes; a small subset of the large dict; etc.).

  • On the receiving side, the Consumer is responsible for processing the received small objects (writing the received bytes to a temp file; putting the received small dict to the end result; etc.).
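
The pull-style exchange between the two sides can be sketched in-process, without any networking. This is only a model of the idea: the function names, the "offset" state key, and the chunk size are all hypothetical, and the real framework carries the state between sites in request and reply messages.

```python
from typing import Any, Tuple

CHUNK_SIZE = 4  # hypothetical chunk size, kept tiny for demonstration


def produce(obj: bytes, state: dict) -> Tuple[str, Any, dict]:
    """Sender side: return the next chunk, starting at the offset in the state."""
    offset = state.get("offset", 0)
    if offset >= len(obj):
        return "eof", None, state
    chunk = obj[offset:offset + CHUNK_SIZE]
    return "ok", chunk, {"offset": offset + len(chunk)}


def consume(received: bytearray, state: dict, data: bytes) -> dict:
    """Receiver side: store the chunk and return the state to send back."""
    received.extend(data)
    return state


def pull(obj: bytes) -> bytes:
    """Simulate one recipient pulling the object chunk by chunk."""
    received = bytearray()
    state: dict = {}
    while True:
        rc, data, state = produce(obj, state)
        if rc != "ok":
            break
        state = consume(received, state, data)
    return bytes(received)


result = pull(b"a large object")
```

Because the position travels in the state rather than living in the sender, each recipient in this sketch can progress at its own pace.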

One issue with object downloading is object life cycle management. Since the large objects to be downloaded are usually temporary, you need to remove them once they have been downloaded by all sites. The problem is that you don’t know how quickly each site can finish downloading these large objects. When a transaction contains multiple objects to be downloaded, it is even harder to know when all of them are done.

There are two ways to handle this issue: object downloaded callback, and transaction timeout.

You can register an object downloaded CB (obj_downloaded_cb) when adding an object to a transaction. When the object is fully downloaded to a site, this CB will be called. The object downloaded CB must follow this signature:

downloaded_cb(ref_id: str, to_site: str, status: str, obj: Any, **cb_kwargs)

where ref_id is the reference id of the object; to_site is the FQCN of the site that has just finished downloading; status is the status of downloading, as defined in DownloadStatus class; obj is the large object that was just downloaded; cb_kwargs are the kw args registered with the CB.
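
A minimal sketch of such a callback follows, assuming the large object is a temp file path and that a mutable set of site names was registered under the hypothetical keyword argument "pending_sites". The calls at the bottom simulate the framework invoking the callback; they are not NVFlare API calls.

```python
import os
import tempfile
from typing import Any

SUCCESS = "success"  # mirrors the documented DownloadStatus.SUCCESS value


def downloaded_cb(ref_id: str, to_site: str, status: str, obj: Any, **cb_kwargs):
    """Remove the temp file once every expected site has downloaded it.

    Assumes obj is a file path and cb_kwargs["pending_sites"] is a set of
    the sites that still have to finish (hypothetical kwarg name).
    """
    pending: set = cb_kwargs["pending_sites"]
    if status == SUCCESS:
        pending.discard(to_site)
    if not pending and os.path.exists(obj):
        os.remove(obj)


# Simulate two sites finishing their downloads one after the other.
fd, path = tempfile.mkstemp()
os.close(fd)
pending_sites = {"site-1", "site-2"}
downloaded_cb("ref-1", "site-1", SUCCESS, path, pending_sites=pending_sites)
exists_after_first = os.path.exists(path)
downloaded_cb("ref-1", "site-2", SUCCESS, path, pending_sites=pending_sites)
exists_after_second = os.path.exists(path)
```

In this sketch a site that fails stays in the pending set, so the file is never removed by the callback alone; that case is left to the transaction timeout.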

Transaction timeout is the amount of idle time allowed after the last downloading activity on any object in the transaction from any site. For example, suppose you want to send 2 large files to 3 sites. Each time a download request is received on any file from any of the 3 sites, the last activity time of the transaction is updated to now. If no downloading activity is received from any site on any object in the transaction for the specified timeout, the transaction is considered “timed out”, and the timeout callback registered with the transaction is called. The transaction timeout CB must follow this signature:

timeout_cb(tx_id: str, objs: List[Any], **cb_kwargs)

where tx_id is the ID of the transaction; objs is the list of large objects registered with the transaction; cb_kwargs are the kw args registered with the CB.
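
The timeout rule can be modeled with a small class and an explicit clock. The Transaction class below is a hypothetical model written for this example, not the NVFlare implementation; only the timeout_cb signature is taken from the description above.

```python
from typing import Any, Callable, List


class Transaction:
    """Hypothetical model of the idle-timeout rule."""

    def __init__(self, tx_id: str, objs: List[Any], timeout: float,
                 timeout_cb: Callable, now: float, **cb_kwargs):
        self.tx_id = tx_id
        self.objs = objs
        self.timeout = timeout
        self.timeout_cb = timeout_cb
        self.cb_kwargs = cb_kwargs
        self.last_active = now

    def on_request(self, now: float):
        # Any request on any object from any site refreshes the idle timer.
        self.last_active = now

    def check(self, now: float) -> bool:
        """Call the timeout callback if the transaction has been idle too long."""
        if now - self.last_active > self.timeout:
            self.timeout_cb(self.tx_id, self.objs, **self.cb_kwargs)
            return True
        return False


cleaned = []


def timeout_cb(tx_id: str, objs: List[Any], **cb_kwargs):
    # Documented signature; this sketch only records what would be cleaned up.
    cleaned.extend(objs)


tx = Transaction("tx-1", ["file_a", "file_b"], timeout=5.0,
                 timeout_cb=timeout_cb, now=0.0)
tx.on_request(now=4.0)     # activity at t=4 resets the idle timer
early = tx.check(now=8.0)  # idle for 4 seconds: not timed out
late = tx.check(now=9.5)   # idle for 5.5 seconds: timed out
```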

You may need to use both mechanisms to fully take care of object life cycles. The object downloaded CB may never be called if a site fails to finish downloading, in which case the timeout acts as the fallback. In practice, the timeout mechanism alone may be sufficient.

Unlike Object Streamer, where the object owner pushes small objects to the recipients, with Object Downloader each recipient pulls the data from the object owner.

class ObjDownloader

Bases: object

classmethod add_download_object(transaction_id: str, obj: Any, ref_id=None, obj_downloaded_cb=None, **cb_kwargs) -> str
classmethod delete_transaction(transaction_id: str, call_cb=False)
classmethod new_transaction(cell: Cell, producer: Producer, timeout: float, tx_id=None, timeout_cb=None, **cb_kwargs)
classmethod shutdown()

Shutdown and clean up resources.

Returns: None

class ProduceRC

Bases: object

Defines the return codes for the Producer’s produce method.

EOF = 'eof'
ERROR = 'error'
OK = 'ok'

class Producer

Bases: ABC

abstract produce(ref_id: str, obj: Any, state: dict, requester: str) -> (str, Any, dict)

Produce a small object to be sent (on object sender side).

Parameters:
  • ref_id – the ref id of the object being downloaded

  • obj – the large object

  • state – current state of downloading, received from the downloading site

  • requester – the FQCN of the site that is downloading

Returns: a tuple of (return code, a small object to be sent, new state to be sent).
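
As an illustration, here is a sketch of a Producer that sends a large dict a few keys at a time. ProduceRC and Producer below are local stand-ins that mirror the documented values and signature so the example runs without NVFlare; DictProducer, keys_per_request, and the "next_index" state key are hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Any, Tuple


class ProduceRC:
    # Values as documented for ProduceRC.
    OK = "ok"
    ERROR = "error"
    EOF = "eof"


class Producer(ABC):
    """Local stand-in mirroring the documented interface (not the NVFlare class)."""

    @abstractmethod
    def produce(self, ref_id: str, obj: Any, state: dict, requester: str) -> Tuple[str, Any, dict]: ...


class DictProducer(Producer):
    """Hypothetical producer that sends a large dict a few keys at a time."""

    def __init__(self, keys_per_request: int = 2):
        self.keys_per_request = keys_per_request

    def produce(self, ref_id, obj, state, requester):
        if not isinstance(obj, dict):
            return ProduceRC.ERROR, None, {}
        keys = sorted(obj)  # stable order, so an index identifies a position
        start = state.get("next_index", 0)  # assumed state key
        if start >= len(keys):
            return ProduceRC.EOF, None, state
        batch = keys[start:start + self.keys_per_request]
        small = {k: obj[k] for k in batch}
        return ProduceRC.OK, small, {"next_index": start + len(batch)}


# Simulate three successive requests from the same site.
producer = DictProducer(keys_per_request=2)
big = {"a": 1, "b": 2, "c": 3}
rc1, part1, st1 = producer.produce("ref-1", big, {}, "site-1")
rc2, part2, st2 = producer.produce("ref-1", big, st1, "site-1")
rc3, part3, st3 = producer.produce("ref-1", big, st2, "site-1")
```

The producer in this sketch keeps no per-requester position; everything it needs arrives in the state, so one instance could serve several downloading sites.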

download_object(from_fqcn: str, ref_id: str, per_request_timeout: float, cell: Cell, consumer: Consumer, secure=False, optional=False, abort_signal: Signal | None = None)

Download a large object from the object owner.

Parameters:
  • from_fqcn – the FQCN of the object owner

  • ref_id – reference id of the object to be downloaded

  • per_request_timeout – timeout for each request to the object owner

  • cell – the cell to be used for communication with the object owner

  • consumer – the Consumer object used for processing received data

  • secure – use P2P private communication with the data owner

  • optional – suppress log messages

  • abort_signal – for signaling abort

Returns: None