# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import threading
import uuid
from abc import ABC, abstractmethod
from typing import Any
import nvflare.fuel.utils.app_config_utils as acu
from nvflare.apis.fl_constant import ConfigVarName
from nvflare.fuel.f3.cellnet.cell import Cell
from nvflare.fuel.f3.cellnet.defs import MessageHeaderKey
from nvflare.fuel.f3.streaming.download_service import Downloadable
from nvflare.fuel.f3.streaming.file_downloader import ObjectDownloader
from nvflare.fuel.utils import fobs
from nvflare.fuel.utils.fobs.datum import Datum, DatumManager, DatumType
from nvflare.fuel.utils.log_utils import get_obj_logger
MIN_DOWNLOAD_TIMEOUT_DEFAULT = 300 # inactivity timeout between chunk requests; 5 min covers GC pauses
_MIN_DOWNLOAD_TIMEOUT = MIN_DOWNLOAD_TIMEOUT_DEFAULT # backward-compat alias
# Thread-local flag for synchronous download-initiation detection.
# Task pipe and metric pipe share the same CoreCell (same site_name + token + mode
# → same FQCN → same _CellInfo cache entry → same core_cell.fobs_ctx). A plain
# fobs_ctx flag would be clobbered by concurrent serialisation calls from different
# threads on the same cell. Thread-local gives per-thread isolation because
# _finalize_download_tx() is always called synchronously in the thread that invoked
# send_to_peer() → encode_payload() → FOBS serialisation.
_tls = threading.local()
def was_download_initiated() -> bool:
    """Report whether this thread's latest encode_payload() started a download transaction.

    The flag is set by ``_finalize_download_tx()`` in the serialising thread. It is read by
    FlareAgent._do_submit_result() right after send_to_peer() returns, to decide whether it
    must wait for the server to finish downloading tensors. Validate results (metrics only,
    no tensors) leave it False.

    Returns:
        True if a download transaction was created in this thread, otherwise False
        (including when the flag has never been set in this thread).
    """
    try:
        return _tls.download_initiated
    except AttributeError:
        # Nothing has set the flag in this thread yet.
        return False
def clear_download_initiated() -> None:
    """Reset this thread's download-initiated flag; call it right before send_to_peer().

    Without the reset, a True left over from an earlier training round (which carried
    tensors) would leak into a later validate round (which carries none).
    """
    setattr(_tls, "download_initiated", False)
class LazyDownloadRef:
    """Placeholder created in PASS_THROUGH mode instead of downloading a tensor.

    When a cell is configured as a pure forwarder (``FOBSContextKey.PASS_THROUGH``
    is set in its FOBS context), incoming download references from the source are
    not resolved. Instead a ``LazyDownloadRef`` is created for each tensor item
    in the received batch so that the original source FQCN and batch ref_id are
    preserved.

    When the forwarding node (CJ) later serialises the task for its subprocess,
    ``LazyDownloadRefDecomposer.decompose()`` detects ``LazyDownloadRef`` targets
    and re-emits the *original* download datum (pointing back to the server)
    instead of creating a new datum that would point to the CJ. The subprocess
    agent then resolves the references directly from the originating source,
    downloading each tensor individually without any copy passing through the CJ.

    Attributes:
        fqcn: FQCN of the originating cell that owns the download transaction.
        ref_id: UUID of the batch download transaction on that cell.
        item_id: Intra-batch item placeholder (e.g. ``"T0"``, ``"T1"``).
        dot: Datum Object Type of the original download datum. Identifies
            which ``ViaDownloaderDecomposer`` subclass owns this ref (e.g.
            ``NUMPY_DOWNLOAD`` or ``TENSOR_DOWNLOAD``). Required by
            ``LazyDownloadRefDecomposer`` to route serialisation and
            deserialisation to the correct handler.
    """

    __slots__ = ("fqcn", "ref_id", "item_id", "dot")

    def __init__(self, fqcn: str, ref_id: str, item_id: str, dot: int = 0):
        self.fqcn = fqcn
        self.ref_id = ref_id
        self.item_id = item_id
        self.dot = dot

    def __repr__(self) -> str:
        # The default object repr only shows a memory address; spell out the fields so a
        # placeholder can be identified when it shows up in logs or error messages.
        return (
            f"{type(self).__name__}(fqcn={self.fqcn!r}, ref_id={self.ref_id!r}, "
            f"item_id={self.item_id!r}, dot={self.dot!r})"
        )
