Source code for nvflare.tool.job.job_cli

# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import datetime
import json
import os
import re
import shutil
import sys
import time
import traceback
from contextlib import contextmanager
from functools import partial
from tempfile import mkdtemp
from typing import List, Optional, Tuple

from pyhocon import ConfigFactory as CF
from pyhocon import ConfigTree

from nvflare.apis.utils.job_submit_token import validate_submit_token
from nvflare.cli_unknown_cmd_exception import CLIUnknownCmdException
from nvflare.fuel.utils.config import ConfigFormat
from nvflare.fuel.utils.config_factory import ConfigFactory
from nvflare.tool.cli_arg_utils import get_arg_value
from nvflare.tool.cli_session import add_startup_kit_selection_args, new_cli_session, new_cli_session_for_args
from nvflare.tool.job.config.configer import (
    build_config_file_indices,
    filter_indices,
    get_root_index,
    merge_configs_from_cli,
)
from nvflare.tool.job.job_client_const import (
    CONFIG_FED_CLIENT_CONF,
    CONFIG_FED_SERVER_CONF,
    CONFIG_FILE_BASE_NAME_WO_EXTS,
    DEFAULT_APP_NAME,
    JOB_CONFIG_COMP_NAME,
    JOB_CONFIG_FILE_NAME,
    JOB_CONFIG_VAR_NAME,
    JOB_CONFIG_VAR_VALUE,
    JOB_INFO_CONF,
    JOB_INFO_CONTROLLER_TYPE,
    JOB_INFO_CONTROLLER_TYPE_KEY,
    JOB_INFO_DESC,
    JOB_INFO_DESC_KEY,
    JOB_INFO_EXECUTION_API_TYPE,
    JOB_INFO_EXECUTION_API_TYPE_KEY,
    JOB_INFO_KEYS,
    JOB_INFO_MD,
    JOB_META_BASE_NAME,
    META_APP_NAME,
    TEMPLATES_KEY,
)
from nvflare.utils.cli_utils import (
    create_job_template_config,
    find_job_templates_location,
    get_curr_dir,
    load_hidden_config_state,
    save_config,
)


def _submit_token_arg(value: str) -> str:
    """argparse ``type=`` converter for ``--submit-token``.

    Delegates to ``validate_submit_token`` and re-raises its ``ValueError`` as
    ``argparse.ArgumentTypeError`` (chained) so argparse reports an ordinary usage error.
    """
    try:
        validated = validate_submit_token(value)
    except ValueError as err:
        raise argparse.ArgumentTypeError(str(err)) from err
    return validated


# Canonical ``nvflare job`` subcommand names; these are the keys of job_sub_cmd_handlers / job_sub_cmd_parser.
CMD_LIST_TEMPLATES = "list-templates"
CMD_SHOW_VARIABLES = "show-variables"
CMD_CREATE_JOB = "create"
CMD_SUBMIT_JOB = "submit"
CMD_JOB_LIST = "list"
CMD_JOB_META = "meta"
CMD_JOB_ABORT = "abort"
CMD_JOB_CLONE = "clone"
CMD_JOB_DOWNLOAD = "download"
CMD_JOB_DELETE = "delete"

# Job observability commands
CMD_JOB_STATS = "stats"
CMD_JOB_LOGS = "logs"

# Job lifecycle helpers
CMD_JOB_MONITOR = "monitor"
CMD_JOB_WAIT = "wait"
CMD_JOB_LOG_CONFIG = "log-config"
# NOTE(review): not referenced in the visible part of the file; presumably a parser alias — confirm.
CMD_JOB_LOG_ALIAS = "log"

# Output modes advertised in the command schema (passed as ``output_modes`` to handle_schema_flag).
_JSON_OUTPUT_MODES = ["json"]
# ``retry_token`` schema entry for ``nvflare job submit``, describing the ``--submit-token`` flag.
_JOB_SUBMIT_RETRY_TOKEN_SCHEMA = {
    "supported": True,
    "flag": "--submit-token",
    "scope": "study + submitter + token",
    "retry_safe_when_present": True,
    "effect": (
        "Deduplicates retries for identical submitted job content in the same study by the same submitter; "
        "different content with the same token is rejected."
    ),
}
# Presumably the ``retry_token`` schema entry for commands without a retry token — used outside this view; confirm.
_NO_RETRY_TOKEN_SCHEMA = {"supported": False}

# Help formatter used by the top-level ``job`` parser.
_JOB_HELP_FORMATTER = partial(argparse.HelpFormatter, max_help_position=24, width=120)
# Fallback hint shown when no startup kit can be resolved for a session.
_ACTIVE_STARTUP_KIT_HINT = (
    "Run 'nvflare config list' and 'nvflare config use <id>', pass --kit-id <id> or --startup-kit <path>, "
    "or set NVFLARE_STARTUP_KIT_DIR for automation."
)
# NOTE(review): not used in the visible part of the file; presumably the default line count for job logs.
_DEFAULT_JOB_LOG_TAIL_LINES = 500
# Matches a leading timestamp (optionally after "["): YYYY-MM-DD, then " " or "T", then HH:MM:SS,
# with optional fractional seconds after "." or ","; captured in the named group "ts".
_JOB_LOG_TS_RE = re.compile(r"^\[?(?P<ts>\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:[\.,]\d+)?)")
# NOTE(review): used outside this view; presumably the file names recognized as a global model when
# summarizing a downloaded job — confirm.
_JOB_DOWNLOAD_GLOBAL_MODEL_NAMES = (
    "FL_global_model.pt",
    "global_model.pt",
    "global_model.pth",
    "best_FL_global_model.pt",
    "best_global_model.pt",
    "model_global.json",
    "model_global.joblib",
)
# NOTE(review): used outside this view; presumably the artifact categories reported for a download.
_JOB_DOWNLOAD_EXPECTED_ARTIFACTS = ("global_model", "metrics_summary", "client_logs")


def _non_negative_int(value: str) -> int:
    """argparse ``type=`` converter that accepts only integers >= 0.

    Args:
        value: raw command-line string.

    Returns:
        The parsed integer.

    Raises:
        argparse.ArgumentTypeError: if ``value`` is not an integer literal or is negative.
    """
    try:
        parsed = int(value)
    except ValueError as e:
        # Chain the cause (as _submit_token_arg does) so debug tracebacks keep the original error.
        raise argparse.ArgumentTypeError(str(e)) from e
    if parsed < 0:
        raise argparse.ArgumentTypeError("value must be >= 0")
    return parsed


def find_filename_basename(f: str):
    """Return the last path component of ``f`` with its final extension removed.

    For example ``"/a/b/config_fed_client.conf"`` gives ``"config_fed_client"``; a name without a dot
    is returned unchanged.
    """
    name = os.path.basename(f)
    if "." not in name:
        return name
    stem, _ext = os.path.splitext(name)
    return stem
def build_job_template_indices(job_templates_dir: str) -> ConfigTree:
    """Index the job templates found directly under ``job_templates_dir``.

    A sub-directory counts as a template when it, or any directory below it, contains a file whose
    base name (extension stripped) is in ``CONFIG_FILE_BASE_NAME_WO_EXTS``. For every such template,
    each key in ``JOB_INFO_KEYS`` is read from the template's info config (``JOB_INFO_CONF``); a
    missing key, or a missing info config, yields ``"NA"``.

    Args:
        job_templates_dir: directory whose immediate sub-directories are candidate templates.

    Returns:
        A ConfigTree shaped ``{templates: {<template_name>: {<info_key>: <value>, ...}, ...}}``.
    """
    conf = CF.parse_string("{ templates = {} }")
    template_conf = conf.get(TEMPLATES_KEY)
    for template_name in os.listdir(job_templates_dir):
        template_path = os.path.join(job_templates_dir, template_name)
        if not os.path.isdir(template_path):
            continue
        # Stop walking at the first matching config file; the info written below is the same no
        # matter how many config files the template contains.
        has_config = any(
            find_filename_basename(file_name) in CONFIG_FILE_BASE_NAME_WO_EXTS
            for _root, _dirs, files in os.walk(template_path)
            for file_name in files
        )
        if not has_config:
            continue
        info_conf = get_template_info_config(template_path)
        for key in JOB_INFO_KEYS:
            value = info_conf.get(key, "NA") if info_conf else "NA"
            template_conf.put(f"{template_name}.{key}", value)
    return conf
def get_template_info_config(template_dir):
    """Parse ``<template_dir>/<JOB_INFO_CONF>`` with pyhocon; return None when that file does not exist."""
    info_conf_path = os.path.join(template_dir, JOB_INFO_CONF)
    if not os.path.isfile(info_conf_path):
        return None
    return CF.parse_file(info_conf_path)
def get_app_dirs_from_template(template_dir):
    """Return every directory strictly below ``template_dir`` that directly holds a server or client config file.

    Paths are returned as ``os.walk`` yields them (rooted at ``template_dir``), in walk order; the
    template root itself is never included.
    """
    config_names = (CONFIG_FED_SERVER_CONF, CONFIG_FED_CLIENT_CONF)
    return [
        root
        for root, _dirs, files in os.walk(template_dir)
        if root != template_dir and any(name in files for name in config_names)
    ]
def get_app_dirs_from_job_folder(job_folder):
    """Return the app directories of a job folder, as paths relative to ``job_folder``.

    An app directory is any directory below ``job_folder`` that has a direct sub-directory named
    exactly ``config`` or ``custom``. Each app is reported once, in ``os.walk`` discovery order.

    Args:
        job_folder: root of the job folder.

    Returns:
        List of relative app directory paths, without duplicates.
    """
    app_dirs = {}  # dict used as an insertion-ordered set: an app has both config/ and custom/
    for root, _dirs, _files in os.walk(job_folder):
        if root == job_folder:
            continue
        # Exact name match; a suffix test would also accept directories such as "myconfig".
        if os.path.basename(root) not in ("config", "custom"):
            continue
        app_dir = os.path.dirname(os.path.relpath(root, job_folder))
        if app_dir:
            app_dirs[app_dir] = None
    return list(app_dirs)
def create_job(cmd_args):
    """[Deprecated] Create a job folder from a job template (``nvflare job create``).

    Steps:
      1. Resolve the template, first as a template folder path and then as a template name.
      2. Prepare the job and app folders, and copy the optional script directory into each app's
         ``custom`` folder.
      3. Unless ``--force`` is given, stop if a client config already exists in an app config dir.
      4. Copy the template configs into each app's ``config`` folder and write the job meta config.
      5. Apply CLI config overrides and print the template variables.

    A ``ValueError`` raised along the way is reported as a usage error with exit code 4.
    """
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_CREATE_JOB],
        "nvflare job create",
        [],
        sys.argv[1:],
        deprecated=True,
        deprecated_message="Use 'python job.py --export --export-dir <job_folder>' + 'nvflare job submit -j <job_folder>' instead.",
    )
    from nvflare.tool.cli_output import print_human

    print_human(
        "WARNING: 'nvflare job create' is deprecated. Use 'python job.py --export --export-dir <job_folder>' + "
        "'nvflare job submit -j <job_folder>' instead. Run 'nvflare recipe list' to see available recipes."
    )
    try:
        template_src = get_src_template(cmd_args)
        if not template_src:
            template_src = get_src_template_by_name(cmd_args)

        app_dirs = get_app_dirs_from_template(str(template_src).strip())
        app_names = [os.path.basename(f) for f in app_dirs] or [DEFAULT_APP_NAME]

        job_folder = cmd_args.job_folder
        prepare_job_folder(cmd_args)
        app_custom_dirs = prepare_app_dirs(job_folder, app_names)
        prepare_app_scripts(job_folder, app_custom_dirs, cmd_args)

        config_dirs = get_config_dirs(job_folder, app_names)
        _fmt, real_config_path = ConfigFactory.search_config_format(CONFIG_FED_CLIENT_CONF, config_dirs)
        if real_config_path and not cmd_args.force:
            # The probe looks for the *client* config, so report the file actually found rather than
            # claiming that a server config exists.
            print_human(
                f"\nwarning: configuration file {real_config_path} already exists.\n"
                "Not generating the config files.\n"
                "If you would like to overwrite, use -force option"
            )
            return

        # A template without app sub-directories is a single-app template rooted at template_src.
        if app_dirs:
            template_srcs = {os.path.basename(app_dir): app_dir for app_dir in app_dirs}
        else:
            template_srcs = {DEFAULT_APP_NAME: template_src}

        for app_name, src in template_srcs.items():
            app_config_dir = get_config_dir(job_folder, app_name)
            shutil.copytree(src=src, dst=app_config_dir, dirs_exist_ok=True)
            remove_extra_files(app_config_dir)

        prepare_meta_config(cmd_args, template_src, app_names)
        app_variable_values = prepare_job_config(cmd_args, app_names)
        display_template_variables(job_folder, app_variable_values)
    except ValueError as e:
        from nvflare.tool.cli_output import output_usage_error

        if cmd_args.debug:
            print_human(traceback.format_exc())
        output_usage_error(job_sub_cmd_parser[CMD_CREATE_JOB], detail=str(e), exit_code=4)
def get_src_template_by_name(cmd_args):
    """Resolve ``cmd_args.template`` as a template *name* in the configured job-templates directory.

    Returns:
        ``<job_templates_dir>/<template name>``.

    Raises:
        ValueError: (from ``check_template_exists``) when no indexed template has that name.
    """
    templates_root = find_job_templates_location()
    index = build_job_template_indices(templates_root)
    wanted = cmd_args.template
    check_template_exists(wanted, index)
    return os.path.join(templates_root, wanted)
def get_src_template(cmd_args) -> Optional[str]:
    """Interpret ``cmd_args.template`` as a template *folder* path.

    Returns:
        The absolute folder path when it is a directory containing ``JOB_INFO_CONF``; otherwise None,
        in which case callers treat the value as a template name.
    """
    candidate = os.path.abspath(cmd_args.template)
    is_template_folder = os.path.isdir(candidate) and os.path.isfile(os.path.join(candidate, JOB_INFO_CONF))
    return candidate if is_template_folder else None
def remove_pycache_files(custom_dir):
    """Delete Python bytecode artifacts under ``custom_dir``.

    Removes ``__pycache__`` directories (and, as before, any directory whose name ends in ``.pyc``)
    and every ``*.pyc`` file.

    Args:
        custom_dir: directory to clean, recursively.
    """
    for root, dirs, files in os.walk(custom_dir):
        doomed = [name for name in dirs if name == "__pycache__" or name.endswith(".pyc")]
        for name in doomed:
            shutil.rmtree(os.path.join(root, name))
            # Prune it so os.walk does not try to descend into the removed directory.
            dirs.remove(name)
        # .pyc entries are normally files, which the directory-only check never reached.
        for name in files:
            if name.endswith(".pyc"):
                os.remove(os.path.join(root, name))
def remove_extra_files(config_dir):
    """Delete template-only artifacts from an app ``config_dir``.

    The entries are the template info md/conf files, ``__init__.py`` and ``__pycache__``. Each one is
    removed as a file or a directory depending on what exists; missing entries are ignored.
    """
    for leftover in (JOB_INFO_MD, JOB_INFO_CONF, "__init__.py", "__pycache__"):
        target = os.path.join(config_dir, leftover)
        if os.path.isfile(target):
            os.remove(target)
            continue
        if os.path.isdir(target):
            shutil.rmtree(target)
def show_variables(cmd_args):
    """[Deprecated] Print the template variables found in an existing job folder (``nvflare job show-variables``).

    A ``ValueError``, including a missing or non-directory job folder, is reported as a usage error
    with exit code 4.
    """
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_SHOW_VARIABLES],
        "nvflare job show-variables",
        [],
        sys.argv[1:],
        deprecated=True,
        deprecated_message="Use the Job Recipe API instead.",
    )
    from nvflare.tool.cli_output import print_human

    if getattr(cmd_args, "job_sub_cmd", None) == "show_variables":
        print_human("WARNING: 'nvflare job show_variables' is deprecated; use 'nvflare job show-variables' instead.")
    print_human("WARNING: 'nvflare job show-variables' is deprecated. Use the Job Recipe API instead.")
    try:
        job_folder = cmd_args.job_folder
        if not job_folder:
            # A bare "-j" gives None; os.path.isdir(None) would raise TypeError instead of a usage error.
            raise ValueError("required job folder is not specified.")
        if not os.path.isdir(job_folder):
            # A path was given but is not a directory; same wording as submit_job.
            raise ValueError(f"invalid job folder: {job_folder}")
        app_dirs = get_app_dirs_from_job_folder(job_folder)
        app_names = [os.path.basename(f) for f in app_dirs] or [DEFAULT_APP_NAME]
        indices = build_config_file_indices(job_folder, app_names)
        variable_values = filter_indices(app_indices_configs=indices)
        display_template_variables(job_folder, variable_values)
    except ValueError as e:
        from nvflare.tool.cli_output import output_usage_error

        if cmd_args.debug:
            print_human(traceback.format_exc())
        output_usage_error(job_sub_cmd_parser[CMD_SHOW_VARIABLES], detail=str(e), exit_code=4)
