Source code for nvflare.fuel.utils.fobs.decomposers.via_downloader

# Copyright (c) 2025, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import threading
import uuid
from abc import ABC, abstractmethod
from typing import Any

import nvflare.fuel.utils.app_config_utils as acu
from nvflare.apis.fl_constant import ConfigVarName
from nvflare.fuel.f3.cellnet.cell import Cell
from nvflare.fuel.f3.cellnet.defs import MessageHeaderKey
from nvflare.fuel.f3.streaming.download_service import Downloadable
from nvflare.fuel.f3.streaming.file_downloader import ObjectDownloader
from nvflare.fuel.utils import fobs
from nvflare.fuel.utils.fobs.datum import Datum, DatumManager, DatumType
from nvflare.fuel.utils.log_utils import get_obj_logger

# Inactivity timeout, in seconds, tolerated between consecutive chunk requests
# of a download.  Five minutes leaves headroom for long GC pauses.
MIN_DOWNLOAD_TIMEOUT_DEFAULT = 5 * 60

# Older private spelling, kept so existing references keep working.
_MIN_DOWNLOAD_TIMEOUT = MIN_DOWNLOAD_TIMEOUT_DEFAULT

# Per-thread record of whether this thread's most recent encode_payload() call
# created a download transaction.
#
# The task pipe and the metric pipe end up on the same CoreCell: the same
# site_name + token + mode give the same FQCN, hence the same _CellInfo cache
# entry and the same core_cell.fobs_ctx.  A flag kept in that shared fobs_ctx
# could be overwritten by a concurrent serialisation running on another thread.
# A threading.local gives each thread its own copy.  That is sufficient because
# _finalize_download_tx() always runs synchronously on the thread that called
# send_to_peer() -> encode_payload() -> FOBS serialisation.
_tls = threading.local()


def was_download_initiated() -> bool:
    """Report whether the current thread's last encode_payload() started a download transaction.

    FlareAgent._do_submit_result() calls this right after send_to_peer() returns.
    It uses the answer to decide whether to wait for the server to finish
    downloading tensors.  Validate results (metrics only, no tensors) yield False.

    Returns:
        The thread-local flag, or False if it was never set on this thread.
    """
    try:
        return _tls.download_initiated
    except AttributeError:
        # Flag never written on this thread.
        return False


def clear_download_initiated() -> None:
    """Reset the current thread's download flag; call this before send_to_peer().

    This stops a True left over from an earlier training round (which carried
    tensors) from leaking into a later validate round (which carries none).
    """
    _tls.download_initiated = False
class LazyDownloadRef:
    """Placeholder created in PASS_THROUGH mode instead of downloading a tensor.

    Pure-forwarder cells have ``FOBSContextKey.PASS_THROUGH`` set in their FOBS
    context.  On such a cell, incoming download references from the source are
    not resolved.  Instead, a ``LazyDownloadRef`` is created for each tensor item
    in the received batch, so the original source FQCN and batch ref_id are
    preserved.

    Later, the forwarding node (CJ) serialises the task for its subprocess.
    ``LazyDownloadRefDecomposer.decompose()`` detects ``LazyDownloadRef`` targets
    and re-emits the *original* download datum, which points back to the server.
    It does not create a new datum pointing to the CJ.  The subprocess agent then
    resolves the references directly from the originating source, downloading
    each tensor individually without any copy passing through the CJ.

    Attributes:
        fqcn: FQCN of the originating cell that owns the download transaction.
        ref_id: UUID of the batch download transaction on that cell.
        item_id: Intra-batch item placeholder (e.g. ``"T0"``, ``"T1"``).
        dot: Datum Object Type of the original download datum.  It identifies
            which ``ViaDownloaderDecomposer`` subclass owns this ref (e.g.
            ``NUMPY_DOWNLOAD`` or ``TENSOR_DOWNLOAD``).
            ``LazyDownloadRefDecomposer`` needs it to route serialisation and
            deserialisation to the correct handler.
    """

    __slots__ = ("fqcn", "ref_id", "item_id", "dot")

    def __init__(self, fqcn: str, ref_id: str, item_id: str, dot: int = 0):
        self.fqcn = fqcn
        self.ref_id = ref_id
        self.item_id = item_id
        self.dot = dot

    def __repr__(self) -> str:
        # The class has no __dict__, so the default repr shows nothing useful;
        # spell out every slot for logs and debugging.
        return (
            f"{type(self).__name__}(fqcn={self.fqcn!r}, ref_id={self.ref_id!r}, "
            f"item_id={self.item_id!r}, dot={self.dot!r})"
        )