class _LazyBatchInfo:
"""Sentinel stored in fobs_ctx[items_key] during PASS_THROUGH mode.
Carries the (fqcn, ref_id, dot) of the *original* download batch so that
``recompose()`` can build a ``LazyDownloadRef`` for each item_id it
encounters. Using a named sentinel class (rather than a plain tuple)
makes the PASS_THROUGH path unambiguous and robust against accidental
type collisions.
"""
__slots__ = ("fqcn", "ref_id", "dot")
def __init__(self, fqcn: str, ref_id: str, dot: int = 0):
self.fqcn = fqcn
self.ref_id = ref_id
self.dot = dot
# fobs_ctx key used to carry the fqcn/ref_id batch info in PASS_THROUGH mode
# so that recompose() can build per-item LazyDownloadRefs from a single datum.
_LAZY_BATCH_CTX_SUFFIX = "_lazy_batch"
class EncKey:
    """Keys of the dict that ViaDownloaderDecomposer.decompose() returns for each target."""

    TYPE = "type"  # encoding kind; one of the EncType values
    DATA = "data"  # inline native bytes, or the item id of a download reference
class EncType:
    """Values stored under ``EncKey.TYPE``."""

    NATIVE = "native"  # EncKey.DATA holds native_decompose() output inline
    REF = "ref"  # EncKey.DATA holds an item id resolved from downloaded data
