Source code for nvflare.app_common.widgets.component_path_authorizer

# Copyright (c) 2026, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import threading
from typing import Optional

from nvflare.apis.app_validation import AppValidationKey
from nvflare.apis.event_type import EventType
from nvflare.apis.fl_constant import FLContextKey, SystemConfigs
from nvflare.apis.fl_context import FLContext
from nvflare.apis.fl_exception import UnsafeComponentError
from nvflare.fuel.utils.config_service import ConfigService
from nvflare.widgets.widget import Widget

CLASS_ALLOW_LIST = "class_allow_list"


[docs] class ComponentPathAuthorizer(Widget): def __init__(self): """Allows component builds by path prefixes configured in site resources.""" super().__init__() self._allow_list_cache = {} self._allow_list_cache_lock = threading.Lock()
[docs] def handle_event(self, event_type: str, fl_ctx: FLContext): if event_type != EventType.BEFORE_BUILD_COMPONENT: return if self._job_has_byoc(fl_ctx): return component_config = fl_ctx.get_prop(FLContextKey.COMPONENT_CONFIG) node = fl_ctx.get_prop(FLContextKey.COMPONENT_NODE) self.authorize_component_config(component_config, node, fl_ctx=fl_ctx)
[docs] def authorize_component_config( self, component_config, node=None, fl_ctx: Optional[FLContext] = None, workspace=None ): if self._job_has_byoc(fl_ctx): return component_path = self._get_component_path(component_config) if component_path is None: return allow_list = self._get_allow_list(fl_ctx=fl_ctx, workspace=workspace) if not any(self._path_matches_prefix(component_path, prefix) for prefix in allow_list): node_path = node.path() if node else "" raise UnsafeComponentError( f"Component '{component_path}' at config path '{node_path}' is not in allow_list" )
def _get_allow_list(self, fl_ctx: Optional[FLContext] = None, workspace=None): resources_file = self._get_resources_file_path(fl_ctx=fl_ctx, workspace=workspace) if resources_file: return self._get_allow_list_from_file(resources_file) resources = ConfigService.get_section(SystemConfigs.RESOURCES_CONF) return self._get_allow_list_from_resources(resources) def _get_allow_list_from_file(self, resources_file): cache_key = os.path.abspath(resources_file) with self._allow_list_cache_lock: stat_result = os.stat(cache_key) cache_signature = self._make_file_signature(stat_result) cached = self._allow_list_cache.get(cache_key) if cached and cached[0] == cache_signature: return cached[1] with open(cache_key, "rt") as f: resources = json.load(f) cache_signature = self._make_file_signature(os.fstat(f.fileno())) allow_list = self._get_allow_list_from_resources(resources) self._allow_list_cache[cache_key] = (cache_signature, allow_list) return allow_list @staticmethod def _make_file_signature(stat_result): return (stat_result.st_mtime_ns, stat_result.st_size, stat_result.st_ino, stat_result.st_dev) @classmethod def _get_allow_list_from_resources(cls, resources): if not isinstance(resources, dict) or CLASS_ALLOW_LIST not in resources: raise UnsafeComponentError( f"{CLASS_ALLOW_LIST} is not configured in resources.json or resources.json.default. " f"Non-BYOC jobs require a top-level {CLASS_ALLOW_LIST}; add allowed class path prefixes " 'such as "nvflare." to site resources, or enable BYOC for jobs that load custom code.' ) allow_list = resources.get(CLASS_ALLOW_LIST) if not isinstance(allow_list, list): raise UnsafeComponentError(f"{CLASS_ALLOW_LIST} must be list but got {type(allow_list)}") try: return cls._normalize_allow_list(allow_list) except (TypeError, ValueError) as ex: raise UnsafeComponentError(str(ex)) @staticmethod def _get_resources_file_path(fl_ctx: Optional[FLContext] = None, workspace=None): if fl_ctx and workspace is None: workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT) if workspace and hasattr(workspace, "get_resources_file_path"): resources_file = workspace.get_resources_file_path() if resources_file and os.path.exists(resources_file): return resources_file return None @staticmethod def _job_has_byoc(fl_ctx: Optional[FLContext]): if not fl_ctx: return False job_meta = fl_ctx.get_prop(FLContextKey.JOB_META) if not isinstance(job_meta, dict): return False return bool(job_meta.get(AppValidationKey.BYOC)) @classmethod def _normalize_allow_list(cls, allow_list): result = [] for prefix in allow_list: prefix = cls._validate_prefix(prefix) if prefix not in result: result.append(prefix) return result @staticmethod def _validate_prefix(prefix: str): if not isinstance(prefix, str): raise TypeError(f"allow_list entries must be str but got {type(prefix)}") if not prefix: raise ValueError("allow_list entries must not be empty") if prefix != prefix.strip(): raise ValueError(f"allow_list entry '{prefix}' must not contain leading or trailing whitespace") if prefix.endswith("."): prefix_body = prefix[:-1] if not prefix_body: raise ValueError("allow_list entries must not be empty") else: prefix_body = prefix if "." not in prefix_body: raise ValueError( f"allow_list entry '{prefix}' must end with '.' for package prefixes " "or be a fully qualified dotted path" ) if any(not part for part in prefix_body.split(".")): raise ValueError(f"allow_list entry '{prefix}' is not a valid dotted path prefix") return prefix @staticmethod def _path_matches_prefix(component_path: str, prefix: str): if component_path == prefix: return True if prefix.endswith("."): return component_path.startswith(prefix) return component_path.startswith(f"{prefix}.") @staticmethod def _get_component_path(component_config): if not isinstance(component_config, dict): raise UnsafeComponentError(f"Component config must be dict but got {type(component_config)}") if "name" in component_config: raise UnsafeComponentError("Component config must use path or class_path; name is not allowed") if "path" in component_config: component_path = component_config["path"] key = "path" elif "class_path" in component_config: component_path = component_config["class_path"] key = "class_path" else: raise UnsafeComponentError("Component config must specify path or class_path") if not isinstance(component_path, str): raise UnsafeComponentError(f"Component {key} must be str but got {type(component_path)}") if not component_path: raise UnsafeComponentError(f"Component {key} must not be empty") return component_path