# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import os
import sys
from nvflare.apis.fl_constant import FLContextKey, JobConstants, SystemVarName
from nvflare.apis.job_def import JobMetaKey
from nvflare.apis.job_launcher_spec import JobProcessArgs
def _job_args_str(job_args, arg_names) -> str:
result = ""
sep = ""
for name in arg_names:
e = job_args.get(name)
if not e:
continue
n, v = e
result += f"{sep}{n} {v}"
sep = " "
return result
[docs]
def get_client_job_args(include_exe_module=True, include_set_options=True):
result = []
if include_exe_module:
result.append(JobProcessArgs.EXE_MODULE)
result.extend(
[
JobProcessArgs.WORKSPACE,
JobProcessArgs.STARTUP_DIR,
JobProcessArgs.AUTH_TOKEN,
JobProcessArgs.TOKEN_SIGNATURE,
JobProcessArgs.SSID,
JobProcessArgs.JOB_ID,
JobProcessArgs.CLIENT_NAME,
JobProcessArgs.PARENT_URL,
JobProcessArgs.PARENT_CONN_SEC,
JobProcessArgs.TARGET,
JobProcessArgs.SCHEME,
JobProcessArgs.STARTUP_CONFIG_FILE,
]
)
if include_set_options:
result.append(JobProcessArgs.OPTIONS)
return result
[docs]
def generate_client_command(fl_ctx) -> str:
job_args = fl_ctx.get_prop(FLContextKey.JOB_PROCESS_ARGS)
if not job_args:
raise RuntimeError(f"missing {FLContextKey.JOB_PROCESS_ARGS} in FLContext")
args_str = _job_args_str(job_args, get_client_job_args())
return f"{sys.executable} {args_str}"
[docs]
def get_server_job_args(include_exe_module=True, include_set_options=True):
result = []
if include_exe_module:
result.append(JobProcessArgs.EXE_MODULE)
result.extend(
[
JobProcessArgs.WORKSPACE,
JobProcessArgs.STARTUP_CONFIG_FILE,
JobProcessArgs.APP_ROOT,
JobProcessArgs.JOB_ID,
JobProcessArgs.TOKEN_SIGNATURE,
JobProcessArgs.PARENT_URL,
JobProcessArgs.ROOT_URL,
JobProcessArgs.SERVICE_HOST,
JobProcessArgs.SERVICE_PORT,
JobProcessArgs.SSID,
JobProcessArgs.HA_MODE,
]
)
if include_set_options:
result.append(JobProcessArgs.OPTIONS)
return result
[docs]
def generate_server_command(fl_ctx) -> str:
job_args = fl_ctx.get_prop(FLContextKey.JOB_PROCESS_ARGS)
if not job_args:
raise RuntimeError(f"missing {FLContextKey.JOB_PROCESS_ARGS} in FLContext!")
args_str = _job_args_str(job_args, get_server_job_args())
return f"{sys.executable} {args_str}"
[docs]
def add_custom_dir_to_path(app_custom_folder, new_env):
"""Util method to add app_custom_folder into the sys.path and carry into the child process."""
sys_path = copy.copy(sys.path)
sys_path.append(app_custom_folder)
new_env[SystemVarName.PYTHONPATH] = os.pathsep.join(sys_path)