def check_template_exists(target_template_name, template_index_conf):
    """Raise ValueError unless ``target_template_name`` names a template in the index.

    Template keys under ``template_index_conf["templates"]`` are compared by their ``os.path.basename``.
    """
    known_names = {os.path.basename(key) for key in template_index_conf.get("templates").keys()}
    if target_template_name in known_names:
        return
    raise ValueError(
        f"Invalid template name {target_template_name}, "
        f"please check the available templates using nvflare job list-templates"
    )
def display_template_variables(job_folder, app_variable_values):
    """Print a fixed-width table of the variables that can be changed in the job's config files.

    Args:
        job_folder: job folder path, shown in the table header.
        app_variable_values: mapping of app name -> {config file path -> {variable name -> index}};
            each index object must provide ``value`` and ``component_name`` attributes.

    Apps named ``DEFAULT_APP_NAME`` or ``META_APP_NAME`` get no "app:" header line.
    """
    from nvflare.tool.cli_output import print_human

    table_width = 135
    margin = " " * 1
    rule = "-" * table_width
    blank = " " * table_width
    # column widths: file name, variable name, variable value, component name
    widths = (30, 30, 35, 35)

    def emit_row(*cells):
        print_human(margin, *[fix_length_format(cell, width) for cell, width in zip(cells, widths)])

    print_human("\nThe following are the variables you can change in the template\n")
    print_human(rule)
    print_human(blank)
    print_human(margin, fix_length_format(f"job folder: {job_folder}", table_width))
    print_human(blank)
    print_human(rule)
    emit_row(JOB_CONFIG_FILE_NAME, JOB_CONFIG_VAR_NAME, JOB_CONFIG_VAR_VALUE, JOB_CONFIG_COMP_NAME)
    print_human(rule)

    for app_name, files_to_indices in app_variable_values.items():
        if app_name not in (DEFAULT_APP_NAME, META_APP_NAME):
            print_human(margin, fix_length_format(f"app: {app_name}", table_width))
            print_human(blank)
        for config_path in sorted(files_to_indices):
            key_indices = files_to_indices.get(config_path)
            short_name = os.path.basename(config_path)
            for var_name in sorted(key_indices):
                key_index = key_indices[var_name]
                component = " " if key_index.component_name is None else key_index.component_name
                emit_row(short_name, var_name, str(key_index.value), component)
            print_human("")
    print_human(rule)
def list_templates(cmd_args):
    """[Deprecated] Print the available job templates (``nvflare job list-templates``).

    Templates are looked up via ``find_job_templates_location(cmd_args.job_templates_dir)``. The
    resolved directory is then recorded through ``update_job_templates_dir``. A ``ValueError`` is
    reported as a usage error with exit code 4.
    """
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_LIST_TEMPLATES],
        "nvflare job list-templates",
        [],
        sys.argv[1:],
        deprecated=True,
        deprecated_message="Use 'nvflare recipe list' instead.",
    )
    from nvflare.tool.cli_output import print_human

    used_underscore_alias = getattr(cmd_args, "job_sub_cmd", None) == "list_templates"
    if used_underscore_alias:
        print_human("WARNING: 'nvflare job list_templates' is deprecated; use 'nvflare job list-templates' instead.")
    print_human("WARNING: 'nvflare job list-templates' is deprecated. Use 'nvflare recipe list' instead.")

    try:
        templates_dir = os.path.abspath(find_job_templates_location(cmd_args.job_templates_dir))
        display_available_templates(build_job_template_indices(templates_dir))
        if templates_dir:
            update_job_templates_dir(templates_dir)
    except ValueError as err:
        from nvflare.tool.cli_output import output_usage_error

        if cmd_args.debug:
            print_human(traceback.format_exc())
        output_usage_error(job_sub_cmd_parser[CMD_LIST_TEMPLATES], detail=str(err), exit_code=4)
def update_job_templates_dir(job_templates_dir: str):
    """Record ``job_templates_dir`` in the hidden nvflare config.

    Starts from an empty HOCON config when no hidden config is loaded.
    """
    config_file_path, nvflare_config, _migration_needed = load_hidden_config_state()
    base_config = CF.parse_string("{}") if nvflare_config is None else nvflare_config
    save_config(create_job_template_config(base_config, job_templates_dir), config_file_path)
def display_available_templates(template_index_conf):
    """Print a fixed-width table of the indexed job templates.

    Columns are name, description, controller type and execution API type.

    Args:
        template_index_conf: index as built by ``build_job_template_indices``; rows come from its
            ``"templates"`` node, sorted by key.
    """
    from nvflare.tool.cli_output import print_human

    print_human("\nThe following job templates are available: \n")
    registry = template_index_conf.get("templates")
    margin = " " * 1
    rule = "-" * 120
    name_w, desc_w, controller_w, api_w = 20, 60, 17, 23

    print_human(rule)
    print_human(
        margin,
        fix_length_format("name", name_w),
        fix_length_format(JOB_INFO_DESC, desc_w),
        fix_length_format(JOB_INFO_CONTROLLER_TYPE, controller_w),
        fix_length_format(JOB_INFO_EXECUTION_API_TYPE, api_w),
    )
    print_human(rule)
    for key in sorted(registry.keys()):
        short_name = os.path.basename(key)
        # Fall back to the basename when the full key has no (or an empty) entry.
        info = registry.get(key, None) or registry.get(short_name)
        print_human(
            margin,
            fix_length_format(short_name, name_w),
            fix_length_format(info.get(JOB_INFO_DESC_KEY), desc_w),
            fix_length_format(info.get(JOB_INFO_CONTROLLER_TYPE_KEY), controller_w),
            fix_length_format(info.get(JOB_INFO_EXECUTION_API_TYPE_KEY), api_w),
        )
    print_human(rule)
def fix_length_format(name: str, name_fix_length: int):
    """Return ``name`` truncated or right-padded with spaces to exactly ``name_fix_length`` characters.

    Non-string values are converted with ``str()`` first instead of failing on slicing. Such values
    include numbers or None read from a template's info config.
    """
    text = name if isinstance(name, str) else str(name)
    return f"{text[:name_fix_length]:{name_fix_length}}"
def submit_job(cmd_args):
    """Submit a job folder to the FL server (``nvflare job submit``).

    The job folder may be the job itself. It may also be a directory containing exactly one
    non-hidden sub-directory that is a job, i.e. has a meta config plus
    ``app/config/config_fed_server.*``. The resolved folder is copied to a temporary directory, CLI
    config overrides are applied to the copy, and the copy is submitted. The temporary directory is
    removed afterwards unless ``--debug`` is set.

    A ``ValueError`` (e.g. a job folder that is not a directory) is reported as a usage error with
    exit code 4.
    """
    from nvflare.tool.cli_schema import handle_schema_flag

    if job_sub_cmd_parser[CMD_SUBMIT_JOB] is None:
        # Invoked without def_job_cli_parser having run: build the parser so the schema has one.
        root_parser = argparse.ArgumentParser(prog="nvflare job")
        root_subparser = root_parser.add_subparsers(dest="job_sub_cmd")
        define_submit_job_parser(root_subparser)
    handle_schema_flag(
        job_sub_cmd_parser[CMD_SUBMIT_JOB],
        "nvflare job submit",
        [
            "nvflare config use admin@nvidia.com",
            "nvflare job submit -j ./my_job",
            "nvflare job submit -j ./my_job --submit-token retry-001",
        ],
        sys.argv[1:],
        output_modes=_JSON_OUTPUT_MODES,
        streaming=False,
        mutating=True,
        idempotent=False,
        retry_token=_JOB_SUBMIT_RETRY_TOKEN_SCHEMA,
    )

    def _has_job_meta(path: str) -> bool:
        for ext in (".json", ".conf", ".yml", ".yaml"):
            if os.path.isfile(os.path.join(path, f"meta{ext}")):
                return True
        return False

    def _has_server_config(path: str) -> bool:
        config_dir = os.path.join(path, "app", "config")
        for ext in (".json", ".conf", ".yml", ".yaml"):
            if os.path.isfile(os.path.join(config_dir, f"config_fed_server{ext}")):
                return True
        return False

    def _resolve_job_folder(path: str) -> str:
        # Accept either the job itself or a wrapper dir holding exactly one job sub-directory.
        if _has_job_meta(path) and _has_server_config(path):
            return path
        subdirs = []
        for name in os.listdir(path):
            if name.startswith("."):
                continue
            full = os.path.join(path, name)
            if os.path.isdir(full):
                subdirs.append(full)
        if len(subdirs) == 1:
            candidate = subdirs[0]
            if _has_job_meta(candidate) and _has_server_config(candidate):
                from nvflare.tool.cli_output import is_json_mode, print_human

                if not is_json_mode():
                    print_human(f"Using job folder: {candidate}")
                return candidate
        return path

    temp_job_dir = None
    try:
        if not cmd_args.job_folder or not os.path.isdir(cmd_args.job_folder):
            raise ValueError(f"invalid job folder: {cmd_args.job_folder}")
        job_folder = _resolve_job_folder(cmd_args.job_folder)
        # prepare_job_config() maps merged config files into the temp copy relative to
        # cmd_args.job_folder, so it must name the resolved job, not the wrapper directory.
        cmd_args.job_folder = job_folder
        temp_job_dir = mkdtemp()
        shutil.copytree(job_folder, temp_job_dir, dirs_exist_ok=True)
        app_dirs = get_app_dirs_from_job_folder(job_folder)
        app_names = [os.path.basename(f) for f in app_dirs] or [DEFAULT_APP_NAME]
        prepare_job_config(cmd_args, app_names, temp_job_dir)
        internal_submit_job(None, None, temp_job_dir, cmd_args)
    except ValueError as e:
        from nvflare.tool.cli_output import output_usage_error, print_human

        if cmd_args.debug:
            print_human(traceback.format_exc())
        output_usage_error(job_sub_cmd_parser[CMD_SUBMIT_JOB], detail=str(e), exit_code=4)
    finally:
        if temp_job_dir:
            if cmd_args.debug:
                from nvflare.tool.cli_output import print_human

                print_human(f"in debug mode, job configurations can be examined in temp job directory '{temp_job_dir}'")
            else:
                # Best-effort cleanup: a failure here must not replace the submit result / exit code.
                shutil.rmtree(temp_job_dir, ignore_errors=True)
def _resolve_admin_user_and_dir_from_startup_kit(
    startup_kit_dir: str,
) -> Tuple[str, str]:
    """Delegate to ``nvflare.tool.kit.kit_config.resolve_admin_user_and_dir_from_startup_kit``.

    The import is deferred until call time.
    """
    from nvflare.tool.kit.kit_config import resolve_admin_user_and_dir_from_startup_kit as _resolve

    resolved = _resolve(startup_kit_dir)
    return resolved
def internal_submit_job(admin_user_dir, username, temp_job_dir, cmd_args=None):
    """Open an admin session, submit ``temp_job_dir``, and report the outcome via the CLI output helpers.

    Args:
        admin_user_dir: startup kit directory, or None to resolve it from ``cmd_args`` (see _get_session).
        username: admin user name, or None to resolve it from ``cmd_args`` (see _get_session).
        temp_job_dir: path of the prepared job folder to submit.
        cmd_args: parsed CLI args; ``study`` (default ``"default"``) and ``submit_token`` (default
            None) are read from it when it is given.

    On success ``{"job_id": <id>}`` is emitted with ``output_ok``. Known submit failures are reported
    with ``output_error`` and converted to ``SystemExit`` with these codes:

    - JOB_INVALID: 1
    - AUTH_FAILED: 2
    - CONNECTION_FAILED: 2
    - SUBMIT_TOKEN_CONFLICT: 4
    - SUBMIT_TOKEN_JOB_DELETED: 4
    - INTERNAL_ERROR: 5

    The session is closed in all cases.
    """
    from nvflare.fuel.flare_api.api_spec import (
        AuthorizationError,
        InternalError,
        InvalidJobDefinition,
        NoConnection,
        SubmitTokenConflict,
        SubmitTokenJobDeleted,
    )
    from nvflare.tool.cli_output import is_json_mode, output_error, output_ok, print_human

    if not is_json_mode():
        print_human("trying to connect to the server")
    study = getattr(cmd_args, "study", "default") if cmd_args else "default"
    submit_token = getattr(cmd_args, "submit_token", None) if cmd_args else None
    sess = _get_session(args=cmd_args, admin_user_dir=admin_user_dir, username=username, study=study)
    try:
        try:
            job_id = sess.submit_job(temp_job_dir, submit_token=submit_token)
        # NOTE(review): keep this clause order unless api_spec confirms none of these exception
        # classes subclass another; reordering could change which error code is reported.
        except InvalidJobDefinition as e:
            output_error("JOB_INVALID", exit_code=1, detail=str(e))
            raise SystemExit(1)
        except SubmitTokenConflict as e:
            output_error(
                "SUBMIT_TOKEN_CONFLICT",
                exit_code=4,
                detail=str(e),
                hint="Use a new submit token when submitting different job content.",
                data={"existing_job_id": e.existing_job_id} if e.existing_job_id else None,
            )
            raise SystemExit(4)
        except SubmitTokenJobDeleted as e:
            data = {"job_id": e.job_id, "state": e.state, "deleted_time": e.deleted_time}
            output_error(
                "SUBMIT_TOKEN_JOB_DELETED",
                exit_code=4,
                detail=str(e),
                # only report the fields that are actually set
                data={key: value for key, value in data.items() if value is not None},
            )
            raise SystemExit(4)
        except AuthorizationError as e:
            output_error("AUTH_FAILED", exit_code=2, detail=str(e))
            raise SystemExit(2)
        except InternalError as e:
            output_error("INTERNAL_ERROR", exit_code=5, detail=str(e))
            raise SystemExit(5)
        except NoConnection as e:
            output_error("CONNECTION_FAILED", exit_code=2, detail=str(e))
            raise SystemExit(2)
        output_ok({"job_id": job_id})
    finally:
        sess.close()
# Dispatch table used by handle_job_cli_cmd: canonical subcommand name -> handler.
# Entries that are None here are treated by handle_job_cli_cmd as "invalid command" unless they are
# assigned elsewhere — TODO(review): confirm where the lifecycle handlers get registered.
job_sub_cmd_handlers = {
    CMD_CREATE_JOB: create_job,
    CMD_SUBMIT_JOB: submit_job,
    CMD_LIST_TEMPLATES: list_templates,
    CMD_SHOW_VARIABLES: show_variables,
    CMD_JOB_LIST: None,
    CMD_JOB_META: None,
    CMD_JOB_ABORT: None,
    CMD_JOB_CLONE: None,
    CMD_JOB_DOWNLOAD: None,
    CMD_JOB_DELETE: None,
    CMD_JOB_STATS: None,
    CMD_JOB_LOGS: None,
    CMD_JOB_MONITOR: None,
    CMD_JOB_WAIT: None,
    CMD_JOB_LOG_CONFIG: None,
}
# Parser registry with the same keys, in the same order, all initially None. The define_*_parser
# functions store their argparse parser here so handlers can pass it to handle_schema_flag /
# output_usage_error.
job_sub_cmd_parser = dict.fromkeys(job_sub_cmd_handlers)
def handle_job_cli_cmd(cmd_args):
    """Dispatch ``nvflare job <subcommand>`` to its handler.

    The deprecated underscore aliases (``list_templates``, ``show_variables``) are mapped to their
    canonical names for dispatch only. ``cmd_args.job_sub_cmd`` is left exactly as argparse parsed
    it. The list-templates and show-variables handlers compare it against the underscore form to
    print their alias deprecation warning; rewriting it in place made that warning unreachable.

    Raises:
        CLIUnknownCmdException: if no subcommand was given, or it has no registered handler.
    """
    requested = cmd_args.job_sub_cmd
    canonical = {
        "list_templates": CMD_LIST_TEMPLATES,
        "show_variables": CMD_SHOW_VARIABLES,
    }.get(requested, requested)
    handler = job_sub_cmd_handlers.get(canonical, None)
    if handler:
        handler(cmd_args)
    elif requested is None:
        raise CLIUnknownCmdException("\n no job subcommand provided. \n")
    else:
        raise CLIUnknownCmdException("\n invalid command. \n")
