# Source code for nvflare.tool.kit.kit_cli

# Copyright (c) 2026, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""nvflare config startup kit command parser and handlers."""

import os
import sys
from typing import Callable, Dict

from nvflare.cli_unknown_cmd_exception import CLIUnknownCmdException
from nvflare.tool.cli_output import is_json_mode, output_error_message, output_ok, print_human
from nvflare.tool.cli_schema import handle_schema_flag
from nvflare.tool.kit.kit_config import (
    NVFLARE_STARTUP_KIT_DIR,
    StartupKitConfigError,
    add_startup_kit_entry,
    get_active_startup_kit_id,
    get_cli_config_path,
    get_startup_kit_entries,
    get_startup_kit_status,
    inspect_startup_kit_metadata,
    load_cli_config,
    remove_startup_kit_entry,
    save_cli_config,
    set_active_startup_kit,
)

# Sub-command names; each is used both as the argparse sub-parser name and as
# the key into the handler and parser registries defined in this module.
CMD_KIT_ADD = "add"
CMD_KIT_USE = "use"
CMD_KIT_INSPECT = "inspect"
CMD_KIT_LIST = "list"
CMD_KIT_REMOVE = "remove"
# The startup kit commands are registered directly under nvflare config.
KIT_COMMAND = "nvflare config"

# Passed as ``output_modes`` to handle_schema_flag by the use/inspect/list commands.
_JSON_OUTPUT_MODES = ["json"]
# Passed as ``retry_token`` to handle_schema_flag by the use/inspect/list commands.
_NO_RETRY_TOKEN_SCHEMA = {"supported": False}

# Sub-command name -> parser. Filled in by def_kit_cli_parser and looked up by
# each command handler when it calls handle_schema_flag.
_kit_sub_cmd_parsers = {}


def _emit_kit_error(e: StartupKitConfigError, exit_code: int = 4):
    """Report a startup-kit configuration error through the CLI error output.

    Args:
        e: The configuration error. Its string form is used as the message and
            its ``hint`` attribute is forwarded as the hint.
        exit_code: Exit code forwarded to ``output_error_message`` (default 4).
    """
    message = str(e)
    output_error_message("INVALID_ARGS", message, hint=e.hint, exit_code=exit_code)


def _metadata_for_output(path: str) -> Dict[str, str]:
    """Collect startup-kit metadata for ``path`` in a display-ready form.

    Missing or empty text fields are replaced by ``"-"``. ``role`` falls back
    to ``cert_role`` before falling back to ``"-"``. ``certificate`` is passed
    through unchanged and ``findings`` defaults to an empty list, so not every
    value in the result is a string.

    Args:
        path: Startup kit path handed to ``inspect_startup_kit_metadata``.

    Returns:
        A dict with the keys ``identity``, ``cert_role``, ``role``, ``org``,
        ``project``, ``certificate`` and ``findings``.
    """
    info = inspect_startup_kit_metadata(path)

    def _or_dash(key):
        # Any falsy value (missing key, None, empty string) is shown as "-".
        return info.get(key) or "-"

    return {
        "identity": _or_dash("identity"),
        "cert_role": _or_dash("cert_role"),
        "role": info.get("role") or _or_dash("cert_role"),
        "org": _or_dash("org"),
        "project": _or_dash("project"),
        "certificate": info.get("certificate"),
        "findings": info.get("findings") or [],
    }


def _json_value(value):
    return None if value in (None, "-") else value


def default_startup_kit_id(path: str) -> str:
    """Return the default local registry ID for a compatibility startup-kit path.

    The ``identity`` reported by ``inspect_startup_kit_metadata`` is used when
    it is truthy; otherwise the last component of the absolute path is used.
    """
    identity = inspect_startup_kit_metadata(path).get("identity")
    if identity:
        return identity
    # Trailing separators are stripped before the path is made absolute.
    trimmed = path.rstrip(os.sep)
    return os.path.basename(os.path.abspath(trimmed))
def cmd_kit_add(args):
    """Handle ``nvflare config add``: register a startup kit path under a local ID.

    Args:
        args: Parsed arguments providing ``kit_id``, ``startup_kit_dir`` and
            ``force``.

    On ``StartupKitConfigError`` the error is reported via ``_emit_kit_error``
    and the function returns without emitting a success payload.
    """
    handle_schema_flag(
        _kit_sub_cmd_parsers[CMD_KIT_ADD],
        f"{KIT_COMMAND} add",
        [f"{KIT_COMMAND} add cancer_lead /secure/startup_kits/cancer/lead@nvidia.com"],
        sys.argv[1:],
    )

    kit_id = args.kit_id.strip()
    try:
        config = add_startup_kit_entry(load_cli_config(), kit_id, args.startup_kit_dir, force=args.force)
        save_cli_config(config)
    except StartupKitConfigError as e:
        _emit_kit_error(e)
        return

    # Report the path as stored in the saved configuration.
    registered_path = get_startup_kit_entries(config)[kit_id]
    output_ok(
        {
            "registered_startup_kit": kit_id,
            "path": registered_path,
            "next_step": f"{KIT_COMMAND} use {kit_id}",
        }
    )
