# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import datetime
import json
import os
import re
import shutil
import sys
import time
import traceback
from contextlib import contextmanager
from functools import partial
from tempfile import mkdtemp
from typing import List, Optional, Tuple
from pyhocon import ConfigFactory as CF
from pyhocon import ConfigTree
from nvflare.apis.utils.job_submit_token import validate_submit_token
from nvflare.cli_unknown_cmd_exception import CLIUnknownCmdException
from nvflare.fuel.utils.config import ConfigFormat
from nvflare.fuel.utils.config_factory import ConfigFactory
from nvflare.tool.cli_arg_utils import get_arg_value
from nvflare.tool.cli_session import add_startup_kit_selection_args, new_cli_session, new_cli_session_for_args
from nvflare.tool.job.config.configer import (
build_config_file_indices,
filter_indices,
get_root_index,
merge_configs_from_cli,
)
from nvflare.tool.job.job_client_const import (
CONFIG_FED_CLIENT_CONF,
CONFIG_FED_SERVER_CONF,
CONFIG_FILE_BASE_NAME_WO_EXTS,
DEFAULT_APP_NAME,
JOB_CONFIG_COMP_NAME,
JOB_CONFIG_FILE_NAME,
JOB_CONFIG_VAR_NAME,
JOB_CONFIG_VAR_VALUE,
JOB_INFO_CONF,
JOB_INFO_CONTROLLER_TYPE,
JOB_INFO_CONTROLLER_TYPE_KEY,
JOB_INFO_DESC,
JOB_INFO_DESC_KEY,
JOB_INFO_EXECUTION_API_TYPE,
JOB_INFO_EXECUTION_API_TYPE_KEY,
JOB_INFO_KEYS,
JOB_INFO_MD,
JOB_META_BASE_NAME,
META_APP_NAME,
TEMPLATES_KEY,
)
from nvflare.utils.cli_utils import (
create_job_template_config,
find_job_templates_location,
get_curr_dir,
load_hidden_config_state,
save_config,
)
def _submit_token_arg(value: str) -> str:
    """argparse ``type=`` converter for ``--submit-token``.

    Delegates validation to validate_submit_token and re-raises its ValueError as
    argparse.ArgumentTypeError so that argparse reports it as a usage error.
    """
    try:
        validated = validate_submit_token(value)
    except ValueError as err:
        raise argparse.ArgumentTypeError(str(err)) from err
    return validated
# Sub-command names of "nvflare job".
CMD_LIST_TEMPLATES = "list-templates"
CMD_SHOW_VARIABLES = "show-variables"
CMD_CREATE_JOB = "create"
CMD_SUBMIT_JOB = "submit"
CMD_JOB_LIST = "list"
CMD_JOB_META = "meta"
CMD_JOB_ABORT = "abort"
CMD_JOB_CLONE = "clone"
CMD_JOB_DOWNLOAD = "download"
CMD_JOB_DELETE = "delete"
# Job observability commands
CMD_JOB_STATS = "stats"
CMD_JOB_LOGS = "logs"
# Job lifecycle helpers
CMD_JOB_MONITOR = "monitor"
CMD_JOB_WAIT = "wait"
CMD_JOB_LOG_CONFIG = "log-config"
# NOTE(review): not a key of job_sub_cmd_handlers / job_sub_cmd_parser in the visible code;
# presumably used by define_job_log_parser (defined outside this view) -- confirm.
CMD_JOB_LOG_ALIAS = "log"
# Output modes passed to handle_schema_flag for commands that support JSON output.
_JSON_OUTPUT_MODES = ["json"]
# Retry-token description published in the "nvflare job submit" command schema.
_JOB_SUBMIT_RETRY_TOKEN_SCHEMA = {
    "supported": True,
    "flag": "--submit-token",
    "scope": "study + submitter + token",
    "retry_safe_when_present": True,
    "effect": (
        "Deduplicates retries for identical submitted job content in the same study by the same submitter; "
        "different content with the same token is rejected."
    ),
}
# Retry-token schema entry for commands without retry-token support (its users are outside this view).
_NO_RETRY_TOKEN_SCHEMA = {"supported": False}
# Help formatter for the "nvflare job" parser: help text starts at most at column 24, lines wrap at 120.
_JOB_HELP_FORMATTER = partial(argparse.HelpFormatter, max_help_position=24, width=120)
# Fallback hint for STARTUP_KIT_MISSING errors when the raised error carries no hint of its own.
_ACTIVE_STARTUP_KIT_HINT = (
    "Run 'nvflare config list' and 'nvflare config use <id>', pass --kit-id <id> or --startup-kit <path>, "
    "or set NVFLARE_STARTUP_KIT_DIR for automation."
)
# Default number of trailing log lines; presumably used by the logs command (defined outside this view).
_DEFAULT_JOB_LOG_TAIL_LINES = 500
# Matches a leading "YYYY-MM-DD HH:MM:SS" timestamp (date/time separated by a space or "T"), optionally
# preceded by "[" and followed by fractional seconds after "." or ","; the timestamp is captured as "ts".
_JOB_LOG_TS_RE = re.compile(r"^\[?(?P<ts>\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:[\.,]\d+)?)")
# File names treated as global-model artifacts; presumably used by "nvflare job download" -- confirm.
_JOB_DOWNLOAD_GLOBAL_MODEL_NAMES = (
    "FL_global_model.pt",
    "global_model.pt",
    "global_model.pth",
    "best_FL_global_model.pt",
    "best_global_model.pt",
    "model_global.json",
    "model_global.joblib",
)
# Artifact categories presumably reported by "nvflare job download"; its usage is outside this view.
_JOB_DOWNLOAD_EXPECTED_ARTIFACTS = ("global_model", "metrics_summary", "client_logs")
def _non_negative_int(value: str) -> int:
    """argparse ``type=`` converter accepting integers >= 0.

    Args:
        value: the raw command-line string.

    Returns:
        int: the parsed, non-negative integer.

    Raises:
        argparse.ArgumentTypeError: if ``value`` is not an integer or is negative. The original
            ValueError is chained as the cause, matching _submit_token_arg.
    """
    try:
        parsed = int(value)
    except ValueError as e:
        raise argparse.ArgumentTypeError(str(e)) from e
    if parsed < 0:
        raise argparse.ArgumentTypeError("value must be >= 0")
    return parsed
[docs]
def find_filename_basename(f: str):
    """Return the file name of path ``f`` without its last extension.

    For example ``"/a/b/config_fed_client.json"`` gives ``"config_fed_client"``. A name without
    an extension, or a dot-file such as ``".hidden"``, is returned unchanged.
    """
    stem, _extension = os.path.splitext(os.path.basename(f))
    return stem
[docs]
def build_job_template_indices(job_templates_dir: str) -> ConfigTree:
    """Build an index of the job templates located directly under ``job_templates_dir``.

    A sub-directory counts as a template when it, or any directory below it, contains a file
    whose base name (extension removed) is in CONFIG_FILE_BASE_NAME_WO_EXTS. For every template,
    each key in JOB_INFO_KEYS is read from the template's info config (JOB_INFO_CONF), with
    "NA" used when the key or the info file is missing.

    Args:
        job_templates_dir: directory whose immediate sub-directories are candidate templates.

    Returns:
        ConfigTree: ``{templates: {<template name>: {<info key>: <value>, ...}, ...}}``.
    """
    conf = CF.parse_string("{ templates = {} }")
    template_conf = conf.get(TEMPLATES_KEY)
    for entry in os.listdir(job_templates_dir):
        template_path = os.path.join(job_templates_dir, entry)
        if not os.path.isdir(template_path):
            continue
        # Stop walking at the first matching config file. The info conf is parsed once per
        # template instead of once per nested directory that holds config files.
        has_config_file = any(
            find_filename_basename(file_name) in CONFIG_FILE_BASE_NAME_WO_EXTS
            for _, _, files in os.walk(template_path)
            for file_name in files
        )
        if not has_config_file:
            continue
        info_conf = get_template_info_config(template_path)
        template_name = os.path.basename(entry)
        for key in JOB_INFO_KEYS:
            value = info_conf.get(key, "NA") if info_conf else "NA"
            template_conf.put(f"{template_name}.{key}", value)
    return conf
[docs]
def get_template_info_config(template_dir):
    """Parse the template info file (JOB_INFO_CONF) in ``template_dir``.

    Returns:
        The parsed config, or None when the info file does not exist.
    """
    info_conf_path = os.path.join(template_dir, JOB_INFO_CONF)
    if not os.path.isfile(info_conf_path):
        return None
    return CF.parse_file(info_conf_path)
[docs]
def get_app_dirs_from_template(template_dir):
    """Return the app directories of a job template.

    An app directory is any directory strictly below ``template_dir`` that directly contains a
    server config (CONFIG_FED_SERVER_CONF) or client config (CONFIG_FED_CLIENT_CONF) file.
    Paths are returned in os.walk order, joined onto ``template_dir``.
    """
    markers = (CONFIG_FED_SERVER_CONF, CONFIG_FED_CLIENT_CONF)
    return [
        root
        for root, _dirs, files in os.walk(template_dir)
        if root != template_dir and any(marker in files for marker in markers)
    ]
[docs]
def get_app_dirs_from_job_folder(job_folder):
    """Return the app directories of a job folder as paths relative to ``job_folder``.

    A directory counts as an app directory when it directly contains a sub-directory named
    exactly ``config`` or ``custom``. Each app directory is returned once, in the order
    os.walk first reaches it.
    """
    app_dirs = []
    seen = set()
    for root, _dirs, _files in os.walk(job_folder):
        # Compare the exact directory name; a suffix check would also accept e.g. "myconfig".
        if root == job_folder or os.path.basename(root) not in ("config", "custom"):
            continue
        dir_name = os.path.dirname(os.path.relpath(root, job_folder))
        # An app with both config/ and custom/ must not be listed twice.
        if dir_name and dir_name not in seen:
            seen.add(dir_name)
            app_dirs.append(dir_name)
    return app_dirs
[docs]
def create_job(cmd_args):
    """Handle the deprecated ``nvflare job create`` command.

    Builds ``cmd_args.job_folder`` from a job template. ``cmd_args.template`` is used either as
    a template folder path (a folder containing JOB_INFO_CONF) or as a template name looked up
    in the job templates location. The command creates the app folders, copies the optional
    script directory into each app's custom folder, and copies the template config files. It
    then writes the meta config, applies CLI config overrides, and prints the template variables.

    ValueErrors raised along the way are reported as usage errors with exit code 4.
    """
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_CREATE_JOB],
        "nvflare job create",
        [],
        sys.argv[1:],
        deprecated=True,
        deprecated_message="Use 'python job.py --export --export-dir <job_folder>' + 'nvflare job submit -j <job_folder>' instead.",
    )
    from nvflare.tool.cli_output import print_human

    print_human(
        "WARNING: 'nvflare job create' is deprecated. Use 'python job.py --export --export-dir <job_folder>' + "
        "'nvflare job submit -j <job_folder>' instead. Run 'nvflare recipe list' to see available recipes."
    )
    try:
        # A template folder path takes precedence over a template name.
        template_src = get_src_template(cmd_args)
        if not template_src:
            template_src = get_src_template_by_name(cmd_args)
        app_dirs = get_app_dirs_from_template(str(template_src).strip())
        app_names = [os.path.basename(f) for f in app_dirs]
        app_names = app_names if app_names else [DEFAULT_APP_NAME]
        job_folder = cmd_args.job_folder
        prepare_job_folder(cmd_args)
        app_custom_dirs = prepare_app_dirs(job_folder, app_names)
        prepare_app_scripts(job_folder, app_custom_dirs, cmd_args)
        config_dirs = get_config_dirs(job_folder, app_names)
        _fmt, real_config_path = ConfigFactory.search_config_format(CONFIG_FED_CLIENT_CONF, config_dirs)
        if real_config_path and not cmd_args.force:
            # Name the file that was actually found. The lookup is for the client config, so the
            # earlier hard-coded "config_fed_server" text was wrong.
            print_human(
                f"\nwarning: configuration file '{real_config_path}' already exists.\n"
                "\nNot generating the config files. If you would like to overwrite, use -force option"
            )
            return
        # Map each app name to the template folder its config files are copied from.
        template_srcs = {}
        if not app_dirs:
            template_srcs[DEFAULT_APP_NAME] = template_src
        else:
            for app_dir in app_dirs:
                app_name = os.path.basename(app_dir)
                template_srcs[app_name] = app_dir
        for app_name, src in template_srcs.items():
            app_config_dir = get_config_dir(job_folder, app_name)
            shutil.copytree(src=src, dst=app_config_dir, dirs_exist_ok=True)
            remove_extra_files(app_config_dir)
        prepare_meta_config(cmd_args, template_src, app_names)
        app_variable_values = prepare_job_config(cmd_args, app_names)
        display_template_variables(job_folder, app_variable_values)
    except ValueError as e:
        from nvflare.tool.cli_output import output_usage_error, print_human

        if cmd_args.debug:
            print_human(traceback.format_exc())
        output_usage_error(job_sub_cmd_parser[CMD_CREATE_JOB], detail=str(e), exit_code=4)
[docs]
def get_src_template_by_name(cmd_args):
    """Resolve ``cmd_args.template`` as a template name in the job templates location.

    Returns:
        str: the template folder path.

    Raises:
        ValueError: (from check_template_exists) if no template with that name is indexed.
    """
    templates_root = find_job_templates_location()
    index = build_job_template_indices(templates_root)
    name = cmd_args.template
    check_template_exists(name, index)
    return os.path.join(templates_root, name)
[docs]
def get_src_template(cmd_args) -> Optional[str]:
    """Return ``cmd_args.template`` as an absolute path if it is a template folder.

    A folder counts as a template only if it contains the job info file (JOB_INFO_CONF).

    Returns:
        The absolute template folder path, or None (e.g. when ``cmd_args.template`` is a name).
    """
    candidate = os.path.abspath(cmd_args.template)
    is_template_dir = os.path.isdir(candidate) and os.path.isfile(os.path.join(candidate, JOB_INFO_CONF))
    return candidate if is_template_dir else None
[docs]
def remove_pycache_files(custom_dir):
    """Delete Python bytecode caches under ``custom_dir``.

    Removes every ``__pycache__`` directory (with its contents) and every ``*.pyc`` file.
    Directories whose name ends in ``.pyc`` are also removed, as before.
    """
    for root, dirs, files in os.walk(custom_dir):
        for d in list(dirs):
            if d == "__pycache__" or d.endswith(".pyc"):
                shutil.rmtree(os.path.join(root, d))
                # Prune it so os.walk does not try to descend into the removed directory.
                dirs.remove(d)
        # .pyc entries are files, so they are only visible in `files`, not in `dirs`.
        for name in files:
            if name.endswith(".pyc"):
                os.remove(os.path.join(root, name))
[docs]
def show_variables(cmd_args):
    """Handle the deprecated ``nvflare job show-variables`` command.

    Prints the template variables found in the config files of ``cmd_args.job_folder``.
    ValueErrors (including a missing or invalid job folder) are reported as usage errors with
    exit code 4.
    """
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_SHOW_VARIABLES],
        "nvflare job show-variables",
        [],
        sys.argv[1:],
        deprecated=True,
        deprecated_message="Use the Job Recipe API instead.",
    )
    from nvflare.tool.cli_output import print_human

    # NOTE(review): handle_job_cli_cmd rewrites job_sub_cmd to the hyphenated name before
    # dispatching, so this alias warning only fires when the handler is called directly.
    if getattr(cmd_args, "job_sub_cmd", None) == "show_variables":
        print_human("WARNING: 'nvflare job show_variables' is deprecated; use 'nvflare job show-variables' instead.")
    print_human("WARNING: 'nvflare job show-variables' is deprecated. Use the Job Recipe API instead.")
    try:
        job_folder = cmd_args.job_folder
        if not job_folder:
            # "-j" given without a value yields None; report it as a usage error instead of
            # letting os.path.isdir fail with a TypeError.
            raise ValueError("required job folder is not specified.")
        if not os.path.isdir(job_folder):
            raise ValueError(f"invalid job folder: {job_folder}")
        app_dirs = get_app_dirs_from_job_folder(job_folder)
        app_names = [os.path.basename(f) for f in app_dirs]
        app_names = app_names if app_names else [DEFAULT_APP_NAME]
        indices = build_config_file_indices(job_folder, app_names)
        variable_values = filter_indices(app_indices_configs=indices)
        display_template_variables(job_folder, variable_values)
    except ValueError as e:
        from nvflare.tool.cli_output import output_usage_error, print_human

        if cmd_args.debug:
            print_human(traceback.format_exc())
        output_usage_error(job_sub_cmd_parser[CMD_SHOW_VARIABLES], detail=str(e), exit_code=4)
