Source code for nvflare.tool.agent.skill_manager

# Copyright (c) 2026, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Native install/list support for NVFLARE-owned agent skills."""

import json
import os
import shutil
import stat
import tempfile
import time
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from importlib import resources, util
from pathlib import Path
from typing import Optional

import nvflare
from nvflare.tool.agent.skill_manifest import (
    IGNORED_SKILL_FILE_NAMES,
    MANIFEST_CONTENT_MODE_RELEASE,
    MANIFEST_FILE_NAME,
    RELEASE_SKILL_FILE_EXCLUDE_NAMES,
    SHARED_SKILL_REFERENCE_DIR,
    build_skill_manifest,
    load_manifest,
    skill_tree_hash,
)

# Per-skill manifest written into every installed skill directory; a directory
# whose manifest has "managed_by": "nvflare" is treated as NVFLARE-managed.
INSTALL_MANIFEST_FILE_NAME = ".nvflare_skill_install.json"
# Agent names this module supports; resolve_agent_target_dir handles exactly these.
SUPPORTED_AGENT_TARGETS = ("codex", "claude")
# Package whose on-disk directory holds the skills bundled with the installed package.
BUNDLED_SKILLS_PACKAGE = "nvflare.tool.agent.bundled_skills"
# Seconds after which an install lock directory is considered stale; can be
# overridden with NVFLARE_AGENT_SKILL_INSTALL_LOCK_TTL_SECONDS.
DEFAULT_INSTALL_LOCK_TTL_SECONDS = 300
# File inside a lock directory holding its creation time as integer nanoseconds.
INSTALL_LOCK_TIMESTAMP_FILE_NAME = "created_at_ns"


@dataclass(eq=True, frozen=True)
class SkillSource:
    """Immutable description of where skill content is read from.

    find_skill_source produces either an "editable" source (a source checkout)
    or a "wheel" source (the bundled package directory).
    """

    # Source kind label, e.g. "editable" or "wheel".
    source_type: str
    # Directory that each skill's "relative_path" is resolved against.
    root: Path
    # Skill manifest; its "skills" list drives planning, install and listing.
    manifest: dict
[docs] def resolve_agent_target_dir( agent: str, *, target_dir: Optional[Path | str] = None, env: Optional[dict] = None ) -> Path: """Resolve a named agent target to its skill installation directory. Explicit write targets, including CODEX_HOME-derived targets, use the same no-traversal/no-symlink policy as --target. The only symlink exception is the platform temp-directory alias handled in _resolve_target_override. """ if target_dir: return _resolve_target_override(target_dir) env_map = env or os.environ if agent == "codex": codex_home = env_map.get("CODEX_HOME") if codex_home: return _resolve_target_override(Path(codex_home).expanduser() / "skills") return Path.home() / ".codex" / "skills" if agent == "claude": return Path.home() / ".claude" / "skills" raise ValueError(f"unsupported agent target: {agent}")
def find_skill_source() -> SkillSource:
    """Find skills from an editable/source checkout first, then from the installed package bundle.

    Returns:
        An "editable" SkillSource whose manifest is built from the checkout's
        skills directory, or a "wheel" SkillSource using the bundled manifest
        file when present and otherwise a manifest built from the bundle.
    """
    source_root = _source_checkout_root()
    if source_root:
        return SkillSource(
            source_type="editable",
            root=source_root,
            manifest=build_skill_manifest(source_root, source_type="editable", nvflare_version=nvflare.__version__),
        )

    bundle_root = _bundled_skills_root()
    manifest_path = bundle_root / MANIFEST_FILE_NAME
    if manifest_path.is_file():
        manifest = load_manifest(manifest_path)
    else:
        # Pass the running NVFLARE version, as the editable branch does, so a
        # manifest built at runtime records it too (presumably surfaced by
        # _source_summary as "nvflare_version" — confirm in build_skill_manifest).
        manifest = build_skill_manifest(bundle_root, source_type="wheel", nvflare_version=nvflare.__version__)
    return SkillSource(source_type="wheel", root=bundle_root, manifest=manifest)
