# Source code for nvflare.app_common.tie.py_applet

# Copyright (c) 2024, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
import os
import sys
import threading
import time
from abc import ABC, abstractmethod
from typing import Tuple

from nvflare.apis.workspace import Workspace
from nvflare.fuel.utils.log_utils import add_log_file_handler, configure_logging
from nvflare.security.logging import secure_format_exception, secure_log_traceback

from .applet import Applet
from .defs import Constant


class PyRunner(ABC):
    """Interface of the light-weight object that runs the external app's Python code.

    A PyApplet must return a PyRunner from its get_runner() method.
    Since the runner could be running in a separate subprocess, the runner object must be pickleable!
    """

    @abstractmethod
    def start(self, app_ctx: dict):
        """Start the external app's Python code.

        Args:
            app_ctx: the app's execution context

        Returns: None

        """
        pass

    @abstractmethod
    def stop(self, timeout: float):
        """Stop the external app's Python code.

        Args:
            timeout: how long to wait for the app to stop before killing it

        Returns: None

        """
        pass

    @abstractmethod
    def is_stopped(self) -> Tuple[bool, int]:
        """Check whether the app code is stopped.

        Returns: a tuple of: whether the app is stopped, and exit code if stopped

        """
        pass
class _PyStarter:
    """Launch wrapper around a PyRunner.

    An instance's start() method is what actually gets executed by the thread or the
    separate process that hosts the applet's Python code. The outcome of the run is
    recorded in the instance attributes (started, stopped, error, exit_code).
    """

    def __init__(self, runner: PyRunner, in_process: bool, workspace: Workspace, job_id: str):
        # what to run and where
        self.runner = runner
        self.in_process = in_process
        self.workspace = workspace
        self.job_id = job_id

        # outcome of the run; "started" is only flipped to False when start() fails
        self.started = True
        self.stopped = False
        self.exit_code = 0
        self.error = None

    def _enable_subprocess_logging(self):
        """Configure logging and add the applet log file (in the job's run dir) as a handler."""
        applet_log = os.path.join(self.workspace.get_run_dir(self.job_id), "applet_log.txt")
        configure_logging(self.workspace)
        add_log_file_handler(applet_log)

    def start(self, app_ctx: dict):
        """Run the applet's Python code and block until it is done.

        When not running in-process, logging is set up first. Any exception (from logging
        setup or from the runner) is recorded in "error", and the exit code is set to
        Constant.EXIT_CODE_CANT_START; in a separate process the process then exits
        with that code.

        Args:
            app_ctx: the app's execution context

        Returns: None

        """
        try:
            if not self.in_process:
                self._enable_subprocess_logging()

            # Note: the runner's start does not return until it runs to completion!
            self.runner.start(app_ctx)
        except Exception as ex:
            secure_log_traceback()
            self.error = f"Exception starting applet: {secure_format_exception(ex)}"
            self.started = False
            self.exit_code = Constant.EXIT_CODE_CANT_START
            self.stopped = True
            if not self.in_process:
                # this is a separate process: make the failure visible through its exit code
                sys.exit(self.exit_code)
        else:
            self.stopped = True
