# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation for ``nvflare deploy`` subcommands."""
from __future__ import annotations
import base64
import hashlib
import json
import os
import re
import shlex
import shutil
import stat
import subprocess
import tempfile
from dataclasses import dataclass
from pathlib import Path, PurePosixPath
from typing import Any
import yaml
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.x509.oid import NameOID
from nvflare.tool.cli_output import output_error_message, output_ok, print_human
# Runtimes accepted in the ``runtime`` key of the deploy config.
RUNTIME_DOCKER = "docker"
RUNTIME_K8S = "k8s"

# Startup-kit roles; a kit's role is detected from which fed_*.json exists in startup/.
ROLE_SERVER = "server"
ROLE_CLIENT = "client"
ROLE_ADMIN = "admin"

# File and directory names read from, or generated inside, a kit.
FED_SERVER_JSON = "fed_server.json"
FED_CLIENT_JSON = "fed_client.json"
FED_ADMIN_JSON = "fed_admin.json"
RESOURCES_JSON_DEFAULT = "resources.json.default"
RESOURCES_JSON = "resources.json"
COMM_CONFIG_JSON = "comm_config.json"
STUDY_DATA_YAML = "study_data.yaml"
HELM_CHART_DIR = "helm_chart"
START_SH = "start.sh"
SUB_START_SH = "sub_start.sh"
STOP_FL_SH = "stop_fl.sh"
LEGACY_DOCKER_SH = "docker.sh"
DOCKER_START_SH = "start_docker.sh"

# Default workspace location used for the Docker runtime and as the K8s
# ``parent.workspace_mount_path`` default.
WORKSPACE_MOUNT_PATH = "/var/tmp/nvflare/workspace"
WORKSPACE_VOLUME_NAME = "workspace"
# Default ``default_python_path`` for the generated K8s launcher component.
K8S_PARENT_PYTHON_PATH = "/usr/local/bin/python3"
# Helm templates directory, resolved relative to this module's file.
HELM_TEMPLATES_DIR = Path(__file__).resolve().parent / "templates" / "helm"

# Dotted class paths of components written to, or recognized in, resources.json.default.
PASSTHROUGH_RESOURCE_MANAGER = (
    "nvflare.app_common.resource_managers.passthrough_resource_manager.PassthroughResourceManager"
)
GPU_RESOURCE_MANAGER = "nvflare.app_common.resource_managers.gpu_resource_manager.GPUResourceManager"
DOCKER_CLIENT_LAUNCHER = "nvflare.app_opt.job_launcher.docker_launcher.ClientDockerJobLauncher"
DOCKER_SERVER_LAUNCHER = "nvflare.app_opt.job_launcher.docker_launcher.ServerDockerJobLauncher"
K8S_CLIENT_LAUNCHER = "nvflare.app_opt.job_launcher.k8s_launcher.ClientK8sJobLauncher"
K8S_SERVER_LAUNCHER = "nvflare.app_opt.job_launcher.k8s_launcher.ServerK8sJobLauncher"
PROCESS_CLIENT_LAUNCHER = "nvflare.app_common.job_launcher.client_process_launcher.ClientProcessJobLauncher"
PROCESS_SERVER_LAUNCHER = "nvflare.app_common.job_launcher.server_process_launcher.ServerProcessJobLauncher"
GPU_RESOURCE_CONSUMER = "nvflare.app_common.resource_consumers.gpu_resource_consumer.GPUResourceConsumer"

# Groupings used when deciding which existing components are removed or replaced,
# and whether replacing them discards user configuration worth a warning.
K8S_LAUNCHER_PATHS = {K8S_CLIENT_LAUNCHER, K8S_SERVER_LAUNCHER}
LAUNCHER_IDS = {"process_launcher", "docker_launcher", "k8s_launcher"}
BUILTIN_LAUNCHER_PATHS = {
    DOCKER_CLIENT_LAUNCHER,
    DOCKER_SERVER_LAUNCHER,
    K8S_CLIENT_LAUNCHER,
    K8S_SERVER_LAUNCHER,
    PROCESS_CLIENT_LAUNCHER,
    PROCESS_SERVER_LAUNCHER,
}
BUILTIN_RESOURCE_MANAGER_PATHS = {GPU_RESOURCE_MANAGER, PASSTHROUGH_RESOURCE_MANAGER}
BUILTIN_RESOURCE_CONSUMER_PATHS = {GPU_RESOURCE_CONSUMER}
RESOURCE_CONSUMER_IDS = {"resource_consumer"}

# Keys rejected in ``job_launcher.default_job_container_kwargs`` of a docker config.
DOCKER_RESERVED_KWARGS = {
    "volumes",
    "mounts",
    "network",
    "environment",
    "command",
    "name",
    "detach",
    "user",
    "working_dir",
}

# Content written to local/study_data.yaml when the kit does not already have one.
STUDY_DATA_TEMPLATE = """# Study data mapping used by Docker and Kubernetes job launchers.
# Example:
# default:
# training:
# source: /data/training # Docker: host path; K8s: PVC claim name
# mode: ro # ro or rw
{}
"""

