Source code for nvflare.apis.workspace

# Copyright (c) 2021, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import glob
import os
from typing import List, Union

from nvflare.apis.fl_constant import WorkspaceConstants


[docs] class Workspace: def __init__(self, root_dir: str, site_name: str = "", config_folder: str = "config"): """Define a workspace. NOTE:: Example of client workspace folder structure: Workspace ROOT local authorization.json.default resources.json.default custom/ custom python code ... startup (optional) provisioned content fed_client.json run_1 app config (required) configurations custom (optional) custom python code other_folder (app defined) log.txt job_meta.json ... Args: root_dir: root directory of the workspace site_name: site name of the workspace config_folder: where to find required config inside an app """ self.root_dir = root_dir self.site_name = site_name self.config_folder = config_folder # check to make sure the workspace is valid if not os.path.isdir(root_dir): raise ValueError(f"invalid workspace {root_dir}: it does not exist or not a valid dir") startup_dir = self.get_startup_kit_dir() if not os.path.isdir(startup_dir): raise RuntimeError( f"invalid workspace {root_dir}: missing startup folder '{startup_dir}' or not a valid dir" ) site_dir = self.get_site_config_dir() if not os.path.isdir(site_dir): raise RuntimeError( f"invalid workspace {root_dir}: missing site config folder '{site_dir}' or not a valid dir" ) # check env vars for other roots (Result, Log, Audit) self.result_root = self._setup_root(WorkspaceConstants.ENV_VAR_RESULT_ROOT) self.audit_root = self._setup_root(WorkspaceConstants.ENV_VAR_AUDIT_ROOT) self.log_root = self._setup_root(WorkspaceConstants.ENV_VAR_LOG_ROOT) # determine defaults if not self.log_root: if self.audit_root: # if audit root is defined, use it for logging too since they are all for output only self.log_root = self.audit_root else: # otherwise we use the result root self.log_root = self.result_root if not self.audit_root: if self.log_root: # if log root is defined, use it for auditing too since they are all for output only self.audit_root = self.log_root else: # otherwise we use the result root self.audit_root = self.result_root def _setup_root(self, env_var: str): root = os.environ.get(env_var) if root: os.makedirs(root, exist_ok=True) if self.site_name: # create the site folder os.makedirs(os.path.join(root, self.site_name), exist_ok=True) return root else: return None def _fallback_path(self, file_names: [str]): for n in file_names: f = self.get_file_path_in_site_config(n) if os.path.exists(f): return f return None
[docs] def get_authorization_file_path(self): return self._fallback_path( [WorkspaceConstants.AUTHORIZATION_CONFIG, WorkspaceConstants.DEFAULT_AUTHORIZATION_CONFIG] )
[docs] def get_resources_file_path(self): return self._fallback_path([WorkspaceConstants.RESOURCES_CONFIG, WorkspaceConstants.DEFAULT_RESOURCES_CONFIG])
[docs] def get_job_resources_file_path(self): return self.get_file_path_in_site_config(WorkspaceConstants.JOB_RESOURCES_CONFIG)
[docs] def get_log_config_file_path(self): return self._fallback_path([WorkspaceConstants.LOGGING_CONFIG, WorkspaceConstants.DEFAULT_LOGGING_CONFIG])
[docs] def get_file_path_in_site_config(self, file_basename: Union[str, List[str]]): if isinstance(file_basename, str): return os.path.join(self.get_site_config_dir(), file_basename) elif isinstance(file_basename, list): return self._fallback_path(file_basename) else: raise ValueError(f"invalid file_basename '{file_basename}': must be str or List[str]")
[docs] def get_file_path_in_startup(self, file_basename: str): return os.path.join(self.get_startup_kit_dir(), file_basename)
[docs] def get_file_path_in_root(self, file_basename: str): return os.path.join(self.root_dir, file_basename)
[docs] def get_server_startup_file_path(self): # this is to get the full path to "fed_server.json" return self.get_file_path_in_startup(WorkspaceConstants.SERVER_STARTUP_CONFIG)
[docs] def get_server_app_config_file_path(self, job_id): return os.path.join(self.get_app_config_dir(job_id), WorkspaceConstants.SERVER_APP_CONFIG)
[docs] def get_client_app_config_file_path(self, job_id): return os.path.join(self.get_app_config_dir(job_id), WorkspaceConstants.CLIENT_APP_CONFIG)
[docs] def get_client_startup_file_path(self): # this is to get the full path to "fed_client.json" return self.get_file_path_in_startup(WorkspaceConstants.CLIENT_STARTUP_CONFIG)
[docs] def get_admin_startup_file_path(self): # this is to get the full path to "fed_admin.json" return self.get_file_path_in_startup(WorkspaceConstants.ADMIN_STARTUP_CONFIG)
[docs] def get_site_config_dir(self) -> str: return os.path.join(self.root_dir, WorkspaceConstants.SITE_FOLDER_NAME)
[docs] def get_site_custom_dir(self) -> str: return os.path.join(self.get_site_config_dir(), WorkspaceConstants.CUSTOM_FOLDER_NAME)
[docs] def get_startup_kit_dir(self) -> str: return os.path.join(self.root_dir, WorkspaceConstants.STARTUP_FOLDER_NAME)
[docs] def get_audit_root(self, job_id=None) -> str: if self.audit_root: return self._get_site_root_dir(self.audit_root, job_id) elif job_id: return self.get_run_dir(job_id) else: return self.root_dir
[docs] def get_audit_file_path(self, job_id=None) -> str: return os.path.join(self.get_audit_root(job_id), WorkspaceConstants.AUDIT_LOG)
def _get_site_root_dir(self, root, job_id=None): if job_id: site_root_dir = os.path.join(root, self.site_name, job_id) if not os.path.exists(site_root_dir): os.makedirs(site_root_dir, exist_ok=True) return site_root_dir else: return os.path.join(root, self.site_name)
[docs] def get_log_root(self, job_id=None) -> str: if self.log_root: return self._get_site_root_dir(self.log_root, job_id) elif job_id: return self.get_run_dir(job_id) else: return self.root_dir
[docs] def get_root_dir(self) -> str: return self.root_dir
[docs] def get_run_dir(self, job_id: str) -> str: return os.path.join(self.root_dir, WorkspaceConstants.WORKSPACE_PREFIX + str(job_id))
[docs] def get_app_dir(self, job_id: str) -> str: return os.path.join(self.get_run_dir(job_id), WorkspaceConstants.APP_PREFIX + self.site_name)
def _get_any_app_log_file_path(self, job_id: str, file_name: str): return os.path.join(self.get_log_root(job_id), file_name)
[docs] def get_app_error_log_file_path(self, job_id: str) -> str: return self._get_any_app_log_file_path(job_id, WorkspaceConstants.ERROR_LOG_FILE_NAME)
[docs] def get_app_config_dir(self, job_id: str) -> str: return os.path.join(self.get_app_dir(job_id), self.config_folder)
[docs] def get_app_custom_dir(self, job_id: str) -> str: return os.path.join(self.get_app_dir(job_id), WorkspaceConstants.CUSTOM_FOLDER_NAME)
[docs] def get_job_meta_path(self, job_id: str) -> str: return os.path.join(self.get_run_dir(job_id), WorkspaceConstants.JOB_META_FILE)
[docs] def get_site_privacy_file_path(self): return self.get_file_path_in_site_config(WorkspaceConstants.PRIVACY_CONFIG)
[docs] def get_client_custom_dir(self) -> str: return os.path.join(self.get_site_config_dir(), WorkspaceConstants.CUSTOM_FOLDER_NAME)
[docs] def get_stats_pool_summary_path(self, job_id: str, prefix=None) -> str: file_name = WorkspaceConstants.STATS_POOL_SUMMARY_FILE_NAME if prefix: file_name = f"{prefix}.{file_name}" return self._get_any_app_log_file_path(job_id, file_name)
[docs] def get_stats_pool_records_path(self, job_id: str, prefix=None) -> str: file_name = WorkspaceConstants.STATS_POOL_RECORDS_FILE_NAME if prefix: file_name = f"{prefix}.{file_name}" return self._get_any_app_log_file_path(job_id, file_name)
[docs] def get_result_root(self, job_id: str): if self.result_root: return self._get_site_root_dir(self.result_root, job_id) else: return self.get_run_dir(job_id)
[docs] def get_config_files_for_startup(self, is_server: bool, for_job: bool) -> list: """Get all config files to be used for startup of the process (SP, SJ, CP, CJ). We first get required config files: - the startup file (fed_server.json or fed_client.json) in "startup" folder - resource file (resources.json.default or resources.json) in "local" folder We then try to get resources files (usually generated by different builders of the Provision system): - resources files from the "startup" folder take precedence - resources files from the "local" folder are next These extra resource config files must be json and follow the following patterns: - *__resources.json: these files are for both parent process and job processes - *__p_resources.json: these files are for parent process only - *__j_resources.json: these files are for job process only Args: is_server: whether this is for server site or client site for_job: whether this is for job process or parent process Returns: a list of config file names """ if is_server: startup_file_path = self.get_server_startup_file_path() else: startup_file_path = self.get_client_startup_file_path() resource_config_path = self.get_resources_file_path() config_files = [startup_file_path, resource_config_path] if for_job: # this is for job process job_resources_file_path = self.get_job_resources_file_path() if os.path.exists(job_resources_file_path): config_files.append(job_resources_file_path) # add other resource config files patterns = [WorkspaceConstants.RESOURCE_FILE_NAME_PATTERN] if for_job: patterns.append(WorkspaceConstants.JOB_RESOURCE_FILE_NAME_PATTERN) else: patterns.append(WorkspaceConstants.PARENT_RESOURCE_FILE_NAME_PATTERN) # add startup files first, then local files self._add_resource_files(self.get_startup_kit_dir(), config_files, patterns) self._add_resource_files(self.get_site_config_dir(), config_files, patterns) return config_files
@staticmethod def _add_resource_files(from_dir: str, to_list: list, patterns: [str]): for p in patterns: files = glob.glob(os.path.join(from_dir, p)) if files: to_list.extend(files)