def def_job_cli_parser(sub_cmd):
    """Register the ``job`` command and all of its subcommand parsers on ``sub_cmd``.

    Subcommand parsers are added in the order listed below.

    Returns:
        ``{"job": <job parser>}``.
    """
    cmd = "job"
    job_parser = sub_cmd.add_parser(
        cmd,
        help="submit, manage, and monitor FL jobs",
        formatter_class=_JOB_HELP_FORMATTER,
    )
    subparsers = job_parser.add_subparsers(title="job subcommands", metavar="", dest="job_sub_cmd")
    definers = (
        define_submit_job_parser,
        define_job_monitor_parser,
        define_job_wait_parser,
        define_list_jobs_parser,
        define_abort_job_parser,
        define_job_meta_parser,
        define_job_logs_parser,
        define_job_log_parser,
        define_job_stats_parser,
        define_download_job_parser,
        define_clone_job_parser,
        define_delete_job_parser,
        define_list_templates_parser,
        define_create_job_parser,
        define_variables_parser,
    )
    for define in definers:
        define(subparsers)
    return {cmd: job_parser}
def define_submit_job_parser(job_subparser):
    """Add the ``submit`` subcommand to ``job_subparser`` and store its parser in ``job_sub_cmd_parser``."""
    parser = job_subparser.add_parser("submit", help="submit job")
    add = parser.add_argument
    add(
        "-j",
        "--job-folder",
        "--job_folder",  # backward compat
        dest="job_folder",
        type=str,
        nargs="?",
        default=os.path.join(get_curr_dir(), "current_job"),
        help="job folder path, default to ./current_job directory",
    )
    add("-debug", "--debug", action="store_true", help="debug is on")
    add("--study", type=str, default="default", help="study to submit the job to")
    add(
        "--submit-token",
        type=_submit_token_arg,
        default=None,
        help="retry-safe submit token scoped by study and submitter",
    )
    add_startup_kit_selection_args(parser)
    add("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_SUBMIT_JOB] = parser
def define_list_templates_parser(job_subparser):
    """Add the deprecated ``list-templates`` subcommand (alias ``list_templates``) and store its parser."""
    parser = job_subparser.add_parser(
        CMD_LIST_TEMPLATES,
        aliases=["list_templates"],
        help="[DEPRECATED] use 'nvflare recipe list'",
    )
    add = parser.add_argument
    add(
        "-d",
        "--job_templates_dir",
        type=str,
        nargs="?",
        default=None,
        help="Job template directory, if not specified, "
        "will search from ~/.nvflare/config.conf and NVFLARE_HOME env. variables",
    )
    add("-debug", "--debug", action="store_true", help="debug is on")
    add("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_LIST_TEMPLATES] = parser
def define_variables_parser(job_subparser):
    """Add the deprecated ``show-variables`` subcommand (alias ``show_variables``) and store its parser."""
    parser = job_subparser.add_parser(
        CMD_SHOW_VARIABLES,
        aliases=["show_variables"],
        help="[DEPRECATED] use 'nvflare recipe list' or the Job Recipe API",
    )
    add = parser.add_argument
    add(
        "-j",
        "--job-folder",
        "--job_folder",
        type=str,
        nargs="?",
        default=os.path.join(get_curr_dir(), "current_job"),
        help="job folder path, default to ./current_job directory",
    )
    add("-debug", "--debug", action="store_true", help="debug is on")
    add("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_SHOW_VARIABLES] = parser
def define_create_job_parser(job_subparser):
    """Add the deprecated ``create`` subcommand to ``job_subparser`` and store its parser.

    ``--job-folder`` is accepted in addition to ``--job_folder`` (same dest). This matches the
    ``submit`` and ``show-variables`` subcommands.
    """
    create_parser = job_subparser.add_parser(
        "create",
        help="[DEPRECATED] use 'python job.py --export --export-dir <job_folder>' + 'nvflare job submit -j <job_folder>'",
    )
    create_parser.add_argument(
        "-j",
        "--job-folder",
        "--job_folder",  # backward compat
        dest="job_folder",
        type=str,
        nargs="?",
        default=os.path.join(get_curr_dir(), "current_job"),
        help="job_folder path, default to ./current_job directory",
    )
    create_parser.add_argument(
        "-w",
        "--template",
        type=str,
        nargs="?",
        default="sag_pt",
        help="""template name or template folder. You can use list-templates to see available jobs from job templates, pick name such as 'sag_pt' as template name. Alternatively, you can use the path to the job template folder, such as job_templates/sag_pt """,
    )
    create_parser.add_argument(
        "-sd",
        "--script_dir",
        type=str,
        nargs="?",
        help="""script directory contains additional related files. All files or directories under this directory will be copied over to the custom directory.""",
    )
    create_parser.add_argument(
        "-f",
        "--config_file",
        type=str,
        action="append",
        nargs="*",
        help="""Training config file with corresponding optional key=value pairs. If key presents in the preceding config file, the value in the config file will be overwritten by the new value """,
    )
    create_parser.add_argument("-debug", "--debug", action="store_true", help="debug is on")
    create_parser.add_argument(
        "-force",
        "--force",
        action="store_true",
        help="force create is on, if -force, " "overwrite existing configuration with newly created configurations",
    )
    create_parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_CREATE_JOB] = create_parser
def prepare_job_config(cmd_args, app_names: List[str], tmp_job_dir: Optional[str] = None):
    """Merge CLI config overrides into the apps' configs and return the filtered variable indices.

    Merged configs are saved when the merge reports a modification. They are always saved when
    ``tmp_job_dir`` is given, so that copy receives them. Without ``tmp_job_dir`` the job folder
    (``cmd_args.job_folder``) is the write target.
    """
    merged_conf, config_modified = merge_configs_from_cli(cmd_args, app_names)
    writing_to_copy = tmp_job_dir is not None
    target_dir = tmp_job_dir if writing_to_copy else cmd_args.job_folder
    if writing_to_copy or config_modified is True:
        save_merged_configs(merged_conf, cmd_args.job_folder, target_dir)
    return filter_indices(merged_conf)
def has_client_config_file(app_config_dir):
    """Return True if ``app_config_dir`` contains ``config_fed_client`` with any ConfigFormat extension."""
    for ext in ConfigFormat.extensions():
        if os.path.exists(os.path.join(app_config_dir, f"config_fed_client{ext}")):
            return True
    return False
def save_merged_configs(app_merged_conf, job_folder, tmp_job_dir):
    """Write every merged config file into ``tmp_job_dir``.

    Args:
        app_merged_conf: mapping of app name -> {config file path -> (config, excluded keys, key indices)}.
            ``key indices`` maps keys to sequences whose first element is an index object (assumed from
            the ``[0]`` access — confirm against configer).
        job_folder: folder the config file paths are located under.
        tmp_job_dir: destination. When equal to ``job_folder``, files are overwritten in place.
            Otherwise each file goes to the same relative path under ``tmp_job_dir``.

    The content saved is the value of the root index reached from the file's first key index. A file
    without key indices is skipped, because there is no index to reach its root from; previously it
    failed with StopIteration/IndexError.
    """
    for merged_conf in app_merged_conf.values():
        for file, (_config, _excluded_keys, key_indices) in merged_conf.items():
            first_indices = next(iter(key_indices.values()), None) if key_indices else None
            if not first_indices:
                continue
            if job_folder == tmp_job_dir:
                dst_path = file
            else:
                dst_path = os.path.join(tmp_job_dir, os.path.relpath(file, job_folder))
            root_index = get_root_index(first_indices[0])
            save_config(root_index.value, dst_path)
def prepare_meta_config(cmd_args, target_template_dir, app_names):
    """Write the job meta config (``meta.conf``) for a newly created job and clean app-level meta files.

    The meta content comes from ``<target_template_dir>/meta.conf``, or from the default template
    shipped with this module when the template has none. Its ``name`` is set to the job folder name.
    An existing job-level meta file (any ConfigFormat extension) is kept unless ``cmd_args.force``
    is set. Meta files found in each app's config directory are deleted.
    """
    job_folder = cmd_args.job_folder
    # Normalize before taking the basename. The old single "/" strip produced "" for "job//" or
    # "job\\", and "." for ".".
    job_name = os.path.basename(os.path.abspath(job_folder))
    meta_files = [f"{JOB_META_BASE_NAME}{postfix}" for postfix in ConfigFormat.extensions()]

    existing_meta_path = None
    for mf in meta_files:
        candidate = os.path.join(job_folder, mf)
        if os.path.isfile(candidate):
            existing_meta_path = candidate
            break

    src_meta_path = os.path.join(target_template_dir, f"{JOB_META_BASE_NAME}.conf")
    if os.path.isfile(src_meta_path):
        dst_config = CF.parse_file(src_meta_path)
    else:
        dst_config = load_default_config_template(f"{JOB_META_BASE_NAME}.conf")

    # Use existing meta config if the user already defined one (unless forced).
    if not existing_meta_path or cmd_args.force:
        dst_config.put("name", job_name)
        save_config(dst_config, os.path.join(job_folder, f"{JOB_META_BASE_NAME}.conf"))

    # create_job copies template contents into each app config dir; drop any meta files copied there
    # so only the job-level meta config remains.
    for app_name in app_names or [DEFAULT_APP_NAME]:
        config_dir = get_config_dir(job_folder, app_name)
        for mf in meta_files:
            meta_path = os.path.join(config_dir, mf)
            if os.path.isfile(meta_path):
                os.remove(meta_path)
def load_default_config_template(config_file_name: str):
    """Parse ``config/<config_file_name>`` located next to this module.

    The file is always parsed as HOCON, via pyhocon.
    """
    module_dir = os.path.dirname(__file__)
    template_path = os.path.join(module_dir, f"config/{config_file_name}")
    return CF.parse_file(template_path)
def dst_app_path(job_folder: str, app_name="app"):
    """Return the path of app ``app_name`` inside ``job_folder``."""
    app_path = os.path.join(job_folder, app_name)
    return app_path
def dst_config_path(job_folder, config_filename, app_name: str = "app"):
    """Return ``<job_folder>/<app_name>/config/<config_filename>``."""
    return os.path.join(get_config_dir(job_folder, app_name), config_filename)
def get_config_dirs(job_folder: str, app_names: List[str]) -> List[str]:
    """Return the config directory of every app in ``app_names``.

    Uses ``["app"]`` when ``app_names`` is empty or None.
    """
    return [get_config_dir(job_folder, name) for name in (app_names or ["app"])]
def get_config_dir(job_folder: str, app_name: str) -> str:
    """Return ``<job_folder>/<app_name>/config``."""
    return os.path.join(dst_app_path(job_folder, app_name), "config")
def convert_args_list_to_dict(kvs: Optional[List[str]] = None) -> dict:
    """
    Convert a list of key-value strings to a dictionary.

    Each item is split on its first "=". Surrounding whitespace is stripped from both key and value,
    and a later duplicate key overrides an earlier one.

    Args:
        kvs (Optional[List[str]]): A list of key-value strings in the format "key=value".

    Returns:
        dict: A dictionary containing the key-value pairs from the input list.

    Raises:
        ValueError: if an item has no "=", or its key is empty after stripping.
    """
    kv_dict = {}
    for kv in kvs or []:
        key, sep, value = kv.partition("=")
        key = key.strip()
        # "=5" used to be accepted as an empty key; it cannot address any config entry.
        if not sep or not key:
            raise ValueError(f"Invalid key-value pair: '{kv}'")
        kv_dict[key] = value.strip()
    return kv_dict
def prepare_job_folder(cmd_args):
    """Ensure ``cmd_args.job_folder`` exists as a directory before a job is generated into it.

    Behavior by state of the folder:

    - Missing folder: created, including parents.
    - Existing path that is not a directory: ValueError.
    - Existing folder with ``cmd_args.force``: emptied by removing and recreating it.
    - Existing ``app`` sub-folder: removed with ``force``; otherwise only a warning is printed.

    Raises:
        ValueError: if no job folder is given, or the path exists but is not a directory.
    """
    job_folder = cmd_args.job_folder
    if not job_folder:
        # argparse yields None for a bare "-j"; fail as a usage error rather than a TypeError below.
        raise ValueError("job folder is not specified.")
    if not os.path.exists(job_folder):
        os.makedirs(job_folder)
    elif not os.path.isdir(job_folder):
        raise ValueError(f"job_folder '{job_folder}' exists but is not a directory")
    elif cmd_args.force:
        shutil.rmtree(job_folder)
        os.makedirs(job_folder)

    app_folder = os.path.join(job_folder, "app")
    if os.path.exists(app_folder):
        if cmd_args.force:
            shutil.rmtree(app_folder)
        else:
            from nvflare.tool.cli_output import print_human

            print_human(
                """\nwarning: app directory already exists. \nIf you would like to overwrite, use -force option"""
            )
def is_subdir(path, directory):
    """Return True if ``path`` is ``directory`` itself or lies inside it, after resolving symlinks.

    Returns False when ``os.path.commonpath`` cannot compare the two paths (it raises ValueError,
    e.g. for paths on different Windows drives), since such paths cannot be nested.
    """
    # Normalize the paths to avoid issues with different OS formats
    path = os.path.realpath(path)
    directory = os.path.realpath(directory)
    try:
        # Check if the directory is a prefix of the path
        return os.path.commonpath([path, directory]) == directory
    except ValueError:
        return False
def prepare_app_scripts(job_folder, app_custom_dirs, cmd_args):
    """Copy ``cmd_args.script_dir`` (if set) into every app ``custom`` directory, then strip bytecode.

    Args:
        job_folder: the job folder being generated.
        app_custom_dirs: destination ``custom`` directories, one per app.
        cmd_args: parsed args; only ``script_dir`` is read. A None or blank value means nothing to copy.

    Raises:
        ValueError: in any of these cases:
            - the script directory does not exist or is not a directory;
            - ``job_folder`` is the script directory or lies inside it (the copy would then contain
              its own output).
    """
    script_dir = cmd_args.script_dir
    if not script_dir or not script_dir.strip():
        return
    # Validate once, up front, so a bad --script_dir is reported even when there are no app dirs.
    if not os.path.exists(script_dir):
        raise ValueError(f"{script_dir} doesn't exist")
    if not os.path.isdir(script_dir):
        # copytree would raise NotADirectoryError, which create_job does not turn into a usage error.
        raise ValueError(f"{script_dir} is not a directory")
    if script_dir == job_folder or is_subdir(job_folder, script_dir):
        raise ValueError("job_folder must not be the same or sub directory of script_dir")
    for app_custom_dir in app_custom_dirs:
        shutil.copytree(script_dir, app_custom_dir, dirs_exist_ok=True)
        remove_pycache_files(app_custom_dir)
def prepare_app_dirs(job_folder: str, app_names: List[str]) -> List[str]:
    """Create the directory skeleton for each app and return their ``custom`` dirs.

    Args:
        job_folder: parent job folder.
        app_names: app names to create; empty or ``None`` means a single ``"app"``.

    Returns:
        List[str]: the ``custom`` directory of each app, in ``app_names`` order.
    """
    names = app_names if app_names else ["app"]
    return [create_app_dir(job_folder=job_folder, app_name=name) for name in names]
def create_app_dir(job_folder, app_name: str = "app"):
    """Create ``<job_folder>/<app_name>/{config,custom}`` (idempotently).

    Returns:
        str: path of the app's ``custom`` directory.
    """
    base = os.path.join(job_folder, app_name)
    custom = os.path.join(base, "custom")
    for path in (base, os.path.join(base, "config"), custom):
        os.makedirs(path, exist_ok=True)
    return custom
# ---------------------------------------------------------------------------
# Section 3: New Job Lifecycle Commands
# ---------------------------------------------------------------------------


def _get_session(args=None, admin_user_dir=None, username=None, study="default"):
    """Create a secure session using command selectors, env, or active startup kit.

    Resolution:
      * neither ``admin_user_dir`` nor ``username`` given: delegate entirely to
        ``new_cli_session_for_args`` (honours ``args`` selectors and ``debug``);
      * only one of them given: fill in the missing value from
        ``resolve_admin_user_and_dir_for_args(args)``;
      * both given: open the session with them directly.

    Args:
        args: parsed CLI args (may be None).
        admin_user_dir: explicit startup-kit location, or None to resolve.
        username: explicit admin user name, or None to resolve.
        study: study to bind the session to.

    Returns:
        The session object created by ``new_cli_session_for_args`` or ``new_cli_session``.

    Raises:
        SystemExit: with code 2, after emitting a ``STARTUP_KIT_MISSING`` error,
            when startup-kit resolution raises ``ValueError``.
    """
    from nvflare.tool.cli_output import get_connect_timeout, output_error

    timeout = get_connect_timeout()
    if admin_user_dir is None and username is None:
        try:
            return new_cli_session_for_args(
                args=args,
                timeout=timeout,
                study=study,
                debug=get_arg_value(args, "debug", False),
            )
        except ValueError as e:
            # Prefer a hint attached to the exception; fall back to the generic one.
            output_error(
                "STARTUP_KIT_MISSING",
                exit_code=2,
                detail=str(e),
                hint=getattr(e, "hint", None) or _ACTIVE_STARTUP_KIT_HINT,
            )
            raise SystemExit(2)
    if admin_user_dir is None or username is None:
        try:
            from nvflare.tool.cli_session import resolve_admin_user_and_dir_for_args

            u, d = resolve_admin_user_and_dir_for_args(args)
        except ValueError as e:
            output_error(
                "STARTUP_KIT_MISSING",
                exit_code=2,
                detail=str(e),
                hint=getattr(e, "hint", None) or _ACTIVE_STARTUP_KIT_HINT,
            )
            raise SystemExit(2)
        # Only fill in the value the caller did not supply explicitly.
        if username is None:
            username = u
        if admin_user_dir is None:
            admin_user_dir = d
    return new_cli_session(
        username=username,
        startup_kit_location=admin_user_dir,
        timeout=timeout,
        study=study,
    )


@contextmanager
def _session(args=None, admin_user_dir=None, username=None, study="default"):
    """Context manager yielding a session from ``_get_session`` and closing it on exit."""
    sess = _get_session(args=args, admin_user_dir=admin_user_dir, username=username, study=study)
    try:
        yield sess
    finally:
        if sess is not None:
            sess.close()


def _has_scoped_startup_kit_args(args) -> bool:
    """Return True if the command selects a specific startup kit (``kit_id`` or ``startup_kit``)."""
    return bool(get_arg_value(args, "kit_id") or get_arg_value(args, "startup_kit"))


def _job_session_for_args(cmd_args=None, study="default"):
    """Open a session context for a job command.

    Command args are forwarded only when a non-default study or an explicit
    startup-kit selector is present; otherwise ``_session()`` is opened with no
    args. NOTE(review): presumably so plain invocations use the default/active
    startup-kit resolution — confirm intent.
    """
    if study != "default" or _has_scoped_startup_kit_args(cmd_args):
        return _session(args=cmd_args, study=study)
    return _session()


def _job_download_destination(job_id: str, output_dir: Optional[str]) -> str:
    """Return the absolute parent directory to pass to ``download_job_result``.

    ``output_dir`` defaults to the current directory. ``job_id`` is currently unused.
    """
    if not output_dir:
        # download_job_result treats destination as the parent directory and moves
        # the downloaded job folder under it. Passing cwd gives the default
        # final local path ./<job_id> without nesting ./<job_id>/<job_id>.
        output_dir = "."
    return os.path.abspath(output_dir)


def _is_remote_download_location(location: str) -> bool:
    """Return True if ``location`` starts with a URI-style ``scheme://`` prefix."""
    return bool(location and re.match(r"^[A-Za-z][A-Za-z0-9+.-]*://", location))


def _local_download_path(location: str) -> Optional[str]:
    """Return the absolute local path for ``location``, or None if it is empty or remote."""
    if not location:
        return None
    location = str(location)
    if _is_remote_download_location(location):
        return None
    return os.path.abspath(location)


def _is_path_within(root_real_path: str, candidate_path: str) -> bool:
    """Return True if ``candidate_path`` (symlinks resolved) lies within ``root_real_path``.

    ``root_real_path`` is expected to be an already-resolved realpath. A
    ``ValueError`` from ``commonpath`` (no common root) is treated as "outside".
    """
    try:
        return os.path.commonpath([root_real_path, os.path.realpath(candidate_path)]) == root_real_path
    except ValueError:
        return False


def _iter_files_under(path: str):
    """Yield regular file paths under ``path`` in sorted, deterministic order.

    Symlinked files and directories are skipped, as is anything whose resolved
    location falls outside ``path``. Yields nothing if ``path`` is empty or not
    a directory.
    """
    if not path or not os.path.isdir(path):
        return
    root_real_path = os.path.realpath(path)
    for root, dirs, files in os.walk(path):
        # Keep the manifest local even if a symlink or concurrent filesystem change
        # points a visited entry outside the requested download tree.
        # Assigning to dirs[:] in place is what prunes os.walk's descent.
        dirs[:] = sorted(
            d
            for d in dirs
            if not os.path.islink(os.path.join(root, d)) and _is_path_within(root_real_path, os.path.join(root, d))
        )
        for file_name in sorted(files):
            file_path = os.path.join(root, file_name)
            if os.path.islink(file_path) or not _is_path_within(root_real_path, file_path):
                continue
            yield file_path
    return None


def _is_identifiable_server_log(rel_parts: List[str]) -> bool:
    """Return True if any directory component (not the file name) is ``server``/``app_server`` (case-insensitive)."""
    return any(part.lower() in {"server", "app_server"} for part in rel_parts[:-1])


def _site_name_from_log_path(download_path: str, log_path: str) -> Optional[str]:
    """Infer the client site name owning ``log_path`` from its location under ``download_path``.

    Returns None for files directly at the root, server logs, and logs whose
    parent directory is a generic name (``local``, ``logs``, ``simulate_job``,
    ``startup``, ``workspace``). Otherwise, in priority order: an ``app_site-*``
    component (with the ``app_`` prefix stripped), a ``site-*`` component, or
    the immediate parent directory name.
    """
    try:
        rel_path = os.path.relpath(log_path, download_path)
    except ValueError:
        return None
    rel_parts = rel_path.split(os.sep)
    if len(rel_parts) < 2 or _is_identifiable_server_log(rel_parts):
        return None
    parent_parts = rel_parts[:-1]
    for part in parent_parts:
        if part.startswith("app_site-"):
            return part[len("app_") :]
    for part in parent_parts:
        if part.startswith("site-"):
            return part
    parent = parent_parts[-1]
    if parent.lower() in {"local", "logs", "simulate_job", "startup", "workspace"}:
        return None
    return parent


def _discover_job_download_artifacts(download_path: str) -> Tuple[dict, List[str]]:
    """Scan a downloaded job folder for well-known artifacts.

    Returns:
        (artifacts, missing_artifacts): ``artifacts`` may hold ``global_model``
        (first file whose name is in ``_JOB_DOWNLOAD_GLOBAL_MODEL_NAMES``),
        ``metrics_summary`` (first ``metrics_summary.json``) and ``client_logs``
        (site name -> first ``log.txt`` attributed to that site), using the
        sorted walk order of ``_iter_files_under``. ``missing_artifacts`` lists
        names from ``_JOB_DOWNLOAD_EXPECTED_ARTIFACTS`` not found.
    """
    artifacts = {}
    client_logs = {}
    for file_path in _iter_files_under(download_path):
        file_name = os.path.basename(file_path)
        if file_name in _JOB_DOWNLOAD_GLOBAL_MODEL_NAMES and "global_model" not in artifacts:
            artifacts["global_model"] = file_path
            continue
        elif file_name == "metrics_summary.json" and "metrics_summary" not in artifacts:
            artifacts["metrics_summary"] = file_path
            continue
        elif file_name != "log.txt":
            continue
        site_name = _site_name_from_log_path(download_path, file_path)
        if site_name and site_name not in client_logs:
            client_logs[site_name] = file_path
    if client_logs:
        artifacts["client_logs"] = client_logs
    missing_artifacts = [name for name in _JOB_DOWNLOAD_EXPECTED_ARTIFACTS if name not in artifacts]
    return artifacts, missing_artifacts
def cmd_job_list(cmd_args):
    """Handle ``nvflare job list``: output the jobs visible in the selected study.

    Filters (name/ID prefix, reverse order, result cap, submit token) are passed
    through to ``list_jobs``. Each returned job lacking a ``study`` key is tagged
    with the queried study. A submit token whose job was deleted produces a
    ``SUBMIT_TOKEN_JOB_DELETED`` error; connection failures produce
    ``CONNECTION_FAILED``; authentication errors propagate.
    """
    from nvflare.fuel.flare_api.api_spec import AuthenticationError, NoConnection, SubmitTokenJobDeleted
    from nvflare.tool.cli_output import output_error, output_ok
    from nvflare.tool.cli_schema import handle_schema_flag

    usage_examples = [
        "nvflare job list",
        "nvflare job list -n cifar -m 10",
        "nvflare job list --submit-token retry-001",
        "nvflare job list --study cancer_research",
    ]
    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_LIST],
        "nvflare job list",
        usage_examples,
        sys.argv[1:],
        output_modes=_JSON_OUTPUT_MODES,
        streaming=False,
        mutating=False,
        idempotent=True,
        retry_token=_NO_RETRY_TOKEN_SCHEMA,
    )
    study = getattr(cmd_args, "study", "default")
    list_filters = {
        "name_prefix": getattr(cmd_args, "name", None),
        "id_prefix": getattr(cmd_args, "id", None),
        "reverse": getattr(cmd_args, "reverse", False),
        "limit": getattr(cmd_args, "max", None),
        "submit_token": getattr(cmd_args, "submit_token", None),
    }
    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            jobs = sess.list_jobs(**list_filters)
    except AuthenticationError:
        raise
    except SubmitTokenJobDeleted as err:
        deletion_info = {"job_id": err.job_id, "state": err.state, "deleted_time": err.deleted_time}
        output_error(
            "SUBMIT_TOKEN_JOB_DELETED",
            exit_code=4,
            detail=str(err),
            data={k: v for k, v in deletion_info.items() if v is not None},
        )
        return
    except NoConnection as err:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(err))
        return

    for job in jobs:
        if "study" not in job:
            job["study"] = study
    output_ok(jobs)