# Kubernetes naming patterns and length limits. K8S_NAME_PATTERN additionally
# requires a leading letter; the namespace/secret patterns allow a leading digit.
K8S_NAME_PATTERN = re.compile(r"^[a-z]([-a-z0-9]*[a-z0-9])?$")
K8S_NAMESPACE_PATTERN = re.compile(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$")
K8S_SECRET_NAME_PATTERN = re.compile(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$")
K8S_NAMESPACE_MAX_LENGTH = 63
K8S_SERVICE_NAME_MAX_LENGTH = 63
K8S_SECRET_NAME_MAX_LENGTH = 253
HELM_RELEASE_NAME_MAX_LENGTH = 53
DEFAULT_K8S_SERVER_SERVICE_NAME = "nvflare-server"

# Settings for ``nvflare deploy k8 stage``.
K8_STAGE_VALUES_KEY = "workspaceConfig"
K8_STAGE_LOCAL_KEY = "local"
K8_STAGE_STARTUP_KEY = "startup"
K8_STAGE_OBJECT_NAME_MAX_LENGTH = 253
# 900 KiB; NOTE(review): presumably the size above which staging warns - confirm in _warn_if_large_k8_object.
K8_STAGE_OBJECT_SIZE_WARN_BYTES = 900 * 1024
# Environment variable consulted for the Kubernetes CLI name when --kubectl is not given.
KUBECTL_ENV_VAR = "KUBECTL"
K8_STAGE_ALLOWED_KUBECTL_NAMES = {"kubectl", "oc"}
[docs]
@dataclass
class KitInfo:
    """Facts about a startup kit, as collected by ``_validate_kit``."""

    # Root directory of the kit (the one containing ``startup/`` and ``local/``).
    kit_dir: Path
    # One of ROLE_SERVER, ROLE_CLIENT or ROLE_ADMIN.
    role: str
    # Site/participant name taken from the role config, falling back to the kit directory name.
    name: str
    # Organization read from the site certificate (for admin kits the name is reused).
    org: str
    # Server-only ports; both are None for non-server kits.
    fed_learn_port: int | None
    admin_port: int | None
[docs]
def prepare_deployment(args) -> None:
    """Build a runtime-specific ("prepared") copy of a server or client startup kit.

    The kit is copied into a temporary directory, patched for the runtime named in
    the config (docker or k8s), and only then moved to the final output location,
    replacing any existing directory there. A summary is emitted via ``output_ok``.

    Args:
        args: Parsed CLI arguments; the kit, output and config options are read by
            ``_resolve_prepare_inputs``.

    Errors are reported through ``_fail``. NOTE(review): the code relies on
    ``_fail`` not returning (it presumably raises SystemExit, which is re-raised
    below) - confirm against its definition.
    """
    kit, output_arg, config_path = _resolve_prepare_inputs(args)
    config = _load_config(config_path)
    runtime = config["runtime"]
    _validate_runtime_config(runtime, config)
    output = _resolve_output_path(kit, output_arg, runtime)
    kit_info = _validate_kit(kit)
    if kit_info.role == ROLE_ADMIN:
        _fail(
            "UNSUPPORTED_KIT",
            "Admin startup kits are not supported by 'nvflare deploy prepare'.",
            "Use a server or client startup kit.",
        )
    # Reject an output that is the kit itself or a directory containing the kit:
    # the final step removes ``output`` before moving the prepared copy in.
    if output == kit or _is_path_relative_to(kit, output):
        _fail(
            "INVALID_ARGS",
            "--output must not be the same as --kit or contain it.",
            "Choose a prepared-kit directory outside the kit, or use the default <kit>/prepared/<runtime>.",
        )
    if output.exists() and not output.is_dir():
        _fail("INVALID_ARGS", f"Output path exists and is not a directory: {output}", "Choose a directory path.")
    parent_dir = output.parent
    try:
        parent_dir.mkdir(parents=True, exist_ok=True)
    except Exception as ex:
        _fail("INVALID_ARGS", f"Cannot create output parent directory: {parent_dir}", f"Check the path: {ex}")
    # When the output lives inside the kit, stage in the default temp location
    # (dir=None) so the staging directory is not itself inside the tree being copied;
    # otherwise stage next to the final output.
    temp_parent = None if _is_path_relative_to(output, kit) else str(parent_dir)
    temp_dir = Path(tempfile.mkdtemp(prefix=f".{output.name}.prepare-", dir=temp_parent))
    prepared_dir = temp_dir / output.name
    try:
        shutil.copytree(kit, prepared_dir, ignore=_ignore_output_path(kit, output))
        # Same facts as the source kit, but pointing at the staged copy so all
        # patching happens there and the original kit is left untouched.
        prepared_info = KitInfo(
            kit_dir=prepared_dir,
            role=kit_info.role,
            name=kit_info.name,
            org=kit_info.org,
            fed_learn_port=kit_info.fed_learn_port,
            admin_port=kit_info.admin_port,
        )
        if runtime == RUNTIME_DOCKER:
            result = _prepare_docker(prepared_info, output, config)
        elif runtime == RUNTIME_K8S:
            result = _prepare_k8s(prepared_info, output, config)
        else:
            _fail("INVALID_CONFIG", f"Unsupported runtime: {runtime}", "Use runtime: docker or runtime: k8s.")
        # Replace any previous output only after preparation succeeded.
        if output.exists():
            shutil.rmtree(output)
        shutil.move(str(prepared_dir), str(output))
    except SystemExit:
        # Let exits raised inside the block (e.g. from _fail) pass through unchanged.
        raise
    except Exception as ex:
        _fail("DEPLOY_PREPARE_FAILED", f"Failed to prepare deployment: {ex}", "Check the kit and runtime config.")
    finally:
        # Best-effort cleanup of the staging directory on success and on failure.
        if temp_dir.exists():
            shutil.rmtree(temp_dir, ignore_errors=True)
    output_ok(result)
[docs]
def stage_k8_deployment(args) -> None:
    """Stage a prepared K8s kit's ``local/`` and ``startup/`` folders into the cluster.

    The two folders are bundled, applied as a ConfigMap manifest and a Secret
    manifest through the resolved Kubernetes CLI, the kit's Helm values are
    patched to reference them, and a summary (including the follow-up helm
    command) is emitted via ``output_ok``.

    Args:
        args: Parsed CLI arguments, interpreted by ``_resolve_k8_stage_inputs``
            and ``_resolve_kubectl``.
    """
    kit, namespace, local_configmap, startup_secret = _resolve_k8_stage_inputs(args)
    _validate_k8_stage_inputs(kit, namespace, local_configmap, startup_secret)
    kubectl = _resolve_kubectl(args)

    # Build both bundles before touching the cluster.
    local = _build_volume_bundle(kit / "local")
    startup = _build_volume_bundle(kit / "startup")
    _warn_if_large_k8_object("ConfigMap", local_configmap, local["encoded_size"])
    _warn_if_large_k8_object("Secret", startup_secret, startup["encoded_size"])

    configmap_manifest = _configmap_manifest(local_configmap, namespace, local["data"])
    _kubectl_apply(configmap_manifest, kubectl)
    secret_manifest = _secret_manifest(startup_secret, namespace, startup["data"])
    _kubectl_apply(secret_manifest, kubectl)

    _patch_k8_stage_values(
        kit=kit,
        local_configmap=local_configmap,
        local_items=local["items"],
        startup_secret=startup_secret,
        startup_items=startup["items"],
    )

    summary = {
        "namespace": namespace,
        "prepared_kit": str(kit),
        "local_configmap": local_configmap,
        "startup_secret": startup_secret,
        "local_files": len(local["items"]),
        "startup_files": len(startup["items"]),
        "helm_values": str(kit / HELM_CHART_DIR / "values.yaml"),
        "kubectl": kubectl,
        "next_step": "Start the server/client parent pod with the helm_command.",
        "helm_command": _k8_stage_helm_command(kit, namespace),
    }
    output_ok(summary)
def _resolve_prepare_inputs(args) -> tuple[Path, str | None, Path]:
    """Extract the kit path, raw output argument and config path from ``args``.

    The kit may be given positionally (``args.kit``) or by flag
    (``args.kit_flag``), but not both. When no config is given, the config
    defaults to ``config.yaml`` inside the kit.

    Returns:
        ``(kit, output_arg, config_path)`` where ``kit`` and ``config_path`` are
        resolved paths and ``output_arg`` is passed through unchanged.
    """
    from_position = getattr(args, "kit", None)
    from_flag = getattr(args, "kit_flag", None)
    if from_position and from_flag:
        _fail("INVALID_ARGS", "Specify the startup kit only once.", "Use either positional kit or --kit.")

    chosen = from_position if from_position else from_flag
    if not chosen:
        _fail("INVALID_ARGS", "Missing startup kit directory.", "Run nvflare deploy prepare <startup-kit-dir>.")

    kit = Path(chosen).expanduser().resolve()
    output_arg = getattr(args, "output", None)
    config_arg = getattr(args, "config", None)
    if config_arg:
        config_path = Path(config_arg).expanduser().resolve()
    else:
        config_path = kit / "config.yaml"
    return kit, output_arg, config_path
def _resolve_k8_stage_inputs(args) -> tuple[Path, str, str, str]:
    """Work out the kit, namespace, ConfigMap name and Secret name for staging.

    Precedence for the namespace: ``args.namespace``, then the namespace stored
    in the kit's K8s launcher component, then ``"default"``. Names that are not
    given on the command line are derived from the kit's Helm values.

    Returns:
        ``(kit, namespace, local_configmap, startup_secret)``.
    """
    from_position = getattr(args, "kit", None)
    from_flag = getattr(args, "kit_flag", None)
    if from_position and from_flag:
        _fail("INVALID_ARGS", "Specify the prepared startup kit only once.", "Use either positional kit or --kit.")

    chosen = from_position if from_position else from_flag
    if not chosen:
        _fail(
            "INVALID_ARGS",
            "Missing prepared startup kit directory.",
            "Run nvflare deploy k8 stage <prepared-kit-dir>.",
        )

    kit = Path(chosen).expanduser().resolve()
    values = _load_k8_stage_values(kit)

    namespace = getattr(args, "namespace", None)
    if not namespace:
        # Only consult the kit when the CLI did not supply a namespace.
        namespace = _read_k8_launcher_namespace(kit) or "default"

    local_configmap = getattr(args, "local_configmap", None)
    startup_secret = getattr(args, "startup_secret", None)
    if not (local_configmap and startup_secret):
        fallback_local, fallback_startup = _default_k8_stage_resource_names(kit, values)
        if not local_configmap:
            local_configmap = fallback_local
        if not startup_secret:
            startup_secret = fallback_startup
    return kit, namespace, local_configmap, startup_secret
def _validate_k8_stage_inputs(kit: Path, namespace: str, local_configmap: str, startup_secret: str) -> None:
    """Check that ``kit`` is a prepared K8s kit and that the given names are valid.

    Verifies, in order: the kit directory exists; ``local/`` and ``startup/``
    are real (non-symlink) directories; a K8s launcher component is present;
    the Helm values file exists; and the namespace, ConfigMap name and Secret
    name pass their validators. Problems are reported through ``_fail``.
    """
    if not kit.is_dir():
        _fail("INVALID_KIT", f"Prepared kit directory does not exist: {kit}", "Provide an existing prepared K8s kit.")

    for folder_path in (kit / "local", kit / "startup"):
        if not folder_path.is_dir():
            _fail(
                "INVALID_KIT",
                f"Missing prepared kit folder: {folder_path}",
                "Run nvflare deploy prepare with runtime: k8s before staging.",
            )
        # is_dir() follows symlinks, so a link to a directory must be rejected separately.
        if folder_path.is_symlink():
            _fail(
                "INVALID_KIT",
                f"Prepared kit folder must not be a symlink: {folder_path}",
                "Use a prepared K8s kit whose local/ and startup/ folders are regular directories.",
            )

    launcher = _find_k8s_launcher(kit)
    if not launcher:
        _fail(
            "INVALID_KIT",
            f"Input folder was not generated for the Kubernetes runtime: {kit}",
            "Run nvflare deploy prepare with runtime: k8s and pass that prepared output directory to "
            "nvflare deploy k8 stage.",
        )

    values_path = kit / HELM_CHART_DIR / "values.yaml"
    if not values_path.is_file():
        _fail(
            "INVALID_KIT",
            f"Missing Helm values file: {values_path}",
            "Run nvflare deploy prepare with runtime: k8s before staging.",
        )

    _validate_k8s_namespace(
        {"namespace": namespace},
        "namespace",
        "deploy k8 stage",
        error_code="INVALID_ARGS",
        hint="Use a valid --namespace value.",
    )

    # Both object names are checked with the same validator, in this order.
    name_checks = (
        (local_configmap, "local ConfigMap name", "Use a valid --local-configmap value."),
        (startup_secret, "startup Secret name", "Use a valid --startup-secret value."),
    )
    for object_name, label, hint in name_checks:
        _validate_k8s_secret_name(object_name, label, error_code="INVALID_ARGS", hint=hint)
def _load_k8_stage_values(kit: Path) -> dict[str, Any]:
    """Load ``helm_chart/values.yaml`` from the kit.

    Returns:
        The parsed mapping, or an empty dict when the file does not exist.
        Unparseable or non-mapping content is reported through ``_fail``.
    """
    values_file = kit / HELM_CHART_DIR / "values.yaml"
    if not values_file.is_file():
        return {}

    try:
        with values_file.open("rt", encoding="utf-8") as stream:
            loaded = yaml.safe_load(stream)
    except Exception as ex:
        _fail("INVALID_KIT", f"Failed to parse Helm values: {ex}", "Fix helm_chart/values.yaml.")

    if isinstance(loaded, dict):
        return loaded
    _fail("INVALID_KIT", "helm_chart/values.yaml must contain a YAML mapping.", "Fix the prepared Helm chart.")
    return loaded
def _read_k8_launcher_namespace(kit: Path) -> str | None:
    """Return the namespace configured on the kit's K8s launcher component.

    Returns:
        The ``args.namespace`` value of the launcher component when it is a
        non-empty string; otherwise ``None`` (no launcher, no args, malformed
        args, or a missing/empty/non-string namespace).
    """
    component = _find_k8s_launcher(kit)
    if not component:
        return None
    launcher_args = component.get("args")
    # A hand-edited resources file may carry a non-mapping "args" (e.g. a list);
    # treat that like "no namespace configured" rather than crashing on .get().
    if not isinstance(launcher_args, dict):
        return None
    namespace = launcher_args.get("namespace")
    return namespace if isinstance(namespace, str) and namespace else None
def _find_k8s_launcher(kit: Path) -> dict[str, Any] | None:
    """Find the K8s launcher component in the kit's ``local/resources.json.default``.

    Returns:
        The first component mapping whose ``id`` is ``"k8s_launcher"`` and whose
        ``path`` is one of ``K8S_LAUNCHER_PATHS``; ``None`` when the file is
        missing, ``components`` is not a list, or nothing matches.
    """
    resources_file = kit / "local" / RESOURCES_JSON_DEFAULT
    if not resources_file.is_file():
        return None

    resources = _load_json_file(resources_file, RESOURCES_JSON_DEFAULT)
    candidates = resources.get("components", [])
    if not isinstance(candidates, list):
        return None

    launchers = (
        entry
        for entry in candidates
        if isinstance(entry, dict)
        and entry.get("id") == "k8s_launcher"
        and entry.get("path") in K8S_LAUNCHER_PATHS
    )
    return next(launchers, None)
def _default_k8_stage_resource_names(kit: Path, values: dict[str, Any]) -> tuple[str, str]:
    """Derive default ConfigMap and Secret names for staging.

    The site name comes from ``values["siteName"]``, then ``values["name"]``,
    then the kit directory name, and is passed through ``_stable_k8s_name``.

    Returns:
        ``(local_configmap_name, startup_secret_name)``.
    """
    site = values.get("siteName") or values.get("name") or kit.name
    base = _stable_k8s_name(str(site), K8S_SERVICE_NAME_MAX_LENGTH)
    local_name = _stable_k8s_name(f"nvflare-local-{base}", K8_STAGE_OBJECT_NAME_MAX_LENGTH)
    startup_name = _stable_k8s_name(f"nvflare-startup-{base}", K8_STAGE_OBJECT_NAME_MAX_LENGTH)
    return local_name, startup_name
def _resolve_kubectl(args) -> str:
    """Pick the Kubernetes CLI command name to use.

    Precedence: ``args.kubectl``, then the ``KUBECTL`` environment variable,
    then ``"kubectl"``. The stripped value must be one of
    ``K8_STAGE_ALLOWED_KUBECTL_NAMES``; anything else is reported via ``_fail``.
    """
    kubectl = getattr(args, "kubectl", None)
    if not kubectl:
        kubectl = os.environ.get(KUBECTL_ENV_VAR) or "kubectl"

    if not (isinstance(kubectl, str) and kubectl.strip()):
        _fail("INVALID_ARGS", "Kubernetes CLI command must be a non-empty string.", "Set --kubectl or KUBECTL.")

    kubectl = kubectl.strip()
    if kubectl in K8_STAGE_ALLOWED_KUBECTL_NAMES:
        return kubectl
    _fail(
        "INVALID_ARGS",
        f"Kubernetes CLI command must be one of {sorted(K8_STAGE_ALLOWED_KUBECTL_NAMES)}: {kubectl!r}",
        "Set --kubectl or KUBECTL to kubectl or oc.",
    )
    return kubectl
def _k8_stage_helm_command(kit: Path, namespace: str) -> str:
    """Return the ``helm upgrade --install`` command line for a staged kit.

    The release name is derived from the Helm values' ``siteName``/``name``
    (falling back to the kit directory name); the chart is the kit's
    ``helm_chart`` directory.
    """
    values = _load_k8_stage_values(kit)
    site = values.get("siteName") or values.get("name") or kit.name
    command = [
        "helm",
        "upgrade",
        "--install",
        _k8s_release_name(str(site)),
        str(kit / HELM_CHART_DIR),
        "--namespace",
        namespace,
    ]
    return _format_command(command)
def _resolve_output_path(kit: Path, output_arg: str | None, runtime: str) -> Path:
    """Return the directory the prepared kit should be written to.

    Without an explicit ``output_arg`` the location is
    ``<kit>/prepared/<runtime>``; otherwise the argument is user-expanded and
    resolved.
    """
    if not output_arg:
        return kit / "prepared" / runtime
    explicit = Path(output_arg).expanduser()
    return explicit.resolve()
def _ignore_output_path(kit: Path, output: Path):
    """Build the ``ignore`` callable for ``shutil.copytree`` when copying the kit.

    Returns:
        ``None`` when ``output`` is not located under ``kit`` (nothing to skip);
        otherwise a callable that excludes the output directory and the kit's
        ``prepared`` root from the copy.
    """
    if not _is_path_relative_to(output, kit):
        return None

    skipped = (output.resolve(), (kit / "prepared").resolve())

    def _ignore(dir_name: str, names: list[str]) -> list[str]:
        # copytree calls this once per visited directory with its entry names.
        here = Path(dir_name).resolve()
        return [entry for entry in names if (here / entry).resolve() in skipped]

    return _ignore
def _prepare_docker(kit_info: KitInfo, final_output: Path, config: dict[str, Any]) -> dict[str, Any]:
    """Patch the staged kit for the Docker runtime and describe the result.

    Installs the Docker job launcher in resources.json.default, relocates
    server storage into the workspace mount (server kits only), patches the
    comm config, ensures the study-data template exists, removes the other
    start scripts and writes the Docker start script.

    Args:
        kit_info: Kit facts whose ``kit_dir`` is the staged copy to modify.
        final_output: Where the prepared kit will finally live; used only to
            build the paths and command reported in the result.
        config: Validated runtime config (``parent.docker_image`` is required).

    Returns:
        A summary mapping with runtime, role, name, output path, start command,
        start script path and resources file path.
    """
    parent = config.get("parent") or {}
    job_launcher = config.get("job_launcher") or {}
    docker_image = parent["docker_image"]
    network = parent.get("network", "nvflare-network")
    launcher_path = DOCKER_SERVER_LAUNCHER if kit_info.role == ROLE_SERVER else DOCKER_CLIENT_LAUNCHER
    launcher_args = {
        "network": network,
        "default_python_path": job_launcher.get("default_python_path", "/usr/local/bin/python"),
        "default_job_container_kwargs": job_launcher.get("default_job_container_kwargs") or {},
        "default_job_env": job_launcher.get("default_job_env") or {},
    }
    _patch_resources(kit_info.kit_dir, "docker_launcher", launcher_path, launcher_args)
    if kit_info.role == ROLE_SERVER:
        _relocate_server_storage_to_workspace(kit_info.kit_dir, WORKSPACE_MOUNT_PATH)
    _patch_comm_config_for_docker(kit_info.kit_dir)
    _ensure_study_data_template(kit_info.kit_dir)
    _remove_start_scripts(kit_info.kit_dir, keep={DOCKER_START_SH})
    start_script = _write_docker_start_script(kit_info, docker_image=docker_image, network=network)
    # The script was written into the staged copy; report where it ends up after the move.
    final_start_script = final_output / "startup" / start_script.name
    # Quote the directory so the suggested command can be pasted into a shell even
    # when the path contains spaces or metacharacters (plain paths are unchanged).
    quoted_output = shlex.quote(str(final_output))
    return {
        "runtime": RUNTIME_DOCKER,
        "role": kit_info.role,
        "name": kit_info.name,
        "output": str(final_output),
        "start_command": f"cd {quoted_output} && ./startup/{DOCKER_START_SH}",
        "start_script": str(final_start_script),
        "resources": str(final_output / "local" / RESOURCES_JSON_DEFAULT),
    }
def _prepare_k8s(kit_info: KitInfo, final_output: Path, config: dict[str, Any]) -> dict[str, Any]:
    """Patch the staged kit for the Kubernetes runtime and describe the result.

    Installs the K8s job launcher in resources.json.default, patches the comm
    config for the parent service, ensures the study-data template exists,
    relocates server storage (server kits only), removes all start scripts and
    writes the Helm chart.

    Args:
        kit_info: Kit facts whose ``kit_dir`` is the staged copy to modify.
        final_output: Where the prepared kit will finally live; used only to
            build the paths and command reported in the result.
        config: Validated runtime config.

    Returns:
        A summary mapping with runtime, role, name, namespace, output path,
        Helm chart path, helm command and resources file path.
    """
    namespace = config.get("namespace", "default")
    parent = config.get("parent") or {}
    job_launcher = config.get("job_launcher") or {}
    parent_port = parent.get("parent_port", 8102)
    workspace_mount_path = parent.get("workspace_mount_path", WORKSPACE_MOUNT_PATH)
    server_service_name = config.get("server_service_name", DEFAULT_K8S_SERVER_SERVICE_NAME)
    launcher_path = K8S_SERVER_LAUNCHER if kit_info.role == ROLE_SERVER else K8S_CLIENT_LAUNCHER
    launcher_args = {
        "config_file_path": job_launcher.get("config_file_path"),
        "study_data_pvc_file_path": f"{workspace_mount_path}/local/{STUDY_DATA_YAML}",
        "namespace": namespace,
        "default_python_path": job_launcher.get("default_python_path", K8S_PARENT_PYTHON_PATH),
        "workspace_mount_path": workspace_mount_path,
    }
    # Optional launcher settings are only forwarded when the config provides them.
    if "pending_timeout" in job_launcher:
        launcher_args["pending_timeout"] = job_launcher["pending_timeout"]
    if job_launcher.get("job_pod_security_context"):
        launcher_args["security_context"] = job_launcher["job_pod_security_context"]
    if job_launcher.get("image_pull_secrets") is not None:
        launcher_args["image_pull_secrets"] = job_launcher["image_pull_secrets"]
    if "study_job_spec_file_path" in job_launcher:
        launcher_args["study_job_spec_file_path"] = job_launcher["study_job_spec_file_path"]
    _patch_resources(kit_info.kit_dir, "k8s_launcher", launcher_path, launcher_args)
    _patch_comm_config_for_k8s(kit_info.kit_dir, kit_info.role, kit_info.name, parent_port, server_service_name)
    _ensure_study_data_template(kit_info.kit_dir)
    if kit_info.role == ROLE_SERVER:
        _relocate_server_storage_to_workspace(kit_info.kit_dir, workspace_mount_path)
    _remove_start_scripts(kit_info.kit_dir, keep=set())
    _write_helm_chart(kit_info, config)
    release_name = _k8s_release_name(kit_info.name)
    final_chart_dir = final_output / HELM_CHART_DIR
    # Quote the chart directory so the suggested command stays valid for paths with
    # spaces or shell metacharacters (plain paths are emitted unchanged).
    quoted_chart_dir = shlex.quote(str(final_chart_dir))
    return {
        "runtime": RUNTIME_K8S,
        "role": kit_info.role,
        "name": kit_info.name,
        "namespace": namespace,
        "output": str(final_output),
        "helm_chart": str(final_chart_dir),
        "helm_command": f"helm upgrade --install {release_name} {quoted_chart_dir} --namespace {namespace}",
        "resources": str(final_output / "local" / RESOURCES_JSON_DEFAULT),
    }
def _load_config(config_path: Path) -> dict[str, Any]:
    """Load the runtime config YAML and check its top-level shape.

    Args:
        config_path: Path to the YAML config file.

    Returns:
        The parsed mapping; its ``runtime`` key is guaranteed to be one of the
        supported runtimes.

    A missing file, unparseable YAML, non-mapping content or an unsupported
    ``runtime`` value is reported through ``_fail``.
    """
    if not config_path.is_file():
        _fail("CONFIG_NOT_FOUND", f"Config file not found: {config_path}", "Provide a YAML runtime config file.")
    try:
        with config_path.open("rt", encoding="utf-8") as f:
            config = yaml.safe_load(f)
    except Exception as ex:
        _fail("INVALID_CONFIG", f"Failed to parse config file: {ex}", "Ensure the file is valid YAML.")
    if not isinstance(config, dict):
        _fail("INVALID_CONFIG", "Runtime config must be a YAML mapping.", "Add runtime: docker or runtime: k8s.")
    runtime = config.get("runtime")
    # Compare against a tuple rather than a set: YAML can yield unhashable values
    # (e.g. ``runtime: [docker]``), and set membership would raise TypeError
    # instead of reporting an invalid config.
    if runtime not in (RUNTIME_DOCKER, RUNTIME_K8S):
        _fail("INVALID_CONFIG", "Config must contain runtime: docker or runtime: k8s.", "Set a supported runtime.")
    return config
def _validate_runtime_config(runtime: str, config: dict[str, Any]) -> None:
    """Validate the keys and value types of a docker or k8s runtime config.

    For each runtime, the allowed top-level, ``parent`` and ``job_launcher``
    keys are checked first, then individual values. Any other ``runtime`` value
    is ignored here. Problems are reported by the validation helpers.
    """
    if runtime == RUNTIME_DOCKER:
        docker_top_keys = {"runtime", "parent", "job_launcher"}
        docker_parent_keys = {"docker_image", "network"}
        docker_launcher_keys = {"default_python_path", "default_job_env", "default_job_container_kwargs"}

        _validate_allowed_keys(config, docker_top_keys, "docker config")
        parent = _mapping(config.get("parent"), "parent")
        job_launcher = _mapping(config.get("job_launcher", {}), "job_launcher")
        _validate_allowed_keys(parent, docker_parent_keys, "parent")
        _validate_allowed_keys(job_launcher, docker_launcher_keys, "job_launcher")

        _required_str(parent, "docker_image", "parent")
        if "network" in parent:
            _required_str(parent, "network", "parent")
        _optional_str(job_launcher, "default_python_path", "job_launcher")
        _optional_mapping(job_launcher, "default_job_env", "job_launcher")

        container_kwargs = _optional_mapping(job_launcher, "default_job_container_kwargs", "job_launcher")
        # These keys must not be overridden through the user-supplied container kwargs.
        clashes = DOCKER_RESERVED_KWARGS & set(container_kwargs) if container_kwargs else set()
        if clashes:
            _fail(
                "INVALID_CONFIG",
                f"default_job_container_kwargs contains reserved keys: {sorted(clashes)}",
                "Remove keys controlled by DockerJobLauncher.",
            )
        return

    if runtime != RUNTIME_K8S:
        return

    k8s_top_keys = {"runtime", "namespace", "server_service_name", "parent", "job_launcher"}
    k8s_parent_keys = {
        "docker_image",
        "parent_port",
        "workspace_pvc",
        "workspace_mount_path",
        "python_path",
        "resources",
        "pod_security_context",
        "image_pull_secrets",
    }
    k8s_launcher_keys = {
        "config_file_path",
        "pending_timeout",
        "default_python_path",
        "job_pod_security_context",
        "image_pull_secrets",
        "study_job_spec_file_path",
    }

    _validate_allowed_keys(config, k8s_top_keys, "k8s config")
    parent = _mapping(config.get("parent"), "parent")
    job_launcher = _mapping(config.get("job_launcher", {}), "job_launcher")
    _validate_allowed_keys(parent, k8s_parent_keys, "parent")
    _validate_allowed_keys(job_launcher, k8s_launcher_keys, "job_launcher")

    _required_str(parent, "docker_image", "parent")
    if "namespace" in config:
        _validate_k8s_namespace(config, "namespace", "k8s config")
    if "server_service_name" in config:
        _validate_k8s_service_name(config, "server_service_name", "k8s config")

    # Parent pod settings.
    _optional_int(parent, "parent_port", "parent")
    for key in ("workspace_pvc", "workspace_mount_path", "python_path"):
        _optional_str(parent, key, "parent")
    for key in ("resources", "pod_security_context"):
        _optional_mapping(parent, key, "parent")
    _optional_k8s_secret_name_list(parent, "image_pull_secrets", "parent image pull references")

    # Job launcher settings.
    for key in ("config_file_path", "default_python_path"):
        _optional_str(job_launcher, key, "job_launcher")
    _optional_non_negative_int(job_launcher, "pending_timeout", "job_launcher")
    _optional_mapping(job_launcher, "job_pod_security_context", "job_launcher")
    _optional_k8s_secret_name_list(job_launcher, "image_pull_secrets", "job launcher image pull references")
    _optional_str(job_launcher, "study_job_spec_file_path", "job_launcher")
def _validate_kit(kit_dir: Path) -> KitInfo:
    """Validate a startup kit directory and collect its identifying facts.

    Requires ``startup/`` and ``local/`` directories, exactly one role file
    (fed_server.json, fed_client.json or fed_admin.json) in ``startup/``, a
    loadable ``local/resources.json.default`` and, for non-admin kits, the
    identity files checked by ``_validate_identity_files``.

    Returns:
        A ``KitInfo`` for the kit. For admin kits the organization is set to
        the detected name.
    """
    if not kit_dir.is_dir():
        _fail("INVALID_KIT", f"Startup kit directory does not exist: {kit_dir}", "Provide an existing kit directory.")

    startup_dir = kit_dir / "startup"
    local_dir = kit_dir / "local"
    for required_dir, label in ((startup_dir, "startup"), (local_dir, "local")):
        if not required_dir.is_dir():
            _fail("INVALID_KIT", f"Missing {label} directory: {required_dir}", "Provide a valid startup kit.")

    role_files = {
        ROLE_SERVER: startup_dir / FED_SERVER_JSON,
        ROLE_CLIENT: startup_dir / FED_CLIENT_JSON,
        ROLE_ADMIN: startup_dir / FED_ADMIN_JSON,
    }
    roles = []
    for candidate, marker in role_files.items():
        if marker.is_file():
            roles.append(candidate)
    if len(roles) != 1:
        _fail(
            "INVALID_KIT",
            f"Expected exactly one role file in startup/, found: {roles or 'none'}",
            "A kit should contain one of fed_server.json, fed_client.json, or fed_admin.json.",
        )
    role = roles[0]
    role_file = role_files[role]

    # Loaded only to confirm the resources file is present and readable.
    _load_json_file(local_dir / RESOURCES_JSON_DEFAULT, "resources.json.default")
    role_config = _load_json_file(role_file, role_file.name)

    is_admin = role == ROLE_ADMIN
    if not is_admin:
        _validate_identity_files(startup_dir, role)
    name = _detect_name(kit_dir, role, role_config)
    org = name if is_admin else _detect_org(startup_dir, role)
    fed_learn_port, admin_port = _detect_ports(role, role_config)
    return KitInfo(
        kit_dir=kit_dir,
        role=role,
        name=name,
        org=org,
        fed_learn_port=fed_learn_port,
        admin_port=admin_port,
    )
def _validate_identity_files(startup_dir: Path, role: str) -> None:
    """Ensure the certificate/key files for ``role`` exist in ``startup_dir``.

    ``rootCA.pem`` is always required, plus ``server.crt``/``server.key`` for
    the server role or ``client.crt``/``client.key`` otherwise. Missing files
    are reported through ``_fail``.
    """
    if role == ROLE_SERVER:
        keypair = ["server.crt", "server.key"]
    else:
        keypair = ["client.crt", "client.key"]
    expected = ["rootCA.pem", *keypair]

    missing = []
    for filename in expected:
        if not (startup_dir / filename).is_file():
            missing.append(filename)
    if not missing:
        return
    _fail(
        "INVALID_KIT",
        f"Missing identity material in {startup_dir}: {missing}",
        "Use a fully provisioned or packaged startup kit.",
    )
def _detect_name(kit_dir: Path, role: str, role_config: dict[str, Any]) -> str:
    """Return the participant name for a kit, falling back to the directory name.

    Client kits use ``client.fqsn``; server kits use the first ``servers``
    entry's ``identity`` or ``admin_server``; any other role uses
    ``admin.name``.
    """
    fallback = kit_dir.name
    if role == ROLE_CLIENT:
        section = role_config.get("client") or {}
        return str(section.get("fqsn") or fallback)
    if role == ROLE_SERVER:
        first_server = (role_config.get("servers") or [{}])[0]
        return str(first_server.get("identity") or first_server.get("admin_server") or fallback)
    section = role_config.get("admin") or {}
    return str(section.get("name") or fallback)
def _detect_org(startup_dir: Path, role: str) -> str:
    """Read the organization name from the site certificate's subject.

    Uses ``server.crt`` for the server role and ``client.crt`` otherwise. A
    certificate that cannot be parsed, or that has no organization attribute,
    is reported through ``_fail``.
    """
    cert_name = "server.crt" if role == ROLE_SERVER else "client.crt"
    try:
        pem_bytes = (startup_dir / cert_name).read_bytes()
        certificate = x509.load_pem_x509_certificate(pem_bytes, default_backend())
        org_attrs = certificate.subject.get_attributes_for_oid(NameOID.ORGANIZATION_NAME)
    except Exception as ex:
        _fail("INVALID_KIT", f"Failed to parse {cert_name}: {ex}", "Use a valid provisioned startup kit.")

    if not (org_attrs and org_attrs[0].value):
        _fail("INVALID_KIT", f"Missing organization in {cert_name}.", "Use a valid provisioned startup kit.")
    return org_attrs[0].value
def _detect_ports(role: str, role_config: dict[str, Any]) -> tuple[int | None, int | None]:
    """Return ``(fed_learn_port, admin_port)`` for a server kit.

    Non-server roles yield ``(None, None)``. The federated-learning port is
    parsed from the first server's ``service.target``; the admin port is the
    server's ``admin_port`` when it is an int, otherwise the same as the
    federated-learning port.
    """
    if role != ROLE_SERVER:
        return None, None

    first_server = (role_config.get("servers") or [{}])[0]
    service = first_server.get("service") or {}
    fed_port = _port_from_target(service.get("target"))
    configured_admin = first_server.get("admin_port")
    if not isinstance(configured_admin, int):
        configured_admin = fed_port
    return fed_port, configured_admin
def _port_from_target(target: str | None) -> int | None:
    """Extract the port from a ``host:port`` target string.

    Args:
        target: Value read from the server config; expected to be a string but
            may be missing or of another type in a malformed file.

    Returns:
        The integer after the last ``:``, or ``None`` when ``target`` is not a
        string, has no ``:``, or the text after the last ``:`` is not an integer.
    """
    # Config values come from JSON, so a non-string (e.g. a bare port number) is
    # possible; a membership test on it would raise TypeError.
    if not isinstance(target, str):
        return None
    _, separator, port_text = target.rpartition(":")
    if not separator:
        return None
    try:
        return int(port_text)
    except ValueError:
        return None
def _patch_resources(kit_dir: Path, launcher_id: str, launcher_path: str, launcher_args: dict[str, Any]) -> None:
    """Rewrite ``local/resources.json.default`` for the target runtime.

    Existing launcher, resource-consumer and resource-manager components are
    removed (after warning about any custom configuration being discarded),
    then a passthrough resource manager and the given launcher component are
    appended and the file is written back.

    Args:
        kit_dir: Kit directory whose resources file is patched.
        launcher_id: ``id`` of the launcher component to add.
        launcher_path: Class path of the launcher component to add.
        launcher_args: ``args`` mapping of the launcher component to add.
    """
    local_dir = kit_dir / "local"
    resources = _load_json_file(local_dir / RESOURCES_JSON_DEFAULT, RESOURCES_JSON_DEFAULT)
    components = resources.setdefault("components", [])
    if not isinstance(components, list):
        _fail("INVALID_KIT", "resources.json.default components must be a list.", "Fix the startup kit resources file.")

    _warn_for_replaced_components(components, launcher_id, launcher_path)

    # Filter in place so ``resources["components"]`` keeps referring to this list.
    kept = []
    for entry in components:
        entry_id = entry.get("id")
        if entry_id in LAUNCHER_IDS or entry_id in RESOURCE_CONSUMER_IDS:
            continue
        kept.append(entry)
    components[:] = [entry for entry in kept if entry.get("id") != "resource_manager"]

    resource_manager = {"id": "resource_manager", "path": PASSTHROUGH_RESOURCE_MANAGER, "args": {}}
    launcher = {"id": launcher_id, "path": launcher_path, "args": launcher_args}
    components.extend([resource_manager, launcher])
    _write_resources(local_dir, resources)
def _warn_for_replaced_components(components: list[dict[str, Any]], launcher_id: str, launcher_path: str) -> None:
    """Warn about existing components whose configuration will be discarded.

    A warning is emitted for a resource consumer or resource manager that has
    custom configuration, and for a launcher whose replacement discards
    configuration. At most one warning is emitted per component.
    """
    for entry in components:
        entry_id = entry.get("id")
        message = None
        if entry_id in RESOURCE_CONSUMER_IDS and _component_has_custom_config(
            entry, BUILTIN_RESOURCE_CONSUMER_PATHS
        ):
            message = (
                f"deploy prepare removes component '{entry_id}' from resources.json.default; "
                "existing resource consumer configuration will not be used by the prepared runtime."
            )
        elif entry_id == "resource_manager" and _component_has_custom_config(entry, BUILTIN_RESOURCE_MANAGER_PATHS):
            message = (
                "deploy prepare replaces component 'resource_manager' with PassthroughResourceManager; "
                "existing resource manager path/args will not be used by the prepared runtime."
            )
        elif entry_id in LAUNCHER_IDS and _launcher_replacement_discards_config(entry, launcher_id, launcher_path):
            message = (
                f"deploy prepare replaces component '{entry_id}' with generated '{launcher_id}' configuration; "
                "existing launcher path/args will not be used by the prepared runtime."
            )
        if message is not None:
            _warn(message)
def _component_has_custom_config(component: dict[str, Any], builtin_paths: set[str]) -> bool:
    """Tell whether a component carries configuration beyond the built-in default.

    Returns:
        True when the component has non-empty ``args``, or a non-empty ``path``
        that is not one of ``builtin_paths``; False otherwise.
    """
    class_path = component.get("path")
    # Empty args are the canonical "no arguments" shape in generated resources files.
    if component.get("args"):
        return True
    if not class_path:
        return False
    return class_path not in builtin_paths
def _launcher_replacement_discards_config(component: dict[str, Any], launcher_id: str, launcher_path: str) -> bool:
    """Tell whether replacing an existing launcher component loses configuration.

    Returns:
        True when the component has non-empty ``args``, when it has the same id
        as the generated launcher but a different path, or when its path is not
        a built-in launcher path. False when it has no args and no path.
    """
    if component.get("args"):
        return True

    existing_path = component.get("path")
    if not existing_path:
        return False

    same_id_other_class = component.get("id") == launcher_id and existing_path != launcher_path
    if same_id_other_class:
        return True
    return existing_path not in BUILTIN_LAUNCHER_PATHS
def _warn(message: str) -> None:
    """Emit ``message`` through ``print_human`` with a ``Warning: `` prefix."""
    text = f"Warning: {message}"
    print_human(text)
def _write_resources(local_dir: Path, resources: dict[str, Any]) -> None:
    """Write ``resources`` to ``resources.json.default`` and drop ``resources.json``.

    The mapping is serialized with a 4-space indent and a trailing newline. An
    existing ``resources.json`` in the same directory is deleted.
    """
    serialized = json.dumps(resources, indent=4)
    default_file = local_dir / RESOURCES_JSON_DEFAULT
    default_file.write_text(f"{serialized}\n", encoding="utf-8")

    active_file = local_dir / RESOURCES_JSON
    if not active_file.exists():
        return
    active_file.unlink()
def _patch_comm_config_for_docker(kit_dir: Path) -> None:
    """Patch ``local/comm_config.json`` for the Docker runtime.

    Sets the internal scheme to ``tcp`` and the internal host to ``0.0.0.0``,
    and defaults ``connection_security`` to ``clear`` when it is not already
    set. The file is created from defaults when it does not exist.
    """
    config_file = kit_dir / "local" / COMM_CONFIG_JSON
    comm_config = _load_or_default_comm_config(config_file)

    comm_config.setdefault("internal", {})["scheme"] = "tcp"

    internal_resources = _internal_resources(comm_config)
    internal_resources["host"] = "0.0.0.0"
    if "connection_security" not in internal_resources:
        internal_resources["connection_security"] = "clear"

    _write_json(config_file, comm_config)
def _patch_comm_config_for_k8s(
    kit_dir: Path,
    role: str,
    site_name: str,
    parent_port: int,
    server_service_name: str = DEFAULT_K8S_SERVER_SERVICE_NAME,
) -> None:
    """Patch ``local/comm_config.json`` for the Kubernetes runtime.

    Points the internal resources at the parent service (host from
    ``_k8s_parent_service_name``, port ``parent_port``), forces
    ``connection_security`` to ``clear`` and the internal scheme to ``tcp``.
    The file is created from defaults when it does not exist.
    """
    config_file = kit_dir / "local" / COMM_CONFIG_JSON
    comm_config = _load_or_default_comm_config(config_file)

    internal_resources = _internal_resources(comm_config)
    parent_host = _k8s_parent_service_name(role, site_name, server_service_name)
    internal_resources["host"] = parent_host
    internal_resources["port"] = parent_port
    internal_resources["connection_security"] = "clear"

    comm_config.setdefault("internal", {})["scheme"] = "tcp"
    _write_json(config_file, comm_config)
def _load_or_default_comm_config(comm_config_path: Path) -> dict[str, Any]:
    """Return the comm config at ``comm_config_path`` or a fresh default.

    The default is used when the path is not an existing file; a new mapping is
    built on every call.
    """
    if not comm_config_path.is_file():
        default_internal = {"scheme": "tcp", "resources": {"connection_security": "clear"}}
        return {
            "allow_adhoc_conns": False,
            "backbone_conn_gen": 2,
            "internal": default_internal,
        }
    return _load_json_file(comm_config_path, COMM_CONFIG_JSON)
def _internal_resources(comm_config: dict[str, Any]) -> dict[str, Any]:
    """Return ``comm_config["internal"]["resources"]``, creating it when absent.

    Missing ``internal`` and ``resources`` entries are inserted as empty
    mappings. A ``resources`` value that is not a mapping is reported through
    ``_fail``.
    """
    internal_section = comm_config.setdefault("internal", {})
    found = internal_section.setdefault("resources", {})
    if isinstance(found, dict):
        return found
    _fail(
        "INVALID_KIT",
        "comm_config.json internal.resources must be a mapping.",
        "Fix the startup kit comm config.",
    )
    return found
def _ensure_study_data_template(kit_dir: Path) -> None:
    """Create ``local/study_data.yaml`` from the template unless it already exists."""
    template_file = kit_dir / "local" / STUDY_DATA_YAML
    if template_file.exists():
        return
    template_file.write_text(STUDY_DATA_TEMPLATE, encoding="utf-8")
def _remove_start_scripts(kit_dir: Path, keep: set[str]) -> None:
    """Delete the known start/stop scripts from ``startup/`` except those in ``keep``.

    Args:
        kit_dir: Kit directory whose ``startup/`` folder is cleaned.
        keep: File names that must be left in place.
    """
    startup_dir = kit_dir / "startup"
    known_scripts = (START_SH, SUB_START_SH, STOP_FL_SH, LEGACY_DOCKER_SH, DOCKER_START_SH)
    removable = [script_name for script_name in known_scripts if script_name not in keep]
    for script_name in removable:
        script = startup_dir / script_name
        if script.exists():
            script.unlink()
def _k8s_parent_service_name(
    role: str, site_name: str, server_service_name: str = DEFAULT_K8S_SERVER_SERVICE_NAME
) -> str:
    """Return the service name of the parent process for a site.

    Server kits use ``server_service_name`` as given; every other role gets a
    name derived from ``site_name``.
    """
    return server_service_name if role == ROLE_SERVER else _k8s_service_name(site_name)
def _k8s_service_name(site_name: str) -> str:
    """Return the service name for ``site_name``, limited to the service-name length."""
    service_name = _stable_k8s_name(site_name, K8S_SERVICE_NAME_MAX_LENGTH)
    return service_name
def _stable_k8s_name(raw_name: str, max_length: int) -> str:
    """Turn an arbitrary name into a deterministic Kubernetes-safe name.

    Args:
        raw_name: The name to convert.
        max_length: Length limit for the result. The call sites visible in this
            module pass 63 or 253.

    Returns:
        ``raw_name`` unchanged when it already fits ``max_length`` and matches
        ``K8S_NAME_PATTERN`` in full. Otherwise a normalized form (lowercased,
        other characters replaced by ``-``, prefixed with ``site-`` when it does
        not start with a letter), truncated and suffixed with the first 8 hex
        characters of the SHA-256 of ``raw_name`` so that different inputs that
        normalize alike remain distinguishable.
    """
    # fullmatch, not match: the pattern ends with "$", which also matches just
    # before a trailing newline, so match() would accept "name\n" as already valid.
    if len(raw_name) <= max_length and K8S_NAME_PATTERN.fullmatch(raw_name):
        return raw_name
    normalized = re.sub(r"[^a-z0-9-]", "-", raw_name.lower())
    normalized = re.sub(r"-+", "-", normalized).strip("-")
    if not normalized:
        normalized = "site"
    if not normalized[0].isalpha():
        normalized = f"site-{normalized}"
    digest = hashlib.sha256(raw_name.encode("utf-8")).hexdigest()[:8]
    # Reserve room for "-" plus the digest within max_length.
    head_max = max_length - len(digest) - 1
    head = normalized[:head_max].rstrip("-") or "site"
    return f"{head}-{digest}"
def _write_docker_start_script(kit_info: KitInfo, docker_image: str, network: str) -> Path:
    """Generate the Docker launcher script inside the kit's ``startup`` folder.

    The generated bash script checks that the Docker daemon is reachable,
    creates the Docker network when it is not listed, removes a stale
    ``daemon_pid.fl`` from the host workspace, and then runs the parent
    container with the kit folder and the Docker socket mounted.

    Args:
        kit_info: Kit description; ``role``, ``name``, ``kit_dir`` and the port
            fields are read here.
        docker_image: Default image; the script lets ``NVFL_P_IMAGE`` override it.
        network: Name of the Docker network the container joins.

    Returns:
        Path of the written script, marked executable for user, group and others.
    """
    role_label = "server" if kit_info.role == ROLE_SERVER else "client"
    # Both fragments stay empty for non-server kits, so the template emits
    # no port publishing and no network alias for them.
    publish_port = ""
    network_alias = ""
    if kit_info.role == ROLE_SERVER:
        fed_learn_port = kit_info.fed_learn_port or 8002
        ports = [fed_learn_port]
        # Publish the admin port separately only when it differs from the FL port.
        if kit_info.admin_port is not None and kit_info.admin_port != fed_learn_port:
            ports.append(kit_info.admin_port)
        publish_port = "".join(f" -p {port}:{port} \\\n" for port in ports)
        network_alias = f" --network-alias {shlex.quote(ROLE_SERVER)} \\\n"
    # Doubled braces in the template below are f-string escapes that yield
    # literal braces in the generated bash script.
    script = f"""#!/usr/bin/env bash
# Generated by NVFlare deploy prepare - Docker runtime {role_label} startup script.
# The parent {role_label} container mounts this prepared kit and launches job containers through Docker.
DIR="$( cd "$( dirname "${{BASH_SOURCE[0]}}" )" >/dev/null 2>&1 && pwd )"
HOST_WORKSPACE="$(cd "$DIR/.." && pwd)"
DOCKER_IMAGE=${{NVFL_P_IMAGE:-{_bash_double_quote(docker_image)}}}
if ! docker info > /dev/null 2>&1; then
echo "ERROR: cannot connect to Docker daemon. Make sure your user has permission to run docker."
echo " Option 1 (docker group): sudo usermod -aG docker \\$USER then log out and back in."
echo " Option 2 (rootless Docker): https://docs.docker.com/engine/security/rootless/"
exit 1
fi
echo "Starting NVFlare {role_label} {kit_info.name} with image $DOCKER_IMAGE"
echo "Host workspace: $HOST_WORKSPACE"
NETWORK_NAME={_bash_double_quote(network)}
if ! docker network ls --filter name="$NETWORK_NAME" --format "{{{{.Name}}}}" | grep -wq "$NETWORK_NAME"; then
docker network create "$NETWORK_NAME"
echo "Created Docker network: $NETWORK_NAME"
fi
rm -f "$HOST_WORKSPACE/daemon_pid.fl"
SOCK_GID=$(stat -c '%g' /var/run/docker.sock 2>/dev/null || stat -f '%g' /var/run/docker.sock 2>/dev/null || echo "")
GROUP_ADD_ARG=""
if [ -n "$SOCK_GID" ] && [ "$SOCK_GID" != "0" ]; then
GROUP_ADD_ARG="--group-add $SOCK_GID"
fi
docker run --name {shlex.quote(kit_info.name)} \\
--user "$(id -u):$(id -g)" \\
$GROUP_ADD_ARG \\
--network "$NETWORK_NAME" \\
{network_alias} -v "$HOST_WORKSPACE":{WORKSPACE_MOUNT_PATH} \\
-v /var/run/docker.sock:/var/run/docker.sock \\
-e NVFL_DOCKER_WORKSPACE="$HOST_WORKSPACE" \\
{publish_port} --rm "$DOCKER_IMAGE" \\
{_docker_parent_command(kit_info)}
"""
    script_path = kit_info.kit_dir / "startup" / DOCKER_START_SH
    script_path.write_text(script, encoding="utf-8")
    # Add execute permission for user, group and others on top of the existing mode.
    script_path.chmod(script_path.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
    return script_path
def _bash_double_quote(value: str) -> str:
escaped = value.replace("\\", "\\\\").replace('"', '\\"').replace("$", "\\$").replace("`", "\\`")
return f'"{escaped}"'
def _docker_parent_command(kit_info: KitInfo) -> str:
    """Build the shell-quoted command line that starts the parent process in the container.

    Args:
        kit_info: Kit description; ``role``, ``name`` and ``org`` are read.

    Returns:
        The quoted arguments joined with backslash-newline continuations, ready
        to be placed at the end of the generated ``docker run`` command.
    """
    if kit_info.role == ROLE_SERVER:
        train_module = "nvflare.private.fed.app.server.server_train"
        startup_config = FED_SERVER_JSON
        overrides = ["secure_train=true", "config_folder=config", f"org={kit_info.org}"]
    else:
        train_module = "nvflare.private.fed.app.client.client_train"
        startup_config = FED_CLIENT_JSON
        overrides = ["secure_train=true", f"uid={kit_info.name}", f"org={kit_info.org}", "config_folder=config"]

    argv = ["/usr/local/bin/python3", "-u", "-m", train_module]
    argv += ["-m", WORKSPACE_MOUNT_PATH, "-s", startup_config, "--set"]
    argv += overrides
    quoted = [shlex.quote(arg) for arg in argv]
    return " \\\n ".join(quoted)
def _write_helm_chart(kit_info: KitInfo, config: dict[str, Any]) -> Path:
    """Regenerate the Helm chart folder of a kit from the runtime config.

    Any existing chart folder is deleted first, then ``Chart.yaml``,
    ``values.yaml`` and the role-specific templates are written.

    Args:
        kit_info: Kit description; ``kit_dir`` and ``role`` are read here.
        config: Runtime config; its ``parent`` section must hold ``docker_image``.

    Returns:
        Path of the chart folder.
    """
    parent = config.get("parent") or {}
    chart_dir = kit_info.kit_dir / HELM_CHART_DIR
    # Positional arguments shared by the server and client chart writers.
    shared_args = (
        kit_info,
        chart_dir,
        parent["docker_image"],
        parent.get("parent_port", 8102),
        parent.get("workspace_pvc", "nvflws"),
        parent.get("workspace_mount_path", WORKSPACE_MOUNT_PATH),
        parent.get("python_path") or K8S_PARENT_PYTHON_PATH,
        parent,
    )
    server_service_name = config.get("server_service_name", DEFAULT_K8S_SERVER_SERVICE_NAME)

    # Start from a clean chart folder so files from earlier runs cannot linger.
    if chart_dir.exists():
        shutil.rmtree(chart_dir)
    templates_dir = chart_dir / "templates"
    templates_dir.mkdir(parents=True)

    is_server = kit_info.role == ROLE_SERVER
    if is_server:
        _write_server_helm_chart(*shared_args, server_service_name)
    else:
        _write_client_helm_chart(*shared_args)
    _copy_helm_templates("server" if is_server else "client", templates_dir)
    return chart_dir
def _write_server_helm_chart(
    kit_info: KitInfo,
    chart_dir: Path,
    docker_image: str,
    parent_port: int,
    workspace_pvc: str,
    workspace_mount_path: str,
    parent_python_path: str,
    parent: dict[str, Any],
    server_service_name: str,
) -> None:
    """Write ``Chart.yaml`` and ``values.yaml`` for the server Helm chart.

    Args:
        kit_info: Kit description; ``name``, ``org`` and the port fields are read.
        chart_dir: Existing chart folder to write into.
        docker_image: Image reference, split into repository and tag.
        parent_port: Value stored as ``parentPort``.
        workspace_pvc: Claim name of the workspace volume.
        workspace_mount_path: Mount path of the workspace inside the pod.
        parent_python_path: Interpreter used as the container command.
        parent: ``parent`` section of the runtime config (resources, pod
            security context and image pull secrets are read from it).
        server_service_name: Value stored as ``serviceName``.
    """
    repo, tag = _split_image(docker_image)
    chart_metadata = {
        "apiVersion": "v2",
        "name": "nvflare-server",
        "description": f"NVFlare federated learning server for {kit_info.name}",
        "type": "application",
        "version": "0.1.0",
        "appVersion": tag or "latest",
        "keywords": ["nvflare", "federated-learning"],
        "maintainers": [],
    }
    _write_yaml(chart_dir / "Chart.yaml", chart_metadata)

    fed_learn_port = kit_info.fed_learn_port or 8002
    admin_port = kit_info.admin_port
    if admin_port == fed_learn_port:
        # adminPort is left unset when it duplicates fedLearnPort.
        admin_port = None

    workspace_volume = {
        "claimName": workspace_pvc,
        "volumeName": WORKSPACE_VOLUME_NAME,
        "mountPath": workspace_mount_path,
    }
    workspace_config = {
        "local": {"configMapName": None, "items": []},
        "startup": {"secretName": None, "items": []},
    }
    train_args = ["-u", "-m", "nvflare.private.fed.app.server.server_train"]
    train_args += ["-m", workspace_mount_path, "-s", FED_SERVER_JSON]
    train_args += ["--set", "secure_train=true", "config_folder=config", f"org={kit_info.org}"]

    # Key order is preserved in the written YAML, so it is kept stable here.
    values = {
        "name": kit_info.name,
        "serviceName": server_service_name,
        "image": {"repository": repo, "tag": tag, "pullPolicy": "IfNotPresent"},
        "imagePullSecrets": _image_pull_secret_refs(parent),
        "serviceAccount": {"create": True, "annotations": {}, "automountServiceAccountToken": True},
        "podAnnotations": {},
        "rbac": {"create": True},
        "persistence": {"workspace": workspace_volume},
        "workspaceConfig": workspace_config,
        "fedLearnPort": fed_learn_port,
        "adminPort": admin_port,
        "parentPort": parent_port,
        "resources": parent.get("resources") or {"requests": {"cpu": "2", "memory": "8Gi"}},
        "securityContext": parent.get("pod_security_context") or {},
        "hostPortEnabled": False,
        "tcpConfigMapEnabled": False,
        "service": {"type": "ClusterIP", "loadBalancerIP": None, "annotations": {}},
        "command": [parent_python_path],
        "args": train_args,
    }
    _write_yaml(chart_dir / "values.yaml", values)
def _write_client_helm_chart(
    kit_info: KitInfo,
    chart_dir: Path,
    docker_image: str,
    parent_port: int,
    workspace_pvc: str,
    workspace_mount_path: str,
    parent_python_path: str,
    parent: dict[str, Any],
) -> None:
    """Write ``Chart.yaml`` and ``values.yaml`` for the client Helm chart.

    Args:
        kit_info: Kit description; ``name``, ``role`` and ``org`` are read.
        chart_dir: Existing chart folder to write into.
        docker_image: Image reference, split into repository and tag.
        parent_port: Value stored as ``port``.
        workspace_pvc: Claim name of the workspace volume.
        workspace_mount_path: Mount path of the workspace inside the pod.
        parent_python_path: Interpreter used as the container command.
        parent: ``parent`` section of the runtime config (resources, pod
            security context and image pull secrets are read from it).
    """
    repo, tag = _split_image(docker_image)
    chart_metadata = {
        "apiVersion": "v2",
        "name": "nvflare-client",
        "description": f"NVFlare federated learning client deployment and service for {kit_info.name}",
        "type": "application",
        "version": "0.1.0",
        "appVersion": tag or "latest",
        "keywords": ["nvflare", "federated-learning"],
        "maintainers": [],
    }
    _write_yaml(chart_dir / "Chart.yaml", chart_metadata)

    workspace_volume = {
        "claimName": workspace_pvc,
        "volumeName": WORKSPACE_VOLUME_NAME,
        "mountPath": workspace_mount_path,
    }
    workspace_config = {
        "local": {"configMapName": None, "items": []},
        "startup": {"secretName": None, "items": []},
    }
    train_args = ["-u", "-m", "nvflare.private.fed.app.client.client_train"]
    train_args += ["-m", workspace_mount_path, "-s", FED_CLIENT_JSON]
    train_args += ["--set", "secure_train=true", "config_folder=config", f"org={kit_info.org}"]

    # Key order is preserved in the written YAML, so it is kept stable here.
    values = {
        "name": kit_info.name,
        "siteName": kit_info.name,
        "serviceName": _k8s_parent_service_name(kit_info.role, kit_info.name),
        "image": {"repository": repo, "tag": tag, "pullPolicy": "Always"},
        "imagePullSecrets": _image_pull_secret_refs(parent),
        "serviceAccount": {"create": True, "annotations": {}, "automountServiceAccountToken": True},
        "podAnnotations": {},
        "rbac": {"create": True},
        "persistence": {"workspace": workspace_volume},
        "workspaceConfig": workspace_config,
        "port": parent_port,
        "service": {"annotations": {}},
        "securityContext": parent.get("pod_security_context") or {},
        "resources": parent.get("resources") or {"requests": {"cpu": "2", "memory": "8Gi"}},
        "command": [parent_python_path],
        "args": train_args,
    }
    _write_yaml(chart_dir / "values.yaml", values)
def _copy_helm_templates(role: str, templates_dir: Path) -> None:
    """Copy the packaged Helm templates of ``role`` into ``templates_dir``.

    Args:
        role: ``"server"`` or ``"client"``; any other value raises ``KeyError``.
        templates_dir: Destination folder inside the chart.
    """
    # Source file name -> destination file name, per role, in copy order.
    renames_by_role = {
        "server": {
            "_helpers.tpl": "_helpers.tpl",
            "deployment.yaml": "server-deployment.yaml",
            "service.yaml": "server-service.yaml",
            "tcp-services.yaml": "server-tcp-services.yaml",
            "serviceaccount.yaml": "serviceaccount.yaml",
            "role.yaml": "role.yaml",
        },
        "client": {
            "_helpers.tpl": "_helpers.tpl",
            "deployment.yaml": "client-deployment.yaml",
            "service.yaml": "service.yaml",
            "serviceaccount.yaml": "serviceaccount.yaml",
            "role.yaml": "role.yaml",
        },
    }
    for src_name, dst_name in renames_by_role[role].items():
        source = _helm_src(role, src_name)
        shutil.copy(source, templates_dir / dst_name)
def _split_image(docker_image: str) -> tuple[str, str]:
if ":" in docker_image:
return docker_image.rsplit(":", 1)
return docker_image, ""
def _image_pull_secret_refs(parent: dict[str, Any]) -> list[dict[str, str]]:
return [{"name": name} for name in parent.get("image_pull_secrets") or []]
def _helm_src(role: str, filename: str) -> Path:
    """Return the path of a packaged Helm template file for ``role``."""
    return HELM_TEMPLATES_DIR.joinpath(role, filename)
def _relocate_server_storage_to_workspace(kit_dir: Path, workspace_mount_path: str) -> None:
    """Point the server's snapshot and job storage at folders under the workspace mount.

    The default resources file in the kit's ``local`` folder is loaded,
    patched in memory and handed to ``_write_resources``.

    Args:
        kit_dir: Root folder of the startup kit.
        workspace_mount_path: Mount path of the workspace inside the container.
    """
    local_dir = kit_dir / "local"
    resources = _load_json_file(local_dir / RESOURCES_JSON_DEFAULT, RESOURCES_JSON_DEFAULT)
    snapshot_root = f"{workspace_mount_path}/snapshot-storage"
    jobs_root = f"{workspace_mount_path}/jobs-storage"

    if "snapshot_persistor" in resources:
        try:
            storage_args = resources["snapshot_persistor"]["args"]["storage"]["args"]
            storage_args["root_dir"] = snapshot_root
        except (KeyError, TypeError):
            # An unexpected layout is reported but does not abort the prepare step.
            _warn(
                "snapshot_persistor is present, but deploy prepare could not relocate snapshot storage to the "
                f"workspace at {snapshot_root}. Expected nested key: "
                "snapshot_persistor.args.storage.args.root_dir."
            )

    job_managers = (c for c in resources.get("components", []) if c.get("id") == "job_manager")
    for job_manager in job_managers:
        job_manager.setdefault("args", {})["uri_root"] = jobs_root
    _write_resources(local_dir, resources)
def _build_volume_bundle(root: Path) -> dict[str, Any]:
    """Collect every file under ``root`` as base64 payloads for a Kubernetes volume object.

    Args:
        root: Prepared kit sub-folder to stage.

    Returns:
        A mapping with ``data`` (key -> base64 text), ``items`` (a list of
        ``{"key", "path"}`` entries, ``path`` being the POSIX path relative to
        ``root``) and ``encoded_size`` (total length of all base64 payloads).
    """
    payloads: dict[str, str] = {}
    volume_items: list[dict[str, str]] = []
    total_encoded = 0
    resolved_root = root.resolve()

    # Symlinks are collected too so they can be rejected explicitly below.
    candidates = sorted(entry for entry in root.rglob("*") if entry.is_file() or entry.is_symlink())
    if not candidates:
        _fail("INVALID_KIT", f"No files found in prepared kit folder: {root}", "Stage a non-empty prepared folder.")

    for position, entry in enumerate(candidates):
        if entry.is_symlink():
            _fail(
                "INVALID_KIT",
                f"Symlinks cannot be staged into Kubernetes volumes: {entry}",
                "Replace it with a file.",
            )
        resolved_entry = entry.resolve()
        if not _is_path_relative_to(resolved_entry, resolved_root):
            _fail(
                "INVALID_KIT",
                f"Staged file resolves outside the prepared kit folder: {entry}",
                "Use files contained under local/ or startup/.",
            )
        relative_posix = entry.relative_to(root).as_posix()
        _validate_k8_volume_item_path(relative_posix, entry)
        item_key = _k8_stage_file_key(position, relative_posix)
        payload = base64.b64encode(resolved_entry.read_bytes()).decode("ascii")
        payloads[item_key] = payload
        total_encoded += len(payload)
        volume_items.append({"key": item_key, "path": relative_posix})

    return {"data": payloads, "items": volume_items, "encoded_size": total_encoded}
def _validate_k8_volume_item_path(rel_path: str, source_path: Path) -> None:
path = PurePosixPath(rel_path)
if path.is_absolute() or ".." in path.parts or not path.parts:
_fail("INVALID_KIT", f"Unsafe staged file path: {source_path}", "Use files contained under local/ or startup/.")
def _k8_stage_file_key(index: int, rel_path: str) -> str:
digest = hashlib.sha256(rel_path.encode("utf-8")).hexdigest()[:12]
safe_name = re.sub(r"[^A-Za-z0-9_.-]", "_", PurePosixPath(rel_path).name) or "file"
prefix = f"f{index:04d}-{digest}-"
return f"{prefix}{safe_name}"[:253]
def _configmap_manifest(name: str, namespace: str, data: dict[str, str]) -> dict[str, Any]:
return {
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {"name": name, "namespace": namespace},
"binaryData": data,
}
def _secret_manifest(name: str, namespace: str, data: dict[str, str]) -> dict[str, Any]:
return {
"apiVersion": "v1",
"kind": "Secret",
"metadata": {"name": name, "namespace": namespace},
"type": "Opaque",
"data": data,
}
def _patch_k8_stage_values(
    kit: Path,
    local_configmap: str,
    local_items: list[dict[str, str]],
    startup_secret: str,
    startup_items: list[dict[str, str]],
) -> None:
    """Record the staged ConfigMap and Secret in the chart's ``values.yaml``.

    Args:
        kit: Root folder of the prepared kit.
        local_configmap: Name of the ConfigMap holding the ``local`` files.
        local_items: Key/path entries of the ``local`` files.
        startup_secret: Name of the Secret holding the ``startup`` files.
        startup_items: Key/path entries of the ``startup`` files.
    """
    values = _load_k8_stage_values(kit)
    staged = values.setdefault(K8_STAGE_VALUES_KEY, {})
    staged.update(
        {
            K8_STAGE_LOCAL_KEY: {"configMapName": local_configmap, "items": local_items},
            K8_STAGE_STARTUP_KEY: {"secretName": startup_secret, "items": startup_items},
        }
    )
    _write_yaml(kit / HELM_CHART_DIR / "values.yaml", values)
def _warn_if_large_k8_object(kind: str, name: str, encoded_size: int) -> None:
    """Emit a warning when an object's encoded payload exceeds the warning threshold.

    Args:
        kind: Object kind, used in the message.
        name: Object name, used in the message.
        encoded_size: Size of the encoded payload in bytes.
    """
    if encoded_size <= K8_STAGE_OBJECT_SIZE_WARN_BYTES:
        return
    _warn(
        f"{kind} '{name}' encoded payload is about {encoded_size} bytes; "
        "Kubernetes objects have size limits, so large local/startup folders may fail to apply."
    )
def _kubectl_apply(manifest: dict[str, Any], kubectl: str) -> subprocess.CompletedProcess:
    """Apply ``manifest`` by piping its YAML to ``<cli> apply -f -``.

    Args:
        manifest: Kubernetes object to apply.
        kubectl: CLI selector; ``"oc"`` runs ``oc``, any other value runs ``kubectl``.

    Returns:
        The completed process returned by ``_kubectl``.
    """
    manifest_yaml = yaml.safe_dump(manifest, default_flow_style=False, sort_keys=False)
    cli = "oc" if kubectl == "oc" else "kubectl"
    return _kubectl([cli, "apply", "-f", "-"], input_text=manifest_yaml)
def _kubectl(cmd: list[str], input_text: str | None = None) -> subprocess.CompletedProcess:
    """Run a Kubernetes CLI command and report failures through ``_fail``.

    Args:
        cmd: Full argument vector, executable first.
        input_text: Optional text passed to the command's standard input.

    Returns:
        The completed process, with stdout and stderr captured as text.
    """
    run_options = {
        "input": input_text,
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
        "text": True,
        "check": False,
    }
    try:
        completed = subprocess.run(cmd, **run_options)
    except OSError:
        _fail(
            "KUBECTL_NOT_FOUND",
            f"Kubernetes CLI executable could not be started: {cmd[0]}",
            "Install kubectl or set --kubectl/KUBECTL to a compatible command such as oc.",
        )
    if completed.returncode == 0:
        return completed

    # Prefer stderr for the failure detail; fall back to stdout.
    detail = (completed.stderr or completed.stdout or "").strip()
    message_lines = [f"Kubernetes command failed: {_format_command(cmd)}"]
    if detail:
        message_lines.append(detail)
    _fail("KUBECTL_FAILED", "\n".join(message_lines), "Check kubectl access, namespace, and resource quotas.")
    return completed
def _format_command(cmd: list[str]) -> str:
return " ".join(shlex.quote(str(part)) for part in cmd)
def _k8s_release_name(name: str) -> str:
    """Return a stable Helm release name for ``name``, bounded by the release name limit."""
    return _stable_k8s_name(raw_name=name, max_length=HELM_RELEASE_NAME_MAX_LENGTH)
def _is_path_relative_to(path: Path, parent: Path) -> bool:
try:
path.relative_to(parent)
return True
except ValueError:
return False
def _mapping(value: Any, name: str) -> dict[str, Any]:
if not isinstance(value, dict):
_fail("INVALID_CONFIG", f"{name} must be a YAML mapping.", "Fix the runtime config.")
return value
def _validate_allowed_keys(data: dict[str, Any], allowed: set[str], where: str) -> None:
unknown = sorted(set(data) - allowed)
if unknown:
_fail("INVALID_CONFIG", f"Unknown keys in {where}: {unknown}", "Remove or rename unsupported keys.")
def _required_str(data: dict[str, Any], key: str, where: str) -> None:
if not isinstance(data.get(key), str) or not data.get(key):
_fail("INVALID_CONFIG", f"{where}.{key} must be a non-empty string.", "Fix the runtime config.")
def _validate_k8s_namespace(
    data: dict[str, Any],
    key: str,
    where: str,
    error_code: str = "INVALID_CONFIG",
    hint: str = "Fix the runtime config.",
) -> None:
    """Fail unless ``data[key]`` is a valid Kubernetes namespace name.

    Args:
        data: Mapping to read from.
        key: Key holding the namespace.
        where: Location label used in the error messages.
        error_code: Error code passed to ``_fail``.
        hint: Hint used when the value is missing or not a string.
    """
    namespace = data.get(key)
    if not (isinstance(namespace, str) and namespace):
        _fail(error_code, f"{where}.{key} must be a non-empty string.", hint)
        return

    is_valid = len(namespace) <= K8S_NAMESPACE_MAX_LENGTH and K8S_NAMESPACE_PATTERN.fullmatch(namespace)
    if not is_valid:
        _fail(
            error_code,
            f"{where}.{key} must be a valid Kubernetes namespace (DNS-1123 label): {namespace!r}.",
            "Use lower case alphanumeric characters or '-', start and end with an alphanumeric character, "
            f"and keep length <= {K8S_NAMESPACE_MAX_LENGTH}.",
        )
def _validate_k8s_service_name(data: dict[str, Any], key: str, where: str) -> None:
    """Fail unless ``data[key]`` is a valid Kubernetes Service name.

    Args:
        data: Mapping to read from.
        key: Key holding the service name.
        where: Location label used in the error messages.
    """
    service_name = data.get(key)
    is_non_empty_str = isinstance(service_name, str) and bool(service_name)
    if not is_non_empty_str:
        _fail("INVALID_CONFIG", f"{where}.{key} must be a non-empty string.", "Fix the runtime config.")

    is_valid = len(service_name) <= K8S_SERVICE_NAME_MAX_LENGTH and K8S_NAME_PATTERN.fullmatch(service_name)
    if not is_valid:
        _fail(
            "INVALID_CONFIG",
            f"{where}.{key} must be a valid Kubernetes Service name (DNS-1035 label): {service_name!r}.",
            "Use lower case alphanumeric characters or '-', start with a letter, end with an alphanumeric character, "
            f"and keep length <= {K8S_SERVICE_NAME_MAX_LENGTH}.",
        )
def _validate_k8s_secret_name(
    name: str, label: str, error_code: str = "INVALID_CONFIG", hint: str = "Fix the runtime config."
) -> None:
    """Fail unless ``name`` is a valid Kubernetes object name for a Secret.

    Args:
        name: Candidate name.
        label: Label of the config entry, used in the error message.
        error_code: Error code passed to ``_fail``.
        hint: Extra hint appended to the naming rules.
    """
    is_valid = (
        isinstance(name, str)
        and bool(name)
        and len(name) <= K8S_SECRET_NAME_MAX_LENGTH
        and K8S_SECRET_NAME_PATTERN.fullmatch(name) is not None
    )
    if not is_valid:
        _fail(
            error_code,
            f"{label} must contain valid Kubernetes object names.",
            "Use lower case alphanumeric characters, '-', or '.', start and end with an alphanumeric character, "
            # Report the limit that is actually enforced rather than a copy of it.
            f"and keep length <= {K8S_SECRET_NAME_MAX_LENGTH}. {hint}",
        )
def _optional_k8s_secret_name_list(data: dict[str, Any], key: str, label: str) -> list[str] | None:
if key not in data or data[key] is None:
return None
names = data[key]
if not isinstance(names, list):
_fail("INVALID_CONFIG", f"{label} must be a list of Kubernetes object names.", "Fix the runtime config.")
for name in names:
_validate_k8s_secret_name(name, label)
return names
def _optional_str(data: dict[str, Any], key: str, where: str) -> str | None:
if key not in data or data[key] is None:
return None
if not isinstance(data[key], str) or not data[key]:
_fail("INVALID_CONFIG", f"{where}.{key} must be a non-empty string.", "Fix the runtime config.")
return data[key]
def _optional_mapping(data: dict[str, Any], key: str, where: str) -> dict[str, Any] | None:
if key not in data or data[key] is None:
return None
if not isinstance(data[key], dict):
_fail("INVALID_CONFIG", f"{where}.{key} must be a mapping.", "Fix the runtime config.")
return data[key]
def _optional_int(data: dict[str, Any], key: str, where: str) -> int | None:
if key not in data or data[key] is None:
return None
if not isinstance(data[key], int):
_fail("INVALID_CONFIG", f"{where}.{key} must be an integer.", "Fix the runtime config.")
return data[key]
def _optional_non_negative_int(data: dict[str, Any], key: str, where: str) -> int | None:
    """Return ``data[key]`` when it is an integer >= 0, or ``None`` when it is absent.

    Args:
        data: Mapping to read from.
        key: Optional key.
        where: Location label used in the error messages.
    """
    number = _optional_int(data, key, where)
    # bool is a subclass of int, so it is rejected explicitly.
    if isinstance(number, bool):
        _fail("INVALID_CONFIG", f"{where}.{key} must be an integer.", "Fix the runtime config.")
    is_negative = number is not None and number < 0
    if is_negative:
        _fail("INVALID_CONFIG", f"{where}.{key} must be a non-negative integer.", "Fix the runtime config.")
    return number
def _load_json_file(path: Path, label: str) -> dict[str, Any]:
if not path.is_file():
_fail("INVALID_KIT", f"Missing {label}: {path}", "Provide a valid startup kit.")
try:
with path.open("rt", encoding="utf-8") as f:
data = json.load(f)
except Exception as ex:
_fail("INVALID_KIT", f"Failed to parse {label}: {ex}", "Fix the startup kit JSON.")
if not isinstance(data, dict):
_fail("INVALID_KIT", f"{label} must contain a JSON object.", "Fix the startup kit JSON.")
return data
def _write_json(path: Path, data: dict[str, Any]) -> None:
    """Write ``data`` to ``path`` as 4-space-indented UTF-8 JSON ending with a newline."""
    rendered = json.dumps(data, indent=4)
    path.write_text(f"{rendered}\n", encoding="utf-8")
def _write_yaml(path: Path, data: dict[str, Any]) -> None:
    """Write ``data`` to ``path`` as block-style UTF-8 YAML, keeping the mapping's key order."""
    rendered = yaml.safe_dump(data, default_flow_style=False, sort_keys=False)
    path.write_text(rendered, encoding="utf-8")
def _fail(error_code: str, message: str, hint: str = "") -> None:
    """Report a deploy error through ``output_error_message`` with exit code 4.

    Args:
        error_code: Machine-readable error code.
        message: Description of the failure.
        hint: Optional suggestion for fixing it.

    NOTE(review): callers in this module are written as if this call does not
    return; confirm that ``output_error_message`` terminates the process.
    """
    output_error_message(error_code, message, hint, None, exit_code=4)