Source code for nvflare.fuel.utils.fobs.decomposer

# Copyright (c) 2022, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Type, TypeVar

# Generic type supported by the decomposer.
from nvflare.fuel.utils.fobs.datum import Datum, DatumManager, DatumRef, DatumType

T = TypeVar("T")


[docs]class Decomposer(ABC): """Abstract base class for decomposers. Every class to be serialized by FOBS must register a decomposer which is a concrete subclass of this class. """
[docs] @abstractmethod def supported_type(self) -> Type[T]: """Returns the type/class supported by this decomposer. Returns: The class (not instance) of supported type """ pass
[docs] @abstractmethod def decompose(self, target: T, manager: DatumManager = None) -> Any: """Decompose the target into types supported by msgpack or classes with decomposers registered. Msgpack supports primitives, bytes, memoryview, lists, dicts. Args: target: The instance to be serialized manager: Datum manager to store externalized datum Returns: The decomposed serializable objects """ pass
[docs] @abstractmethod def recompose(self, data: Any, manager: DatumManager = None) -> T: """Reconstruct the object from decomposed components. Args: data: The decomposed component manager: Datum manager to internalize datum Returns: The reconstructed object """ pass
[docs]def restore_position(manager: DatumManager, datum: Datum, position): """ This function is used for restoring object state at the specified position. Args: manager: the datum manager datum: the datum that contains the value of the original object at the position. position: the position to be restored Returns: None """ target, key = position original_obj = manager.get_original(target) # also need to restore values in the original object if any if datum.datum_type in (DatumType.BLOB, DatumType.TEXT): target[key] = datum.value if original_obj: original_obj[key] = datum.value else: # file datum - app provided target[key] = datum if original_obj: original_obj[key] = datum
[docs]class Externalizer: """ This class is used to help creating 'decompose' method of decomposers of arbitrary classes. """ def __init__(self, manager: DatumManager): self.manager = manager def _set_position(self, ext_result: Any, target, key): if isinstance(ext_result, DatumRef): datum = self.manager.get_datum(ext_result.datum_id) if datum: datum.set_restore_func(restore_position, (target, key))
[docs] def externalize(self, target: Any): """Recursively go through object tree (dict or list) and externalize leaf nodes.""" if not self.manager: return target if isinstance(target, dict): for k, v in target.items(): d = self.externalize(v) target[k] = d self._set_position(d, target, k) # remember the position so it can be restored elif isinstance(target, list): # note: tuple is not supported since it is immutable. for i, v in enumerate(target): d = self.externalize(v) target[i] = d self._set_position(d, target, i) else: # leaf node target = self.manager.externalize(target) return target
[docs]class Internalizer: """ This class is used to help creating 'recompose' method of decomposers of arbitrary classes. """ def __init__(self, manager: DatumManager): self.manager = manager
[docs] def internalize(self, target) -> Any: """Recursively go through object tree (dict or list) and internalize leaf nodes.""" if not self.manager: return target if isinstance(target, dict): for k, v in target.items(): target[k] = self.internalize(v) elif isinstance(target, list): for i, v in enumerate(target): target[i] = self.internalize(v) else: target = self.manager.internalize(target) return target
[docs]class DictDecomposer(Decomposer): """Generic decomposer for subclasses of dict like Shareable""" def __init__(self, dict_type: Type[dict]): self.dict_type = dict_type
[docs] def supported_type(self): return self.dict_type
[docs] def decompose(self, target: dict, manager: DatumManager = None) -> Any: # need to create a new object; otherwise msgpack will try to decompose this object endlessly. tc = target.copy() manager.register_copy(tc, target) externalizer = Externalizer(manager) return externalizer.externalize(tc)
[docs] def recompose(self, data: dict, manager: DatumManager = None) -> dict: internalizer = Internalizer(manager) data = internalizer.internalize(data) obj = self.dict_type() for k, v in data.items(): obj[k] = v return obj
[docs]class DataClassDecomposer(Decomposer): """Generic decomposers for data classes, which must meet following requirements: 1. All class members must be serializable. The type of member must be one of the types supported by MessagePack or a decomposer is registered for the type. 2. The __new__ method only takes one argument which is the class type. 3. The __init__ method has no side effects. It can only change the states of the object. The side effects include creating files, initializing loggers, modifying global variables. """ def __init__(self, data_type: Type[T]): self.data_type = data_type
[docs] def supported_type(self) -> Type[T]: return self.data_type
[docs] def decompose(self, target: T, manager: DatumManager = None) -> Any: return vars(target)
[docs] def recompose(self, data: dict, manager: DatumManager = None) -> T: instance = self.data_type.__new__(self.data_type) instance.__dict__.update(data) return instance
[docs]class EnumTypeDecomposer(Decomposer): """Generic decomposers for enum types.""" def __init__(self, data_type: Type[Enum]): if not issubclass(data_type, Enum): raise TypeError(f"{data_type} is not an enum") self.data_type = data_type
[docs] def supported_type(self) -> Type[Enum]: return self.data_type
[docs] def decompose(self, target: Enum, manager: DatumManager = None) -> Any: return target.name
[docs] def recompose(self, data: Any, manager: DatumManager = None) -> Enum: return self.data_type[data]