def cmd_job_meta(cmd_args):
    """Handle ``nvflare job meta``: show the metadata of a single job.

    JSON mode emits the raw metadata dict; human mode renders it as a sectioned
    table. A missing job (exception or ``None`` result) yields ``JOB_NOT_FOUND``;
    connection failures yield ``CONNECTION_FAILED``; authentication errors propagate.
    """
    from nvflare.fuel.flare_api.api_spec import AuthenticationError, JobNotFound, NoConnection
    from nvflare.tool.cli_output import is_json_mode, output_error, output_ok, print_human
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_META],
        "nvflare job meta",
        ["nvflare job meta <job_id>", "nvflare job meta <job_id> --study cancer"],
        sys.argv[1:],
        output_modes=_JSON_OUTPUT_MODES,
        streaming=False,
        mutating=False,
        idempotent=True,
        retry_token=_NO_RETRY_TOKEN_SCHEMA,
    )
    study = get_arg_value(cmd_args, "study", "default")

    def _report_not_found():
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )

    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            meta = sess.get_job_meta(cmd_args.job_id)
    except JobNotFound:
        _report_not_found()
        return
    except AuthenticationError:
        raise
    except NoConnection as err:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(err))
        return

    if meta is None:
        _report_not_found()
    elif is_json_mode():
        output_ok(meta)
    else:
        _print_job_meta_human(meta, print_human)
def _format_job_meta_value(value):
    """Render a metadata value as display text.

    ``None``/``""`` become ``"-"``; booleans become ``"yes"``/``"no"``; dicts and
    lists become compact JSON (non-serializable members stringified); anything
    else is ``str(value)``.
    """
    if value is None or value == "":
        return "-"
    if isinstance(value, bool):
        return "yes" if value else "no"
    if isinstance(value, (dict, list)):
        return json.dumps(value, default=str)
    return str(value)


def _format_deploy_map(deploy_map):
    """Render a deploy map as ``"app -> t1, t2; app2 -> t3"``; ``"-"`` if empty or not a dict."""
    if not isinstance(deploy_map, dict) or not deploy_map:
        return "-"
    parts = []
    for app_name, targets in deploy_map.items():
        if isinstance(targets, (list, tuple)):
            target_text = ", ".join(str(target) for target in targets)
        else:
            target_text = str(targets)
        parts.append(f"{app_name} -> {target_text}")
    return "; ".join(parts)


def _format_submitter(meta: dict):
    """Render the submitter as ``name (role @ org)``, omitting missing role/org; ``"-"`` if no name."""
    submitter = meta.get("submitter_name")
    if not submitter:
        return "-"
    role = meta.get("submitter_role")
    org = meta.get("submitter_org")
    if role and org:
        return f"{submitter} ({role} @ {org})"
    if role:
        return f"{submitter} ({role})"
    if org:
        return f"{submitter} ({org})"
    return submitter


def _format_job_meta_timestamp(value, include_zone=False):
    """Format an epoch number or ISO-8601 string as ``YYYY-MM-DD HH:MM:SS``.

    Numeric values are interpreted as local-time epoch seconds. A trailing ``Z``
    is accepted as UTC. With ``include_zone`` and a timezone-aware value, the
    tzinfo's own name is appended (e.g. ``UTC``, ``UTC-07:00``); a fixed UTC
    offset cannot identify a named zone (-0700 is both PDT and MST), so no
    zone abbreviation is guessed.

    Returns:
        The formatted string, None for ``None``/``""``, or ``str(value)`` when the
        value cannot be parsed or is out of range.
    """
    if value is None or value == "":
        return None
    try:
        if isinstance(value, (int, float)):
            dt = datetime.datetime.fromtimestamp(value)
        else:
            text = str(value).strip()
            if text.endswith("Z"):
                text = text[:-1] + "+00:00"
            dt = datetime.datetime.fromisoformat(text)
        formatted = dt.strftime("%Y-%m-%d %H:%M:%S")
        if include_zone and dt.tzinfo:
            zone = dt.tzname() or dt.strftime("%z")
            if zone:
                formatted = f"{formatted} {zone}"
        return formatted
    except (TypeError, ValueError, OverflowError, OSError):
        # fromtimestamp raises OverflowError/OSError for out-of-range epochs.
        return str(value)


def _format_job_meta_duration(value):
    """Format a duration (seconds, or ``H:MM:SS[.ffffff]`` text) as e.g. ``1h 2m 5s``.

    Zero renders as ``0s``. Unparseable text is returned stripped but otherwise unchanged;
    None/``""`` return None.
    """
    if value is None or value == "":
        return None
    if isinstance(value, (int, float)):
        total_seconds = int(round(value))
    else:
        text = str(value).strip()
        try:
            parts = text.split(":")
            if len(parts) == 3:
                hours = int(parts[0])
                minutes = int(parts[1])
                seconds = int(float(parts[2]))
                total_seconds = hours * 3600 + minutes * 60 + seconds
            else:
                total_seconds = int(round(float(text)))
        except (TypeError, ValueError):
            return text
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    pieces = []
    if hours:
        pieces.append(f"{hours}h")
    if minutes:
        pieces.append(f"{minutes}m")
    if seconds or not pieces:
        pieces.append(f"{seconds}s")
    return " ".join(pieces)


def _print_section_table(print_func, sections):
    """Print ``(section, [(field, value), ...])`` groups as a box-drawn 3-column table.

    Fields whose value is None are hidden; a section with no visible fields is
    omitted entirely. The section name is shown only on its first row, and a
    separator line is drawn between sections. Nothing is printed if no rows remain.
    """
    headers = ["Section", "Field", "Value"]
    rows = []
    for section, fields in sections:
        visible_fields = [(field, _format_job_meta_value(value)) for field, value in fields if value is not None]
        for index, (field, value) in enumerate(visible_fields):
            # The 4th element flags the first row of a section (used for separators).
            rows.append([section if index == 0 else "", field, value, index == 0])
    if not rows:
        return
    widths = [max(len(headers[col_index]), *(len(row[col_index]) for row in rows)) for col_index in range(len(headers))]

    def _border(left, middle, right):
        return left + middle.join("─" * (width + 2) for width in widths) + right

    def _row(values):
        return "│ " + " │ ".join(value.ljust(width) for value, width in zip(values, widths)) + " │"

    print_func(_border("┌", "┬", "┐"))
    print_func(_row(headers))
    print_func(_border("├", "┼", "┤"))
    for index, row in enumerate(rows):
        if index and row[3]:
            print_func(_border("├", "┼", "┤"))
        print_func(_row(row[:3]))
    print_func(_border("└", "┴", "┘"))


def _print_job_meta_human(meta: dict, print_func):
    """Render job metadata as a sectioned table via ``print_func``.

    ``job_deploy_detail`` entries of the form ``"site: status"`` become one row
    per site; other entries are shown under a ``detail`` field. ``resource_spec``
    is appended to the Execution section only when present and truthy.
    """
    deployment_fields = []
    for entry in meta.get("job_deploy_detail") or []:
        if isinstance(entry, str) and ":" in entry:
            site, status = entry.split(":", 1)
            deployment_fields.append((site.strip(), status.strip()))
        else:
            deployment_fields.append(("detail", _format_job_meta_value(entry)))

    execution_fields = [
        ("Min clients", meta.get("min_clients")),
        ("BYOC", meta.get("byoc")),
        ("Deploy map", _format_deploy_map(meta.get("deploy_map"))),
    ]
    if meta.get("resource_spec"):
        execution_fields.append(("Resource spec", meta.get("resource_spec")))

    sections = [
        (
            "Identity",
            [
                ("Job ID", meta.get("job_id")),
                ("Name", meta.get("name")),
                ("Study", meta.get("study")),
                ("Submitter", _format_submitter(meta)),
            ],
        ),
        (
            "Status",
            [
                ("Status", meta.get("status")),
                ("Submitted", _format_job_meta_timestamp(meta.get("submit_time_iso") or meta.get("submit_time"), True)),
                ("Started", _format_job_meta_timestamp(meta.get("start_time"))),
                ("Duration", _format_job_meta_duration(meta.get("duration"))),
            ],
        ),
        ("Execution", execution_fields),
        ("Deployment", deployment_fields),
        (
            "Schedule",
            [
                ("Attempts", meta.get("schedule_count")),
                ("Last run", _format_job_meta_timestamp(meta.get("last_schedule_time"))),
            ],
        ),
        (
            "Details",
            [
                ("Job folder", meta.get("job_folder_name")),
                ("Storage format", meta.get("data_storage_format")),
            ],
        ),
    ]
    _print_section_table(print_func, sections)
