# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Decomposers for objects used by NVFlare itself.

This module contains all the decomposers used to run NVFlare.
The decomposers are registered at server/client startup.
"""
import os
from argparse import Namespace
from typing import Any
from nvflare.apis.analytix import AnalyticsDataType
from nvflare.apis.client import Client
from nvflare.apis.dxo import DXO
from nvflare.apis.fl_context import FLContext
from nvflare.apis.fl_snapshot import RunSnapshot
from nvflare.apis.shareable import Shareable
from nvflare.apis.signal import Signal
from nvflare.apis.workspace import Workspace
from nvflare.fuel.utils import fobs
from nvflare.fuel.utils.fobs.decomposer import Decomposer
class ShareableDecomposer(Decomposer):
    """Decomposer that handles ``Shareable`` objects."""

    @staticmethod
    def supported_type():
        """Return the class this decomposer is responsible for."""
        return Shareable

    def decompose(self, target: Shareable) -> Any:
        """Return the result of ``target.copy()`` as the decomposed form.

        Args:
            target: the ``Shareable`` to break down.

        Returns:
            Whatever ``target.copy()`` yields.
        """
        snapshot = target.copy()
        return snapshot

    def recompose(self, data: Any) -> Shareable:
        """Rebuild a ``Shareable`` from its decomposed form.

        Args:
            data: an object exposing ``items()`` that yields key/value pairs.

        Returns:
            A new ``Shareable`` with every pair from ``data`` assigned to it.
        """
        restored = Shareable()
        # Entries are assigned one at a time onto a freshly constructed
        # Shareable rather than passed to its constructor.
        for key, value in data.items():
            restored[key] = value
        return restored
class ContextDecomposer(Decomposer):
    """Decomposer that handles ``FLContext`` objects."""

    @staticmethod
    def supported_type():
        """Return the class this decomposer is responsible for."""
        return FLContext

    def decompose(self, target: FLContext) -> Any:
        """Break an ``FLContext`` down to a two-element list.

        Args:
            target: the context to break down.

        Returns:
            ``[model, props]`` read from ``target``, in that order.
        """
        model = target.model
        props = target.props
        return [model, props]

    def recompose(self, data: Any) -> FLContext:
        """Rebuild an ``FLContext`` from its decomposed form.

        Args:
            data: indexable with ``model`` at index 0 and ``props`` at index 1.

        Returns:
            A new ``FLContext`` with ``model`` and ``props`` assigned.
        """
        context = FLContext()
        context.model = data[0]
        context.props = data[1]
        return context
class DxoDecomposer(Decomposer):
    """Decomposer that handles ``DXO`` objects."""

    @staticmethod
    def supported_type():
        """Return the class this decomposer is responsible for."""
        return DXO

    def decompose(self, target: DXO) -> Any:
        """Break a ``DXO`` down to a three-element list.

        Args:
            target: the DXO to break down.

        Returns:
            ``[data_kind, data, meta]`` read from ``target``, in that order.
        """
        data_kind = target.data_kind
        payload = target.data
        meta = target.meta
        return [data_kind, payload, meta]

    def recompose(self, data: Any) -> DXO:
        """Rebuild a ``DXO`` from its decomposed form.

        Args:
            data: indexable holding the data kind, the data and the meta at
                indices 0, 1 and 2.

        Returns:
            A ``DXO`` constructed with those three values passed positionally.
        """
        data_kind = data[0]
        payload = data[1]
        meta = data[2]
        return DXO(data_kind, payload, meta)
class ClientDecomposer(Decomposer):
    """Decomposer that handles ``Client`` objects."""

    @staticmethod
    def supported_type():
        """Return the class this decomposer is responsible for."""
        return Client

    def decompose(self, target: Client) -> Any:
        """Break a ``Client`` down to a four-element list.

        Args:
            target: the client to break down.

        Returns:
            ``[name, token, last_connect_time, props]`` read from ``target``.
        """
        name = target.name
        token = target.token
        last_seen = target.last_connect_time
        props = target.props
        return [name, token, last_seen, props]

    def recompose(self, data: Any) -> Client:
        """Rebuild a ``Client`` from its decomposed form.

        Args:
            data: indexable laid out as produced by ``decompose``.

        Returns:
            A ``Client`` built from the first two entries, with
            ``last_connect_time`` and ``props`` assigned afterwards.
        """
        name = data[0]
        token = data[1]
        restored = Client(name, token)
        # The remaining fields are not constructor arguments here; they are
        # set on the instance after construction.
        restored.last_connect_time = data[2]
        restored.props = data[3]
        return restored
class RunSnapshotDecomposer(Decomposer):
    """Decomposer that handles ``RunSnapshot`` objects."""

    @staticmethod
    def supported_type():
        """Return the class this decomposer is responsible for."""
        return RunSnapshot

    def decompose(self, target: RunSnapshot) -> Any:
        """Break a ``RunSnapshot`` down to a three-element list.

        Args:
            target: the snapshot to break down.

        Returns:
            ``[component_states, completed, job_id]`` read from ``target``.
        """
        states = target.component_states
        completed = target.completed
        job_id = target.job_id
        return [states, completed, job_id]

    def recompose(self, data: Any) -> RunSnapshot:
        """Rebuild a ``RunSnapshot`` from its decomposed form.

        Args:
            data: indexable laid out as produced by ``decompose``.

        Returns:
            A ``RunSnapshot`` constructed from the job id (index 2), with
            ``component_states`` and ``completed`` assigned afterwards.
        """
        # The job id is stored last but is the constructor argument.
        job_id = data[2]
        restored = RunSnapshot(job_id)
        restored.component_states = data[0]
        restored.completed = data[1]
        return restored
class WorkspaceDecomposer(Decomposer):
    """Decomposer that handles ``Workspace`` objects."""

    @staticmethod
    def supported_type():
        """Return the class this decomposer is responsible for."""
        return Workspace

    def decompose(self, target: Workspace) -> Any:
        """Break a ``Workspace`` down to a three-element list.

        Args:
            target: the workspace to break down.

        Returns:
            ``[root_dir, name, config_folder]`` read from ``target``.
        """
        root_dir = target.root_dir
        name = target.name
        config_folder = target.config_folder
        return [root_dir, name, config_folder]

    def recompose(self, data: Any) -> Workspace:
        """Rebuild a ``Workspace`` from its decomposed form.

        Args:
            data: indexable laid out as produced by ``decompose``.

        Returns:
            A ``Workspace`` constructed with the three entries passed
            positionally in stored order.
        """
        root_dir = data[0]
        name = data[1]
        config_folder = data[2]
        return Workspace(root_dir, name, config_folder)
class SignalDecomposer(Decomposer):
    """Decomposer that handles ``Signal`` objects."""

    @staticmethod
    def supported_type():
        """Return the class this decomposer is responsible for."""
        return Signal

    def decompose(self, target: Signal) -> Any:
        """Break a ``Signal`` down to a three-element list.

        Args:
            target: the signal to break down.

        Returns:
            ``[value, trigger_time, triggered]`` read from ``target``.
        """
        value = target.value
        trigger_time = target.trigger_time
        triggered = target.triggered
        return [value, trigger_time, triggered]

    def recompose(self, data: Any) -> Signal:
        """Rebuild a ``Signal`` from its decomposed form.

        Args:
            data: indexable laid out as produced by ``decompose``.

        Returns:
            A new ``Signal`` with ``value``, ``trigger_time`` and
            ``triggered`` assigned from ``data``.
        """
        restored = Signal()
        restored.value = data[0]
        restored.trigger_time = data[1]
        restored.triggered = data[2]
        return restored
class AnalyticsDataTypeDecomposer(Decomposer):
    """Decomposer that handles ``AnalyticsDataType`` values."""

    @staticmethod
    def supported_type():
        """Return the class this decomposer is responsible for."""
        return AnalyticsDataType

    def decompose(self, target: AnalyticsDataType) -> Any:
        """Represent the value by its ``name`` attribute.

        Args:
            target: the value to break down.

        Returns:
            ``target.name``.
        """
        member_name = target.name
        return member_name

    def recompose(self, data: Any) -> AnalyticsDataType:
        """Look the value up again by the name stored in ``data``.

        Args:
            data: the name produced by ``decompose``.

        Returns:
            The result of subscripting ``AnalyticsDataType`` with ``data``.
        """
        member = AnalyticsDataType[data]
        return member
class NamespaceDecomposer(Decomposer):
    """Decomposer that handles ``argparse.Namespace`` objects."""

    @staticmethod
    def supported_type():
        """Return the class this decomposer is responsible for."""
        return Namespace

    def decompose(self, target: Namespace) -> Any:
        """Break a ``Namespace`` down to a dict of its attributes.

        Args:
            target: the namespace to break down.

        Returns:
            A new dict mapping attribute names to their values.
        """
        # vars() hands back the namespace's own __dict__; return a shallow
        # copy so that changes made to the result cannot alter ``target``.
        return dict(vars(target))

    def recompose(self, data: Any) -> Namespace:
        """Rebuild a ``Namespace`` from its decomposed form.

        Args:
            data: mapping of attribute names to values.

        Returns:
            A ``Namespace`` constructed with the entries of ``data`` passed
            as keyword arguments.
        """
        return Namespace(**data)
def register():
    """Register this module's folder with fobs, at most once.

    The ``register.registered`` function attribute records whether the
    registration call has already been made; later calls do nothing.
    """
    if not register.registered:
        module_dir = os.path.dirname(__file__)
        fobs.register_folder(module_dir, __package__)
        register.registered = True


# Flag is kept on the function object itself; starts out unset.
register.registered = False