# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import logging
import shutil
import uuid
from typing import Dict, List
import nvflare.fuel.hci.file_transfer_defs as ftd
from nvflare.apis.client import Client
from nvflare.apis.event_type import EventType
from nvflare.apis.fl_constant import AdminCommandNames, FLContextKey, RunProcessKey
from nvflare.apis.job_def import Job, JobMetaKey, is_valid_job_id
from nvflare.apis.job_def_manager_spec import JobDefManagerSpec, RunStatus
from nvflare.apis.storage import DATA, JOB_ZIP, META, META_JSON, WORKSPACE, WORKSPACE_ZIP
from nvflare.fuel.hci.conn import Connection
from nvflare.fuel.hci.proto import ConfirmMethod, MetaKey, MetaStatusValue, make_meta
from nvflare.fuel.hci.reg import CommandModule, CommandModuleSpec, CommandSpec
from nvflare.fuel.hci.server.authz import PreAuthzReturnCode
from nvflare.fuel.hci.server.binary_transfer import BinaryTransfer
from nvflare.fuel.hci.server.constants import ConnProps
from nvflare.fuel.utils.argument_utils import SafeArgumentParser
from nvflare.private.defs import RequestHeader, TrainingTopic
from nvflare.private.fed.server.admin import new_message
from nvflare.private.fed.server.job_meta_validator import JobMetaValidator
from nvflare.private.fed.server.server_engine import ServerEngine
from nvflare.private.fed.server.server_engine_internal_spec import ServerEngineInternalSpec
from nvflare.security.logging import secure_format_exception, secure_log_traceback
from .cmd_utils import CommandUtil
# Meta keys carried over from an existing job when it is cloned: clone_job() builds the
# new job's meta from the intersection of the source job's meta keys and this set.
CLONED_META_KEYS = {
JobMetaKey.JOB_NAME.value,
JobMetaKey.JOB_FOLDER_NAME.value,
JobMetaKey.DEPLOY_MAP.value,
JobMetaKey.RESOURCE_SPEC.value,
JobMetaKey.CONTENT_LOCATION.value,
JobMetaKey.RESULT_LOCATION.value,
JobMetaKey.APPROVALS.value,
JobMetaKey.MIN_CLIENTS.value,
JobMetaKey.MANDATORY_CLIENTS.value,
JobMetaKey.DATA_STORAGE_FORMAT.value,
}
def _create_list_job_cmd_parser():
    """Build the argument parser used by the ``list_jobs`` admin command.

    Returns:
        A SafeArgumentParser accepting an optional positional job ID prefix and the
        ``-d``, ``-u``, ``-r``, ``-n`` and ``-m`` options.
    """
    list_parser = SafeArgumentParser(prog=AdminCommandNames.LIST_JOBS)
    list_parser.add_argument("job_id", nargs="?", help="Job ID prefix")

    # Boolean switches, registered in the same order as before.
    switches = (
        ("-d", "Show detailed list"),
        ("-u", "List jobs submitted by the same user"),
        ("-r", "List jobs in reverse order of submission time"),
    )
    for flag, help_text in switches:
        list_parser.add_argument(flag, action="store_true", help=help_text)

    list_parser.add_argument("-n", help="Filter by job name prefix")
    list_parser.add_argument("-m", type=int, help="Maximum number of jobs that will be listed")
    return list_parser
class JobCommandModule(CommandModule, CommandUtil, BinaryTransfer):
    """Admin command module that groups the job-management commands."""

    def __init__(self):
        """Call the parent initializer and create a logger named after the concrete class."""
        super().__init__()
        self.logger = logging.getLogger(type(self).__name__)