def cmd_job_abort(cmd_args):
    """Handle ``nvflare job abort``: stop a running job.

    Without ``--force`` the user must confirm interactively. In a non-interactive
    shell this is an ``INVALID_ARGS`` error and the process exits with code 4.
    Emits ``JOB_NOT_FOUND``, ``JOB_NOT_RUNNING`` or ``CONNECTION_FAILED`` on
    the corresponding failures; authentication errors propagate.
    """
    from nvflare.fuel.flare_api.api_spec import AuthenticationError, JobNotFound, JobNotRunning, NoConnection
    from nvflare.tool.cli_output import output_error, output_ok
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_ABORT],
        "nvflare job abort",
        ["nvflare job abort <job_id>", "nvflare job abort <job_id> --study cancer --force"],
        sys.argv[1:],
    )
    study = get_arg_value(cmd_args, "study", "default")

    if not cmd_args.force:
        if not sys.stdin.isatty():
            output_error("INVALID_ARGS", exit_code=4, detail="use --force in non-interactive mode")
            raise SystemExit(4)
        from nvflare.tool.cli_output import print_human, prompt_yn

        confirmed = prompt_yn(f"Abort job '{cmd_args.job_id}' in study '{study}'?")
        if not confirmed:
            print_human("Aborted.")
            return

    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            sess.abort_job(cmd_args.job_id)
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except JobNotRunning:
        output_error(
            "JOB_NOT_RUNNING",
            job_id=cmd_args.job_id,
            detail="abort is available only while the job is running",
        )
        return
    except AuthenticationError:
        raise
    except NoConnection as err:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(err))
        return

    output_ok({"job_id": cmd_args.job_id, "status": "ABORTED"})
def cmd_job_clone(cmd_args):
    """Handle ``nvflare job clone``: resubmit an existing job as a new job.

    Outputs ``{"source_job_id", "new_job_id"}`` on success. Emits
    ``JOB_NOT_FOUND`` or ``CONNECTION_FAILED`` on failure; authentication
    errors propagate.
    """
    from nvflare.fuel.flare_api.api_spec import AuthenticationError, JobNotFound, NoConnection
    from nvflare.tool.cli_output import output_error, output_ok
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_CLONE],
        "nvflare job clone",
        ["nvflare job clone <job_id>", "nvflare job clone <job_id> --study cancer"],
        sys.argv[1:],
    )
    study = get_arg_value(cmd_args, "study", "default")
    source_job_id = cmd_args.job_id
    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            cloned_id = sess.clone_job(source_job_id)
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=source_job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except AuthenticationError:
        raise
    except NoConnection as err:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(err))
        return

    output_ok({"source_job_id": source_job_id, "new_job_id": cloned_id})
def cmd_job_download(cmd_args):
    """Handle ``nvflare job download``: fetch a finished job's result folder.

    The result is placed at ``<output_dir>/<job_id>``, where ``output_dir``
    defaults to the current directory. The job status is checked first, and a
    non-terminal job is rejected with ``JOB_NOT_DONE``. An existing entry at the
    target path is only replaced with ``--force``. This includes a dangling
    symlink, which ``os.path.exists`` would miss. In JSON mode the payload
    also reports artifacts discovered in the downloaded folder.

    Errors are reported via ``output_error``:
      * ``JOB_NOT_FOUND`` / ``JOB_NOT_DONE`` — job state problems;
      * ``CONNECTION_FAILED`` — server unreachable;
      * ``INVALID_ARGS`` — target exists without ``--force``, or a ``shutil.Error``;
      * ``OUTPUT_DIR_NOT_WRITABLE`` — other ``OSError``.

    Authentication errors propagate.
    """
    from nvflare.fuel.flare_api.api_spec import AuthenticationError, JobNotDone, JobNotFound, NoConnection
    from nvflare.tool.cli_output import is_json_mode, output_error, output_ok, print_human
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_DOWNLOAD],
        "nvflare job download",
        [
            "nvflare job download <job_id>",
            "nvflare job download <job_id> -o /path/to/results",
            "nvflare job download <job_id> --study cancer",
            "nvflare job download <job_id> --force",
        ],
        sys.argv[1:],
        output_modes=_JSON_OUTPUT_MODES,
        streaming=False,
        mutating=True,
        idempotent=False,
        retry_token=_NO_RETRY_TOKEN_SCHEMA,
    )
    json_mode = is_json_mode()
    study = get_arg_value(cmd_args, "study", "default")
    force = get_arg_value(cmd_args, "force", False)
    destination = _job_download_destination(cmd_args.job_id, getattr(cmd_args, "output_dir", None))
    expected_download_path = os.path.join(destination, cmd_args.job_id)
    job_wait_hint = (
        f"Use 'nvflare job wait {cmd_args.job_id} --study {study}' or "
        f"'nvflare job monitor {cmd_args.job_id} --study {study}' before downloading results."
    )
    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            meta = sess.get_job_meta(cmd_args.job_id)
            status = meta.get("status") if isinstance(meta, dict) else None
            if status and not _is_terminal_job_status(status):
                output_error(
                    "JOB_NOT_DONE",
                    exit_code=4,
                    job_id=cmd_args.job_id,
                    detail=f"current status: {status}; searched study '{study}'",
                    hint=job_wait_hint,
                )
                return
            # lexists (not exists) so a dangling symlink at the target path is also
            # detected and, with --force, removed before downloading.
            if os.path.lexists(expected_download_path):
                if not force:
                    output_error(
                        "INVALID_ARGS",
                        exit_code=4,
                        detail=f"download destination already exists: {expected_download_path}",
                        hint="Use --force to replace the existing download, remove the directory, or choose a different --output-dir.",
                    )
                    return
                if os.path.isdir(expected_download_path) and not os.path.islink(expected_download_path):
                    shutil.rmtree(expected_download_path)
                else:
                    # Regular files and symlinks (including dangling ones).
                    os.remove(expected_download_path)
            if not json_mode:
                print_human(f"Downloading job {cmd_args.job_id} from study {study} ...")
            path = sess.download_job_result(cmd_args.job_id, destination)
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except JobNotDone as e:
        output_error(
            "JOB_NOT_DONE",
            exit_code=4,
            job_id=cmd_args.job_id,
            detail=f"{e}; searched study '{study}'",
            hint=job_wait_hint,
        )
        return
    except AuthenticationError:
        raise
    except NoConnection as e:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(e))
        return
    except shutil.Error as e:
        # shutil.Error subclasses OSError, so it must be handled first.
        output_error(
            "INVALID_ARGS",
            exit_code=4,
            detail=str(e),
            hint="Use --force to replace the existing download, remove the directory, or choose a different --output-dir.",
        )
        return
    except OSError as e:
        output_error("OUTPUT_DIR_NOT_WRITABLE", path=destination, detail=str(e))
        return

    path = str(path or destination)
    download_path = _local_download_path(path)
    if not json_mode:
        print_human(f"Job result downloaded to: {download_path or path}")
    payload = {
        "job_id": cmd_args.job_id,
        "download_path": download_path,
        "path": download_path or path,
    }
    if download_path and os.path.isdir(download_path):
        artifacts, missing_artifacts = _discover_job_download_artifacts(download_path)
        payload["artifact_discovery"] = "completed"
        payload["artifacts"] = artifacts
        payload["missing_artifacts"] = missing_artifacts
    else:
        # Remote locations or non-directory results cannot be scanned locally.
        payload["artifact_discovery"] = "skipped"
        payload["artifacts"] = None
        payload["missing_artifacts"] = None
    if json_mode:
        output_ok(payload)
def cmd_job_delete(cmd_args):
    """Handle ``nvflare job delete``: remove a job from the server.

    Without ``--force`` the user must confirm interactively. In a non-interactive
    shell this is an ``INVALID_ARGS`` error and the process exits with code 4.
    A still-running job yields ``JOB_NOT_DONE`` with a hint to abort it first.
    The server's result dict (or a minimal one) is output with ``job_id`` guaranteed.
    """
    from nvflare.fuel.flare_api.api_spec import AuthenticationError, JobNotDone, JobNotFound, NoConnection
    from nvflare.tool.cli_output import output_error, output_ok
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_DELETE],
        "nvflare job delete",
        ["nvflare job delete <job_id>", "nvflare job delete <job_id> --study cancer --force"],
        sys.argv[1:],
    )
    study = get_arg_value(cmd_args, "study", "default")
    job_id = cmd_args.job_id

    if not cmd_args.force:
        if not sys.stdin.isatty():
            output_error("INVALID_ARGS", exit_code=4, detail="use --force in non-interactive mode")
            raise SystemExit(4)
        from nvflare.tool.cli_output import print_human, prompt_yn

        if not prompt_yn(f"Delete job '{job_id}' in study '{study}'?"):
            print_human("Cancelled.")
            return

    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            result = sess.delete_job(job_id)
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except JobNotDone as err:
        output_error(
            "JOB_NOT_DONE",
            exit_code=4,
            job_id=job_id,
            detail=f"{err}; searched study '{study}'",
            hint=(
                f"Use 'nvflare job abort {job_id} --study {study}' to stop the job first, "
                "or wait/monitor until it finishes before deleting."
            ),
        )
        return
    except AuthenticationError:
        raise
    except NoConnection as err:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(err))
        return

    payload = result if isinstance(result, dict) else {}
    payload.setdefault("job_id", job_id)
    output_ok(payload)
# --------------------------------------------------------------------------- # Parser definitions for new commands # ---------------------------------------------------------------------------
def define_list_jobs_parser(job_subparser):
    """Register the ``nvflare job list`` sub-command and its handler.

    Options: name/ID prefix filters, reverse ordering, a result cap, the study to
    query, a submit-token lookup, startup-kit selectors and ``--schema``.
    """
    list_parser = job_subparser.add_parser(CMD_JOB_LIST, help="list jobs on the server")
    option_specs = [
        (("-n", "--name"), {"type": str, "default": None, "help": "filter by name prefix"}),
        (("-i", "--id"), {"type": str, "default": None, "help": "filter by job ID prefix"}),
        (("-r", "--reverse"), {"action": "store_true", "default": False, "help": "reverse sort order"}),
        (("-m", "--max"), {"type": int, "default": None, "help": "max results to return"}),
        (("--study",), {"type": str, "default": "default", "help": "study to list jobs from"}),
        (
            ("--submit-token",),
            {
                "type": _submit_token_arg,
                "default": None,
                "help": "retry-safe submit token to resolve a submitted job",
            },
        ),
    ]
    for flags, options in option_specs:
        list_parser.add_argument(*flags, **options)
    add_startup_kit_selection_args(list_parser)
    list_parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_JOB_LIST] = list_parser
    job_sub_cmd_handlers[CMD_JOB_LIST] = cmd_job_list
def define_job_meta_parser(job_subparser):
    """Register the ``nvflare job meta`` sub-command and its handler."""
    meta_parser = job_subparser.add_parser(CMD_JOB_META, help="get metadata for a job")
    for flags, options in (
        (("job_id",), {"type": str, "help": "job ID"}),
        (("--study",), {"type": str, "default": "default", "help": "study containing the job"}),
    ):
        meta_parser.add_argument(*flags, **options)
    add_startup_kit_selection_args(meta_parser)
    meta_parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_JOB_META] = meta_parser
    job_sub_cmd_handlers[CMD_JOB_META] = cmd_job_meta
def define_abort_job_parser(job_subparser):
    """Register the ``nvflare job abort`` sub-command and its handler."""
    abort_parser = job_subparser.add_parser(CMD_JOB_ABORT, help="abort a running job")
    for flags, options in (
        (("job_id",), {"type": str, "help": "job ID"}),
        (("--force",), {"action": "store_true", "help": "skip confirmation prompt"}),
        (("--study",), {"type": str, "default": "default", "help": "study containing the job"}),
    ):
        abort_parser.add_argument(*flags, **options)
    add_startup_kit_selection_args(abort_parser)
    abort_parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_JOB_ABORT] = abort_parser
    job_sub_cmd_handlers[CMD_JOB_ABORT] = cmd_job_abort
def define_clone_job_parser(job_subparser):
    """Register the ``nvflare job clone`` sub-command and its handler."""
    clone_parser = job_subparser.add_parser(CMD_JOB_CLONE, help="clone an existing job")
    for flags, options in (
        (("job_id",), {"type": str, "help": "job ID to clone"}),
        (("--study",), {"type": str, "default": "default", "help": "study containing the source job"}),
    ):
        clone_parser.add_argument(*flags, **options)
    add_startup_kit_selection_args(clone_parser)
    clone_parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_JOB_CLONE] = clone_parser
    job_sub_cmd_handlers[CMD_JOB_CLONE] = cmd_job_clone
def define_download_job_parser(job_subparser):
    """Register the ``nvflare job download`` sub-command and its handler.

    ``--output-dir`` is the *parent* directory. The downloaded job folder is
    written to ``<output_dir>/<job_id>``, and the default parent is the current
    directory. The help text says so explicitly; it previously described
    ``--output-dir`` as the destination itself.
    """
    p = job_subparser.add_parser(CMD_JOB_DOWNLOAD, help="download job result")
    p.add_argument("job_id", type=str, help="job ID")
    p.add_argument(
        "-o",
        "--output-dir",
        dest="output_dir",
        type=str,
        default=None,
        help="parent directory for the downloaded job folder; results are saved to <output_dir>/<job_id> (default: ./<job_id>)",
    )
    p.add_argument("--study", type=str, default="default", help="study containing the job")
    p.add_argument("--force", action="store_true", help="replace an existing local download directory")
    add_startup_kit_selection_args(p)
    p.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_JOB_DOWNLOAD] = p
    job_sub_cmd_handlers[CMD_JOB_DOWNLOAD] = cmd_job_download
def define_delete_job_parser(job_subparser):
    """Register the ``nvflare job delete`` sub-command and its handler."""
    delete_parser = job_subparser.add_parser(CMD_JOB_DELETE, help="delete a job")
    for flags, options in (
        (("job_id",), {"type": str, "help": "job ID"}),
        (("--force",), {"action": "store_true", "help": "skip confirmation prompt"}),
        (("--study",), {"type": str, "default": "default", "help": "study containing the job"}),
    ):
        delete_parser.add_argument(*flags, **options)
    add_startup_kit_selection_args(delete_parser)
    delete_parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_JOB_DELETE] = delete_parser
    job_sub_cmd_handlers[CMD_JOB_DELETE] = cmd_job_delete
# Exact status strings treated as terminal in addition to any "FINISHED:*" status.
_TERMINAL_JOB_STATES = {
    "ABANDONED",
    "ABORTED",
    "FAILED",
    "FINISHED_EXCEPTION",
    "FINISHED_OK",
}


def _is_terminal_job_status(status: str) -> bool:
    """Return True if ``status`` is a string naming a finished job state.

    A status is terminal when it starts with ``"FINISHED:"`` or is listed in
    ``_TERMINAL_JOB_STATES``. Non-string values are never terminal.
    """
    if not isinstance(status, str):
        return False
    return status in _TERMINAL_JOB_STATES or status.startswith("FINISHED:")
def cmd_job_stats(cmd_args):
    """Handle ``nvflare job stats``: show runtime statistics of a running job.

    ``--site`` selects the target: ``all``, ``server``, or a single client site
    name. Emits ``JOB_NOT_FOUND``, ``JOB_NOT_RUNNING`` or ``CONNECTION_FAILED``
    on failure; authentication errors propagate.
    """
    from nvflare.fuel.flare_api.api_spec import AuthenticationError, JobNotFound, JobNotRunning, NoConnection
    from nvflare.tool.cli_output import output_error, output_ok
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_STATS],
        "nvflare job stats",
        [
            "nvflare job stats abc123",
            "nvflare job stats abc123 --site server",
            "nvflare job stats abc123 --study cancer",
        ],
        sys.argv[1:],
    )
    study = get_arg_value(cmd_args, "study", "default")
    site = getattr(cmd_args, "site", "all")
    if site in ("all", "server"):
        # "all" and "server" are target types on their own; no explicit target list.
        target_type, targets = site, None
    else:
        target_type, targets = "client", [site]

    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            stats = sess.show_stats(cmd_args.job_id, target_type, targets)
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except JobNotRunning:
        output_error(
            "JOB_NOT_RUNNING",
            job_id=cmd_args.job_id,
            detail="stats are available only while the job is running",
        )
        return
    except AuthenticationError:
        raise
    except NoConnection as err:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(err))
        return

    output_ok({"job_id": cmd_args.job_id, "stats": stats})