class _LazyBatchInfo:
    """Sentinel stored in fobs_ctx[items_key] during PASS_THROUGH mode.

    It carries the (fqcn, ref_id, dot) of the *original* download batch, so that
    ``recompose()`` can build a ``LazyDownloadRef`` for each item_id it
    encounters.  A named sentinel class (rather than a plain tuple) keeps the
    PASS_THROUGH path unambiguous and robust against accidental type
    collisions.
    """

    __slots__ = ("fqcn", "ref_id", "dot")

    def __init__(self, fqcn: str, ref_id: str, dot: int = 0):
        self.fqcn = fqcn
        self.ref_id = ref_id
        self.dot = dot

    def __repr__(self) -> str:
        # __slots__ class: make the carried batch identity visible in logs.
        return f"{type(self).__name__}(fqcn={self.fqcn!r}, ref_id={self.ref_id!r}, dot={self.dot!r})"


# Suffix appended to a decomposer's prefix to form the fobs_ctx key used on the
# *sending* side when LazyDownloadRef targets are re-emitted.  decompose()
# records the original batch's fqcn/ref_id under this key.
# _finalize_lazy_batch() reads it back to add the single batch datum.
_LAZY_BATCH_CTX_SUFFIX = "_lazy_batch"
class EncKey:
    """Keys of the dict produced by ViaDownloaderDecomposer.decompose() for one target."""

    # Holds an EncType value telling which encoding was used.
    TYPE = "type"
    # Holds the payload: native bytes, or the item id of a download reference.
    DATA = "data"
class EncType:
    """Values stored under EncKey.TYPE."""

    # DATA carries the output of native_decompose().
    NATIVE = "native"
    # DATA carries an item id to be resolved from downloaded items.
    REF = "ref"
class _RefKey:
    """Keys of the JSON download reference stored in a download datum's value."""

    REF_ID = "ref_id"
    FQCN = "fqcn"


class _CtxKey:
    """fobs_ctx keys used by ViaDownloaderDecomposer across target types."""

    MSG_ROOT_ID = "msg_root_id"
    MSG_ROOT_TTL = "msg_root_ttl"
    OBJECTS = "objects"  # objects to be downloaded
    FINAL_CB_REGISTERED = "final_cb_registered"


class _DecomposeCtx:
    """Collects the distinct target objects of one type seen while decomposing a payload.

    Each distinct object (by identity) gets a placeholder item id of the form
    ``"T<n>"``.  Adding an object that is already known returns its existing id.
    """

    def __init__(self):
        self.target_to_item = {}  # id(target) -> item_id
        self.target_items = {}  # item_id -> target object (keeps it alive, so id() stays unique)
        self.last_item_id = 0  # counter used to mint the next item id
        self.lock = threading.Lock()

    def add_item(self, item: Any):
        """Register *item* and return ``(item_id, id(item))``.

        Repeated calls with the same object return the same item_id and do not
        add a new entry.
        """
        key = id(item)
        with self.lock:
            existing = self.target_to_item.get(key)
            if existing is not None:
                return existing, key
            new_id = f"T{self.last_item_id}"
            self.last_item_id += 1
            self.target_items[new_id] = item
            self.target_to_item[key] = new_id
            return new_id, key

    def get_item_count(self):
        """Return how many distinct target objects have been collected."""
        return len(self.target_items)
