# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from nvflare.apis.workspace import Workspace
from nvflare.fuel.common.excepts import ConfigError
from nvflare.fuel.hci.client.fl_admin_api import FLAdminAPI
from nvflare.fuel.hci.client.fl_admin_api_spec import TargetType
from nvflare.private.fed.app.fl_conf import FLAdminClientStarterConfigurator
from nvflare.security.logging import secure_format_exception
[docs]def api_command_wrapper(api_command_result):
"""Prints the result of the command and raises RuntimeError to interrupt command sequence if there is an error.
Args:
api_command_result: result of the api command
"""
print(api_command_result)
if not api_command_result["status"] == "SUCCESS":
raise RuntimeError("command was not successful!")
return api_command_result
[docs]class FLAdminAPIRunner:
def __init__(
self,
username,
admin_dir,
poc=False,
debug=False,
):
"""Initializes and logs into an FLAdminAPI instance.
The default locations for certs, keys, and directories are used.
Args:
username: string of username to log in with
admin_dir: string of root admin dir containing the startup dir
poc: whether to run in poc mode without SSL certs
debug: whether to turn on debug mode
"""
assert isinstance(username, str), "username must be str"
self.username = username
assert isinstance(admin_dir, str), "admin_dir must be str"
if poc:
self.poc = True
else:
self.poc = False
if debug:
debug = True
try:
os.chdir(admin_dir)
workspace = Workspace(root_dir=admin_dir)
conf = FLAdminClientStarterConfigurator(workspace)
conf.configure()
except ConfigError as e:
print(f"ConfigError: {secure_format_exception(e)}")
return
try:
admin_config = conf.config_data["admin"]
except KeyError:
print("Missing admin section in fed_admin configuration.")
return
ca_cert = admin_config.get("ca_cert", "")
client_cert = admin_config.get("client_cert", "")
client_key = admin_config.get("client_key", "")
if admin_config.get("with_ssl"):
if len(ca_cert) <= 0:
print("missing CA Cert file name field ca_cert in fed_admin configuration")
return
if len(client_cert) <= 0:
print("missing Client Cert file name field client_cert in fed_admin configuration")
return
if len(client_key) <= 0:
print("missing Client Key file name field client_key in fed_admin configuration")
return
else:
ca_cert = None
client_key = None
client_cert = None
upload_dir = admin_config.get("upload_dir")
download_dir = admin_config.get("download_dir")
if not os.path.isdir(download_dir):
os.makedirs(download_dir)
assert os.path.isdir(admin_dir), f"admin directory does not exist at {admin_dir}"
if not self.poc:
assert os.path.isfile(ca_cert), f"rootCA.pem does not exist at {ca_cert}"
assert os.path.isfile(client_cert), f"client.crt does not exist at {client_cert}"
assert os.path.isfile(client_key), f"client.key does not exist at {client_key}"
# Connect with admin client
self.api = FLAdminAPI(
ca_cert=ca_cert,
client_cert=client_cert,
client_key=client_key,
upload_dir=upload_dir,
download_dir=download_dir,
overseer_agent=conf.overseer_agent,
user_name=username,
insecure=self.poc,
debug=debug,
)
# wait for admin to login
_t_warning_start = time.time()
while not self.api.server_sess_active:
time.sleep(0.5)
if time.time() - _t_warning_start > 10:
print("Admin is taking a long time to log in to the server...")
print("Make sure the server is up and available, and all configurations are correct.")
_t_warning_start = time.time()
[docs] def run(
self,
job_folder_name,
):
"""An example script to upload, deploy, and start a specified app.
Note that the app folder must be in upload_dir already. Prints the command to be executed first so it is easy
to follow along as the commands run.
Args:
job_folder_name: name of job folder to submit, either relative to the upload_dir specified in the fed_admin.json config, or absolute path
"""
try:
print("api.check_status(TargetType.SERVER)")
api_command_wrapper(self.api.check_status(TargetType.SERVER))
print(f'api.submit_job("{job_folder_name}")')
api_command_wrapper(self.api.submit_job(job_folder_name))
time.sleep(1)
print("api.check_status(TargetType.SERVER)")
api_command_wrapper(self.api.check_status(TargetType.SERVER))
# The following wait_until can be put into a loop that has other behavior other than waiting until clients
# are in a status of stopped. For this code, the app is expected to stop, or this may not end.
print("api.wait_until_client_status()")
wait_result = api_command_wrapper(self.api.wait_until_client_status())
print(wait_result)
print("api.check_status(TargetType.SERVER)")
api_command_wrapper(self.api.check_status(TargetType.SERVER))
# now server engine status should be stopped
time.sleep(10) # wait for clients to stop in case they take longer than server to stop
print("api.check_status(TargetType.CLIENT)")
api_command_wrapper(self.api.check_status(TargetType.CLIENT))
except RuntimeError as e:
print(f"There was an exception: {secure_format_exception(e)}")