def _normalize_log_timestamp(value: str) -> datetime.datetime:
    """Parse a ``--since`` or log timestamp into a naive, UTC-equivalent datetime.

    Accepts ISO-8601 forms, a trailing ``Z``, a space instead of ``T``, and a
    comma decimal separator (as in ``2026-04-28 10:00:00,123``). Offset-aware
    values are converted to UTC and made naive; naive values are returned as-is.

    Raises:
        ValueError: if the value is empty or not parseable.
    """
    text = (value or "").strip()
    if not text:
        raise ValueError("--since must be a non-empty timestamp")
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    text = text.replace(" ", "T", 1).replace(",", ".")
    try:
        ts = datetime.datetime.fromisoformat(text)
    except ValueError as e:
        raise ValueError(f"invalid --since timestamp: {value}") from e
    if ts.tzinfo is not None:
        ts = ts.astimezone(datetime.timezone.utc).replace(tzinfo=None)
    return ts


def _log_since_arg(value: str) -> str:
    """argparse ``type=`` validator for ``--since``; returns the raw string unchanged."""
    try:
        _normalize_log_timestamp(value)
    except ValueError as e:
        raise argparse.ArgumentTypeError(
            f"{e}; accepted formats include ISO timestamps such as 2026-04-28T10:00:00 or 2026-04-28T10:00:00Z"
        ) from e
    return value


def _parse_log_line_timestamp(line: str):
    """Return the normalized leading timestamp of a text log line, or None if absent/invalid."""
    match = _JOB_LOG_TS_RE.match(line)
    if not match:
        return None
    try:
        # Naive server log timestamps are compared as UTC-equivalent naive values;
        # explicit timezone offsets are normalized to UTC by _normalize_log_timestamp.
        return _normalize_log_timestamp(match.group("ts"))
    except ValueError:
        return None


def _filter_log_since(text: str, since_ts: datetime.datetime):
    """Keep text-log lines at or after ``since_ts``.

    Returns:
        (text, applied, filtered): if no line carries a timestamp the input is
        returned unchanged with ``applied=False``; ``filtered`` tells whether
        any line was dropped.
    """
    lines = text.splitlines(keepends=True)
    if not lines:
        return text, False, False
    filtered = []
    keep_current = False
    saw_timestamp = False
    for line in lines:
        # This is line-based filtering. Continuation lines are kept with the most
        # recent parsed timestamp and can be imperfect for heavily interleaved logs.
        line_ts = _parse_log_line_timestamp(line)
        if line_ts is not None:
            saw_timestamp = True
            keep_current = line_ts >= since_ts
        if keep_current:
            filtered.append(line)
    if not saw_timestamp:
        return text, False, False
    return "".join(filtered), True, len(filtered) != len(lines)


def _parse_log_json_line_timestamp(line: str):
    """Return the normalized timestamp of a JSON log record (``asctime``/``timestamp``/``time``), or None."""
    try:
        record = json.loads(line)
    except (TypeError, ValueError):
        return None
    if not isinstance(record, dict):
        return None
    for key in ("asctime", "timestamp", "time"):
        value = record.get(key)
        if not value:
            continue
        try:
            return _normalize_log_timestamp(str(value))
        except ValueError:
            continue
    return None


def _filter_log_json_since(text: str, since_ts: datetime.datetime):
    """Keep JSON-log records at or after ``since_ts``; records without a timestamp are dropped.

    Returns the same ``(text, applied, filtered)`` triple as ``_filter_log_since``.
    """
    lines = text.splitlines(keepends=True)
    if not lines:
        return text, False, False
    filtered = []
    saw_timestamp = False
    for line in lines:
        line_ts = _parse_log_json_line_timestamp(line)
        if line_ts is None:
            continue
        saw_timestamp = True
        if line_ts >= since_ts:
            filtered.append(line)
    if not saw_timestamp:
        return text, False, False
    return "".join(filtered), True, len(filtered) != len(lines)


def _looks_like_json_log(text: str) -> bool:
    """Return True if the first non-blank line of ``text`` parses as a JSON object.

    Non-string inputs (e.g. a ``None`` log entry) return False. The callers
    already tolerate such entries elsewhere; previously this function crashed
    on them.
    """
    if not isinstance(text, str):
        return False
    for line in text.splitlines():
        stripped = line.strip()
        if not stripped:
            continue
        try:
            return isinstance(json.loads(stripped), dict)
        except (TypeError, ValueError):
            return False
    return False


def _format_log_json_record(record: dict) -> str:
    """Render a JSON log record as ``time - logger - level - fl_ctx - message`` (empty parts omitted)."""
    asctime = record.get("asctime") or record.get("timestamp") or record.get("time") or ""
    logger_name = record.get("name") or record.get("fullName") or ""
    level = record.get("levelname") or record.get("level") or ""
    fl_ctx = record.get("fl_ctx") or ""
    message = record.get("message") or record.get("msg") or ""
    parts = [str(part) for part in (asctime, logger_name, level, fl_ctx, message) if part not in (None, "")]
    return " - ".join(parts) if parts else json.dumps(record, default=str)


def _render_log_json_as_text(text: str) -> str:
    """Convert a JSON-lines log into human-readable text, passing non-record lines through."""
    rendered = []
    for line in text.splitlines():
        stripped = line.strip()
        if not stripped:
            continue
        try:
            record = json.loads(stripped)
        except (TypeError, ValueError):
            rendered.append(line)
            continue
        if isinstance(record, dict):
            rendered.append(_format_log_json_record(record))
        else:
            rendered.append(line)
    if not rendered:
        return ""
    return "\n".join(rendered) + "\n"


def _tail_log_text(text: str, tail_lines: int):
    """Return ``(last tail_lines lines, truncated?)``; ``None`` keeps everything."""
    lines = text.splitlines(keepends=True)
    if tail_lines is None or len(lines) <= tail_lines:
        return text, False
    if tail_lines == 0:
        return "", bool(lines)
    return "".join(lines[-tail_lines:]), True


def _truncate_log_bytes(text: str, max_bytes: int):
    """Cap ``text`` at ``max_bytes`` UTF-8 bytes, dropping any split trailing character."""
    if max_bytes is None:
        return text, False
    encoded = text.encode("utf-8")
    if len(encoded) <= max_bytes:
        return text, False
    return encoded[:max_bytes].decode("utf-8", errors="ignore"), True


def _line_count(text: str) -> int:
    """Number of lines in ``text`` (0 for empty)."""
    return len(text.splitlines()) if text else 0


def _apply_job_log_bounds(logs: dict, tail_lines, since_ts, max_bytes):
    """Apply ``since``, ``tail`` and byte limits (in that order) to every site's log.

    Returns:
        (bounded_logs, sites, any_truncated, since_applied_any) where ``sites``
        maps each site to availability/line/byte/truncation info. Non-string log
        values are passed through untouched.
    """
    bounded_logs = {}
    sites = {}
    any_truncated = False
    since_applied_any = False
    for site_name, log_text in logs.items():
        if not isinstance(log_text, str):
            bounded_logs[site_name] = log_text
            sites[site_name] = {
                "available": True,
                "lines": None,
                "logs_truncated": False,
            }
            continue
        bounded = log_text
        since_applied = False
        if since_ts is not None:
            if _looks_like_json_log(bounded):
                bounded, since_applied, _since_filtered = _filter_log_json_since(bounded, since_ts)
            else:
                bounded, since_applied, _since_filtered = _filter_log_since(bounded, since_ts)
        since_applied_any = since_applied_any or since_applied
        bounded, tail_truncated = _tail_log_text(bounded, tail_lines)
        bounded, byte_truncated = _truncate_log_bytes(bounded, max_bytes)
        site_truncated = tail_truncated or byte_truncated
        any_truncated = any_truncated or site_truncated
        bounded_logs[site_name] = bounded
        sites[site_name] = {
            "available": True,
            "lines": _line_count(bounded),
            "bytes": len(bounded.encode("utf-8")),
            "logs_truncated": site_truncated,
        }
    return bounded_logs, sites, any_truncated, since_applied_any


def _select_job_logs_for_output(result: dict, json_mode: bool):
    """Return the per-site logs to output; in human mode JSON logs are rendered as text."""
    logs = result.get("logs", {})
    if not isinstance(logs, dict):
        return {}
    if json_mode:
        return logs
    return {
        site_name: _render_log_json_as_text(log_text) if _looks_like_json_log(log_text) else log_text
        for site_name, log_text in logs.items()
    }


def _preferred_job_log_file(json_mode: bool) -> str:
    """Log file to request first: ``log.json`` in JSON mode, ``log.txt`` otherwise."""
    return "log.json" if json_mode else "log.txt"


def _fallback_job_log_file(json_mode: bool) -> str:
    """Log file to request when the preferred one is unavailable."""
    return "log.txt" if json_mode else "log.json"


def _job_log_result_needs_fallback(result: dict, site: str) -> bool:
    """Return True if the first log fetch is malformed or missing logs for the requested site(s)."""
    if not isinstance(result, dict):
        return True
    logs = result.get("logs", {})
    if not isinstance(logs, dict):
        return True
    unavailable = result.get("unavailable", {})
    if isinstance(unavailable, dict) and unavailable:
        if site == "all":
            return any(site_name not in logs for site_name in unavailable)
        return site in unavailable
    return not logs


def _merge_job_log_fallback_result(primary: dict, fallback: dict) -> dict:
    """Merge a fallback log fetch into the primary one.

    Primary logs win; fallback logs fill missing sites and clear their
    ``unavailable`` entries. ``unavailable`` is dropped entirely when empty.
    """
    if not isinstance(primary, dict):
        primary = {}
    if not isinstance(fallback, dict):
        fallback = {}
    primary_logs = primary.get("logs", {})
    if not isinstance(primary_logs, dict):
        primary_logs = {}
    logs = dict(primary_logs)
    primary_unavailable = primary.get("unavailable", {})
    if not isinstance(primary_unavailable, dict):
        primary_unavailable = {}
    unavailable = dict(primary_unavailable)
    fallback_logs = fallback.get("logs", {})
    if isinstance(fallback_logs, dict):
        for site_name, log_text in fallback_logs.items():
            logs.setdefault(site_name, log_text)
            unavailable.pop(site_name, None)
    fallback_unavailable = fallback.get("unavailable", {})
    if isinstance(fallback_unavailable, dict):
        for site_name, reason in fallback_unavailable.items():
            if site_name not in logs:
                unavailable[site_name] = reason
    result = dict(primary)
    result["logs"] = logs
    if unavailable:
        result["unavailable"] = unavailable
    else:
        result.pop("unavailable", None)
    return result
def cmd_job_logs(cmd_args):
    """Fetch a job's logs from the server-side log store and print or return them.

    The preferred log file for the output mode is requested first (``log.json`` in
    JSON mode, ``log.txt`` otherwise). Gaps are filled from the other file. The
    result is then bounded per site by ``--since``, ``--tail`` and ``--max-bytes``.
    When none of those is given, ``_DEFAULT_JOB_LOG_TAIL_LINES`` is used as the tail.

    Args:
        cmd_args: parsed argparse namespace for ``nvflare job logs``.
    """
    from nvflare.fuel.flare_api.api_spec import AuthenticationError, AuthorizationError, JobNotFound, NoConnection
    from nvflare.tool.cli_output import is_json_mode, output_error, output_ok, print_human
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_LOGS],
        "nvflare job logs",
        [
            "nvflare job logs abc123",
            "nvflare job logs abc123 --site site-1",
            "nvflare job logs abc123 --site all",
            "nvflare job logs abc123 --site all --tail 200",
            "nvflare job logs abc123 --study cancer",
        ],
        sys.argv[1:],
        output_modes=_JSON_OUTPUT_MODES,
        streaming=False,
        mutating=False,
        idempotent=True,
        retry_token=_NO_RETRY_TOKEN_SCHEMA,
    )
    site = getattr(cmd_args, "site", "server")
    study = getattr(cmd_args, "study", "default")
    tail_lines = getattr(cmd_args, "tail", None)
    since_value = getattr(cmd_args, "since", None)
    max_bytes = getattr(cmd_args, "max_bytes", None)

    # Only fall back to the default tail when the user gave no explicit bound at all.
    default_tail_applied = tail_lines is None and since_value is None and max_bytes is None
    if default_tail_applied:
        tail_lines = _DEFAULT_JOB_LOG_TAIL_LINES

    try:
        since_ts = _normalize_log_timestamp(since_value) if since_value else None
    except ValueError as e:
        output_error("INVALID_ARGS", exit_code=4, detail=str(e))
        return

    json_mode = is_json_mode()
    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            result = sess.get_job_logs(cmd_args.job_id, target=site, log_file_name=_preferred_job_log_file(json_mode))
            if _job_log_result_needs_fallback(result, site):
                fallback_result = sess.get_job_logs(
                    cmd_args.job_id, target=site, log_file_name=_fallback_job_log_file(json_mode)
                )
                result = _merge_job_log_fallback_result(result, fallback_result)
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except (AuthenticationError, AuthorizationError):
        # Credential/permission failures propagate to the caller, as in `nvflare job log`,
        # instead of being misreported as INTERNAL_ERROR by the catch-all below.
        raise
    except NoConnection as e:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(e))
        return
    except Exception as e:
        output_error("INTERNAL_ERROR", exit_code=5, detail=str(e))
        return

    logs = _select_job_logs_for_output(result, json_mode)
    unavailable = result.get("unavailable", {})
    if not isinstance(unavailable, dict):
        # A null/malformed "unavailable" field would break the membership test and
        # .items() below; treat it as empty, as _merge_job_log_fallback_result does.
        unavailable = {}

    # A single explicitly-named client site with no log at all is an error, not an empty result.
    if site != "all" and site != "server" and site in unavailable and site not in logs:
        output_error(
            "LOG_NOT_FOUND",
            exit_code=1,
            site=site,
            detail=unavailable.get(site),
        )
        return

    logs, sites, logs_truncated, since_applied = _apply_job_log_bounds(logs, tail_lines, since_ts, max_bytes)
    for site_name, reason in unavailable.items():
        sites[site_name] = {"available": False, "reason": reason}

    payload = {
        "job_id": cmd_args.job_id,
        "target": site,
        "logs": logs,
        "logs_truncated": logs_truncated,
        "sites": sites,
        "filters": {
            "tail": tail_lines,
            "since": since_value,
            "since_applied": since_applied,
            "max_bytes": max_bytes,
            "default_tail_applied": default_tail_applied,
        },
    }
    if unavailable:
        payload["unavailable"] = unavailable

    if not json_mode:
        _print_job_logs_human(site, logs, unavailable, print_human)
        return
    output_ok(payload)
def _print_job_logs_human(target: str, logs: dict, unavailable: dict, print_func):
    """Write job logs in human-readable form through ``print_func``.

    If exactly the requested site came back and nothing is unavailable, its text
    is printed bare. Otherwise every site gets a ``===== name =====`` banner, with
    ``server`` listed first and blank lines between sections. Unavailable sites are
    then listed, sorted, on ``sys.stderr``.

    Args:
        target: the site the user asked for (a site name, ``server`` or ``all``).
        logs: mapping of site name to log text.
        unavailable: mapping of site name to the reason its log is missing.
        print_func: a ``print``-compatible callable (accepts ``end=`` and ``file=``).
    """

    def _emit(body):
        if not body:
            print_func("(no log content)")
            return
        # Avoid a double newline when the log already ends with one.
        terminator = "" if body.endswith("\n") else "\n"
        print_func(body, end=terminator)

    only_requested_site = target != "all" and target in logs and len(logs) == 1 and not unavailable
    if only_requested_site:
        _emit(logs[target])
        return

    ordered_sites = [name for name in logs if name != "server"]
    if "server" in logs:
        ordered_sites = ["server"] + ordered_sites

    for position, name in enumerate(ordered_sites):
        if position > 0:
            print_func()
        print_func(f"===== {name} =====")
        _emit(logs[name])

    if not unavailable:
        return
    if logs:
        print_func()
    print_func("Unavailable logs:", file=sys.stderr)
    for name in sorted(unavailable):
        print_func(f"{name}: {unavailable[name]}", file=sys.stderr)
