# Source code for nvflare.recipe.run

# Copyright (c) 2025, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
from typing import Optional

from nvflare.fuel.utils.log_utils import get_obj_logger
from nvflare.recipe.spec import ExecEnv


class Run:
    """Represents a running or completed job execution.

    Provides methods to get job status, results, and abort the job.
    Caches status and result after the execution environment is stopped.

    This class is thread-safe. All state-changing operations are protected
    by a single non-reentrant lock. Note that ``get_result()`` holds that lock
    for its whole duration, including the wait for job completion, so
    concurrent ``get_status()`` and ``abort()`` calls from other threads block
    until ``get_result()`` returns.
    """

    def __init__(self, exec_env: ExecEnv, job_id: str):
        """Initialize a Run instance.

        Args:
            exec_env: The execution environment managing this job.
            job_id: The unique identifier for the job.

        Raises:
            ValueError: If exec_env is None or job_id is not a non-empty string.
        """
        if exec_env is None:
            raise ValueError("exec_env cannot be None")
        # Check the type before the truthiness: evaluating ``not job_id`` on an
        # arbitrary object may itself raise (or raise with an unrelated
        # message), which would mask the documented ValueError.
        if not isinstance(job_id, str) or not job_id:
            raise ValueError("job_id must be a non-empty string")

        self.exec_env = exec_env
        self.job_id = job_id
        self._lock = threading.Lock()
        # Once True, exec_env is no longer queried and cached values are served.
        self._stopped = False
        self._cached_status: Optional[str] = None
        self._cached_result: Optional[str] = None
        self.logger = get_obj_logger(self)

    def get_job_id(self) -> str:
        """Get the job ID.

        Returns:
            str: The job ID.
        """
        return self.job_id

    def get_status(self) -> Optional[str]:
        """Get the status of the run.

        After the execution environment has been stopped, the status cached by
        ``get_result()`` is returned without contacting the environment.

        Returns:
            Optional[str]: The status of the run, or None if not available or on error.
        """
        with self._lock:
            if self._stopped:
                return self._cached_status
            try:
                return self.exec_env.get_job_status(self.job_id)
            except Exception as e:
                # Best effort: report and fall back to "unknown" instead of raising.
                self.logger.warning(f"Failed to get job status: {e}")
                return None

    def get_result(self, timeout: float = 0.0, clean_up: bool = True) -> Optional[str]:
        """Get the result workspace of the run.

        Waits for job to complete, caches status, then stops execution environment.
        Subsequent calls return the cached result. The environment is stopped
        (and the run marked as stopped) even if fetching the result or the
        status fails; such failures are logged, not raised.

        Args:
            timeout (float, optional): Timeout for job completion. Defaults to 0.0 (no timeout).
            clean_up (bool, optional): Whether to remove the execution-environment
                workspace (e.g. the POC workspace) when stopping. Defaults to True,
                preserving the existing "each run is independent" behavior. Pass
                ``clean_up=False`` to keep the workspace on disk after the run so
                server/client log files (including the per-service
                ``poc_console.log`` introduced in #4500) remain available for
                debugging or test assertions.

        Returns:
            Optional[str]: Result workspace path, or None if job not finished or on error.
            The path may be removed by the time this method returns when
            ``clean_up=True``.
        """
        # NOTE(review): the lock is held across the blocking wait below, so
        # abort()/get_status() from other threads cannot run until this returns.
        with self._lock:
            if self._stopped:
                return self._cached_result

            result = None
            try:
                result = self.exec_env.get_job_result(self.job_id, timeout=timeout)
                self._cached_result = result
            except Exception as e:
                self.logger.warning(f"Failed to get job result: {e}")
                self._cached_result = None

            # Capture the final status before the environment goes away.
            try:
                self._cached_status = self.exec_env.get_job_status(self.job_id)
            except Exception as e:
                self.logger.warning(f"Failed to get job status: {e}")
                self._cached_status = None

            try:
                self.exec_env.stop(clean_up=clean_up)
            except Exception as e:
                self.logger.warning(f"Failed to stop execution environment: {e}")
            finally:
                # Mark as stopped regardless of how stop() ended so later calls
                # serve cached values instead of touching the environment again.
                self._stopped = True

            return result

    def abort(self) -> None:
        """Abort the running job.

        This is a no-op if the execution environment has already been stopped
        (e.g., after get_result() was called). Errors are logged but not raised.
        """
        with self._lock:
            if self._stopped:
                return
            try:
                self.exec_env.abort_job(self.job_id)
            except Exception as e:
                self.logger.warning(f"Failed to abort job: {e}")