class _RefKey:
REF_ID = "ref_id"
FQCN = "fqcn"
class _CtxKey:
MSG_ROOT_ID = "msg_root_id"
MSG_ROOT_TTL = "msg_root_ttl"
OBJECTS = "objects" # objects to be downloaded
FINAL_CB_REGISTERED = "final_cb_registered"
class _DecomposeCtx:
def __init__(self):
self.target_to_item = {} # target_id => item_id
self.target_items = {} # item_id => item value
self.last_item_id = 0
self.lock = threading.Lock()
def add_item(self, item: Any):
with self.lock:
target_id = id(item)
item_id = self.target_to_item.get(target_id)
if not item_id:
item_id = f"T{self.last_item_id}"
self.last_item_id += 1
self.target_items[item_id] = item
self.target_to_item[target_id] = item_id
return item_id, target_id
def get_item_count(self):
return len(self.target_items)
class ViaDownloaderDecomposer(fobs.Decomposer, ABC):
    """Base class for decomposers that move large objects out-of-band via the download service.

    Serialisation (``decompose``):
        Each target object is replaced in the payload by a small reference dict
        ``{EncKey.TYPE: EncType.REF, EncKey.DATA: <item_id>}``. All targets of this
        decomposer's type in one payload are collected in a ``_DecomposeCtx``. A post-process
        callback then converts them into one ``Downloadable`` and adds a single datum (the owning
        cell's FQCN plus a new ref_id). A final callback creates the download transaction.
        If the chunk size is <= 0, or no cell is in the FOBS context, the object is instead
        encoded inline via ``native_decompose()``.

    Deserialisation (``process_datum`` then ``recompose``):
        The datum is resolved first, either by downloading from the owning cell or, in
        PASS_THROUGH mode, by recording a ``_LazyBatchInfo``. ``recompose()`` then maps each
        item id back to its object, or to a ``LazyDownloadRef`` in PASS_THROUGH mode.

    Args:
        max_chunk_size: default download chunk size. It can be overridden by the config var
            ``<config_var_prefix>`` + ``ConfigVarName.DOWNLOAD_CHUNK_SIZE``.
        config_var_prefix: prefix prepended to every config var name read by this decomposer.
    """

    def __init__(self, max_chunk_size: int, config_var_prefix):
        self.logger = get_obj_logger(self)
        self.prefix = self.__class__.__name__
        # fobs_ctx keys are prefixed with the class name so several target types can share one fobs_ctx.
        self.decompose_ctx_key = f"{self.prefix}_dc"  # this type's _DecomposeCtx
        self.items_key = f"{self.prefix}_items"  # this type's resolved (downloaded or lazy) items
        self.config_var_prefix = config_var_prefix
        self.max_chunk_size = max_chunk_size

    @abstractmethod
    def to_downloadable(self, items: dict, max_chunk_size: int, fobs_ctx: dict) -> Downloadable:
        """Convert the collected items into a Downloadable object.

        Args:
            items: dict of item_id => target object. It contains all objects of the target
                type in one payload.
            max_chunk_size: max size of one chunk.
            fobs_ctx: FOBS Context.

        Returns: a Downloadable object
        """
        pass

    @abstractmethod
    def download(
        self,
        from_fqcn: str,
        ref_id: str,
        per_request_timeout: float,
        cell: Cell,
        secure=False,
        optional=False,
        abort_signal=None,
    ) -> tuple[str, dict]:
        """Download the items of transaction *ref_id* owned by cell *from_fqcn*.

        Returns: ``(error, items)``. A truthy ``error`` means the download failed. Otherwise
        ``items`` is the container that ``recompose()`` looks item ids up in.
        """
        pass

    def supported_dots(self):
        """Return the DOTs whose datums this decomposer processes: just the download DOT."""
        return [self.get_download_dot()]

    @abstractmethod
    def get_download_dot(self) -> int:
        """Get the Datum Object Type to be used for download ref datum

        Returns: the DOT for download ref datum
        """
        pass

    @abstractmethod
    def native_decompose(self, target: Any, manager: DatumManager = None) -> bytes:
        """Encode *target* inline (used when download is disabled or no cell is available)."""
        pass

    @abstractmethod
    def native_recompose(self, data: bytes, manager: DatumManager = None) -> Any:
        """Decode data produced by ``native_decompose()``."""
        pass

    def _create_ref(self, target: Any, manager: DatumManager, fobs_ctx: dict):
        """Register *target* in this type's _DecomposeCtx and return ``(item_id, target_id)``.

        The ref item represents the target object in the serialized payload. The post-process
        callback that turns the collected items into a datum is registered exactly once per
        payload, when the first distinct item is added.
        """
        dc = fobs_ctx.get(self.decompose_ctx_key)
        # Check the count BEFORE adding. add_item() does not grow the count for a target it has
        # already seen, so testing "count == 1" afterwards would register the cb a second time
        # when the first target object is referenced again (e.g. tied weights).
        is_first_item = dc.get_item_count() == 0
        item_id, target_id = dc.add_item(target)
        if is_first_item:
            manager.register_post_cb(self._process_items_to_datum)
        return item_id, target_id

    def _create_downloadable(self, fobs_ctx: dict) -> Downloadable:
        """Convert all items collected for this type into one Downloadable."""
        dc = fobs_ctx.get(self.decompose_ctx_key)
        assert isinstance(dc, _DecomposeCtx)
        items = dc.target_items
        max_chunk_size = acu.get_int_var(
            self._config_var_name(ConfigVarName.DOWNLOAD_CHUNK_SIZE),
            self.max_chunk_size,
        )
        try:
            return self.to_downloadable(items, max_chunk_size, fobs_ctx)
        except Exception as e:
            self.logger.error(f"Error converting {len(items)} items to Downloadable: {e}")
            raise

    @staticmethod
    def _determine_msg_root(fobs_ctx: dict):
        """Return ``(msg_root_id, msg_root_ttl)`` from fobs_ctx, falling back to the message headers."""
        msg_root_id = fobs_ctx.get(_CtxKey.MSG_ROOT_ID)
        msg_root_ttl = fobs_ctx.get(_CtxKey.MSG_ROOT_TTL)
        if not msg_root_id:
            # try to get from msg
            msg = fobs_ctx.get(fobs.FOBSContextKey.MESSAGE)
            if msg:
                msg_root_id = msg.get_header(MessageHeaderKey.MSG_ROOT_ID)
                msg_root_ttl = msg.get_header(MessageHeaderKey.MSG_ROOT_TTL)
        return msg_root_id, msg_root_ttl

    def decompose(self, target: Any, manager: DatumManager = None) -> Any:
        """Encode *target* either inline (native) or as a reference to downloadable data.

        Returns:
            ``{EncKey.TYPE: EncType.NATIVE, EncKey.DATA: <bytes>}`` for inline encoding, or
            ``{EncKey.TYPE: EncType.REF, EncKey.DATA: <item_id>}`` for a download reference.

        Raises:
            RuntimeError: if no DatumManager is provided.
        """
        if not manager:
            # this should never happen
            raise RuntimeError("FOBS System Error: missing DatumManager")

        if isinstance(target, LazyDownloadRef):
            return self._decompose_lazy_ref(target, manager)

        max_chunk_size = acu.get_int_var(
            self._config_var_name(ConfigVarName.DOWNLOAD_CHUNK_SIZE),
            self.max_chunk_size,
        )
        fobs_ctx = manager.fobs_ctx
        cell = fobs_ctx.get(fobs.FOBSContextKey.CELL)
        if not cell:
            # Without a cell nothing can be downloaded: only native decomposers are supported.
            fobs_ctx["native"] = True
        use_native = fobs_ctx.get("native", False)

        if max_chunk_size <= 0 or use_native:
            self.logger.debug("using native_decompose")
            data = self.native_decompose(target, manager)
            return {EncKey.TYPE: EncType.NATIVE, EncKey.DATA: data}

        self.logger.debug(f"using download decompose: {max_chunk_size=}")
        # Each target type has its own DecomposeCtx (a payload may contain several target types).
        dc = fobs_ctx.get(self.decompose_ctx_key)
        if not dc:
            dc = _DecomposeCtx()
            fobs_ctx[self.decompose_ctx_key] = dc
        item_id, target_id = self._create_ref(target, manager, fobs_ctx)
        self.logger.debug(f"ViaDownloader: created ref for target {target_id}: {item_id}")
        return {EncKey.TYPE: EncType.REF, EncKey.DATA: item_id}

    def _decompose_lazy_ref(self, target: LazyDownloadRef, manager: DatumManager) -> dict:
        """Re-emit a LazyDownloadRef as a reference into the *original* download batch.

        A LazyDownloadRef is created in PASS_THROUGH mode when CJ receives the task from the
        server. Creating a new download transaction on this cell would make the subprocess
        download from CJ. Instead, the original batch's (fqcn, ref_id) is recorded once per
        payload, and ``_finalize_lazy_batch()`` later adds a single datum pointing back to it.
        The subprocess agent therefore downloads each tensor directly from the originating cell,
        with no tensor data ever materialised on CJ.
        """
        fobs_ctx = manager.fobs_ctx
        lazy_batch_key = f"{self.prefix}{_LAZY_BATCH_CTX_SUFFIX}"
        lazy_batch = fobs_ctx.get(lazy_batch_key)
        if lazy_batch is None:
            # First LazyDownloadRef of this type in the payload: remember its batch and register
            # the post-callback that adds the single shared datum after all items are serialised.
            fobs_ctx[lazy_batch_key] = {"fqcn": target.fqcn, "ref_id": target.ref_id}
            manager.register_post_cb(self._finalize_lazy_batch)
        elif lazy_batch["fqcn"] != target.fqcn or lazy_batch["ref_id"] != target.ref_id:
            # Only one datum (for the first batch) is emitted per type, so this ref's item id would
            # be looked up in the wrong batch downstream.
            self.logger.error(
                f"ViaDownloader: LazyDownloadRef {target.item_id=} from {target.fqcn=} {target.ref_id=} "
                f"does not belong to the batch already being re-emitted: {lazy_batch}"
            )
        # NOTE(review): a payload mixing LazyDownloadRefs and real targets of the same type emits
        # two datums with the same DOT; process_datum() keeps only one of them in items_key.
        self.logger.debug(
            f"ViaDownloader: re-emitting LazyDownloadRef {target.item_id=} " f"{target.fqcn=} {target.ref_id=}"
        )
        return {EncKey.TYPE: EncType.REF, EncKey.DATA: target.item_id}

    def _create_downloader(self, fobs_ctx: dict):
        """Create the ObjectDownloader that serves this payload's downloadables.

        Returns: an ObjectDownloader, or None if no cell is available in the FOBS context.
        """
        # Transaction lifecycle is managed solely by _monitor_tx() (download_service.py).
        # We deliberately do NOT subscribe to msg_root deletion here. The msg_root is
        # deleted as soon as all blobs are delivered, but blob_cb fires asynchronously —
        # secondary tensor downloads are still in flight when msg_root is deleted.
        # Subscribing caused a race: delete_transaction() removed refs from _ref_table
        # before blob_cb could finish its _download_from_remote_cell() calls, producing
        # "no ref found" FATAL_SYSTEM_ERROR (RC12 Bug 1).
        # _monitor_tx() polls is_finished() every 5s and cleans up within 5s of the last
        # receiver completing all chunk downloads — sufficient for all model sizes.
        msg_root_id, msg_root_ttl = self._determine_msg_root(fobs_ctx)

        # Read min_download_timeout from job config so operators can tune it per job
        # (e.g. np_min_download_timeout: 600 for a 70B model). Falls back to the
        # module-level _MIN_DOWNLOAD_TIMEOUT (MIN_DOWNLOAD_TIMEOUT_DEFAULT, 300s) when not set.
        min_timeout = acu.get_positive_float_var(
            self._config_var_name(ConfigVarName.MIN_DOWNLOAD_TIMEOUT),
            _MIN_DOWNLOAD_TIMEOUT,
        )
        # Use the msg root TTL when available, but never less than the minimum.
        timeout = max(msg_root_ttl, min_timeout) if msg_root_ttl else min_timeout
        self.logger.debug(f"ViaDownloader: {msg_root_id=} {timeout=}")

        cell = fobs_ctx.get(fobs.FOBSContextKey.CELL)
        if not cell:
            return None

        num = fobs_ctx.get(fobs.FOBSContextKey.NUM_RECEIVERS)
        num_receivers = num if num else 1
        # Optional lifecycle callback set by FlareAgent._do_submit_result()
        # (subprocess → CJ → server reverse path) so the subprocess can wait
        # until the server has finished downloading from its DownloadService
        # before exiting. None when no gating is needed (forward path).
        on_complete_cb = fobs_ctx.get(fobs.FOBSContextKey.DOWNLOAD_COMPLETE_CB)
        return ObjectDownloader(
            num_receivers=num_receivers,
            cell=cell,
            timeout=timeout,
            transaction_done_cb=on_complete_cb,
        )

    def _process_items_to_datum(self, mgr: DatumManager):
        """Post-process callback: turn this type's collected items into a download datum.

        Called once per target type after all target items are serialized. It also registers
        ``_finalize_download_tx`` (once per payload) to create the download transaction afterwards.

        Args:
            mgr: the datum manager of the payload being serialized.
        """
        fobs_ctx = mgr.fobs_ctx
        dc = fobs_ctx.get(self.decompose_ctx_key)
        assert isinstance(dc, _DecomposeCtx)

        # This CB only generates the Downloadable; the download transaction is created in the
        # final CB. For large objects the generation could take a long time, and a transaction
        # created up front might expire before the generation is done.
        if not fobs_ctx.get(_CtxKey.FINAL_CB_REGISTERED):
            mgr.register_post_cb(self._finalize_download_tx)
            fobs_ctx[_CtxKey.FINAL_CB_REGISTERED] = True

        try:
            if not mgr.get_error():
                datum = self._create_datum(fobs_ctx)
                mgr.add_datum(datum)
        except Exception as ex:
            self.logger.exception(f"exception creating datum: {ex}")
            mgr.set_error(f"exception creating datum in {type(self)}")

    def _config_var_name(self, base_name: str):
        """Return the config var name for *base_name* with this decomposer's prefix."""
        return f"{self.config_var_prefix}{base_name}"

    def _create_datum(self, fobs_ctx: dict):
        """Build the Downloadable for this type and return the datum referencing it.

        The Downloadable is queued in ``fobs_ctx[_CtxKey.OBJECTS]`` under a new ref_id. The
        returned datum's value is the JSON ref ``{fqcn: <this cell>, ref_id: <new id>}``,
        using the download DOT.
        """
        downloadable = self._create_downloadable(fobs_ctx)
        cell = fobs_ctx.get(fobs.FOBSContextKey.CELL)

        downloadable_objs = fobs_ctx.get(_CtxKey.OBJECTS)
        if not downloadable_objs:
            downloadable_objs = []
            fobs_ctx[_CtxKey.OBJECTS] = downloadable_objs

        ref_id = str(uuid.uuid4())
        downloadable_objs.append((ref_id, downloadable))
        ref = {
            _RefKey.FQCN: cell.get_fqcn(),
            _RefKey.REF_ID: ref_id,
        }
        self.logger.debug(f"ViaDownloader: created download ref for target type {self.__class__.__name__}: {ref=}")
        return Datum(datum_type=DatumType.TEXT, value=json.dumps(ref), dot=self.get_download_dot())

    def _finalize_download_tx(self, mgr: DatumManager):
        """Final callback: create one download transaction serving all queued Downloadables.

        Raises:
            RuntimeError: if objects are queued but no cell is available to serve them.
        """
        self.logger.debug("ViaDownloader: finalizing download tx")
        fobs_ctx = mgr.fobs_ctx
        downloadable_objs = fobs_ctx.get(_CtxKey.OBJECTS)
        if not downloadable_objs:
            return

        downloader = self._create_downloader(fobs_ctx)
        if downloader is None:
            raise RuntimeError("FOBS System Error: no cell available to create download transaction")
        for ref_id, obj in downloadable_objs:
            self.logger.debug(f"ViaDownloader: adding object to downloader: {ref_id=}")
            downloader.add_object(obj, ref_id=ref_id)
        # Signal FlareAgent (same thread) that a download transaction was created.
        # Thread-local avoids shared-state races when task pipe and metric pipe
        # share the same CoreCell (RC12 Bug 3).
        _tls.download_initiated = True

    def _finalize_lazy_batch(self, mgr: DatumManager):
        """Post-callback used when re-emitting a LazyDownloadRef batch.

        Adds a single datum containing the *original* source FQCN and ref_id so
        that the downstream consumer (subprocess agent) can download the tensors
        directly from the originating cell (typically the FL server) without
        involving the CJ at all.
        """
        fobs_ctx = mgr.fobs_ctx
        lazy_batch_key = f"{self.prefix}{_LAZY_BATCH_CTX_SUFFIX}"
        lazy_batch = fobs_ctx.get(lazy_batch_key)
        if not lazy_batch:
            return
        ref = {_RefKey.FQCN: lazy_batch["fqcn"], _RefKey.REF_ID: lazy_batch["ref_id"]}
        datum = Datum(datum_type=DatumType.TEXT, value=json.dumps(ref), dot=self.get_download_dot())
        self.logger.debug(
            f"ViaDownloader: finalized lazy batch datum for {lazy_batch['fqcn']=} {lazy_batch['ref_id']=}"
        )
        mgr.add_datum(datum)

    def process_datum(self, datum: Datum, manager: DatumManager):
        """Resolve a download datum before recompose() runs.

        The datum's value is a JSON ref ``{fqcn: <owning cell>, ref_id: <download tx id>}``, as
        produced by ``_create_datum()`` or ``_finalize_lazy_batch()``.

        - Normally the referenced items are downloaded from the owning cell and stored in
          ``fobs_ctx[self.items_key]``.
        - In PASS_THROUGH mode nothing is downloaded at this intermediate hop. A
          ``_LazyBatchInfo`` is stored instead, so recompose() can build a LazyDownloadRef for
          each item id and the downstream consumer (subprocess agent) resolves the references
          directly from the source cell.

        Args:
            datum: datum to be processed.
            manager: the datum manager.

        Returns: None

        Raises:
            RuntimeError: if the reference is incomplete or the download fails.
        """
        self.logger.debug(f"ViaDownloader: pre-processing datum {datum.dot=} before recompose")
        fobs_ctx = manager.fobs_ctx
        ref = json.loads(datum.value)

        if fobs_ctx.get(fobs.FOBSContextKey.PASS_THROUGH):
            self.logger.debug(f"ViaDownloader PASS_THROUGH: preserving lazy ref {ref} instead of downloading")
            fqcn = ref.get(_RefKey.FQCN)
            ref_id = ref.get(_RefKey.REF_ID)
            if not fqcn or not ref_id:
                # Same validation as the download path; fail here rather than downstream.
                self.logger.error(f"missing {_RefKey.FQCN} or {_RefKey.REF_ID} from {ref}")
                raise RuntimeError("FOBS Protocol Error")
            fobs_ctx[self.items_key] = _LazyBatchInfo(fqcn, ref_id, datum.dot)
            return

        fobs_ctx[self.items_key] = self._download_from_remote_cell(fobs_ctx, ref)

    def recompose(self, data: Any, manager: DatumManager = None) -> Any:
        """Rebuild a target object from the dict produced by decompose().

        Returns:
            For NATIVE encoding, the result of ``native_recompose()``. For REF encoding, one of:
            a LazyDownloadRef (PASS_THROUGH mode); a lazy ref from the items container's
            ``make_lazy_ref()`` when it provides one; or the downloaded item itself.

        Raises:
            RuntimeError: on malformed data, or missing/incomplete downloaded items.
        """
        if not manager:
            # should never happen!
            raise RuntimeError("missing DatumManager")

        if not isinstance(data, dict):
            self.logger.error(f"data to be recomposed should be dict but got {type(data)}")
            raise RuntimeError("FOBS protocol error")

        enc_type = data.get(EncKey.TYPE)
        data = data.get(EncKey.DATA)
        if not data:
            self.logger.error("missing 'data' property from the recompose data")
            raise RuntimeError("FOBS protocol error")

        if enc_type == EncType.NATIVE:
            self.logger.debug("using native_recompose")
            return self.native_recompose(data, manager)
        elif enc_type != EncType.REF:
            self.logger.error(f"invalid enc_type {enc_type} in recompose data")
            raise RuntimeError("FOBS protocol error")

        if not isinstance(data, str):
            self.logger.error(f"ref data must be str but got {type(data)}")
            raise RuntimeError("FOBS protocol error")

        # data is the item id
        tid = threading.get_ident()
        self.logger.debug(f"ViaDownloader: {tid=} recomposing data item {data}")
        item_id = data
        fobs_ctx = manager.fobs_ctx
        items = fobs_ctx.get(self.items_key)

        # PASS_THROUGH mode: items_key holds a _LazyBatchInfo sentinel, not a dict.
        # Build a LazyDownloadRef so the reference can be forwarded verbatim.
        # Carry items.dot so that LazyDownloadRefDecomposer can route back to the
        # correct ViaDownloaderDecomposer subclass during subprocess recompose().
        if isinstance(items, _LazyBatchInfo):
            lazy = LazyDownloadRef(fqcn=items.fqcn, ref_id=items.ref_id, item_id=item_id, dot=items.dot)
            self.logger.debug(
                f"ViaDownloader PASS_THROUGH: created LazyDownloadRef {item_id=} "
                f"{items.fqcn=} {items.ref_id=} {items.dot=}"
            )
            return lazy

        if items is None:
            self.logger.error(f"cannot find item {item_id} because no downloaded data is loaded")
            raise RuntimeError(f"FOBS download data is missing for item {item_id}")

        self.logger.debug(f"trying to get item for {item_id=} from {type(items)=}")
        make_lazy_ref_fn = getattr(items, "make_lazy_ref", None)
        if callable(make_lazy_ref_fn) and item_id in items:
            item = make_lazy_ref_fn(item_id)
            self.logger.debug(f"{tid=} created lazy ref for {item_id}")
            return item

        get_item_fn = getattr(items, "get", None)
        if not callable(get_item_fn):
            self.logger.error(f"downloaded data for {item_id} does not support get(): {type(items)}")
            raise RuntimeError(f"FOBS download data has invalid type for item {item_id}")

        if hasattr(items, "__contains__") and item_id not in items:
            self.logger.error(f"cannot find item {item_id} from loaded data")
            raise RuntimeError(f"FOBS download data is incomplete: item {item_id} is missing")

        item = items.get(item_id)
        self.logger.debug(f"{tid=} found item {item_id}: {type(item)}")
        if item is None:
            self.logger.error(f"downloaded item {item_id} is None")
            raise RuntimeError(f"FOBS download data is incomplete: item {item_id} is None")
        return item

    def _download_from_remote_cell(self, fobs_ctx: dict, ref: dict):
        """Download the items referenced by *ref* from the owning cell.

        Raises:
            RuntimeError: if the cell or ref fields are missing, or the download fails.
        """
        self.logger.debug(f"trying to download from remote cell for {ref=}")
        cell = fobs_ctx.get(fobs.FOBSContextKey.CELL)
        if not cell:
            self.logger.error("cannot download from remote cell since cell not available in fobs context")
            raise RuntimeError("FOBS Protocol Error")

        ref_id = ref.get(_RefKey.REF_ID)
        if not ref_id:
            self.logger.error(f"missing {_RefKey.REF_ID} from {ref}")
            raise RuntimeError("FOBS Protocol Error")

        fqcn = ref.get(_RefKey.FQCN)
        if not fqcn:
            self.logger.error(f"missing {_RefKey.FQCN} from {ref}")
            raise RuntimeError("FOBS Protocol Error")

        req_timeout = fobs_ctx.get(fobs.FOBSContextKey.DOWNLOAD_REQ_TIMEOUT, None)
        if not req_timeout:
            req_timeout = acu.get_positive_float_var(
                self._config_var_name(ConfigVarName.STREAMING_PER_REQUEST_TIMEOUT), 600.0
            )
        self.logger.debug(f"DOWNLOAD_REQ_TIMEOUT={req_timeout}")

        abort_signal = fobs_ctx.get(fobs.FOBSContextKey.ABORT_SIGNAL)
        self.logger.debug(f"trying to download: {ref_id=} {fqcn=}")
        err, items = self.download(
            from_fqcn=fqcn,
            ref_id=ref_id,
            per_request_timeout=req_timeout,
            cell=cell,
            abort_signal=abort_signal,
        )
        if err:
            self.logger.error(f"failed to download from {fqcn} for source {ref}: {err}")
            raise RuntimeError(f"failed to download from {fqcn}")

        if items is None:
            # Don't crash on len(None) here; recompose() reports the missing data with the item id.
            self.logger.warning(f"download from {fqcn} for {ref_id=} returned no items")
        else:
            self.logger.debug(f"downloaded {len(items)} items successfully")
        return items