[docs]
def check_template_exists(target_template_name, template_index_conf):
    """Raise ValueError unless ``target_template_name`` names an indexed template.

    Args:
        target_template_name: template name to look for.
        template_index_conf: index whose ``"templates"`` entry is keyed by template name/path;
            keys are compared by their base name.
    """
    registered = template_index_conf.get("templates").keys()
    if any(os.path.basename(key) == target_template_name for key in registered):
        return
    raise ValueError(
        f"Invalid template name {target_template_name}, "
        f"please check the available templates using nvflare job list-templates"
    )
[docs]
def display_template_variables(job_folder, app_variable_values):
    """Print a table of the configurable variables found in a job's config files.

    Args:
        job_folder: job folder path, shown in the table header.
        app_variable_values: mapping of app name -> {config file path: {variable name: key index}}
            (structure inferred from the loops below); each key index exposes ``value`` and
            ``component_name``.
    """
    from nvflare.tool.cli_output import print_human

    print_human("\nThe following are the variables you can change in the template\n")
    total_length = 135
    left_margin = 1
    print_human("-" * total_length)
    # NOTE(review): fix_length_format is not defined in the visible part of this file.
    job_folder_header = fix_length_format(f"job folder: {job_folder}", total_length)
    print_human(" " * total_length)
    print_human(" " * left_margin, job_folder_header)
    print_human(" " * total_length)
    print_human("-" * total_length)
    # Column widths: config file name, variable name, variable value, component name.
    file_name_fix_length = 30
    var_name_fix_length = 30
    var_value_fix_length = 35
    var_comp_fix_length = 35
    file_name = fix_length_format(JOB_CONFIG_FILE_NAME, file_name_fix_length)
    var_name = fix_length_format(JOB_CONFIG_VAR_NAME, var_name_fix_length)
    var_value = fix_length_format(JOB_CONFIG_VAR_VALUE, var_value_fix_length)
    var_comp = fix_length_format(JOB_CONFIG_COMP_NAME, var_comp_fix_length)
    print_human(" " * left_margin, file_name, var_name, var_value, var_comp)
    print_human("-" * total_length)
    for app_name, variable_values in app_variable_values.items():
        # Only named apps get an "app:" header; the default app and the meta entry are unlabeled.
        if app_name != DEFAULT_APP_NAME and app_name != META_APP_NAME:
            app_header = fix_length_format(f"app: {app_name}", total_length)
            print_human(" " * left_margin, app_header)
            print_human(" " * total_length)
        for file in sorted(variable_values.keys()):
            indices = variable_values.get(file)
            file_name = os.path.basename(file)
            file_name = fix_length_format(file_name, file_name_fix_length)
            key_indices = indices
            for index in sorted(key_indices.keys()):
                key_index = key_indices[index]
                var_name = fix_length_format(index, var_name_fix_length)
                var_value = fix_length_format(str(key_index.value), var_value_fix_length)
                # Variables without a component get a blank component column.
                var_comp = " " if key_index.component_name is None else key_index.component_name
                var_comp = fix_length_format(var_comp, var_comp_fix_length)
                print_human(" " * left_margin, file_name, var_name, var_value, var_comp)
            print_human("")
    print_human("-" * total_length)
[docs]
def list_templates(cmd_args):
    """Handle the deprecated ``nvflare job list-templates`` command.

    Prints the job templates found in the templates directory (``cmd_args.job_templates_dir``,
    or the location found by find_job_templates_location), then saves that directory via
    update_job_templates_dir. ValueErrors are reported as usage errors with exit code 4.
    """
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_LIST_TEMPLATES],
        "nvflare job list-templates",
        [],
        sys.argv[1:],
        deprecated=True,
        deprecated_message="Use 'nvflare recipe list' instead.",
    )
    from nvflare.tool.cli_output import print_human

    # NOTE(review): handle_job_cli_cmd rewrites job_sub_cmd to the hyphenated name before
    # dispatching, so this alias warning only fires when the handler is called directly.
    if getattr(cmd_args, "job_sub_cmd", None) == "list_templates":
        print_human("WARNING: 'nvflare job list_templates' is deprecated; use 'nvflare job list-templates' instead.")
    print_human("WARNING: 'nvflare job list-templates' is deprecated. Use 'nvflare recipe list' instead.")
    try:
        job_templates_dir = find_job_templates_location(cmd_args.job_templates_dir)
        job_templates_dir = os.path.abspath(job_templates_dir)
        template_index_conf = build_job_template_indices(job_templates_dir)
        display_available_templates(template_index_conf)
        # Always true after os.path.abspath; kept as a guard.
        if job_templates_dir:
            update_job_templates_dir(job_templates_dir)
    except ValueError as e:
        from nvflare.tool.cli_output import output_usage_error, print_human

        if cmd_args.debug:
            print_human(traceback.format_exc())
        output_usage_error(job_sub_cmd_parser[CMD_LIST_TEMPLATES], detail=str(e), exit_code=4)
[docs]
def update_job_templates_dir(job_templates_dir: str):
    """Record ``job_templates_dir`` in the hidden NVFlare config and save it.

    The config is updated through create_job_template_config. When no hidden config exists yet,
    an empty HOCON config is used as the starting point.
    """
    config_file_path, nvflare_config, _ = load_hidden_config_state()
    base_config = CF.parse_string("{}") if nvflare_config is None else nvflare_config
    updated = create_job_template_config(base_config, job_templates_dir)
    save_config(updated, config_file_path)
[docs]
def display_available_templates(template_index_conf):
    """Print a table of the job templates in ``template_index_conf``.

    Args:
        template_index_conf: index as built by build_job_template_indices; its "templates" entry
            maps template name -> info values (description, controller type, execution API type).
    """
    from nvflare.tool.cli_output import print_human

    print_human("\nThe following job templates are available: \n")
    template_registry = template_index_conf.get("templates")
    total_length = 120
    left_margin = 1
    print_human("-" * total_length)
    # Column widths: name, description, controller type, execution API type.
    name_fix_length = 20
    description_fix_length = 60
    controller_type_fix_length = 17
    execution_api_type_fix_length = 23
    name = fix_length_format("name", name_fix_length)
    description = fix_length_format(JOB_INFO_DESC, description_fix_length)
    execution_api_type = fix_length_format(JOB_INFO_EXECUTION_API_TYPE, execution_api_type_fix_length)
    controller_type = fix_length_format(JOB_INFO_CONTROLLER_TYPE, controller_type_fix_length)
    print_human(" " * left_margin, name, description, controller_type, execution_api_type)
    print_human("-" * total_length)
    for file_path in sorted(template_registry.keys()):
        name = os.path.basename(file_path)
        template_info = template_registry.get(file_path, None)
        if not template_info:
            # Fall back to looking the template up by its base name.
            template_info = template_registry.get(name)
        # NOTE(review): if both lookups come back empty/None, template_info.get below raises AttributeError.
        name = fix_length_format(name, name_fix_length)
        description = fix_length_format(template_info.get(JOB_INFO_DESC_KEY), description_fix_length)
        execution_api_type = fix_length_format(
            template_info.get(JOB_INFO_EXECUTION_API_TYPE_KEY),
            execution_api_type_fix_length,
        )
        controller_type = fix_length_format(template_info.get(JOB_INFO_CONTROLLER_TYPE_KEY), controller_type_fix_length)
        print_human(" " * left_margin, name, description, controller_type, execution_api_type)
    print_human("-" * total_length)
[docs]
def submit_job(cmd_args):
    """Handle ``nvflare job submit``: submit a local job folder to the FL server.

    The job folder (``cmd_args.job_folder``) is resolved first. A parent folder that holds
    exactly one visible job sub-folder is also accepted. The job is copied into a temporary
    directory, CLI config overrides are applied to the copy, and the copy is submitted with
    internal_submit_job. The temporary directory is removed afterwards unless ``--debug`` is set.
    ValueErrors are reported as usage errors with exit code 4.
    """
    from nvflare.tool.cli_schema import handle_schema_flag

    if job_sub_cmd_parser[CMD_SUBMIT_JOB] is None:
        # Build the submit parser on demand if def_job_cli_parser has not registered it yet,
        # so handle_schema_flag always receives a parser.
        root_parser = argparse.ArgumentParser(prog="nvflare job")
        root_subparser = root_parser.add_subparsers(dest="job_sub_cmd")
        define_submit_job_parser(root_subparser)
    handle_schema_flag(
        job_sub_cmd_parser[CMD_SUBMIT_JOB],
        "nvflare job submit",
        [
            "nvflare config use admin@nvidia.com",
            "nvflare job submit -j ./my_job",
            "nvflare job submit -j ./my_job --submit-token retry-001",
        ],
        sys.argv[1:],
        output_modes=_JSON_OUTPUT_MODES,
        streaming=False,
        mutating=True,
        idempotent=False,
        retry_token=_JOB_SUBMIT_RETRY_TOKEN_SCHEMA,
    )

    def _has_job_meta(path: str) -> bool:
        """Return True if ``path`` contains a meta.{json,conf,yml,yaml} file."""
        for ext in (".json", ".conf", ".yml", ".yaml"):
            if os.path.isfile(os.path.join(path, f"meta{ext}")):
                return True
        return False

    def _has_server_config(path: str) -> bool:
        """Return True if ``path/app/config`` contains a config_fed_server.{json,conf,yml,yaml} file."""
        config_dir = os.path.join(path, "app", "config")
        for ext in (".json", ".conf", ".yml", ".yaml"):
            if os.path.isfile(os.path.join(config_dir, f"config_fed_server{ext}")):
                return True
        return False

    def _resolve_job_folder(path: str) -> str:
        """Return ``path`` if it is a job folder. Otherwise return its only visible sub-directory,
        provided that sub-directory is a job folder; failing both, return ``path`` unchanged."""
        if _has_job_meta(path) and _has_server_config(path):
            return path
        subdirs = []
        for name in os.listdir(path):
            if name.startswith("."):
                continue
            full = os.path.join(path, name)
            if os.path.isdir(full):
                subdirs.append(full)
        if len(subdirs) == 1:
            candidate = subdirs[0]
            if _has_job_meta(candidate) and _has_server_config(candidate):
                from nvflare.tool.cli_output import is_json_mode, print_human

                if not is_json_mode():
                    print_human(f"Using job folder: {candidate}")
                return candidate
        return path

    temp_job_dir = None
    try:
        # "-j" without a value yields None; report it instead of letting os.path.isdir raise TypeError.
        if not cmd_args.job_folder or not os.path.isdir(cmd_args.job_folder):
            raise ValueError(f"invalid job folder: {cmd_args.job_folder}")
        job_folder = _resolve_job_folder(cmd_args.job_folder)
        if job_folder != cmd_args.job_folder:
            # prepare_job_config hands cmd_args.job_folder to save_merged_configs as the base
            # for mapping config paths into the temp copy. Point it at the folder that is
            # actually copied and submitted.
            cmd_args.job_folder = job_folder
        temp_job_dir = mkdtemp()
        shutil.copytree(job_folder, temp_job_dir, dirs_exist_ok=True)
        app_dirs = get_app_dirs_from_job_folder(job_folder)
        app_names = [os.path.basename(f) for f in app_dirs]
        app_names = app_names if app_names else [DEFAULT_APP_NAME]
        prepare_job_config(cmd_args, app_names, temp_job_dir)
        internal_submit_job(None, None, temp_job_dir, cmd_args)
    except ValueError as e:
        from nvflare.tool.cli_output import output_usage_error, print_human

        if cmd_args.debug:
            print_human(traceback.format_exc())
        output_usage_error(job_sub_cmd_parser[CMD_SUBMIT_JOB], detail=str(e), exit_code=4)
    finally:
        if temp_job_dir:
            if cmd_args.debug:
                from nvflare.tool.cli_output import print_human

                print_human(f"in debug mode, job configurations can be examined in temp job directory '{temp_job_dir}'")
            else:
                shutil.rmtree(temp_job_dir)
def _resolve_admin_user_and_dir_from_startup_kit(
    startup_kit_dir: str,
) -> Tuple[str, str]:
    """Thin wrapper around kit_config.resolve_admin_user_and_dir_from_startup_kit.

    The import is deferred to call time, as in the rest of this module's CLI helpers.
    """
    from nvflare.tool.kit import kit_config

    return kit_config.resolve_admin_user_and_dir_from_startup_kit(startup_kit_dir)
[docs]
def internal_submit_job(admin_user_dir, username, temp_job_dir, cmd_args=None):
    """Submit the job folder ``temp_job_dir`` through an admin session and report the result.

    Args:
        admin_user_dir: startup kit location, or None to let _get_session resolve it.
        username: admin user name, or None to let _get_session resolve it.
        temp_job_dir: job folder to submit.
        cmd_args: parsed CLI args. ``study`` (default "default") and ``submit_token`` are read from
            it when present, and it is also passed on to _get_session.

    On success the new job id is reported with output_ok. Each handled submission error is
    reported with output_error and turned into SystemExit: 1 for an invalid job, 2 for
    auth/connection failures, 4 for submit-token problems and 5 for internal errors. The session
    is always closed.
    """
    from nvflare.fuel.flare_api.api_spec import (
        AuthorizationError,
        InternalError,
        InvalidJobDefinition,
        NoConnection,
        SubmitTokenConflict,
        SubmitTokenJobDeleted,
    )
    from nvflare.tool.cli_output import is_json_mode, output_error, output_ok, print_human

    if not is_json_mode():
        print_human("trying to connect to the server")
    study = getattr(cmd_args, "study", "default") if cmd_args else "default"
    submit_token = getattr(cmd_args, "submit_token", None) if cmd_args else None
    sess = _get_session(args=cmd_args, admin_user_dir=admin_user_dir, username=username, study=study)
    try:
        try:
            job_id = sess.submit_job(temp_job_dir, submit_token=submit_token)
        except InvalidJobDefinition as e:
            output_error("JOB_INVALID", exit_code=1, detail=str(e))
            raise SystemExit(1)
        except SubmitTokenConflict as e:
            # Same token reused for different job content.
            output_error(
                "SUBMIT_TOKEN_CONFLICT",
                exit_code=4,
                detail=str(e),
                hint="Use a new submit token when submitting different job content.",
                data={"existing_job_id": e.existing_job_id} if e.existing_job_id else None,
            )
            raise SystemExit(4)
        except SubmitTokenJobDeleted as e:
            # Only report the fields the server actually provided.
            data = {"job_id": e.job_id, "state": e.state, "deleted_time": e.deleted_time}
            output_error(
                "SUBMIT_TOKEN_JOB_DELETED",
                exit_code=4,
                detail=str(e),
                data={key: value for key, value in data.items() if value is not None},
            )
            raise SystemExit(4)
        except AuthorizationError as e:
            output_error("AUTH_FAILED", exit_code=2, detail=str(e))
            raise SystemExit(2)
        except InternalError as e:
            output_error("INTERNAL_ERROR", exit_code=5, detail=str(e))
            raise SystemExit(5)
        except NoConnection as e:
            output_error("CONNECTION_FAILED", exit_code=2, detail=str(e))
            raise SystemExit(2)
        output_ok({"job_id": job_id})
    finally:
        sess.close()
