nvflare.app_common.launchers.subprocess_launcher module

class SubprocessLauncher(script: str, launch_once: bool | None = True, clean_up_script: str | None = None, shutdown_timeout: float | None = 0.0)

Bases: Launcher

Initializes the SubprocessLauncher.

Parameters:
  • script (str) – Script to be launched using subprocess.

  • launch_once (bool) – Whether the external process will be launched only once at the beginning or on each task.

  • clean_up_script (Optional[str]) – Optional clean up script to be run after the main script execution.

  • shutdown_timeout (float) – Number of seconds to wait before shutting down the external process. Defaults to 0.0.
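The effect of launch_once can be illustrated with a minimal standard-library sketch. DemoLauncher and run_tasks are hypothetical names, not part of NVFlare, and the sketch assumes that launch_once=True starts one process during initialization and reuses it, while launch_once=False starts a new process for each task. Unlike the real class, the command is passed as an argument list rather than a single string.

```python
import subprocess
import sys


class DemoLauncher:
    """Hypothetical stand-in that only illustrates the launch_once flag."""

    def __init__(self, command, launch_once=True):
        self.command = command          # argument list, e.g. [python, "-c", "pass"]
        self.launch_once = launch_once
        self.process = None
        self.launch_count = 0           # number of processes started so far

    def _start(self):
        self.process = subprocess.Popen(self.command)
        self.launch_count += 1

    def _stop(self):
        if self.process is not None:
            self.process.wait()         # the demo command exits on its own
            self.process = None

    def initialize(self):
        if self.launch_once:
            self._start()               # one process for the whole job

    def launch_task(self, task_name):
        if not self.launch_once:
            self._start()               # a fresh process for every task
        return True

    def stop_task(self, task_name):
        if not self.launch_once:
            self._stop()

    def finalize(self):
        if self.launch_once:
            self._stop()


def run_tasks(launch_once, num_tasks=3):
    """Run a few dummy tasks and report how many processes were started."""
    launcher = DemoLauncher([sys.executable, "-c", "pass"], launch_once=launch_once)
    launcher.initialize()
    for i in range(num_tasks):
        launcher.launch_task(f"task_{i}")
        launcher.stop_task(f"task_{i}")
    launcher.finalize()
    return launcher.launch_count
```

Under these assumptions, run_tasks(True) starts a single process for all tasks, while run_tasks(False) starts one process per task.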

check_run_status(task_name: str, fl_ctx: FLContext) → str

Checks the run status of the launcher.

finalize(fl_ctx: FLContext) → None
initialize(fl_ctx: FLContext)
launch_task(task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal) → bool

Launches the external system to handle a task.

Parameters:
  • task_name (str) – task name.

  • shareable (Shareable) – input shareable.

  • fl_ctx (FLContext) – fl context.

  • abort_signal (Signal) – signal to check during execution to determine whether this task is aborted.

Returns:

True if the launch succeeded, False otherwise.

needs_deferred_stop() → bool

Returns True if stop_task() should be deferred to a background thread.

Deferred stop is needed when the launcher terminates the external process on each stop_task() call (launch_once=False), so the process can stay alive long enough for the server to finish downloading large tensors from it.

For launch_once=True launchers the subprocess lives for the entire job, so deferring would block the next round’s launch indefinitely; in that case this method returns False.
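A caller might use this flag as in the minimal sketch below. FakeLauncher and stop_after_task are hypothetical names, and stop_task() is simplified to a single argument; the real method also takes fl_ctx and abort_signal. This is an illustration of the decision described above, not the LauncherExecutor code.

```python
import threading


class FakeLauncher:
    """Hypothetical stand-in that records calls instead of managing a process."""

    def __init__(self, launch_once):
        self.launch_once = launch_once
        self.stopped = []

    def needs_deferred_stop(self):
        # Assumption taken from the description above: defer only when every
        # stop_task() call terminates the process (launch_once=False).
        return not self.launch_once

    def stop_task(self, task_name):
        self.stopped.append(task_name)


def stop_after_task(launcher, task_name, deferred_threads):
    """Run stop_task() inline, or hand it to a background thread if deferred."""
    if launcher.needs_deferred_stop():
        thread = threading.Thread(target=launcher.stop_task, args=(task_name,), daemon=True)
        thread.start()
        deferred_threads.append(thread)   # kept so the caller can join later
        return "deferred"
    launcher.stop_task(task_name)
    return "inline"
```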

stop_task(task_name: str, fl_ctx: FLContext, abort_signal: Signal) → None

Stops the external system and frees up resources.

Parameters:
  • task_name (str) – task name.

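  • fl_ctx (FLContext) – fl context.

  • abort_signal (Signal) – signal to check during execution to determine whether this task is aborted.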

Note

Implementations must be idempotent and thread-safe. LauncherExecutor may call stop_task() from a deferred background thread and, in extreme timeout scenarios, concurrently from the main task thread as a fallback. A second concurrent or sequential call must be a safe no-op (e.g. guard on a null process reference inside a lock, as SubprocessLauncher does).
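The guard mentioned in the note can be sketched as follows, using only the standard library. GuardedStopper and demo are hypothetical names, and the code is one possible way to satisfy the requirement rather than the SubprocessLauncher implementation: the process reference is cleared under a lock, so only the first caller terminates the child and every later call returns immediately.

```python
import subprocess
import sys
import threading


class GuardedStopper:
    """Hypothetical helper: terminates a child process at most once."""

    def __init__(self, process):
        self._process = process
        self._lock = threading.Lock()
        self.stop_count = 0   # number of calls that actually terminated the child

    def stop_task(self):
        with self._lock:
            if self._process is None:
                return        # already stopped: safe no-op
            process, self._process = self._process, None
            process.terminate()
            process.wait()
            self.stop_count += 1


def demo():
    """Call stop_task() from several threads, then once more sequentially."""
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
    stopper = GuardedStopper(child)
    threads = [threading.Thread(target=stopper.stop_task) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    stopper.stop_task()       # a later sequential call is also a no-op
    return stopper.stop_count, child.poll() is not None
```

Holding the lock for the whole termination keeps the sketch simple; a concurrent caller waits until the first call has finished and then sees the cleared reference.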

get_line(buffer: bytearray)

Reads a line from the binary buffer. It treats all combinations of \n and \r as line breaks.

Parameters:
  • buffer (bytearray) – A binary buffer.

Returns:

(line, remaining): the first line as a str and the remaining buffer. line is None if no line break is found.
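The documented behaviour can be approximated with the sketch below. split_first_line is a hypothetical re-implementation written for illustration, not the library's code. It assumes the buffer holds UTF-8 text and that an adjacent \r\n or \n\r pair counts as a single line break.

```python
def split_first_line(buffer: bytearray):
    """Hypothetical sketch: return (first line as str, remaining buffer)."""
    # Locate the first line-break byte, whichever of \r or \n comes first.
    positions = [p for p in (buffer.find(b"\r"), buffer.find(b"\n")) if p >= 0]
    if not positions:
        return None, buffer            # no complete line yet
    index = min(positions)
    end = index + 1
    # Treat an adjacent \r\n or \n\r pair as one line break.
    if end < len(buffer) and buffer[end] in b"\r\n" and buffer[end] != buffer[index]:
        end += 1
    line = buffer[:index].decode()     # assumes UTF-8 encoded output
    return line, buffer[end:]
```

Calling the function repeatedly on the returned remainder yields one line at a time, which suits output that arrives from a subprocess pipe in arbitrary chunks.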

log_subprocess_output(process, logger)