# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Stream contract for all nvflare CLI command handlers
=====================================================

Current default (human-first):
    stdout — human-readable output (tables, summaries, prompts).
    stderr — errors and diagnostics.

JSON output mode (--format json):
    stdout — exactly one JSON envelope per command invocation:
             {"schema_version": "1", "status": "ok"|"error", "data": {...}}
    stderr — all human-readable output: progress, warnings, prompts, diagnostics.

JSON Lines output mode (--format jsonl):
    stdout — newline-delimited JSON events for streaming commands.
    stderr — all human-readable output: progress, warnings, prompts, diagnostics.

Exceptions (plain text, outside the JSON contract):
    --help / -h       argparse usage text; agents use --schema instead
    --version         top-level utility path, not command output
    argparse errors   unrecognised-argument messages generated by argparse
"""
import json
import logging
import re
import sys
from typing import Any, Optional
from nvflare.tool.cli_contract import SCHEMA_VERSION
# Module-level logger; used below to warn about bad timeouts and error-message templates.
logger = logging.getLogger(__name__)

# Placeholder written in place of any value or text span judged sensitive.
_REDACTED = "<redacted>"

# A key is treated as sensitive when ANY "_"-separated part of its normalized
# form equals one of these words (see _is_sensitive_key).
_SENSITIVE_KEY_PARTS = {
    "password",
    "passwd",
    "passphrase",
}

# Normalized key names that are sensitive only on an exact, whole-key match
# (see _is_sensitive_key); e.g. "api_key" matches, "my_api_key" does not.
_SENSITIVE_KEY_NAMES = {
    "access_key",
    "access_token",
    "api_key",
    "apikey",
    "auth_token",
    "bearer_token",
    "client_secret",
    "credential",
    "credentials",
    "id_token",
    "private_key",
    "privatekey",
    "refresh_token",
    "secret_key",
    "session_token",
}

# Case-insensitive "<keyword> : value" / "<keyword> = value" inside free text.
# Groups: 1 = keyword, 2 = separator (with surrounding whitespace), 3 = optional
# opening quote, 4 = the value (runs until a quote, whitespace, comma or
# semicolon), 5 = optional closing quote.
_SENSITIVE_ASSIGNMENT_PATTERN = re.compile(
    r"(?i)\b("
    r"password|passwd|passphrase|credential|credentials|"
    r"access[-_ ]?token|refresh[-_ ]?token|id[-_ ]?token|session[-_ ]?token|auth[-_ ]?token|"
    r"api[-_ ]?key|private[-_ ]?key|secret[-_ ]?key|client[-_ ]?secret"
    r")(\s*[:=]\s*)([\"']?)([^\"'\s,;]+)([\"']?)"
)

# "Authorization: Bearer <token>" (case-insensitive).
# Groups: 1 = everything up to and including "bearer ", 2 = the token.
_BEARER_TOKEN_PATTERN = re.compile(r"(?i)\b(authorization\s*:\s*bearer\s+)([A-Za-z0-9._~+/=-]+)")

# "authorization: <value>" / "authorization=<value>" where the value does NOT
# start with "bearer " (negative lookahead; that case is handled by the pattern above).
# Groups: 1 = prefix, 2 = optional opening quote, 3 = value, 4 = optional closing quote.
_AUTH_VALUE_PATTERN = re.compile(r"(?i)\b(authorization\s*[:=]\s*)(?!bearer\s+)([\"']?)([^\"'\s,;]+)([\"']?)")

# A whole PEM private-key block, BEGIN line through END line. DOTALL lets "."
# span newlines; the non-greedy ".*?" stops at the first END marker.
_PEM_PRIVATE_KEY_PATTERN = re.compile(
    r"-----BEGIN [A-Z ]*PRIVATE KEY-----.*?-----END [A-Z ]*PRIVATE KEY-----",
    re.DOTALL,
)

# Password embedded in a URL: "scheme://user:password@".
# Groups: 1 = "scheme://user:", 2 = the password, 3 = "@".
_URL_PASSWORD_PATTERN = re.compile(r"\b([a-zA-Z][a-zA-Z0-9+.-]*://[^/\s:@]+:)([^@\s/]+)(@)")

# Module-level CLI state. This process is single-command/single-process, so a pair of globals is
# sufficient here, but they are intentionally process-global and not thread-safe.
# Possible values: "txt" (default, human-readable), "json", or "jsonl".
_output_format: str = "txt"

# Connection timeout in seconds; written by set_connect_timeout(), read by get_connect_timeout().
_connect_timeout: float = 5.0

# Allowed values for an envelope's "status" field.
# NOTE(review): not referenced in the visible part of this module — presumably
# used by validation code elsewhere; confirm before removing.
_VALID_OUTPUT_STATUS = {"ok", "error"}
[docs]
def set_connect_timeout(value: float) -> None:
    """Store the CLI connection timeout, in seconds, in module state.

    ``value`` is converted with ``float()``. When that conversion raises
    ``TypeError`` or ``ValueError`` a warning is logged and the timeout is
    reset to 5.0 seconds instead.
    """
    global _connect_timeout
    try:
        seconds = float(value)
    except (TypeError, ValueError):
        logger.warning("invalid CLI connection timeout; using default 5.0 seconds")
        seconds = 5.0
    _connect_timeout = seconds
[docs]
def get_connect_timeout() -> float:
    """Return the CLI connection timeout (seconds) currently held in module state."""
    current_timeout = _connect_timeout
    return current_timeout
def _is_json_mode() -> bool:
    """Return True when the process-wide output format is ``"json"``."""
    selected_format = _output_format
    return selected_format == "json"
def _is_jsonl_mode() -> bool:
    """Return True when the process-wide output format is ``"jsonl"``."""
    selected_format = _output_format
    return selected_format == "jsonl"
def _is_machine_mode() -> bool:
    """Return True when output is machine-readable, i.e. ``"json"`` or ``"jsonl"``."""
    machine_formats = ("json", "jsonl")
    return _output_format in machine_formats
[docs]
def is_json_mode() -> bool:
    """Report whether JSON output mode is active.

    Public wrapper so callers do not need to touch the private module state.
    """
    json_selected = _is_json_mode()
    return json_selected
[docs]
def is_jsonl_mode() -> bool:
    """Report whether JSON Lines output mode is active.

    Public wrapper so callers do not need to touch the private module state.
    """
    jsonl_selected = _is_jsonl_mode()
    return jsonl_selected
def _human_stream():
    """Pick the stream for human-readable text.

    In a machine mode (json/jsonl) stdout is reserved for JSON, so human text
    goes to stderr; otherwise it goes to stdout.
    """
    if _is_machine_mode():
        return sys.stderr
    return sys.stdout
def _normalize_key(key: Any) -> str:
    """Return ``key`` as a lowercase, underscore-separated identifier.

    The key is stringified and lowercased, every run of characters outside
    ``a-z0-9`` becomes a single ``_``, and leading/trailing underscores are
    stripped.
    """
    lowered = str(key).lower()
    underscored = re.sub(r"[^a-z0-9]+", "_", lowered)
    return underscored.strip("_")
def _is_sensitive_key(key: Any) -> bool:
    """Decide whether a mapping key names a secret.

    The normalized key is sensitive when it is exactly one of
    ``_SENSITIVE_KEY_NAMES``, or when any of its ``_``-separated parts is in
    ``_SENSITIVE_KEY_PARTS``.
    """
    canonical = _normalize_key(key)
    if canonical in _SENSITIVE_KEY_NAMES:
        return True
    # Any shared word between the key's parts and the sensitive parts is a hit.
    return not _SENSITIVE_KEY_PARTS.isdisjoint(canonical.split("_"))
def _redact_sensitive_text(text: str) -> str:
    """Return ``text`` with secret-looking spans replaced by the redaction marker.

    The substitutions run in a fixed order: PEM private-key blocks, bearer
    tokens, other Authorization values, URL passwords, and finally generic
    ``keyword: value`` / ``keyword=value`` assignments. Surrounding text
    (keywords, separators, quotes) is kept via back-references.
    """
    substitutions = (
        (_PEM_PRIVATE_KEY_PATTERN, _REDACTED),
        (_BEARER_TOKEN_PATTERN, r"\1" + _REDACTED),
        (_AUTH_VALUE_PATTERN, r"\1\2" + _REDACTED + r"\4"),
        (_URL_PASSWORD_PATTERN, r"\1" + _REDACTED + r"\3"),
        (_SENSITIVE_ASSIGNMENT_PATTERN, r"\1\2\3" + _REDACTED + r"\5"),
    )
    scrubbed = text
    for pattern, replacement in substitutions:
        scrubbed = pattern.sub(replacement, scrubbed)
    return scrubbed
def _sanitize_for_cli_output(value: Any, key: Any = None) -> Any:
    """Return a copy of ``value`` that is safe to print.

    Args:
        value: Arbitrary data destined for CLI output.
        key: The mapping key ``value`` was stored under, if any.

    Returns:
        The redaction marker when ``key`` is sensitive (whatever ``value`` is);
        otherwise dicts, lists and tuples are rebuilt with their contents
        sanitized recursively, strings are passed through text redaction, and
        any other value is returned unchanged.
    """
    if key is not None and _is_sensitive_key(key):
        return _REDACTED

    if isinstance(value, str):
        return _redact_sensitive_text(value)

    if isinstance(value, dict):
        cleaned = {}
        for item_key, item_value in value.items():
            # Pass the key down so a sensitive key redacts its whole value.
            cleaned[item_key] = _sanitize_for_cli_output(item_value, item_key)
        return cleaned

    if isinstance(value, (list, tuple)):
        # Sequence elements carry no key of their own.
        rebuild = list if isinstance(value, list) else tuple
        return rebuild(_sanitize_for_cli_output(element) for element in value)

    return value
def _render_table(data: Any) -> None:
    """Print ``data`` to stdout in a simple human-readable layout.

    The data is sanitized first, then rendered by shape:

    * dict: one ``key: value`` line per item.
    * list of dicts: a column-aligned table with a header and a dashed rule.
      Columns are the union of all rows' keys in first-seen order, so a key
      that only appears in a later row is still shown; missing cells are
      rendered as empty strings.
    * any other list (including a mix of dicts and non-dicts): one item per
      line. An empty list prints nothing.
    * anything else: ``str(data)``.
    """
    safe_table_data = _sanitize_for_cli_output(data)

    if isinstance(safe_table_data, dict):
        for table_key, safe_table_value in safe_table_data.items():
            print(f"{table_key}: {safe_table_value}")
        return

    if not isinstance(safe_table_data, list):
        print(str(safe_table_data))
        return

    if not safe_table_data:
        return

    # Only build a table when EVERY row is a dict; a mixed list previously
    # crashed on ``row.get`` for the first non-dict row.
    if not all(isinstance(row, dict) for row in safe_table_data):
        for safe_table_item in safe_table_data:
            print(safe_table_item)
        return

    # dict.fromkeys de-duplicates while preserving first-seen column order.
    keys = list(dict.fromkeys(k for row in safe_table_data for k in row))
    # str(k) keeps non-string keys (e.g. ints) from breaking len()/ljust().
    widths = [max(len(str(k)), max(len(str(r.get(k, ""))) for r in safe_table_data)) for k in keys]
    header = " ".join(str(k).ljust(w) for k, w in zip(keys, widths))
    print(header)
    print("-" * len(header))
    for safe_table_row in safe_table_data:
        print(" ".join(str(safe_table_row.get(k, "")).ljust(w) for k, w in zip(keys, widths)))
[docs]
def output(data: Any, fmt: Optional[str]) -> None:
    """Legacy output helper used by older cert/package command paths.

    Args:
        data: Payload to print; it is sanitized before any rendering.
        fmt: ``"json"`` prints a success envelope, ``"quiet"`` prints only the
            first value (of a dict or list) or ``str(data)``, any other value
            renders a table. ``None`` defers to the process-wide output mode:
            JSON mode prints the envelope, JSON Lines mode emits a terminal
            ``ok`` event, and the default mode renders a table.
    """
    safe_output_data = _sanitize_for_cli_output(data)

    if fmt is None:
        if _is_jsonl_mode():
            # stdout must carry only JSON events in JSONL mode, so emit the
            # same terminal event shape output_ok() uses instead of a table.
            output_jsonl_event(
                {
                    "event": "terminal",
                    "status": "ok",
                    "exit_code": 0,
                    "data": safe_output_data,
                    "terminal": True,
                }
            )
            return
        if _is_json_mode():
            fmt = "json"

    if fmt == "json":
        safe_output_payload = {
            "schema_version": SCHEMA_VERSION,
            "status": "ok",
            "exit_code": 0,
            "data": safe_output_data,
        }
        print(json.dumps(safe_output_payload))
    elif fmt == "quiet":
        if isinstance(safe_output_data, dict):
            # First value only; empty string when the dict is empty.
            safe_quiet_value = next(iter(safe_output_data.values()), "")
            print(safe_quiet_value)
        elif isinstance(safe_output_data, list):
            safe_quiet_value = safe_output_data[0] if safe_output_data else ""
            print(safe_quiet_value)
        else:
            print(str(safe_output_data))
    else:
        _render_table(safe_output_data)
[docs]
def output_ok(data: Any, exit_code: int = 0) -> None:
    """Emit a command's success result in the active output mode.

    JSON Lines mode emits one terminal ``ok`` event, JSON mode prints one
    success envelope, and the default mode renders a table. The data is
    sanitized first. When ``exit_code`` is not 0 the process exits with it
    after printing.
    """
    cleaned = _sanitize_for_cli_output(data)

    if _is_jsonl_mode():
        terminal_event = {
            "event": "terminal",
            "status": "ok",
            "exit_code": exit_code,
            "data": cleaned,
            "terminal": True,
        }
        output_jsonl_event(terminal_event)
    elif _is_json_mode():
        envelope = {
            "schema_version": SCHEMA_VERSION,
            "status": "ok",
            "exit_code": exit_code,
            "data": cleaned,
        }
        serialized = json.dumps(envelope)
        print(serialized)
    else:
        _render_table(cleaned)

    if exit_code == 0:
        return
    sys.exit(exit_code)
[docs]
def output_error(
    error_code: str,
    exit_code: int = 1,
    hint: Optional[str] = None,
    data: Any = None,
    detail: Optional[str] = None,
    **kwargs,
) -> None:
    """Print an error from ERROR_REGISTRY and exit. Never returns.

    Args:
        error_code: Registry key; also used verbatim as the message when the
            registry has no entry for it.
        exit_code: Process exit status passed to ``sys.exit``.
        hint: Overrides the registry hint when not ``None``.
        data: Optional extra payload; added as ``"data"`` in machine modes or
            rendered as a table before the message in human mode.
        detail: Optional text appended to the message after an em dash.
        **kwargs: Values substituted into the registry message template.

    In machine modes (json/jsonl) one JSON error envelope is printed on
    stdout; JSON Lines additionally marks it as the terminal event and flushes.
    In human mode the message, hint and code go to stderr. Message, hint and
    data are sanitized before printing.
    """
    from nvflare.tool.cli_errors import get_error_entry

    entry = get_error_entry(error_code) or {"message": error_code, "hint": ""}
    template = entry.get("message", error_code)
    try:
        message = template.format_map(kwargs) if kwargs else template
    except KeyError:
        logger.warning("Missing format key for error %s", error_code)
        message = template
    except (IndexError, ValueError):
        # Positional fields ("{}", "{0}") or unbalanced braces in the template.
        # Reporting the raw template beats crashing while reporting an error.
        logger.warning("Invalid message template for error %s", error_code)
        message = template
    if detail:
        message = f"{message} \u2014 {detail}"
    # Tolerate registry entries that define no hint at all.
    resolved_hint = hint if hint is not None else entry.get("hint", "")

    safe_error_message = _sanitize_for_cli_output(message)
    safe_error_hint = _sanitize_for_cli_output(resolved_hint)
    safe_error_data = _sanitize_for_cli_output(data)

    if _is_machine_mode():
        safe_error_payload = {
            "schema_version": SCHEMA_VERSION,
            "status": "error",
            "exit_code": exit_code,
            "error_code": error_code,
            "message": safe_error_message,
            "hint": safe_error_hint,
        }
        if safe_error_data is not None:
            safe_error_payload["data"] = safe_error_data
        if _is_jsonl_mode():
            safe_error_payload["event"] = "terminal"
            safe_error_payload["terminal"] = True
            # Flush so a streaming consumer sees the terminal event immediately.
            print(json.dumps(safe_error_payload), flush=True)
        else:
            print(json.dumps(safe_error_payload))
    else:
        if safe_error_data is not None:
            _render_table(safe_error_data)
        print(safe_error_message, file=sys.stderr)
        if safe_error_hint:
            print(f"Hint: {safe_error_hint}", file=sys.stderr)
        print(f"Code: {error_code} (exit {exit_code})", file=sys.stderr)
    sys.exit(exit_code)
[docs]
def output_jsonl_event(event: Any) -> None:
    """Write a single JSON Lines event to stdout and flush.

    A non-dict ``event`` is wrapped as ``{"event": event}``. The event is
    sanitized, then merged on top of a ``schema_version`` field, so keys in
    the event take precedence over that default.
    """
    event_fields = event if isinstance(event, dict) else {"event": event}
    line_payload = {
        "schema_version": SCHEMA_VERSION,
        **_sanitize_for_cli_output(event_fields),
    }
    print(json.dumps(line_payload), flush=True)
[docs]
def output_error_message(
    error_code: str,
    message: str,
    hint: str = None,
    fmt: Optional[str] = None,
    exit_code: int = 1,
    detail: str = None,
) -> None:
    """Report a caller-supplied error message and hint, then exit. Never returns.

    Args:
        error_code: Code included in the output.
        message: Error text; ``detail`` is appended after an em dash if given.
        hint: Optional hint; a falsy hint becomes an empty string.
        fmt: ``"json"`` or ``"jsonl"`` forces that machine format; ``None``
            defers to the process-wide output mode; any other value selects
            human output.
        exit_code: Process exit status passed to ``sys.exit``.
        detail: Optional extra text appended to ``message``.

    Machine output is one JSON envelope on stdout (flushed and marked terminal
    for JSON Lines); human output goes to stderr. Message and hint are
    sanitized before printing.
    """
    full_message = f"{message} \u2014 {detail}" if detail else message
    clean_message = _sanitize_for_cli_output(full_message)
    clean_hint = _sanitize_for_cli_output(hint or "")

    # An explicit fmt wins; only fall back to the global mode when fmt is None.
    if fmt is None:
        as_jsonl = _is_jsonl_mode()
        as_machine = _is_machine_mode()
    else:
        as_jsonl = fmt == "jsonl"
        as_machine = fmt in {"json", "jsonl"}

    if as_machine:
        envelope = {
            "schema_version": SCHEMA_VERSION,
            "status": "error",
            "exit_code": exit_code,
            "error_code": error_code,
            "message": clean_message,
            "hint": clean_hint,
        }
        if as_jsonl:
            envelope["event"] = "terminal"
            envelope["terminal"] = True
            print(json.dumps(envelope), flush=True)
        else:
            print(json.dumps(envelope))
    else:
        print(clean_message, file=sys.stderr)
        if clean_hint:
            print(f"Hint: {clean_hint}", file=sys.stderr)
        print(f"Code: {error_code} (exit {exit_code})", file=sys.stderr)
    sys.exit(exit_code)
[docs]
def output_usage_error(
    parser,
    detail: str,
    exit_code: int = 4,
    error_code: str = "INVALID_ARGS",
    message: str = "Invalid arguments.",
    hint: str = "Run with -h for usage.",
) -> None:
    """Show the parser's help (human mode only), then report a usage error and exit.

    In human mode with a parser available, the help text and a blank line are
    written to stderr first. The error itself is always delegated to
    ``output_error_message`` using the process-wide output mode.
    """
    show_help = parser is not None and not _is_machine_mode()
    if show_help:
        parser.print_help(sys.stderr)
        print(file=sys.stderr)
    output_error_message(
        error_code,
        message,
        hint,
        None,
        exit_code=exit_code,
        detail=detail,
    )
[docs]
def print_human(*args, **kwargs):
    """Print human-readable text (progress, warnings, tables, diagnostics).

    Works like ``print()`` for CLI command handlers, with two differences:
    unless the caller passes ``file=``, output goes to stderr in machine modes
    (keeping stdout clean for JSON) and to stdout otherwise; and every
    positional argument is sanitized before printing.

    Usage: print_human("Starting shutdown of NVFLARE")
    """
    if "file" not in kwargs:
        kwargs["file"] = _human_stream()
    scrubbed_args = [_sanitize_for_cli_output(piece) for piece in args]
    print(*scrubbed_args, **kwargs)
[docs]
def prompt_yn(question: str, default_no: bool = True) -> bool:
    """Write a Y/N prompt to stderr (json mode) or stdout (human mode) and read the answer from stdin.

    Args:
        question: Prompt text; it is sanitized before being written.
        default_no: Selects the default shown in the suffix and applied when
            the user just presses Enter: ``[y/N]`` (default no) when True,
            ``[Y/n]`` (default yes) when False.

    Returns:
        True if the user answered Y/y, or pressed Enter while the default is
        yes. False for any other answer, and always False on end-of-input
        (closed stdin), so a missing answer never confirms an action.

    Writes the prompt to stderr in json mode so that stdout contains only JSON.
    Callers must check sys.stdin.isatty() and handle --force before calling.

    Usage:
        if not cmd_args.force:
            if not sys.stdin.isatty():
                output_error("INVALID_ARGS", exit_code=4, detail="use --force in non-interactive mode")
                return
            if not prompt_yn(f"Delete job '{job_id}'?"):
                print_human("Cancelled.")
                return
    """
    suffix = " [y/N] " if default_no else " [Y/n] "
    stream = _human_stream()
    safe_question = _sanitize_for_cli_output(question)
    stream.write(safe_question + suffix)
    stream.flush()

    raw_answer = sys.stdin.readline()
    if not raw_answer:
        # readline() returns "" only at EOF; treat a closed stdin as "no".
        return False
    answer = raw_answer.strip().upper()
    if not answer:
        # Bare Enter selects the advertised default instead of always "no".
        return not default_no
    return answer == "Y"