def get_spec(self):
    """Describe the job-management commands offered by this module.

    Returns:
        A CommandModuleSpec named ``job_mgmt`` whose command specs bind each admin
        command name to its handler and pre-authorization function.
    """
    cmd = AdminCommandNames

    # NOTE(review): ``self.get_job_meta`` is registered below but is not defined in the
    # visible part of this class - presumably provided elsewhere (e.g. a base class); confirm.
    specs = [
        CommandSpec(
            name=cmd.DELETE_WORKSPACE,
            description="delete the workspace of a job",
            usage=f"{cmd.DELETE_WORKSPACE} job_id",
            handler_func=self.delete_job_id,
            authz_func=self.authorize_job,
            enabled=False,
            confirm=ConfirmMethod.AUTH,
        ),
        CommandSpec(
            name=cmd.START_APP,
            description="start the FL app",
            usage=f"{cmd.START_APP} job_id server|client|all",
            handler_func=self.start_app,
            authz_func=self.authorize_job,
        ),
        CommandSpec(
            name=cmd.LIST_JOBS,
            description="list submitted jobs",
            usage=f"{cmd.LIST_JOBS} [-n name_prefix] [-d] [-u] [-r] [-m num_of_jobs] [job_id_prefix]",
            handler_func=self.list_jobs,
            authz_func=self.command_authz_required,
        ),
        CommandSpec(
            name=cmd.GET_JOB_META,
            description="get meta info of specified job",
            usage=f"{cmd.GET_JOB_META} job_id",
            handler_func=self.get_job_meta,
            authz_func=self.authorize_job,
        ),
        CommandSpec(
            name=cmd.DELETE_JOB,
            description="delete a job and persisted workspace",
            usage=f"{cmd.DELETE_JOB} job_id",
            handler_func=self.delete_job,
            authz_func=self.authorize_job,
            confirm=ConfirmMethod.AUTH,
        ),
        CommandSpec(
            name=cmd.ABORT_JOB,
            description="abort a job if it is running or dispatched",
            usage=f"{cmd.ABORT_JOB} job_id",
            # the handler checks the job state and only signals a run that is in progress
            handler_func=self.abort_job,
            authz_func=self.authorize_job,
            confirm=ConfirmMethod.YESNO,
        ),
        CommandSpec(
            name=cmd.CLONE_JOB,
            description="clone a job with a new job_id",
            usage=f"{cmd.CLONE_JOB} job_id",
            handler_func=self.clone_job,
            authz_func=self.authorize_job,
        ),
        CommandSpec(
            name=cmd.SUBMIT_JOB,
            description="submit a job",
            usage=f"{cmd.SUBMIT_JOB} job_folder",
            handler_func=self.submit_job,
            authz_func=self.command_authz_required,
            client_cmd=ftd.PUSH_FOLDER_FQN,
        ),
        CommandSpec(
            name=cmd.DOWNLOAD_JOB,
            description="download a specified job",
            usage=f"{cmd.DOWNLOAD_JOB} job_id [destination]",
            handler_func=self.download_job,
            authz_func=self.authorize_job,
            client_cmd=ftd.PULL_FOLDER_FQN,
        ),
        # DOWNLOAD_JOB_FILE is an internal command that the client issues automatically
        # while downloading a job. It is hidden from the user and cannot be typed directly.
        CommandSpec(
            name=cmd.DOWNLOAD_JOB_FILE,
            description="download a specified job file",
            usage=f"{cmd.DOWNLOAD_JOB_FILE} job_id file_name",
            handler_func=self.pull_file,
            authz_func=self.authorize_job_file,
            client_cmd=ftd.PULL_BINARY_FQN,
            visible=False,
        ),
    ]
    return CommandModuleSpec(name="job_mgmt", cmd_specs=specs)
def authorize_job_file(self, conn: Connection, args: List[str]):
    """Pre-authorize the internal job-file download command.

    Args:
        conn: the admin connection; errors are appended to it.
        args: ``cmd_name tx_id job_id file_name [end]``

    Returns:
        PreAuthzReturnCode.ERROR when fewer than four args are given; otherwise whatever
        authorize_job returns for ``[cmd_name, job_id]``.
    """
    if len(args) >= 4:
        # authorize_job expects the job_id in position 1; here it is the third arg
        return self.authorize_job(conn, [args[0], args[2]])

    entry = conn.get_prop(ConnProps.CMD_ENTRY)
    conn.append_error(f"Usage: {entry.usage}", meta=make_meta(MetaStatusValue.SYNTAX_ERROR))
    return PreAuthzReturnCode.ERROR