# Dispatch table: sub-command name -> handler. handle_job_cli_cmd treats a None entry as an
# invalid command, unless these entries are filled in elsewhere (not visible here).
job_sub_cmd_handlers = {
    CMD_CREATE_JOB: create_job,
    CMD_SUBMIT_JOB: submit_job,
    CMD_LIST_TEMPLATES: list_templates,
    CMD_SHOW_VARIABLES: show_variables,
    CMD_JOB_LIST: None,
    CMD_JOB_META: None,
    CMD_JOB_ABORT: None,
    CMD_JOB_CLONE: None,
    CMD_JOB_DOWNLOAD: None,
    CMD_JOB_DELETE: None,
    CMD_JOB_STATS: None,
    CMD_JOB_LOGS: None,
    CMD_JOB_MONITOR: None,
    CMD_JOB_WAIT: None,
    CMD_JOB_LOG_CONFIG: None,
}
# Sub-command name -> argparse parser. The define_*_parser functions fill this in, and the
# handlers read it for schema output and usage errors.
job_sub_cmd_parser = {
    CMD_CREATE_JOB: None,
    CMD_SUBMIT_JOB: None,
    CMD_LIST_TEMPLATES: None,
    CMD_SHOW_VARIABLES: None,
    CMD_JOB_LIST: None,
    CMD_JOB_META: None,
    CMD_JOB_ABORT: None,
    CMD_JOB_CLONE: None,
    CMD_JOB_DOWNLOAD: None,
    CMD_JOB_DELETE: None,
    CMD_JOB_STATS: None,
    CMD_JOB_LOGS: None,
    CMD_JOB_MONITOR: None,
    CMD_JOB_WAIT: None,
    CMD_JOB_LOG_CONFIG: None,
}
[docs]
def handle_job_cli_cmd(cmd_args):
    """Dispatch a parsed ``nvflare job`` command to its handler.

    Underscore aliases (``list_templates``, ``show_variables``) are normalized to their hyphenated
    names, and ``cmd_args.job_sub_cmd`` is updated in place before dispatch.

    Raises:
        CLIUnknownCmdException: if no sub-command was given or it has no handler.
    """
    legacy_aliases = {
        "list_templates": CMD_LIST_TEMPLATES,
        "show_variables": CMD_SHOW_VARIABLES,
    }
    requested = cmd_args.job_sub_cmd
    cmd_args.job_sub_cmd = legacy_aliases.get(requested, requested)
    handler = job_sub_cmd_handlers.get(cmd_args.job_sub_cmd)
    if not handler:
        if cmd_args.job_sub_cmd is None:
            raise CLIUnknownCmdException("\n no job subcommand provided. \n")
        raise CLIUnknownCmdException("\n invalid command. \n")
    handler(cmd_args)
[docs]
def def_job_cli_parser(sub_cmd):
    """Register the ``job`` command and all of its sub-commands.

    Args:
        sub_cmd: the sub-parsers action of the top-level ``nvflare`` parser.

    Returns:
        dict: ``{"job": parser}``.
    """
    job_parser = sub_cmd.add_parser(
        "job",
        help="submit, manage, and monitor FL jobs",
        formatter_class=_JOB_HELP_FORMATTER,
    )
    job_subparser = job_parser.add_subparsers(title="job subcommands", metavar="", dest="job_sub_cmd")
    # Registration order is the order sub-commands appear in the help output.
    definers = (
        define_submit_job_parser,
        define_job_monitor_parser,
        define_job_wait_parser,
        define_list_jobs_parser,
        define_abort_job_parser,
        define_job_meta_parser,
        define_job_logs_parser,
        define_job_log_parser,
        define_job_stats_parser,
        define_download_job_parser,
        define_clone_job_parser,
        define_delete_job_parser,
        define_list_templates_parser,
        define_create_job_parser,
        define_variables_parser,
    )
    for define in definers:
        define(job_subparser)
    return {"job": job_parser}
[docs]
def define_submit_job_parser(job_subparser):
    """Register the ``submit`` sub-command and its arguments.

    The parser is stored in ``job_sub_cmd_parser[CMD_SUBMIT_JOB]`` for later schema/usage output.
    """
    parser = job_subparser.add_parser("submit", help="submit job")
    default_job_folder = os.path.join(get_curr_dir(), "current_job")
    parser.add_argument(
        "-j",
        "--job-folder",
        "--job_folder",  # backward compat
        dest="job_folder",
        type=str,
        nargs="?",
        default=default_job_folder,
        help="job folder path, default to ./current_job directory",
    )
    parser.add_argument("-debug", "--debug", action="store_true", help="debug is on")
    parser.add_argument("--study", type=str, default="default", help="study to submit the job to")
    parser.add_argument(
        "--submit-token",
        type=_submit_token_arg,
        default=None,
        help="retry-safe submit token scoped by study and submitter",
    )
    add_startup_kit_selection_args(parser)
    parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_SUBMIT_JOB] = parser
[docs]
def define_list_templates_parser(job_subparser):
    """Register the deprecated ``list-templates`` sub-command (alias ``list_templates``).

    The parser is stored in ``job_sub_cmd_parser[CMD_LIST_TEMPLATES]``.
    """
    parser = job_subparser.add_parser(
        CMD_LIST_TEMPLATES,
        aliases=["list_templates"],
        help="[DEPRECATED] use 'nvflare recipe list'",
    )
    templates_dir_help = (
        "Job template directory, if not specified, "
        "will search from ~/.nvflare/config.conf and NVFLARE_HOME env. variables"
    )
    parser.add_argument("-d", "--job_templates_dir", type=str, nargs="?", default=None, help=templates_dir_help)
    parser.add_argument("-debug", "--debug", action="store_true", help="debug is on")
    parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_LIST_TEMPLATES] = parser
[docs]
def define_variables_parser(job_subparser):
    """Register the deprecated ``show-variables`` sub-command (alias ``show_variables``).

    The parser is stored in ``job_sub_cmd_parser[CMD_SHOW_VARIABLES]``.
    """
    parser = job_subparser.add_parser(
        CMD_SHOW_VARIABLES,
        aliases=["show_variables"],
        help="[DEPRECATED] use 'nvflare recipe list' or the Job Recipe API",
    )
    default_job_folder = os.path.join(get_curr_dir(), "current_job")
    # argparse derives dest "job_folder" from the first long option.
    parser.add_argument(
        "-j",
        "--job-folder",
        "--job_folder",
        type=str,
        nargs="?",
        default=default_job_folder,
        help="job folder path, default to ./current_job directory",
    )
    parser.add_argument("-debug", "--debug", action="store_true", help="debug is on")
    parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_SHOW_VARIABLES] = parser
[docs]
def define_create_job_parser(job_subparser):
    """Register the deprecated ``create`` sub-command and its arguments.

    The parser is stored in ``job_sub_cmd_parser[CMD_CREATE_JOB]`` for later schema/usage output.
    """
    create_parser = job_subparser.add_parser(
        "create",
        help="[DEPRECATED] use 'python job.py --export --export-dir <job_folder>' + 'nvflare job submit -j <job_folder>'",
    )
    create_parser.add_argument(
        "-j",
        "--job_folder",
        type=str,
        nargs="?",
        default=os.path.join(get_curr_dir(), "current_job"),
        help="job_folder path, default to ./current_job directory",
    )
    create_parser.add_argument(
        "-w",
        "--template",
        type=str,
        nargs="?",
        default="sag_pt",
        help="""template name or template folder. You can use list-templates to see available jobs from job templates,
pick name such as 'sag_pt' as template name.
Alternatively, you can use the path to the job template folder, such as job_templates/sag_pt
""",
    )
    create_parser.add_argument(
        "-sd",
        "--script_dir",
        type=str,
        nargs="?",
        help="""script directory contains additional related files.
All files or directories under this directory will be copied over
to the custom directory.""",
    )
    # action="append" with nargs="*": each -f occurrence contributes one list (file then key=value pairs).
    create_parser.add_argument(
        "-f",
        "--config_file",
        type=str,
        action="append",
        nargs="*",
        help="""Training config file with corresponding optional key=value pairs.
If key presents in the preceding config file, the value in the config
file will be overwritten by the new value """,
    )
    create_parser.add_argument("-debug", "--debug", action="store_true", help="debug is on")
    create_parser.add_argument(
        "-force",
        "--force",
        action="store_true",
        help="force create is on, if -force, " "overwrite existing configuration with newly created configurations",
    )
    create_parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_CREATE_JOB] = create_parser
[docs]
def prepare_job_config(cmd_args, app_names: List[str], tmp_job_dir: Optional[str] = None):
    """Apply CLI config overrides to the job's config files and return the template variables.

    Args:
        cmd_args: parsed CLI args; ``cmd_args.job_folder`` is the source job folder.
        app_names: apps whose configs are merged.
        tmp_job_dir: destination folder for the merged configs. When given, configs are always
            written there. When omitted, they are written back into ``cmd_args.job_folder``, and
            only if the overrides changed something.

    Returns:
        The variable values produced by filter_indices for the merged configs.
    """
    merged_conf, config_modified = merge_configs_from_cli(cmd_args, app_names)
    must_save = tmp_job_dir is not None or config_modified is True
    target_dir = cmd_args.job_folder if tmp_job_dir is None else tmp_job_dir
    if must_save:
        save_merged_configs(merged_conf, cmd_args.job_folder, target_dir)
    return filter_indices(merged_conf)
[docs]
def has_client_config_file(app_config_dir):
    """Return True if ``app_config_dir`` holds a ``config_fed_client`` file in any supported format.

    The candidate extensions come from ConfigFormat.extensions().
    """
    candidates = (os.path.join(app_config_dir, f"config_fed_client{ext}") for ext in ConfigFormat.extensions())
    return any(os.path.exists(path) for path in candidates)
[docs]
def save_merged_configs(app_merged_conf, job_folder, tmp_job_dir):
    """Write merged configuration files to disk.

    Args:
        app_merged_conf: mapping of app name -> {config file path: (config, excluded keys, key indices)},
            as unpacked below; ``key indices`` maps a key to a list of indices.
        job_folder: the job folder that the config file paths are located under.
        tmp_job_dir: destination job folder. When it equals ``job_folder``, files are overwritten
            in place. Otherwise each file is written to the same relative location under it.
    """
    for merged_conf in app_merged_conf.values():
        for file, (_config, _excluded_keys, key_indices) in merged_conf.items():
            if not key_indices:
                # No indexed keys means there is no index to recover the root config from, and
                # next(iter(...)) would raise StopIteration. Leave the file as it is on disk.
                continue
            if job_folder == tmp_job_dir:
                dst_path = file
            else:
                dst_path = os.path.join(tmp_job_dir, os.path.relpath(file, job_folder))
            # Any index of the file leads back to the same root; its value is the full config.
            root_index = get_root_index(next(iter(key_indices.values()))[0])
            save_config(root_index.value, dst_path)
[docs]
def load_default_config_template(config_file_name: str):
    """Parse a bundled default config template from the ``config`` folder next to this module.

    The template is always parsed as HOCON (pyhocon), whatever its file name.
    """
    template_path = os.path.join(os.path.dirname(__file__), f"config/{config_file_name}")
    return CF.parse_file(template_path)
[docs]
def dst_app_path(job_folder: str, app_name="app"):
    """Return the path of app folder ``app_name`` inside ``job_folder``."""
    app_path = os.path.join(job_folder, app_name)
    return app_path
[docs]
def dst_config_path(job_folder, config_filename, app_name: str = "app"):
    """Return the path of ``config_filename`` inside the config folder of app ``app_name``."""
    return os.path.join(get_config_dir(job_folder, app_name), config_filename)
[docs]
def get_config_dirs(job_folder: str, app_names: List[str]) -> List[str]:
    """Return the config folder of each app in ``app_names``.

    Falls back to the single app ``"app"`` when ``app_names`` is empty or None.
    """
    names = app_names if app_names else ["app"]
    return [get_config_dir(job_folder, name) for name in names]
[docs]
def get_config_dir(job_folder: str, app_name: str) -> str:
    """Return ``<job_folder>/<app_name>/config``."""
    return os.path.join(dst_app_path(job_folder, app_name), "config")
[docs]
def convert_args_list_to_dict(kvs: Optional[List[str]] = None) -> dict:
    """
    Convert a list of key-value strings to a dictionary.

    Each item is split at its first "=", so values may themselves contain "=". Whitespace
    around keys and values is stripped. Later items override earlier ones with the same key.

    Args:
        kvs (Optional[List[str]]): A list of key-value strings in the format "key=value".

    Returns:
        dict: A dictionary containing the key-value pairs from the input list.

    Raises:
        ValueError: if an item has no "=" or its key is empty.
    """
    kv_dict = {}
    for kv in kvs or []:
        key, sep, value = kv.partition("=")
        key = key.strip()
        if not sep or not key:
            raise ValueError(f"Invalid key-value pair: '{kv}'")
        kv_dict[key] = value.strip()
    return kv_dict
[docs]
def prepare_job_folder(cmd_args):
    """Make sure ``cmd_args.job_folder`` is a directory and deal with an existing ``app`` folder.

    - A missing job folder is created.
    - An existing job folder is wiped and recreated when ``cmd_args.force`` is set.
    - An existing ``<job_folder>/app`` is removed with force. Otherwise a warning is printed.

    Raises:
        ValueError: if the job folder is not specified, or exists but is not a directory.
    """
    job_folder = cmd_args.job_folder
    if not job_folder:
        # A None value used to crash later with a TypeError in os.path.join. Report it as a
        # usage problem instead (create_job turns ValueError into a usage error).
        raise ValueError("job_folder is not specified")
    if not os.path.exists(job_folder):
        os.makedirs(job_folder)
    elif not os.path.isdir(job_folder):
        raise ValueError(f"job_folder '{job_folder}' exists but is not a directory")
    elif cmd_args.force:
        shutil.rmtree(job_folder)
        os.makedirs(job_folder)
    app_folder = os.path.join(job_folder, "app")
    if os.path.exists(app_folder):
        if cmd_args.force:
            shutil.rmtree(app_folder)
        else:
            from nvflare.tool.cli_output import print_human

            print_human(
                """\nwarning: app directory already exists.
\nIf you would like to overwrite, use -force option"""
            )
[docs]
def is_subdir(path, directory):
    """Return True if ``path`` is ``directory`` itself or lies inside it.

    Both paths are normalized with os.path.realpath first, which follows symlinks and collapses
    ``..``. Paths that cannot share a common prefix (e.g. on different Windows drives) are
    reported as not nested instead of raising, consistent with _is_path_within.
    """
    path = os.path.realpath(path)
    directory = os.path.realpath(directory)
    try:
        return os.path.commonpath([path, directory]) == directory
    except ValueError:
        return False
[docs]
def prepare_app_scripts(job_folder, app_custom_dirs, cmd_args):
    """Copy the user's script directory into each app's custom folder.

    Does nothing when ``cmd_args.script_dir`` is unset or blank. Python bytecode caches are
    removed from each destination after copying.

    Raises:
        ValueError: if script_dir does not exist, or if job_folder equals script_dir or lies
            inside it (the copy would then place the job folder inside itself).
    """
    script_dir = cmd_args.script_dir
    if not script_dir or not script_dir.strip() or not app_custom_dirs:
        return
    # Validate once up front instead of once per app.
    if not os.path.exists(script_dir):
        raise ValueError(f"{script_dir} doesn't exist")
    if script_dir == job_folder or is_subdir(job_folder, script_dir):
        raise ValueError("job_folder must not be the same or sub directory of script_dir")
    for app_custom_dir in app_custom_dirs:
        shutil.copytree(script_dir, app_custom_dir, dirs_exist_ok=True)
        remove_pycache_files(app_custom_dir)
[docs]
def prepare_app_dirs(job_folder: str, app_names: List[str]) -> List[str]:
    """Create the folder layout for every app and return their custom folders.

    Falls back to the single app ``"app"`` when ``app_names`` is empty or None.
    """
    names = app_names or ["app"]
    return [create_app_dir(job_folder=job_folder, app_name=name) for name in names]
[docs]
def create_app_dir(job_folder, app_name: str = "app"):
    """Create ``<job_folder>/<app_name>`` with ``config`` and ``custom`` sub-folders.

    Existing folders are kept.

    Returns:
        str: the app's ``custom`` folder path.
    """
    app_dir = os.path.join(job_folder, app_name)
    custom_dir = os.path.join(app_dir, "custom")
    for folder in (app_dir, os.path.join(app_dir, "config"), custom_dir):
        os.makedirs(folder, exist_ok=True)
    return custom_dir