def _bundled_skills_root() -> Path:
    """Return the filesystem directory of the bundled skills package.

    Raises:
        FileNotFoundError: if the path reported by importlib.resources for the
            package is not an existing directory on disk.
    """
    package_dir = Path(str(resources.files(BUNDLED_SKILLS_PACKAGE)))
    if package_dir.is_dir():
        return package_dir
    raise FileNotFoundError(
        f"bundled agent skills must be available from an unpacked filesystem package: {BUNDLED_SKILLS_PACKAGE}"
    )
def install_skills(
    *,
    agent: str,
    skill_name: Optional[str] = None,
    dry_run: bool = False,
    target_dir: Optional[Path | str] = None,
    source: Optional[SkillSource] = None,
) -> dict:
    """Plan or apply a native NVFLARE skill installation.

    The plan is always returned. Nothing is written when ``dry_run`` is set or
    the requested skill is missing from the source manifest. Otherwise the
    target directory is created, shared references are synced, and each
    planned skill is copied, replaced, or skipped; every attempted entry gets a
    ``status`` and failures are appended to ``plan["errors"]``.

    Returns:
        The plan dict; ``plan["applied"]`` is True only when no error occurred.
    """
    if source is None:
        source = find_skill_source()
    target = resolve_agent_target_dir(agent, target_dir=target_dir)
    selected, missing = _select_skills(source.manifest, skill_name)

    plan = _install_plan(source, selected, target, agent=agent, requested_skill=skill_name)
    plan["missing"] = missing
    plan["applied"] = False
    if dry_run or missing:
        return plan

    failures = plan["errors"]

    try:
        target.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        failures.append(_install_error(str(target), exc))
        return plan

    try:
        _sync_shared_references(source, target)
    except Exception as exc:
        failures.append(_install_error(SHARED_SKILL_REFERENCE_DIR, exc))
        return plan

    for item in plan["skills"]:
        try:
            action = item["action"]
            if action == "copy":
                _copy_skill(source.root / item["relative_path"], Path(item["target_path"]), item, source)
                item["status"] = "installed"
            elif action == "replace":
                _replace_skill(
                    source.root / item["relative_path"],
                    Path(item["target_path"]),
                    Path(item["backup_path"]),
                    item,
                    source,
                )
                item["status"] = "replaced"
            else:
                item["status"] = "skipped"
        except Exception as exc:
            record = _install_error(item["name"], exc)
            item["status"] = "failed"
            item["error"] = record
            failures.append(record)

    plan["applied"] = len(failures) == 0
    return plan
def list_skills(*, agent: str, target_dir: Optional[Path | str] = None, source: Optional[SkillSource] = None) -> dict:
    """List available packaged skills and installed managed skills for an agent target.

    Hidden entries (names starting with ".") are ignored. A child directory
    whose install manifest has ``managed_by == "nvflare"`` is reported as
    installed. A symlink, or an unmanaged directory, whose name matches an
    available skill is reported as a conflict. OSErrors raised while listing
    the target or lstat-ing an entry are collected in ``errors``.
    """
    if source is None:
        source = find_skill_source()
    target = resolve_agent_target_dir(agent, target_dir=target_dir)

    installed, conflicts, errors = [], [], []
    available = source.manifest.get("skills", [])
    known_names = {entry["name"] for entry in available}

    children = []
    if target.is_dir():
        try:
            children = sorted(target.iterdir(), key=lambda p: p.name)
        except OSError as exc:
            errors.append(_list_error(str(target), exc))

    for child in children:
        if child.name.startswith("."):
            continue
        try:
            mode = child.lstat().st_mode
        except OSError as exc:
            errors.append(_list_error(str(child), exc))
            continue

        if stat.S_ISLNK(mode):
            # Symlinked skill directories are never followed.
            if child.name in known_names:
                conflicts.append(_target_symlink_conflict(child.name, child, child))
            continue
        if not stat.S_ISDIR(mode):
            continue

        recorded = _read_install_manifest(child)
        if recorded and recorded.get("managed_by") == "nvflare":
            installed.append(
                {
                    "name": recorded.get("name", child.name),
                    "skill_version": recorded.get("skill_version"),
                    "source_hash": recorded.get("source_hash"),
                    "target_path": str(child),
                    "source_type": recorded.get("source_type"),
                }
            )
        elif child.name in known_names:
            conflicts.append(
                {
                    "skill": child.name,
                    "code": "external_install_detected",
                    "message": "target skill directory is not managed by nvflare",
                    "target_path": str(child),
                }
            )

    return {
        "agent": agent,
        "target_path": str(target),
        "source": _source_summary(source),
        "available": available,
        "installed": installed,
        "conflicts": conflicts,
        "errors": errors,
    }