def authorize_job(self, conn: Connection, args: List[str]):
    """Pre-authorize a command that operates on an existing job.

    The job ID (``args[1]``, lower-cased) is validated and looked up. On success the job ID,
    the job object and the job's submitter name/org/role are stored as connection props,
    and any additional args are validated as command targets.

    Args:
        conn: the admin connection; props are set on it and errors are appended to it.
        args: ``cmd_name job_id [targets...]``

    Returns:
        PreAuthzReturnCode.ERROR on any validation failure, else
        PreAuthzReturnCode.REQUIRE_AUTHZ.
    """
    if len(args) < 2:
        conn.append_error(
            "syntax error: missing job_id", meta=make_meta(MetaStatusValue.SYNTAX_ERROR, "missing job_id")
        )
        return PreAuthzReturnCode.ERROR

    job_id = args[1].lower()
    if not is_valid_job_id(job_id):
        conn.append_error(f"invalid job_id {job_id}", meta=make_meta(MetaStatusValue.INVALID_JOB_ID, job_id))
        return PreAuthzReturnCode.ERROR

    conn.set_prop(self.JOB_ID, job_id)

    engine = conn.app_ctx
    job_def_manager = engine.job_def_manager
    with engine.new_context() as fl_ctx:
        job = job_def_manager.get_job(job_id, fl_ctx)

    if not job:
        conn.append_error(
            f"Job with ID {job_id} doesn't exist", meta=make_meta(MetaStatusValue.INVALID_JOB_ID, job_id)
        )
        return PreAuthzReturnCode.ERROR

    conn.set_prop(self.JOB, job)

    # copy the submitter identity from the job meta onto the connection
    submitter_props = (
        (ConnProps.SUBMITTER_NAME, JobMetaKey.SUBMITTER_NAME),
        (ConnProps.SUBMITTER_ORG, JobMetaKey.SUBMITTER_ORG),
        (ConnProps.SUBMITTER_ROLE, JobMetaKey.SUBMITTER_ROLE),
    )
    for conn_prop, meta_key in submitter_props:
        conn.set_prop(conn_prop, job.meta.get(meta_key, ""))

    targets = args[2:]
    if targets:
        err = self.validate_command_targets(conn, targets)
        if err:
            conn.append_error(err, meta=make_meta(MetaStatusValue.INVALID_TARGET, err))
            return PreAuthzReturnCode.ERROR

    return PreAuthzReturnCode.REQUIRE_AUTHZ
def _start_app_on_clients(self, conn: Connection, job_id: str) -> bool:
    """Ask the targeted clients to start the app of a running job.

    Args:
        conn: the admin connection; target client names are read from its props and
            errors / reply tables are appended to it.
        job_id: ID of the job whose app should be started.

    Returns:
        True when the start request was sent and the replies were processed; False when
        the job is not running, a target client is not a participant of the job, or the
        engine reports that the app is not ready to start.
    """
    engine = conn.app_ctx
    # The prop may be unset (None); treat that as "no explicit targets" instead of
    # crashing when iterating over it below.
    client_names = conn.get_prop(self.TARGET_CLIENT_NAMES, None) or []

    run_process = engine.run_processes.get(job_id, {})
    if not run_process:
        conn.append_error(f"Job {job_id} is not running.")
        return False

    participants: Dict[str, Client] = run_process.get(RunProcessKey.PARTICIPANTS, {})
    # one pass to collect participant names, then O(1) membership tests
    participant_names = {p.name for p in participants.values()}
    wrong_clients = [name for name in client_names if name not in participant_names]
    if wrong_clients:
        display_clients = ",".join(wrong_clients)
        conn.append_error(f"{display_clients} are not in the job running list.")
        return False

    err = engine.check_app_start_readiness(job_id)
    if err:
        conn.append_error(err)
        return False

    message = new_message(conn, topic=TrainingTopic.START, body="", require_authz=False)
    message.set_header(RequestHeader.JOB_ID, job_id)
    replies = self.send_request_to_clients(conn, message)
    self.process_replies_to_table(conn, replies)
    return True
