nvflare.fuel.utils.fobs.decomposers.via_downloader module
- class LazyDownloadRef(fqcn: str, ref_id: str, item_id: str, dot: int = 0)
Bases: object
Placeholder created in PASS_THROUGH mode instead of downloading a tensor.
When a cell is configured as a pure forwarder (FOBSContextKey.PASS_THROUGH is set in its FOBS context), incoming download references from the source are not resolved. Instead, a LazyDownloadRef is created for each tensor item in the received batch so that the original source FQCN and batch ref_id are preserved.
When the forwarding node (CJ) later serialises the task for its subprocess, LazyDownloadRefDecomposer.decompose() detects LazyDownloadRef targets and re-emits the original download datum (pointing back to the server) instead of creating a new datum that would point to the CJ. The subprocess agent then resolves the references directly from the originating source, downloading each tensor individually without any copy passing through the CJ.
- fqcn
FQCN of the originating cell that owns the download transaction.
- ref_id
UUID of the batch download transaction on that cell.
- item_id
Intra-batch item placeholder (e.g. "T0", "T1").
- dot
Datum Object Type of the original download datum. Identifies which ViaDownloaderDecomposer subclass owns this ref (e.g. NUMPY_DOWNLOAD or TENSOR_DOWNLOAD). Required by LazyDownloadRefDecomposer to route serialisation and deserialisation to the correct handler.
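The PASS_THROUGH decision described above can be sketched in plain Python. This is a minimal illustration, not the NVFlare implementation: LazyRefSketch, resolve_batch, and the download_fn callback are hypothetical names, and a boolean pass_through argument stands in for FOBSContextKey.PASS_THROUGH being present in the FOBS context.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass(frozen=True)
class LazyRefSketch:
    """Hypothetical stand-in for LazyDownloadRef (same four fields)."""

    fqcn: str  # originating cell that owns the download transaction
    ref_id: str  # UUID of the batch download transaction on that cell
    item_id: str  # intra-batch placeholder, e.g. "T0"
    dot: int = 0  # DOT of the original download datum


def resolve_batch(
    fqcn: str,
    ref_id: str,
    item_ids: List[str],
    dot: int,
    pass_through: bool,
    download_fn: Callable[[str, str, str], Any],
) -> Dict[str, Any]:
    """Map each item_id of one received batch to a value.

    pass_through stands in for FOBSContextKey.PASS_THROUGH being set;
    download_fn is a hypothetical callback that fetches one item.
    """
    if pass_through:
        # Forwarder (e.g. the CJ): record where each item lives, fetch nothing.
        return {i: LazyRefSketch(fqcn, ref_id, i, dot) for i in item_ids}
    # Endpoint: download every item from the originating cell.
    return {i: download_fn(fqcn, ref_id, i) for i in item_ids}
```

The forwarder keeps only four small fields per item, so a large tensor never has to be materialised on the CJ.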
- class LazyDownloadRefDecomposer
Bases: Decomposer
Decomposer that serialises and deserialises LazyDownloadRef objects.
LazyDownloadRef objects are created at a forwarding hop (e.g. the CJ process) when FOBSContextKey.PASS_THROUGH is set. Instead of downloading tensors from the FL server, each tensor is represented as a lightweight placeholder that carries the original server FQCN, batch ref_id, item_id, and the Datum Object Type (dot) of the originating ViaDownloaderDecomposer subclass.
When the forwarding node re-serialises the task for the subprocess agent, FOBS routes each LazyDownloadRef to this decomposer.
- decompose()
Delegates to the ViaDownloaderDecomposer subclass identified by lazy.dot. That handler’s decompose() re-emits the original server batch datum (fqcn / ref_id) via a post-callback so the subprocess knows exactly where to download from. lazy_dot is appended to the returned encoding dict so recompose() can route back to the same handler.
- recompose()
Uses lazy_dot to look up the original handler and delegates to handler.recompose(). At the subprocess, process_datum() has already populated fobs_ctx[handler.items_key] with the downloaded tensors, so the call returns the real tensor value directly.
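A minimal sketch of the lazy_dot routing, assuming each handler can be looked up by its DOT. HandlerSketch, LazyRouterSketch, Ref, and the fqcn/ref_id/item_id keys of the encoding dict are hypothetical; only the lazy_dot key name comes from the description above. The real handler re-emits the datum through a post-callback and a DatumManager, which this sketch omits.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable


@dataclass(frozen=True)
class Ref:
    """Minimal hypothetical placeholder with the fields the router needs."""

    fqcn: str
    ref_id: str
    item_id: str
    dot: int


class HandlerSketch:
    """Hypothetical stand-in for one ViaDownloaderDecomposer subclass."""

    def __init__(self, dot: int):
        self.dot = dot
        # Plays the role of fobs_ctx[handler.items_key]: filled by the download step.
        self.items: Dict[str, Any] = {}

    def decompose(self, ref: Ref) -> dict:
        # Re-emit the original source coordinates rather than a new local datum.
        return {"fqcn": ref.fqcn, "ref_id": ref.ref_id, "item_id": ref.item_id}

    def recompose(self, data: dict) -> Any:
        # Assumes the downloaded values were stored before recompose runs.
        return self.items[data["item_id"]]


class LazyRouterSketch:
    """Routes placeholders to the handler registered for their DOT."""

    def __init__(self, handlers: Iterable[HandlerSketch]):
        self.by_dot = {h.dot: h for h in handlers}

    def decompose(self, ref: Ref) -> dict:
        enc = self.by_dot[ref.dot].decompose(ref)
        enc["lazy_dot"] = ref.dot  # remembered so recompose() finds the same handler
        return enc

    def recompose(self, data: dict) -> Any:
        return self.by_dot[data["lazy_dot"]].recompose(data)
```

The router itself holds no per-item state; everything it needs to route the reverse direction travels inside the encoding dict.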
- decompose(lazy: LazyDownloadRef, manager: DatumManager | None = None) → dict
Decompose the target into types supported by msgpack or classes with decomposers registered.
Msgpack supports primitives, bytes, memoryview, lists, dicts.
- Parameters:
lazy – The LazyDownloadRef to be serialized
manager – Datum manager to store externalized datum
- Returns:
The decomposed serializable objects
- recompose(data: dict, manager: DatumManager | None = None) → Any
Reconstruct the object from decomposed components.
- Parameters:
data – The decomposed component
manager – Datum manager to internalize datum
- Returns:
The reconstructed object
- class ViaDownloaderDecomposer(max_chunk_size: int, config_var_prefix)
Bases: Decomposer, ABC
- decompose(target: Any, manager: DatumManager | None = None) → Any
Decompose the target into types supported by msgpack or classes with decomposers registered.
Msgpack supports primitives, bytes, memoryview, lists, dicts.
- Parameters:
target – The instance to be serialized
manager – Datum manager to store externalized datum
- Returns:
The decomposed serializable objects
- abstract download(from_fqcn: str, ref_id: str, per_request_timeout: float, cell: Cell, secure=False, optional=False, abort_signal=None) → tuple[str, dict]
- abstract get_download_dot() → int
Get the Datum Object Type to be used for the download ref datum.
Returns: the DOT for the download ref datum
- abstract native_decompose(target: Any, manager: DatumManager | None = None) → bytes
- abstract native_recompose(data: bytes, manager: DatumManager | None = None) → Any
- process_datum(datum: Datum, manager: DatumManager)
This is called by the manager to process a datum that has a DOT. This happens before the recompose processing.
The datum contains information about where the data is. For the bytes DOT, the data is included in the datum directly. For the file DOT, the data is in a file, and the location of the file is further specified:
If the location is local, then the file is on the local file system;
If the location is remote_cell, then the file is on a remote cell and needs to be downloaded.
- Parameters:
datum – datum to be processed.
manager – the datum manager.
Returns: None
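The three cases handled by process_datum() can be sketched as a plain dispatch function. Everything here is hypothetical: the string DOT labels, DatumSketch, and the read_file/download callbacks. Unlike the real method, which returns None and works through the DatumManager, this sketch returns the bytes so the branching is easy to see.

```python
from dataclasses import dataclass
from typing import Callable, Optional

# Hypothetical DOT labels; the real ones are constants defined elsewhere in FOBS.
BYTES_DOT = "bytes"
FILE_DOT = "file"


@dataclass
class DatumSketch:
    dot: str
    value: Optional[bytes] = None  # set for BYTES_DOT
    path: Optional[str] = None  # set for FILE_DOT
    location: Optional[str] = None  # "local" or "remote_cell" for FILE_DOT
    fqcn: Optional[str] = None  # owning cell when location == "remote_cell"


def load_datum(
    datum: DatumSketch,
    read_file: Callable[[str], bytes],
    download: Callable[[str, str], bytes],
) -> bytes:
    """Return the datum's bytes, following the three cases described above."""
    if datum.dot == BYTES_DOT:
        return datum.value  # data travels inside the datum
    if datum.dot == FILE_DOT:
        if datum.location == "local":
            return read_file(datum.path)  # file on the local file system
        if datum.location == "remote_cell":
            return download(datum.fqcn, datum.path)  # fetch from the remote cell
        raise ValueError(f"unknown file location: {datum.location!r}")
    raise ValueError(f"unsupported DOT: {datum.dot!r}")
```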
- recompose(data: Any, manager: DatumManager | None = None) → Any
Reconstruct the object from decomposed components.
- Parameters:
data – The decomposed component
manager – Datum manager to internalize datum
- Returns:
The reconstructed object
- supported_dots()
Return the Datum Object Types supported by this decomposer. If a DOT is returned, this decomposer’s process_datum method will be called for any datum whose DOT matches this DOT.
Returns: None or list of DOTs
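A sketch of how a datum manager might use supported_dots() to decide which decomposer's process_datum() to call. DecomposerSketch, build_dot_table, and dispatch are hypothetical, and letting a later registration override an earlier one for the same DOT is an assumption of this sketch, not documented behaviour.

```python
from typing import Any, Dict, List, Optional


class DecomposerSketch:
    """Hypothetical decomposer that only records the datums it is asked to process."""

    def __init__(self, dots: Optional[List[int]]):
        self._dots = dots
        self.processed: List[Any] = []

    def supported_dots(self) -> Optional[List[int]]:
        return self._dots  # None means no datum is ever routed here

    def process_datum(self, datum: Any) -> None:
        self.processed.append(datum)


def build_dot_table(decomposers: List[DecomposerSketch]) -> Dict[int, DecomposerSketch]:
    """Index decomposers by every DOT they claim; a later claim overrides an earlier one."""
    table: Dict[int, DecomposerSketch] = {}
    for dec in decomposers:
        for dot in dec.supported_dots() or []:
            table[dot] = dec
    return table


def dispatch(table: Dict[int, DecomposerSketch], dot: int, datum: Any) -> bool:
    """Call process_datum on the matching decomposer; return whether one was found."""
    handler = table.get(dot)
    if handler is None:
        return False
    handler.process_datum(datum)
    return True
```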
- abstract to_downloadable(items: dict, max_chunk_size: int, fobs_ctx: dict) → Downloadable
Convert the items into a Downloadable object.
- Parameters:
items – a dict of items of target object type to be converted
max_chunk_size – max size of one chunk.
fobs_ctx – FOBS Context
Returns: a Downloadable object
items is a dict of target objects: it contains all objects of the target type in one payload.
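The items contract of to_downloadable() (all objects of the target type in one payload, gathered into one dict), together with splitting data by max_chunk_size, can be sketched as below. collect_items and iter_chunks are hypothetical helpers; the "T0", "T1" key scheme follows the item_id example given for LazyDownloadRef, and how the real Downloadable splits its data is not specified here.

```python
from typing import Any, Dict, Iterator, List


def collect_items(values: List[Any], target_type: type) -> Dict[str, Any]:
    """Gather every object of target_type in one payload into a single dict,
    keyed by intra-batch placeholders "T0", "T1", ... in encounter order."""
    items: Dict[str, Any] = {}
    for v in values:
        if isinstance(v, target_type):
            items[f"T{len(items)}"] = v
    return items


def iter_chunks(data: bytes, max_chunk_size: int) -> Iterator[bytes]:
    """Yield consecutive slices of data, none longer than max_chunk_size."""
    if max_chunk_size <= 0:
        raise ValueError("max_chunk_size must be positive")
    for start in range(0, len(data), max_chunk_size):
        yield data[start:start + max_chunk_size]
```

Collecting every item up front is what lets one download transaction (one ref_id) cover the whole batch.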
- clear_download_initiated() → None
Reset the thread-local flag before a send_to_peer() call.
Prevents a stale True from a previous training round (which did have tensors) from carrying over to the current validate round (which has no tensors).
- was_download_initiated() → bool
Return True if _finalize_download_tx() created a download transaction in the current thread’s most recent encode_payload() call.
Called by FlareAgent._do_submit_result() immediately after send_to_peer() returns to decide whether to wait for the server to finish downloading tensors. Returns False for validate results (metrics only, no tensors).
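The thread-local flag pattern behind clear_download_initiated() and was_download_initiated() can be sketched with threading.local. clear_flag, mark_download_initiated, was_flag_set, and submit are hypothetical names; in the documented code the flag is set by _finalize_download_tx() during encode_payload().

```python
import threading

# Hypothetical module-level thread-local state mirroring the documented flag.
_state = threading.local()


def clear_flag() -> None:
    """Counterpart of clear_download_initiated(): reset before each send."""
    _state.download_initiated = False


def mark_download_initiated() -> None:
    """What an encode step would do after creating a download transaction."""
    _state.download_initiated = True


def was_flag_set() -> bool:
    """Counterpart of was_download_initiated(); threads that never set it read False."""
    return getattr(_state, "download_initiated", False)


def submit(result_has_tensors: bool) -> bool:
    """Hypothetical send path: clear, encode, then ask whether to wait."""
    clear_flag()
    if result_has_tensors:  # stands in for encode_payload() creating a transaction
        mark_download_initiated()
    return was_flag_set()
```

Calling submit(True) and then submit(False) returns False the second time only because clear_flag() runs first; without that reset, the True left over from the earlier call would make the caller wait for a download that never happens. Because the state is thread-local, a flag set in one thread is invisible to others.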