def _install_plan(
    source: SkillSource, skills: list[dict], target: Path, *, agent: str, requested_skill: Optional[str]
) -> dict:
    """Build the install plan for ``skills`` into ``target``; nothing is copied or moved here.

    Each entry gets an ``action`` ("copy", "replace", or "skip") and a
    ``version_delta``: "new" (target absent), "unknown" (target not managed by
    nvflare), "blocked" (symlinks or local modifications), "same", or "update".
    Entries skipped because of a conflict are also listed in ``conflicts``.
    """
    planned_skills = []
    conflicts = []
    exclude_names = _copy_exclude_names(source)
    for skill in skills:
        source_skill_dir = source.root / skill["relative_path"]
        target_skill_dir = target / skill["name"]
        source_symlink = _first_symlink_in_tree(source_skill_dir)
        # The dry-run plan is advisory: source symlinks are checked again in
        # _stage_skill because the source tree could change before apply.
        # version_delta: new, unknown external state, blocked local edit, same, or update.
        entry = {
            "name": skill["name"],
            "skill_version": skill.get("skill_version"),
            "source_hash": skill["source_hash"],
            "relative_path": skill["relative_path"],
            "target_path": str(target_skill_dir),
            "files": (
                [] if source_symlink else _files_to_copy(source_skill_dir, target_skill_dir, exclude_names=exclude_names)
            ),
            "version_delta": "new",
        }
        if source_symlink:
            entry["action"] = "skip"
            entry["conflict"] = "source_symlink_detected"
            entry["version_delta"] = "blocked"
            entry["source_issue"] = {
                "code": "source_symlink_detected",
                "message": "source skill directory contains a symlink",
                "source_path": str(source_skill_dir),
                "symlink_path": str(source_symlink),
            }
            conflicts.append(_source_conflict(skill["name"], source_skill_dir, source_symlink))
        elif not target_skill_dir.exists() and not target_skill_dir.is_symlink():
            # exists() follows symlinks, so a dangling symlink would look absent;
            # the is_symlink() test routes it to the symlink conflict below.
            entry["action"] = "copy"
        else:
            target_symlink = _first_symlink_in_tree(target_skill_dir)
            if target_symlink:
                entry["action"] = "skip"
                entry["conflict"] = "target_symlink_detected"
                entry["version_delta"] = "blocked"
                entry["target_issue"] = {
                    "code": "target_symlink_detected",
                    "message": "target skill directory contains a symlink",
                    "target_path": str(target_skill_dir),
                    "symlink_path": str(target_symlink),
                }
                conflicts.append(_target_symlink_conflict(skill["name"], target_skill_dir, target_symlink))
            else:
                install_manifest = _read_install_manifest(target_skill_dir)
                # isinstance guards against a manifest that parsed to non-object JSON.
                if not isinstance(install_manifest, dict) or install_manifest.get("managed_by") != "nvflare":
                    entry["action"] = "skip"
                    entry["conflict"] = "external_install_detected"
                    entry["version_delta"] = "unknown"
                    conflicts.append(_conflict(skill["name"], "external_install_detected", target_skill_dir))
                else:
                    recorded_hash = install_manifest.get("source_hash")
                    try:
                        # Exclude the same names the copy step excludes, matching
                        # how _sync_shared_references hashes installed content.
                        installed_source_hash = skill_tree_hash(
                            target_skill_dir, exclude_names={INSTALL_MANIFEST_FILE_NAME, *exclude_names}
                        )
                    except (OSError, ValueError) as e:
                        installed_source_hash = None
                        entry["target_issue"] = {
                            "code": "local_modifications_detected",
                            "message": str(e),
                            "error_type": type(e).__name__,
                            "target_path": str(target_skill_dir),
                        }
                    # A tree that cannot be hashed must never be replaced, even if
                    # the recorded hash is also missing (None == None).
                    if installed_source_hash is None or installed_source_hash != recorded_hash:
                        entry["action"] = "skip"
                        entry["conflict"] = "local_modifications_detected"
                        entry["version_delta"] = "blocked"
                        conflicts.append(_conflict(skill["name"], "local_modifications_detected", target_skill_dir))
                    elif recorded_hash == skill["source_hash"]:
                        entry["action"] = "skip"
                        entry["reason"] = "already_installed"
                        entry["version_delta"] = "same"
                    else:
                        backup_path = _backup_path(target, skill["name"])
                        entry["action"] = "replace"
                        entry["backup_path"] = str(backup_path)
                        entry["version_delta"] = "update"
        planned_skills.append(entry)
    return {
        "agent": agent,
        "target_path": str(target),
        "requested_skill": requested_skill,
        "source": _source_summary(source),
        "available": source.manifest.get("skills", []),
        "skills": planned_skills,
        "conflicts": conflicts,
        "errors": [],
        "deprecated_skills_skipped": [],
    }