def start_app(self, conn: Connection, args: List[str]):
    """Handle the ``start_app`` command.

    Only client apps can be started: a ``server`` target is rejected. For the ``client``
    target the clients are always contacted; for any other target they are contacted only
    when target client names are present on the connection.

    Args:
        conn: the admin connection; results and errors are appended to it.
        args: ``cmd_name job_id server|client|all``

    Raises:
        TypeError: if the engine on the connection is not a ServerEngineInternalSpec.
    """
    engine = conn.app_ctx
    if not isinstance(engine, ServerEngineInternalSpec):
        raise TypeError(f"engine must be ServerEngineInternalSpec but got {type(engine)}")

    job_id = conn.get_prop(self.JOB_ID)
    if len(args) < 3:
        conn.append_error("Please provide the target name (client / all) for start_app command.")
        return

    target_type = args[2]
    if target_type == self.TARGET_TYPE_SERVER:
        # starting the server app through this command is not supported
        conn.append_error("start_app command only supports client app start.")
        return

    if target_type == self.TARGET_TYPE_CLIENT:
        contact_clients = True
    else:
        # "all": the server part is not started here; clients only if any are targeted
        contact_clients = bool(conn.get_prop(self.TARGET_CLIENT_NAMES, None))

    if contact_clients and not self._start_app_on_clients(conn, job_id):
        return

    conn.append_success("")
def delete_job_id(self, conn: Connection, args: List[str]):
    """Delete the workspace of a job on the server and ask all clients to do the same.

    Args:
        conn: the admin connection; results and errors are appended to it.
        args: ``cmd_name job_id``

    Raises:
        TypeError: if the engine on the connection is not a ServerEngine.
    """
    job_id = args[1]
    engine = conn.app_ctx
    if not isinstance(engine, ServerEngine):
        raise TypeError(f"engine must be ServerEngine but got {type(engine)}")

    # a job that still has a run process must not be removed
    if job_id in engine.run_processes:
        conn.append_error(f"Current running run_{job_id} can not be deleted.")
        return

    failure = engine.delete_job_id(job_id)
    if failure:
        conn.append_error(failure)
        return

    # ask clients to delete this RUN
    request = new_message(conn, topic=TrainingTopic.DELETE_RUN, body="", require_authz=False)
    request.set_header(RequestHeader.JOB_ID, str(job_id))
    clients = engine.get_clients()
    if clients:
        tokens = [client.token for client in clients]
        conn.set_prop(self.TARGET_CLIENT_TOKENS, tokens)
        self.process_replies_to_table(conn, self.send_request_to_clients(conn, request))

    conn.append_success("")
def list_jobs(self, conn: Connection, args: List[str]):
    """Handle the ``list_jobs`` command.

    Jobs are filtered by job ID prefix, job name prefix and (with ``-u``) the current
    user, sorted by submit time (descending with ``-r``), optionally limited to the most
    recently submitted ``-m`` jobs, and sent either as a detail list (``-d``) or as a
    summary table.

    Args:
        conn: the admin connection; results and errors are appended to it.
        args: ``cmd_name [-n name_prefix] [-d] [-u] [-r] [-m num_of_jobs] [job_id_prefix]``
    """
    try:
        parser = _create_list_job_cmd_parser()
        parsed_args = parser.parse_args(args[1:])

        max_jobs_listed = parsed_args.m
        # A negative limit would be fed straight into slicing and silently return the
        # wrong subset of jobs, so reject it up front. Zero keeps meaning "no limit".
        if max_jobs_listed is not None and max_jobs_listed < 0:
            msg = f"invalid value for -m: {max_jobs_listed} (must not be negative)"
            conn.append_error(msg, meta=make_meta(MetaStatusValue.SYNTAX_ERROR, msg))
            return

        engine = conn.app_ctx
        job_def_manager = engine.job_def_manager
        if not isinstance(job_def_manager, JobDefManagerSpec):
            raise TypeError(
                f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
            )

        with engine.new_context() as fl_ctx:
            jobs = job_def_manager.get_all_jobs(fl_ctx)

        if jobs:
            id_prefix = parsed_args.job_id
            name_prefix = parsed_args.n
            user_name = conn.get_prop(ConnProps.USER_NAME, "") if parsed_args.u else None

            filtered_jobs = [job for job in jobs if self._job_match(job.meta, id_prefix, name_prefix, user_name)]
            if not filtered_jobs:
                conn.append_string(
                    "No jobs matching the specified criteria.",
                    meta=make_meta(MetaStatusValue.OK, extra={MetaKey.JOBS: []}),
                )
                return

            reverse = bool(parsed_args.r)
            filtered_jobs.sort(key=lambda job: job.meta.get(JobMetaKey.SUBMIT_TIME.value, 0.0), reverse=reverse)

            if max_jobs_listed:
                # keep the most recently submitted jobs: they are at the front when the
                # order is reversed and at the back otherwise
                if reverse:
                    filtered_jobs = filtered_jobs[:max_jobs_listed]
                else:
                    filtered_jobs = filtered_jobs[-max_jobs_listed:]

            if parsed_args.d:
                self._send_detail_list(conn, filtered_jobs)
            else:
                self._send_summary_list(conn, filtered_jobs)
        else:
            conn.append_string("No jobs found.", meta=make_meta(MetaStatusValue.OK, extra={MetaKey.JOBS: []}))
    except Exception as e:
        conn.append_error(
            secure_format_exception(e),
            meta=make_meta(MetaStatusValue.INTERNAL_ERROR, info=secure_format_exception(e)),
        )
        return

    conn.append_success("")
