Source code for nvflare.fuel.utils.fobs.decomposer

# Copyright (c) 2022, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
from abc import ABC, abstractmethod
from enum import Enum
from functools import lru_cache
from typing import Any, List, Optional, Type, TypeVar

# Generic type supported by the decomposer.
from nvflare.fuel.utils.fobs.datum import Datum, DatumManager

T = TypeVar("T")

DICT_CONTENT = "dict"
DATA_CONTENT = "data"
LIST_CONTENT = "list"


# Bounded so the cache cannot grow without limit if dynamically-generated classes are serialized;
# FOBS-handled types are effectively a small, fixed set so this is never a practical constraint.
@lru_cache(maxsize=1024)
def _requires_init_args(cls: type) -> bool:
    """Return True if ``cls`` cannot be instantiated without arguments.

    Builtin containers like ``dict``/``list`` expose no introspectable signature but do accept
    no-arg construction, so they return False. The result is cached per class since it is purely a
    function of the class definition. Note this inspects the call signature only; it deliberately
    does not run ``__init__``, so a constructor that accepts no args but raises internally is still
    reported as no-arg-constructible (its error is allowed to surface at call time, not swallowed).
    This reflects ``__new__`` when ``__init__`` is inherited; a class that requires args in
    ``__new__`` is out of scope (it violates DataClassDecomposer requirement #2 and cannot be rebuilt
    via ``cls.__new__(cls)`` anyway).
    """
    try:
        signature = inspect.signature(cls)
    except (ValueError, TypeError):
        return False
    try:
        signature.bind()
    except TypeError:
        return True
    return False


def new_empty_container(cls: type):
    """Build an empty ``cls`` instance that (de)serialization code repopulates afterwards.

    The exact (sub)class is kept so that ``strict_types`` packing routes the value back through
    its decomposer instead of losing the type.

    If ``cls.__init__`` requires positional arguments (e.g. dict-based edge models like
    ``SelectionRequest(device_info, job_id)``), ``__init__`` is skipped and the object is created
    through ``__new__``, since only an empty container is needed. Constructor state such as a
    defaultdict's default_factory is not preserved in that case.

    Args:
        cls: The container class to instantiate

    Returns:
        A new, empty instance of ``cls``
    """
    return cls.__new__(cls) if _requires_init_args(cls) else cls()
class Decomposer(ABC):
    """Base interface implemented by every FOBS decomposer.

    Any class that FOBS serializes must have a registered decomposer, i.e. a concrete
    subclass of this class.
    """

    @abstractmethod
    def supported_type(self) -> Type[T]:
        """Report the type/class handled by this decomposer.

        Returns:
            The supported class itself (not an instance of it)
        """

    def supported_dots(self) -> Optional[List[int]]:
        """Report the Datum Object Types (DOTs) handled by this decomposer.

        When a DOT is returned, this decomposer's ``process_datum`` method is called for any
        datum whose DOT matches it.

        Returns:
            None or a list of DOTs; this default implementation returns None
        """
        return None

    def process_datum(self, datum: Datum, manager: DatumManager):
        """Process one datum; called during message deserialization.

        The default implementation does nothing.

        Args:
            datum: the datum to be processed
            manager: the datum manager

        Returns:
            None
        """
        return None

    @abstractmethod
    def decompose(self, target: T, manager: DatumManager = None) -> Any:
        """Break ``target`` down into msgpack-supported types or classes with registered decomposers.

        Msgpack supports primitives, bytes, memoryview, lists, dicts.

        Args:
            target: The instance to be serialized
            manager: Datum manager to store externalized datum

        Returns:
            The decomposed serializable objects
        """

    @abstractmethod
    def recompose(self, data: Any, manager: DatumManager = None) -> T:
        """Rebuild the object from its decomposed components.

        Args:
            data: The decomposed component
            manager: Datum manager to internalize datum

        Returns:
            The reconstructed object
        """
class Externalizer:
    """Helper for writing the 'decompose' method of decomposers of arbitrary classes."""

    def __init__(self, manager: DatumManager):
        self.manager = manager

    def externalize(self, target: Any):
        """Walk ``target`` recursively and externalize its leaf nodes, leaving the source containers untouched.

        Dict and list (sub)class instances are rebuilt at every level through
        ``new_empty_container``, so nested container subclasses (both dict and list) may not
        keep constructor state such as a defaultdict's default_factory or the
        capacity/initializer argument of a list subclass.

        Args:
            target: The object tree (dict, list or leaf value) to externalize

        Returns:
            ``target`` itself when no manager is set; otherwise a rebuilt container for
            dicts/lists, or the result of the manager's ``externalize`` for any other value
        """
        datum_manager = self.manager
        if not datum_manager:
            return target

        if isinstance(target, dict):
            rebuilt = new_empty_container(type(target))
            for key, value in target.items():
                rebuilt[key] = self.externalize(value)
            return rebuilt

        if isinstance(target, list):
            # Tuples are not handled here because they are immutable.
            rebuilt = new_empty_container(type(target))
            for item in target:
                rebuilt.append(self.externalize(item))
            return rebuilt

        return datum_manager.externalize(target)
class Internalizer:
    """Helper for writing the 'recompose' method of decomposers of arbitrary classes."""

    def __init__(self, manager: DatumManager):
        self.manager = manager

    def internalize(self, target) -> Any:
        """Walk the object tree (dict or list) recursively and internalize its leaf nodes.

        Dicts and lists are updated in place and returned as the same object; any other value
        is handed to the manager's ``internalize``.

        Args:
            target: The object tree (dict, list or leaf value) to internalize

        Returns:
            ``target`` unchanged when no manager is set; otherwise the same container with
            internalized values, or the internalized leaf
        """
        if not self.manager:
            return target

        if isinstance(target, dict):
            # Only values are reassigned, so the dict's size is stable during iteration.
            for key, value in target.items():
                target[key] = self.internalize(value)
            return target

        if isinstance(target, list):
            for index, item in enumerate(target):
                target[index] = self.internalize(item)
            return target

        return self.manager.internalize(target)