def _copy_skill(source_dir: Path, target_dir: Path, plan_entry: dict, source: SkillSource) -> None:
    """Install a skill into a target directory that does not exist yet.

    The skill is staged in a temporary directory next to ``target_dir`` while
    holding the install lock, then published by renaming it into place.
    """
    target_dir.parent.mkdir(parents=True, exist_ok=True)
    with _skill_install_lock(target_dir):
        with tempfile.TemporaryDirectory(prefix=f".{target_dir.name}.", dir=target_dir.parent) as temp_root:
            temp_skill_dir = Path(temp_root) / target_dir.name
            _stage_skill(source_dir, temp_skill_dir, plan_entry, source, installed_path=target_dir)
            _publish_staged_skill(temp_skill_dir, target_dir)


def _replace_skill(
    source_dir: Path, target_dir: Path, backup_path: Path, plan_entry: dict, source: SkillSource
) -> None:
    """Replace an existing managed skill, keeping the previous copy at ``backup_path``.

    If publishing the new copy fails, the backup is moved back into place;
    a failure of that recovery is attached to the original exception as
    ``recovery_error`` before it is re-raised.

    Raises:
        FileNotFoundError: if ``target_dir`` disappeared since planning.
        FileExistsError: if ``backup_path`` already exists.
    """
    target_dir.parent.mkdir(parents=True, exist_ok=True)
    with _skill_install_lock(target_dir):
        with tempfile.TemporaryDirectory(prefix=f".{target_dir.name}.", dir=target_dir.parent) as temp_root:
            temp_skill_dir = Path(temp_root) / target_dir.name
            _stage_skill(source_dir, temp_skill_dir, plan_entry, source, installed_path=target_dir)
            if not target_dir.exists():
                raise FileNotFoundError(f"target skill directory no longer exists: {target_dir}")
            if backup_path.exists():
                raise FileExistsError(f"backup skill directory already exists: {backup_path}")
            backup_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(target_dir, backup_path)
            try:
                _publish_staged_skill(temp_skill_dir, target_dir)
            except Exception as publish_error:
                if not target_dir.exists() and backup_path.exists():
                    try:
                        shutil.move(backup_path, target_dir)
                        _remove_empty_dir(backup_path.parent)
                    except Exception as recovery_error:
                        publish_error.recovery_error = recovery_error
                raise


