nvflare.fuel.utils.fobs.decomposers.via_downloader module

class EncKey

Bases: object

DATA = 'data'
TYPE = 'type'
class EncType

Bases: object

NATIVE = 'native'
REF = 'ref'
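The module does not spell out how these constants fit together. The following is a minimal sketch of one plausible reading, assuming EncKey names the keys of an encoding dict and EncType tags whether the payload is carried inline or as a download reference. The helper `encode_item` and its `inline_limit` threshold are hypothetical and not part of NVFlare:

```python
# Stand-ins that mirror the documented constants.
class EncKey:
    DATA = "data"
    TYPE = "type"


class EncType:
    NATIVE = "native"
    REF = "ref"


def encode_item(payload: bytes, item_id: str, inline_limit: int) -> dict:
    """Hypothetical helper: keep small payloads inline, reference large ones."""
    if len(payload) <= inline_limit:
        # Small payload: ship the bytes themselves.
        return {EncKey.TYPE: EncType.NATIVE, EncKey.DATA: payload}
    # Large payload: ship only the item placeholder; the bytes are downloaded later.
    return {EncKey.TYPE: EncType.REF, EncKey.DATA: item_id}


small = encode_item(b"abc", "T0", inline_limit=8)
large = encode_item(b"x" * 32, "T1", inline_limit=8)
print(small)
print(large)
```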
class LazyDownloadRef(fqcn: str, ref_id: str, item_id: str, dot: int = 0)

Bases: object

Placeholder created in PASS_THROUGH mode instead of downloading a tensor.

When a cell is configured as a pure forwarder (FOBSContextKey.PASS_THROUGH is set in its FOBS context), incoming download references from the source are not resolved. Instead, a LazyDownloadRef is created for each tensor item in the received batch, so that the original source FQCN and batch ref_id are preserved.

When the forwarding node (CJ) later serialises the task for its subprocess, LazyDownloadRefDecomposer.decompose() detects LazyDownloadRef targets and re-emits the original download datum (pointing back to the server) instead of creating a new datum that would point to the CJ. The subprocess agent then resolves the references directly from the originating source, downloading each tensor individually without any copy passing through the CJ.

fqcn

FQCN of the originating cell that owns the download transaction.

ref_id

UUID of the batch download transaction on that cell.

item_id

Intra-batch item placeholder (e.g. "T0", "T1").

dot

Datum Object Type of the original download datum. Identifies which ViaDownloaderDecomposer subclass owns this ref (e.g. NUMPY_DOWNLOAD or TENSOR_DOWNLOAD). Required by LazyDownloadRefDecomposer to route serialisation and deserialisation to the correct handler.

dot
fqcn
item_id
ref_id
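As a rough illustration of the placeholder described above, the sketch below builds one lightweight record per tensor item in a batch. `LazyRefSketch` and `make_placeholders` are hypothetical stand-ins, not the NVFlare classes, and the DOT value 2 is an arbitrary example rather than a real constant:

```python
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class LazyRefSketch:
    """Hypothetical stand-in with the same four fields as LazyDownloadRef."""
    fqcn: str      # originating cell that owns the download transaction
    ref_id: str    # UUID of the batch download transaction
    item_id: str   # intra-batch placeholder such as "T0"
    dot: int = 0   # Datum Object Type of the original download datum


def make_placeholders(fqcn: str, ref_id: str, item_ids: list, dot: int) -> dict:
    # One placeholder per item; all share the same source FQCN and batch ref_id.
    return {i: LazyRefSketch(fqcn, ref_id, i, dot) for i in item_ids}


batch_ref = str(uuid.uuid4())
refs = make_placeholders("server", batch_ref, ["T0", "T1"], dot=2)
print(refs["T1"])
```

No tensor bytes are held by the placeholders; they only record where the data can later be fetched from.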
class LazyDownloadRefDecomposer

Bases: Decomposer

Decomposer that serialises and deserialises LazyDownloadRef objects.

LazyDownloadRef objects are created at a forwarding hop (e.g. the CJ process) when FOBSContextKey.PASS_THROUGH is set. Instead of downloading tensors from the FL server, each tensor is represented as a lightweight placeholder that carries the original server FQCN, batch ref_id, item_id, and the Datum Object Type (dot) of the originating ViaDownloaderDecomposer subclass.

When the forwarding node re-serialises the task for the subprocess agent, FOBS routes each LazyDownloadRef to this decomposer.

decompose()

Delegates to the ViaDownloaderDecomposer subclass identified by lazy.dot. That handler’s decompose() re-emits the original server batch datum (fqcn / ref_id) via a post-callback so the subprocess knows exactly where to download from. lazy_dot is appended to the returned encoding dict so recompose() can route back to the same handler.

recompose()

Uses lazy_dot to look up the original handler and delegates to handler.recompose(). At the subprocess, process_datum() has already populated fobs_ctx[handler.items_key] with the downloaded tensors, so the call returns the real tensor value directly.
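The delegation described for decompose() and recompose() can be sketched as a lookup table keyed by DOT. Everything here is a simplified assumption: `HandlerSketch`, the `HANDLERS` table, the DOT values 1 and 2, the dict-based lazy reference, and the `emitted` list (standing in for the post-callback) are all hypothetical, and only the `lazy_dot` key is taken from the description above:

```python
class HandlerSketch:
    """Hypothetical stand-in for a ViaDownloaderDecomposer subclass."""

    def __init__(self, dot: int, items_key: str):
        self.dot = dot
        self.items_key = items_key

    def decompose(self, lazy: dict, emitted: list) -> dict:
        # Re-emit the ORIGINAL datum coordinates instead of creating a new datum.
        emitted.append({"fqcn": lazy["fqcn"], "ref_id": lazy["ref_id"], "dot": self.dot})
        return {"type": "ref", "data": lazy["item_id"]}

    def recompose(self, data: dict, fobs_ctx: dict):
        # The items were already downloaded into the context before this call.
        return fobs_ctx[self.items_key][data["data"]]


# DOT -> handler; the DOT values are arbitrary examples.
HANDLERS = {1: HandlerSketch(1, "numpy_items"), 2: HandlerSketch(2, "tensor_items")}


def lazy_decompose(lazy: dict, emitted: list) -> dict:
    enc = HANDLERS[lazy["dot"]].decompose(lazy, emitted)
    enc["lazy_dot"] = lazy["dot"]  # lets recompose route back to the same handler
    return enc


def lazy_recompose(data: dict, fobs_ctx: dict):
    return HANDLERS[data["lazy_dot"]].recompose(data, fobs_ctx)


lazy = {"fqcn": "server", "ref_id": "ref-123", "item_id": "T0", "dot": 2}
emitted = []
enc = lazy_decompose(lazy, emitted)
ctx = {"tensor_items": {"T0": [1.0, 2.0]}}  # as if already downloaded
value = lazy_recompose(enc, ctx)
print(enc, emitted, value)
```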

decompose(lazy: LazyDownloadRef, manager: DatumManager | None = None) -> dict

Decompose the target into types supported by msgpack or classes with decomposers registered.

Msgpack supports primitives, bytes, memoryview, lists, dicts.

Parameters:
  • lazy – The LazyDownloadRef instance to be serialized

  • manager – Datum manager to store externalized datum

Returns:

The decomposed serializable objects

recompose(data: dict, manager: DatumManager | None = None) -> Any

Reconstruct the object from decomposed components.

Parameters:
  • data – The decomposed component

  • manager – Datum manager to internalize datum

Returns:

The reconstructed object

supported_type()

Returns the type/class supported by this decomposer.

Returns:

The class (not instance) of supported type

class ViaDownloaderDecomposer(max_chunk_size: int, config_var_prefix)

Bases: Decomposer, ABC

decompose(target: Any, manager: DatumManager | None = None) -> Any

Decompose the target into types supported by msgpack or classes with decomposers registered.

Msgpack supports primitives, bytes, memoryview, lists, dicts.

Parameters:
  • target – The instance to be serialized

  • manager – Datum manager to store externalized datum

Returns:

The decomposed serializable objects

abstract download(from_fqcn: str, ref_id: str, per_request_timeout: float, cell: Cell, secure=False, optional=False, abort_signal=None) -> tuple[str, dict]

abstract get_download_dot() -> int

Get the Datum Object Type to be used for download ref datum

Returns: the DOT for download ref datum

abstract native_decompose(target: Any, manager: DatumManager | None = None) -> bytes

abstract native_recompose(data: bytes, manager: DatumManager | None = None) -> Any

process_datum(datum: Datum, manager: DatumManager)

This is called by the manager to process a datum that has a DOT. This happens before the recompose processing.

The datum records where the data is. For a bytes DOT, the data is included in the datum directly. For a file DOT, the data is in a file, and the location of the file is further specified:

  • If the location is local, then the file is on the local file system;

  • If the location is remote_cell, then the file is on a remote cell and needs to be downloaded.

Parameters:
  • datum – datum to be processed.

  • manager – the datum manager.

Returns: None
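The location rules above amount to a small decision tree. The sketch below models it with plain dicts; the field names `kind` and `location` and the returned action strings are hypothetical, chosen only to mirror the prose, and are not the real Datum attributes:

```python
def resolve_action(datum: dict) -> str:
    """Hypothetical helper: decide how to obtain the data a datum points to."""
    if datum["kind"] == "bytes":
        # Bytes DOT: the payload travels inside the datum itself.
        return "use-inline-bytes"
    if datum["kind"] == "file":
        if datum["location"] == "local":
            return "read-local-file"
        if datum["location"] == "remote_cell":
            return "download-from-remote-cell"
    raise ValueError(f"unsupported datum: {datum!r}")


print(resolve_action({"kind": "bytes"}))
print(resolve_action({"kind": "file", "location": "local"}))
print(resolve_action({"kind": "file", "location": "remote_cell"}))
```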

recompose(data: Any, manager: DatumManager | None = None) -> Any

Reconstruct the object from decomposed components.

Parameters:
  • data – The decomposed component

  • manager – Datum manager to internalize datum

Returns:

The reconstructed object

supported_dots()

Return the Datum Object Types supported by this decomposer. If a DOT is returned, this decomposer’s process_datum method will be called for any datum whose DOT matches this DOT.

Returns: None or list of DOTs
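To make the matching rule concrete, here is a minimal sketch of how a manager might route datums to decomposers by DOT. `DecomposerSketch`, `build_dot_table`, and `dispatch` are hypothetical names, and the real manager may do this differently:

```python
class DecomposerSketch:
    """Hypothetical decomposer that records which datums it was asked to process."""

    def __init__(self, dots):
        self._dots = dots
        self.seen = []

    def supported_dots(self):
        return self._dots  # None or a list of DOTs

    def process_datum(self, datum: dict) -> None:
        self.seen.append(datum["id"])


def build_dot_table(decomposers) -> dict:
    table = {}
    for dec in decomposers:
        # A decomposer that returns None registers for no DOT at all.
        for dot in dec.supported_dots() or []:
            table[dot] = dec
    return table


def dispatch(table: dict, datums) -> None:
    for datum in datums:
        handler = table.get(datum.get("dot"))
        if handler is not None:
            handler.process_datum(datum)


numpy_dec, plain_dec = DecomposerSketch([1]), DecomposerSketch(None)
table = build_dot_table([numpy_dec, plain_dec])
dispatch(table, [{"id": "a", "dot": 1}, {"id": "b", "dot": 9}, {"id": "c"}])
print(numpy_dec.seen, plain_dec.seen)
```

Only the datum whose DOT matches a registered DOT reaches process_datum(); datums with an unknown or missing DOT are left alone in this sketch.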

abstract to_downloadable(items: dict, max_chunk_size: int, fobs_ctx: dict) -> Downloadable

Convert the items to a Downloadable object.

Parameters:
  • items – a dict of items of target object type to be converted

  • max_chunk_size – max size of one chunk.

  • fobs_ctx – FOBS Context

Returns: a Downloadable object

“items” is a dict of target objects. It contains all objects of the target type in one payload.
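One way to picture the role of max_chunk_size is a generator that slices each item's serialized bytes into bounded pieces. This is only an illustration under the assumption that items have already been serialized to bytes; `iter_chunks` is a hypothetical helper and says nothing about how a real Downloadable is implemented:

```python
def iter_chunks(items: dict, max_chunk_size: int):
    """Hypothetical helper: yield (item_id, chunk) pairs, each chunk bounded in size."""
    if max_chunk_size <= 0:
        raise ValueError("max_chunk_size must be positive")
    for item_id, payload in items.items():
        for start in range(0, len(payload), max_chunk_size):
            yield item_id, payload[start:start + max_chunk_size]


items = {"T0": b"a" * 10, "T1": b"b" * 3}
chunks = list(iter_chunks(items, max_chunk_size=4))

# Reassemble on the receiving side by concatenating chunks per item.
rebuilt = {}
for item_id, chunk in chunks:
    rebuilt[item_id] = rebuilt.get(item_id, b"") + chunk
print(len(chunks), rebuilt == items)
```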

clear_download_initiated() -> None

Reset the thread-local flag before a send_to_peer() call.

Prevents a stale True from a previous training round (which did have tensors) from carrying over to the current validate round (which has no tensors).

was_download_initiated() -> bool

Return True if _finalize_download_tx() created a download transaction in the current thread’s most recent encode_payload() call.

Called by FlareAgent._do_submit_result() immediately after send_to_peer() returns to decide whether to wait for the server to finish downloading tensors. Returns False for validate results (metrics only, no tensors).
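The clear-then-check pattern behind these two functions can be sketched with `threading.local`. The function bodies, the `_mark_download_initiated` helper, and `send_sketch` are assumptions made for illustration and are not the NVFlare implementation:

```python
import threading

_state = threading.local()  # each thread sees its own flag


def clear_download_initiated() -> None:
    _state.download_initiated = False


def _mark_download_initiated() -> None:
    # Hypothetical hook: would be set when a download transaction is created.
    _state.download_initiated = True


def was_download_initiated() -> bool:
    # Threads that never set the flag default to False.
    return getattr(_state, "download_initiated", False)


def send_sketch(payload_has_tensors: bool) -> bool:
    """Hypothetical send: reset the flag, encode, then report whether to wait."""
    clear_download_initiated()
    if payload_has_tensors:
        _mark_download_initiated()
    return was_download_initiated()


train_wait = send_sketch(True)      # training round with tensors
validate_wait = send_sketch(False)  # validate round, metrics only

# The flag is per thread: another thread does not see this thread's value.
_mark_download_initiated()
other = []
t = threading.Thread(target=lambda: other.append(was_download_initiated()))
t.start()
t.join()
print(train_wait, validate_wait, other)
```

Without the reset at the start of `send_sketch`, the True left behind by the training round would still be visible when the validate round checks the flag.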