def define_job_stats_parser(job_subparser):
    """Register the ``job stats`` sub-command parser and its handler.

    Args:
        job_subparser: the argparse sub-parsers action for ``nvflare job``.
    """
    parser = job_subparser.add_parser(CMD_JOB_STATS, help="show running job statistics")
    argument_specs = (
        (("job_id",), {"type": str, "help": "job ID"}),
        (("--site",), {"default": "all", "help": "target site name or all"}),
        (("--study",), {"type": str, "default": "default", "help": "study containing the job"}),
    )
    for flags, options in argument_specs:
        parser.add_argument(*flags, **options)
    add_startup_kit_selection_args(parser)
    parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")

    job_sub_cmd_parser[CMD_JOB_STATS] = parser
    job_sub_cmd_handlers[CMD_JOB_STATS] = cmd_job_stats
def define_job_logs_parser(job_subparser):
    """Register the ``job logs`` sub-command parser and its handler.

    Bounds (``--since``, ``--tail``, ``--max-bytes``) are applied per site, in that
    order, by the handler.

    Args:
        job_subparser: the argparse sub-parsers action for ``nvflare job``.
    """
    parser = job_subparser.add_parser(CMD_JOB_LOGS, help="retrieve job logs from the server-side log store")
    argument_specs = (
        (("job_id",), {"type": str, "help": "job ID"}),
        (
            ("--sites", "--site"),
            {
                "dest": "site",
                "default": "server",
                "help": "target site name, server, or all. Client logs must have been streamed to the server.",
            },
        ),
        (
            ("--tail",),
            {
                "type": _non_negative_int,
                "default": None,
                "help": "return at most the last N log lines per site; applied after --since",
            },
        ),
        (
            ("--since",),
            {
                "type": _log_since_arg,
                "default": None,
                "help": "return log lines at or after this timestamp; any explicit bound disables the default 500-line tail",
            },
        ),
        (
            ("--max-bytes",),
            {
                "type": _non_negative_int,
                "default": None,
                "help": "return at most N bytes per site; applied after --since and --tail",
            },
        ),
        (("--study",), {"type": str, "default": "default", "help": "study to retrieve job logs from"}),
    )
    for flags, options in argument_specs:
        parser.add_argument(*flags, **options)
    add_startup_kit_selection_args(parser)
    parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")

    job_sub_cmd_parser[CMD_JOB_LOGS] = parser
    job_sub_cmd_handlers[CMD_JOB_LOGS] = cmd_job_logs
def _summarize_monitor_meta(meta: dict, job_meta_key_cls) -> dict:
    """Pick the user-facing fields out of raw job metadata.

    Args:
        meta: raw job metadata dict (may be empty or None).
        job_meta_key_cls: the key enum class whose members name the metadata keys
            (callers pass ``JobMetaKey``).

    Returns:
        Dict keyed by friendly names (``job_name``, ``status``, ...) that contains
        only the fields whose value is neither ``None`` nor ``""``.
    """
    if not meta:
        return {}
    fields = {
        "job_name": job_meta_key_cls.JOB_NAME.value,
        "status": job_meta_key_cls.STATUS.value,
        "submit_time": job_meta_key_cls.SUBMIT_TIME_ISO.value,
        "start_time": job_meta_key_cls.START_TIME.value,
        "duration": job_meta_key_cls.DURATION.value,
        "study": job_meta_key_cls.STUDY.value,
        "submitter_name": job_meta_key_cls.SUBMITTER_NAME.value,
        "submitter_org": job_meta_key_cls.SUBMITTER_ORG.value,
        "submitter_role": job_meta_key_cls.SUBMITTER_ROLE.value,
    }
    summary = {}
    for out_key, meta_key in fields.items():
        value = meta.get(meta_key)
        if value not in (None, ""):
            summary[out_key] = value
    return summary


def _parse_monitor_start_ts(meta: dict, start_time_key: str, submit_time_iso_key: str) -> Optional[float]:
    """Return the job's start time as a POSIX timestamp, or None if unknown.

    The start-time field is tried first. If it is missing or unparseable, the ISO
    submit time is used instead. Both are interpreted as naive local times.
    """
    if not meta:
        return None
    start_time = meta.get(start_time_key)
    if start_time:
        # The primary format matches str(datetime.datetime). str() omits the
        # ".ffffff" part when microsecond == 0, so also accept whole seconds.
        for time_format in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
            try:
                return datetime.datetime.strptime(start_time, time_format).timestamp()
            except (TypeError, ValueError, OverflowError, OSError):
                continue
    submit_time_iso = meta.get(submit_time_iso_key)
    if submit_time_iso:
        try:
            return datetime.datetime.fromisoformat(submit_time_iso).timestamp()
        except (TypeError, ValueError, OverflowError, OSError):
            pass
    return None


def _parse_monitor_duration_seconds(value) -> Optional[float]:
    """Convert a duration (number, "H:M:S", "M:S" or "S" string) to seconds.

    Returns None for missing, "N/A", non-numeric or otherwise unparseable values.
    """
    if value is None or value == "" or value == "N/A":
        return None
    if isinstance(value, (int, float)):
        return float(value)
    if not isinstance(value, str):
        return None
    parts = value.split(":")
    try:
        if len(parts) == 3:
            hours = int(parts[0])
            minutes = int(parts[1])
            seconds = float(parts[2])
            return hours * 3600 + minutes * 60 + seconds
        if len(parts) == 2:
            minutes = int(parts[0])
            seconds = float(parts[1])
            return minutes * 60 + seconds
        if len(parts) == 1:
            return float(parts[0])
    except (TypeError, ValueError):
        return None
    return None


def _build_monitor_key_aliases(extra_metrics: list) -> dict:
    """Map each reported metric name to the stats keys that may hold it.

    User-requested ``extra_metrics`` are looked up by their own name only.
    """
    aliases = {
        "round": [
            "round",
            "global_round",
            "current_round",
            "iteration",
            "iter",
            "epoch",
        ],
        "accuracy": ["accuracy", "acc", "val_acc", "test_acc"],
        "loss": ["loss", "train_loss", "val_loss", "test_loss"],
    }
    for metric in extra_metrics:
        aliases[metric] = [metric]
    return aliases


def _extract_monitor_metrics(stats: dict, key_aliases: dict) -> dict:
    """Pull scalar metrics out of an arbitrarily nested stats dict.

    For every metric, the first alias found with an int/float/str value wins.
    Each dict level is checked before its nested dicts (depth-first).
    """
    if not isinstance(stats, dict):
        return {}

    def _find_key(d: dict, keys: list):
        for k in keys:
            if k in d and isinstance(d[k], (int, float, str)):
                return d[k]
        return None

    def _search(d: dict, keys: list):
        if not isinstance(d, dict):
            return None
        value = _find_key(d, keys)
        if value is not None:
            return value
        for v in d.values():
            if isinstance(v, dict):
                found = _search(v, keys)
                if found is not None:
                    return found
        return None

    metrics = {}
    for out_key, aliases in key_aliases.items():
        value = _search(stats, aliases)
        if value is not None:
            metrics[out_key] = value
    return metrics


def _make_monitor_state() -> dict:
    """Return a fresh mutable state dict shared between monitor polls."""
    return {
        "last_status": None,
        "last_meta": None,
        "last_emit_ts": 0.0,
        "last_stats": None,
        "last_stats_raw": None,
        "last_stats_ts": 0.0,
    }


def _refresh_monitor_stats(sess, job_id: str, state: dict, stats_target: str, key_aliases: dict):
    """Fetch job stats into ``state``. Best effort: any failure just clears them."""
    try:
        stats = sess.show_stats(job_id, stats_target, None)
        state["last_stats_raw"] = stats
        state["last_stats"] = _extract_monitor_metrics(stats, key_aliases)
    except Exception:
        # Stats are optional decoration; a failed fetch must not stop monitoring.
        state["last_stats"] = None
        state["last_stats_raw"] = None


def _emit_monitor_progress(job_id: str, job_meta: dict, state: dict, now: float, start: float, start_ts):
    """Print one human-readable progress line and record what was emitted.

    The first line (no previous status) also carries job id, name and submit time.
    Elapsed time is measured from ``start_ts`` when known, else from ``start``.
    """
    from nvflare.apis.job_def import JobMetaKey
    from nvflare.tool.cli_output import print_human

    status = job_meta.get("status", "UNKNOWN") if job_meta else "UNKNOWN"
    summary = _summarize_monitor_meta(job_meta, JobMetaKey)
    name = summary.get("job_name")
    elapsed_base = start_ts if start_ts is not None else start
    elapsed = round(now - elapsed_base, 1)
    message_parts = []
    if state["last_status"] is None:
        message_parts.append(f"job_id: {job_id}")
        if name:
            message_parts.append(f"name: {name}")
        submit_time = summary.get("submit_time")
        if submit_time:
            message_parts.append(f"submit_time: {submit_time}")
    message_parts.append(f"status: {status}")
    message_parts.append(f"elapsed_s: {elapsed}")
    metrics = state.get("last_stats") or {}
    if metrics:
        metric_str = " ".join(f"{k}={v}" for k, v in metrics.items())
        message_parts.append(f"metrics: {metric_str}")
    print_human(" ".join(message_parts))
    state["last_status"] = status
    state["last_emit_ts"] = now


def _normalize_monitor_event_status(status: str) -> str:
    """Collapse raw job statuses into COMPLETED/FAILED/ABORTED/ABANDONED.

    Unrecognized statuses are returned unchanged; falsy ones become "UNKNOWN".
    """
    if not status:
        return "UNKNOWN"
    if status == "FINISHED_OK" or status.startswith("FINISHED:COMPLETED"):
        return "COMPLETED"
    if status == "FINISHED_EXCEPTION" or status == "FAILED" or status.startswith("FINISHED:EXECUTION_EXCEPTION"):
        return "FAILED"
    if status == "ABORTED" or status.startswith("FINISHED:ABORTED"):
        return "ABORTED"
    if status == "ABANDONED" or status.startswith("FINISHED:ABANDONED"):
        return "ABANDONED"
    return status


def _build_monitor_progress_event(job_id: str, job_meta: dict, state: dict, now: float, start: float, start_ts) -> dict:
    """Build a non-terminal ``progress`` event dict for JSONL output."""
    from nvflare.apis.job_def import JobMetaKey

    raw_status = job_meta.get("status", "UNKNOWN") if job_meta else "UNKNOWN"
    elapsed_base = start_ts if start_ts is not None else start
    event = {
        "event": "progress",
        "job_id": job_id,
        "status": _normalize_monitor_event_status(raw_status),
        "job_status": raw_status,
        "terminal": False,
        "elapsed_s": round(now - elapsed_base, 1),
        "job_meta": _summarize_monitor_meta(job_meta, JobMetaKey),
    }
    metrics = state.get("last_stats") or {}
    if metrics:
        event["metrics"] = metrics
    return event


def _emit_monitor_jsonl_progress(job_id: str, job_meta: dict, state: dict, now: float, start: float, start_ts):
    """Write one JSONL progress event and record what was emitted in ``state``."""
    from nvflare.tool.cli_output import output_jsonl_event

    output_jsonl_event(_build_monitor_progress_event(job_id, job_meta, state, now, start, start_ts))
    state["last_status"] = job_meta.get("status", "UNKNOWN") if job_meta else "UNKNOWN"
    state["last_emit_ts"] = now


def _build_monitor_terminal_event(data: dict, event: str = "terminal", error_code: Optional[str] = None) -> dict:
    """Build the final JSONL event from ``_build_monitor_output_data`` output.

    ``stats_raw`` is copied only when present in ``data``. ``error_code`` is added
    only when given.
    """
    raw_status = data.get("status", "UNKNOWN") if data else "UNKNOWN"
    result = {
        "event": event,
        "job_id": data.get("job_id") if data else None,
        "status": _normalize_monitor_event_status(raw_status),
        "job_status": raw_status,
        "terminal": True,
        "duration_s": data.get("duration_s") if data else None,
        "job_meta": data.get("job_meta") if data else {},
        "metrics": data.get("last_stats") if data else None,
    }
    if data and "stats_raw" in data:
        result["stats_raw"] = data.get("stats_raw")
    if error_code:
        result["error_code"] = error_code
    return result


def _build_monitor_timeout_event(job_id: str, timeout: int, start: float, start_ts, cb_state: dict) -> dict:
    """Build the terminal JSONL event emitted when monitoring times out."""
    from nvflare.apis.job_def import JobMetaKey

    elapsed_base = start_ts if start_ts is not None else start
    event = {
        "event": "terminal",
        "job_id": job_id,
        "status": "TIMEOUT",
        "terminal": True,
        "timeout_seconds": timeout,
        "elapsed_s": round(time.time() - elapsed_base, 1),
    }
    last_meta = cb_state.get("last_meta")
    if last_meta:
        raw_status = last_meta.get("status", "UNKNOWN")
        event["job_status"] = raw_status
        event["job_meta"] = _summarize_monitor_meta(last_meta, JobMetaKey)
    metrics = cb_state.get("last_stats")
    if metrics:
        event["metrics"] = metrics
    return event


def _build_monitor_status_callback(
    start: float,
    start_ts_holder: dict,
    emit_interval: int,
    stats_interval: int,
    stats_target: str,
    key_aliases: dict,
    jsonl_mode: bool = False,
    quiet_mode: bool = False,
):
    """Create the per-poll callback passed to ``monitor_job_and_return_job_meta``.

    On each poll the callback:
    - records the latest metadata and resolves the start timestamp once;
    - refreshes stats (throttled by ``stats_interval``) while RUNNING/DISPATCHED;
    - emits progress on status change or every ``emit_interval`` seconds
      (JSONL, silent bookkeeping in quiet mode, or human text).

    It returns True while the job is not in a terminal state.
    """

    def _status_cb(sess, job_id, job_meta, state):
        from nvflare.apis.job_def import JobMetaKey

        state["last_meta"] = job_meta
        status = job_meta.get("status", "UNKNOWN") if job_meta else "UNKNOWN"
        now = time.time()
        if start_ts_holder["value"] is None:
            start_ts_holder["value"] = _parse_monitor_start_ts(
                job_meta, JobMetaKey.START_TIME.value, JobMetaKey.SUBMIT_TIME_ISO.value
            )
        if status in ("RUNNING", "DISPATCHED") and now - state["last_stats_ts"] >= stats_interval:
            _refresh_monitor_stats(sess, job_id, state, stats_target, key_aliases)
            state["last_stats_ts"] = now
        if status != state["last_status"] or now - state["last_emit_ts"] >= emit_interval:
            if jsonl_mode:
                _emit_monitor_jsonl_progress(job_id, job_meta, state, now, start, start_ts_holder["value"])
            elif quiet_mode:
                state["last_status"] = status
                state["last_emit_ts"] = now
            else:
                _emit_monitor_progress(job_id, job_meta, state, now, start, start_ts_holder["value"])
        return not _is_terminal_job_status(status)

    return _status_cb


def _build_monitor_output_data(
    job_id: str, meta: dict, start: float, start_ts, cb_state: dict, json_mode: bool
) -> dict:
    """Assemble the final result dict for ``job monitor`` / ``job wait``.

    Duration comes from the metadata's own duration when parseable, then from the
    job start timestamp, and finally from the local ``start`` time.
    """
    from nvflare.apis.job_def import JobMetaKey

    meta_duration_s = _parse_monitor_duration_seconds(meta.get("duration") if meta else None)
    if meta_duration_s is not None:
        duration = round(meta_duration_s, 1)
    elif start_ts is not None:
        duration = round(time.time() - start_ts, 1)
    else:
        duration = round(time.time() - start, 1)
    data = {
        "job_id": job_id,
        "status": meta.get("status", "UNKNOWN"),
        "duration_s": duration,
        "job_meta": _summarize_monitor_meta(meta, JobMetaKey),
        "last_stats": cb_state.get("last_stats"),
    }
    if json_mode:
        data["stats_raw"] = cb_state.get("last_stats_raw")
    return data


def _print_monitor_result_human(data: dict, print_func):
    """Print the final job summary (status, duration, name, study, metrics)."""
    job_id = data.get("job_id")
    status = data.get("status", "UNKNOWN")
    normalized_status = _normalize_monitor_event_status(status)
    status_text = normalized_status if normalized_status == status else f"{normalized_status} ({status})"
    print_func(f"Job {job_id} status: {status_text}")
    duration = data.get("duration_s")
    if duration is not None:
        print_func(f"  Duration: {duration}s")
    job_meta = data.get("job_meta") or {}
    if job_meta.get("job_name"):
        print_func(f"  Name: {job_meta['job_name']}")
    if job_meta.get("study"):
        print_func(f"  Study: {job_meta['study']}")
    metrics = data.get("last_stats") or {}
    if metrics:
        metric_str = " ".join(f"{k}={v}" for k, v in metrics.items())
        print_func(f"  Metrics: {metric_str}")


