Source code for nvflare.private.fed.app.utils

# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import signal
import sys
import threading
import time

import psutil

from nvflare.apis.fl_constant import FLContextKey, WorkspaceConstants
from nvflare.apis.fl_context import FLContext
from nvflare.apis.fl_exception import UnsafeComponentError
from nvflare.fuel.hci.security import hash_password
from nvflare.fuel.sec.security_content_service import SecurityContentService
from nvflare.private.defs import SSLConstants
from nvflare.private.fed.runner import Runner
from nvflare.private.fed.server.admin import FedAdminServer
from nvflare.private.fed.server.fed_server import FederatedServer


[docs]def monitor_parent_process(runner: Runner, parent_pid, stop_event: threading.Event): while True: if stop_event.is_set() or not psutil.pid_exists(parent_pid): runner.stop() break time.sleep(1)
[docs]def check_parent_alive(parent_pid, stop_event: threading.Event): while True: if stop_event.is_set() or not psutil.pid_exists(parent_pid): pid = os.getpid() kill_child_processes(pid) os.killpg(os.getpgid(pid), 9) break time.sleep(1)
[docs]def kill_child_processes(parent_pid, sig=signal.SIGTERM): try: parent = psutil.Process(parent_pid) except psutil.NoSuchProcess: return children = parent.children(recursive=True) for process in children: process.send_signal(sig)
[docs]def create_admin_server(fl_server: FederatedServer, server_conf=None, args=None, secure_train=False): """To create the admin server. Args: fl_server: fl_server server_conf: server config args: command args secure_train: True/False Returns: A FedAdminServer. """ users = {} # Create a default user admin:admin for the POC insecure use case. if not secure_train: users = {"admin": hash_password("admin")} root_cert = server_conf[SSLConstants.ROOT_CERT] if secure_train else None server_cert = server_conf[SSLConstants.CERT] if secure_train else None server_key = server_conf[SSLConstants.PRIVATE_KEY] if secure_train else None admin_server = FedAdminServer( cell=fl_server.cell, fed_admin_interface=fl_server.engine, users=users, cmd_modules=fl_server.cmd_modules, file_upload_dir=os.path.join(args.workspace, server_conf.get("admin_storage", "tmp")), file_download_dir=os.path.join(args.workspace, server_conf.get("admin_storage", "tmp")), host=server_conf.get("admin_host", "localhost"), port=server_conf.get("admin_port", 5005), ca_cert_file_name=root_cert, server_cert_file_name=server_cert, server_key_file_name=server_key, accepted_client_cns=None, download_job_url=server_conf.get("download_job_url", "http://"), ) return admin_server
[docs]def version_check(): if sys.version_info >= (3, 11): raise RuntimeError("Python versions 3.11 and above are not yet supported. Please use Python 3.8, 3.9 or 3.10.") if sys.version_info < (3, 8): raise RuntimeError("Python versions 3.7 and below are not supported. Please use Python 3.8, 3.9 or 3.10")
[docs]def init_security_content_service(workspace_dir): content_folder_path = os.path.join(workspace_dir, WorkspaceConstants.STARTUP_FOLDER_NAME) os.makedirs(content_folder_path, exist_ok=True) SecurityContentService.initialize(content_folder=content_folder_path)
[docs]def component_security_check(fl_ctx: FLContext): exceptions = fl_ctx.get_prop(FLContextKey.EXCEPTIONS) if exceptions: for _, exception in exceptions.items(): if isinstance(exception, UnsafeComponentError): print(f"Unsafe component configured, could not start {fl_ctx.get_identity_name()}!!") raise RuntimeError(exception)