class PyApplet(Applet, ABC):
    """Applet whose code is Python, run through a PyRunner in a thread or in a subprocess.

    Subclasses provide the PyRunner by implementing get_runner().

    Note: self.logger is used throughout but is not defined in this class; it is
    presumably provided by the Applet base class - confirm.
    """

    def __init__(self, in_process: bool):
        """Constructor of PyApplet, which runs the applet's Python code in a separate thread or subprocess.

        Args:
            in_process: whether to run the applet code as separate thread within the same process or as a separate
                subprocess.
        """
        Applet.__init__(self)
        self.in_process = in_process
        self.starter = None  # _PyStarter created by start()
        self.process = None  # multiprocessing.Process, only set when not in_process
        self.runner = None  # PyRunner returned by get_runner()

    @abstractmethod
    def get_runner(self, app_ctx: dict) -> PyRunner:
        """Subclass must implement this method to return a PyRunner.
        The returned PyRunner must be pickleable since it could be run in a separate subprocess!

        Args:
            app_ctx: the app context for the runner

        Returns: a PyRunner object

        """
        pass

    def start(self, app_ctx: dict):
        """Start the execution of the applet.

        The workspace and job id are taken from the FL context stored in app_ctx under
        Constant.APP_CTX_FL_CONTEXT. The runner obtained from get_runner() is then run
        in a thread (in_process) or in a separate process.

        Args:
            app_ctx: the app context

        Returns: None

        Raises:
            RuntimeError: if get_runner() does not return a PyRunner

        """
        fl_ctx = app_ctx.get(Constant.APP_CTX_FL_CONTEXT)
        engine = fl_ctx.get_engine()
        workspace = engine.get_workspace()
        job_id = fl_ctx.get_job_id()
        runner = self.get_runner(app_ctx)

        if not isinstance(runner, PyRunner):
            raise RuntimeError(f"runner must be a PyRunner but got {type(runner)}")

        self.runner = runner
        self.starter = _PyStarter(runner, self.in_process, workspace, job_id)
        if self.in_process:
            self._start_in_thread(self.starter, app_ctx)
        else:
            self._start_in_process(self.starter, app_ctx)

    def _start_in_thread(self, starter, app_ctx: dict):
        """Start the applet in a separate (daemon) thread.

        Raises:
            RuntimeError: if the starter has already reported a start failure
        """
        self.logger.info("Starting applet in another thread")
        thread = threading.Thread(target=starter.start, args=(app_ctx,), daemon=True, name="applet")
        thread.start()

        # NOTE(review): this check runs right after the thread is launched, so it can only
        # catch failures that the starter has recorded by this point.
        if not starter.started:
            self.logger.error(f"Cannot start applet: {starter.error}")
            raise RuntimeError(starter.error)

    def _start_in_process(self, starter, app_ctx: dict):
        """Start the applet in a separate (daemon) process.

        Note that the FL context entry is removed from the passed-in app_ctx (in place).
        """
        # must remove Constant.APP_CTX_FL_CONTEXT from ctx because it's not pickleable!
        app_ctx.pop(Constant.APP_CTX_FL_CONTEXT, None)
        self.logger.info("Starting applet in another process")
        self.process = multiprocessing.Process(target=starter.start, args=(app_ctx,), daemon=True, name="applet")
        self.process.start()

    def stop(self, timeout=0.0) -> int:
        """Stop the applet.

        Args:
            timeout: amount of time (in seconds) to wait for the applet to stop by itself. If the applet
                does not stop on its own within this time, we'll forcefully stop it by kill.

        Returns: exit code of the applet:
            - in-process (thread) mode: always 0, after the runner is asked to stop;
            - subprocess mode: the exit code of the process if it exited on its own (before the call or
              within the timeout); -9 if the process had to be killed; 0 if there is no process to stop.

        Raises:
            RuntimeError: if the runner is not set

        """
        if not self.runner:
            raise RuntimeError("PyRunner is not set")

        if self.in_process:
            self.runner.stop(timeout)
            return 0

        p = self.process
        self.process = None
        if not p:
            # no process to stop (never started, or already handled by an earlier stop call)
            return 0

        assert isinstance(p, multiprocessing.Process)
        if p.exitcode is not None:
            # the process has already exited on its own: report its real exit code
            return p.exitcode

        # the process is still running
        if timeout > 0:
            # wait for the applet to stop by itself
            wait_start = time.time()
            p.join(timeout)
            if p.exitcode is not None:
                self.logger.info(f"applet stopped (rc={p.exitcode}) after {time.time() - wait_start} secs")
                return p.exitcode

        self.logger.info("stopped applet by killing the process")
        p.kill()
        return -9

    def is_stopped(self) -> Tuple[bool, int]:
        """Check whether the applet is stopped.

        Returns: a tuple of: whether the applet is stopped, and the exit code.
            In thread mode the starter's state is checked first, then the runner is asked.
            In subprocess mode the process's exit code is used; if there is no process, (True, 0).

        Raises:
            RuntimeError: if the runner is not set

        """
        if not self.runner:
            raise RuntimeError("PyRunner is not set")

        if self.in_process:
            if self.starter and self.starter.stopped:
                self.logger.info("starter is stopped!")
                return True, self.starter.exit_code
            return self.runner.is_stopped()

        if not self.process:
            return True, 0

        assert isinstance(self.process, multiprocessing.Process)
        ec = self.process.exitcode
        if ec is None:
            # still running
            return False, 0
        return True, ec