nvflare.fuel.f3.streaming.cacheable module
- class CacheableObject(obj: Any, max_chunk_size: int)
Bases: Downloadable
This class provides cache capability for managing chunks generated during streaming. When the object is to be sent to multiple receivers, each chunk is generated only once and cached for the other receivers. Once all receivers have received the chunk, it is removed from the cache.
Constructor of CacheableObject.
- Parameters:
obj – the object to be downloaded.
max_chunk_size – max number of bytes for each chunk.
Notes: The object must be able to be divided into multiple items. A chunk is generated for each item.
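The caching behavior described above can be illustrated with a small standalone sketch. It does not use the real CacheableObject; `ChunkCache`, its `get` method, and the fixed receiver set are hypothetical names chosen only to show "generate once, evict after every receiver has fetched". How the real class tracks its receivers is not stated here and is an assumption of the sketch.

```python
from typing import Callable, Dict, List, Set


class ChunkCache:
    """Hypothetical stand-in: keeps each chunk until every receiver has fetched it."""

    def __init__(self, make_chunk: Callable[[int], bytes], receivers: Set[str]):
        self._make_chunk = make_chunk
        self._receivers = set(receivers)  # assumed to be known up front
        self._chunks: Dict[int, bytes] = {}
        self._pending: Dict[int, Set[str]] = {}
        self.generated = 0  # how many times a chunk was actually produced

    def get(self, index: int, requester: str) -> bytes:
        if index not in self._chunks:
            # First request for this item: generate once, remember who still needs it.
            self._chunks[index] = self._make_chunk(index)
            self._pending[index] = set(self._receivers)
            self.generated += 1
        chunk = self._chunks[index]
        self._pending[index].discard(requester)
        if not self._pending[index]:
            # Every receiver now has this chunk, so drop it from the cache.
            del self._chunks[index]
            del self._pending[index]
        return chunk

    def cached_indices(self) -> List[int]:
        return sorted(self._chunks)


items = [b"alpha", b"beta"]
cache = ChunkCache(lambda i: items[i], {"site-1", "site-2"})
first = cache.get(0, "site-1")
still_cached = cache.cached_indices()  # chunk 0 is held for site-2
second = cache.get(0, "site-2")
after_all = cache.cached_indices()  # chunk 0 has been evicted
```

Both receivers get the same bytes while the chunk is produced a single time, which is the point of caching when producing a chunk is expensive.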
- clear_cache()
Clear the chunk cache only.
Does NOT touch base_obj. The source object is released separately via release() after transaction_done_cb has been invoked, so the callback can still observe the original data if needed.
- abstract get_item_count() -> int
The subclass must implement this method to return the number of items the object contains.
Returns: the number of items the object contains
- produce(state: dict, requester: str) -> Tuple[str, Any, dict]
Produce a small object to be sent (on object sender side).
- Parameters:
state – current state of downloading, received from the downloading receiver
requester – the FQCN of the receiver that is downloading
Returns: a tuple of (return code, a small object to be sent, new state to be sent).
- abstract produce_item(index: int) -> bytes
This method is called to produce the chunk for the specified item.
- Parameters:
index – index of the item.
Returns: a chunk for the item
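As a rough illustration of the two abstract methods, the sketch below subclasses a stand-in base class rather than the real CacheableObject. `CacheableSketch` and `LinesObject` are hypothetical; the stand-in mirrors only the constructor arguments and the `get_item_count` / `produce_item` signatures documented here, and storing the chunk size in a `max_chunk_size` attribute is an assumption.

```python
from abc import ABC, abstractmethod
from typing import Any


class CacheableSketch(ABC):
    """Stand-in for illustration only; not the real nvflare class."""

    def __init__(self, obj: Any, max_chunk_size: int):
        self.base_obj = obj
        self.max_chunk_size = max_chunk_size  # attribute name is an assumption

    @abstractmethod
    def get_item_count(self) -> int:
        """Return the number of items the object contains."""

    @abstractmethod
    def produce_item(self, index: int) -> bytes:
        """Return the chunk for the item at the given index."""


class LinesObject(CacheableSketch):
    """Treats a list of strings as the items; each string becomes one chunk."""

    def get_item_count(self) -> int:
        return len(self.base_obj)

    def produce_item(self, index: int) -> bytes:
        return self.base_obj[index].encode("utf-8")


obj = LinesObject(["first line", "second line"], max_chunk_size=1024)
chunks = [obj.produce_item(i) for i in range(obj.get_item_count())]
```

A real subclass would inherit from CacheableObject and would presumably also need to keep each chunk within max_chunk_size; this sketch does not enforce that.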
- release()
Drop the reference to the source object.
Called by _Transaction.transaction_done() AFTER the transaction_done_cb fires. Setting base_obj to None drops the last infrastructure reference to the source data (e.g. a 5 GiB numpy dict), allowing it to be reclaimed by the GC immediately rather than waiting for a future cycle.
Overrides Downloadable.release() (which is a no-op by default).
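The ordering of clear_cache() and release() can be sketched with a standalone stand-in. `Holder` and `Payload` are hypothetical names; only the `base_obj` attribute comes from the description above. The weak reference is used only to observe when the source object goes away, and the immediate reclamation shown relies on CPython reference counting.

```python
import weakref


class Payload(dict):
    """Plain dicts cannot be weakly referenced; this subclass can."""


class Holder:
    """Hypothetical stand-in that keeps a source object and a chunk cache."""

    def __init__(self, obj):
        self.base_obj = obj
        self.cache = {0: b"chunk"}

    def clear_cache(self):
        self.cache.clear()  # base_obj is deliberately left alone

    def release(self):
        self.base_obj = None  # drop the reference to the source object


seen = []
holder = Holder(Payload(weights=[1, 2, 3]))
probe = weakref.ref(holder.base_obj)

holder.clear_cache()
# A "transaction done" callback running at this point can still read the data.
seen.append(holder.base_obj["weights"])
holder.release()
alive_after_release = probe() is not None
```

After release() nothing in the stand-in refers to the payload any more, so under CPython the weak reference is dead as soon as the call returns.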
- set_transaction(tx_id, ref_id)
This method is called when the object is added to a transaction. You can use this method to keep the transaction ID and/or ref ID for your own purposes.
- Parameters:
tx_id – the ID of the transaction that the object has been added to.
ref_id – ref ID generated for the object.
Returns: None
- class ItemConsumer
Bases: Consumer
- consume(ref_id: str, state: dict, data: Any) -> dict
Called to process the received data.
- Parameters:
ref_id – ref id of the object being downloaded
state – current state of downloading
data – data to be processed
Returns: new state to be sent back to the data owner.
- abstract consume_items(items: List[Any], result: Any) -> Any
Process items and return updated result.
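A minimal sketch of a consume_items implementation follows. It uses a stand-in abstract class instead of the real ItemConsumer; `ItemConsumerSketch` and `ByteCounter` are hypothetical names. The driver loop, and the assumption that `result` is None on the first call, are guesses about how the method might be invoked rather than documented behavior.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class ItemConsumerSketch(ABC):
    """Stand-in mirroring only the consume_items signature documented above."""

    @abstractmethod
    def consume_items(self, items: List[Any], result: Any) -> Any:
        """Process items and return the updated result."""


class ByteCounter(ItemConsumerSketch):
    """Accumulates the total number of bytes seen across all batches."""

    def consume_items(self, items: List[Any], result: Any) -> Any:
        total = 0 if result is None else result  # None on first call is an assumption
        return total + sum(len(item) for item in items)


consumer = ByteCounter()
result = None
for batch in ([b"ab", b"cde"], [b"f"]):
    # Each call receives the previous result and returns the updated one.
    result = consumer.consume_items(batch, result)
```

Passing the running result back in on every call lets the implementation stay stateless, which is one plausible reading of the "return updated result" contract.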