# ---------------------------------------------------------------------------
# Section 3: New Job Lifecycle Commands
# ---------------------------------------------------------------------------
def _get_session(args=None, admin_user_dir=None, username=None, study="default"):
    """Create a secure session using command selectors, env, or active startup kit.

    Resolution order:
      - Neither admin_user_dir nor username given: new_cli_session_for_args picks the startup kit
        from ``args`` (and whatever defaults it applies).
      - Only one of them given: the missing one is filled in from
        resolve_admin_user_and_dir_for_args(args).
      - Both given: they are used as-is.

    Any ValueError raised during resolution is reported as STARTUP_KIT_MISSING and exits with
    SystemExit(2). The error's own ``hint`` is used when present, otherwise _ACTIVE_STARTUP_KIT_HINT.
    """
    from nvflare.tool.cli_output import get_connect_timeout, output_error

    timeout = get_connect_timeout()
    if admin_user_dir is None and username is None:
        try:
            return new_cli_session_for_args(
                args=args,
                timeout=timeout,
                study=study,
                debug=get_arg_value(args, "debug", False),
            )
        except ValueError as e:
            output_error(
                "STARTUP_KIT_MISSING",
                exit_code=2,
                detail=str(e),
                hint=getattr(e, "hint", None) or _ACTIVE_STARTUP_KIT_HINT,
            )
            raise SystemExit(2)
    if admin_user_dir is None or username is None:
        try:
            from nvflare.tool.cli_session import resolve_admin_user_and_dir_for_args

            # Returns (username, startup kit dir) per the unpacking below.
            u, d = resolve_admin_user_and_dir_for_args(args)
        except ValueError as e:
            output_error(
                "STARTUP_KIT_MISSING",
                exit_code=2,
                detail=str(e),
                hint=getattr(e, "hint", None) or _ACTIVE_STARTUP_KIT_HINT,
            )
            raise SystemExit(2)
        # Only fill in what the caller did not supply.
        if username is None:
            username = u
        if admin_user_dir is None:
            admin_user_dir = d
    return new_cli_session(
        username=username,
        startup_kit_location=admin_user_dir,
        timeout=timeout,
        study=study,
    )
@contextmanager
def _session(args=None, admin_user_dir=None, username=None, study="default"):
    """Yield a session from ``_get_session`` and close it when the block exits."""
    active = _get_session(args=args, admin_user_dir=admin_user_dir, username=username, study=study)
    try:
        yield active
    finally:
        if active is not None:
            active.close()
def _has_scoped_startup_kit_args(args) -> bool:
    """Return True when ``kit_id`` or ``startup_kit`` is set to a truthy value on ``args``."""
    return any(get_arg_value(args, option) for option in ("kit_id", "startup_kit"))
def _job_session_for_args(cmd_args=None, study="default"):
    """Return the session context manager for a job command.

    The command arguments and study are forwarded to ``_session`` only when they
    select something specific: a non-default study or an explicit startup kit.
    Otherwise ``_session`` is called with no arguments.
    """
    forward_selectors = study != "default" or _has_scoped_startup_kit_args(cmd_args)
    if not forward_selectors:
        return _session()
    return _session(args=cmd_args, study=study)
def _job_download_destination(job_id: str, output_dir: Optional[str]) -> str:
if not output_dir:
# download_job_result treats destination as the parent directory and moves
# the downloaded job folder under it. Passing cwd gives the default
# final local path ./<job_id> without nesting ./<job_id>/<job_id>.
output_dir = "."
return os.path.abspath(output_dir)
def _is_remote_download_location(location: str) -> bool:
return bool(location and re.match(r"^[A-Za-z][A-Za-z0-9+.-]*://", location))
def _local_download_path(location: str) -> Optional[str]:
if not location:
return None
location = str(location)
if _is_remote_download_location(location):
return None
return os.path.abspath(location)
def _is_path_within(root_real_path: str, candidate_path: str) -> bool:
try:
return os.path.commonpath([root_real_path, os.path.realpath(candidate_path)]) == root_real_path
except ValueError:
return False
def _iter_files_under(path: str):
if not path or not os.path.isdir(path):
return
root_real_path = os.path.realpath(path)
for root, dirs, files in os.walk(path):
# Keep the manifest local even if a symlink or concurrent filesystem change
# points a visited entry outside the requested download tree.
dirs[:] = sorted(
d
for d in dirs
if not os.path.islink(os.path.join(root, d)) and _is_path_within(root_real_path, os.path.join(root, d))
)
for file_name in sorted(files):
file_path = os.path.join(root, file_name)
if os.path.islink(file_path) or not _is_path_within(root_real_path, file_path):
continue
yield file_path
return None
def _is_identifiable_server_log(rel_parts: List[str]) -> bool:
return any(part.lower() in {"server", "app_server"} for part in rel_parts[:-1])
def _site_name_from_log_path(download_path: str, log_path: str) -> Optional[str]:
    """Infer the client site that produced ``log_path`` inside a job download.

    The log's parent directories, taken relative to ``download_path``, are
    checked in this order:
      1. the first ``app_site-*`` component, returned without the ``app_`` prefix;
      2. the first ``site-*`` component, returned as is;
      3. the immediate parent, unless it is a generic folder (``local``, ``logs``,
         ``simulate_job``, ``startup``, ``workspace``).

    ``None`` is returned in these cases:
      * the path cannot be made relative;
      * the log sits directly in ``download_path``;
      * a parent directory identifies the server.
    """
    try:
        relative = os.path.relpath(log_path, download_path)
    except ValueError:
        return None
    components = relative.split(os.sep)
    if len(components) < 2 or _is_identifiable_server_log(components):
        return None
    parents = components[:-1]
    app_site = next((c for c in parents if c.startswith("app_site-")), None)
    if app_site is not None:
        return app_site[len("app_") :]
    site = next((c for c in parents if c.startswith("site-")), None)
    if site is not None:
        return site
    nearest = parents[-1]
    if nearest.lower() in {"local", "logs", "simulate_job", "startup", "workspace"}:
        return None
    return nearest
def _discover_job_download_artifacts(download_path: str) -> Tuple[dict, List[str]]:
    """Locate well-known result files inside a downloaded job folder.

    Files are visited in ``_iter_files_under`` order, and the first match wins:
      * a name in ``_JOB_DOWNLOAD_GLOBAL_MODEL_NAMES`` -> ``artifacts["global_model"]``;
      * ``metrics_summary.json`` -> ``artifacts["metrics_summary"]``;
      * ``log.txt`` attributed to a site by ``_site_name_from_log_path``
        -> ``artifacts["client_logs"][site]``.

    Returns:
        ``(artifacts, missing)``. ``missing`` lists the entries of
        ``_JOB_DOWNLOAD_EXPECTED_ARTIFACTS`` that were not found.
    """
    found = {}
    logs_by_site = {}
    for file_path in _iter_files_under(download_path):
        base = os.path.basename(file_path)
        if "global_model" not in found and base in _JOB_DOWNLOAD_GLOBAL_MODEL_NAMES:
            found["global_model"] = file_path
        elif "metrics_summary" not in found and base == "metrics_summary.json":
            found["metrics_summary"] = file_path
        elif base == "log.txt":
            site = _site_name_from_log_path(download_path, file_path)
            if site and site not in logs_by_site:
                logs_by_site[site] = file_path
    if logs_by_site:
        found["client_logs"] = logs_by_site
    missing = [name for name in _JOB_DOWNLOAD_EXPECTED_ARTIFACTS if name not in found]
    return found, missing
[docs]
def cmd_job_list(cmd_args):
    """Handle ``nvflare job list``: list the jobs in one study.

    ``handle_schema_flag`` runs first with ``sys.argv``, presumably to service
    ``--schema``. Jobs are then requested with these options:
    ``--name``/``--id`` prefixes, ``--reverse``, ``--max`` and ``--submit-token``.
    Any job entry without a ``study`` key is labeled with the requested study
    before ``output_ok`` emits the list.

    Errors reported through ``output_error``:
        SUBMIT_TOKEN_JOB_DELETED (exit code 4): the submit token resolved to a
            deleted job. The data includes whichever of job_id / state /
            deleted_time the exception carries.
        CONNECTION_FAILED (exit code 2): the server could not be reached.

    ``AuthenticationError`` is re-raised to the caller.
    """
    from nvflare.fuel.flare_api.api_spec import AuthenticationError, NoConnection, SubmitTokenJobDeleted
    from nvflare.tool.cli_output import output_error, output_ok
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_LIST],
        "nvflare job list",
        [
            "nvflare job list",
            "nvflare job list -n cifar -m 10",
            "nvflare job list --submit-token retry-001",
            "nvflare job list --study cancer_research",
        ],
        sys.argv[1:],
        output_modes=_JSON_OUTPUT_MODES,
        streaming=False,
        mutating=False,
        idempotent=True,
        retry_token=_NO_RETRY_TOKEN_SCHEMA,
    )
    study = getattr(cmd_args, "study", "default")
    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            jobs = sess.list_jobs(
                name_prefix=getattr(cmd_args, "name", None),
                id_prefix=getattr(cmd_args, "id", None),
                reverse=getattr(cmd_args, "reverse", False),
                limit=getattr(cmd_args, "max", None),
                submit_token=getattr(cmd_args, "submit_token", None),
            )
    except AuthenticationError:
        # Authentication failures propagate to the caller unchanged.
        raise
    except SubmitTokenJobDeleted as e:
        data = {"job_id": e.job_id, "state": e.state, "deleted_time": e.deleted_time}
        output_error(
            "SUBMIT_TOKEN_JOB_DELETED",
            exit_code=4,
            detail=str(e),
            # Report only the fields the exception actually populated.
            data={key: value for key, value in data.items() if value is not None},
        )
        return
    except NoConnection as e:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(e))
        return
    # NOTE(review): assumes every job entry is a dict -- confirm against list_jobs().
    for j in jobs:
        if "study" not in j:
            j["study"] = study
    output_ok(jobs)
def _format_job_meta_value(value):
if value is None or value == "":
return "-"
if isinstance(value, bool):
return "yes" if value else "no"
if isinstance(value, (dict, list)):
return json.dumps(value, default=str)
return str(value)
def _format_deploy_map(deploy_map):
if not isinstance(deploy_map, dict) or not deploy_map:
return "-"
parts = []
for app_name, targets in deploy_map.items():
if isinstance(targets, (list, tuple)):
target_text = ", ".join(str(target) for target in targets)
else:
target_text = str(targets)
parts.append(f"{app_name} -> {target_text}")
return "; ".join(parts)
def _format_submitter(meta: dict):
submitter = meta.get("submitter_name")
if not submitter:
return "-"
role = meta.get("submitter_role")
org = meta.get("submitter_org")
if role and org:
return f"{submitter} ({role} @ {org})"
if role:
return f"{submitter} ({role})"
if org:
return f"{submitter} ({org})"
return submitter
def _format_job_meta_timestamp(value, include_zone=False):
if value is None or value == "":
return None
try:
if isinstance(value, (int, float)):
dt = datetime.datetime.fromtimestamp(value)
else:
text = str(value).strip()
if text.endswith("Z"):
text = text[:-1] + "+00:00"
dt = datetime.datetime.fromisoformat(text)
formatted = dt.strftime("%Y-%m-%d %H:%M:%S")
if include_zone and dt.tzinfo:
offset = dt.strftime("%z")
zone = {"-0700": "PDT", "-0800": "PST"}.get(offset, dt.tzname() or offset)
if zone:
formatted = f"{formatted} {zone}"
return formatted
except (TypeError, ValueError):
return str(value)
def _format_job_meta_duration(value):
if value is None or value == "":
return None
if isinstance(value, (int, float)):
total_seconds = int(round(value))
else:
text = str(value).strip()
try:
parts = text.split(":")
if len(parts) == 3:
hours = int(parts[0])
minutes = int(parts[1])
seconds = int(float(parts[2]))
total_seconds = hours * 3600 + minutes * 60 + seconds
else:
total_seconds = int(round(float(text)))
except (TypeError, ValueError):
return text
hours, remainder = divmod(total_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
pieces = []
if hours:
pieces.append(f"{hours}h")
if minutes:
pieces.append(f"{minutes}m")
if seconds or not pieces:
pieces.append(f"{seconds}s")
return " ".join(pieces)
def _print_section_table(print_func, sections):
    """Print ``(section, [(field, value), ...])`` groups as a box-drawn three-column table.

    Layout rules:
      * fields whose value is ``None`` are omitted;
      * remaining values are rendered with ``_format_job_meta_value``;
      * a section's name appears only on its first row;
      * a horizontal rule separates consecutive sections.

    Nothing is printed when no field is visible.
    """
    headers = ["Section", "Field", "Value"]
    body = []  # (cells, starts_section) pairs
    for section, fields in sections:
        shown = [(name, _format_job_meta_value(val)) for name, val in fields if val is not None]
        for position, (name, text) in enumerate(shown):
            body.append(([section if position == 0 else "", name, text], position == 0))
    if not body:
        return
    widths = [len(title) for title in headers]
    for cells, _ in body:
        widths = [max(width, len(cell)) for width, cell in zip(widths, cells)]

    def _rule(left, joint, right):
        return left + joint.join("─" * (width + 2) for width in widths) + right

    def _line(cells):
        return "│ " + " │ ".join(cell.ljust(width) for cell, width in zip(cells, widths)) + " │"

    print_func(_rule("┌", "┬", "┐"))
    print_func(_line(headers))
    print_func(_rule("├", "┼", "┤"))
    for position, (cells, starts_section) in enumerate(body):
        if position and starts_section:
            print_func(_rule("├", "┼", "┤"))
        print_func(_line(cells))
    print_func(_rule("└", "┴", "┘"))
def _print_job_meta_human(meta: dict, print_func):
    """Render a job's meta dict as a sectioned table via ``_print_section_table``.

    Sections, in order: Identity, Status, Execution, Deployment, Schedule, Details.
    Execution gains a ``Resource spec`` row when present. ``job_deploy_detail``
    entries shaped like ``"site: status"`` become one row per site; other entries
    appear as ``detail`` rows.
    """
    deployment_fields = []
    for entry in meta.get("job_deploy_detail") or []:
        if isinstance(entry, str) and ":" in entry:
            site, _, state = entry.partition(":")
            deployment_fields.append((site.strip(), state.strip()))
        else:
            deployment_fields.append(("detail", _format_job_meta_value(entry)))

    identity = [
        ("Job ID", meta.get("job_id")),
        ("Name", meta.get("name")),
        ("Study", meta.get("study")),
        ("Submitter", _format_submitter(meta)),
    ]
    submitted = meta.get("submit_time_iso") or meta.get("submit_time")
    status = [
        ("Status", meta.get("status")),
        ("Submitted", _format_job_meta_timestamp(submitted, True)),
        ("Started", _format_job_meta_timestamp(meta.get("start_time"))),
        ("Duration", _format_job_meta_duration(meta.get("duration"))),
    ]
    execution = [
        ("Min clients", meta.get("min_clients")),
        ("BYOC", meta.get("byoc")),
        ("Deploy map", _format_deploy_map(meta.get("deploy_map"))),
    ]
    if meta.get("resource_spec"):
        execution.append(("Resource spec", meta.get("resource_spec")))
    schedule = [
        ("Attempts", meta.get("schedule_count")),
        ("Last run", _format_job_meta_timestamp(meta.get("last_schedule_time"))),
    ]
    details = [
        ("Job folder", meta.get("job_folder_name")),
        ("Storage format", meta.get("data_storage_format")),
    ]
    _print_section_table(
        print_func,
        [
            ("Identity", identity),
            ("Status", status),
            ("Execution", execution),
            ("Deployment", deployment_fields),
            ("Schedule", schedule),
            ("Details", details),
        ],
    )
[docs]
def cmd_job_abort(cmd_args):
    """Handle ``nvflare job abort``: abort a running job.

    Without ``--force`` the user must confirm interactively. When stdin is not a
    TTY, the command reports ``INVALID_ARGS`` and exits with code 4. Declining
    the prompt prints ``Cancelled.`` and leaves the job alone. This is the same
    wording ``cmd_job_delete`` uses, and it cannot be misread as "job aborted".

    Errors reported through ``output_error``:
      * JOB_NOT_FOUND;
      * JOB_NOT_RUNNING (abort only applies to running jobs);
      * CONNECTION_FAILED (exit code 2).

    ``AuthenticationError`` is re-raised. On success ``output_ok`` reports
    ``{"job_id": ..., "status": "ABORTED"}``.
    """
    from nvflare.fuel.flare_api.api_spec import AuthenticationError, JobNotFound, JobNotRunning, NoConnection
    from nvflare.tool.cli_output import output_error, output_ok
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_ABORT],
        "nvflare job abort",
        ["nvflare job abort <job_id>", "nvflare job abort <job_id> --study cancer --force"],
        sys.argv[1:],
    )
    study = get_arg_value(cmd_args, "study", "default")
    if not cmd_args.force:
        if not sys.stdin.isatty():
            output_error(
                "INVALID_ARGS",
                exit_code=4,
                detail="use --force in non-interactive mode",
            )
            raise SystemExit(4)
        from nvflare.tool.cli_output import print_human, prompt_yn

        if not prompt_yn(f"Abort job '{cmd_args.job_id}' in study '{study}'?"):
            # The user declined; say so without implying the job was aborted.
            print_human("Cancelled.")
            return
    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            sess.abort_job(cmd_args.job_id)
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except JobNotRunning:
        output_error(
            "JOB_NOT_RUNNING",
            job_id=cmd_args.job_id,
            detail="abort is available only while the job is running",
        )
        return
    except AuthenticationError:
        raise
    except NoConnection as e:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(e))
        return
    output_ok({"job_id": cmd_args.job_id, "status": "ABORTED"})
