Source code for nvflare.fuel.hci.client.fl_admin_api_runner

# Copyright (c) 2021, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import time

from nvflare.apis.workspace import Workspace
from nvflare.fuel.common.excepts import ConfigError
from nvflare.fuel.hci.client.api_spec import AdminConfigKey, UidSource
from nvflare.fuel.hci.client.config import secure_load_admin_config
from nvflare.fuel.hci.client.fl_admin_api import FLAdminAPI
from nvflare.fuel.hci.client.fl_admin_api_spec import TargetType
from nvflare.security.logging import secure_format_exception


[docs] def api_command_wrapper(api_command_result): """Prints the result of the command and raises RuntimeError to interrupt command sequence if there is an error. Args: api_command_result: result of the api command """ print(api_command_result) if not api_command_result["status"] == "SUCCESS": raise RuntimeError("command was not successful!") return api_command_result
[docs] class FLAdminAPIRunner: def __init__( self, username, admin_dir, poc=False, debug=False, ): """Initializes and logs into an FLAdminAPI instance. The default locations for certs, keys, and directories are used. Args: username: string of username to log in with admin_dir: string of root admin dir containing the startup dir debug: whether to turn on debug mode """ assert isinstance(username, str), "username must be str" self.username = username assert isinstance(admin_dir, str), "admin_dir must be str" if debug: debug = True try: os.chdir(admin_dir) workspace = Workspace(root_dir=admin_dir) conf = secure_load_admin_config(workspace) except ConfigError as e: print(f"ConfigError: {secure_format_exception(e)}") return admin_config = conf.get_admin_config() if not admin_config: print(f"Missing '{AdminConfigKey.ADMIN}' section in fed_admin configuration.") return upload_dir = admin_config.get(AdminConfigKey.UPLOAD_DIR) download_dir = admin_config.get(AdminConfigKey.DOWNLOAD_DIR) if download_dir and not os.path.isdir(download_dir): os.makedirs(download_dir) # Connect with admin client if poc: admin_config[AdminConfigKey.UID_SOURCE] = UidSource.CERT self.api = FLAdminAPI( admin_config=admin_config, upload_dir=upload_dir, download_dir=download_dir, user_name=username, debug=debug, ) self.api.connect(timeout=5.0) self.api.login()
[docs] def run( self, job_folder_name, ): """An example script to upload, deploy, and start a specified app. Note that the app folder must be in upload_dir already. Prints the command to be executed first so it is easy to follow along as the commands run. Args: job_folder_name: name of job folder to submit, either relative to the upload_dir specified in the fed_admin.json config, or absolute path """ try: print("api.check_status(TargetType.SERVER)") api_command_wrapper(self.api.check_status(TargetType.SERVER)) print(f'api.submit_job("{job_folder_name}")') api_command_wrapper(self.api.submit_job(job_folder_name)) time.sleep(1) print("api.check_status(TargetType.SERVER)") api_command_wrapper(self.api.check_status(TargetType.SERVER)) # The following wait_until can be put into a loop that has other behavior other than waiting until clients # are in a status of stopped. For this code, the app is expected to stop, or this may not end. print("api.wait_until_client_status()") wait_result = api_command_wrapper(self.api.wait_until_client_status()) print(wait_result) print("api.check_status(TargetType.SERVER)") api_command_wrapper(self.api.check_status(TargetType.SERVER)) # now server engine status should be stopped time.sleep(10) # wait for clients to stop in case they take longer than server to stop print("api.check_status(TargetType.CLIENT)") api_command_wrapper(self.api.check_status(TargetType.CLIENT)) except RuntimeError as e: print(f"There was an exception: {secure_format_exception(e)}")
[docs] def close(self): self.api.logout()