nvflare.fuel.f3.streaming.cacheable module

class CacheableObject(obj: Any, max_chunk_size: int)

Bases: Downloadable

This class provides caching for the chunks generated during streaming. When the object is sent to multiple receivers, each chunk is generated only once and cached for the other receivers. Once all receivers have received a chunk, it is removed from the cache.

Constructor of CacheableObject.

Parameters:
  • obj – the object to be downloaded.

  • max_chunk_size – max number of bytes for each chunk.

Notes: The object must be divisible into multiple items. One chunk is generated for each item.
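The caching behaviour described above can be pictured with a minimal, self-contained sketch. It does not use NVFlare internals: `ChunkCache`, `get_chunk`, `build_count`, and the per-chunk set of pending receivers are hypothetical names chosen for illustration, and the real class may track receivers differently.

```python
from typing import Callable, Dict, List, Set


class ChunkCache:
    """Toy cache: build each chunk once, drop it once every receiver has it."""

    def __init__(self, receivers: Set[str], make_chunk: Callable[[int], bytes]):
        self._receivers = set(receivers)
        self._make_chunk = make_chunk
        self._chunks: Dict[int, bytes] = {}
        self._pending: Dict[int, Set[str]] = {}
        self.build_count = 0  # how many times a chunk was actually generated

    def get_chunk(self, index: int, receiver: str) -> bytes:
        if index not in self._chunks:
            # First request for this item: generate the chunk and cache it.
            self._chunks[index] = self._make_chunk(index)
            self._pending[index] = set(self._receivers)
            self.build_count += 1
        chunk = self._chunks[index]
        self._pending[index].discard(receiver)
        if not self._pending[index]:
            # Every receiver has the chunk, so it no longer needs caching.
            del self._chunks[index]
            del self._pending[index]
        return chunk

    def cached_indices(self) -> List[int]:
        return sorted(self._chunks)


cache = ChunkCache({"site-1", "site-2"}, lambda i: f"item-{i}".encode())
first = cache.get_chunk(0, "site-1")   # generated and cached
second = cache.get_chunk(0, "site-2")  # served from cache, then evicted
```

In this sketch the second receiver never triggers chunk generation, and the cache is empty again once both receivers have been served.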

clear_cache()

Clear the chunk cache only.

Does NOT touch base_obj. The source object is released separately via release() after transaction_done_cb has been invoked, so the callback can still observe the original data if needed.

downloaded_to_all()

Called when the object is fully downloaded to all receivers.

abstract get_item_count() → int

The subclass must implement this method to return the number of items the object contains.

Returns: the number of items the object contains

produce(state: dict, requester: str) → Tuple[str, Any, dict]

Produce a small object to be sent (on object sender side).

Parameters:
  • state – current state of downloading, received from the downloading receiver

  • requester – the FQCN of the receiver that is downloading

Returns: a tuple of (return code, a small object to be sent, new state to be sent).
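The state-driven exchange implied by this signature can be sketched as a toy loop. Everything specific here is an assumption made for illustration: the return codes `RC_OK` and `RC_EOF`, the `next_index` state key, and the free function `produce` are hypothetical and are not the values or layout NVFlare uses. In the real flow the new state travels to the receiver and comes back with its next request; the sketch simply feeds it straight back in.

```python
from typing import Any, List, Tuple

# Hypothetical return codes; the actual values are defined by NVFlare.
RC_OK, RC_EOF = "ok", "eof"


def produce(items: List[bytes], state: dict, requester: str) -> Tuple[str, Any, dict]:
    """Return (return code, one small object, new state) for the given state."""
    start = state.get("next_index", 0)  # hypothetical state key
    if start >= len(items):
        return RC_EOF, None, {}
    return RC_OK, items[start], {"next_index": start + 1}


def download(items: List[bytes], requester: str) -> List[bytes]:
    """Drive produce() until it reports that nothing is left to send."""
    state: dict = {}
    received: List[bytes] = []
    while True:
        rc, data, state = produce(items, state, requester)
        if rc == RC_EOF:
            return received
        received.append(data)


received = download([b"a", b"b", b"c"], "site-1")
```

Keeping the progress marker in the state, rather than in the sender, is what lets several receivers download the same object at different paces.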

abstract produce_item(index: int) → bytes

This method is called to produce the chunk for the specified item.

Parameters:
  • index – index of the item.

Returns: a chunk for the item
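The two abstract methods, get_item_count() and produce_item(), form the contract a subclass has to fulfil. The sketch below illustrates that contract with a stand-in base class so it runs without NVFlare installed: `ItemSource` and `LineSource` are hypothetical names, and a real subclass would derive from CacheableObject and pass obj and max_chunk_size to its constructor.

```python
from abc import ABC, abstractmethod
from typing import List


class ItemSource(ABC):
    """Stand-in that mirrors only the two abstract methods (not the NVFlare class)."""

    @abstractmethod
    def get_item_count(self) -> int:
        """Return the number of items the object contains."""

    @abstractmethod
    def produce_item(self, index: int) -> bytes:
        """Return the chunk for the item at the given index."""


class LineSource(ItemSource):
    """Treats each text line as one item and encodes it as one chunk."""

    def __init__(self, lines: List[str]):
        self.lines = lines

    def get_item_count(self) -> int:
        return len(self.lines)

    def produce_item(self, index: int) -> bytes:
        return self.lines[index].encode("utf-8")


source = LineSource(["alpha", "beta", "gamma"])
chunks = [source.produce_item(i) for i in range(source.get_item_count())]
```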

release()

Drop the reference to the source object.

Called by _Transaction.transaction_done() AFTER the transaction_done_cb fires. Setting base_obj to None drops the last infrastructure reference to the source data (e.g. a 5 GiB numpy dict), allowing it to be reclaimed by the GC immediately rather than waiting for a future cycle.

Overrides Downloadable.release() (which is a no-op by default).
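The difference between clear_cache() and release() can be shown with a small stand-in. `Holder` and `Payload` are hypothetical classes, not NVFlare code; only the attribute name base_obj is taken from the description above. The immediate reclamation observed here relies on CPython's reference counting, and other interpreters may reclaim the object later.

```python
import weakref


class Payload:
    """Stand-in for large source data; a plain class so it can be weakly referenced."""

    def __init__(self, size: int):
        self.data = bytearray(size)


class Holder:
    """Keeps a source object plus a chunk cache, mirroring the two methods above."""

    def __init__(self, obj):
        self.base_obj = obj
        self.cache = {}

    def clear_cache(self):
        # Drops cached chunks only; the source object stays reachable.
        self.cache.clear()

    def release(self):
        # Drops the reference to the source object.
        self.base_obj = None


payload = Payload(1024)
probe = weakref.ref(payload)  # lets us observe when the payload is reclaimed
holder = Holder(payload)
holder.cache[0] = b"chunk"
del payload  # the holder now owns the only strong reference

holder.clear_cache()
alive_after_clear = probe() is not None

holder.release()
alive_after_release = probe() is not None
```

Splitting the two steps means a completion callback that runs between them can still read the source data.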

set_transaction(tx_id, ref_id)

This method is called when the object is added to a transaction. You can use it to keep the transaction ID and/or ref ID for your own purposes.

Parameters:
  • tx_id – the ID of the transaction that the object has been added to.

  • ref_id – ref ID generated for the object.

Returns: None

transaction_done(transaction_id: str, status: str)

Called when the transaction is finished.

Parameters:
  • transaction_id – ID of the transaction.

  • status – completion status, a value defined in TransactionDoneStatus.

Returns: None

class ItemConsumer

Bases: Consumer

consume(ref_id: str, state: dict, data: Any) → dict

Called to process the received data.

Parameters:
  • ref_id – ref id of the object being downloaded

  • state – current state of downloading

  • data – data to be processed

Returns: new state to be sent back to the data owner.

abstract consume_items(items: List[Any], result: Any) → Any

Process the items and return the updated result.
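The consume_items() signature suggests an accumulator pattern: each call receives a batch of items together with the result so far and returns the new result. The sketch below illustrates that pattern only. `ConcatConsumer` is a hypothetical stand-in rather than an ItemConsumer subclass, and starting with a result of None is an assumption, since the initial value is not stated here.

```python
from typing import Any, List


class ConcatConsumer:
    """Stand-in showing only the consume_items contract (not an ItemConsumer subclass)."""

    def consume_items(self, items: List[Any], result: Any) -> Any:
        # Assumption: the first call receives None as the running result.
        if result is None:
            result = bytearray()
        for item in items:
            result.extend(item)
        return result


consumer = ConcatConsumer()
result = None
for batch in ([b"ab", b"cd"], [b"ef"]):
    # Each batch is folded into the result returned by the previous call.
    result = consumer.consume_items(batch, result)
```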

download_completed(ref_id: str)

Called when the download completes successfully.

Parameters:
  • ref_id – ref id of the object being downloaded

Returns: None

download_failed(ref_id, reason: str)

Called when the download fails.

Parameters:
  • ref_id – ref id of the object being downloaded

  • reason – the reason for the failure

Returns: None