def _sync_shared_references(source: SkillSource, target: Path) -> None:
    """Install or update the shared reference directory that skills rely on.

    No-op when the source has no shared directory. Installs it when the target
    copy is absent, replaces it when the managed copy is unmodified and out of
    date, and refuses to touch symlinked, unmanaged, or locally modified copies.

    Raises:
        ValueError: if the target shared directory is a symlink.
        FileExistsError: if the target copy is unmanaged or locally modified.
    """
    source_shared = source.root / SHARED_SKILL_REFERENCE_DIR
    if not source_shared.is_dir():
        return
    exclude_names = _copy_exclude_names(source)
    source_hash = skill_tree_hash(source_shared, exclude_names=exclude_names)
    target_shared = target / SHARED_SKILL_REFERENCE_DIR
    plan_entry = {"name": SHARED_SKILL_REFERENCE_DIR, "source_hash": source_hash}
    # Check for a symlink before the existence test: a dangling symlink reports
    # exists() == False and would otherwise be treated as a fresh copy target.
    if target_shared.is_symlink():
        raise ValueError(f"shared reference target must not be a symlink: {target_shared}")
    if not target_shared.exists():
        _copy_skill(source_shared, target_shared, plan_entry, source)
        return
    install_manifest = _read_install_manifest(target_shared)
    if not isinstance(install_manifest, dict) or install_manifest.get("managed_by") != "nvflare":
        raise FileExistsError(f"shared reference target is not managed by nvflare: {target_shared}")
    installed_source_hash = skill_tree_hash(target_shared, exclude_names={INSTALL_MANIFEST_FILE_NAME, *exclude_names})
    managed_source_hash = install_manifest.get("source_hash")
    if installed_source_hash != managed_source_hash:
        raise FileExistsError(f"shared reference target has local modifications: {target_shared}")
    if managed_source_hash == source_hash:
        return
    backup_path = _backup_path(target, target_shared.name)
    _replace_skill(source_shared, target_shared, backup_path, plan_entry, source)


def _backup_path(target: Path, skill_name: str) -> Path:
    """Return a unique backup location ``<target>/.nvflare_bak/<timestamp>-<ns>/<skill_name>``."""
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S.%fZ")
    return target / ".nvflare_bak" / f"{timestamp}-{time.time_ns()}" / skill_name


@contextmanager
def _skill_install_lock(target_dir: Path):
    """Hold a directory-based install lock for ``target_dir`` for the ``with`` body.

    The lock is the sibling directory ``.<name>.install.lock``. If it already
    exists and _lock_dir_is_stale says it is stale, it is removed and creation
    is retried once. The lock directory is removed when the body exits.

    Raises:
        FileExistsError: if the lock is present and not stale, or the retry fails.
    """
    # NOTE(review): two installers that both judge the same lock stale can race:
    # the later rmtree may delete the earlier installer's freshly created lock.
    lock_dir = target_dir.parent / f".{target_dir.name}.install.lock"
    try:
        _create_lock_dir(lock_dir)
    except FileExistsError as e:
        if _lock_dir_is_stale(lock_dir):
            shutil.rmtree(lock_dir, ignore_errors=True)
            try:
                _create_lock_dir(lock_dir)
            except FileExistsError as retry_error:
                raise FileExistsError(
                    f"target skill directory is already being installed: {target_dir}"
                ) from retry_error
        else:
            raise FileExistsError(f"target skill directory is already being installed: {target_dir}") from e
    try:
        yield
    finally:
        shutil.rmtree(lock_dir, ignore_errors=True)


def _lock_ttl_seconds() -> int:
    """Return the stale-lock TTL from the environment, clamped at 0, or the default when unset/invalid."""
    value = os.environ.get("NVFLARE_AGENT_SKILL_INSTALL_LOCK_TTL_SECONDS")
    if value is None:
        return DEFAULT_INSTALL_LOCK_TTL_SECONDS
    try:
        return max(0, int(value))
    except ValueError:
        return DEFAULT_INSTALL_LOCK_TTL_SECONDS


def _lock_dir_is_stale(lock_dir: Path) -> bool:
    """Return True when ``lock_dir`` is older than the configured TTL.

    Age comes from the recorded timestamp file when readable, otherwise from the
    directory mtime. Symlinks, non-directories, and a TTL of 0 (expiry disabled)
    are never considered stale.
    """
    if lock_dir.is_symlink() or not lock_dir.is_dir():
        return False
    ttl_seconds = _lock_ttl_seconds()
    if ttl_seconds <= 0:
        return False
    lock_started_at = _read_lock_started_at(lock_dir)
    if lock_started_at is not None:
        return time.time() - lock_started_at > ttl_seconds
    try:
        age_seconds = time.time() - lock_dir.stat().st_mtime
    except OSError:
        return False
    return age_seconds > ttl_seconds


