# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Stream contract for all nvflare CLI command handlers
=====================================================
Current default (human-first):
stdout — human-readable output (tables, summaries, prompts).
stderr — errors and diagnostics.
JSON output mode (--format json):
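stdout — exactly one JSON envelope per command invocation:
{"schema_version": "1", "status": "ok", "exit_code": 0, "data": {...}} on success, or
{"schema_version": "1", "status": "error", "exit_code": N, "error_code": "...", "message": "...", "hint": "..."} on failure
(an error envelope carries "data" only when the command supplies it).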
stderr — all human-readable output: progress, warnings, prompts, diagnostics.
JSON Lines output mode (--format jsonl):
stdout — newline-delimited JSON events for streaming commands.
stderr — all human-readable output: progress, warnings, prompts, diagnostics.
Exceptions (plain text, outside the JSON contract):
--help / -h: argparse usage text; agents use --schema instead.
--version: top-level utility path, not command output.
argparse errors: unrecognised-argument messages generated by argparse.
"""
import json
import logging
import sys
from typing import Any, Optional
from nvflare.tool.cli_contract import SCHEMA_VERSION
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Module-level CLI state. This process is single-command/single-process, so a pair of globals is
# sufficient here, but they are intentionally process-global and not thread-safe.
# Possible values: "txt" (default, human-readable), "json", or "jsonl".
_output_format: str = "txt"
# Connection timeout in seconds; written by set_connect_timeout() and read by get_connect_timeout().
_connect_timeout: float = 5.0
# The two values used for the "status" field of an output envelope.
# NOTE(review): not referenced in the visible part of this module -- confirm where it is consumed.
_VALID_OUTPUT_STATUS = {"ok", "error"}
[docs]
def set_connect_timeout(value: float) -> None:
    """Set the CLI connection timeout, in seconds.

    ``value`` is coerced with ``float()``. A value that cannot serve as a
    timeout -- something non-numeric, an integer too large to fit in a float,
    NaN, or a negative number -- is rejected: a warning is logged and the
    timeout is reset to the default of 5.0 seconds.

    Args:
        value: Requested timeout; a number or a numeric string.
    """
    global _connect_timeout
    import math

    try:
        parsed = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: float() of an int that does not fit in a double.
        parsed = None

    # float() happily parses "nan" and negative numbers; neither is a usable timeout.
    if parsed is None or math.isnan(parsed) or parsed < 0:
        logger.warning("invalid CLI connection timeout %r; using default 5.0 seconds", value)
        parsed = 5.0

    _connect_timeout = parsed
[docs]
def get_connect_timeout() -> float:
    """Return the CLI connection timeout currently stored in module state."""
    current_timeout = _connect_timeout
    return current_timeout
def _is_json_mode() -> bool:
    """Return True when the module-level output format is exactly "json"."""
    selected_format = _output_format
    return selected_format == "json"
def _is_jsonl_mode() -> bool:
    """Return True when the module-level output format is exactly "jsonl"."""
    selected_format = _output_format
    return selected_format == "jsonl"
def _is_machine_mode() -> bool:
    """Return True when output is machine-readable, i.e. the format is "json" or "jsonl"."""
    machine_formats = {"json", "jsonl"}
    return _output_format in machine_formats
[docs]
def is_json_mode() -> bool:
    """Tell callers whether JSON output mode is active.

    Public wrapper around the private check, so that other modules do not
    have to reach into this module's internals.
    """
    json_selected = _is_json_mode()
    return json_selected
[docs]
def is_jsonl_mode() -> bool:
    """Tell callers whether JSON Lines output mode is active.

    Public wrapper around the private check, so that other modules do not
    have to reach into this module's internals.
    """
    jsonl_selected = _is_jsonl_mode()
    return jsonl_selected
def _human_stream():
    """Pick the stream for human-readable text.

    In a machine-readable mode stdout is reserved for JSON, so human text
    goes to stderr; otherwise it goes to stdout.
    """
    if _is_machine_mode():
        return sys.stderr
    return sys.stdout
def _render_table(data: Any) -> None:
    """Print ``data`` to stdout in a plain-text, human-readable layout.

    The layout depends on the shape of ``data``:

    * ``dict`` -- one ``key: value`` line per item.
    * non-empty ``list`` whose items are all dicts -- a column-aligned table
      with a header row and a dashed rule. Columns are the union of the keys
      of all rows, in first-seen order; a row lacking a key shows an empty cell.
    * any other ``list`` -- one item per line (an empty list prints nothing).
    * anything else -- ``str(data)`` on a single line.

    Args:
        data: The value to render.
    """
    if isinstance(data, dict):
        for key, value in data.items():
            print(f"{key}: {value}")
        return

    if not isinstance(data, list):
        print(str(data))
        return

    if not data:
        return

    # Only a homogeneous list of dicts can be laid out as a table; a mixed
    # list is printed item by item rather than failing on the odd row.
    if not all(isinstance(row, dict) for row in data):
        for item in data:
            print(item)
        return

    # Collect columns from every row, not just the first, so that keys which
    # appear only in later rows are not silently dropped.
    columns = list(dict.fromkeys(key for row in data for key in row))
    # Keys are not guaranteed to be strings; render them the same way as values.
    labels = [str(key) for key in columns]
    widths = [
        max(len(label), max(len(str(row.get(key, ""))) for row in data))
        for key, label in zip(columns, labels)
    ]

    header = " ".join(label.ljust(width) for label, width in zip(labels, widths))
    print(header)
    print("-" * len(header))
    for row in data:
        print(" ".join(str(row.get(key, "")).ljust(width) for key, width in zip(columns, widths)))
[docs]
def output(data: Any, fmt: Optional[str]) -> None:
    """Legacy output helper used by older cert/package command paths.

    Args:
        data: The value to print.
        fmt: ``"json"`` prints a success envelope; ``"quiet"`` prints only the
            first value of a dict, the first item of a list, or ``str(data)``;
            any other value renders a human-readable table. ``None`` behaves
            like ``"json"`` when JSON output mode is active.
    """
    effective_fmt = "json" if (fmt is None and _is_json_mode()) else fmt

    if effective_fmt == "json":
        envelope = {"schema_version": SCHEMA_VERSION, "status": "ok", "exit_code": 0, "data": data}
        print(json.dumps(envelope))
        return

    if effective_fmt != "quiet":
        _render_table(data)
        return

    # Quiet mode: reduce the payload to a single representative value.
    if isinstance(data, dict):
        first = next(iter(data.values()), "")
    elif isinstance(data, list):
        first = data[0] if data else ""
    else:
        first = str(data)
    print(first)
[docs]
def output_ok(data: Any, exit_code: int = 0) -> None:
    """Print the success result of a command in the active output format.

    JSON Lines mode emits a terminal event, JSON mode prints one success
    envelope, and human mode renders ``data`` as a table. The process exits
    via ``sys.exit`` only when ``exit_code`` is non-zero.

    Args:
        data: The command result.
        exit_code: Exit status reported in the output.
    """
    if _is_jsonl_mode():
        terminal_event = {
            "event": "terminal",
            "status": "ok",
            "exit_code": exit_code,
            "data": data,
            "terminal": True,
        }
        output_jsonl_event(terminal_event)
    elif _is_json_mode():
        envelope = {
            "schema_version": SCHEMA_VERSION,
            "status": "ok",
            "exit_code": exit_code,
            "data": data,
        }
        print(json.dumps(envelope))
    else:
        _render_table(data)

    if exit_code == 0:
        return
    sys.exit(exit_code)
[docs]
def output_error(
    error_code: str,
    exit_code: int = 1,
    hint: Optional[str] = None,
    data: Any = None,
    detail: Optional[str] = None,
    **kwargs,
) -> None:
    """Print an error from ERROR_REGISTRY and exit. Never returns.

    The message template and default hint are looked up with
    ``get_error_entry``; an unknown ``error_code`` falls back to using the
    code itself as the message with an empty hint.

    In machine-readable modes one JSON error envelope is printed to stdout
    (flushed, and marked as the terminal event, in JSON Lines mode). In human
    mode ``data`` (if given) is rendered to stdout and the message, hint and
    code go to stderr.

    Args:
        error_code: Registry key identifying the error.
        exit_code: Process exit status; also reported in the output.
        hint: Overrides the registry hint when not ``None``.
        data: Optional extra payload attached to the error.
        detail: Optional text appended to the message after an em dash.
        **kwargs: Values substituted into the message template.

    Raises:
        SystemExit: Always, with ``exit_code``.
    """
    from nvflare.tool.cli_errors import get_error_entry

    entry = get_error_entry(error_code) or {"message": error_code, "hint": ""}
    template = entry["message"]
    message = template
    if kwargs:
        # A broken template must never mask the error being reported, so every
        # failure mode of str.format_map falls back to the raw template.
        try:
            message = template.format_map(kwargs)
        except KeyError:
            logger.warning("Missing format key for error %s: %s", error_code, template)
        except (AttributeError, IndexError, TypeError, ValueError) as ex:
            # Positional fields, stray braces, or attribute/index lookups in the template.
            logger.warning("Cannot format message for error %s (%s): %s", error_code, ex, template)
    if detail:
        message = f"{message} \u2014 {detail}"
    resolved_hint = hint if hint is not None else entry["hint"]

    if _is_machine_mode():
        payload = {
            "schema_version": SCHEMA_VERSION,
            "status": "error",
            "exit_code": exit_code,
            "error_code": error_code,
            "message": message,
            "hint": resolved_hint,
        }
        if data is not None:
            payload["data"] = data
        if _is_jsonl_mode():
            payload["event"] = "terminal"
            payload["terminal"] = True
            print(json.dumps(payload), flush=True)
        else:
            print(json.dumps(payload))
    else:
        if data is not None:
            _render_table(data)
        print(message, file=sys.stderr)
        if resolved_hint:
            print(f"Hint: {resolved_hint}", file=sys.stderr)
        print(f"Code: {error_code} (exit {exit_code})", file=sys.stderr)
    sys.exit(exit_code)
[docs]
def output_jsonl_event(event: Any) -> None:
    """Write a single JSON Lines event to stdout and flush it.

    A non-dict ``event`` is wrapped as ``{"event": event}``. The schema
    version is placed first in the record; fields supplied by the caller are
    applied on top of it.

    Args:
        event: The event fields as a dict, or a bare value naming the event.
    """
    fields = event if isinstance(event, dict) else {"event": event}
    record = {"schema_version": SCHEMA_VERSION, **fields}
    print(json.dumps(record), flush=True)
[docs]
def output_error_message(
    error_code: str,
    message: str,
    hint: str = None,
    fmt: Optional[str] = None,
    exit_code: int = 1,
    detail: str = None,
) -> None:
    """Print an explicit error message/hint pair and exit. Never returns.

    Unlike a registry-based error, the message and hint are supplied by the
    caller. An explicit ``fmt`` takes precedence over the module-level output
    mode; ``None`` defers to it.

    Args:
        error_code: Code identifying the error.
        message: The error text.
        hint: Optional hint; a falsy value is reported as an empty string.
        fmt: ``"json"`` or ``"jsonl"`` forces a JSON envelope, ``None`` follows
            the active mode, and any other value gives human-readable output.
        exit_code: Process exit status; also reported in the output.
        detail: Optional text appended to the message after an em dash.

    Raises:
        SystemExit: Always, with ``exit_code``.
    """
    resolved_hint = hint or ""
    if detail:
        message = f"{message} \u2014 {detail}"

    if fmt is None:
        # No explicit format: follow the process-wide output mode.
        as_jsonl = _is_jsonl_mode()
        as_machine = _is_machine_mode()
    else:
        as_jsonl = fmt == "jsonl"
        as_machine = fmt in {"json", "jsonl"}

    if not as_machine:
        print(message, file=sys.stderr)
        if resolved_hint:
            print(f"Hint: {resolved_hint}", file=sys.stderr)
        print(f"Code: {error_code} (exit {exit_code})", file=sys.stderr)
    else:
        payload = {
            "schema_version": SCHEMA_VERSION,
            "status": "error",
            "exit_code": exit_code,
            "error_code": error_code,
            "message": message,
            "hint": resolved_hint,
        }
        if as_jsonl:
            # Streaming consumers need the terminal marker and an immediate flush.
            payload["event"] = "terminal"
            payload["terminal"] = True
        print(json.dumps(payload), flush=as_jsonl)

    sys.exit(exit_code)
[docs]
def output_usage_error(
    parser,
    detail: str,
    exit_code: int = 4,
    error_code: str = "INVALID_ARGS",
    message: str = "Invalid arguments.",
    hint: str = "Run with -h for usage.",
) -> None:
    """Print usage/help followed by a structured usage error and exit.

    The parser's help text is written to stderr, followed by a blank line,
    only in human mode and only when a parser is supplied. The error itself
    is then reported through ``output_error_message``, which exits.

    Args:
        parser: Object providing ``print_help(stream)``, or ``None``.
        detail: Text appended to the error message.
        exit_code: Process exit status.
        error_code: Code identifying the error.
        message: The error text.
        hint: Hint shown alongside the error.
    """
    show_help = parser is not None and not _is_machine_mode()
    if show_help:
        parser.print_help(sys.stderr)
        print(file=sys.stderr)
    output_error_message(error_code, message, hint, fmt=None, exit_code=exit_code, detail=detail)
[docs]
def print_human(*args, **kwargs):
    """Print human-readable text (progress, warnings, tables, diagnostics).

    Works like ``print()``, except that when the caller gives no ``file`` the
    text goes to the human stream, which keeps stdout clean for the JSON
    envelope in JSON output mode.

    Usage: print_human("Starting shutdown of NVFLARE")
    """
    if "file" not in kwargs:
        kwargs["file"] = _human_stream()
    print(*args, **kwargs)
[docs]
def prompt_yn(question: str, default_no: bool = True) -> bool:
    """Ask a yes/no question and return the user's answer as a bool.

    The prompt is written to stderr in machine-readable modes (so stdout
    contains only JSON) and to stdout in human mode; the answer is read from
    stdin.

    An answer of ``y`` or ``yes`` (any case) returns True. An empty answer
    (just Enter) selects the default advertised by the prompt: False for
    ``[y/N]``, True for ``[Y/n]``. Any other answer returns False, as does
    end-of-file on stdin, so a closed input never confirms an action.

    Callers must check sys.stdin.isatty() and handle --force before calling.

    Usage:
        if not cmd_args.force:
            if not sys.stdin.isatty():
                output_error("INVALID_ARGS", exit_code=4, detail="use --force in non-interactive mode")
                return
            if not prompt_yn(f"Delete job '{job_id}'?"):
                print_human("Cancelled.")
                return

    Args:
        question: The question text, without the ``[y/N]`` suffix.
        default_no: When True the default answer is "no", otherwise "yes".

    Returns:
        True if the user confirmed, False otherwise.
    """
    suffix = " [y/N] " if default_no else " [Y/n] "
    stream = _human_stream()
    stream.write(question + suffix)
    stream.flush()

    raw = sys.stdin.readline()
    if not raw:
        # readline() returns "" only at EOF; never treat that as consent.
        return False

    answer = raw.strip().upper()
    if not answer:
        # Bare Enter: honour the default that the prompt suffix advertises.
        return not default_no
    return answer in ("Y", "YES")