[docs]
def cmd_job_clone(cmd_args):
    """Handle ``nvflare job clone``: clone an existing job in the given study.

    On success ``output_ok`` reports ``{"source_job_id": ..., "new_job_id": ...}``,
    where ``new_job_id`` is whatever ``sess.clone_job`` returned. JOB_NOT_FOUND
    and CONNECTION_FAILED (exit code 2) are reported through ``output_error``;
    ``AuthenticationError`` is re-raised.
    """
    from nvflare.fuel.flare_api.api_spec import AuthenticationError, JobNotFound, NoConnection
    from nvflare.tool.cli_output import output_error, output_ok
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_CLONE],
        "nvflare job clone",
        ["nvflare job clone <job_id>", "nvflare job clone <job_id> --study cancer"],
        sys.argv[1:],
    )
    study = get_arg_value(cmd_args, "study", "default")
    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            new_job_id = sess.clone_job(cmd_args.job_id)
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except AuthenticationError:
        # Authentication failures propagate to the caller unchanged.
        raise
    except NoConnection as e:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(e))
        return
    output_ok({"source_job_id": cmd_args.job_id, "new_job_id": new_job_id})
[docs]
def cmd_job_download(cmd_args):
    """Handle ``nvflare job download``: fetch a finished job's result folder.

    The result lands in ``<output_dir>/<job_id>`` (``./<job_id>`` by default).
    The job must not report a non-terminal status. An existing local
    ``<job_id>`` entry blocks the download unless ``--force`` is given, in which
    case it is removed first. That entry may be a directory, a file, or a
    symlink, including a dangling one.

    After the download, a local result directory is scanned for well-known
    artifacts (global model, metrics summary, per-site client logs). The scan
    is reported as "skipped" otherwise. In JSON mode the payload is emitted
    through ``output_ok``; in human mode a one-line summary is printed.
    """
    from nvflare.fuel.flare_api.api_spec import AuthenticationError, JobNotDone, JobNotFound, NoConnection
    from nvflare.tool.cli_output import is_json_mode, output_error, output_ok, print_human
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_DOWNLOAD],
        "nvflare job download",
        [
            "nvflare job download <job_id>",
            "nvflare job download <job_id> -o /path/to/results",
            "nvflare job download <job_id> --study cancer",
            "nvflare job download <job_id> --force",
        ],
        sys.argv[1:],
        output_modes=_JSON_OUTPUT_MODES,
        streaming=False,
        mutating=True,
        idempotent=False,
        retry_token=_NO_RETRY_TOKEN_SCHEMA,
    )
    json_mode = is_json_mode()
    study = get_arg_value(cmd_args, "study", "default")
    force = get_arg_value(cmd_args, "force", False)
    destination = _job_download_destination(cmd_args.job_id, getattr(cmd_args, "output_dir", None))
    # download_job_result places the job folder under ``destination``.
    expected_download_path = os.path.join(destination, cmd_args.job_id)
    job_wait_hint = (
        f"Use 'nvflare job wait {cmd_args.job_id} --study {study}' or "
        f"'nvflare job monitor {cmd_args.job_id} --study {study}' before downloading results."
    )
    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            meta = sess.get_job_meta(cmd_args.job_id)
            status = meta.get("status") if isinstance(meta, dict) else None
            # Refuse early only when a status is known and is not terminal; otherwise
            # download_job_result decides (it may raise JobNotDone).
            if status and not _is_terminal_job_status(status):
                output_error(
                    "JOB_NOT_DONE",
                    exit_code=4,
                    job_id=cmd_args.job_id,
                    detail=f"current status: {status}; searched study '{study}'",
                    hint=job_wait_hint,
                )
                return
            # lexists(), unlike exists(), is also True for a dangling symlink, so a
            # broken link at the target goes through the same --force handling.
            if os.path.lexists(expected_download_path):
                if not force:
                    output_error(
                        "INVALID_ARGS",
                        exit_code=4,
                        detail=f"download destination already exists: {expected_download_path}",
                        hint="Use --force to replace the existing download, remove the directory, or choose a different --output-dir.",
                    )
                    return
                # NOTE(review): the existing copy is removed before the download starts
                # and is not restored if the download then fails.
                if os.path.isdir(expected_download_path) and not os.path.islink(expected_download_path):
                    shutil.rmtree(expected_download_path)
                else:
                    os.remove(expected_download_path)
            if not json_mode:
                print_human(f"Downloading job {cmd_args.job_id} from study {study} ...")
            path = sess.download_job_result(cmd_args.job_id, destination)
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except JobNotDone as e:
        output_error(
            "JOB_NOT_DONE",
            exit_code=4,
            job_id=cmd_args.job_id,
            detail=f"{e}; searched study '{study}'",
            hint=job_wait_hint,
        )
        return
    except AuthenticationError:
        raise
    except NoConnection as e:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(e))
        return
    except shutil.Error as e:
        # shutil.Error subclasses OSError, so it must be matched before OSError.
        output_error(
            "INVALID_ARGS",
            exit_code=4,
            detail=str(e),
            hint="Use --force to replace the existing download, remove the directory, or choose a different --output-dir.",
        )
        return
    except OSError as e:
        output_error("OUTPUT_DIR_NOT_WRITABLE", path=destination, detail=str(e))
        return
    path = str(path or destination)
    # None when the reported location is remote (URL scheme) rather than local.
    download_path = _local_download_path(path)
    if not json_mode:
        print_human(f"Job result downloaded to: {download_path or path}")
    payload = {
        "job_id": cmd_args.job_id,
        "download_path": download_path,
        "path": download_path or path,
    }
    if download_path and os.path.isdir(download_path):
        artifacts, missing_artifacts = _discover_job_download_artifacts(download_path)
        payload["artifact_discovery"] = "completed"
        payload["artifacts"] = artifacts
        payload["missing_artifacts"] = missing_artifacts
    else:
        payload["artifact_discovery"] = "skipped"
        payload["artifacts"] = None
        payload["missing_artifacts"] = None
    if json_mode:
        output_ok(payload)
[docs]
def cmd_job_delete(cmd_args):
    """Handle ``nvflare job delete``: delete a job from the given study.

    Without ``--force`` the user must confirm interactively. When stdin is not a
    TTY, the command reports ``INVALID_ARGS`` and exits with code 4. Declining
    the prompt prints ``Cancelled.``.

    Errors reported through ``output_error``:
      * JOB_NOT_FOUND;
      * JOB_NOT_DONE (exit code 4), with a hint to abort or wait for the job first;
      * CONNECTION_FAILED (exit code 2).

    ``AuthenticationError`` is re-raised. On success the server's result is
    emitted through ``output_ok``; it always contains ``job_id``.
    """
    from nvflare.fuel.flare_api.api_spec import AuthenticationError, JobNotDone, JobNotFound, NoConnection
    from nvflare.tool.cli_output import output_error, output_ok
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_DELETE],
        "nvflare job delete",
        ["nvflare job delete <job_id>", "nvflare job delete <job_id> --study cancer --force"],
        sys.argv[1:],
    )
    study = get_arg_value(cmd_args, "study", "default")
    job_abort_hint = (
        f"Use 'nvflare job abort {cmd_args.job_id} --study {study}' to stop the job first, "
        "or wait/monitor until it finishes before deleting."
    )
    if not cmd_args.force:
        if not sys.stdin.isatty():
            output_error(
                "INVALID_ARGS",
                exit_code=4,
                detail="use --force in non-interactive mode",
            )
            raise SystemExit(4)
        from nvflare.tool.cli_output import print_human, prompt_yn

        if not prompt_yn(f"Delete job '{cmd_args.job_id}' in study '{study}'?"):
            print_human("Cancelled.")
            return
    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            result = sess.delete_job(cmd_args.job_id)
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except JobNotDone as e:
        output_error(
            "JOB_NOT_DONE",
            exit_code=4,
            job_id=cmd_args.job_id,
            detail=f"{e}; searched study '{study}'",
            hint=job_abort_hint,
        )
        return
    except AuthenticationError:
        # Authentication failures propagate to the caller unchanged.
        raise
    except NoConnection as e:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(e))
        return
    # delete_job may return something other than a dict; always report at least the job_id.
    if not isinstance(result, dict):
        result = {"job_id": cmd_args.job_id}
    result.setdefault("job_id", cmd_args.job_id)
    output_ok(result)
# ---------------------------------------------------------------------------
# Parser definitions for new commands
# ---------------------------------------------------------------------------
[docs]
def define_list_jobs_parser(job_subparser):
    """Register the ``list`` job subcommand and record its parser and handler."""
    parser = job_subparser.add_parser(CMD_JOB_LIST, help="list jobs on the server")
    parser.add_argument("-n", "--name", type=str, default=None, help="filter by name prefix")
    parser.add_argument("-i", "--id", type=str, default=None, help="filter by job ID prefix")
    parser.add_argument("-r", "--reverse", action="store_true", default=False, help="reverse sort order")
    parser.add_argument("-m", "--max", type=int, default=None, help="max results to return")
    parser.add_argument("--study", type=str, default="default", help="study to list jobs from")
    parser.add_argument(
        "--submit-token",
        type=_submit_token_arg,
        default=None,
        help="retry-safe submit token to resolve a submitted job",
    )
    add_startup_kit_selection_args(parser)
    parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_JOB_LIST] = parser
    job_sub_cmd_handlers[CMD_JOB_LIST] = cmd_job_list
[docs]
def define_abort_job_parser(job_subparser):
    """Register the ``abort`` job subcommand and record its parser and handler."""
    parser = job_subparser.add_parser(CMD_JOB_ABORT, help="abort a running job")
    parser.add_argument("job_id", type=str, help="job ID")
    parser.add_argument("--force", action="store_true", help="skip confirmation prompt")
    parser.add_argument("--study", type=str, default="default", help="study containing the job")
    add_startup_kit_selection_args(parser)
    parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_JOB_ABORT] = parser
    job_sub_cmd_handlers[CMD_JOB_ABORT] = cmd_job_abort
[docs]
def define_clone_job_parser(job_subparser):
    """Register the ``clone`` job subcommand and record its parser and handler."""
    parser = job_subparser.add_parser(CMD_JOB_CLONE, help="clone an existing job")
    parser.add_argument("job_id", type=str, help="job ID to clone")
    parser.add_argument("--study", type=str, default="default", help="study containing the source job")
    add_startup_kit_selection_args(parser)
    parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_JOB_CLONE] = parser
    job_sub_cmd_handlers[CMD_JOB_CLONE] = cmd_job_clone
[docs]
def define_download_job_parser(job_subparser):
    """Register the ``download`` job subcommand and record its parser and handler.

    ``--output-dir`` is the parent directory: the handler places the job
    folder at ``<output_dir>/<job_id>``. The help text states this so users
    do not pass the final folder path.
    """
    parser = job_subparser.add_parser(CMD_JOB_DOWNLOAD, help="download job result")
    parser.add_argument("job_id", type=str, help="job ID")
    parser.add_argument(
        "-o",
        "--output-dir",
        dest="output_dir",
        type=str,
        default=None,
        help="parent directory for the downloaded job folder; results go to <output_dir>/<job_id> (default: ./<job_id>)",
    )
    parser.add_argument("--study", type=str, default="default", help="study containing the job")
    parser.add_argument("--force", action="store_true", help="replace an existing local download directory")
    add_startup_kit_selection_args(parser)
    parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_JOB_DOWNLOAD] = parser
    job_sub_cmd_handlers[CMD_JOB_DOWNLOAD] = cmd_job_download
[docs]
def define_delete_job_parser(job_subparser):
    """Register the ``delete`` job subcommand and record its parser and handler."""
    parser = job_subparser.add_parser(CMD_JOB_DELETE, help="delete a job")
    parser.add_argument("job_id", type=str, help="job ID")
    parser.add_argument("--force", action="store_true", help="skip confirmation prompt")
    parser.add_argument("--study", type=str, default="default", help="study containing the job")
    add_startup_kit_selection_args(parser)
    parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_JOB_DELETE] = parser
    job_sub_cmd_handlers[CMD_JOB_DELETE] = cmd_job_delete
# Exact job status strings treated as terminal (the job will not run further).
# Statuses of the form "FINISHED:<reason>" are matched by prefix in
# _is_terminal_job_status and do not need to be listed here.
_TERMINAL_JOB_STATES = {
    "FINISHED_OK",
    "FINISHED_EXCEPTION",
    "ABORTED",
    "ABANDONED",
    "FAILED",
}
def _is_terminal_job_status(status: str) -> bool:
return isinstance(status, str) and (status.startswith("FINISHED:") or status in _TERMINAL_JOB_STATES)
[docs]
def cmd_job_stats(cmd_args):
    """Handle ``nvflare job stats``: show statistics for a running job.

    ``--site`` selects the targets:
      * ``all`` -> target type ``all``;
      * ``server`` -> target type ``server``;
      * any other value -> that single client.

    Errors reported through ``output_error``:
      * JOB_NOT_FOUND;
      * JOB_NOT_RUNNING (stats exist only while the job runs);
      * CONNECTION_FAILED (exit code 2).

    ``AuthenticationError`` is re-raised. On success ``output_ok`` reports
    ``{"job_id": ..., "stats": <show_stats result>}``.
    """
    from nvflare.fuel.flare_api.api_spec import AuthenticationError, JobNotFound, JobNotRunning, NoConnection
    from nvflare.tool.cli_output import output_error, output_ok
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_STATS],
        "nvflare job stats",
        [
            "nvflare job stats abc123",
            "nvflare job stats abc123 --site server",
            "nvflare job stats abc123 --study cancer",
        ],
        sys.argv[1:],
    )
    study = get_arg_value(cmd_args, "study", "default")
    site = getattr(cmd_args, "site", "all")
    # Map the --site value onto show_stats' (target_type, targets) arguments.
    if site == "all":
        target_type = "all"
        targets = None
    elif site == "server":
        target_type = "server"
        targets = None
    else:
        target_type = "client"
        targets = [site]
    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            result = sess.show_stats(cmd_args.job_id, target_type, targets)
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except JobNotRunning:
        output_error(
            "JOB_NOT_RUNNING",
            job_id=cmd_args.job_id,
            detail="stats are available only while the job is running",
        )
        return
    except AuthenticationError:
        # Authentication failures propagate to the caller unchanged.
        raise
    except NoConnection as e:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(e))
        return
    output_ok({"job_id": cmd_args.job_id, "stats": result})