def _write_lock_timestamp(lock_dir: Path) -> None:
    """Record the lock creation time (integer nanoseconds since the epoch) inside ``lock_dir``."""
    (lock_dir / INSTALL_LOCK_TIMESTAMP_FILE_NAME).write_text(str(time.time_ns()), encoding="utf-8")
def _create_lock_dir(lock_dir: Path) -> None:
    """Create the install lock directory and stamp it with the current time.

    ``mkdir`` raises FileExistsError when the directory is already present,
    which callers treat as "lock held". If writing the timestamp fails, the
    just-created directory is removed before the error is re-raised.
    """
    lock_dir.mkdir()
    try:
        _write_lock_timestamp(lock_dir)
    except Exception:
        shutil.rmtree(lock_dir, ignore_errors=True)
        raise


def _read_lock_started_at(lock_dir: Path) -> float | None:
    """Return the lock creation time in epoch seconds, or None if missing or malformed."""
    timestamp_path = lock_dir / INSTALL_LOCK_TIMESTAMP_FILE_NAME
    try:
        text = timestamp_path.read_text(encoding="utf-8").strip()
        # The timestamp is stored as integer nanoseconds since the epoch.
        return int(text) / 1_000_000_000
    except (OSError, ValueError):
        return None


def _remove_empty_dir(path: Path) -> None:
    """Best-effort ``rmdir``: failures (non-empty, missing, permissions) are ignored on purpose."""
    try:
        path.rmdir()
    except OSError:
        pass


def _install_error(skill_name: str, error: Exception) -> dict:
    """Convert an install failure into a JSON-friendly error record.

    A ``recovery_error`` attribute on ``error`` (attached by _replace_skill when
    restoring a backup fails) is reported as a nested record.
    """
    result = {
        "skill": skill_name,
        "code": "skill_install_failed",
        "type": type(error).__name__,
        "message": str(error),
    }
    recovery_error = getattr(error, "recovery_error", None)
    if recovery_error is not None:
        result["recovery_error"] = {
            "type": type(recovery_error).__name__,
            "message": str(recovery_error),
        }
    return result


def _list_error(target_path: str, error: Exception) -> dict:
    """Convert a listing failure for ``target_path`` into a JSON-friendly error record."""
    return {
        "target": target_path,
        "code": "skill_list_failed",
        "type": type(error).__name__,
        "message": str(error),
    }


def _publish_staged_skill(staged_dir: Path, target_dir: Path) -> None:
    """Rename a fully staged skill directory into its final location.

    The explicit existence check matters because POSIX ``rename`` replaces an
    existing empty destination directory instead of failing.

    Raises:
        FileExistsError: if ``target_dir`` already exists.
    """
    if target_dir.exists():
        raise FileExistsError(f"target skill directory already exists: {target_dir}")
    os.rename(staged_dir, target_dir)


def _stage_skill(
    source_dir: Path, staged_dir: Path, plan_entry: dict, source: SkillSource, *, installed_path: Path
) -> None:
    """Copy a skill into ``staged_dir`` and write its NVFLARE install manifest.

    The source tree is re-checked for symlinks here because it may have changed
    since the plan was built.

    Raises:
        ValueError: if ``source_dir`` contains a symlink.
    """
    symlink = _first_symlink_in_tree(source_dir)
    if symlink:
        raise ValueError(f"skill source must not contain symlinks: {symlink.relative_to(source_dir).as_posix()}")
    shutil.copytree(source_dir, staged_dir, ignore=shutil.ignore_patterns(*_copy_exclude_names(source)))
    manifest = {
        "schema_version": "1",
        "managed_by": "nvflare",
        "name": plan_entry["name"],
        "skill_version": plan_entry.get("skill_version"),
        "nvflare_version": nvflare.__version__,
        "source_type": source.source_type,
        "source_hash": plan_entry["source_hash"],
        "installed_paths": [str(installed_path)],
        "installed_at": datetime.now(timezone.utc).isoformat(),
    }
    (staged_dir / INSTALL_MANIFEST_FILE_NAME).write_text(
        json.dumps(manifest, indent=2, sort_keys=True) + "\n", encoding="utf-8"
    )


