# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Stream contract for all nvflare CLI command handlers
=====================================================
Current default (human-first):
stdout — human-readable output (tables, summaries, prompts).
stderr — errors and diagnostics.
JSON output mode (--format json):
stdout — exactly one JSON envelope per command invocation:
{"schema_version": "1", "status": "ok"|"error", "data": {...}}
stderr — all human-readable output: progress, warnings, prompts, diagnostics.
JSON Lines output mode (--format jsonl):
stdout — newline-delimited JSON events for streaming commands.
stderr — all human-readable output: progress, warnings, prompts, diagnostics.
Exceptions (plain text, outside the JSON contract):
--help / -h argparse usage text; agents use --schema instead
--version top-level utility path, not command output
argparse errors pre-dispatch errors are emitted by nvflare.cli; in machine
modes they use a single JSON envelope without JSONL
terminal markers
"""
import json
import logging
import re
import sys
from typing import Any, Optional
from nvflare.tool.cli_contract import SCHEMA_VERSION
logger = logging.getLogger(__name__)
_REDACTED = "<redacted>"
_SENSITIVE_KEY_PARTS = {
"password",
"passwd",
"passphrase",
}
_SENSITIVE_KEY_NAMES = {
"access_key",
"access_token",
"api_key",
"apikey",
"auth_token",
"bearer_token",
"client_secret",
"credential",
"credentials",
"id_token",
"private_key",
"privatekey",
"refresh_token",
"secret_key",
"session_token",
}
_SENSITIVE_ASSIGNMENT_PATTERN = re.compile(
r"(?i)\b("
r"password|passwd|passphrase|credential|credentials|"
r"access[-_ ]?token|refresh[-_ ]?token|id[-_ ]?token|session[-_ ]?token|auth[-_ ]?token|"
r"api[-_ ]?key|private[-_ ]?key|secret[-_ ]?key|client[-_ ]?secret"
r")(\s*[:=]\s*)([\"']?)([^\"'\s,;]+)([\"']?)"
)
_SENSITIVE_CLI_OPTION_PATTERN = re.compile(
r"(?i)(--(?:"
r"password|passwd|passphrase|credential|credentials|"
r"access[-_]?token|refresh[-_]?token|id[-_]?token|session[-_]?token|auth[-_]?token|"
r"api[-_]?key|private[-_]?key|secret[-_]?key|client[-_]?secret"
r")(?:\s+|=))([\"']?)([^\"'\s,;]+)([\"']?)"
)
_BEARER_TOKEN_PATTERN = re.compile(r"(?i)\b(authorization\s*:\s*bearer\s+)([A-Za-z0-9._~+/=-]+)")
_AUTH_VALUE_PATTERN = re.compile(r"(?i)\b(authorization\s*[:=]\s*)(?!bearer\s+)([\"']?)([^\"'\s,;]+)([\"']?)")
_PEM_PRIVATE_KEY_PATTERN = re.compile(
r"-----BEGIN [A-Z ]*PRIVATE KEY-----.*?-----END [A-Z ]*PRIVATE KEY-----",
re.DOTALL,
)
_URL_PASSWORD_PATTERN = re.compile(r"\b([a-zA-Z][a-zA-Z0-9+.-]*://[^/\s:@]+:)([^@\s/]+)(@)")
# Module-level CLI state. This process is single-command/single-process, so a pair of globals is
# sufficient here, but they are intentionally process-global and not thread-safe.
# Possible values: "txt" (default, human-readable), "json", or "jsonl".
_output_format: str = "txt"
_connect_timeout: float = 5.0
_VALID_OUTPUT_STATUS = {"ok", "error"}
[docs]
def set_connect_timeout(value: float) -> None:
"""Set CLI connection timeout (seconds)."""
global _connect_timeout
try:
_connect_timeout = float(value)
except (TypeError, ValueError):
logger.warning("invalid CLI connection timeout; using default 5.0 seconds")
_connect_timeout = 5.0
[docs]
def get_connect_timeout() -> float:
return _connect_timeout
def _is_json_mode() -> bool:
return _output_format == "json"
def _is_jsonl_mode() -> bool:
return _output_format == "jsonl"
def _is_machine_mode() -> bool:
return _output_format in {"json", "jsonl"}
[docs]
def is_json_mode() -> bool:
"""Public helper for checking JSON mode without exposing internals."""
return _is_json_mode()
[docs]
def is_jsonl_mode() -> bool:
"""Public helper for checking JSON Lines mode without exposing internals."""
return _is_jsonl_mode()
def _human_stream():
return sys.stderr if _is_machine_mode() else sys.stdout
def _normalize_key(key: Any) -> str:
return re.sub(r"[^a-z0-9]+", "_", str(key).lower()).strip("_")
def _is_sensitive_key(key: Any) -> bool:
normalized = _normalize_key(key)
if normalized in _SENSITIVE_KEY_NAMES:
return True
key_parts = set(normalized.split("_"))
return bool(key_parts & _SENSITIVE_KEY_PARTS)
def _redact_sensitive_text(text: str) -> str:
redacted = _PEM_PRIVATE_KEY_PATTERN.sub(_REDACTED, text)
redacted = _BEARER_TOKEN_PATTERN.sub(r"\1" + _REDACTED, redacted)
redacted = _AUTH_VALUE_PATTERN.sub(r"\1\2" + _REDACTED + r"\4", redacted)
redacted = _URL_PASSWORD_PATTERN.sub(r"\1" + _REDACTED + r"\3", redacted)
redacted = _SENSITIVE_CLI_OPTION_PATTERN.sub(r"\1\2" + _REDACTED + r"\4", redacted)
return _SENSITIVE_ASSIGNMENT_PATTERN.sub(r"\1\2\3" + _REDACTED + r"\5", redacted)
def _sanitize_for_cli_output(value: Any, key: Any = None) -> Any:
if key is not None and _is_sensitive_key(key):
return _REDACTED
if isinstance(value, dict):
return {k: _sanitize_for_cli_output(v, k) for k, v in value.items()}
if isinstance(value, list):
return [_sanitize_for_cli_output(v) for v in value]
if isinstance(value, tuple):
return tuple(_sanitize_for_cli_output(v) for v in value)
if isinstance(value, str):
return _redact_sensitive_text(value)
return value
[docs]
def sanitize_cli_output(value: Any) -> Any:
"""Return a value with secret-like fields and inline tokens redacted for CLI output."""
return _sanitize_for_cli_output(value)
def _emit_json(value: Any, *, flush: bool = False) -> None:
safe_value = _sanitize_for_cli_output(value)
safe_text = json.dumps(safe_value)
# Safe: safe_text is serialized only after recursive CLI redaction above.
# codeql[py/clear-text-logging-sensitive-data]
# lgtm[py/clear-text-logging-sensitive-data]
sys.stdout.write(safe_text)
sys.stdout.write("\n")
if flush:
sys.stdout.flush()
def _render_table(data: Any) -> None:
safe_table_data = _sanitize_for_cli_output(data)
if isinstance(safe_table_data, dict):
for table_key, safe_table_value in safe_table_data.items():
print(f"{table_key}: {safe_table_value}")
elif isinstance(safe_table_data, list):
if not safe_table_data:
return
if isinstance(safe_table_data[0], dict):
keys = list(safe_table_data[0].keys())
widths = [max(len(k), max(len(str(r.get(k, ""))) for r in safe_table_data)) for k in keys]
header = " ".join(k.ljust(w) for k, w in zip(keys, widths))
print(header)
print("-" * len(header))
for safe_table_row in safe_table_data:
print(" ".join(str(safe_table_row.get(k, "")).ljust(w) for k, w in zip(keys, widths)))
else:
for safe_table_item in safe_table_data:
print(safe_table_item)
else:
print(str(safe_table_data))
[docs]
def output(data: Any, fmt: Optional[str]) -> None:
"""Legacy output helper used by older cert/package command paths."""
safe_output_data = _sanitize_for_cli_output(data)
if fmt is None and _is_json_mode():
fmt = "json"
if fmt == "json":
safe_output_payload = {
"schema_version": SCHEMA_VERSION,
"status": "ok",
"exit_code": 0,
"data": safe_output_data,
}
_emit_json(safe_output_payload)
elif fmt == "quiet":
if isinstance(safe_output_data, dict):
safe_quiet_value = next(iter(safe_output_data.values()), "")
print(safe_quiet_value)
elif isinstance(safe_output_data, list):
safe_quiet_value = safe_output_data[0] if safe_output_data else ""
print(safe_quiet_value)
else:
print(str(safe_output_data))
else:
_render_table(safe_output_data)
def _add_agent_envelope_fields(
payload: dict,
code: str = None,
message: str = None,
hint: str = None,
recovery_category: str = None,
suggested_skill: str = None,
) -> dict:
if code is not None:
payload["code"] = _sanitize_for_cli_output(code)
if message is not None:
payload["message"] = _sanitize_for_cli_output(message)
if hint is not None:
payload["hint"] = _sanitize_for_cli_output(hint)
if recovery_category is not None:
payload["recovery_category"] = _sanitize_for_cli_output(recovery_category)
if suggested_skill is not None:
payload["suggested_skill"] = _sanitize_for_cli_output(suggested_skill)
return payload
[docs]
def output_ok(
data: Any,
exit_code: int = 0,
code: str = None,
message: str = None,
hint: str = None,
recovery_category: str = None,
suggested_skill: str = None,
) -> None:
"""Print command success output.
code/message/hint/recovery fields are emitted in JSON/JSONL modes only;
human mode renders command data.
"""
safe_output_data = _sanitize_for_cli_output(data)
if _is_jsonl_mode():
payload = _add_agent_envelope_fields(
{"event": "terminal", "status": "ok", "exit_code": exit_code, "data": safe_output_data, "terminal": True},
code=code,
message=message,
hint=hint,
recovery_category=recovery_category,
suggested_skill=suggested_skill,
)
output_jsonl_event(payload)
elif _is_json_mode():
payload = _add_agent_envelope_fields(
{"schema_version": SCHEMA_VERSION, "status": "ok", "exit_code": exit_code, "data": safe_output_data},
code=code,
message=message,
hint=hint,
recovery_category=recovery_category,
suggested_skill=suggested_skill,
)
_emit_json(payload)
else:
_render_table(safe_output_data)
if exit_code != 0:
sys.exit(exit_code)
[docs]
def output_error(
error_code: str,
exit_code: int = 1,
hint: str = None,
data: Any = None,
detail: str = None,
recovery_category: str = None,
suggested_skill: str = None,
**kwargs,
) -> None:
"""Print an error from ERROR_REGISTRY and exit. Never returns."""
from nvflare.tool.cli_errors import get_error_entry
entry = get_error_entry(error_code) or {"message": error_code, "hint": ""}
try:
message = entry["message"].format_map(kwargs) if kwargs else entry["message"]
except KeyError:
logger.warning("Missing format key for error %s", error_code)
message = entry["message"]
if detail:
message = f"{message} \u2014 {detail}"
resolved_hint = hint if hint is not None else entry["hint"]
safe_error_message = _sanitize_for_cli_output(message)
safe_error_hint = _sanitize_for_cli_output(resolved_hint)
safe_error_data = _sanitize_for_cli_output(data)
if _is_machine_mode():
payload = {
"schema_version": SCHEMA_VERSION,
"status": "error",
"exit_code": exit_code,
"error_code": error_code,
"message": safe_error_message,
"hint": safe_error_hint,
}
_add_agent_envelope_fields(
payload,
recovery_category=recovery_category,
suggested_skill=suggested_skill,
)
if safe_error_data is not None:
payload["data"] = safe_error_data
if _is_jsonl_mode():
payload["event"] = "terminal"
payload["terminal"] = True
_emit_json(payload, flush=True)
else:
_emit_json(payload)
else:
if safe_error_data is not None:
_render_table(safe_error_data)
print(safe_error_message, file=sys.stderr)
if safe_error_hint:
print(f"Hint: {safe_error_hint}", file=sys.stderr)
print(f"Code: {error_code} (exit {exit_code})", file=sys.stderr)
sys.exit(exit_code)
[docs]
def output_jsonl_event(event: Any) -> None:
"""Print one JSONL event for streaming command output."""
if not isinstance(event, dict):
event = {"event": event}
safe_event = _sanitize_for_cli_output(event)
safe_jsonl_payload = {"schema_version": SCHEMA_VERSION}
safe_jsonl_payload.update(safe_event)
_emit_json(safe_jsonl_payload, flush=True)
[docs]
def output_error_message(
error_code: str,
message: str,
hint: str = None,
fmt: Optional[str] = None,
exit_code: int = 1,
detail: str = None,
data: Any = None,
include_data: bool = False,
recovery_category: str = None,
suggested_skill: str = None,
) -> None:
"""Print an explicit error message/hint pair and exit. Never returns.
In machine modes, data is omitted when data is None unless include_data=True.
This lets callers distinguish an absent data field from an explicit JSON null.
"""
resolved_hint = hint or ""
if detail:
message = f"{message} \u2014 {detail}"
safe_error_message = _sanitize_for_cli_output(message)
safe_error_hint = _sanitize_for_cli_output(resolved_hint)
safe_error_data = _sanitize_for_cli_output(data)
jsonl_mode = fmt == "jsonl" or (fmt is None and _is_jsonl_mode())
if fmt in {"json", "jsonl"} or (fmt is None and _is_machine_mode()):
payload = {
"schema_version": SCHEMA_VERSION,
"status": "error",
"exit_code": exit_code,
"error_code": error_code,
"message": safe_error_message,
"hint": safe_error_hint,
}
_add_agent_envelope_fields(
payload,
recovery_category=recovery_category,
suggested_skill=suggested_skill,
)
if include_data or data is not None:
payload["data"] = safe_error_data
if jsonl_mode:
payload["event"] = "terminal"
payload["terminal"] = True
_emit_json(payload, flush=True)
else:
_emit_json(payload)
else:
print(safe_error_message, file=sys.stderr)
if safe_error_hint:
print(f"Hint: {safe_error_hint}", file=sys.stderr)
print(f"Code: {error_code} (exit {exit_code})", file=sys.stderr)
sys.exit(exit_code)
[docs]
def output_usage_error(
parser,
detail: str,
exit_code: int = 4,
error_code: str = "INVALID_ARGS",
message: str = "Invalid arguments.",
hint: str = "Run with -h for usage.",
) -> None:
"""Print usage/help followed by a structured usage error and exit."""
if not _is_machine_mode() and parser is not None:
parser.print_help(sys.stderr)
print(file=sys.stderr)
output_error_message(error_code, message, hint, None, exit_code=exit_code, detail=detail)
[docs]
def print_human(*args, **kwargs):
"""Print any human-readable text (progress, warnings, tables, diagnostics).
Drop-in replacement for print() in CLI command handlers.
Keeps stdout clean for the JSON envelope in JSON output mode.
Usage: print_human("Starting shutdown of NVFLARE")
"""
kwargs.setdefault("file", _human_stream())
safe_args = tuple(_sanitize_for_cli_output(arg) for arg in args)
print(*safe_args, **kwargs)
[docs]
def prompt_yn(question: str, default_no: bool = True) -> bool:
"""Write a Y/N prompt to stderr (json mode) or stdout (human mode) and read the answer from stdin.
Returns True if the user answered Y/y, False otherwise.
Writes the prompt to stderr in json mode so that stdout contains only JSON.
Callers must check sys.stdin.isatty() and handle --force before calling.
Usage:
if not cmd_args.force:
if not sys.stdin.isatty():
output_error("INVALID_ARGS", exit_code=4, detail="use --force in non-interactive mode")
return
if not prompt_yn(f"Delete job '{job_id}'?"):
print_human("Cancelled.")
return
"""
suffix = " [y/N] " if default_no else " [Y/n] "
stream = _human_stream()
safe_question = _sanitize_for_cli_output(question)
stream.write(safe_question + suffix)
stream.flush()
answer = sys.stdin.readline().strip().upper()
return answer == "Y"