def _normalize_log_timestamp(value: str) -> datetime.datetime:
text = (value or "").strip()
if not text:
raise ValueError("--since must be a non-empty timestamp")
if text.endswith("Z"):
text = text[:-1] + "+00:00"
text = text.replace(" ", "T", 1).replace(",", ".")
try:
ts = datetime.datetime.fromisoformat(text)
except ValueError as e:
raise ValueError(f"invalid --since timestamp: {value}") from e
if ts.tzinfo is not None:
ts = ts.astimezone(datetime.timezone.utc).replace(tzinfo=None)
return ts
def _log_since_arg(value: str) -> str:
    """argparse ``type=`` validator for ``--since``.

    Returns the raw ``value`` unchanged when it parses. Otherwise raises
    ``argparse.ArgumentTypeError`` with the parse error plus a format hint.
    """
    try:
        _normalize_log_timestamp(value)
    except ValueError as err:
        format_hint = "accepted formats include ISO timestamps such as 2026-04-28T10:00:00 or 2026-04-28T10:00:00Z"
        raise argparse.ArgumentTypeError(f"{err}; {format_hint}") from err
    return value
def _parse_log_line_timestamp(line: str):
    """Return the leading timestamp of a text log line.

    Returns ``None`` when the line has no timestamp or it does not parse.
    """
    found = _JOB_LOG_TS_RE.match(line)
    if found is None:
        return None
    # Naive server log timestamps are compared as UTC-equivalent naive values;
    # explicit timezone offsets are normalized to UTC by _normalize_log_timestamp.
    try:
        return _normalize_log_timestamp(found.group("ts"))
    except ValueError:
        return None
def _filter_log_since(text: str, since_ts: datetime.datetime):
    """Drop text-log lines older than ``since_ts``.

    Returns ``(text, since_applied, changed)``. Filtering is line-based:
      * a line without its own timestamp inherits the keep/drop decision of the
        most recent timestamped line, which can be imperfect for heavily
        interleaved logs;
      * lines before the first timestamp are dropped;
      * if no line has a parseable timestamp, the text comes back untouched with
        ``since_applied=False``.
    """
    lines = text.splitlines(keepends=True)
    if not lines:
        return text, False, False
    kept = []
    keeping = False
    any_timestamp = False
    for line in lines:
        stamp = _parse_log_line_timestamp(line)
        if stamp is not None:
            any_timestamp = True
            keeping = stamp >= since_ts
        if keeping:
            kept.append(line)
    if not any_timestamp:
        return text, False, False
    return "".join(kept), True, len(kept) != len(lines)
def _parse_log_json_line_timestamp(line: str):
    """Return the timestamp of a JSON log line.

    The first non-empty value among ``asctime``, ``timestamp`` and ``time`` that
    parses is used. Returns ``None`` if the line is not a JSON object or no field
    parses.
    """
    try:
        record = json.loads(line)
    except (TypeError, ValueError):
        return None
    if not isinstance(record, dict):
        return None
    for candidate in (record.get(key) for key in ("asctime", "timestamp", "time")):
        if not candidate:
            continue
        try:
            return _normalize_log_timestamp(str(candidate))
        except ValueError:
            pass
    return None
def _filter_log_json_since(text: str, since_ts: datetime.datetime):
    """Keep JSON log lines whose timestamp is at or after ``since_ts``.

    Returns ``(text, since_applied, changed)``. Lines without a parseable
    timestamp are dropped. If no line has one, the text comes back untouched
    with ``since_applied=False``.
    """
    lines = text.splitlines(keepends=True)
    if not lines:
        return text, False, False
    stamped = [(line, _parse_log_json_line_timestamp(line)) for line in lines]
    stamped = [(line, stamp) for line, stamp in stamped if stamp is not None]
    if not stamped:
        return text, False, False
    kept = [line for line, stamp in stamped if stamp >= since_ts]
    return "".join(kept), True, len(kept) != len(lines)
def _looks_like_json_log(text: str) -> bool:
for line in text.splitlines():
stripped = line.strip()
if not stripped:
continue
try:
return isinstance(json.loads(stripped), dict)
except (TypeError, ValueError):
return False
return False
def _format_log_json_record(record: dict) -> str:
asctime = record.get("asctime") or record.get("timestamp") or record.get("time") or ""
logger_name = record.get("name") or record.get("fullName") or ""
level = record.get("levelname") or record.get("level") or ""
fl_ctx = record.get("fl_ctx") or ""
message = record.get("message") or record.get("msg") or ""
parts = [str(part) for part in (asctime, logger_name, level, fl_ctx, message) if part not in (None, "")]
return " - ".join(parts) if parts else json.dumps(record, default=str)
def _render_log_json_as_text(text: str) -> str:
rendered = []
for line in text.splitlines():
stripped = line.strip()
if not stripped:
continue
try:
record = json.loads(stripped)
except (TypeError, ValueError):
rendered.append(line)
continue
if isinstance(record, dict):
rendered.append(_format_log_json_record(record))
else:
rendered.append(line)
if not rendered:
return ""
return "\n".join(rendered) + "\n"
def _tail_log_text(text: str, tail_lines: int):
lines = text.splitlines(keepends=True)
if tail_lines is None or len(lines) <= tail_lines:
return text, False
if tail_lines == 0:
return "", bool(lines)
return "".join(lines[-tail_lines:]), True
def _truncate_log_bytes(text: str, max_bytes: int):
if max_bytes is None:
return text, False
encoded = text.encode("utf-8")
if len(encoded) <= max_bytes:
return text, False
return encoded[:max_bytes].decode("utf-8", errors="ignore"), True
def _line_count(text: str) -> int:
return len(text.splitlines()) if text else 0
def _apply_job_log_bounds(logs: dict, tail_lines, since_ts, max_bytes):
    """Apply ``--since``, then ``--tail``, then ``--max-bytes`` to each site's log.

    Returns ``(bounded_logs, sites, any_truncated, since_applied_any)``:
      * ``sites`` maps each site to availability, line/byte counts and whether
        tail/byte limits cut its log;
      * ``since_applied_any`` is True when the time filter found timestamps in
        at least one log.

    Non-string log payloads are passed through unchanged, with ``lines=None``.
    """
    bounded_logs = {}
    sites = {}
    any_truncated = False
    since_applied_any = False
    for site_name, log_text in logs.items():
        if not isinstance(log_text, str):
            bounded_logs[site_name] = log_text
            sites[site_name] = {"available": True, "lines": None, "logs_truncated": False}
            continue
        text = log_text
        if since_ts is not None:
            since_filter = _filter_log_json_since if _looks_like_json_log(text) else _filter_log_since
            text, since_applied, _ = since_filter(text, since_ts)
            since_applied_any = since_applied_any or since_applied
        text, cut_by_tail = _tail_log_text(text, tail_lines)
        text, cut_by_bytes = _truncate_log_bytes(text, max_bytes)
        truncated = cut_by_tail or cut_by_bytes
        any_truncated = any_truncated or truncated
        bounded_logs[site_name] = text
        sites[site_name] = {
            "available": True,
            "lines": _line_count(text),
            "bytes": len(text.encode("utf-8")),
            "logs_truncated": truncated,
        }
    return bounded_logs, sites, any_truncated, since_applied_any
def _select_job_logs_for_output(result: dict, json_mode: bool):
logs = result.get("logs", {})
if not isinstance(logs, dict):
return {}
if json_mode:
return logs
return {
site_name: _render_log_json_as_text(log_text) if _looks_like_json_log(log_text) else log_text
for site_name, log_text in logs.items()
}
def _preferred_job_log_file(json_mode: bool) -> str:
return "log.json" if json_mode else "log.txt"
def _fallback_job_log_file(json_mode: bool) -> str:
return "log.txt" if json_mode else "log.json"
def _job_log_result_needs_fallback(result: dict, site: str) -> bool:
if not isinstance(result, dict):
return True
logs = result.get("logs", {})
if not isinstance(logs, dict):
return True
unavailable = result.get("unavailable", {})
if isinstance(unavailable, dict) and unavailable:
if site == "all":
return any(site_name not in logs for site_name in unavailable)
return site in unavailable
return not logs
def _merge_job_log_fallback_result(primary: dict, fallback: dict) -> dict:
if not isinstance(primary, dict):
primary = {}
if not isinstance(fallback, dict):
fallback = {}
primary_logs = primary.get("logs", {})
if not isinstance(primary_logs, dict):
primary_logs = {}
logs = dict(primary_logs)
primary_unavailable = primary.get("unavailable", {})
if not isinstance(primary_unavailable, dict):
primary_unavailable = {}
unavailable = dict(primary_unavailable)
fallback_logs = fallback.get("logs", {})
if isinstance(fallback_logs, dict):
for site_name, log_text in fallback_logs.items():
logs.setdefault(site_name, log_text)
unavailable.pop(site_name, None)
fallback_unavailable = fallback.get("unavailable", {})
if isinstance(fallback_unavailable, dict):
for site_name, reason in fallback_unavailable.items():
if site_name not in logs:
unavailable[site_name] = reason
result = dict(primary)
result["logs"] = logs
if unavailable:
result["unavailable"] = unavailable
else:
result.pop("unavailable", None)
return result
[docs]
def cmd_job_logs(cmd_args):
    """Handle ``nvflare job logs``: print or emit job logs from the server-side log store.

    ``--site`` picks ``server`` (the default), ``all``, or one client site.

    Bounds:
      * ``--since``, ``--tail`` and ``--max-bytes`` are applied per site;
      * when none of them is given, a default tail of
        ``_DEFAULT_JOB_LOG_TAIL_LINES`` is used.

    Fetching:
      * the log file matching the output mode is requested first;
      * when that leaves logs missing, the other format is fetched and merged in.

    Errors reported through ``output_error``:
      * INVALID_ARGS (exit code 4) for a bad ``--since``;
      * JOB_NOT_FOUND;
      * CONNECTION_FAILED (exit code 2);
      * INTERNAL_ERROR (exit code 5) for any other failure;
      * LOG_NOT_FOUND (exit code 1) when an explicitly named client site has no log.

    ``AuthenticationError`` is re-raised.
    """
    from nvflare.fuel.flare_api.api_spec import AuthenticationError, JobNotFound, NoConnection
    from nvflare.tool.cli_output import is_json_mode, output_error, output_ok, print_human
    from nvflare.tool.cli_schema import handle_schema_flag

    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_LOGS],
        "nvflare job logs",
        [
            "nvflare job logs abc123",
            "nvflare job logs abc123 --site site-1",
            "nvflare job logs abc123 --site all",
            "nvflare job logs abc123 --site all --tail 200",
            "nvflare job logs abc123 --study cancer",
        ],
        sys.argv[1:],
        output_modes=_JSON_OUTPUT_MODES,
        streaming=False,
        mutating=False,
        idempotent=True,
        retry_token=_NO_RETRY_TOKEN_SCHEMA,
    )
    site = getattr(cmd_args, "site", "server")
    study = getattr(cmd_args, "study", "default")
    tail_lines = getattr(cmd_args, "tail", None)
    since_value = getattr(cmd_args, "since", None)
    max_bytes = getattr(cmd_args, "max_bytes", None)
    # Without any explicit bound, fall back to the default tail so output stays bounded.
    default_tail_applied = tail_lines is None and since_value is None and max_bytes is None
    if default_tail_applied:
        tail_lines = _DEFAULT_JOB_LOG_TAIL_LINES
    try:
        since_ts = _normalize_log_timestamp(since_value) if since_value else None
    except ValueError as e:
        output_error("INVALID_ARGS", exit_code=4, detail=str(e))
        return
    json_mode = is_json_mode()
    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            result = sess.get_job_logs(cmd_args.job_id, target=site, log_file_name=_preferred_job_log_file(json_mode))
            # If the preferred format left logs missing, retry with the other file and
            # merge; entries from the first request take precedence.
            if _job_log_result_needs_fallback(result, site):
                fallback_result = sess.get_job_logs(
                    cmd_args.job_id, target=site, log_file_name=_fallback_job_log_file(json_mode)
                )
                result = _merge_job_log_fallback_result(result, fallback_result)
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except AuthenticationError:
        # Authentication failures propagate to the caller unchanged.
        raise
    except NoConnection as e:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(e))
        return
    except Exception as e:
        output_error("INTERNAL_ERROR", exit_code=5, detail=str(e))
        return
    logs = _select_job_logs_for_output(result, json_mode)
    unavailable = result.get("unavailable", {})
    # An explicitly named client site with no log is an error; for "all" and
    # "server", unavailability is reported in the payload instead.
    if site != "all" and site != "server" and site in unavailable and site not in logs:
        output_error(
            "LOG_NOT_FOUND",
            exit_code=1,
            site=site,
            detail=unavailable.get(site),
        )
        return
    logs, sites, logs_truncated, since_applied = _apply_job_log_bounds(logs, tail_lines, since_ts, max_bytes)
    for site_name, reason in unavailable.items():
        sites[site_name] = {"available": False, "reason": reason}
    payload = {"job_id": cmd_args.job_id, "target": site, "logs": logs}
    payload["logs_truncated"] = logs_truncated
    payload["sites"] = sites
    payload["filters"] = {
        "tail": tail_lines,
        "since": since_value,
        "since_applied": since_applied,
        "max_bytes": max_bytes,
        "default_tail_applied": default_tail_applied,
    }
    if unavailable:
        payload["unavailable"] = unavailable
    if not json_mode:
        _print_job_logs_human(site, logs, unavailable, print_human)
        return
    output_ok(payload)