def _select_skills(manifest: dict, skill_name: Optional[str]) -> tuple[list[dict], list[str]]:
    """Return ``(selected, missing)`` for an optional skill name.

    With no ``skill_name`` every manifest skill is selected. Otherwise only the
    skills with that name are selected, and ``missing`` is ``[skill_name]``
    when none matched.
    """
    skills = manifest.get("skills", [])
    if not skill_name:
        return skills, []
    selected = [skill for skill in skills if skill.get("name") == skill_name]
    return selected, [] if selected else [skill_name]


def _source_summary(source: SkillSource) -> dict:
    """Summarize a skill source for JSON output."""
    return {
        "type": source.source_type,
        "root": str(source.root),
        "nvflare_version": source.manifest.get("nvflare_version"),
        "manifest_schema_version": source.manifest.get("schema_version"),
        "skill_count": len(source.manifest.get("skills", [])),
        "findings": source.manifest.get("findings", []),
    }


def _resolve_target_override(target_dir: Path | str) -> Path:
    """Validate and resolve an explicit skill target directory.

    Raises:
        ValueError: if the path contains ".." or a symlink component other than
            an allowed system temp-directory alias.
    """
    target = Path(target_dir).expanduser()
    if any(part == ".." for part in target.parts):
        raise ValueError(f"agent skill target must not contain parent directory traversal: {target}")
    logical = Path(os.path.abspath(os.path.normpath(str(target))))
    symlink = _first_disallowed_target_symlink_component(logical)
    if symlink is not None:
        raise ValueError(f"agent skill target must not contain symlink components: {symlink}")
    return target.resolve(strict=False)


def _target_system_symlink_aliases() -> tuple[Path, ...]:
    """Return temp-directory paths that may legitimately be symlinks.

    Includes ``/tmp``, ``$TMPDIR`` and ``tempfile.gettempdir()``. ``/var`` is
    added when it is itself a symlink containing one of those paths
    (presumably for platforms such as macOS where ``/var`` is a symlink).
    """
    aliases = {Path("/tmp")}
    for value in (os.environ.get("TMPDIR"), tempfile.gettempdir()):
        if value:
            aliases.add(Path(os.path.abspath(os.path.normpath(value))))
    var_alias = Path("/var")
    if var_alias.is_symlink() and any(_path_is_relative_to(alias, var_alias) for alias in aliases):
        aliases.add(var_alias)
    return tuple(sorted(aliases, key=lambda item: str(item)))


def _is_allowed_system_target_symlink(path: Path, *, target: Path) -> bool:
    """Return True if ``path`` is a temp-directory alias and ``target`` lies beneath it."""
    for alias in _target_system_symlink_aliases():
        if path == alias and _path_is_relative_to(target, alias):
            return True
    return False


def _path_is_relative_to(path: Path, base: Path) -> bool:
    """Return True if ``path`` is ``base`` or lies lexically beneath it."""
    try:
        path.relative_to(base)
    except ValueError:
        return False
    return True


def _first_disallowed_target_symlink_component(path: Path) -> Optional[Path]:
    """Return the first symlinked component of ``path`` that is not an allowed temp alias, else None."""
    absolute = path if path.is_absolute() else Path.cwd() / path
    current = Path(absolute.anchor)
    parts = absolute.parts[1:]
    for part in parts:
        if part in ("", "."):
            continue
        current = current / part
        if current.is_symlink():
            if _is_allowed_system_target_symlink(current, target=absolute):
                continue
            return current
    return None