class DictDecomposer(Decomposer):
    """Generic decomposer for subclasses of dict like Shareable"""

    def __init__(self, dict_type: Type[dict]):
        self.dict_type = dict_type

    def supported_type(self):
        """Return the dict subclass handled by this decomposer."""
        return self.dict_type

    def decompose(self, target: dict, manager: DatumManager = None) -> Any:
        """Turn ``target`` into a plain dict whose leaf nodes have been externalized.

        Args:
            target: The dict (subclass) instance to be serialized
            manager: Datum manager to store externalized datum; when it is None the plain
                shallow copy is returned as is

        Returns:
            A plain ``dict`` copy of ``target`` (with externalized leaves when a manager is given)
        """
        # Convert the top-level dict subclass to a plain dict. Externalizer preserves dict/list subclass types,
        # including nested subclasses, so starting from a plain copy prevents msgpack from re-entering this
        # decomposer. dict(target) is used instead of target.copy() because a subclass may override copy()
        # to return its own type (collections.OrderedDict and collections.defaultdict do), which would
        # defeat the conversion.
        plain_copy = dict(target)
        externalizer = Externalizer(manager)
        return externalizer.externalize(plain_copy)

    def recompose(self, data: dict, manager: DatumManager = None) -> dict:
        """Rebuild an instance of the supported dict type from decomposed data.

        Args:
            data: The decomposed dict
            manager: Datum manager to internalize datum

        Returns:
            A new instance of the supported dict type holding the internalized items
        """
        internalizer = Internalizer(manager)
        data = internalizer.internalize(data)
        # Mirror the encode side (Externalizer): bypass __init__ for dict subclasses whose
        # constructor requires arguments, so decode does not raise the same TypeError.
        obj = new_empty_container(self.dict_type)
        for k, v in data.items():
            obj[k] = v
        return obj
class DataClassDecomposer(Decomposer):
    """Generic decomposer for data classes.

    Supported classes must meet the following requirements:

    1. All class members must be serializable. The type of each member must be one of the
       types supported by MessagePack, or a decomposer must be registered for the type.
    2. The __new__ method only takes one argument, which is the class type.
    3. The __init__ method has no side effects. It can only change the state of the object.
       Side effects include creating files, initializing loggers, modifying global variables.

    Instance ``__dict__`` attributes are always captured; the contents of ``dict`` and ``list``
    subclasses are captured as well so the elements survive the round-trip (the instance is
    rebuilt via ``__new__``, so ``__init__`` is never invoked to repopulate them).
    """

    def __init__(self, data_type: Type[T]):
        self.data_type = data_type

    def supported_type(self) -> Type[T]:
        """Return the data class handled by this decomposer."""
        return self.data_type

    def decompose(self, target: T, manager: DatumManager = None) -> Any:
        """Collect the state of ``target`` into a dict keyed by content kind.

        Args:
            target: The instance to be serialized
            manager: Not used by this implementation

        Returns:
            A dict holding the instance attributes and, for dict/list subclasses,
            a plain copy of the container contents
        """
        payload = {}
        if hasattr(target, "__dict__"):
            # The live attribute dict is stored as is, not a copy.
            payload[DATA_CONTENT] = vars(target)
        if isinstance(target, dict):
            payload[DICT_CONTENT] = dict(target)
        if isinstance(target, list):
            payload[LIST_CONTENT] = list(target)
        return payload

    def recompose(self, data: dict, manager: DatumManager = None) -> T:
        """Rebuild an instance of the supported type from the decomposed state.

        Args:
            data: The dict produced by ``decompose``
            manager: Not used by this implementation

        Returns:
            A new instance created through ``__new__`` (``__init__`` is not run) and
            populated with the stored attributes and container contents
        """
        rebuilt = self.data_type.__new__(self.data_type)

        attributes = data.get(DATA_CONTENT)
        if attributes is not None:
            rebuilt.__dict__.update(attributes)

        mapping_items = data.get(DICT_CONTENT)
        if mapping_items is not None:
            rebuilt.update(mapping_items)

        sequence_items = data.get(LIST_CONTENT)
        if sequence_items is not None:
            rebuilt.extend(sequence_items)

        return rebuilt
class EnumTypeDecomposer(Decomposer):
    """Generic decomposer for enum types."""

    def __init__(self, data_type: Type[Enum]):
        """Store the enum class to handle.

        Args:
            data_type: The enum class supported by this decomposer

        Raises:
            TypeError: If ``data_type`` is not a subclass of ``Enum``
        """
        if issubclass(data_type, Enum):
            self.data_type = data_type
        else:
            raise TypeError(f"{data_type} is not an enum")

    def supported_type(self) -> Type[Enum]:
        """Return the enum class handled by this decomposer."""
        return self.data_type

    def decompose(self, target: Enum, manager: DatumManager = None) -> Any:
        """Represent an enum member by its name.

        Args:
            target: The enum member to be serialized
            manager: Not used by this implementation

        Returns:
            The ``name`` of the member
        """
        member_name = target.name
        return member_name

    def recompose(self, data: Any, manager: DatumManager = None) -> Enum:
        """Look up the enum member by the name produced by ``decompose``.

        Args:
            data: The member name
            manager: Not used by this implementation

        Returns:
            The member of the supported enum class with that name
        """
        enum_class = self.data_type
        return enum_class[data]