def _print_job_logs_human(target: str, logs: dict, unavailable: dict, print_func):
def _print_log_text(text):
if text:
print_func(text, end="" if text.endswith("\n") else "\n")
else:
print_func("(no log content)")
if target != "all" and target in logs and len(logs) == 1 and not unavailable:
_print_log_text(logs[target])
return
site_names = list(logs.keys())
if "server" in site_names:
site_names.remove("server")
site_names.insert(0, "server")
for index, site_name in enumerate(site_names):
if index:
print_func()
print_func(f"===== {site_name} =====")
_print_log_text(logs[site_name])
if unavailable:
if logs:
print_func()
print_func("Unavailable logs:", file=sys.stderr)
for site_name in sorted(unavailable):
print_func(f"{site_name}: {unavailable[site_name]}", file=sys.stderr)
[docs]
def define_job_stats_parser(job_subparser):
    """Register the job stats sub-command parser and its handler."""
    parser = job_subparser.add_parser(CMD_JOB_STATS, help="show running job statistics")
    add_arg = parser.add_argument
    add_arg("job_id", type=str, help="job ID")
    add_arg("--site", default="all", help="target site name or all")
    add_arg("--study", type=str, default="default", help="study containing the job")
    add_startup_kit_selection_args(parser)
    add_arg("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_JOB_STATS] = parser
    job_sub_cmd_handlers[CMD_JOB_STATS] = cmd_job_stats
[docs]
def define_job_logs_parser(job_subparser):
    """Register the job logs sub-command parser and its handler.

    The optional bounds (--tail, --since, --max-bytes) default to None so the handler can tell
    whether the user supplied any of them.
    """
    parser = job_subparser.add_parser(CMD_JOB_LOGS, help="retrieve job logs from the server-side log store")
    add_arg = parser.add_argument
    add_arg("job_id", type=str, help="job ID")
    # Both option spellings store into ``site``.
    add_arg(
        "--sites",
        "--site",
        dest="site",
        default="server",
        help="target site name, server, or all. Client logs must have been streamed to the server.",
    )
    add_arg(
        "--tail",
        type=_non_negative_int,
        default=None,
        help="return at most the last N log lines per site; applied after --since",
    )
    add_arg(
        "--since",
        type=_log_since_arg,
        default=None,
        help="return log lines at or after this timestamp; any explicit bound disables the default 500-line tail",
    )
    add_arg(
        "--max-bytes",
        type=_non_negative_int,
        default=None,
        help="return at most N bytes per site; applied after --since and --tail",
    )
    add_arg("--study", type=str, default="default", help="study to retrieve job logs from")
    add_startup_kit_selection_args(parser)
    add_arg("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_JOB_LOGS] = parser
    job_sub_cmd_handlers[CMD_JOB_LOGS] = cmd_job_logs
def _summarize_monitor_meta(meta: dict, job_meta_key_cls) -> dict:
if not meta:
return {}
fields = {
"job_name": job_meta_key_cls.JOB_NAME.value,
"status": job_meta_key_cls.STATUS.value,
"submit_time": job_meta_key_cls.SUBMIT_TIME_ISO.value,
"start_time": job_meta_key_cls.START_TIME.value,
"duration": job_meta_key_cls.DURATION.value,
"study": job_meta_key_cls.STUDY.value,
"submitter_name": job_meta_key_cls.SUBMITTER_NAME.value,
"submitter_org": job_meta_key_cls.SUBMITTER_ORG.value,
"submitter_role": job_meta_key_cls.SUBMITTER_ROLE.value,
}
summary = {}
for out_key, meta_key in fields.items():
value = meta.get(meta_key)
if value not in (None, ""):
summary[out_key] = value
return summary
def _parse_monitor_start_ts(meta: dict, start_time_key: str, submit_time_iso_key: str) -> float:
if not meta:
return None
start_time = meta.get(start_time_key)
if start_time:
try:
return datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f").timestamp()
except Exception:
pass
submit_time_iso = meta.get(submit_time_iso_key)
if submit_time_iso:
try:
return datetime.datetime.fromisoformat(submit_time_iso).timestamp()
except Exception:
pass
return None
def _parse_monitor_duration_seconds(value) -> float:
if value is None or value == "" or value == "N/A":
return None
if isinstance(value, (int, float)):
return float(value)
if not isinstance(value, str):
return None
parts = value.split(":")
try:
if len(parts) == 3:
hours = int(parts[0])
minutes = int(parts[1])
seconds = float(parts[2])
return hours * 3600 + minutes * 60 + seconds
if len(parts) == 2:
minutes = int(parts[0])
seconds = float(parts[1])
return minutes * 60 + seconds
if len(parts) == 1:
return float(parts[0])
except Exception:
return None
return None
def _build_monitor_key_aliases(extra_metrics: list) -> dict:
aliases = {
"round": [
"round",
"global_round",
"current_round",
"iteration",
"iter",
"epoch",
],
"accuracy": ["accuracy", "acc", "val_acc", "test_acc"],
"loss": ["loss", "train_loss", "val_loss", "test_loss"],
}
for metric in extra_metrics:
aliases[metric] = [metric]
return aliases
def _extract_monitor_metrics(stats: dict, key_aliases: dict) -> dict:
if not isinstance(stats, dict):
return {}
def _find_key(d: dict, keys: list):
for k in keys:
if k in d and isinstance(d[k], (int, float, str)):
return d[k]
return None
def _search(d: dict, keys: list):
if not isinstance(d, dict):
return None
value = _find_key(d, keys)
if value is not None:
return value
for v in d.values():
if isinstance(v, dict):
found = _search(v, keys)
if found is not None:
return found
return None
metrics = {}
for out_key, aliases in key_aliases.items():
value = _search(stats, aliases)
if value is not None:
metrics[out_key] = value
return metrics
def _make_monitor_state() -> dict:
return {
"last_status": None,
"last_meta": None,
"last_emit_ts": 0.0,
"last_stats": None,
"last_stats_raw": None,
"last_stats_ts": 0.0,
}
def _refresh_monitor_stats(sess, job_id: str, state: dict, stats_target: str, key_aliases: dict):
try:
stats = sess.show_stats(job_id, stats_target, None)
state["last_stats_raw"] = stats
state["last_stats"] = _extract_monitor_metrics(stats, key_aliases)
except Exception:
state["last_stats"] = None
state["last_stats_raw"] = None
def _emit_monitor_progress(job_id: str, job_meta: dict, state: dict, now: float, start: float, start_ts):
    """Print one human-readable progress line and record the emission in ``state``.

    If nothing has been emitted yet (``state["last_status"]`` is None), the line is prefixed with
    the job id and, when known, its name and submit time. Elapsed time is measured from
    ``start_ts`` when available, otherwise from ``start``.
    """
    from nvflare.apis.job_def import JobMetaKey
    from nvflare.tool.cli_output import print_human

    status = "UNKNOWN"
    if job_meta:
        status = job_meta.get("status", "UNKNOWN")
    summary = _summarize_monitor_meta(job_meta, JobMetaKey)
    origin = start if start_ts is None else start_ts
    elapsed = round(now - origin, 1)

    pieces = []
    if state["last_status"] is None:
        pieces.append(f"job_id: {job_id}")
        for label, key in (("name", "job_name"), ("submit_time", "submit_time")):
            value = summary.get(key)
            if value:
                pieces.append(f"{label}: {value}")
    pieces.extend((f"status: {status}", f"elapsed_s: {elapsed}"))
    metrics = state.get("last_stats") or {}
    if metrics:
        pieces.append("metrics: " + " ".join(f"{k}={v}" for k, v in metrics.items()))
    print_human(" ".join(pieces))
    state["last_status"] = status
    state["last_emit_ts"] = now
def _normalize_monitor_event_status(status: str) -> str:
if not status:
return "UNKNOWN"
if status == "FINISHED_OK" or status.startswith("FINISHED:COMPLETED"):
return "COMPLETED"
if status == "FINISHED_EXCEPTION" or status == "FAILED" or status.startswith("FINISHED:EXECUTION_EXCEPTION"):
return "FAILED"
if status == "ABORTED" or status.startswith("FINISHED:ABORTED"):
return "ABORTED"
if status == "ABANDONED" or status.startswith("FINISHED:ABANDONED"):
return "ABANDONED"
return status
def _build_monitor_progress_event(job_id: str, job_meta: dict, state: dict, now: float, start: float, start_ts) -> dict:
    """Build the JSONL ``progress`` event for the current poll.

    ``status`` is the normalized status and ``job_status`` the raw one. ``metrics`` is added only
    when stats have produced any.
    """
    from nvflare.apis.job_def import JobMetaKey

    job_status = "UNKNOWN"
    if job_meta:
        job_status = job_meta.get("status", "UNKNOWN")
    origin = start if start_ts is None else start_ts
    event = dict(
        event="progress",
        job_id=job_id,
        status=_normalize_monitor_event_status(job_status),
        job_status=job_status,
        terminal=False,
        elapsed_s=round(now - origin, 1),
        job_meta=_summarize_monitor_meta(job_meta, JobMetaKey),
    )
    last_metrics = state.get("last_stats")
    if last_metrics:
        event["metrics"] = last_metrics
    return event
def _emit_monitor_jsonl_progress(job_id: str, job_meta: dict, state: dict, now: float, start: float, start_ts):
    """Write a JSONL progress event and record the emitted status and time in ``state``."""
    from nvflare.tool.cli_output import output_jsonl_event

    progress_event = _build_monitor_progress_event(job_id, job_meta, state, now, start, start_ts)
    output_jsonl_event(progress_event)
    emitted_status = "UNKNOWN"
    if job_meta:
        emitted_status = job_meta.get("status", "UNKNOWN")
    state["last_status"] = emitted_status
    state["last_emit_ts"] = now
def _build_monitor_terminal_event(data: dict, event: str = "terminal", error_code: str = None) -> dict:
    """Convert a final monitor result payload into a terminal JSONL event.

    Args:
        data: result payload (job_id, status, duration_s, job_meta, last_stats, optional
            stats_raw); may be empty or None.
        event: value for the ``event`` field.
        error_code: when truthy, added as ``error_code``.
    """
    if data:
        raw_status = data.get("status", "UNKNOWN")
        job_id = data.get("job_id")
        duration = data.get("duration_s")
        job_meta = data.get("job_meta")
        metrics = data.get("last_stats")
    else:
        raw_status, job_id, duration, job_meta, metrics = "UNKNOWN", None, None, {}, None
    result = {
        "event": event,
        "job_id": job_id,
        "status": _normalize_monitor_event_status(raw_status),
        "job_status": raw_status,
        "terminal": True,
        "duration_s": duration,
        "job_meta": job_meta,
        "metrics": metrics,
    }
    if data and "stats_raw" in data:
        result["stats_raw"] = data["stats_raw"]
    if error_code:
        result["error_code"] = error_code
    return result
def _build_monitor_timeout_event(job_id: str, timeout: int, start: float, start_ts, cb_state: dict) -> dict:
    """Build the terminal JSONL event reported when monitoring runs past ``timeout``.

    Includes the last observed raw status and metadata summary, and the last metrics, when
    the callback recorded any.
    """
    from nvflare.apis.job_def import JobMetaKey

    origin = start if start_ts is None else start_ts
    event = {
        "event": "terminal",
        "job_id": job_id,
        "status": "TIMEOUT",
        "terminal": True,
        "timeout_seconds": timeout,
        "elapsed_s": round(time.time() - origin, 1),
    }
    seen_meta = cb_state.get("last_meta")
    if seen_meta:
        event.update(
            job_status=seen_meta.get("status", "UNKNOWN"),
            job_meta=_summarize_monitor_meta(seen_meta, JobMetaKey),
        )
    seen_metrics = cb_state.get("last_stats")
    if seen_metrics:
        event["metrics"] = seen_metrics
    return event
def _build_monitor_status_callback(
    start: float,
    start_ts_holder: dict,
    emit_interval: int,
    stats_interval: int,
    stats_target: str,
    key_aliases: dict,
    jsonl_mode: bool = False,
    quiet_mode: bool = False,
):
    """Create the per-poll callback handed to ``sess.monitor_job_and_return_job_meta``.

    On each call, the callback does the following:
    - records the latest metadata in ``state["last_meta"]``;
    - resolves the job start timestamp once, into ``start_ts_holder["value"]``;
    - refreshes stats at most every ``stats_interval`` seconds while the job is RUNNING or
      DISPATCHED;
    - reports progress when the status changes or ``emit_interval`` seconds have elapsed.
      This is a JSONL event in ``jsonl_mode``, bookkeeping only in ``quiet_mode``, and a human
      line otherwise.

    Returns:
        ``_status_cb(sess, job_id, job_meta, state) -> bool``. It returns False once the status
        is terminal per ``_is_terminal_job_status``; presumably the monitor then stops polling.
    """

    def _status_cb(sess, job_id, job_meta, state):
        from nvflare.apis.job_def import JobMetaKey

        state["last_meta"] = job_meta
        status = "UNKNOWN"
        if job_meta:
            status = job_meta.get("status", "UNKNOWN")
        now = time.time()

        if start_ts_holder["value"] is None:
            start_ts_holder["value"] = _parse_monitor_start_ts(
                job_meta, JobMetaKey.START_TIME.value, JobMetaKey.SUBMIT_TIME_ISO.value
            )

        is_active = status in ("RUNNING", "DISPATCHED")
        if is_active and now - state["last_stats_ts"] >= stats_interval:
            _refresh_monitor_stats(sess, job_id, state, stats_target, key_aliases)
            state["last_stats_ts"] = now

        should_emit = status != state["last_status"] or now - state["last_emit_ts"] >= emit_interval
        if should_emit:
            job_start_ts = start_ts_holder["value"]
            if jsonl_mode:
                _emit_monitor_jsonl_progress(job_id, job_meta, state, now, start, job_start_ts)
            elif quiet_mode:
                # Nothing is printed, but emission bookkeeping still advances.
                state["last_status"] = status
                state["last_emit_ts"] = now
            else:
                _emit_monitor_progress(job_id, job_meta, state, now, start, job_start_ts)
        return not _is_terminal_job_status(status)

    return _status_cb
def _build_monitor_output_data(
    job_id: str, meta: dict, start: float, start_ts, cb_state: dict, json_mode: bool
) -> dict:
    """Assemble the final result payload reported by ``job monitor`` and ``job wait``.

    Args:
        job_id: the job being reported.
        meta: final job metadata; None or empty is treated as "no metadata".
        start: time.time() taken when the command started (last-resort duration origin).
        start_ts: parsed job start timestamp, or None.
        cb_state: monitor state holding the most recent stats.
        json_mode: when True, also include the raw stats under ``stats_raw``.

    Returns:
        dict with job_id, status, duration_s, job_meta and last_stats, plus stats_raw in JSON
        mode. duration_s prefers the metadata's own duration, then time since ``start_ts``,
        then time since ``start``.
    """
    from nvflare.apis.job_def import JobMetaKey

    # The duration lookup already tolerated missing metadata; apply the same to "status" so a
    # None meta yields "UNKNOWN" instead of an AttributeError.
    meta = meta or {}
    meta_duration_s = _parse_monitor_duration_seconds(meta.get("duration"))
    if meta_duration_s is not None:
        duration = round(meta_duration_s, 1)
    elif start_ts is not None:
        duration = round(time.time() - start_ts, 1)
    else:
        duration = round(time.time() - start, 1)
    data = {
        "job_id": job_id,
        "status": meta.get("status", "UNKNOWN"),
        "duration_s": duration,
        "job_meta": _summarize_monitor_meta(meta, JobMetaKey),
        "last_stats": cb_state.get("last_stats"),
    }
    if json_mode:
        data["stats_raw"] = cb_state.get("last_stats_raw")
    return data
def _print_monitor_result_human(data: dict, print_func):
    """Print the final monitor/wait summary for humans.

    The status line shows the normalized status, with the raw status in parentheses when they
    differ. Duration, name, study and metrics lines follow when present.
    """
    raw_status = data.get("status", "UNKNOWN")
    coarse_status = _normalize_monitor_event_status(raw_status)
    if coarse_status == raw_status:
        shown_status = coarse_status
    else:
        shown_status = f"{coarse_status} ({raw_status})"
    print_func(f"Job {data.get('job_id')} status: {shown_status}")

    duration_s = data.get("duration_s")
    if duration_s is not None:
        print_func(f" Duration: {duration_s}s")

    meta_summary = data.get("job_meta") or {}
    for label, key in (("Name", "job_name"), ("Study", "study")):
        if meta_summary.get(key):
            print_func(f" {label}: {meta_summary[key]}")

    metrics = data.get("last_stats") or {}
    if metrics:
        print_func(" Metrics: " + " ".join(f"{name}={value}" for name, value in metrics.items()))
def _job_terminal_failure_error(status: str):
if status == "FAILED":
return (
"JOB_FAILED",
"Use 'nvflare job logs <job_id>' and 'nvflare job meta <job_id>' to inspect the failure.",
)
if status == "FINISHED_EXCEPTION" or status.startswith("FINISHED:EXECUTION_EXCEPTION"):
return (
"JOB_FINISHED_EXCEPTION",
"Use 'nvflare job logs <job_id>' and 'nvflare job meta <job_id>' to inspect the failure.",
)
if status == "ABORTED" or status.startswith("FINISHED:ABORTED"):
return (
"JOB_ABORTED",
"Use 'nvflare job meta <job_id>' to see abort details.",
)
if status == "ABANDONED" or status.startswith("FINISHED:ABANDONED"):
return (
"JOB_ABANDONED",
"Use 'nvflare job meta <job_id>' to inspect the abandonment details.",
)
if status.startswith("FINISHED:") and not status.startswith("FINISHED:COMPLETED"):
return (
"JOB_FAILED",
"Use 'nvflare job logs <job_id>' and 'nvflare job meta <job_id>' to inspect the failure.",
)
return None
def _job_not_found_hint(study: str) -> str:
if study == "default":
return (
"This command searched study 'default'. "
"Use 'nvflare job list --study <study_name>' to check jobs in another study, "
"then rerun this command with --study <study_name>."
)
return (
f"This command searched study '{study}'. "
f"Use 'nvflare job list --study {study}' to verify the job ID, "
"or rerun this command with the correct --study value."
)
[docs]
def cmd_job_monitor(cmd_args):
    """Handle ``nvflare job monitor``: poll a job until it finishes or times out.

    Progress output depends on the mode:
    - default: human-readable lines;
    - ``--format jsonl``: JSONL ``progress`` events;
    - JSON mode: nothing while polling, only the final result.

    Job stats are sampled periodically so metrics can be included.

    Args:
        cmd_args: parsed arguments from the parser registered by define_job_monitor_parser
            (job_id, --timeout, --interval, --study, --stats-target, --metric, startup kit
            selection).

    Exit codes requested here:
    - 1: unsuccessful terminal status;
    - 2: connection or auth failure;
    - 3: timeout;
    - 4: invalid arguments;
    - 5: internal error.

    JOB_NOT_FOUND uses output_error's default exit code.
    """
    from nvflare.fuel.flare_api.api_spec import (
        AuthenticationError,
        AuthorizationError,
        JobNotFound,
        MonitorReturnCode,
        NoConnection,
    )
    from nvflare.tool.cli_output import (
        is_json_mode,
        is_jsonl_mode,
        output_error,
        output_jsonl_event,
        output_ok,
        print_human,
    )
    from nvflare.tool.cli_schema import handle_schema_flag

    # Handles --schema for this command before any real work is done.
    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_MONITOR],
        "nvflare job monitor",
        [
            "nvflare job monitor abc123",
            "nvflare job monitor abc123 --timeout 3600",
            "nvflare job monitor abc123 --study cancer",
            "nvflare job monitor abc123 --timeout 3600 --format jsonl",
        ],
        sys.argv[1:],
        streaming=True,
        output_modes=["json", "jsonl"],
        mutating=False,
        idempotent=True,
        retry_token=_NO_RETRY_TOKEN_SCHEMA,
    )
    jsonl_mode = is_jsonl_mode()
    json_mode = is_json_mode()
    study = get_arg_value(cmd_args, "study", "default")
    start = time.time()
    # Filled in by the status callback with the job's own start time once metadata is seen.
    start_ts_holder = {"value": None}
    timeout = getattr(cmd_args, "timeout", 0)
    interval = getattr(cmd_args, "interval", 2)
    if timeout < 0:
        output_error("INVALID_ARGS", exit_code=4, detail="--timeout must be >= 0")
        return
    if interval <= 0:
        output_error("INVALID_ARGS", exit_code=4, detail="--interval must be > 0")
        return
    cb_state = _make_monitor_state()
    # Progress output and stats queries are rate-limited separately from the poll interval.
    emit_interval = max(interval, 5)
    stats_interval = max(interval, 10)
    stats_target = getattr(cmd_args, "stats_target", "server")
    # --metric may be repeated; drop empty entries.
    extra_metrics = [m for m in (getattr(cmd_args, "metrics", None) or []) if m]
    key_aliases = _build_monitor_key_aliases(extra_metrics)
    status_cb = _build_monitor_status_callback(
        start=start,
        start_ts_holder=start_ts_holder,
        emit_interval=emit_interval,
        stats_interval=stats_interval,
        stats_target=stats_target,
        key_aliases=key_aliases,
        jsonl_mode=jsonl_mode,
        quiet_mode=json_mode,
    )
    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            rc, meta = sess.monitor_job_and_return_job_meta(
                cmd_args.job_id,
                timeout=timeout,
                poll_interval=interval,
                cb=status_cb,
                state=cb_state,
            )
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except NoConnection as e:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(e))
        return
    except (AuthenticationError, AuthorizationError) as e:
        output_error("AUTH_FAILED", exit_code=2, detail=str(e))
        return
    except Exception as e:
        output_error("INTERNAL_ERROR", exit_code=5, detail=str(e))
        return
    if rc == MonitorReturnCode.TIMEOUT:
        if jsonl_mode:
            # JSONL consumers get a terminal event rather than an error envelope.
            output_jsonl_event(
                _build_monitor_timeout_event(
                    job_id=cmd_args.job_id,
                    timeout=timeout,
                    start=start,
                    start_ts=start_ts_holder["value"],
                    cb_state=cb_state,
                )
            )
            sys.exit(3)
        output_error(
            "TIMEOUT",
            exit_code=3,
            detail="job did not reach terminal state within timeout",
        )
        return
    if rc == MonitorReturnCode.ENDED_BY_CB:
        # Presumably the callback stopped monitoring after seeing a terminal status (it returns
        # False then); use the last metadata it recorded.
        meta = cb_state.get("last_meta")
        if meta is None:
            output_error(
                "INTERNAL_ERROR",
                exit_code=5,
                detail="monitoring stopped before job metadata was available",
            )
            return
    if not meta:
        output_error("INTERNAL_ERROR", exit_code=5, detail="monitoring returned no job metadata")
        return
    status = meta.get("status", "UNKNOWN")
    # Raw stats are included for both machine-readable formats.
    data = _build_monitor_output_data(
        job_id=cmd_args.job_id,
        meta=meta,
        start=start,
        start_ts=start_ts_holder["value"],
        cb_state=cb_state,
        json_mode=json_mode or jsonl_mode,
    )
    failure = _job_terminal_failure_error(status)
    if failure:
        error_code, hint = failure
        if jsonl_mode:
            output_jsonl_event(_build_monitor_terminal_event(data, error_code=error_code))
            sys.exit(1)
        if not json_mode:
            # Show the summary before the error so humans see what happened.
            _print_monitor_result_human(data, print_human)
        output_error(error_code, exit_code=1, hint=hint, data=data if json_mode else None, job_id=cmd_args.job_id)
    else:
        if jsonl_mode:
            output_jsonl_event(_build_monitor_terminal_event(data))
            return
        if json_mode:
            output_ok(data)
        else:
            _print_monitor_result_human(data, print_human)