class LazyDownloadRefDecomposer(fobs.Decomposer):
    """Decomposer that serialises and deserialises :class:`LazyDownloadRef` objects.

    ``LazyDownloadRef`` objects are created at a forwarding hop (e.g. the CJ
    process) when ``FOBSContextKey.PASS_THROUGH`` is set. Instead of
    downloading tensors from the FL server, each tensor is represented as a
    lightweight placeholder. It carries the original server FQCN, batch
    ref_id, item_id, and the Datum Object Type (``dot``) of the originating
    ``ViaDownloaderDecomposer`` subclass.

    When the forwarding node re-serialises the task for the subprocess agent,
    FOBS routes each ``LazyDownloadRef`` to this decomposer.

    **decompose()**
        Delegates to the ``ViaDownloaderDecomposer`` subclass identified by
        ``lazy.dot``. That handler's ``decompose()`` re-emits the original
        server batch datum (fqcn / ref_id) via a post-callback, so the
        subprocess knows exactly where to download from. ``lazy_dot`` is
        added to the returned encoding dict so ``recompose()`` can route
        back to the same handler.

    **recompose()**
        Uses ``lazy_dot`` to look up the original handler and delegates to
        ``handler.recompose()``. At the subprocess, ``process_datum()`` has
        already populated ``fobs_ctx[handler.items_key]`` with the downloaded
        data, so the handler resolves the item id against it.
    """

    # Key added to the handler's encoding dict to remember which DOT handler produced it.
    _LAZY_DOT_KEY = "lazy_dot"

    def supported_type(self):
        return LazyDownloadRef

    def decompose(self, lazy: LazyDownloadRef, manager: DatumManager = None) -> dict:
        """Encode *lazy* via its originating handler and tag the result with ``lazy_dot``.

        Raises:
            RuntimeError: if no DOT handler is registered for ``lazy.dot``.
        """
        handler = fobs.get_dot_handler(lazy.dot)
        if not handler:
            raise RuntimeError(
                f"LazyDownloadRefDecomposer: no DOT handler registered for dot={lazy.dot!r}. "
                "Ensure the original ViaDownloaderDecomposer subclass (e.g. NumpyArrayDecomposer) "
                "is registered before serialising LazyDownloadRef objects."
            )
        result = handler.decompose(lazy, manager)
        result[self._LAZY_DOT_KEY] = lazy.dot
        return result

    def recompose(self, data: dict, manager: DatumManager = None) -> Any:
        """Delegate recomposition to the handler recorded under ``lazy_dot``.

        Raises:
            RuntimeError: if *data* is not a dict, lacks ``lazy_dot``, or names an unknown handler.
        """
        if not isinstance(data, dict):
            # Mirror ViaDownloaderDecomposer.recompose(): report a protocol error, not an AttributeError.
            raise RuntimeError(f"LazyDownloadRefDecomposer: encoded data must be dict but got {type(data)}")
        lazy_dot = data.get(self._LAZY_DOT_KEY)
        if lazy_dot is None:
            raise RuntimeError("LazyDownloadRefDecomposer: missing 'lazy_dot' in encoded data")
        handler = fobs.get_dot_handler(lazy_dot)
        if not handler:
            raise RuntimeError(f"LazyDownloadRefDecomposer: no DOT handler registered for lazy_dot={lazy_dot!r}")
        return handler.recompose(data, manager)