Source code for nvflare.tool.cli_output

# Copyright (c) 2026, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Stream contract for all nvflare CLI command handlers
=====================================================

Current default (human-first):
    stdout — human-readable output (tables, summaries, prompts).
    stderr — errors and diagnostics.

JSON output mode (--format json):
    stdout — exactly one JSON envelope per command invocation:
        {"schema_version": "1", "status": "ok"|"error", "data": {...}}
    stderr — all human-readable output: progress, warnings, prompts, diagnostics.

JSON Lines output mode (--format jsonl):
    stdout — newline-delimited JSON events for streaming commands.
    stderr — all human-readable output: progress, warnings, prompts, diagnostics.

Exceptions (plain text, outside the JSON contract):
    --help / -h         argparse usage text; agents use --schema instead
    --version           top-level utility path, not command output
    argparse errors     unrecognised-argument messages generated by argparse
"""

import json
import logging
import re
import sys
from typing import Any, Optional

from nvflare.tool.cli_contract import SCHEMA_VERSION

logger = logging.getLogger(__name__)

_REDACTED = "<redacted>"
_SENSITIVE_KEY_PARTS = {
    "password",
    "passwd",
    "passphrase",
}
_SENSITIVE_KEY_NAMES = {
    "access_key",
    "access_token",
    "api_key",
    "apikey",
    "auth_token",
    "bearer_token",
    "client_secret",
    "credential",
    "credentials",
    "id_token",
    "private_key",
    "privatekey",
    "refresh_token",
    "secret_key",
    "session_token",
}
_SENSITIVE_ASSIGNMENT_PATTERN = re.compile(
    r"(?i)\b("
    r"password|passwd|passphrase|credential|credentials|"
    r"access[-_ ]?token|refresh[-_ ]?token|id[-_ ]?token|session[-_ ]?token|auth[-_ ]?token|"
    r"api[-_ ]?key|private[-_ ]?key|secret[-_ ]?key|client[-_ ]?secret"
    r")(\s*[:=]\s*)([\"']?)([^\"'\s,;]+)([\"']?)"
)
_BEARER_TOKEN_PATTERN = re.compile(r"(?i)\b(authorization\s*:\s*bearer\s+)([A-Za-z0-9._~+/=-]+)")
_AUTH_VALUE_PATTERN = re.compile(r"(?i)\b(authorization\s*[:=]\s*)(?!bearer\s+)([\"']?)([^\"'\s,;]+)([\"']?)")
_PEM_PRIVATE_KEY_PATTERN = re.compile(
    r"-----BEGIN [A-Z ]*PRIVATE KEY-----.*?-----END [A-Z ]*PRIVATE KEY-----",
    re.DOTALL,
)
_URL_PASSWORD_PATTERN = re.compile(r"\b([a-zA-Z][a-zA-Z0-9+.-]*://[^/\s:@]+:)([^@\s/]+)(@)")

# Module-level CLI state. This process is single-command/single-process, so a pair of globals is
# sufficient here, but they are intentionally process-global and not thread-safe.
# Possible values: "txt" (default, human-readable), "json", or "jsonl".
_output_format: str = "txt"
_connect_timeout: float = 5.0

_VALID_OUTPUT_STATUS = {"ok", "error"}


[docs] def set_output_format(fmt: str) -> None: """Set the output format for all cli_output functions. Called once by cli.py after --format is parsed. Args: fmt: "txt" (default) for human-readable output to stdout/stderr; "json" for a single machine-readable JSON envelope on stdout. "human" is accepted as a backward-compatible alias for "txt". """ global _output_format normalized = fmt.lower() if fmt else "txt" _output_format = "txt" if normalized == "human" else normalized
[docs] def set_connect_timeout(value: float) -> None: """Set CLI connection timeout (seconds).""" global _connect_timeout try: _connect_timeout = float(value) except (TypeError, ValueError): logger.warning("invalid CLI connection timeout; using default 5.0 seconds") _connect_timeout = 5.0
[docs] def get_connect_timeout() -> float: return _connect_timeout
def _is_json_mode() -> bool: return _output_format == "json" def _is_jsonl_mode() -> bool: return _output_format == "jsonl" def _is_machine_mode() -> bool: return _output_format in {"json", "jsonl"}
[docs] def is_json_mode() -> bool: """Public helper for checking JSON mode without exposing internals.""" return _is_json_mode()
[docs] def is_jsonl_mode() -> bool: """Public helper for checking JSON Lines mode without exposing internals.""" return _is_jsonl_mode()
def _human_stream(): return sys.stderr if _is_machine_mode() else sys.stdout def _normalize_key(key: Any) -> str: return re.sub(r"[^a-z0-9]+", "_", str(key).lower()).strip("_") def _is_sensitive_key(key: Any) -> bool: normalized = _normalize_key(key) if normalized in _SENSITIVE_KEY_NAMES: return True key_parts = set(normalized.split("_")) return bool(key_parts & _SENSITIVE_KEY_PARTS) def _redact_sensitive_text(text: str) -> str: redacted = _PEM_PRIVATE_KEY_PATTERN.sub(_REDACTED, text) redacted = _BEARER_TOKEN_PATTERN.sub(r"\1" + _REDACTED, redacted) redacted = _AUTH_VALUE_PATTERN.sub(r"\1\2" + _REDACTED + r"\4", redacted) redacted = _URL_PASSWORD_PATTERN.sub(r"\1" + _REDACTED + r"\3", redacted) return _SENSITIVE_ASSIGNMENT_PATTERN.sub(r"\1\2\3" + _REDACTED + r"\5", redacted) def _sanitize_for_cli_output(value: Any, key: Any = None) -> Any: if key is not None and _is_sensitive_key(key): return _REDACTED if isinstance(value, dict): return {k: _sanitize_for_cli_output(v, k) for k, v in value.items()} if isinstance(value, list): return [_sanitize_for_cli_output(v) for v in value] if isinstance(value, tuple): return tuple(_sanitize_for_cli_output(v) for v in value) if isinstance(value, str): return _redact_sensitive_text(value) return value def _render_table(data: Any) -> None: safe_table_data = _sanitize_for_cli_output(data) if isinstance(safe_table_data, dict): for table_key, safe_table_value in safe_table_data.items(): print(f"{table_key}: {safe_table_value}") elif isinstance(safe_table_data, list): if not safe_table_data: return if isinstance(safe_table_data[0], dict): keys = list(safe_table_data[0].keys()) widths = [max(len(k), max(len(str(r.get(k, ""))) for r in safe_table_data)) for k in keys] header = " ".join(k.ljust(w) for k, w in zip(keys, widths)) print(header) print("-" * len(header)) for safe_table_row in safe_table_data: print(" ".join(str(safe_table_row.get(k, "")).ljust(w) for k, w in zip(keys, widths))) else: for safe_table_item in safe_table_data: print(safe_table_item) else: print(str(safe_table_data))
[docs] def output(data: Any, fmt: Optional[str]) -> None: """Legacy output helper used by older cert/package command paths.""" safe_output_data = _sanitize_for_cli_output(data) if fmt is None and _is_json_mode(): fmt = "json" if fmt == "json": safe_output_payload = { "schema_version": SCHEMA_VERSION, "status": "ok", "exit_code": 0, "data": safe_output_data, } print(json.dumps(safe_output_payload)) elif fmt == "quiet": if isinstance(safe_output_data, dict): safe_quiet_value = next(iter(safe_output_data.values()), "") print(safe_quiet_value) elif isinstance(safe_output_data, list): safe_quiet_value = safe_output_data[0] if safe_output_data else "" print(safe_quiet_value) else: print(str(safe_output_data)) else: _render_table(safe_output_data)
[docs] def output_ok(data: Any, exit_code: int = 0) -> None: """Print command success output.""" safe_output_data = _sanitize_for_cli_output(data) if _is_jsonl_mode(): output_jsonl_event( { "event": "terminal", "status": "ok", "exit_code": exit_code, "data": safe_output_data, "terminal": True, } ) elif _is_json_mode(): print( json.dumps( { "schema_version": SCHEMA_VERSION, "status": "ok", "exit_code": exit_code, "data": safe_output_data, } ) ) else: _render_table(safe_output_data) if exit_code != 0: sys.exit(exit_code)
[docs] def output_error( error_code: str, exit_code: int = 1, hint: str = None, data: Any = None, detail: str = None, **kwargs, ) -> None: """Print an error from ERROR_REGISTRY and exit. Never returns.""" from nvflare.tool.cli_errors import get_error_entry entry = get_error_entry(error_code) or {"message": error_code, "hint": ""} try: message = entry["message"].format_map(kwargs) if kwargs else entry["message"] except KeyError: logger.warning("Missing format key for error %s", error_code) message = entry["message"] if detail: message = f"{message} \u2014 {detail}" resolved_hint = hint if hint is not None else entry["hint"] safe_error_message = _sanitize_for_cli_output(message) safe_error_hint = _sanitize_for_cli_output(resolved_hint) safe_error_data = _sanitize_for_cli_output(data) if _is_machine_mode(): safe_error_payload = { "schema_version": SCHEMA_VERSION, "status": "error", "exit_code": exit_code, "error_code": error_code, "message": safe_error_message, "hint": safe_error_hint, } if safe_error_data is not None: safe_error_payload["data"] = safe_error_data if _is_jsonl_mode(): safe_error_payload["event"] = "terminal" safe_error_payload["terminal"] = True print(json.dumps(safe_error_payload), flush=True) else: print(json.dumps(safe_error_payload)) else: if safe_error_data is not None: _render_table(safe_error_data) print(safe_error_message, file=sys.stderr) if safe_error_hint: print(f"Hint: {safe_error_hint}", file=sys.stderr) print(f"Code: {error_code} (exit {exit_code})", file=sys.stderr) sys.exit(exit_code)
[docs] def output_jsonl_event(event: Any) -> None: """Print one JSONL event for streaming command output.""" if not isinstance(event, dict): event = {"event": event} safe_event = _sanitize_for_cli_output(event) safe_jsonl_payload = {"schema_version": SCHEMA_VERSION} safe_jsonl_payload.update(safe_event) print(json.dumps(safe_jsonl_payload), flush=True)
[docs] def output_error_message( error_code: str, message: str, hint: str = None, fmt: Optional[str] = None, exit_code: int = 1, detail: str = None, ) -> None: """Print an explicit error message/hint pair and exit. Never returns.""" resolved_hint = hint or "" if detail: message = f"{message} \u2014 {detail}" safe_error_message = _sanitize_for_cli_output(message) safe_error_hint = _sanitize_for_cli_output(resolved_hint) jsonl_mode = fmt == "jsonl" or (fmt is None and _is_jsonl_mode()) if fmt in {"json", "jsonl"} or (fmt is None and _is_machine_mode()): safe_error_payload = { "schema_version": SCHEMA_VERSION, "status": "error", "exit_code": exit_code, "error_code": error_code, "message": safe_error_message, "hint": safe_error_hint, } if jsonl_mode: safe_error_payload["event"] = "terminal" safe_error_payload["terminal"] = True print(json.dumps(safe_error_payload), flush=True) else: print(json.dumps(safe_error_payload)) else: print(safe_error_message, file=sys.stderr) if safe_error_hint: print(f"Hint: {safe_error_hint}", file=sys.stderr) print(f"Code: {error_code} (exit {exit_code})", file=sys.stderr) sys.exit(exit_code)
[docs] def output_usage_error( parser, detail: str, exit_code: int = 4, error_code: str = "INVALID_ARGS", message: str = "Invalid arguments.", hint: str = "Run with -h for usage.", ) -> None: """Print usage/help followed by a structured usage error and exit.""" if not _is_machine_mode() and parser is not None: parser.print_help(sys.stderr) print(file=sys.stderr) output_error_message(error_code, message, hint, None, exit_code=exit_code, detail=detail)
[docs] def prompt_yn(question: str, default_no: bool = True) -> bool: """Write a Y/N prompt to stderr (json mode) or stdout (human mode) and read the answer from stdin. Returns True if the user answered Y/y, False otherwise. Writes the prompt to stderr in json mode so that stdout contains only JSON. Callers must check sys.stdin.isatty() and handle --force before calling. Usage: if not cmd_args.force: if not sys.stdin.isatty(): output_error("INVALID_ARGS", exit_code=4, detail="use --force in non-interactive mode") return if not prompt_yn(f"Delete job '{job_id}'?"): print_human("Cancelled.") return """ suffix = " [y/N] " if default_no else " [Y/n] " stream = _human_stream() safe_question = _sanitize_for_cli_output(question) stream.write(safe_question + suffix) stream.flush() answer = sys.stdin.readline().strip().upper() return answer == "Y"