Source code for nvflare.app_opt.tf.client_api_launcher_executor

# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional

from nvflare.apis.fl_context import FLContext
from nvflare.app_common.app_constant import AppConstants
from nvflare.app_common.executors.client_api_launcher_executor import ClientAPILauncherExecutor
from nvflare.app_opt.tf.params_converter import KerasModelToNumpyParamsConverter, NumpyToKerasModelParamsConverter
from nvflare.client.config import ExchangeFormat, TransferType
from nvflare.client.constants import CLIENT_API_CONFIG


[docs] class TFClientAPILauncherExecutor(ClientAPILauncherExecutor): def __init__( self, pipe_id: str, launcher_id: Optional[str] = None, launch_timeout: Optional[float] = None, task_wait_timeout: Optional[float] = None, last_result_transfer_timeout: float = 300.0, external_pre_init_timeout: float = 60.0, peer_read_timeout: Optional[float] = 60.0, monitor_interval: float = 0.01, read_interval: float = 0.5, heartbeat_interval: float = 5.0, heartbeat_timeout: float = 60.0, workers: int = 4, train_with_evaluation: bool = False, train_task_name: str = AppConstants.TASK_TRAIN, evaluate_task_name: str = AppConstants.TASK_VALIDATION, submit_model_task_name: str = AppConstants.TASK_SUBMIT_MODEL, from_nvflare_converter_id: Optional[str] = None, to_nvflare_converter_id: Optional[str] = None, server_expected_format: str = ExchangeFormat.NUMPY, params_exchange_format: str = ExchangeFormat.KERAS_LAYER_WEIGHTS, params_transfer_type: str = TransferType.FULL, config_file_name: str = CLIENT_API_CONFIG, ) -> None: ClientAPILauncherExecutor.__init__( self, pipe_id=pipe_id, launcher_id=launcher_id, launch_timeout=launch_timeout, task_wait_timeout=task_wait_timeout, last_result_transfer_timeout=last_result_transfer_timeout, external_pre_init_timeout=external_pre_init_timeout, peer_read_timeout=peer_read_timeout, monitor_interval=monitor_interval, read_interval=read_interval, heartbeat_interval=heartbeat_interval, heartbeat_timeout=heartbeat_timeout, workers=workers, train_with_evaluation=train_with_evaluation, train_task_name=train_task_name, evaluate_task_name=evaluate_task_name, submit_model_task_name=submit_model_task_name, from_nvflare_converter_id=from_nvflare_converter_id, to_nvflare_converter_id=to_nvflare_converter_id, server_expected_format=server_expected_format, params_exchange_format=params_exchange_format, params_transfer_type=params_transfer_type, config_file_name=config_file_name, )
[docs] def initialize(self, fl_ctx: FLContext) -> None: super().initialize(fl_ctx) if ( self._server_expected_format == ExchangeFormat.NUMPY and self._params_exchange_format == ExchangeFormat.KERAS_LAYER_WEIGHTS ): if self._from_nvflare_converter is None: self._from_nvflare_converter = NumpyToKerasModelParamsConverter( [AppConstants.TASK_TRAIN, AppConstants.TASK_VALIDATION] ) if self._to_nvflare_converter is None: self._to_nvflare_converter = KerasModelToNumpyParamsConverter( [AppConstants.TASK_TRAIN, AppConstants.TASK_SUBMIT_MODEL] )