class ViaDownloaderDecomposer(fobs.Decomposer, ABC):
    """Base FOBS decomposer that can move target objects out of the message body into a download.

    Encoding depends on the FOBS context.  A target is replaced in the payload by
    a small reference (``{"type": "ref", "data": <item_id>}``) when both hold:

    * a cell is present in the FOBS context, and
    * the configured download chunk size is positive.

    After all targets have been decomposed, the collected items of this type are
    converted to one ``Downloadable``.  A datum with ``{"fqcn": ..., "ref_id": ...}``
    is added so the receiver can download them.  Otherwise the target is
    serialised inline via ``native_decompose()``.

    Subclasses provide the type-specific pieces: ``to_downloadable()``,
    ``download()``, ``get_download_dot()``, ``native_decompose()`` and
    ``native_recompose()``.
    """

    def __init__(self, max_chunk_size: int, config_var_prefix):
        """
        Args:
            max_chunk_size: default download chunk size.  It can be overridden by the
                config var ``config_var_prefix + ConfigVarName.DOWNLOAD_CHUNK_SIZE``.
                A value <= 0 forces native (inline) serialisation.
            config_var_prefix: prefix prepended to config var names read by this decomposer.
        """
        self.logger = get_obj_logger(self)
        self.prefix = self.__class__.__name__
        self.decompose_ctx_key = f"{self.prefix}_dc"  # kept in fobs_ctx: each target type has its own DecomposeCtx
        self.items_key = f"{self.prefix}_items"  # in fobs_ctx: each target type has its own set of items
        self.config_var_prefix = config_var_prefix
        self.max_chunk_size = max_chunk_size

    @abstractmethod
    def to_downloadable(self, items: dict, max_chunk_size: int, fobs_ctx: dict) -> Downloadable:
        """Convert the items to a Downloadable object.

        Args:
            items: a dict of item_id => target object.  It contains every object of
                the target type found in one payload.
            max_chunk_size: max size of one chunk.
            fobs_ctx: FOBS Context

        Returns:
            a Downloadable object
        """
        pass

    @abstractmethod
    def download(
        self,
        from_fqcn: str,
        ref_id: str,
        per_request_timeout: float,
        cell: Cell,
        secure=False,
        optional=False,
        abort_signal=None,
    ) -> tuple[str, dict]:
        """Download the batch identified by ``ref_id`` from the cell ``from_fqcn``.

        Returns:
            ``(error, items)``.  A truthy error means the download failed.
            ``items`` is what ``recompose()`` later looks item ids up in (it must
            support ``get()``).
        """
        pass

    def supported_dots(self):
        """Return the Datum Object Types this decomposer processes (its download DOT)."""
        return [self.get_download_dot()]

    @abstractmethod
    def get_download_dot(self) -> int:
        """Get the Datum Object Type to be used for download ref datum

        Returns: the DOT for download ref datum

        """
        pass

    @abstractmethod
    def native_decompose(self, target: Any, manager: DatumManager = None) -> bytes:
        """Serialise *target* inline, used when the download path is not applicable."""
        pass

    @abstractmethod
    def native_recompose(self, data: bytes, manager: DatumManager = None) -> Any:
        """Rebuild a target from bytes produced by ``native_decompose()``."""
        pass

    def _create_ref(self, target: Any, manager: DatumManager, fobs_ctx: dict):
        """Add *target* to this type's DecomposeCtx and return ``(item_id, id(target))``.

        The first target collected for this type also registers
        ``_process_items_to_datum`` as a post-callback.
        """
        dc = fobs_ctx.get(self.decompose_ctx_key)
        # Sample the count BEFORE adding.  add_item() de-duplicates by identity, so
        # if the first target object occurs twice in the payload the count stays
        # at 1.  A post-add "count == 1" check would then register the callback
        # twice, producing two datums / download transactions for the same batch.
        is_first_item = dc.get_item_count() == 0
        item_id, target_id = dc.add_item(target)
        if is_first_item:
            # only register cb once!
            manager.register_post_cb(self._process_items_to_datum)
        return item_id, target_id

    def _create_downloadable(self, fobs_ctx: dict) -> Downloadable:
        """Convert all collected items of this type into a single Downloadable."""
        dc = fobs_ctx.get(self.decompose_ctx_key)
        assert isinstance(dc, _DecomposeCtx)
        items = dc.target_items
        max_chunk_size = acu.get_int_var(
            self._config_var_name(ConfigVarName.DOWNLOAD_CHUNK_SIZE),
            self.max_chunk_size,
        )
        try:
            return self.to_downloadable(items, max_chunk_size, fobs_ctx)
        except Exception as e:
            self.logger.error(f"Error converting {len(items)} items to Downloadable: {e}")
            raise

    @staticmethod
    def _determine_msg_root(fobs_ctx: dict):
        """Return ``(msg_root_id, msg_root_ttl)``.

        Values are read from fobs_ctx.  If no msg_root_id is set there, both
        values come from the headers of the message stored in fobs_ctx (if any).
        """
        msg_root_id = fobs_ctx.get(_CtxKey.MSG_ROOT_ID)
        msg_root_ttl = fobs_ctx.get(_CtxKey.MSG_ROOT_TTL)
        if not msg_root_id:
            # try to get from msg
            msg = fobs_ctx.get(fobs.FOBSContextKey.MESSAGE)
            if msg:
                msg_root_id = msg.get_header(MessageHeaderKey.MSG_ROOT_ID)
                msg_root_ttl = msg.get_header(MessageHeaderKey.MSG_ROOT_TTL)
        return msg_root_id, msg_root_ttl

    def decompose(self, target: Any, manager: DatumManager = None) -> Any:
        """Encode *target* either natively or as a reference to downloadable data.

        Returns:
            ``{EncKey.TYPE: EncType.NATIVE, EncKey.DATA: <bytes>}`` or
            ``{EncKey.TYPE: EncType.REF, EncKey.DATA: <item_id>}``.

        Raises:
            RuntimeError: if no DatumManager is given, or LazyDownloadRefs from
                different batches are mixed in one payload.
        """
        if not manager:
            # this should never happen
            raise RuntimeError("FOBS System Error: missing DatumManager")

        # A LazyDownloadRef was created in PASS_THROUGH mode when CJ received the
        # task from the server.  Re-emit the server's original datum instead of
        # creating a new download transaction on *this* cell.  The subprocess then
        # downloads directly from the server, and no tensor data is materialised
        # on CJ.
        if isinstance(target, LazyDownloadRef):
            return self._decompose_lazy_ref(target, manager)

        max_chunk_size = acu.get_int_var(
            self._config_var_name(ConfigVarName.DOWNLOAD_CHUNK_SIZE),
            self.max_chunk_size,
        )

        fobs_ctx = manager.fobs_ctx
        cell = fobs_ctx.get(fobs.FOBSContextKey.CELL)
        if not cell:
            # If no cell, only support native decomposers
            fobs_ctx["native"] = True

        use_native = fobs_ctx.get("native", False)
        if max_chunk_size <= 0 or use_native:
            self.logger.debug("using native_decompose")
            data = self.native_decompose(target, manager)
            return {EncKey.TYPE: EncType.NATIVE, EncKey.DATA: data}

        self.logger.debug(f"using download decompose: {max_chunk_size=}")
        # Note: there could be multiple target types - each target type has its own DecomposeCtx!
        dc = fobs_ctx.get(self.decompose_ctx_key)
        if not dc:
            dc = _DecomposeCtx()
            fobs_ctx[self.decompose_ctx_key] = dc

        item_id, target_id = self._create_ref(target, manager, fobs_ctx)
        self.logger.debug(f"ViaDownloader: created ref for target {target_id}: {item_id}")
        return {EncKey.TYPE: EncType.REF, EncKey.DATA: item_id}

    def _decompose_lazy_ref(self, target: LazyDownloadRef, manager: DatumManager) -> dict:
        """Encode a LazyDownloadRef as a ref to its original batch.

        The batch datum is emitted later by ``_finalize_lazy_batch``.
        """
        fobs_ctx = manager.fobs_ctx
        lazy_batch_key = f"{self.prefix}{_LAZY_BATCH_CTX_SUFFIX}"
        lazy_batch = fobs_ctx.get(lazy_batch_key)
        if lazy_batch is None:
            # First LazyDownloadRef of this batch: register a post-callback that
            # adds the single shared datum (fqcn + ref_id) after all items have
            # been serialised.
            fobs_ctx[lazy_batch_key] = {"fqcn": target.fqcn, "ref_id": target.ref_id}
            manager.register_post_cb(self._finalize_lazy_batch)
        elif lazy_batch["fqcn"] != target.fqcn or lazy_batch["ref_id"] != target.ref_id:
            # Only one batch datum is emitted per target type, and item ids such as
            # "T0" are unique only within a batch.  A ref from another batch would
            # be resolved against the wrong batch by the receiver (silently yielding
            # the wrong object, or a missing-item error), so refuse to encode it.
            self.logger.error(
                f"ViaDownloader: LazyDownloadRef {target.item_id=} belongs to batch "
                f"{target.fqcn}/{target.ref_id} but this payload already re-emits batch "
                f"{lazy_batch['fqcn']}/{lazy_batch['ref_id']}"
            )
            raise RuntimeError(
                f"FOBS Protocol Error: cannot mix LazyDownloadRefs from different batches in {self.prefix}"
            )

        self.logger.debug(
            f"ViaDownloader: re-emitting LazyDownloadRef {target.item_id=} " f"{target.fqcn=} {target.ref_id=}"
        )
        return {EncKey.TYPE: EncType.REF, EncKey.DATA: target.item_id}

    def _create_downloader(self, fobs_ctx: dict):
        """Create the ObjectDownloader for this payload, or return None when no cell is available."""
        # Transaction lifecycle is managed solely by _monitor_tx() (download_service.py).
        # We deliberately do NOT subscribe to msg_root deletion here.  The msg_root is
        # deleted as soon as all blobs are delivered, but blob_cb fires asynchronously:
        # secondary tensor downloads are still in flight when msg_root is deleted.
        # Subscribing caused a race: delete_transaction() removed refs from _ref_table
        # before blob_cb could finish its _download_from_remote_cell() calls, producing
        # "no ref found" FATAL_SYSTEM_ERROR (RC12 Bug 1).
        # _monitor_tx() polls is_finished() every 5s and cleans up within 5s of the last
        # receiver completing all chunk downloads, which suffices for all model sizes.
        msg_root_id, msg_root_ttl = self._determine_msg_root(fobs_ctx)

        # Read min_download_timeout from job config so operators can tune it per job
        # (e.g. np_min_download_timeout: 600 for a 70B model).  Falls back to the
        # module-level default (MIN_DOWNLOAD_TIMEOUT_DEFAULT, 300s) when not set.
        min_timeout = acu.get_positive_float_var(
            self._config_var_name(ConfigVarName.MIN_DOWNLOAD_TIMEOUT),
            _MIN_DOWNLOAD_TIMEOUT,
        )
        timeout = msg_root_ttl if msg_root_ttl else min_timeout
        if timeout < min_timeout:
            timeout = min_timeout
        self.logger.debug(f"ViaDownloader: {msg_root_id=} {timeout=}")

        downloader = None
        cell = fobs_ctx.get(fobs.FOBSContextKey.CELL)
        if cell:
            num = fobs_ctx.get(fobs.FOBSContextKey.NUM_RECEIVERS)
            num_receivers = num if num else 1
            # Optional lifecycle callback set by FlareAgent._do_submit_result()
            # (subprocess -> CJ -> server reverse path).  It lets the subprocess wait
            # until the server has finished downloading from its DownloadService
            # before exiting.  None when no gating is needed (forward path).
            on_complete_cb = fobs_ctx.get(fobs.FOBSContextKey.DOWNLOAD_COMPLETE_CB)
            downloader = ObjectDownloader(
                num_receivers=num_receivers,
                cell=cell,
                timeout=timeout,
                transaction_done_cb=on_complete_cb,
            )
        return downloader

    def _process_items_to_datum(self, mgr: DatumManager):
        """Post-callback: turn the collected items of this type into a download-ref datum.

        It runs once per target type, after all target items have been
        serialised.  It only builds the Downloadable and the datum.  The download
        transaction is created later in ``_finalize_download_tx``.

        Args:
            mgr: the DatumManager of the ongoing serialisation.
        """
        fobs_ctx = mgr.fobs_ctx
        dc = fobs_ctx.get(self.decompose_ctx_key)
        assert isinstance(dc, _DecomposeCtx)

        # Building the Downloadable (e.g. generating files) can take a long time
        # for large objects.  Creating the download transaction here could let it
        # expire before that work is done.  So the transaction is created in a
        # separate final callback, registered once for all target types.
        final_cb_registered = fobs_ctx.get(_CtxKey.FINAL_CB_REGISTERED)
        if not final_cb_registered:
            mgr.register_post_cb(self._finalize_download_tx)
            fobs_ctx[_CtxKey.FINAL_CB_REGISTERED] = True

        try:
            if not mgr.get_error():
                datum = self._create_datum(fobs_ctx)
                mgr.add_datum(datum)
        except Exception as ex:
            self.logger.error(f"exception creating datum: {ex}")
            mgr.set_error(f"exception creating datum in {type(self)}")

    def _config_var_name(self, base_name: str):
        """Return the config var name for *base_name*, prefixed with this decomposer's prefix."""
        return f"{self.config_var_prefix}{base_name}"

    def _create_datum(self, fobs_ctx: dict):
        """Build this type's Downloadable, queue it under a new ref id, and return its ref datum.

        Raises:
            RuntimeError: if no cell is available in the FOBS context.
        """
        downloadable = self._create_downloadable(fobs_ctx)

        # Check the cell BEFORE queuing the object.  Otherwise a failure here would
        # leave a dangling object in OBJECTS for _finalize_download_tx to process.
        cell = fobs_ctx.get(fobs.FOBSContextKey.CELL)
        if not cell:
            raise RuntimeError("cannot create download datum: no cell in FOBS context")

        # keep downloadable objects in fobs_ctx; they are registered in _finalize_download_tx
        downloadable_objs = fobs_ctx.get(_CtxKey.OBJECTS)
        if not downloadable_objs:
            downloadable_objs = []
            fobs_ctx[_CtxKey.OBJECTS] = downloadable_objs

        # create a new ref id
        ref_id = str(uuid.uuid4())
        downloadable_objs.append((ref_id, downloadable))

        ref = {
            _RefKey.FQCN: cell.get_fqcn(),
            _RefKey.REF_ID: ref_id,
        }
        self.logger.debug(f"ViaDownloader: created download ref for target type {self.__class__.__name__}: {ref=}")
        # use download DOT
        return Datum(datum_type=DatumType.TEXT, value=json.dumps(ref), dot=self.get_download_dot())

    def _finalize_download_tx(self, mgr: DatumManager):
        """Final callback: register all queued Downloadables with one ObjectDownloader."""
        self.logger.debug("ViaDownloader: finalizing download tx")
        fobs_ctx = mgr.fobs_ctx
        downloadable_objs = fobs_ctx.get(_CtxKey.OBJECTS)
        if not downloadable_objs:
            return

        downloader = self._create_downloader(fobs_ctx)
        if downloader is None:
            # _create_downloader() returns None only when no cell is in the FOBS context.
            self.logger.error("ViaDownloader: cannot create download transaction: no cell in FOBS context")
            mgr.set_error(f"no cell to create download transaction in {type(self)}")
            return

        for ref_id, obj in downloadable_objs:
            self.logger.debug(f"ViaDownloader: adding object to downloader: {ref_id=}")
            downloader.add_object(obj, ref_id=ref_id)

        # Signal FlareAgent (same thread) that a download transaction was created.
        # Thread-local avoids shared-state races when task pipe and metric pipe
        # share the same CoreCell (RC12 Bug 3).
        _tls.download_initiated = True

    def _finalize_lazy_batch(self, mgr: DatumManager):
        """Post-callback used when re-emitting a LazyDownloadRef batch.

        It adds a single datum containing the *original* source FQCN and ref_id.
        The downstream consumer (subprocess agent) can then download the tensors
        directly from the originating cell (typically the FL server), without
        involving the CJ at all.
        """
        fobs_ctx = mgr.fobs_ctx
        lazy_batch_key = f"{self.prefix}{_LAZY_BATCH_CTX_SUFFIX}"
        lazy_batch = fobs_ctx.get(lazy_batch_key)
        if not lazy_batch:
            return
        ref = {_RefKey.FQCN: lazy_batch["fqcn"], _RefKey.REF_ID: lazy_batch["ref_id"]}
        datum = Datum(datum_type=DatumType.TEXT, value=json.dumps(ref), dot=self.get_download_dot())
        self.logger.debug(
            f"ViaDownloader: finalized lazy batch datum for {lazy_batch['fqcn']=} {lazy_batch['ref_id']=}"
        )
        mgr.add_datum(datum)

    def process_datum(self, datum: Datum, manager: DatumManager):
        """Pre-process a download-ref datum before recompose.

        The datum value is a JSON reference ``{"fqcn": ..., "ref_id": ...}``.
        Normally the referenced items are downloaded from that cell and stored in
        ``fobs_ctx[self.items_key]``.  In PASS_THROUGH mode nothing is downloaded;
        a ``_LazyBatchInfo`` is stored there instead.

        Args:
            datum: datum to be processed.
            manager: the datum manager.

        Returns: None
        """
        self.logger.debug(f"ViaDownloader: pre-processing datum {datum.dot=} before recompose")
        fobs_ctx = manager.fobs_ctx
        ref = json.loads(datum.value)

        if fobs_ctx.get(fobs.FOBSContextKey.PASS_THROUGH):
            # PASS_THROUGH mode: do NOT download tensors at this intermediate hop.
            # Store the batch (fqcn, ref_id) so recompose() can build a
            # LazyDownloadRef for each item_id it encounters.  The downstream
            # consumer resolves the references directly from the originating cell.
            self.logger.debug(f"ViaDownloader PASS_THROUGH: preserving lazy ref {ref} instead of downloading")
            fobs_ctx[self.items_key] = _LazyBatchInfo(ref[_RefKey.FQCN], ref[_RefKey.REF_ID], datum.dot)
            return

        # data is to be downloaded
        fobs_ctx[self.items_key] = self._download_from_remote_cell(fobs_ctx, ref)

    def recompose(self, data: Any, manager: DatumManager = None) -> Any:
        """Rebuild a target from the dict produced by ``decompose()``.

        NATIVE data is passed to ``native_recompose()``.  REF data is an item id
        resolved against ``fobs_ctx[self.items_key]``.  Depending on what that
        holds, the result is one of:

        * a LazyDownloadRef, when it holds a ``_LazyBatchInfo`` (PASS_THROUGH);
        * a lazy ref built by ``items.make_lazy_ref()``, if available;
        * the downloaded item itself.

        Raises:
            RuntimeError: on protocol errors or missing/incomplete downloaded data.
        """
        if not manager:
            # should never happen!
            raise RuntimeError("missing DatumManager")

        if not isinstance(data, dict):
            self.logger.error(f"data to be recomposed should be dict but got {type(data)}")
            raise RuntimeError("FOBS protocol error")

        enc_type = data.get(EncKey.TYPE)
        data = data.get(EncKey.DATA)
        if not data:
            self.logger.error("missing 'data' property from the recompose data")
            raise RuntimeError("FOBS protocol error")

        if enc_type == EncType.NATIVE:
            self.logger.debug("using native_recompose")
            return self.native_recompose(data, manager)
        elif enc_type != EncType.REF:
            self.logger.error(f"invalid enc_type {enc_type} in recompose data")
            raise RuntimeError("FOBS protocol error")

        if not isinstance(data, str):
            self.logger.error(f"ref data must be str but got {type(data)}")
            raise RuntimeError("FOBS protocol error")

        # data is the item id
        tid = threading.get_ident()
        self.logger.debug(f"ViaDownloader: {tid=} recomposing data item {data}")
        item_id = data
        fobs_ctx = manager.fobs_ctx
        items = fobs_ctx.get(self.items_key)

        # PASS_THROUGH mode: items_key holds a _LazyBatchInfo sentinel, not a dict.
        # Build a LazyDownloadRef so the reference can be forwarded verbatim.  Carry
        # items.dot so LazyDownloadRefDecomposer can route back to the correct
        # ViaDownloaderDecomposer subclass during subprocess recompose().
        if isinstance(items, _LazyBatchInfo):
            lazy = LazyDownloadRef(fqcn=items.fqcn, ref_id=items.ref_id, item_id=item_id, dot=items.dot)
            self.logger.debug(
                f"ViaDownloader PASS_THROUGH: created LazyDownloadRef {item_id=} "
                f"{items.fqcn=} {items.ref_id=} {items.dot=}"
            )
            return lazy

        if items is None:
            self.logger.error(f"cannot find item {item_id} because no downloaded data is loaded")
            raise RuntimeError(f"FOBS download data is missing for item {item_id}")

        self.logger.debug(f"trying to get item for {item_id=} from {type(items)=}")
        make_lazy_ref_fn = getattr(items, "make_lazy_ref", None)
        if callable(make_lazy_ref_fn) and item_id in items:
            item = make_lazy_ref_fn(item_id)
            self.logger.debug(f"{tid=} created lazy ref for {item_id}")
            return item

        get_item_fn = getattr(items, "get", None)
        if not callable(get_item_fn):
            self.logger.error(f"downloaded data for {item_id} does not support get(): {type(items)}")
            raise RuntimeError(f"FOBS download data has invalid type for item {item_id}")

        if hasattr(items, "__contains__") and item_id not in items:
            self.logger.error(f"cannot find item {item_id} from loaded data")
            raise RuntimeError(f"FOBS download data is incomplete: item {item_id} is missing")

        item = items.get(item_id)
        self.logger.debug(f"{tid=} found item {item_id}: {type(item)}")
        if item is None:
            self.logger.error(f"downloaded item {item_id} is None")
            raise RuntimeError(f"FOBS download data is incomplete: item {item_id} is None")
        return item

    def _download_from_remote_cell(self, fobs_ctx: dict, ref: dict):
        """Download the batch described by *ref* (fqcn + ref_id) and return the items.

        Raises:
            RuntimeError: if the cell or ref fields are missing, or the download fails.
        """
        self.logger.debug(f"trying to download from remote cell for {ref=}")
        cell = fobs_ctx.get(fobs.FOBSContextKey.CELL)
        if not cell:
            self.logger.error("cannot download from remote cell since cell not available in fobs context")
            raise RuntimeError("FOBS Protocol Error")

        ref_id = ref.get(_RefKey.REF_ID)
        if not ref_id:
            self.logger.error(f"missing {_RefKey.REF_ID} from {ref}")
            raise RuntimeError("FOBS Protocol Error")

        fqcn = ref.get(_RefKey.FQCN)
        if not fqcn:
            self.logger.error(f"missing {_RefKey.FQCN} from {ref}")
            raise RuntimeError("FOBS Protocol Error")

        req_timeout = fobs_ctx.get(fobs.FOBSContextKey.DOWNLOAD_REQ_TIMEOUT, None)
        if not req_timeout:
            req_timeout = acu.get_positive_float_var(
                self._config_var_name(ConfigVarName.STREAMING_PER_REQUEST_TIMEOUT), 600.0
            )
        self.logger.debug(f"DOWNLOAD_REQ_TIMEOUT={req_timeout}")

        abort_signal = fobs_ctx.get(fobs.FOBSContextKey.ABORT_SIGNAL)
        self.logger.debug(f"trying to download: {ref_id=} {fqcn=}")
        err, items = self.download(
            from_fqcn=fqcn,
            ref_id=ref_id,
            per_request_timeout=req_timeout,
            cell=cell,
            abort_signal=abort_signal,
        )
        if err:
            self.logger.error(f"failed to download from {fqcn} for source {ref}: {err}")
            raise RuntimeError(f"failed to download from {fqcn}")
        self.logger.debug(f"downloaded {len(items)} items successfully")
        return items