def delete_job(self, conn: Connection, args: List[str]):
    """Handle the ``delete_job`` command.

    The job object and job ID are taken from the connection props. A job whose status is
    DISPATCHED or RUNNING is not deleted.

    Args:
        conn: the admin connection; results and errors are appended to it.
        args: command args (not used directly).
    """
    job = conn.get_prop(self.JOB)
    if not job:
        conn.append_error(
            "program error: job not set in conn", meta=make_meta(MetaStatusValue.INTERNAL_ERROR, "no job")
        )
        return

    job_id = conn.get_prop(self.JOB_ID)
    active_states = (RunStatus.DISPATCHED.value, RunStatus.RUNNING.value)
    if job.meta.get(JobMetaKey.STATUS, "") in active_states:
        conn.append_error(
            f"job: {job_id} is running, could not be deleted at this time.",
            meta=make_meta(MetaStatusValue.JOB_RUNNING, job_id),
        )
        return

    try:
        engine = conn.app_ctx
        manager = engine.job_def_manager
        with engine.new_context() as fl_ctx:
            manager.delete(job_id, fl_ctx)
            conn.append_string(f"Job {job_id} deleted.")
    except Exception as e:
        conn.append_error(
            f"exception occurred: {secure_format_exception(e)}",
            meta=make_meta(MetaStatusValue.INTERNAL_ERROR, f"exception {type(e)}"),
        )
        return

    conn.append_success("", meta=make_meta(MetaStatusValue.OK))
def abort_job(self, conn: Connection, args: List[str]):
    """Handle the ``abort_job`` command.

    Depending on the job status:
      - SUBMITTED / DISPATCHED: the status is set to FINISHED_ABORTED;
      - a status starting with "FINISHED:": nothing is changed, the job is reported as completed;
      - anything else: the job runner is asked to stop the run.

    Args:
        conn: the admin connection; the job ID is read from its props, results and errors
            are appended to it.
        args: command args (not used directly).
    """
    engine = conn.app_ctx
    job_runner = engine.job_runner

    try:
        job_id = conn.get_prop(self.JOB_ID)
        with engine.new_context() as fl_ctx:
            job_manager = engine.job_def_manager
            job = job_manager.get_job(job_id, fl_ctx)
            status = job.meta.get(JobMetaKey.STATUS)

            if status in (RunStatus.SUBMITTED, RunStatus.DISPATCHED):
                # not running yet: just mark it as aborted
                job_manager.set_status(job.job_id, RunStatus.FINISHED_ABORTED, fl_ctx)
                outcome = f"Aborted the job {job_id} before running it."
            elif status.startswith("FINISHED:"):
                outcome = f"Job for {job_id} is already completed."
            else:
                failure = job_runner.stop_run(job_id, fl_ctx)
                if failure:
                    conn.append_error(failure, meta=make_meta(MetaStatusValue.INTERNAL_ERROR, failure))
                    return
                outcome = "Abort signal has been sent to the server app."

            conn.append_string(outcome)
            conn.append_success("", meta=make_meta(MetaStatusValue.OK, outcome))
    except Exception as e:
        conn.append_error(
            f"Exception occurred trying to abort job: {secure_format_exception(e)}",
            meta=make_meta(MetaStatusValue.INTERNAL_ERROR, f"exception {type(e)}"),
        )
        return
