Source code for nvflare.tool.cert.cert_cli

# Copyright (c) 2026, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""nvflare cert subcommand: parser registration and dispatch."""

import argparse
from typing import Optional

from nvflare.tool.cert.cert_constants import is_valid_provision_version

# Module-level parser references used by --schema in handlers and for help fallback.
_cert_init_parser: Optional[argparse.ArgumentParser] = None
_cert_request_parser: Optional[argparse.ArgumentParser] = None
_cert_approve_parser: Optional[argparse.ArgumentParser] = None
_cert_parser: Optional[argparse.ArgumentParser] = None


def _name_type(value: str) -> str:
    """Argparse type function: validate name length."""
    if len(value) > 64:
        raise argparse.ArgumentTypeError(f"name must be 64 characters or fewer (got {len(value)})")
    return value


def _positive_int(value: str) -> int:
    try:
        parsed = int(value)
    except ValueError as e:
        raise argparse.ArgumentTypeError(str(e))
    if parsed < 1:
        raise argparse.ArgumentTypeError("value must be >= 1")
    return parsed


def _provision_version_type(value: str) -> str:
    if not is_valid_provision_version(value):
        raise argparse.ArgumentTypeError("version must be exactly two digits, for example 00")
    return value


def _add_compat_output_arg(parser: argparse.ArgumentParser) -> None:
    parser.add_argument(
        "--output",
        dest="compat_output_format",
        choices=["json", "txt", "human"],
        default=None,
        help=argparse.SUPPRESS,
    )


