# Source code for nvflare.private.fed.server.job_meta_validator
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import logging
from io import BytesIO
from typing import Optional, Set, Tuple
from zipfile import ZipFile
from nvflare.apis.app_validation import AppValidationKey
from nvflare.apis.fl_constant import JobConstants
from nvflare.apis.job_def import ALL_SITES, SERVER_SITE_NAME, JobMetaKey
from nvflare.apis.job_meta_validator_spec import JobMetaValidatorSpec
from nvflare.fuel.utils.config import ConfigFormat
from nvflare.fuel.utils.config_factory import ConfigFactory
from nvflare.security.logging import secure_format_exception
# Path segment appended to "<job_name>/<app>" to address an app's config folder inside the job ZIP.
CONFIG_FOLDER = "/config/"

# Path segment appended to "<job_name>/<app>" to address an app's custom folder inside the job ZIP.
CUSTOM_FOLDER = "/custom/"

# Largest value accepted for the "min_clients" setting of a job.
MAX_CLIENTS = 1_000_000

# Module-level logger shared by the validation helpers in this file.
logger = logging.getLogger(__name__)
class JobMetaValidator(JobMetaValidatorSpec):
    """Validate the meta file and the app layout of a submitted job ZIP.

    The checks cover: presence of the meta file, the deploy map, the
    server/client configs of every deployed app, ``min_clients``, the
    resource spec and the mandatory clients.
    """

    def _validate_zf(self, job_name, zf):
        """Run every validation step against an opened job ZIP.

        Args:
            job_name: Name of the job; also the top-level folder inside the ZIP.
            zf: The opened ``ZipFile`` holding the job.

        Returns:
            The meta dict loaded from the ZIP (possibly updated by ``_validate_app``).

        Raises:
            ValueError: If any validation step fails.
        """
        meta = self._validate_meta(job_name, zf)
        site_list = self._validate_deploy_map(job_name, meta)
        self._validate_app(job_name, meta, zf)
        clients = self._get_all_clients(site_list)
        self._validate_min_clients(job_name, meta, clients)
        self._validate_resource(job_name, meta)
        self._validate_mandatory_clients(job_name, meta, clients)
        return meta

    def validate(self, job_name: str, job_data: bytes) -> Tuple[bool, str, dict]:
        """Validate job

        Args:
            job_name (str): Job name
            job_data (bytes): Job ZIP data. A ``str`` is also accepted and is
                treated as the file name of the job ZIP.

        Returns:
            Tuple[bool, str, dict]: (is_valid, error_message, meta). When the
            job is invalid, ``meta`` is an empty dict.

        Raises:
            TypeError: If ``job_data`` is neither ``bytes`` nor ``str``.
        """
        meta = {}
        try:
            if isinstance(job_data, bytes):
                with ZipFile(BytesIO(job_data), "r") as zf:
                    meta = self._validate_zf(job_name, zf)
            elif isinstance(job_data, str):
                # job_data is a file name
                with ZipFile(job_data, "r") as zf:
                    meta = self._validate_zf(job_name, zf)
            else:
                raise TypeError(f"job_data must be bytes or str but got {type(job_data)}")
        except ValueError as e:
            # Only validation failures are reported through the return value;
            # any other exception type propagates to the caller.
            return False, str(e), meta

        return True, "", meta

    @staticmethod
    def _validate_meta(job_name: str, zf: ZipFile) -> Optional[dict]:
        """Load the job meta file from the ZIP.

        Each extension returned by ``ConfigFormat.config_ext_formats()`` is
        tried in order; the first meta file found is loaded.

        Args:
            job_name: Name of the job (top-level folder inside the ZIP).
            zf: The opened job ZIP.

        Returns:
            The meta content as a dict, or ``None`` when no meta file exists.
        """
        base_meta_file = f"{job_name}/{JobConstants.META}"
        logger.debug(f"validate file {base_meta_file}.[json|conf|yml] exists for job {job_name}")

        # Build the entry set once instead of re-listing the archive per extension.
        entries = set(zf.namelist())
        for ext, fmt in ConfigFormat.config_ext_formats().items():
            meta_file = f"{base_meta_file}{ext}"
            if meta_file in entries:
                config_loader = ConfigFactory.get_config_loader(fmt)
                meta_data = zf.read(meta_file)
                return config_loader.load_config_from_str(meta_data.decode()).to_dict()
        return None

    @staticmethod
    def _validate_deploy_map(job_name: str, meta: dict) -> list:
        """Check the deploy map of the job and return the flat list of target sites.

        Args:
            job_name: Name of the job, used in error messages.
            meta: The job meta dict (may be ``None`` or empty when no meta file was found).

        Returns:
            ``[ALL_SITES]`` when the all-sites marker is the only site, otherwise
            the list of every site named in the deploy map.

        Raises:
            ValueError: If the meta or deploy map is missing/empty, no site is
                listed, the all-sites marker is combined with other sites, the
                server site is missing, or a site is listed more than once.
        """
        if not meta:
            raise ValueError(
                f"{JobConstants.META}.[json|conf|yml] not existing for job {job_name}, possible in legacy job format. Please upgrade the job structure."
            )

        deploy_map = meta.get(JobMetaKey.DEPLOY_MAP.value)
        if not deploy_map:
            raise ValueError(f"deploy_map is empty for job {job_name}")

        site_list = [site for deployments in deploy_map.values() for site in deployments]
        if not site_list:
            raise ValueError(f"No site is specified in deploy_map for job {job_name}")

        # The all-sites marker is matched case-insensitively.
        if ALL_SITES.casefold() in (site.casefold() for site in site_list):
            # if ALL_SITES is specified, no other site can be in the list
            if len(site_list) > 1:
                raise ValueError(f"No other site can be specified if {ALL_SITES} is used for job {job_name}")
            # Normalize to the canonical spelling so later checks can compare directly.
            return [ALL_SITES]

        if SERVER_SITE_NAME not in site_list:
            raise ValueError(f"Missing server site in deploy_map for job {job_name}")

        duplicates = [site for site, count in collections.Counter(site_list).items() if count > 1]
        if duplicates:
            raise ValueError(f"Multiple apps to be deployed to following sites {duplicates} for job {job_name}")

        return site_list

    def _validate_app(self, job_name: str, meta: dict, zip_file: ZipFile) -> None:
        """Check that every app in the deploy map exists in the ZIP with the configs it needs.

        Sets ``meta[AppValidationKey.BYOC]`` to ``True`` when at least one app
        has a custom folder in the ZIP.

        Args:
            job_name: Name of the job (top-level folder inside the ZIP).
            meta: The job meta dict; must contain a deploy map.
            zip_file: The opened job ZIP.

        Raises:
            ValueError: If an app's config folder is missing, or the server or
                client config required by its deployment targets is missing.
        """
        deploy_map = meta.get(JobMetaKey.DEPLOY_MAP.value)
        has_byoc = False

        for app, deployments in deploy_map.items():
            config_folder = job_name + "/" + app + CONFIG_FOLDER
            if not self._entry_exists(zip_file, config_folder):
                logger.debug(f"zip folder {config_folder} missing. Files in the zip:")
                for x in zip_file.namelist():
                    logger.debug(f"    {x}")
                raise ValueError(f"App '{app}' in deploy_map doesn't exist for job {job_name}")

            all_sites = ALL_SITES.casefold() in (site.casefold() for site in deployments)

            to_server = all_sites or SERVER_SITE_NAME in deployments
            if to_server and not self._config_exists(zip_file, config_folder, JobConstants.SERVER_JOB_CONFIG):
                raise ValueError(f"App '{app}' will be deployed to server but server config is missing")

            # Any site other than the server counts as a client deployment.
            to_client = all_sites or any(site != SERVER_SITE_NAME for site in deployments)
            if to_client and not self._config_exists(zip_file, config_folder, JobConstants.CLIENT_JOB_CONFIG):
                raise ValueError(f"App '{app}' will be deployed to client but client config is missing")

            custom_folder = job_name + "/" + app + CUSTOM_FOLDER
            if self._entry_exists(zip_file, custom_folder):
                has_byoc = True

        if has_byoc:
            meta[AppValidationKey.BYOC] = True

    @staticmethod
    def _convert_value_to_int(v) -> int:
        """Return ``v`` as an int.

        Args:
            v: An int, or any value accepted by ``int()``.

        Returns:
            ``v`` itself when it is already an int, otherwise ``int(v)``.

        Raises:
            ValueError: If ``v`` cannot be converted by ``int()``.
        """
        if isinstance(v, int):
            return v
        try:
            return int(v)
        except (ValueError, TypeError) as e:
            # Build ONE message string: passing several positional args to
            # ValueError makes str(exc) render as a tuple in the error returned
            # by validate().
            raise ValueError(
                f"invalid data type for {v}, can't convert to int: {secure_format_exception(e)}"
            ) from e

    def _validate_min_clients(self, job_name: str, meta: dict, clients: set) -> None:
        """Check the optional ``min_clients`` setting against the deployed clients.

        Args:
            job_name: Name of the job, used in error messages.
            meta: The job meta dict.
            clients: Client sites of the job, or ``{ALL_SITES}``.

        Raises:
            ValueError: If ``min_clients`` is not an int-convertible value, is
                not positive, exceeds ``MAX_CLIENTS``, or exceeds the number of
                explicitly listed clients.
        """
        logger.debug(f"validate min_clients for job {job_name}")

        value = meta.get(JobMetaKey.MIN_CLIENTS.value)
        if value is None:
            return

        min_clients = self._convert_value_to_int(value)
        if min_clients <= 0:
            raise ValueError(f"min_clients {min_clients} must be positive for job {job_name}")
        if min_clients > MAX_CLIENTS:
            raise ValueError(f"min_clients {min_clients} must be less than {MAX_CLIENTS} for job {job_name}")

        # Membership test instead of next(iter(clients)): the client set is empty
        # for a server-only deploy map, and next() would raise StopIteration.
        if ALL_SITES not in clients and len(clients) < min_clients:
            raise ValueError(f"min {min_clients} clients required for job {job_name}, found {len(clients)}.")

    @staticmethod
    def _validate_mandatory_clients(job_name: str, meta: dict, clients: set) -> None:
        """Check that every mandatory client is part of the deployment.

        Nothing is checked when the job is deployed to ``ALL_SITES``.

        Args:
            job_name: Name of the job, used in error messages.
            meta: The job meta dict.
            clients: Client sites of the job, or ``{ALL_SITES}``.

        Raises:
            ValueError: If a mandatory client is not among ``clients``.
        """
        logger.debug(f" validate mandatory_clients for job {job_name}")

        # Membership test instead of next(iter(clients)) so that an empty client
        # set (server-only deploy map) does not raise StopIteration.
        if ALL_SITES in clients:
            return

        # Validating mandatory clients are deployed
        mandatory_clients = meta.get(JobMetaKey.MANDATORY_CLIENTS.value)
        if not mandatory_clients:
            return

        missing = set(mandatory_clients) - clients
        if missing:
            raise ValueError(f"Mandatory clients {missing} are not in the deploy_map for job {job_name}")

    @staticmethod
    def _validate_resource(job_name: str, meta: dict) -> None:
        """Check the shape of the optional resource spec.

        Args:
            job_name: Name of the job, used in error messages.
            meta: The job meta dict.

        Raises:
            ValueError: If the resource spec is not a dict, or one of its
                non-empty values is not a dict.
        """
        logger.debug(f"validate resource for job {job_name}")

        resource_spec = meta.get(JobMetaKey.RESOURCE_SPEC.value)
        if not resource_spec:
            logger.debug("empty resource spec provided")
            return

        if not isinstance(resource_spec, dict):
            raise ValueError(f"Invalid resource_spec for job {job_name}")

        for k, v in resource_spec.items():
            # Empty/falsy values are tolerated; anything else must be a dict.
            if v and not isinstance(v, dict):
                raise ValueError(f"value for key {k} in resource spec is expecting a dictionary")

    @staticmethod
    def _get_all_clients(site_list: Optional[list]) -> Set[str]:
        """Derive the set of client sites from the deploy-map site list.

        Args:
            site_list: Sites returned by ``_validate_deploy_map``.

        Returns:
            ``{ALL_SITES}`` when the list starts with ``ALL_SITES``, otherwise
            every listed site except the server. Empty set for ``None``/empty input.
        """
        if not site_list:
            return set()
        if site_list[0] == ALL_SITES:
            return {ALL_SITES}
        return {site for site in site_list if site != SERVER_SITE_NAME}

    @staticmethod
    def _entry_exists(zip_file: ZipFile, path: str) -> bool:
        """Tell whether the ZIP has an entry stored under exactly ``path``.

        Args:
            zip_file: The opened job ZIP.
            path: Full member name to look up.

        Returns:
            ``True`` if ``ZipFile.getinfo`` finds the entry, ``False`` otherwise.
        """
        try:
            zip_file.getinfo(path)
        except KeyError:
            return False
        return True

    @staticmethod
    def _config_exists(zip_file: ZipFile, zip_folder, init_config_path: str) -> bool:
        """Tell whether a config file exists in ``zip_folder`` of the ZIP.

        The lookup is delegated to ``ConfigFactory.match_config``, which is
        handed a matcher that tests candidate names against the ZIP's entries.

        Args:
            zip_file: The opened job ZIP.
            zip_folder: Folder inside the ZIP that should hold the config.
            init_config_path: Config file name passed to ``ConfigFactory.match_config``.

        Returns:
            Whatever ``ConfigFactory.match_config`` returns for the matcher.
        """
        # ZIP member names always use "/" regardless of platform, so join with
        # posixpath rather than the OS-dependent os.path.
        import posixpath

        def match(parent: ZipFile, config_path: str) -> bool:
            full_path = posixpath.join(zip_folder, config_path)
            return full_path in parent.namelist()

        return ConfigFactory.match_config(zip_file, init_config_path, match)