def _job_terminal_failure_error(status: str):
    """Classify a terminal status as a failure.

    Returns:
        ``(error_code, hint)`` for failed/aborted/abandoned statuses, or None when
        the status is successful, non-terminal, or not a string.
    """
    if not isinstance(status, str):
        # Metadata may carry a null status; there is nothing to classify.
        return None
    if status == "FAILED":
        return (
            "JOB_FAILED",
            "Use 'nvflare job logs <job_id>' and 'nvflare job meta <job_id>' to inspect the failure.",
        )
    if status == "FINISHED_EXCEPTION" or status.startswith("FINISHED:EXECUTION_EXCEPTION"):
        return (
            "JOB_FINISHED_EXCEPTION",
            "Use 'nvflare job logs <job_id>' and 'nvflare job meta <job_id>' to inspect the failure.",
        )
    if status == "ABORTED" or status.startswith("FINISHED:ABORTED"):
        return (
            "JOB_ABORTED",
            "Use 'nvflare job meta <job_id>' to see abort details.",
        )
    if status == "ABANDONED" or status.startswith("FINISHED:ABANDONED"):
        return (
            "JOB_ABANDONED",
            "Use 'nvflare job meta <job_id>' to inspect the abandonment details.",
        )
    if status.startswith("FINISHED:") and not status.startswith("FINISHED:COMPLETED"):
        return (
            "JOB_FAILED",
            "Use 'nvflare job logs <job_id>' and 'nvflare job meta <job_id>' to inspect the failure.",
        )
    return None


def _job_not_found_hint(study: str) -> str:
    """Return the hint shown when a job ID is not found in ``study``."""
    if study == "default":
        return (
            "This command searched study 'default'. "
            "Use 'nvflare job list --study <study_name>' to check jobs in another study, "
            "then rerun this command with --study <study_name>."
        )
    return (
        f"This command searched study '{study}'. "
        f"Use 'nvflare job list --study {study}' to verify the job ID, "
        "or rerun this command with the correct --study value."
    )
def cmd_job_monitor(cmd_args):
    """Poll a job until it reaches a terminal state, reporting progress on the way.

    Progress is printed via ``print_human`` in human mode, suppressed in ``json``
    mode, and emitted as one event per line in ``jsonl`` mode. Exit codes used
    below: 1 for a failed terminal status, 2 for connection/auth problems, 3 on
    timeout, 4 for invalid arguments, 5 for internal errors.

    Args:
        cmd_args: parsed argparse namespace for ``nvflare job monitor``.
    """
    from nvflare.fuel.flare_api.api_spec import (
        AuthenticationError,
        AuthorizationError,
        JobNotFound,
        MonitorReturnCode,
        NoConnection,
    )
    from nvflare.tool.cli_output import (
        is_json_mode,
        is_jsonl_mode,
        output_error,
        output_jsonl_event,
        output_ok,
        print_human,
    )
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_MONITOR],
        "nvflare job monitor",
        [
            "nvflare job monitor abc123",
            "nvflare job monitor abc123 --timeout 3600",
            "nvflare job monitor abc123 --study cancer",
            "nvflare job monitor abc123 --timeout 3600 --format jsonl",
        ],
        sys.argv[1:],
        streaming=True,
        output_modes=["json", "jsonl"],
        mutating=False,
        idempotent=True,
        retry_token=_NO_RETRY_TOKEN_SCHEMA,
    )

    jsonl_mode = is_jsonl_mode()
    json_mode = is_json_mode()
    study = get_arg_value(cmd_args, "study", "default")
    started_at = time.time()
    # Filled in by the callback once the job's own start time is known.
    start_ts_holder = {"value": None}
    timeout = getattr(cmd_args, "timeout", 0)
    interval = getattr(cmd_args, "interval", 2)

    if timeout < 0:
        output_error("INVALID_ARGS", exit_code=4, detail="--timeout must be >= 0")
        return
    if interval <= 0:
        output_error("INVALID_ARGS", exit_code=4, detail="--interval must be > 0")
        return

    cb_state = _make_monitor_state()
    requested_metrics = [name for name in (getattr(cmd_args, "metrics", None) or []) if name]
    status_cb = _build_monitor_status_callback(
        start=started_at,
        start_ts_holder=start_ts_holder,
        # Progress lines and stats fetches are throttled more than raw polling.
        emit_interval=max(interval, 5),
        stats_interval=max(interval, 10),
        stats_target=getattr(cmd_args, "stats_target", "server"),
        key_aliases=_build_monitor_key_aliases(requested_metrics),
        jsonl_mode=jsonl_mode,
        quiet_mode=json_mode,
    )

    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            rc, meta = sess.monitor_job_and_return_job_meta(
                cmd_args.job_id,
                timeout=timeout,
                poll_interval=interval,
                cb=status_cb,
                state=cb_state,
            )
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except NoConnection as e:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(e))
        return
    except (AuthenticationError, AuthorizationError) as e:
        output_error("AUTH_FAILED", exit_code=2, detail=str(e))
        return
    except Exception as e:
        output_error("INTERNAL_ERROR", exit_code=5, detail=str(e))
        return

    if rc == MonitorReturnCode.TIMEOUT:
        if jsonl_mode:
            timeout_event = _build_monitor_timeout_event(
                job_id=cmd_args.job_id,
                timeout=timeout,
                start=started_at,
                start_ts=start_ts_holder["value"],
                cb_state=cb_state,
            )
            output_jsonl_event(timeout_event)
            sys.exit(3)
        output_error(
            "TIMEOUT",
            exit_code=3,
            detail="job did not reach terminal state within timeout",
        )
        return

    if rc == MonitorReturnCode.ENDED_BY_CB:
        # The callback stopped the loop; its last observed metadata is authoritative.
        meta = cb_state.get("last_meta")
        if meta is None:
            output_error(
                "INTERNAL_ERROR",
                exit_code=5,
                detail="monitoring stopped before job metadata was available",
            )
            return

    if not meta:
        output_error("INTERNAL_ERROR", exit_code=5, detail="monitoring returned no job metadata")
        return

    status = meta.get("status", "UNKNOWN")
    data = _build_monitor_output_data(
        job_id=cmd_args.job_id,
        meta=meta,
        start=started_at,
        start_ts=start_ts_holder["value"],
        cb_state=cb_state,
        json_mode=json_mode or jsonl_mode,
    )

    failure = _job_terminal_failure_error(status)
    if failure is None:
        if jsonl_mode:
            output_jsonl_event(_build_monitor_terminal_event(data))
        elif json_mode:
            output_ok(data)
        else:
            _print_monitor_result_human(data, print_human)
        return

    error_code, hint = failure
    if jsonl_mode:
        output_jsonl_event(_build_monitor_terminal_event(data, error_code=error_code))
        sys.exit(1)
    if not json_mode:
        _print_monitor_result_human(data, print_human)
    output_error(error_code, exit_code=1, hint=hint, data=data if json_mode else None, job_id=cmd_args.job_id)
def cmd_job_wait(cmd_args):
    """Block until a job finishes, then report its final status.

    Unlike ``job monitor`` no progress is streamed. Only a "Waiting for job ..."
    line is printed in human mode. Exit codes used below: 1 for a failed terminal
    status, 2 for connection/auth problems, 3 on timeout, 4 for invalid arguments,
    5 for internal errors.

    Args:
        cmd_args: parsed argparse namespace for ``nvflare job wait``.
    """
    from nvflare.apis.job_def import JobMetaKey
    from nvflare.fuel.flare_api.api_spec import (
        AuthenticationError,
        AuthorizationError,
        JobNotFound,
        JobTimeout,
        NoConnection,
    )
    from nvflare.tool.cli_output import is_json_mode, output_error, output_ok, print_human
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_WAIT],
        "nvflare job wait",
        [
            "nvflare job wait abc123",
            "nvflare job wait abc123 --timeout 3600",
            "nvflare job wait abc123 --study cancer",
        ],
        sys.argv[1:],
        output_modes=_JSON_OUTPUT_MODES,
        streaming=False,
        mutating=False,
        idempotent=True,
        retry_token=_NO_RETRY_TOKEN_SCHEMA,
    )

    study = get_arg_value(cmd_args, "study", "default")
    waited_since = time.time()
    timeout = getattr(cmd_args, "timeout", 0)
    interval = getattr(cmd_args, "interval", 2)

    if timeout < 0:
        output_error("INVALID_ARGS", exit_code=4, detail="--timeout must be >= 0")
        return
    if interval <= 0:
        output_error("INVALID_ARGS", exit_code=4, detail="--interval must be > 0")
        return

    json_mode = is_json_mode()
    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            if not json_mode:
                print_human(f"Waiting for job {cmd_args.job_id} in study {study} ...")
            final_meta = sess.wait_for_job(cmd_args.job_id, timeout=timeout, poll_interval=interval)
    except JobTimeout as e:
        output_error(
            "TIMEOUT",
            exit_code=3,
            detail=str(e),
        )
        return
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except (AuthenticationError, AuthorizationError) as e:
        output_error("AUTH_FAILED", exit_code=2, detail=str(e))
        return
    except NoConnection as e:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(e))
        return
    except Exception as e:
        output_error("INTERNAL_ERROR", exit_code=5, detail=str(e))
        return

    if not final_meta:
        output_error("INTERNAL_ERROR", exit_code=5, detail="wait returned no job metadata")
        return

    # Copy before adding the study so the session's metadata object is left untouched.
    final_meta = dict(final_meta)
    final_meta.setdefault(JobMetaKey.STUDY.value, study)
    job_start_ts = _parse_monitor_start_ts(
        final_meta,
        JobMetaKey.START_TIME.value,
        JobMetaKey.SUBMIT_TIME_ISO.value,
    )
    data = _build_monitor_output_data(
        job_id=cmd_args.job_id,
        meta=final_meta,
        start=waited_since,
        start_ts=job_start_ts,
        cb_state=_make_monitor_state(),
        json_mode=json_mode,
    )

    failure = _job_terminal_failure_error(data["status"])
    if failure is None:
        if json_mode:
            output_ok(data)
        else:
            _print_monitor_result_human(data, print_human)
        return

    error_code, hint = failure
    if not json_mode:
        _print_monitor_result_human(data, print_human)
    output_error(error_code, exit_code=1, hint=hint, data=data if json_mode else None, job_id=cmd_args.job_id)
def cmd_job_log(cmd_args):
    """Change the logging configuration of a running job.

    Invoked as either the canonical sub-command or its alias. The schema/examples
    echo whichever name the user typed. Jobs already in a terminal state are
    rejected with JOB_NOT_RUNNING.

    Args:
        cmd_args: parsed argparse namespace for the job log-config sub-command.
    """
    from nvflare.fuel.flare_api.api_spec import (
        AuthenticationError,
        AuthorizationError,
        InternalError,
        InvalidTarget,
        JobNotFound,
        NoConnection,
        NoReply,
    )
    from nvflare.tool.cli_output import output_error, output_ok, output_usage_error
    from nvflare.tool.cli_schema import handle_schema_flag

    argv = sys.argv
    typed_known_name = len(argv) > 2 and argv[1] == "job" and argv[2] in (CMD_JOB_LOG_CONFIG, CMD_JOB_LOG_ALIAS)
    invoked_sub_cmd = argv[2] if typed_known_name else CMD_JOB_LOG_CONFIG
    command_name = f"nvflare job {invoked_sub_cmd}"
    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_LOG_CONFIG],
        command_name,
        [
            f"{command_name} abc123 DEBUG",
            f"{command_name} abc123 concise",
            f"{command_name} abc123 DEBUG --study cancer",
        ],
        argv[1:],
    )

    level = getattr(cmd_args, "level", None)
    site = getattr(cmd_args, "site", "all")
    study = get_arg_value(cmd_args, "study", "default")
    if not level:
        output_usage_error(
            job_sub_cmd_parser[CMD_JOB_LOG_CONFIG],
            "provide a valid level name or mode",
            exit_code=4,
            error_code="LOG_CONFIG_INVALID",
            message="Log config is not a recognised log mode.",
            hint="Supply one of: DEBUG, INFO, WARNING, ERROR, CRITICAL, concise, msg_only, full, verbose, reload.",
        )
        return

    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            job_meta = sess.get_job_meta(cmd_args.job_id)
            job_status = job_meta.get("status", "UNKNOWN") if job_meta else "UNKNOWN"
            if _is_terminal_job_status(job_status):
                output_error(
                    "JOB_NOT_RUNNING",
                    exit_code=1,
                    job_id=cmd_args.job_id,
                    detail=f"job is in terminal state: {job_status}",
                )
                # SystemExit is not an Exception, so the handlers below let it through.
                raise SystemExit(1)
            sess.configure_job_log(cmd_args.job_id, level, target=site)
    except (AuthenticationError, AuthorizationError, NoConnection):
        raise
    except InternalError as e:
        output_error("INTERNAL_ERROR", exit_code=5, detail=str(e))
        return
    except (InvalidTarget, NoReply):
        output_error("SITE_NOT_FOUND", site=site)
        return
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except Exception as e:
        output_error("INTERNAL_ERROR", exit_code=5, detail=str(e))
        return

    output_ok(
        {
            "job_id": cmd_args.job_id,
            "config": level,
            "sites": [site],
            "status": "applied",
        }
    )
def define_job_monitor_parser(job_subparser):
    """Register the ``job monitor`` sub-command parser and its handler.

    Args:
        job_subparser: the argparse sub-parsers action for ``nvflare job``.
    """
    parser = job_subparser.add_parser(CMD_JOB_MONITOR, help="wait for a job and stream progress to stderr")
    argument_specs = (
        (("job_id",), {"type": str, "help": "job ID"}),
        (("--timeout",), {"type": int, "default": 0, "help": "seconds to wait (0 = no timeout)"}),
        (("--interval",), {"type": int, "default": 2, "help": "poll interval in seconds"}),
        (("--study",), {"type": str, "default": "default", "help": "study to monitor the job in"}),
        (
            ("--stats-target",),
            {
                "dest": "stats_target",
                "choices": ["server", "client", "all"],
                "default": "server",
                "help": "where to fetch stats from (default: server)",
            },
        ),
        (
            ("--metric",),
            {
                "dest": "metrics",
                "action": "append",
                "default": None,
                "help": "extra metric key to surface from stats (repeatable)",
            },
        ),
    )
    for flags, options in argument_specs:
        parser.add_argument(*flags, **options)
    add_startup_kit_selection_args(parser)
    parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")

    job_sub_cmd_parser[CMD_JOB_MONITOR] = parser
    job_sub_cmd_handlers[CMD_JOB_MONITOR] = cmd_job_monitor
def define_job_wait_parser(job_subparser):
    """Register the ``job wait`` sub-command parser and its handler.

    Args:
        job_subparser: the argparse sub-parsers action for ``nvflare job``.
    """
    parser = job_subparser.add_parser(CMD_JOB_WAIT, help="wait for a job to finish")
    argument_specs = (
        (("job_id",), {"type": str, "help": "job ID"}),
        (("--timeout",), {"type": float, "default": 0, "help": "seconds to wait (0 = no timeout)"}),
        (("--interval",), {"type": float, "default": 2, "help": "poll interval in seconds"}),
        (("--study",), {"type": str, "default": "default", "help": "study to wait for the job in"}),
    )
    for flags, options in argument_specs:
        parser.add_argument(*flags, **options)
    add_startup_kit_selection_args(parser)
    parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")

    job_sub_cmd_parser[CMD_JOB_WAIT] = parser
    job_sub_cmd_handlers[CMD_JOB_WAIT] = cmd_job_wait
def define_job_log_parser(job_subparser):
    """Register the job log-config sub-command (plus its alias) and its handler.

    The same parser and handler are stored under both the canonical name and the
    alias so lookups work with either spelling.

    Args:
        job_subparser: the argparse sub-parsers action for ``nvflare job``.
    """
    parser = job_subparser.add_parser(
        CMD_JOB_LOG_CONFIG,
        aliases=[CMD_JOB_LOG_ALIAS],
        help="change logging configuration for a running job",
    )
    argument_specs = (
        (("job_id",), {"type": str, "help": "job ID"}),
        (
            ("level",),
            {
                "nargs": "?",
                "default": None,
                "help": "log level or mode: DEBUG, INFO, WARNING, ERROR, CRITICAL, concise, msg_only, full, verbose, reload",
            },
        ),
        (("--site",), {"default": "all", "help": "target site name or all"}),
        (("--study",), {"type": str, "default": "default", "help": "study containing the job"}),
    )
    for flags, options in argument_specs:
        parser.add_argument(*flags, **options)
    add_startup_kit_selection_args(parser)
    parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")

    for command_key in (CMD_JOB_LOG_CONFIG, CMD_JOB_LOG_ALIAS):
        job_sub_cmd_parser[command_key] = parser
    for command_key in (CMD_JOB_LOG_CONFIG, CMD_JOB_LOG_ALIAS):
        job_sub_cmd_handlers[command_key] = cmd_job_log