def _def_cert_init_parser(cert_sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
    global _cert_init_parser
    p = cert_sub.add_parser(
        "init",
        description="Initialize a root CA for distributed provisioning. Project Admin only. One-time per federation.",
        help="Initialize root CA for a distributed provisioning federation (Project Admin only).",
    )
    profile_arg = p.add_argument(
        "--profile",
        required=False,
        default=None,
        dest="profile",
        metavar="PROJECT_PROFILE",
        help="Project profile yaml file. The profile name is used as the CN of the root CA certificate.",
    )
    profile_arg.schema_required = True
    output_dir_arg = p.add_argument(
        "-o",
        "--output-dir",
        required=False,
        default=None,
        dest="output_dir",
        metavar="OUTPUT_DIR",
        help="Directory where CA files are written. Created if it does not exist.",
    )
    output_dir_arg.schema_required = True
    p.add_argument(
        "--org",
        required=False,
        default=None,
        dest="org",
        metavar="ORG",
        help="Organization name for the CA certificate (O field). Default: omitted.",
    )
    p.add_argument(
        "--valid-days",
        required=False,
        type=_positive_int,
        default=3650,
        dest="valid_days",
        metavar="DAYS",
        help="Validity period for the root CA certificate in days. Default: 3650.",
    )
    p.add_argument(
        "--deploy-version",
        required=False,
        type=_provision_version_type,
        default="00",
        dest="deploy_version",
        metavar="NN",
        help=(
            "Deployment generation used for package output directory prod_<NN>. "
            "Default: 00. Normally ignore this; use 01, 02, etc. only when creating a new deployment CA."
        ),
    )
    p.add_argument(
        "--force",
        action="store_true",
        default=False,
        help=(
            "Replace existing CA files only when --deploy-version differs from the existing CA deploy version. "
            "Fails with an error if --deploy-version matches the existing CA deploy version. "
            "If ca.json is absent, --deploy-version is not checked. Backs up existing files first."
        ),
    )
    p.add_argument(
        "--schema",
        action="store_true",
        default=False,
        help="Print JSON schema for this command and exit.",
    )
    _add_compat_output_arg(p)
    _cert_init_parser = p
    return p


def _def_cert_request_parser(cert_sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
    global _cert_request_parser
    p = cert_sub.add_parser(
        "request",
        description="Create a local key, CSR, metadata, and request zip.",
        help="Create a distributed provisioning request zip.",
    )
    p.add_argument(
        "-p",
        "--participant",
        required=True,
        dest="participant",
        help="Participant definition YAML with top-level name and participants[0].",
    )
    p.add_argument(
        "--out",
        required=False,
        default=None,
        dest="output_dir",
        help="Request folder. Default: ./<name>.",
    )
    p.add_argument("--force", action="store_true", default=False, help="Overwrite existing request files.")
    p.add_argument("--schema", action="store_true", default=False, help="Print JSON schema for this command and exit.")
    _add_compat_output_arg(p)
    _cert_request_parser = p
    return p


def _def_cert_approve_parser(cert_sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
    global _cert_approve_parser
    p = cert_sub.add_parser(
        "approve",
        description="Approve a request zip and create a signed zip. Project Admin only.",
        help="Approve a distributed provisioning request zip.",
    )
    p.add_argument("request_zip", help="Request zip produced by 'nvflare cert request'.")
    p.add_argument(
        "-c",
        "--ca-dir",
        required=True,
        dest="ca_dir",
        help="Directory containing rootCA.pem, rootCA.key, and ca.json.",
    )
    p.add_argument(
        "--out",
        required=False,
        default=None,
        dest="signed_zip",
        help="Signed zip output path. Default: <name>.signed.zip.",
    )
    p.add_argument(
        "--profile",
        required=True,
        dest="profile",
        help="Project profile YAML containing name, scheme, and connection_security.",
    )
    p.add_argument(
        "--valid-days",
        required=False,
        type=_positive_int,
        default=1095,
        dest="valid_days",
        help="Certificate validity in days. Default: 1095 (3 years).",
    )
    p.add_argument("--force", action="store_true", default=False, help="Overwrite existing signed zip.")
    p.add_argument("--schema", action="store_true", default=False, help="Print JSON schema for this command and exit.")
    _add_compat_output_arg(p)
    _cert_approve_parser = p
    return p


def _ensure_parsers_initialized() -> None:
    """Ensure module-level parser references are populated.

    The parsers are normally registered when the CLI entry point calls
    ``def_cert_cli_parser``.  In contexts where that has not happened (unit
    tests, ``--schema`` invoked standalone) this function performs a one-time
    initialization using a throwaway top-level parser so that the module-level
    ``_cert_*_parser`` references are populated.
    """
    global _cert_init_parser, _cert_request_parser, _cert_approve_parser
    if _cert_init_parser is None or _cert_request_parser is None or _cert_approve_parser is None:
        import argparse as _argparse

        _dummy = _argparse.ArgumentParser()
        _sub = _dummy.add_subparsers()
        _cert_sub = _sub.add_parser("cert").add_subparsers(metavar="{init,request,approve}")
        _def_cert_init_parser(_cert_sub)
        _def_cert_request_parser(_cert_sub)
        _def_cert_approve_parser(_cert_sub)


[docs] def def_cert_cli_parser(sub_cmd) -> dict: """Register 'nvflare cert' and its subcommands with the top-level sub_cmd parser.""" global _cert_parser _cert_parser = sub_cmd.add_parser( "cert", help="Certificate management for distributed provisioning.", description="Manage certificates for FLARE distributed provisioning.", ) cert_sub = _cert_parser.add_subparsers(dest="cert_sub_command", metavar="{init,request,approve}") _def_cert_init_parser(cert_sub) _def_cert_request_parser(cert_sub) _def_cert_approve_parser(cert_sub) return {"cert": _cert_parser}
[docs] def handle_cert_cmd(args): """Dispatch to the appropriate cert subcommand handler.""" from nvflare.tool.cert.cert_commands import handle_cert_approve, handle_cert_init, handle_cert_request from nvflare.tool.cli_output import output_usage_error, set_output_format compat_output_format = getattr(args, "compat_output_format", None) if compat_output_format: set_output_format(compat_output_format) dispatch = { "init": handle_cert_init, "request": handle_cert_request, "approve": handle_cert_approve, } handler = dispatch.get(getattr(args, "cert_sub_command", None)) if not handler: detail = ( "cert subcommand required" if getattr(args, "cert_sub_command", None) is None else "invalid cert subcommand" ) output_usage_error(_cert_parser, detail, exit_code=4) return 1 return handler(args)