def clone_job(self, conn: Connection, args: List[str]):
    """Handle the ``clone_job`` command.

    The new job's meta consists of the source job's entries whose keys are in
    CLONED_META_KEYS, plus the current user as submitter and a reference to the source
    job ID.

    Args:
        conn: the admin connection; the source job and job ID are read from its props,
            results and errors are appended to it.
        args: command args (not used directly).
    """
    source_job = conn.get_prop(self.JOB)
    job_id = conn.get_prop(self.JOB_ID)
    engine = conn.app_ctx

    try:
        if not isinstance(engine, ServerEngine):
            raise TypeError(f"engine is not of type ServerEngine, but got {type(engine)}")

        job_def_manager = engine.job_def_manager
        if not isinstance(job_def_manager, JobDefManagerSpec):
            raise TypeError(
                f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
            )

        with engine.new_context() as fl_ctx:
            source_meta = source_job.meta
            new_meta = {str(key): source_meta[key] for key in source_meta.keys() & CLONED_META_KEYS}

            # the user issuing the command becomes the submitter of the new job
            new_meta.update(
                {
                    JobMetaKey.SUBMITTER_NAME.value: conn.get_prop(ConnProps.USER_NAME),
                    JobMetaKey.SUBMITTER_ORG.value: conn.get_prop(ConnProps.USER_ORG),
                    JobMetaKey.SUBMITTER_ROLE.value: conn.get_prop(ConnProps.USER_ROLE),
                    JobMetaKey.CLONED_FROM.value: job_id,
                }
            )

            created_meta = job_def_manager.clone(from_jid=job_id, meta=new_meta, fl_ctx=fl_ctx)
            new_job_id = created_meta.get(JobMetaKey.JOB_ID)
            conn.append_string(f"Cloned job {job_id} as: {new_job_id}")
    except Exception as e:
        conn.append_error(
            f"Exception occurred trying to clone job: {secure_format_exception(e)}",
            meta=make_meta(MetaStatusValue.INTERNAL_ERROR, f"exception {type(e)}"),
        )
        return

    conn.append_success("", meta=make_meta(status=MetaStatusValue.OK, extra={MetaKey.JOB_ID: new_job_id}))
@staticmethod
def _job_match(job_meta: Dict, id_prefix: str, name_prefix: str, user_name: str) -> bool:
    """Tell whether a job's meta satisfies the given list filters.

    A filter that is empty or None is ignored. Prefix comparisons are case-insensitive;
    the submitter name must match exactly.

    Args:
        job_meta: the job's meta dict (keys "job_id", "name", "submitter_name" are read).
        id_prefix: required prefix of the job ID, or None/empty for no filtering.
        name_prefix: required prefix of the job name, or None/empty for no filtering.
        user_name: required submitter name, or None/empty for no filtering.

    Returns:
        True if every active filter matches, else False. A job whose meta lacks a
        filtered field simply does not match (instead of raising AttributeError).
    """
    if id_prefix:
        job_id = job_meta.get("job_id") or ""
        if not job_id.lower().startswith(id_prefix.lower()):
            return False

    if name_prefix:
        job_name = job_meta.get("name") or ""
        if not job_name.lower().startswith(name_prefix.lower()):
            return False

    if user_name and job_meta.get("submitter_name") != user_name:
        return False

    return True
@staticmethod
def _send_detail_list(conn: Connection, jobs: List[Job]):
    """Send the full meta of each job as indented JSON, followed by a meta-only entry.

    Args:
        conn: the admin connection the output is appended to.
        jobs: the jobs to report; the duration of running jobs is refreshed first.
    """
    metas = []
    for entry in jobs:
        JobCommandModule._set_duration(entry)
        meta = entry.meta
        conn.append_string(json.dumps(meta, indent=4))
        metas.append(meta)

    # the final empty string only carries the structured job list in its meta
    conn.append_string("", meta=make_meta(MetaStatusValue.OK, extra={MetaKey.JOBS: metas}))