def cmd_kit_use(args):
    """Handle ``nvflare config use``: make a registered startup kit the active one.

    Args:
        args: Parsed arguments providing ``kit_id``.

    After the configuration is saved, the selected kit is shown as a one-row
    table in human mode. In JSON mode a structured payload is emitted; it
    includes a warning finding noting that this command changes global state.
    On ``StartupKitConfigError`` the error is reported and the function returns.
    """
    handle_schema_flag(
        _kit_sub_cmd_parsers[CMD_KIT_USE],
        f"{KIT_COMMAND} use",
        [f"{KIT_COMMAND} use cancer_lead"],
        sys.argv[1:],
        output_modes=_JSON_OUTPUT_MODES,
        streaming=False,
        mutating=True,
        idempotent=True,
        retry_token=_NO_RETRY_TOKEN_SCHEMA,
    )

    kit_id = args.kit_id.strip()
    try:
        config = load_cli_config()
        entries = get_startup_kit_entries(config)
        config = set_active_startup_kit(config, kit_id)
        save_cli_config(config)
    except StartupKitConfigError as e:
        _emit_kit_error(e)
        return

    path = entries[kit_id]
    metadata = _metadata_for_output(path)

    if not is_json_mode():
        selected_row = {
            "active": "*",
            "id": kit_id,
            "status": "ok",
            "identity": metadata["identity"],
            "cert_role": metadata["cert_role"],
            "path": path,
        }
        _render_startup_kit_table([selected_row])
        return

    # Appended to the kit's own findings so automation sees the side effect.
    global_state_warning = {
        "code": "CONFIG_USE_MUTATES_GLOBAL_STATE",
        "severity": "warning",
        "message": "nvflare config use changes the global active startup kit.",
        "hint": "Automation should prefer --kit-id or --startup-kit on each server-connected command.",
    }
    output_ok(
        {
            "startup_kit": {
                "source": "active",
                "id": kit_id,
                "path": path,
            },
            "identity": {
                "name": _json_value(metadata["identity"]),
                "org": _json_value(metadata["org"]),
                "role": _json_value(metadata["role"]),
            },
            "project": _json_value(metadata["project"]),
            "certificate": metadata["certificate"],
            "findings": metadata["findings"] + [global_state_warning],
        }
    )
def _print_env_warning():
    """Warn, in human mode only, when the startup-kit environment variable is set.

    Nothing is printed in JSON mode or when the variable is unset or empty.
    """
    env_path = os.getenv(NVFLARE_STARTUP_KIT_DIR)
    if env_path and not is_json_mode():
        print_human(f"warning: {NVFLARE_STARTUP_KIT_DIR} is set ({env_path})")
        print_human(" normal commands will use this path instead of the active kit above")


def _render_startup_kit_table(rows):
    """Print startup-kit rows as a left-aligned, space-separated text table.

    Args:
        rows: Iterable of dicts; the keys ``active``, ``id``, ``status``,
            ``identity``, ``cert_role`` and ``path`` are rendered, and a
            missing key is shown as an empty cell.

    Each column is as wide as its header or its longest cell, whichever is
    larger. With no rows, only the header and separator lines are printed.
    """
    keys = ["active", "id", "status", "identity", "cert_role", "path"]
    # default=0 keeps max() from raising ValueError when rows is empty.
    widths = [max(len(key), max((len(str(row.get(key, ""))) for row in rows), default=0)) for key in keys]
    header = " ".join(key.ljust(width) for key, width in zip(keys, widths))
    print_human(header)
    print_human("-" * len(header))
    for row in rows:
        print_human(" ".join(str(row.get(key, "")).ljust(width) for key, width in zip(keys, widths)))


def _render_kit_inspect_human(data: dict):
    """Print the ``inspect`` result for human readers.

    Args:
        data: Result dict built by ``cmd_kit_inspect``. The keys ``active``,
            ``status``, ``identity``, ``cert_role``, ``path``, ``config_file``
            and ``hint`` are read; a missing or empty value is shown as "-".

    Output is a one-row table, a blank line, the config file location, and the
    hint when one is present.
    """
    _render_startup_kit_table(
        [
            {
                "active": "*",
                "id": data.get("active") or "-",
                "status": data.get("status") or "-",
                "identity": data.get("identity") or "-",
                "cert_role": data.get("cert_role") or "-",
                "path": data.get("path") or "-",
            }
        ]
    )
    print_human("")
    print_human(f"config_file: {data.get('config_file') or '-'}")
    if data.get("hint"):
        print_human(f"hint: {data['hint']}")
