Source code for nvflare.private.fed.app.utils

# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import threading
import time

import psutil

from nvflare.apis.fl_constant import FLContextKey, WorkspaceConstants
from nvflare.apis.fl_context import FLContext
from nvflare.apis.fl_exception import UnsafeComponentError
from nvflare.fuel.sec.security_content_service import SecurityContentService
from nvflare.private.fed.runner import Runner
from nvflare.private.fed.server.admin import FedAdminServer
from nvflare.private.fed.server.fed_server import FederatedServer


[docs] def monitor_parent_process(runner: Runner, parent_pid, stop_event: threading.Event): while True: if stop_event.is_set() or not psutil.pid_exists(parent_pid): runner.stop() break time.sleep(1)
[docs] def check_parent_alive(parent_pid, stop_event: threading.Event): while True: if stop_event.is_set() or not psutil.pid_exists(parent_pid): pid = os.getpid() kill_child_processes(pid) os.killpg(os.getpgid(pid), 9) break time.sleep(1)
[docs] def kill_child_processes(parent_pid): try: parent = psutil.Process(parent_pid) except psutil.NoSuchProcess: return children = parent.children(recursive=True) for process in children: process.kill()
[docs] def create_admin_server(fl_server: FederatedServer, server_conf=None, args=None): """To create the admin server. Args: fl_server: fl_server server_conf: server config args: command args Returns: A FedAdminServer. """ admin_server = FedAdminServer( cell=fl_server.cell, fed_admin_interface=fl_server.engine, cmd_modules=fl_server.cmd_modules, file_upload_dir=os.path.join(args.workspace, server_conf.get("admin_storage", "tmp")), file_download_dir=os.path.join(args.workspace, server_conf.get("admin_storage", "tmp")), download_job_url=server_conf.get("download_job_url", "http://"), timeout=server_conf.get("admin_timeout", 10.0), ) return admin_server
[docs] def version_check(): if sys.version_info >= (3, 13): raise RuntimeError( "Python versions 3.13 and above are not yet supported. Please use Python version between 3.9 and 3.12." ) if sys.version_info < (3, 9): raise RuntimeError( "Python versions 3.8 and below are not supported. Please use Python version between 3.9 and 3.12." )
[docs] def init_security_content_service(workspace_dir): content_folder_path = os.path.join(workspace_dir, WorkspaceConstants.STARTUP_FOLDER_NAME) os.makedirs(content_folder_path, exist_ok=True) SecurityContentService.initialize(content_folder=content_folder_path)
[docs] def component_security_check(fl_ctx: FLContext): exceptions = fl_ctx.get_prop(FLContextKey.EXCEPTIONS) if exceptions: for _, exception in exceptions.items(): if isinstance(exception, UnsafeComponentError): print(f"Unsafe component configured, could not start {fl_ctx.get_identity_name()}!!") raise RuntimeError(exception)