Source code for nvflare.app_common.decomposers.numpy_decomposers

# Copyright (c) 2024, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Decomposers for types from app_common and Machine Learning libraries."""
import os
from abc import ABC
from io import BytesIO
from typing import Any, Tuple

import numpy as np

import nvflare.fuel.utils.fobs.dots as dots
from nvflare.app_common.np.np_downloader import ArrayDownloadable, download_arrays
from nvflare.fuel.f3.cellnet.cell import Cell
from nvflare.fuel.f3.streaming.download_service import Downloadable
from nvflare.fuel.utils import fobs
from nvflare.fuel.utils.fobs.datum import DatumManager
from nvflare.fuel.utils.fobs.decomposers.via_downloader import ViaDownloaderDecomposer

_NPZ_EXTENSION = ".npz"


[docs] class NumpyScalarDecomposer(fobs.Decomposer, ABC): """Decomposer base class for all numpy types with item method."""
[docs] def decompose(self, target: Any, manager: DatumManager = None) -> Any: return target.item()
[docs] def recompose(self, data: Any, manager: DatumManager = None) -> np.ndarray: return self.supported_type()(data)
[docs] class Float64ScalarDecomposer(NumpyScalarDecomposer):
[docs] def supported_type(self): return np.float64
[docs] class Float32ScalarDecomposer(NumpyScalarDecomposer):
[docs] def supported_type(self): return np.float32
[docs] class Int64ScalarDecomposer(NumpyScalarDecomposer):
[docs] def supported_type(self): return np.int64
[docs] class Int32ScalarDecomposer(NumpyScalarDecomposer):
[docs] def supported_type(self): return np.int32
[docs] class NumpyArrayDecomposer(ViaDownloaderDecomposer): def __init__(self): ViaDownloaderDecomposer.__init__(self, 1024 * 1024 * 2, "np_")
[docs] def supported_type(self): return np.ndarray
[docs] def get_download_dot(self) -> int: return dots.NUMPY_DOWNLOAD
[docs] def to_downloadable(self, items: dict, max_chunk_size: int, fobs_ctx: dict) -> Downloadable: return ArrayDownloadable(items, max_chunk_size)
[docs] def download( self, from_fqcn: str, ref_id: str, per_request_timeout: float, cell: Cell, secure=False, optional=False, abort_signal=None, ) -> Tuple[str, dict]: return download_arrays( from_fqcn, ref_id, per_request_timeout, cell, secure, optional, abort_signal, )
[docs] def native_decompose(self, target: np.ndarray, manager: DatumManager = None) -> bytes: stream = BytesIO() np.save(stream, target, allow_pickle=False) return stream.getvalue()
[docs] def native_recompose(self, data: bytes, manager: DatumManager = None) -> np.ndarray: stream = BytesIO(data) return np.load(stream, allow_pickle=False)
[docs] def register(): if register.registered: return fobs.register_folder(os.path.dirname(__file__), __package__) register.registered = True
register.registered = False