class LazyDownloadRefDecomposer(fobs.Decomposer):
    """FOBS decomposer for :class:`LazyDownloadRef` objects.

    A forwarding hop (e.g. the CJ process) creates ``LazyDownloadRef`` objects
    when ``FOBSContextKey.PASS_THROUGH`` is set.  Each tensor is kept as a
    lightweight placeholder instead of being downloaded from the FL server.  The
    placeholder records:

    * the original server FQCN,
    * the batch ref_id,
    * the item_id,
    * the Datum Object Type (``dot``) of the originating
      ``ViaDownloaderDecomposer`` subclass.

    When the forwarding node re-serialises the task for its subprocess agent,
    FOBS sends every ``LazyDownloadRef`` here.

    **decompose()** looks up the ``ViaDownloaderDecomposer`` handler registered
    for ``lazy.dot`` and delegates to its ``decompose()``.  That handler re-emits
    the original batch datum (fqcn / ref_id) via a post-callback.  The encoded
    dict is then tagged with ``lazy_dot`` so ``recompose()`` can find the same
    handler again.

    **recompose()** reads ``lazy_dot``, looks up that handler, and delegates to
    its ``recompose()``.  On the subprocess side, ``process_datum()`` has already
    put the downloaded tensors into ``fobs_ctx[handler.items_key]``, so the call
    yields the real tensor value.
    """

    def supported_type(self):
        return LazyDownloadRef

    def decompose(self, lazy: LazyDownloadRef, manager: DatumManager = None) -> dict:
        dot = lazy.dot
        handler = fobs.get_dot_handler(dot)
        if handler:
            encoded = handler.decompose(lazy, manager)
            # Tag the encoding so recompose() can route back to the same handler.
            encoded["lazy_dot"] = dot
            return encoded
        raise RuntimeError(
            f"LazyDownloadRefDecomposer: no DOT handler registered for dot={dot!r}. "
            "Ensure the original ViaDownloaderDecomposer subclass (e.g. NumpyArrayDecomposer) "
            "is registered before serialising LazyDownloadRef objects."
        )

    def recompose(self, data: dict, manager: DatumManager = None) -> Any:
        lazy_dot = data.get("lazy_dot")
        if lazy_dot is None:
            raise RuntimeError("LazyDownloadRefDecomposer: missing 'lazy_dot' in encoded data")
        handler = fobs.get_dot_handler(lazy_dot)
        if handler:
            return handler.recompose(data, manager)
        raise RuntimeError(f"LazyDownloadRefDecomposer: no DOT handler registered for lazy_dot={lazy_dot!r}")