def _first_symlink_in_tree(root_dir: Path) -> Optional[Path]:
    """Return the first symlink found in ``root_dir`` (the root itself included), else None.

    The walk never follows symlinks and visits names in sorted order.
    """
    if root_dir.is_symlink():
        return root_dir
    for root, dir_names, file_names in os.walk(root_dir, topdown=True, followlinks=False):
        root_path = Path(root)
        dir_names.sort()
        file_names.sort()
        for name in dir_names + file_names:
            path = root_path / name
            if path.is_symlink():
                return path
        dir_names[:] = [name for name in dir_names if not (root_path / name).is_symlink()]
    return None


def _copy_exclude_names(source: SkillSource) -> set[str]:
    """Return the file/directory names excluded when copying skills from ``source``."""
    if source.manifest.get("content_mode") == MANIFEST_CONTENT_MODE_RELEASE:
        return set(RELEASE_SKILL_FILE_EXCLUDE_NAMES)
    return set(IGNORED_SKILL_FILE_NAMES)


def _files_to_copy(source_dir: Path, target_dir: Path, *, exclude_names: set[str]) -> list[dict]:
    """List ``{"source", "target"}`` pairs for the regular files a copy would install.

    Excluded names, symlinks, and ``.pyc``/``.pyo`` files are skipped.
    """
    files = []
    for root, dir_names, file_names in os.walk(source_dir, topdown=True, followlinks=False):
        root_path = Path(root)
        dir_names.sort()
        file_names.sort()
        dir_names[:] = [name for name in dir_names if name not in exclude_names and not (root_path / name).is_symlink()]
        for file_name in file_names:
            if file_name in exclude_names:
                continue
            file_path = root_path / file_name
            if file_path.is_symlink():
                continue
            if file_path.suffix in {".pyc", ".pyo"}:
                continue
            if not file_path.is_file():
                continue
            rel_path = file_path.relative_to(source_dir)
            files.append({"source": str(file_path), "target": str(target_dir / rel_path)})
    return files


def _read_install_manifest(skill_dir: Path) -> Optional[dict]:
    """Return the parsed NVFLARE install manifest of ``skill_dir``, or None.

    None is returned when the manifest is missing, cannot be read or stat-ed,
    is not valid UTF-8 JSON, or does not decode to a JSON object, so callers can
    safely call ``.get`` on any non-None result.
    """
    manifest_path = skill_dir / INSTALL_MANIFEST_FILE_NAME
    try:
        # is_file() can raise (e.g. PermissionError), so it stays inside the try.
        if not manifest_path.is_file():
            return None
        manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return None
    return manifest if isinstance(manifest, dict) else None


def _conflict(skill_name: str, code: str, target_path: Path) -> dict:
    """Build a target conflict record; unknown codes use the code itself as the message."""
    messages = {
        "external_install_detected": "target skill directory is not managed by nvflare",
        "local_modifications_detected": "managed skill content differs from its install manifest",
    }
    return {
        "skill": skill_name,
        "code": code,
        "message": messages.get(code, code),
        "target_path": str(target_path),
    }


def _source_conflict(skill_name: str, source_path: Path, symlink_path: Path) -> dict:
    """Build a conflict record for a symlink found in a skill's source tree."""
    return {
        "skill": skill_name,
        "code": "source_symlink_detected",
        "message": "source skill directory contains a symlink",
        "source_path": str(source_path),
        "symlink_path": str(symlink_path),
    }


def _target_symlink_conflict(skill_name: str, target_path: Path, symlink_path: Path) -> dict:
    """Build a conflict record for a symlink found at or inside a skill's target directory."""
    return {
        "skill": skill_name,
        "code": "target_symlink_detected",
        "message": "target skill directory contains a symlink",
        "target_path": str(target_path),
        "symlink_path": str(symlink_path),
    }


def _source_checkout_root() -> Optional[Path]:
    """Return ``<repo>/skills`` when nvflare is imported from a source checkout, else None.

    A checkout is recognized by a ``skills`` directory and a ``pyproject.toml``
    file in the directory containing the ``nvflare`` package.
    """
    spec = util.find_spec("nvflare")
    if spec is None or not spec.submodule_search_locations:
        return None
    repo_root = Path(next(iter(spec.submodule_search_locations))).resolve().parent
    source_root = repo_root / "skills"
    if source_root.is_dir() and (repo_root / "pyproject.toml").is_file():
        return source_root
    return None