def cmd_kit_inspect(args):
    """Handle ``nvflare config inspect``: report on the configured active startup kit.

    Args:
        args: Parsed arguments (no fields are read here).

    Three situations are reported:
      * no active kit is configured: a hint (human mode) or an info finding
        (JSON mode) is emitted;
      * the active ID has no registered path: status is ``unregistered``;
      * otherwise the status from ``get_startup_kit_status`` is reported,
        with identity details when the status is ``ok`` and a recovery hint
        when it is not.

    In JSON mode the result additionally carries ``role``, ``org``,
    ``project``, ``certificate`` and ``findings``.
    """
    handle_schema_flag(
        _kit_sub_cmd_parsers[CMD_KIT_INSPECT],
        f"{KIT_COMMAND} inspect",
        [f"{KIT_COMMAND} inspect"],
        sys.argv[1:],
        output_modes=_JSON_OUTPUT_MODES,
        streaming=False,
        mutating=False,
        idempotent=True,
        retry_token=_NO_RETRY_TOKEN_SCHEMA,
    )

    try:
        config = load_cli_config()
    except StartupKitConfigError as e:
        _emit_kit_error(e)
        return

    active = get_active_startup_kit_id(config)
    entries = get_startup_kit_entries(config)

    if not active:
        _print_env_warning()
        if not is_json_mode():
            print_human("No active startup kit.")
            print_human(f"Hint: Run {KIT_COMMAND} use <id>.")
            return
        output_ok(
            {
                "active": None,
                "config_file": str(get_cli_config_path()),
                "findings": [
                    {
                        "code": "NO_ACTIVE_STARTUP_KIT",
                        "severity": "info",
                        "message": "No active startup kit is configured.",
                        "hint": f"Run {KIT_COMMAND} use <id>, or pass --kit-id/--startup-kit per command.",
                    }
                ],
            }
        )
        return

    path = entries.get(active)
    data = {"active": active, "path": path or "-", "config_file": str(get_cli_config_path())}

    if path is None:
        # The active ID points at a kit that is no longer in the registry.
        data["status"] = "unregistered"
        data["hint"] = f"run {KIT_COMMAND} list, then {KIT_COMMAND} use <id>"
    else:
        status, normalized_path, metadata = get_startup_kit_status(path)
        data["status"] = status
        if normalized_path:
            data["path"] = normalized_path

        if status == "ok":
            data.update(
                {
                    "identity": metadata.get("identity") or "-",
                    "cert_role": metadata.get("cert_role") or "-",
                }
            )
            if is_json_mode():
                data.update(
                    {
                        "role": metadata.get("role") or metadata.get("cert_role") or "-",
                        "org": metadata.get("org") or "-",
                        "project": metadata.get("project") or "-",
                        "certificate": metadata.get("certificate"),
                        "findings": metadata.get("findings") or [],
                    }
                )
        else:
            data.update(
                {
                    "identity": "-",
                    "cert_role": "-",
                    "hint": f"run {KIT_COMMAND} use <id> or {KIT_COMMAND} remove {active}",
                }
            )
            if is_json_mode():
                data.update(
                    {
                        "role": "-",
                        "org": "-",
                        "project": "-",
                        "certificate": None,
                        "findings": metadata.get("findings") or [],
                    }
                )

    _print_env_warning()
    if is_json_mode():
        output_ok(data)
    else:
        _render_kit_inspect_human(data)
def cmd_kit_list(args):
    """Handle ``nvflare config list``: list every registered startup kit.

    Args:
        args: Parsed arguments (no fields are read here).

    Rows are ordered by kit ID and the active kit is marked with ``*``. In
    JSON mode each row additionally carries ``role``, ``org``, ``project``,
    ``certificate`` and ``findings``. The rows are always passed to
    ``output_ok``; in human mode a notice is printed first when there are none.
    """
    handle_schema_flag(
        _kit_sub_cmd_parsers[CMD_KIT_LIST],
        f"{KIT_COMMAND} list",
        [f"{KIT_COMMAND} list"],
        sys.argv[1:],
        output_modes=_JSON_OUTPUT_MODES,
        streaming=False,
        mutating=False,
        idempotent=True,
        retry_token=_NO_RETRY_TOKEN_SCHEMA,
    )

    try:
        config = load_cli_config()
    except StartupKitConfigError as e:
        _emit_kit_error(e)
        return

    active = get_active_startup_kit_id(config)
    entries = get_startup_kit_entries(config)

    rows = []
    for kit_id, path in sorted(entries.items()):
        status, normalized_path, metadata = get_startup_kit_status(path)
        active_marker = "*" if kit_id == active else ""
        row = {
            "active": active_marker,
            "id": kit_id,
            "status": status,
            "identity": metadata.get("identity") or "-",
            "cert_role": metadata.get("cert_role") or "-",
            "path": normalized_path or path,
        }
        if is_json_mode():
            row["role"] = metadata.get("role") or metadata.get("cert_role") or "-"
            row["org"] = metadata.get("org") or "-"
            row["project"] = metadata.get("project") or "-"
            row["certificate"] = metadata.get("certificate")
            row["findings"] = metadata.get("findings") or []
        rows.append(row)

    if not (rows or is_json_mode()):
        print_human("No startup kits registered.")
    output_ok(rows)