[docs]
def cmd_job_wait(cmd_args):
    """Handle ``nvflare job wait``: block until a job finishes, then report its outcome.

    No periodic progress is emitted. In human mode a single "Waiting for job ..." line is
    printed before the final result.

    Args:
        cmd_args: parsed arguments (job_id, --timeout, --interval, --study, startup kit
            selection).

    Exit codes requested here:
    - 1: unsuccessful terminal status;
    - 2: connection or auth failure;
    - 3: timeout;
    - 4: invalid arguments;
    - 5: internal error.

    JOB_NOT_FOUND uses output_error's default exit code.
    """
    from nvflare.apis.job_def import JobMetaKey
    from nvflare.fuel.flare_api.api_spec import (
        AuthenticationError,
        AuthorizationError,
        JobNotFound,
        JobTimeout,
        NoConnection,
    )
    from nvflare.tool.cli_output import is_json_mode, output_error, output_ok, print_human
    from nvflare.tool.cli_schema import handle_schema_flag

    # Handles --schema for this command before any real work is done.
    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_WAIT],
        "nvflare job wait",
        [
            "nvflare job wait abc123",
            "nvflare job wait abc123 --timeout 3600",
            "nvflare job wait abc123 --study cancer",
        ],
        sys.argv[1:],
        output_modes=_JSON_OUTPUT_MODES,
        streaming=False,
        mutating=False,
        idempotent=True,
        retry_token=_NO_RETRY_TOKEN_SCHEMA,
    )
    study = get_arg_value(cmd_args, "study", "default")
    start = time.time()
    timeout = getattr(cmd_args, "timeout", 0)
    interval = getattr(cmd_args, "interval", 2)
    if timeout < 0:
        output_error("INVALID_ARGS", exit_code=4, detail="--timeout must be >= 0")
        return
    if interval <= 0:
        output_error("INVALID_ARGS", exit_code=4, detail="--interval must be > 0")
        return
    json_mode = is_json_mode()
    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            if not json_mode:
                print_human(f"Waiting for job {cmd_args.job_id} in study {study} ...")
            meta = sess.wait_for_job(cmd_args.job_id, timeout=timeout, poll_interval=interval)
    except JobTimeout as e:
        output_error(
            "TIMEOUT",
            exit_code=3,
            detail=str(e),
        )
        return
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except (AuthenticationError, AuthorizationError) as e:
        output_error("AUTH_FAILED", exit_code=2, detail=str(e))
        return
    except NoConnection as e:
        output_error("CONNECTION_FAILED", exit_code=2, detail=str(e))
        return
    except Exception as e:
        output_error("INTERNAL_ERROR", exit_code=5, detail=str(e))
        return
    if not meta:
        output_error("INTERNAL_ERROR", exit_code=5, detail="wait returned no job metadata")
        return
    # Work on a copy so filling in the study below does not mutate the session's result.
    meta = dict(meta)
    # Report the study that was searched when the metadata does not name one.
    meta.setdefault(JobMetaKey.STUDY.value, study)
    start_ts = _parse_monitor_start_ts(
        meta,
        JobMetaKey.START_TIME.value,
        JobMetaKey.SUBMIT_TIME_ISO.value,
    )
    # No status callback runs during wait, so a fresh state means no stats are reported.
    data = _build_monitor_output_data(
        job_id=cmd_args.job_id,
        meta=meta,
        start=start,
        start_ts=start_ts,
        cb_state=_make_monitor_state(),
        json_mode=json_mode,
    )
    failure = _job_terminal_failure_error(data["status"])
    if failure:
        error_code, hint = failure
        if not json_mode:
            _print_monitor_result_human(data, print_human)
        output_error(error_code, exit_code=1, hint=hint, data=data if json_mode else None, job_id=cmd_args.job_id)
    else:
        if json_mode:
            output_ok(data)
        else:
            _print_monitor_result_human(data, print_human)
[docs]
def cmd_job_log(cmd_args):
    """Handle the job log-configuration sub-command (CMD_JOB_LOG_CONFIG or its alias).

    The handler proceeds in three steps:
    1. Reject a missing level/mode with a usage error.
    2. Refuse jobs whose status is already terminal.
    3. Ask the session to apply ``level`` to ``--site`` (default ``all``).

    Args:
        cmd_args: parsed arguments (job_id, level, --site, --study, startup kit selection).

    Authentication, authorization and connection errors are re-raised to the caller. Other
    failures are reported through output_error.
    """
    from nvflare.fuel.flare_api.api_spec import (
        AuthenticationError,
        AuthorizationError,
        InternalError,
        InvalidTarget,
        JobNotFound,
        NoConnection,
        NoReply,
    )
    from nvflare.tool.cli_output import output_error, output_ok, output_usage_error
    from nvflare.tool.cli_schema import handle_schema_flag

    # Report the schema under whichever name (primary or alias) the user actually typed.
    invoked_sub_cmd = CMD_JOB_LOG_CONFIG
    if len(sys.argv) > 2 and sys.argv[1] == "job" and sys.argv[2] in (CMD_JOB_LOG_CONFIG, CMD_JOB_LOG_ALIAS):
        invoked_sub_cmd = sys.argv[2]
    handle_schema_flag(
        job_sub_cmd_parser[CMD_JOB_LOG_CONFIG],
        f"nvflare job {invoked_sub_cmd}",
        [
            f"nvflare job {invoked_sub_cmd} abc123 DEBUG",
            f"nvflare job {invoked_sub_cmd} abc123 concise",
            f"nvflare job {invoked_sub_cmd} abc123 DEBUG --study cancer",
        ],
        sys.argv[1:],
    )
    level = getattr(cmd_args, "level", None)
    site = getattr(cmd_args, "site", "all")
    study = get_arg_value(cmd_args, "study", "default")
    if not level:
        output_usage_error(
            job_sub_cmd_parser[CMD_JOB_LOG_CONFIG],
            "provide a valid level name or mode",
            exit_code=4,
            error_code="LOG_CONFIG_INVALID",
            message="Log config is not a recognised log mode.",
            hint="Supply one of: DEBUG, INFO, WARNING, ERROR, CRITICAL, concise, msg_only, full, verbose, reload.",
        )
        return
    try:
        with _job_session_for_args(cmd_args, study=study) as sess:
            meta = sess.get_job_meta(cmd_args.job_id)
            job_status = meta.get("status", "UNKNOWN") if meta else "UNKNOWN"
            if _is_terminal_job_status(job_status):
                output_error(
                    "JOB_NOT_RUNNING",
                    exit_code=1,
                    job_id=cmd_args.job_id,
                    detail=f"job is in terminal state: {job_status}",
                )
                # SystemExit derives from BaseException, so the generic handler below skips it.
                raise SystemExit(1)
            sess.configure_job_log(cmd_args.job_id, level, target=site)
    except (AuthenticationError, AuthorizationError, NoConnection):
        raise
    except InternalError as e:
        output_error("INTERNAL_ERROR", exit_code=5, detail=str(e))
        return
    except (InvalidTarget, NoReply):
        output_error("SITE_NOT_FOUND", site=site)
        return
    except JobNotFound:
        output_error(
            "JOB_NOT_FOUND",
            job_id=cmd_args.job_id,
            detail=f"searched study '{study}'",
            hint=_job_not_found_hint(study),
        )
        return
    except Exception as e:
        output_error("INTERNAL_ERROR", exit_code=5, detail=str(e))
        return
    output_ok(
        {
            "job_id": cmd_args.job_id,
            "config": level,
            # "all" is reported verbatim rather than expanded into individual site names.
            "sites": [site],
            "status": "applied",
        }
    )
[docs]
def define_job_monitor_parser(job_subparser):
    """Register the job monitor sub-command parser and its handler."""
    parser = job_subparser.add_parser(CMD_JOB_MONITOR, help="wait for a job and stream progress to stderr")
    add_arg = parser.add_argument
    add_arg("job_id", type=str, help="job ID")
    add_arg("--timeout", type=int, default=0, help="seconds to wait (0 = no timeout)")
    add_arg("--interval", type=int, default=2, help="poll interval in seconds")
    add_arg("--study", type=str, default="default", help="study to monitor the job in")
    add_arg(
        "--stats-target",
        dest="stats_target",
        choices=["server", "client", "all"],
        default="server",
        help="where to fetch stats from (default: server)",
    )
    # Each occurrence appends one more metric name to args.metrics (None when never given).
    add_arg(
        "--metric",
        dest="metrics",
        action="append",
        default=None,
        help="extra metric key to surface from stats (repeatable)",
    )
    add_startup_kit_selection_args(parser)
    add_arg("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_JOB_MONITOR] = parser
    job_sub_cmd_handlers[CMD_JOB_MONITOR] = cmd_job_monitor
[docs]
def define_job_wait_parser(job_subparser):
    """Register the job wait sub-command parser and its handler."""
    parser = job_subparser.add_parser(CMD_JOB_WAIT, help="wait for a job to finish")
    add_arg = parser.add_argument
    add_arg("job_id", type=str, help="job ID")
    # Parsed as floats, so fractional seconds are accepted for both options.
    add_arg("--timeout", type=float, default=0, help="seconds to wait (0 = no timeout)")
    add_arg("--interval", type=float, default=2, help="poll interval in seconds")
    add_arg("--study", type=str, default="default", help="study to wait for the job in")
    add_startup_kit_selection_args(parser)
    add_arg("--schema", action="store_true", help="print command schema as JSON and exit")
    job_sub_cmd_parser[CMD_JOB_WAIT] = parser
    job_sub_cmd_handlers[CMD_JOB_WAIT] = cmd_job_wait
[docs]
def define_job_log_parser(job_subparser):
    """Register the job log-configuration sub-command, its alias, and their shared handler."""
    parser = job_subparser.add_parser(
        CMD_JOB_LOG_CONFIG,
        aliases=[CMD_JOB_LOG_ALIAS],
        help="change logging configuration for a running job",
    )
    add_arg = parser.add_argument
    add_arg("job_id", type=str, help="job ID")
    # Optional at parse time so the handler can emit a structured usage error when omitted.
    add_arg(
        "level",
        nargs="?",
        default=None,
        help="log level or mode: DEBUG, INFO, WARNING, ERROR, CRITICAL, concise, msg_only, full, verbose, reload",
    )
    add_arg("--site", default="all", help="target site name or all")
    add_arg("--study", type=str, default="default", help="study containing the job")
    add_startup_kit_selection_args(parser)
    add_arg("--schema", action="store_true", help="print command schema as JSON and exit")
    # The primary name and the alias share one parser and one handler.
    for name in (CMD_JOB_LOG_CONFIG, CMD_JOB_LOG_ALIAS):
        job_sub_cmd_parser[name] = parser
        job_sub_cmd_handlers[name] = cmd_job_log