@staticmethod
def _send_summary_list(conn: Connection, jobs: List[Job]):
    """Send a summary table with one row per job.

    Each row shows job ID, name, status, submit time and run duration; the same values
    are attached to the row as meta.

    Args:
        conn: the admin connection the table is appended to.
        jobs: the jobs to report; the duration of running jobs is refreshed first.
    """
    table = conn.append_table(["Job ID", "Name", "Status", "Submit Time", "Run Duration"], name=MetaKey.JOBS)
    for entry in jobs:
        JobCommandModule._set_duration(entry)
        meta = entry.meta

        # each value is used both in the visible row and in the row meta
        job_id = meta.get(JobMetaKey.JOB_ID.value, "")
        job_name = CommandUtil.get_job_name(meta)
        status = meta.get(JobMetaKey.STATUS.value, "")
        submit_time = meta.get(JobMetaKey.SUBMIT_TIME_ISO.value, "")
        duration = str(meta.get(JobMetaKey.DURATION.value, "N/A"))

        table.add_row(
            [job_id, job_name, status, submit_time, duration],
            meta={
                MetaKey.JOB_ID: job_id,
                MetaKey.JOB_NAME: job_name,
                MetaKey.STATUS: status,
                MetaKey.SUBMIT_TIME: submit_time,
                MetaKey.DURATION: duration,
            },
        )
@staticmethod
def _set_duration(job):
    """Refresh the duration entry in the meta of a job that is currently running.

    For a job whose status is RUNNING, the elapsed time since its recorded start time is
    stored (as a string) under the DURATION meta key. Jobs in any other state, and
    running jobs without a recorded start time, are left untouched.

    Args:
        job: the job whose ``meta`` dict is updated in place.
    """
    if job.meta.get(JobMetaKey.STATUS) != RunStatus.RUNNING.value:
        return

    started = job.meta.get(JobMetaKey.START_TIME.value)
    if not started:
        # strptime(None, ...) raises TypeError, which would fail the whole listing;
        # without a start time there is simply no duration to report.
        return

    start_time = datetime.datetime.strptime(started, "%Y-%m-%d %H:%M:%S.%f")
    job.meta[JobMetaKey.DURATION.value] = str(datetime.datetime.now() - start_time)
def submit_job(self, conn: Connection, args: List[str]):
    """Handle the ``submit_job`` command.

    The uploaded job is validated, a SUBMIT_JOB event is fired so that handlers can
    block the submission, submitter info is recorded in the job meta, and the job is
    created through the job definition manager.

    Args:
        conn: the admin connection; ``conn.extra`` holds the uploaded zip file name,
            results and errors are appended to the connection.
        args: ``cmd_name job_folder``
    """
    folder_name = args[1]
    zip_file_name = conn.extra
    engine = conn.app_ctx

    try:
        with engine.new_context() as fl_ctx:
            valid, error, meta = JobMetaValidator().validate(folder_name, zip_file_name)
            if not valid:
                conn.append_error(error, meta=make_meta(MetaStatusValue.INVALID_JOB_DEFINITION, error))
                return

            job_def_manager = engine.job_def_manager
            if not isinstance(job_def_manager, JobDefManagerSpec):
                raise TypeError(
                    f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
                )

            # let event handlers inspect the job meta and veto the submission
            fl_ctx.set_prop(FLContextKey.JOB_META, meta, private=True, sticky=False)
            engine.fire_event(EventType.SUBMIT_JOB, fl_ctx)
            block_reason = fl_ctx.get_prop(FLContextKey.JOB_BLOCK_REASON)
            if block_reason:
                # submitted job blocked
                self.logger.error(f"submitted job is blocked: {block_reason}")
                conn.append_error(
                    block_reason, meta=make_meta(MetaStatusValue.INVALID_JOB_DEFINITION, block_reason)
                )
                return

            # set submitter info
            meta.update(
                {
                    JobMetaKey.SUBMITTER_NAME.value: conn.get_prop(ConnProps.USER_NAME, ""),
                    JobMetaKey.SUBMITTER_ORG.value: conn.get_prop(ConnProps.USER_ORG, ""),
                    JobMetaKey.SUBMITTER_ROLE.value: conn.get_prop(ConnProps.USER_ROLE, ""),
                    JobMetaKey.JOB_FOLDER_NAME.value: folder_name,
                }
            )
            custom_props = conn.get_prop(ConnProps.CUSTOM_PROPS)
            if custom_props:
                meta[JobMetaKey.CUSTOM_PROPS.value] = custom_props

            created_meta = job_def_manager.create(meta, zip_file_name, fl_ctx)
            job_id = created_meta.get(JobMetaKey.JOB_ID)
            conn.append_string(f"Submitted job: {job_id}")
            conn.append_success("", meta=make_meta(MetaStatusValue.OK, extra={MetaKey.JOB_ID: job_id}))
    except Exception as e:
        conn.append_error(
            f"Exception occurred trying to submit job: {secure_format_exception(e)}",
            meta=make_meta(MetaStatusValue.INTERNAL_ERROR, f"exception {type(e)} occurred"),
        )
        return