def cmd_kit_remove(args):
    """Handle ``nvflare config remove``: drop a local startup kit registration.

    Args:
        args: Parsed arguments providing ``kit_id``.

    When the removed kit was the active one, the result also carries a warning
    and a suggested next step. On ``StartupKitConfigError`` the error is
    reported and the function returns.
    """
    handle_schema_flag(
        _kit_sub_cmd_parsers[CMD_KIT_REMOVE],
        f"{KIT_COMMAND} remove",
        [f"{KIT_COMMAND} remove cancer_lead"],
        sys.argv[1:],
    )

    kit_id = args.kit_id.strip()
    try:
        config = load_cli_config()
        # Recorded before removal so the user can be told the active kit is gone.
        was_active = get_active_startup_kit_id(config) == kit_id
        save_cli_config(remove_startup_kit_entry(config, kit_id))
    except StartupKitConfigError as e:
        _emit_kit_error(e)
        return

    data = {"removed_startup_kit": kit_id}
    if was_active:
        data.update(
            {
                "warning": "no active startup kit is configured",
                "next_step": f"{KIT_COMMAND} use <id>",
            }
        )
    output_ok(data)
# Dispatch table: sub-command name -> handler function; looked up by handle_kit_cmd.
_KIT_HANDLERS: Dict[str, Callable] = { CMD_KIT_ADD: cmd_kit_add, CMD_KIT_USE: cmd_kit_use, CMD_KIT_INSPECT: cmd_kit_inspect, CMD_KIT_LIST: cmd_kit_list, CMD_KIT_REMOVE: cmd_kit_remove, }
def def_kit_cli_parser(sub_cmd):
    """Define the add/use/inspect/list/remove sub-command parsers.

    Args:
        sub_cmd: Object exposing ``add_parser`` (for example the result of
            argparse's ``add_subparsers``) on which the parsers are created.

    Returns:
        A new dict mapping each sub-command name to its parser. The same
        parsers are also stored in the module-level ``_kit_sub_cmd_parsers``.
    """
    kit_id_help = "local startup kit ID"

    def _register(name, parser):
        # Every sub-command ends with the --schema flag and is then recorded.
        parser.add_argument("--schema", action="store_true", help="print command schema as JSON and exit")
        _kit_sub_cmd_parsers[name] = parser

    add_parser = sub_cmd.add_parser(CMD_KIT_ADD, help="register a startup kit path")
    add_parser.add_argument("kit_id", help=kit_id_help)
    add_parser.add_argument("startup_kit_dir", help="admin/user startup kit directory")
    add_parser.add_argument("--force", action="store_true", help="replace an existing local registration")
    _register(CMD_KIT_ADD, add_parser)

    use_parser = sub_cmd.add_parser(CMD_KIT_USE, help="activate a registered startup kit")
    use_parser.add_argument("kit_id", help=kit_id_help)
    _register(CMD_KIT_USE, use_parser)

    inspect_parser = sub_cmd.add_parser(CMD_KIT_INSPECT, help="inspect the configured active startup kit")
    _register(CMD_KIT_INSPECT, inspect_parser)

    list_parser = sub_cmd.add_parser(CMD_KIT_LIST, help="list registered startup kits")
    _register(CMD_KIT_LIST, list_parser)

    remove_parser = sub_cmd.add_parser(CMD_KIT_REMOVE, help="remove a local startup kit registration")
    remove_parser.add_argument("kit_id", help=kit_id_help)
    _register(CMD_KIT_REMOVE, remove_parser)

    # Return a shallow copy so callers cannot mutate the module-level registry.
    return dict(_kit_sub_cmd_parsers)
def handle_kit_cmd(args):
    """Dispatch a parsed ``nvflare config`` sub-command to its handler.

    Args:
        args: Parsed arguments; ``config_sub_cmd`` selects the handler.

    Raises:
        CLIUnknownCmdException: If a sub-command is given but has no handler.
            A missing sub-command (``None``) is a no-op.
    """
    sub_cmd = getattr(args, "config_sub_cmd", None)
    handler = _KIT_HANDLERS.get(sub_cmd)
    if handler:
        handler(args)
        return
    if sub_cmd is not None:
        raise CLIUnknownCmdException("invalid config command")