Source code for nvflare.lighter.impl.static_file

# Copyright (c) 2021, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import json
import os

import yaml

from nvflare.lighter import utils
from nvflare.lighter.spec import Builder


class StaticFileBuilder(Builder):
    """Builder that writes the template-based (static) files of each participant's kit."""

    def __init__(
        self,
        enable_byoc=False,
        config_folder="",
        scheme="grpc",
        app_validator="",
        download_job_url="",
        docker_image="",
        snapshot_persistor="",
        overseer_agent="",
        components="",
    ):
        """Build all static files from template.

        Uses the information from project.yml through project to go through the participants and write the contents of
        each file with the template, and replacing with the appropriate values from project.yml.

        Usually, two main categories of files are created in all FL participants, static and dynamic. Static files
        have similar contents among different participants, with small differences.  For example, the differences in
        sub_start.sh are client name and python module.  Those are basically static files.  This builder uses template
        file and string replacement to generate those static files for each participant.

        Args:
            enable_byoc: for each participant, true to enable loading of code in the custom folder of applications
            config_folder: usually "config"
            scheme: written as the service "scheme" of the first server entry in fed_server.json and fed_client.json
            app_validator: optional path to an app validator to verify that uploaded app has the expected structure
            download_job_url: stored on the builder; not read by any method of this class
            docker_image: when docker_image is set to a docker image name, docker.sh will be generated on
                server/client/admin
            snapshot_persistor: stored on the builder; not read by any method of this class
            overseer_agent: when truthy, a mapping that is deep-copied into each participant's config as
                "overseer_agent".  Its optional "overseer_exists" key (default True) controls whether the "args"
                entry is generated, and is removed from the written config.
            components: stored on the builder; not read by any method of this class
        """
        self.enable_byoc = enable_byoc
        self.config_folder = config_folder
        self.scheme = scheme
        self.docker_image = docker_image
        self.download_job_url = download_job_url
        self.app_validator = app_validator
        self.overseer_agent = overseer_agent
        self.snapshot_persistor = snapshot_persistor
        self.components = components

    def get_server_name(self, server):
        """Return the name used for ``server`` in generated files (its ``name`` attribute)."""
        return server.name

    def get_overseer_name(self, overseer):
        """Return the name used for ``overseer`` in generated files (its ``name`` attribute)."""
        return overseer.name

    def _make_overseer_agent(self, role, name, ctx, **extra_args):
        """Create the "overseer_agent" config section for one participant.

        Args:
            role: value of the "role" argument ("server", "client" or "admin")
            name: value of the "name" argument
            ctx: provisioning context; "overseer_end_point" is read from it (default "")
            **extra_args: additional entries appended to the generated "args"

        Returns:
            A deep copy of ``self.overseer_agent`` without the "overseer_exists" key, or None when
            ``self.overseer_agent`` is falsy.
        """
        if not self.overseer_agent:
            return None
        # Work on a copy so that the builder's own configuration is never mutated.
        overseer_agent = copy.deepcopy(self.overseer_agent)
        if overseer_agent.get("overseer_exists", True):
            args = {
                "role": role,
                "overseer_end_point": ctx.get("overseer_end_point", ""),
                "project": self.project_name,
                "name": name,
            }
            args.update(extra_args)
            overseer_agent["args"] = args
        # "overseer_exists" only steers the generation above; it is not written out.
        overseer_agent.pop("overseer_exists", None)
        return overseer_agent

    def _write_start_scripts(self, dest_dir, docker_template, start_template, replacement_dict):
        """Write docker.sh (only if a docker image is set), start.sh, sub_start.sh and stop_fl.sh to ``dest_dir``."""
        if self.docker_image:
            utils._write(
                os.path.join(dest_dir, "docker.sh"),
                utils.sh_replace(self.template[docker_template], replacement_dict),
                "t",
                exe=True,
            )
        utils._write(
            os.path.join(dest_dir, "start.sh"),
            self.template[start_template],
            "t",
            exe=True,
        )
        utils._write(
            os.path.join(dest_dir, "sub_start.sh"),
            utils.sh_replace(self.template["sub_start_sh"], replacement_dict),
            "t",
            exe=True,
        )
        utils._write(
            os.path.join(dest_dir, "stop_fl.sh"),
            self.template["stop_fl_sh"],
            "t",
            exe=True,
        )

    def _write_local_files(self, participant, ctx, resources_template):
        """Write the default/sample files into the participant's local folder.

        ``resources_template`` is the template key used for resources.json.default.
        """
        dest_dir = self.get_local_dir(participant, ctx)
        utils._write(
            os.path.join(dest_dir, "log.config.default"),
            self.template["log_config"],
            "t",
        )
        utils._write(
            os.path.join(dest_dir, "resources.json.default"),
            self.template[resources_template],
            "t",
        )
        utils._write(
            os.path.join(dest_dir, "privacy.json.sample"),
            self.template["sample_privacy"],
            "t",
        )
        utils._write(
            os.path.join(dest_dir, "authorization.json.default"),
            self.template["default_authz"],
            "t",
        )

    def _build_overseer(self, overseer, ctx):
        """Write the overseer's kit files and record its end point in ``ctx["overseer_end_point"]``."""
        dest_dir = self.get_kit_dir(overseer, ctx)
        protocol = overseer.props.get("protocol", "http")
        api_root = overseer.props.get("api_root", "/api/v1/")
        default_port = "443" if protocol == "https" else "80"
        port = overseer.props.get("port", default_port)
        hostname = self.get_overseer_name(overseer)
        replacement_dict = {"port": port, "hostname": hostname}

        # Group the subjects of all admins by their role.
        privilege_dict = dict()
        for admin in self.project.get_participants_by_type("admin", first_only=False):
            privilege_dict.setdefault(admin.props.get("role"), []).append(admin.subject)
        utils._write(
            os.path.join(dest_dir, "privilege.yml"),
            yaml.dump(privilege_dict, Dumper=yaml.Dumper),
            "t",
            exe=False,
        )

        if self.docker_image:
            utils._write(
                os.path.join(dest_dir, "docker.sh"),
                utils.sh_replace(self.template["docker_svr_sh"], replacement_dict),
                "t",
                exe=True,
            )
        utils._write(
            os.path.join(dest_dir, "gunicorn.conf.py"),
            utils.sh_replace(self.template["gunicorn_conf_py"], replacement_dict),
            "t",
            exe=False,
        )
        # start.sh is written exactly once, from the overseer template.
        utils._write(
            os.path.join(dest_dir, "start.sh"),
            self.template["start_ovsr_sh"],
            "t",
            exe=True,
        )

        # A falsy port (e.g. explicitly configured as empty) is left out of the URL.
        if port:
            ctx["overseer_end_point"] = f"{protocol}://{hostname}:{port}{api_root}"
        else:
            ctx["overseer_end_point"] = f"{protocol}://{hostname}{api_root}"

    def _build_server(self, server, ctx):
        """Write the server's kit, local and workspace files.

        Also stores "admin_port", "fed_learn_port" and "server_name" in ``ctx``.
        """
        config = json.loads(self.template["fed_server"])
        dest_dir = self.get_kit_dir(server, ctx)
        server_name = self.get_server_name(server)
        admin_port = server.props.get("admin_port", 8003)
        fed_learn_port = server.props.get("fed_learn_port", 8002)
        ctx["admin_port"] = admin_port
        ctx["fed_learn_port"] = fed_learn_port
        ctx["server_name"] = server_name

        server_0 = config["servers"][0]
        server_0["name"] = self.project_name
        server_0["service"]["target"] = f"{server_name}:{fed_learn_port}"
        server_0["service"]["scheme"] = self.scheme
        server_0["admin_host"] = server_name
        server_0["admin_port"] = admin_port

        overseer_agent = self._make_overseer_agent(
            "server",
            server_name,
            ctx,
            fl_port=str(fed_learn_port),
            admin_port=str(admin_port),
        )
        if overseer_agent is not None:
            config["overseer_agent"] = overseer_agent
        utils._write(os.path.join(dest_dir, "fed_server.json"), json.dumps(config, indent=2), "t")

        replacement_dict = {
            "admin_port": admin_port,
            "fed_learn_port": fed_learn_port,
            "config_folder": self.config_folder,
            "docker_image": self.docker_image,
            "org_name": server.org,
            "type": "server",
            "cln_uid": "",
        }
        self._write_start_scripts(dest_dir, "docker_svr_sh", "start_svr_sh", replacement_dict)

        # local folder creation
        self._write_local_files(server, ctx, "local_server_resources")

        # workspace folder file
        utils._write(
            os.path.join(self.get_ws_dir(server, ctx), "readme.txt"),
            self.template["readme_fs"],
            "t",
        )

    def _build_client(self, client, ctx):
        """Write the client's kit, local and workspace files.

        Only the scheme and name of the first server entry are set here; the service target is left as it
        appears in the "fed_client" template.
        """
        config = json.loads(self.template["fed_client"])
        dest_dir = self.get_kit_dir(client, ctx)
        config["servers"][0]["service"]["scheme"] = self.scheme
        config["servers"][0]["name"] = self.project_name

        replacement_dict = {
            "client_name": f"{client.subject}",
            "config_folder": self.config_folder,
            "docker_image": self.docker_image,
            "org_name": client.org,
            "type": "client",
            "cln_uid": f"uid={client.subject}",
        }

        overseer_agent = self._make_overseer_agent("client", client.subject, ctx)
        if overseer_agent is not None:
            config["overseer_agent"] = overseer_agent
        utils._write(os.path.join(dest_dir, "fed_client.json"), json.dumps(config, indent=2), "t")

        self._write_start_scripts(dest_dir, "docker_cln_sh", "start_cln_sh", replacement_dict)

        # local folder creation
        self._write_local_files(client, ctx, "local_client_resources")

        # workspace folder file
        utils._write(
            os.path.join(self.get_ws_dir(client, ctx), "readme.txt"),
            self.template["readme_fc"],
            "t",
        )

    def _build_admin(self, admin, ctx):
        """Write the admin's kit files, using "admin_port" and "server_name" from ``ctx``."""
        dest_dir = self.get_kit_dir(admin, ctx)
        admin_port = ctx.get("admin_port")
        server_name = ctx.get("server_name")

        replacement_dict = {
            "cn": f"{server_name}",
            "admin_port": f"{admin_port}",
            "docker_image": self.docker_image,
        }

        config = self.prepare_admin_config(admin, ctx)
        utils._write(os.path.join(dest_dir, "fed_admin.json"), json.dumps(config, indent=2), "t")

        if self.docker_image:
            utils._write(
                os.path.join(dest_dir, "docker.sh"),
                utils.sh_replace(self.template["docker_adm_sh"], replacement_dict),
                "t",
                exe=True,
            )
        utils._write(
            os.path.join(dest_dir, "fl_admin.sh"),
            utils.sh_replace(self.template["fl_admin_sh"], replacement_dict),
            "t",
            exe=True,
        )
        utils._write(
            os.path.join(dest_dir, "readme.txt"),
            self.template["readme_am"],
            "t",
        )

    def prepare_admin_config(self, admin, ctx):
        """Return the fed_admin.json content for ``admin``.

        The "fed_admin" template is parsed and, when an overseer agent is configured, the agent section is
        added under the "admin" key.
        """
        config = json.loads(self.template["fed_admin"])
        agent_config = dict()
        overseer_agent = self._make_overseer_agent("admin", admin.subject, ctx)
        if overseer_agent is not None:
            agent_config["overseer_agent"] = overseer_agent
        config["admin"].update(agent_config)
        return config

    def build(self, project, ctx):
        """Generate the static files of every participant of ``project``.

        The overseer (if any) is built first and the servers before clients and admins, because later
        participants read values that earlier ones store in ``ctx``.
        """
        self.template = ctx.get("template")
        self.project_name = project.name
        self.project = project

        overseer = project.get_participants_by_type("overseer")
        if overseer:
            self._build_overseer(overseer, ctx)

        for server in project.get_participants_by_type("server", first_only=False):
            self._build_server(server, ctx)

        for client in project.get_participants_by_type("client", first_only=False):
            self._build_client(client, ctx)

        for admin in project.get_participants_by_type("admin", first_only=False):
            self._build_admin(admin, ctx)