def _clean_up_download(self, conn: Connection, tx_id: str):
    """Delete the download folder of a transaction, ignoring any removal errors.

    Args:
        conn: the admin connection, used to resolve the transaction path.
        tx_id: ID of the download transaction whose folder is removed.
    """
    shutil.rmtree(self.tx_path(conn, tx_id), ignore_errors=True)
def pull_file(self, conn: Connection, args: List[str]):
    """Send one file of a job download to the admin client.

    Args:
        conn: the admin connection the file is sent through.
        args: ``cmd_name tx_id folder_name file_name [end]``; a fifth arg marks the
            last file of the download and triggers removal of the download folder.
    """
    if len(args) < 4:
        # NOTE: this should never happen since args have been validated by authorize_job_file!
        self.logger.error("syntax error: missing tx_id folder_name file name")
        return

    tx_id, folder_name, file_name = args[1:4]
    self.download_file(conn, tx_id, folder_name, file_name)

    is_last_file = len(args) > 4
    if is_last_file:
        # this is the end of the download - remove the download dir
        self._clean_up_download(conn, tx_id)
def download_job(self, conn: Connection, args: List[str]):
    """Handle the ``download_job`` command.

    Job download uses the binary protocol for more efficient download:
      - the job's data, meta and workspace are retrieved from the job store into a
        transfer folder;
      - the file names, a TX ID and the command name for downloading files are returned
        to the admin client;
      - the admin client then downloads the files one by one and signals the end of the
        download in its last command.

    Args:
        conn: the admin connection; results and errors are appended to it.
        args: ``cmd_name job_id [destination]``
    """
    job_id = args[1]
    self.logger.debug(f"pull_job called for {job_id}")

    engine = conn.app_ctx
    job_def_manager = engine.job_def_manager
    if not isinstance(job_def_manager, JobDefManagerSpec):
        self.logger.error(
            f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
        )
        conn.append_error("internal error", meta=make_meta(MetaStatusValue.INTERNAL_ERROR))
        return

    # The same job may be downloaded by several sessions at the same time, so every
    # download gets its own sub-folder of the download dir, named with a fresh UUID
    # transaction ID: <download_dir>/<tx_id>/<job_id>.
    tx_id = str(uuid.uuid4())
    job_download_dir = self.tx_path(conn, tx_id)  # absolute path of the job download dir

    # (storage component, file name) pairs, fetched in this order
    storage_parts = ((DATA, JOB_ZIP), (META, META_JSON), (WORKSPACE, WORKSPACE_ZIP))

    with engine.new_context() as fl_ctx:
        try:
            for component, file_name in storage_parts:
                job_def_manager.get_storage_for_download(job_id, job_download_dir, component, file_name, fl_ctx)

            self.download_folder(
                conn,
                tx_id=tx_id,
                folder_name=job_id,
                download_file_cmd_name=AdminCommandNames.DOWNLOAD_JOB_FILE,
            )
        except Exception as e:
            secure_log_traceback()
            self.logger.error(f"exception downloading job {job_id}: {secure_format_exception(e)}")
            self._clean_up_download(conn, tx_id)
            conn.append_error("internal error", meta=make_meta(MetaStatusValue.INTERNAL_ERROR))