# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import io
import json
import os
import shutil
import threading
import uuid
import weakref
from typing import Dict, List, Optional, Set
from zipfile import BadZipFile, ZipFile
import nvflare.fuel.hci.file_transfer_defs as ftd
from nvflare.apis.event_type import EventType
from nvflare.apis.fl_constant import (
SUBMIT_TOKEN_CONFLICT_STATUS,
SUBMIT_TOKEN_JOB_DELETED_STATUS,
AdminCommandNames,
FLContextKey,
ReturnCode,
ServerCommandKey,
WorkspaceConstants,
)
from nvflare.apis.job_def import (
ALL_SITES,
DEFAULT_STUDY,
SERVER_SITE_NAME,
Job,
JobMetaKey,
SubmitRecordKey,
SubmitRecordState,
get_job_meta_study,
is_valid_job_id,
)
from nvflare.apis.job_def_manager_spec import JobDefManagerSpec, RunStatus
from nvflare.apis.shareable import Shareable
from nvflare.apis.storage import (
DATA,
JOB_ZIP,
META,
META_JSON,
WORKSPACE,
WORKSPACE_ZIP,
DataTypes,
StorageException,
StorageSpec,
)
from nvflare.apis.utils.job_submit_token import canonical_job_content_hash, validate_submit_token
from nvflare.fuel.hci.conn import Connection
from nvflare.fuel.hci.proto import ConfirmMethod, MetaKey, MetaStatusValue, make_meta
from nvflare.fuel.hci.reg import CommandModule, CommandModuleSpec, CommandSpec
from nvflare.fuel.hci.server.authz import PreAuthzReturnCode
from nvflare.fuel.hci.server.binary_transfer import BinaryTransfer
from nvflare.fuel.hci.server.constants import ConnProps
from nvflare.fuel.utils.argument_utils import SafeArgumentParser
from nvflare.fuel.utils.log_utils import get_obj_logger
from nvflare.private.defs import RequestHeader, TrainingTopic
from nvflare.private.fed.server.admin import new_message
from nvflare.private.fed.server.job_meta_validator import JobMetaValidator
from nvflare.private.fed.server.server_engine import ServerEngine
from nvflare.private.fed.server.server_engine_internal_spec import ServerEngineInternalSpec
from nvflare.private.fed.utils.fed_utils import extract_participants
from nvflare.security.logging import secure_format_exception, secure_log_traceback
from nvflare.security.study_registry import StudyRegistryService
from .cmd_utils import CommandUtil
class _SubmitTokenJobDeleted(Exception):
def __init__(self, record: dict):
super().__init__("submit token refers to a deleted job")
self.record = record
CLONED_META_KEYS = {
JobMetaKey.JOB_NAME.value,
JobMetaKey.JOB_FOLDER_NAME.value,
JobMetaKey.DEPLOY_MAP.value,
JobMetaKey.RESOURCE_SPEC.value,
JobMetaKey.CONTENT_LOCATION.value,
JobMetaKey.RESULT_LOCATION.value,
JobMetaKey.APPROVALS.value,
JobMetaKey.MIN_CLIENTS.value,
JobMetaKey.MANDATORY_CLIENTS.value,
JobMetaKey.DATA_STORAGE_FORMAT.value,
JobMetaKey.STUDY.value,
}
JSON_LOG_FILE_NAME = "log.json"
def _active_study_from_conn(conn: Connection) -> str:
return conn.get_prop(ConnProps.ACTIVE_STUDY, DEFAULT_STUDY) or DEFAULT_STUDY
def _append_no_such_job_error(conn: Connection, job_id: str):
conn.append_error(f"no such job: {job_id}", meta=make_meta(MetaStatusValue.INVALID_JOB_ID, job_id))
def _create_list_job_cmd_parser():
parser = SafeArgumentParser(prog=AdminCommandNames.LIST_JOBS)
parser.add_argument("job_id", nargs="?", help="Job ID prefix")
parser.add_argument("-d", action="store_true", help="Show detailed list")
parser.add_argument("-u", action="store_true", help="List jobs submitted by the same user")
parser.add_argument("-r", action="store_true", help="List jobs in reverse order of submission time")
parser.add_argument("-n", help="Filter by job name prefix")
parser.add_argument(
"-m",
type=int,
help="Maximum number of jobs that will be listed",
)
parser.add_argument("--submit-token", dest="submit_token", help="retry-safe submit token")
return parser
def _create_submit_job_cmd_parser():
parser = SafeArgumentParser(prog=AdminCommandNames.SUBMIT_JOB)
parser.add_argument("folder_name", help="Uploaded job folder name")
parser.add_argument("--submit-token", dest="submit_token", help="retry-safe submit token")
return parser
def _create_get_job_log_cmd_parser():
parser = SafeArgumentParser(prog=AdminCommandNames.GET_JOB_LOG)
parser.add_argument("job_id", help="Job ID")
parser.add_argument("target", nargs="?", default=SERVER_SITE_NAME, help="server, all, or a client site name")
parser.add_argument("log_file_name", nargs="?", default=WorkspaceConstants.LOG_FILE_NAME, help="log file name")
return parser
[docs]
class JobCommandModule(CommandModule, CommandUtil, BinaryTransfer):
"""Command module with commands for job management."""
MAX_RETURNED_JOB_LOG_BYTES = 5 * 1024 * 1024
_submit_token_locks = weakref.WeakValueDictionary()
_submit_token_locks_guard = threading.Lock()
def __init__(self):
super().__init__()
self.logger = get_obj_logger(self)
[docs]
def get_spec(self):
return CommandModuleSpec(
name="job_mgmt",
cmd_specs=[
CommandSpec(
name=AdminCommandNames.DELETE_WORKSPACE,
description="delete the workspace of a job",
usage=f"{AdminCommandNames.DELETE_WORKSPACE} job_id",
handler_func=self.delete_job_id,
authz_func=self.authorize_job,
enabled=False,
confirm=ConfirmMethod.AUTH,
),
CommandSpec(
name=AdminCommandNames.CONFIGURE_JOB_LOG,
description="configure logging of a running job",
usage=f"{AdminCommandNames.CONFIGURE_JOB_LOG} job_id server|client <client-name>... config",
handler_func=self.configure_job_log,
authz_func=self.authorize_configure_job_log,
),
CommandSpec(
name=AdminCommandNames.LIST_JOBS,
description="list submitted jobs",
usage=(
f"{AdminCommandNames.LIST_JOBS} [-n name_prefix] [-d] [-u] [-r] "
"[-m num_of_jobs] [--submit-token token] [job_id_prefix]"
),
handler_func=self.list_jobs,
authz_func=self.command_authz_required,
),
CommandSpec(
name=AdminCommandNames.GET_JOB_LOG,
description="get job log text from the server-side log store",
usage=f"{AdminCommandNames.GET_JOB_LOG} job_id [server|all|client_name]",
handler_func=self.get_job_log,
authz_func=self.authorize_job_id,
),
CommandSpec(
name=AdminCommandNames.GET_JOB_META,
description="get meta info of specified job",
usage=f"{AdminCommandNames.GET_JOB_META} job_id",
handler_func=self.get_job_meta,
authz_func=self.authorize_job,
),
CommandSpec(
name=AdminCommandNames.LIST_JOB,
description="list additional components of specified job",
usage=f"{AdminCommandNames.LIST_JOB} job_id",
handler_func=self.list_job_components,
authz_func=self.authorize_job,
),
CommandSpec(
name=AdminCommandNames.DELETE_JOB,
description="delete a job and persisted workspace",
usage=f"{AdminCommandNames.DELETE_JOB} job_id",
handler_func=self.delete_job,
authz_func=self.authorize_job,
confirm=ConfirmMethod.AUTH,
),
CommandSpec(
name=AdminCommandNames.ABORT_JOB,
description="abort a job if it is running or dispatched",
usage=f"{AdminCommandNames.ABORT_JOB} job_id",
handler_func=self.abort_job, # see if running, if running, send abort command
authz_func=self.authorize_job,
confirm=ConfirmMethod.YESNO,
),
CommandSpec(
name=AdminCommandNames.CLONE_JOB,
description="clone a job with a new job_id",
usage=f"{AdminCommandNames.CLONE_JOB} job_id",
handler_func=self.clone_job,
authz_func=self.authorize_job,
),
CommandSpec(
name=AdminCommandNames.SUBMIT_JOB,
description="submit a job",
usage=f"{AdminCommandNames.SUBMIT_JOB} job_folder [--submit-token token]",
handler_func=self.submit_job,
authz_func=self.command_authz_required,
client_cmd=ftd.PUSH_FOLDER_FQN,
),
CommandSpec(
name=AdminCommandNames.DOWNLOAD_JOB,
description="download a specified job",
usage=f"{AdminCommandNames.DOWNLOAD_JOB} job_id [destination]",
handler_func=self.download_job,
authz_func=self.authorize_job,
client_cmd=ftd.PULL_FOLDER_FQN,
),
CommandSpec(
name=AdminCommandNames.DOWNLOAD_JOB_COMPONENTS,
description="download additional components for a specified job",
usage=f"{AdminCommandNames.DOWNLOAD_JOB_COMPONENTS} job_id",
authz_func=self.authorize_job,
handler_func=self.download_job_components,
client_cmd=ftd.PULL_FOLDER_FQN,
),
CommandSpec(
name=AdminCommandNames.APP_COMMAND,
description="execute an app-defined command",
usage=f"{AdminCommandNames.APP_COMMAND} job_id topic # cmd_data",
handler_func=self.do_app_command,
authz_func=self.authorize_job_id,
),
],
)
[docs]
def authorize_job_file(self, conn: Connection, args: List[str]):
"""
Args: cmd_name tx_id job_id file_name [end]
"""
if len(args) < 4:
cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY)
conn.append_error(f"Usage: {cmd_entry.usage}", meta=make_meta(MetaStatusValue.SYNTAX_ERROR))
return PreAuthzReturnCode.ERROR
job_id = args[2]
args_for_authz = [args[0], job_id]
return self.authorize_job_id(conn, args_for_authz)
[docs]
def authorize_job_id(self, conn: Connection, args: List[str]):
if len(args) < 2:
conn.append_error(
"syntax error: missing job_id", meta=make_meta(MetaStatusValue.SYNTAX_ERROR, "missing job_id")
)
return PreAuthzReturnCode.ERROR
job_id = args[1].lower()
if not is_valid_job_id(job_id):
conn.append_error(f"invalid job_id {job_id}", meta=make_meta(MetaStatusValue.INVALID_JOB_ID, job_id))
return PreAuthzReturnCode.ERROR
conn.set_prop(self.JOB_ID, job_id)
engine = conn.app_ctx
job_def_manager = engine.job_def_manager
with engine.new_context() as fl_ctx:
job = job_def_manager.get_job(job_id, fl_ctx)
if not job:
_append_no_such_job_error(conn, job_id)
return PreAuthzReturnCode.ERROR
requested_study = _active_study_from_conn(conn)
if requested_study and get_job_meta_study(job.meta) != requested_study:
_append_no_such_job_error(conn, job_id)
return PreAuthzReturnCode.ERROR
conn.set_prop(self.JOB, job)
conn.set_prop(ConnProps.SUBMITTER_NAME, job.meta.get(JobMetaKey.SUBMITTER_NAME, ""))
conn.set_prop(ConnProps.SUBMITTER_ORG, job.meta.get(JobMetaKey.SUBMITTER_ORG, ""))
conn.set_prop(ConnProps.SUBMITTER_ROLE, job.meta.get(JobMetaKey.SUBMITTER_ROLE, ""))
if not self._apply_study_role_for_authz(conn):
return PreAuthzReturnCode.ERROR
return PreAuthzReturnCode.REQUIRE_AUTHZ
[docs]
def authorize_job(self, conn: Connection, args: List[str]):
rc = self.authorize_job_id(conn, args)
if rc == PreAuthzReturnCode.ERROR:
return rc
if len(args) > 2:
err = self.validate_command_targets(conn, args[2:])
if err:
conn.append_error(err, meta=make_meta(MetaStatusValue.INVALID_TARGET, err))
return PreAuthzReturnCode.ERROR
return PreAuthzReturnCode.REQUIRE_AUTHZ
[docs]
def delete_job_id(self, conn: Connection, args: List[str]):
job_id = args[1]
engine = conn.app_ctx
if not isinstance(engine, ServerEngine):
raise TypeError("engine must be ServerEngine but got {}".format(type(engine)))
if job_id in engine.run_processes.keys():
conn.append_error(f"Current running run_{job_id} can not be deleted.")
return
err = engine.delete_job_id(job_id)
if err:
conn.append_error(err)
return
# ask clients to delete this RUN
message = new_message(conn, topic=TrainingTopic.DELETE_RUN, body="", require_authz=False)
message.set_header(RequestHeader.JOB_ID, str(job_id))
clients = engine.get_clients()
if clients:
conn.set_prop(self.TARGET_CLIENT_TOKENS, [x.token for x in clients])
replies = self.send_request_to_clients(conn, message)
self.process_replies_to_table(conn, replies)
conn.append_success("")
[docs]
def list_jobs(self, conn: Connection, args: List[str]):
try:
parser = _create_list_job_cmd_parser()
parsed_args = parser.parse_args(args[1:])
submit_token = validate_submit_token(parsed_args.submit_token)
requested_study = _active_study_from_conn(conn)
engine = conn.app_ctx
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
with engine.new_context() as fl_ctx:
if submit_token:
jobs = self._list_jobs_by_submit_token(
job_def_manager,
requested_study,
self._submitter_from_conn(conn),
submit_token,
fl_ctx,
)
else:
jobs = job_def_manager.get_all_jobs(fl_ctx)
if jobs:
id_prefix = parsed_args.job_id
name_prefix = parsed_args.n
max_jobs_listed = parsed_args.m
user_name = conn.get_prop(ConnProps.USER_NAME, "") if parsed_args.u else None
filtered_jobs = [
job for job in jobs if self._job_match(job.meta, id_prefix, name_prefix, user_name, requested_study)
]
if not filtered_jobs:
conn.append_string(
"No jobs matching the specified criteria.",
meta=make_meta(MetaStatusValue.OK, extra={MetaKey.JOBS: []}),
)
return
reverse = True if parsed_args.r else False
filtered_jobs.sort(key=lambda job: job.meta.get(JobMetaKey.SUBMIT_TIME.value, 0.0), reverse=reverse)
if max_jobs_listed:
if reverse:
filtered_jobs = filtered_jobs[:max_jobs_listed]
else:
filtered_jobs = filtered_jobs[-max_jobs_listed:]
if parsed_args.d:
self._send_detail_list(conn, filtered_jobs)
else:
self._send_summary_list(conn, filtered_jobs)
else:
conn.append_string("No jobs found.", meta=make_meta(MetaStatusValue.OK, extra={MetaKey.JOBS: []}))
except ValueError as e:
conn.append_error(str(e), meta=make_meta(MetaStatusValue.SYNTAX_ERROR, str(e)))
return
except _SubmitTokenJobDeleted as e:
self._append_submit_token_job_deleted(conn, e.record)
return
except Exception as e:
conn.append_error(
secure_format_exception(e),
meta=make_meta(MetaStatusValue.INTERNAL_ERROR, info=secure_format_exception(e)),
)
return
conn.append_success("")
[docs]
def delete_job(self, conn: Connection, args: List[str]):
job = conn.get_prop(self.JOB)
if not job:
conn.append_error(
"program error: job not set in conn", meta=make_meta(MetaStatusValue.INTERNAL_ERROR, "no job")
)
return
job_id = conn.get_prop(self.JOB_ID)
if job.meta.get(JobMetaKey.STATUS, "") in [RunStatus.DISPATCHED.value, RunStatus.RUNNING.value]:
conn.append_error(
f"job: {job_id} is running, could not be deleted at this time.",
meta=make_meta(MetaStatusValue.JOB_RUNNING, job_id),
)
return
try:
engine = conn.app_ctx
job_def_manager = engine.job_def_manager
with engine.new_context() as fl_ctx:
job_def_manager.delete(job_id, fl_ctx)
mark_deleted = getattr(job_def_manager, "mark_submit_records_job_deleted", None)
if callable(mark_deleted):
marked_records = mark_deleted(job_id, self._submitter_from_conn(conn), fl_ctx)
else:
marked_records = []
marked_count = len(marked_records)
conn.append_string(f"Job {job_id} deleted. Submit records marked deleted: {marked_count}.")
except Exception as e:
conn.append_error(
f"exception occurred: {secure_format_exception(e)}",
meta=make_meta(MetaStatusValue.INTERNAL_ERROR, f"exception {type(e)}"),
)
return
conn.append_success(
"",
meta=make_meta(
MetaStatusValue.OK,
extra={MetaKey.JOB_ID: job_id, "submit_records_marked_deleted": marked_count},
),
)
[docs]
def get_job_log(self, conn: Connection, args: List[str]):
try:
parser = _create_get_job_log_cmd_parser()
parsed_args = parser.parse_args(args[1:])
except Exception as e:
conn.append_error(
secure_format_exception(e),
meta=make_meta(MetaStatusValue.SYNTAX_ERROR, secure_format_exception(e)),
)
return
prop_job_id = conn.get_prop(self.JOB_ID)
parsed_job_id = parsed_args.job_id.lower() if isinstance(parsed_args.job_id, str) else parsed_args.job_id
if prop_job_id is not None and parsed_job_id is not None and prop_job_id != parsed_job_id:
conn.append_error(
"job_id mismatch between connection property and parsed argument",
meta=make_meta(
MetaStatusValue.SYNTAX_ERROR,
"job_id mismatch between connection property and parsed argument",
),
)
return
job_id = prop_job_id if prop_job_id is not None else parsed_job_id
engine = conn.app_ctx
if not isinstance(engine, ServerEngine):
raise TypeError("engine must be ServerEngine but got {}".format(type(engine)))
target = parsed_args.target
target_lower = target.lower() if isinstance(target, str) else target
log_file_name = self._normalize_log_file_name(parsed_args.log_file_name)
payload = {"logs": {}}
# Accept the protocol token "@ALL" and the CLI/API spelling "all".
all_targets = {ALL_SITES.lower(), "all"}
workspace_zip = None
workspace_zip_loaded = False
if target_lower in all_targets:
workspace_zip = self._read_stored_workspace_zip(conn, job_id)
workspace_zip_loaded = True
if target_lower in {SERVER_SITE_NAME, *all_targets}:
server_log = self._read_server_job_log(
conn,
job_id,
log_file_name=log_file_name,
workspace_zip=workspace_zip,
workspace_zip_loaded=workspace_zip_loaded,
)
if server_log is not None:
payload["logs"][SERVER_SITE_NAME] = server_log or ""
else:
payload.setdefault("unavailable", {})[SERVER_SITE_NAME] = "server log not available for this job"
try:
if target_lower not in {SERVER_SITE_NAME, *all_targets}:
self._add_client_job_log(conn, payload, job_id, target, log_file_name)
elif target_lower in all_targets:
self._add_all_client_job_logs(
conn,
payload,
job_id,
log_file_name,
workspace_zip=workspace_zip,
workspace_zip_loaded=workspace_zip_loaded,
)
except TypeError as e:
error = secure_format_exception(e)
conn.append_error(error, meta=make_meta(MetaStatusValue.INTERNAL_ERROR, error))
return
conn.append_dict(payload, meta=make_meta(MetaStatusValue.OK))
@staticmethod
def _normalize_log_file_name(log_file_name: str) -> str:
if log_file_name == JSON_LOG_FILE_NAME:
return JSON_LOG_FILE_NAME
return WorkspaceConstants.LOG_FILE_NAME
def _read_server_job_log(
self,
conn: Connection,
job_id: str,
log_file_name: str = WorkspaceConstants.LOG_FILE_NAME,
workspace_zip: bytes = None,
workspace_zip_loaded: bool = False,
) -> Optional[str]:
engine = conn.app_ctx
log_text = self._read_live_server_job_log(engine, job_id, log_file_name)
if log_text is not None:
return log_text
if workspace_zip_loaded:
return self._extract_server_log_from_workspace_zip(workspace_zip, log_file_name)
return self._read_stored_server_job_log(conn, job_id, log_file_name)
def _read_live_server_job_log(
self, engine, job_id: str, log_file_name: str = WorkspaceConstants.LOG_FILE_NAME
) -> Optional[str]:
workspace = engine.get_workspace()
log_file = os.path.join(workspace.get_log_root(job_id), log_file_name)
try:
if os.path.exists(log_file):
return "".join(self._collect_job_log_lines(log_file))
except FileNotFoundError:
# The log file can disappear between the existence check and the open() if the
# active run rotates or cleans up the workspace. Treat that as unavailable
# rather than surfacing an internal error to the admin client.
return None
return None
def _read_stored_server_job_log(
self, conn: Connection, job_id: str, log_file_name: str = WorkspaceConstants.LOG_FILE_NAME
) -> Optional[str]:
workspace_zip = self._read_stored_workspace_zip(conn, job_id)
return self._extract_server_log_from_workspace_zip(workspace_zip, log_file_name)
def _read_stored_workspace_zip(self, conn: Connection, job_id: str) -> Optional[bytes]:
engine = conn.app_ctx
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
return None
with engine.new_context() as fl_ctx:
try:
workspace_zip = job_def_manager.get_storage_component(jid=job_id, component=WORKSPACE, fl_ctx=fl_ctx)
except StorageException:
return None
return workspace_zip
def _extract_server_log_from_workspace_zip(
self, workspace_zip: bytes, log_file_name: str = WorkspaceConstants.LOG_FILE_NAME
) -> Optional[str]:
if not workspace_zip:
return None
try:
with ZipFile(io.BytesIO(workspace_zip), "r") as zip_file:
log_name = self._find_server_log_member(zip_file.namelist(), log_file_name)
if not log_name:
return None
with zip_file.open(log_name, "r") as log_file:
data = log_file.read(self.MAX_RETURNED_JOB_LOG_BYTES + 1)
except (BadZipFile, KeyError, OSError, TypeError):
return None
return self._decode_job_log_data(data)
@staticmethod
def _find_server_log_member(
member_names: List[str], log_file_name: str = WorkspaceConstants.LOG_FILE_NAME
) -> Optional[str]:
if log_file_name in member_names:
return log_file_name
server_log_name = f"{SERVER_SITE_NAME}/{log_file_name}"
if server_log_name in member_names:
return server_log_name
suffix = f"/{server_log_name}"
for member_name in member_names:
if not member_name.endswith("/") and member_name.endswith(suffix):
return member_name
return None
def _add_client_job_log(
self,
conn: Connection,
payload: dict,
job_id: str,
client_name: str,
log_file_name: str = WorkspaceConstants.LOG_FILE_NAME,
):
text = self._read_client_job_log(conn, job_id, client_name, log_file_name)
if text is None:
payload.setdefault("unavailable", {})[client_name] = "client log stream not available for this job"
else:
payload["logs"][client_name] = text
def _add_all_client_job_logs(
self,
conn: Connection,
payload: dict,
job_id: str,
log_file_name: str = WorkspaceConstants.LOG_FILE_NAME,
workspace_zip: bytes = None,
workspace_zip_loaded: bool = False,
):
engine = conn.app_ctx
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
with engine.new_context() as fl_ctx:
available_sites = self._get_available_client_log_sites(job_def_manager, job_id, fl_ctx, log_file_name)
if workspace_zip_loaded:
workspace_client_logs = self._extract_client_logs_from_workspace_zip(workspace_zip, log_file_name)
else:
# Defensive path for direct helper calls. get_job_log preloads the workspace
# ZIP before calling this for --site all.
workspace_client_logs = self._read_workspace_client_job_logs(
job_def_manager, job_id, fl_ctx, log_file_name
)
available_sites.update(workspace_client_logs.keys())
known_sites = self._get_job_client_targets(conn.get_prop(self.JOB))
for client_name in sorted(known_sites | available_sites):
text = self._read_live_client_job_log(engine, job_id, client_name, log_file_name)
if text is None:
text = workspace_client_logs.get(client_name)
if text is None:
with engine.new_context() as fl_ctx:
data = job_def_manager.get_client_data(
jid=job_id,
client_name=client_name,
data_type=self._client_log_data_type(log_file_name),
fl_ctx=fl_ctx,
)
text = self._decode_job_log_data(data)
if text is not None:
payload["logs"][client_name] = text
missing_sites = known_sites - set(payload["logs"].keys()) - {SERVER_SITE_NAME}
if missing_sites:
unavailable = payload.setdefault("unavailable", {})
for client_name in sorted(missing_sites):
unavailable[client_name] = "client log stream not available for this job"
def _read_client_job_log(
self,
conn: Connection,
job_id: str,
client_name: str,
log_file_name: str = WorkspaceConstants.LOG_FILE_NAME,
) -> Optional[str]:
engine = conn.app_ctx
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
text = self._read_live_client_job_log(engine, job_id, client_name, log_file_name)
if text is not None:
return text
text = self._read_stored_client_job_log(conn, job_id, client_name, log_file_name)
if text is not None:
return text
with engine.new_context() as fl_ctx:
data = job_def_manager.get_client_data(
jid=job_id,
client_name=client_name,
data_type=self._client_log_data_type(log_file_name),
fl_ctx=fl_ctx,
)
return self._decode_job_log_data(data)
def _read_live_client_job_log(
self,
engine,
job_id: str,
client_name: str,
log_file_name: str = WorkspaceConstants.LOG_FILE_NAME,
) -> Optional[str]:
workspace = engine.get_workspace()
log_file = os.path.join(workspace.get_log_root(job_id), client_name, log_file_name)
try:
if os.path.exists(log_file):
return "".join(self._collect_job_log_lines(log_file))
except FileNotFoundError:
return None
return None
def _read_stored_client_job_log(
self,
conn: Connection,
job_id: str,
client_name: str,
log_file_name: str = WorkspaceConstants.LOG_FILE_NAME,
) -> Optional[str]:
engine = conn.app_ctx
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
return None
with engine.new_context() as fl_ctx:
try:
workspace_zip = job_def_manager.get_storage_component(jid=job_id, component=WORKSPACE, fl_ctx=fl_ctx)
except StorageException:
return None
return self._extract_client_log_from_workspace_zip(workspace_zip, client_name, log_file_name)
def _extract_client_log_from_workspace_zip(
self,
workspace_zip: bytes,
client_name: str,
log_file_name: str = WorkspaceConstants.LOG_FILE_NAME,
) -> Optional[str]:
if not workspace_zip:
return None
try:
with ZipFile(io.BytesIO(workspace_zip), "r") as zip_file:
log_name = self._find_client_log_member(zip_file.namelist(), client_name, log_file_name)
if not log_name:
return None
with zip_file.open(log_name, "r") as log_file:
data = log_file.read(self.MAX_RETURNED_JOB_LOG_BYTES + 1)
except (BadZipFile, KeyError, OSError, TypeError):
return None
return self._decode_job_log_data(data)
@staticmethod
def _find_client_log_member(
member_names: List[str], client_name: str, log_file_name: str = WorkspaceConstants.LOG_FILE_NAME
) -> Optional[str]:
client_log_name = f"{client_name}/{log_file_name}"
if client_log_name in member_names:
return client_log_name
suffix = f"/{client_log_name}"
for member_name in member_names:
if not member_name.endswith("/") and member_name.endswith(suffix):
return member_name
return None
@staticmethod
def _client_log_data_type(log_file_name: str = WorkspaceConstants.LOG_FILE_NAME) -> str:
return f"{DataTypes.LOG.value}_{log_file_name}"
def _get_available_client_log_sites(
self,
job_def_manager,
job_id: str,
fl_ctx,
log_file_name: str = WorkspaceConstants.LOG_FILE_NAME,
) -> Set[str]:
component_prefix = f"{self._client_log_data_type(log_file_name)}_"
components = job_def_manager.list_components(jid=job_id, fl_ctx=fl_ctx) or []
return {
component[len(component_prefix) :]
for component in components
if component.startswith(component_prefix) and component[len(component_prefix) :]
}
def _read_workspace_client_job_logs(
self,
job_def_manager,
job_id: str,
fl_ctx,
log_file_name: str = WorkspaceConstants.LOG_FILE_NAME,
) -> Dict[str, str]:
try:
workspace_zip = job_def_manager.get_storage_component(jid=job_id, component=WORKSPACE, fl_ctx=fl_ctx)
except StorageException:
return {}
return self._extract_client_logs_from_workspace_zip(workspace_zip, log_file_name)
def _extract_client_logs_from_workspace_zip(
self, workspace_zip: bytes, log_file_name: str = WorkspaceConstants.LOG_FILE_NAME
) -> Dict[str, str]:
if not workspace_zip:
return {}
try:
with ZipFile(io.BytesIO(workspace_zip), "r") as zip_file:
log_members = self._find_workspace_client_log_members(zip_file.namelist(), log_file_name)
logs = {}
for client_name, log_name in log_members.items():
try:
with zip_file.open(log_name, "r") as log_file:
data = log_file.read(self.MAX_RETURNED_JOB_LOG_BYTES + 1)
except (KeyError, OSError):
continue
text = self._decode_job_log_data(data)
if text is not None:
logs[client_name] = text
return logs
except (BadZipFile, OSError, TypeError):
return {}
@staticmethod
def _find_workspace_client_log_sites(
member_names: List[str], log_file_name: str = WorkspaceConstants.LOG_FILE_NAME
) -> Set[str]:
return set(JobCommandModule._find_workspace_client_log_members(member_names, log_file_name))
@staticmethod
def _find_workspace_client_log_members(
member_names: List[str], log_file_name: str = WorkspaceConstants.LOG_FILE_NAME
) -> Dict[str, str]:
members = {}
exact_members = {}
for member_name in member_names:
if member_name.endswith("/") or not member_name.endswith(f"/{log_file_name}"):
continue
parts = member_name.split("/")
if len(parts) >= 2 and parts[-2] and parts[-2] != SERVER_SITE_NAME:
if len(parts) == 2:
exact_members[parts[-2]] = member_name
elif parts[-2] not in members:
members[parts[-2]] = member_name
# Prefer exact two-part paths (client_name/fl.log) over deeper paths
# (run_1/client_name/fl.log) by letting exact_members overwrite members.
members.update(exact_members)
return members
@staticmethod
def _get_job_client_targets(job) -> Set[str]:
if not job or not getattr(job, "meta", None):
return set()
deploy_map = job.meta.get(JobMetaKey.DEPLOY_MAP.value, {})
if not isinstance(deploy_map, dict):
return set()
targets = set()
for deployments in deploy_map.values():
for site_name in extract_participants(deployments):
if not site_name or site_name == SERVER_SITE_NAME or site_name.upper() == ALL_SITES:
continue
targets.add(site_name)
return targets
def _decode_job_log_data(self, data) -> Optional[str]:
if data is None:
return None
if isinstance(data, str):
raw_data = data.encode("utf-8", errors="replace")
else:
raw_data = bytes(data)
truncated = len(raw_data) > self.MAX_RETURNED_JOB_LOG_BYTES
if truncated:
raw_data = raw_data[: self.MAX_RETURNED_JOB_LOG_BYTES]
text = raw_data.decode("utf-8", errors="replace")
if truncated:
text += f"\n... output truncated after {self.MAX_RETURNED_JOB_LOG_BYTES} bytes ...\n"
return text
def _collect_job_log_lines(self, log_file: str):
lines = []
collected_bytes = 0
truncated_by_bytes = False
with open(log_file, "r", encoding="utf-8", errors="replace") as f:
for line in f:
line_len = len(line.encode("utf-8", errors="replace"))
if collected_bytes + line_len > self.MAX_RETURNED_JOB_LOG_BYTES:
truncated_by_bytes = True
break
collected_bytes += line_len
lines.append(line)
if truncated_by_bytes:
lines.append(f"... output truncated after {self.MAX_RETURNED_JOB_LOG_BYTES} bytes ...\n")
return lines
[docs]
def list_job_components(self, conn: Connection, args: List[str]):
if len(args) < 2:
conn.append_error("Usage: list_job_components job_id", meta=make_meta(MetaStatusValue.SYNTAX_ERROR))
return
job_id = conn.get_prop(self.JOB_ID)
engine = conn.app_ctx
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
with engine.new_context() as fl_ctx:
list_of_data = job_def_manager.list_components(jid=job_id, fl_ctx=fl_ctx)
if list_of_data:
system_components = {"workspace", "meta", "scheduled", "data"}
filtered_data = [item for item in list_of_data if item not in system_components]
if filtered_data:
data_str = ", ".join(filtered_data)
conn.append_string(data_str)
conn.append_success(
"", meta=make_meta(MetaStatusValue.OK, extra={MetaKey.JOB_COMPONENTS: filtered_data})
)
else:
_append_no_such_job_error(conn, job_id)
[docs]
def abort_job(self, conn: Connection, args: List[str]):
engine = conn.app_ctx
job_runner = engine.job_runner
try:
job_id = conn.get_prop(self.JOB_ID)
with engine.new_context() as fl_ctx:
job_manager = engine.job_def_manager
job = job_manager.get_job(job_id, fl_ctx)
job_status = job.meta.get(JobMetaKey.STATUS)
if job_status in [RunStatus.SUBMITTED.value, RunStatus.DISPATCHED.value]:
job_manager.set_status(job.job_id, RunStatus.FINISHED_ABORTED, fl_ctx)
message = f"Aborted the job {job_id} before running it."
conn.append_string(message)
conn.append_success("", meta=make_meta(MetaStatusValue.OK, message))
return
elif job_status and job_status.startswith("FINISHED:"):
message = f"Job for {job_id} is already completed."
conn.append_string(message)
conn.append_success("", meta=make_meta(MetaStatusValue.OK, message))
else:
message = job_runner.stop_run(job_id, fl_ctx)
if message:
conn.append_error(message, meta=make_meta(MetaStatusValue.INTERNAL_ERROR, message))
else:
message = "Abort signal has been sent to the server app."
conn.append_string(message)
conn.append_success("", meta=make_meta(MetaStatusValue.OK, message))
except Exception as e:
conn.append_error(
f"Exception occurred trying to abort job: {secure_format_exception(e)}",
meta=make_meta(MetaStatusValue.INTERNAL_ERROR, f"exception {type(e)}"),
)
return
[docs]
def clone_job(self, conn: Connection, args: List[str]):
job = conn.get_prop(self.JOB)
job_id = conn.get_prop(self.JOB_ID)
engine = conn.app_ctx
try:
if not isinstance(engine, ServerEngine):
raise TypeError(f"engine is not of type ServerEngine, but got {type(engine)}")
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
with engine.new_context() as fl_ctx:
job_meta = {str(k): job.meta[k] for k in job.meta.keys() & CLONED_META_KEYS}
# set the submitter info for the new job
job_meta[JobMetaKey.SUBMITTER_NAME.value] = conn.get_prop(ConnProps.USER_NAME)
job_meta[JobMetaKey.SUBMITTER_ORG.value] = conn.get_prop(ConnProps.USER_ORG)
job_meta[JobMetaKey.SUBMITTER_ROLE.value] = conn.get_prop(ConnProps.USER_ROLE, "")
job_meta[JobMetaKey.CLONED_FROM.value] = job_id
job_meta[JobMetaKey.STUDY.value] = get_job_meta_study(job.meta)
meta = job_def_manager.clone(from_jid=job_id, meta=job_meta, fl_ctx=fl_ctx)
new_job_id = meta.get(JobMetaKey.JOB_ID)
conn.append_string(f"Cloned job {job_id} as: {new_job_id}")
except Exception as e:
conn.append_error(
f"Exception occurred trying to clone job: {secure_format_exception(e)}",
meta=make_meta(MetaStatusValue.INTERNAL_ERROR, f"exception {type(e)}"),
)
return
conn.append_success("", meta=make_meta(status=MetaStatusValue.OK, extra={MetaKey.JOB_ID: new_job_id}))
@staticmethod
def _job_match(job_meta: Dict, id_prefix: str, name_prefix: str, user_name: str, requested_study: str) -> bool:
job_id = (job_meta.get(JobMetaKey.JOB_ID.value) or "").lower()
job_name = (job_meta.get(JobMetaKey.JOB_NAME.value) or "").lower()
return (
((not id_prefix) or job_id.startswith(id_prefix.lower()))
and ((not name_prefix) or job_name.startswith(name_prefix.lower()))
and ((not user_name) or job_meta.get(JobMetaKey.SUBMITTER_NAME.value) == user_name)
and get_job_meta_study(job_meta) == requested_study
)
@staticmethod
def _submitter_from_conn(conn: Connection) -> dict:
return {
"name": conn.get_prop(ConnProps.USER_NAME, ""),
"org": conn.get_prop(ConnProps.USER_ORG, ""),
"role": conn.get_prop(ConnProps.USER_ROLE, ""),
}
@staticmethod
def _canonical_job_content_hash(zip_file_name: str) -> str:
return canonical_job_content_hash(zip_file_name)
@staticmethod
def _require_submit_record_method(job_def_manager: JobDefManagerSpec, method_name: str):
method = getattr(job_def_manager, method_name, None)
if not callable(method):
raise RuntimeError(f"job_def_manager does not implement required submit-token method '{method_name}'")
return method
def _get_submit_record(
self, job_def_manager: JobDefManagerSpec, study: str, submitter: dict, submit_token: str, fl_ctx
):
method = self._require_submit_record_method(job_def_manager, "get_submit_record")
return method(study, submitter, submit_token, fl_ctx)
def _create_submit_record(self, job_def_manager: JobDefManagerSpec, record: dict, fl_ctx):
method = self._require_submit_record_method(job_def_manager, "create_submit_record")
return method(record, fl_ctx)
def _update_submit_record(self, job_def_manager: JobDefManagerSpec, record: dict, fl_ctx):
method = self._require_submit_record_method(job_def_manager, "update_submit_record")
return method(record, fl_ctx)
def _new_submit_record(
self,
job_def_manager: JobDefManagerSpec,
*,
study: str,
submitter: dict,
submit_token: str,
job_content_hash: str,
job_name: str,
job_folder_name: str,
state: str,
) -> dict:
method = self._require_submit_record_method(job_def_manager, "new_submit_record")
return method(
study=study,
submitter=submitter,
submit_token=submit_token,
job_content_hash=job_content_hash,
job_name=job_name,
job_folder_name=job_folder_name,
state=state,
)
def _ensure_submit_record_job_id(
self,
job_def_manager: JobDefManagerSpec,
record: dict,
*,
study: str,
submitter: dict,
submit_token: str,
job_content_hash: str,
meta: dict,
folder_name: str,
fl_ctx,
):
job_id = record.get(SubmitRecordKey.JOB_ID.value)
if job_id:
return job_id
repaired = self._new_submit_record(
job_def_manager,
study=study,
submitter=submitter,
submit_token=submit_token,
job_content_hash=job_content_hash,
job_name=record.get(SubmitRecordKey.JOB_NAME.value) or CommandUtil.get_job_name(meta),
job_folder_name=record.get(SubmitRecordKey.JOB_FOLDER_NAME.value) or folder_name,
state=record.get(SubmitRecordKey.STATE.value) or SubmitRecordState.CREATING.value,
)
if not isinstance(repaired, dict):
raise RuntimeError("submit record repair did not return a record")
repaired_job_id = repaired.get(SubmitRecordKey.JOB_ID.value)
if not repaired_job_id:
raise RuntimeError("submit record repair is missing job_id")
record.update(repaired)
# Submit-token operations acquire the per-token lock before touching submit-record storage.
# Do not add reverse ordering that takes the store lock before this per-token lock.
self._update_submit_record(job_def_manager, record, fl_ctx)
return repaired_job_id
def _create_job_for_submit_record(
self,
job_def_manager: JobDefManagerSpec,
record: dict,
*,
job_id: str,
meta: dict,
zip_file_name: str,
fl_ctx,
):
job = self._job_for_submit_record(job_def_manager, record, fl_ctx)
existing_job_id = self._job_id_from_job(job)
if existing_job_id:
if record.get(SubmitRecordKey.STATE.value) != SubmitRecordState.CREATED.value:
record[SubmitRecordKey.STATE.value] = SubmitRecordState.CREATED.value
self._update_submit_record(job_def_manager, record, fl_ctx)
return existing_job_id
if isinstance(zip_file_name, str) and not os.path.exists(zip_file_name):
raise RuntimeError("uploaded job content is no longer available for submit-token recovery")
meta[JobMetaKey.JOB_ID.value] = job_id
created_meta = job_def_manager.create(meta, zip_file_name, fl_ctx)
record[SubmitRecordKey.STATE.value] = SubmitRecordState.CREATED.value
self._update_submit_record(job_def_manager, record, fl_ctx)
return created_meta.get(JobMetaKey.JOB_ID.value)
@staticmethod
def _append_submit_token_conflict(conn: Connection, record: dict):
existing_job_id = record.get(SubmitRecordKey.JOB_ID.value) if isinstance(record, dict) else None
extra = {"code": "SUBMIT_TOKEN_CONFLICT"}
if existing_job_id:
extra[MetaKey.JOB_ID] = existing_job_id
conn.append_error(
"SUBMIT_TOKEN_CONFLICT: submit token was already used for different job content. "
"Use a new submit token for a new job.",
meta=make_meta(
SUBMIT_TOKEN_CONFLICT_STATUS,
"submit token was already used for different job content",
extra=extra,
),
)
@staticmethod
def _submit_record_job_deleted(record: dict) -> bool:
return (
isinstance(record, dict) and record.get(SubmitRecordKey.STATE.value) == SubmitRecordState.JOB_DELETED.value
)
@staticmethod
def _append_submit_token_job_deleted(conn: Connection, record: dict):
job_id = record.get(SubmitRecordKey.JOB_ID.value) if isinstance(record, dict) else None
deleted_time = record.get(SubmitRecordKey.DELETED_TIME.value) if isinstance(record, dict) else None
extra = {
"code": "SUBMIT_TOKEN_JOB_DELETED",
"submit_record_state": SubmitRecordState.JOB_DELETED.value,
}
if job_id:
extra[MetaKey.JOB_ID] = job_id
if deleted_time:
extra[SubmitRecordKey.DELETED_TIME.value] = deleted_time
conn.append_error(
"SUBMIT_TOKEN_JOB_DELETED: submit token refers to a deleted job. "
"Use a new submit token to submit again.",
meta=make_meta(
SUBMIT_TOKEN_JOB_DELETED_STATUS,
"submit token refers to a deleted job",
extra=extra,
),
)
def _job_for_submit_record(self, job_def_manager: JobDefManagerSpec, record: dict, fl_ctx):
if not isinstance(record, dict):
return None
job_id = record.get(SubmitRecordKey.JOB_ID.value)
if not job_id:
return None
return job_def_manager.get_job(job_id, fl_ctx)
@staticmethod
def _job_id_from_job(job) -> Optional[str]:
if not job:
return None
job_id = getattr(job, "job_id", None)
if job_id:
return job_id
meta = getattr(job, "meta", None)
if isinstance(meta, dict):
return meta.get(JobMetaKey.JOB_ID.value)
return None
@classmethod
def _submit_token_lock(cls, study: str, submitter: dict, submit_token: str):
key = (
study,
submitter.get("name", ""),
submitter.get("org", ""),
submitter.get("role", ""),
submit_token,
)
with cls._submit_token_locks_guard:
lock = cls._submit_token_locks.get(key)
if lock is None:
lock = threading.Lock()
cls._submit_token_locks[key] = lock
return lock
def _handle_submit_token_record(
self,
conn: Connection,
job_def_manager: JobDefManagerSpec,
*,
study: str,
submitter: dict,
submit_token: str,
job_content_hash: str,
meta: dict,
folder_name: str,
zip_file_name: str,
fl_ctx,
):
with self._submit_token_lock(study, submitter, submit_token):
return self._handle_submit_token_record_locked(
conn,
job_def_manager,
study=study,
submitter=submitter,
submit_token=submit_token,
job_content_hash=job_content_hash,
meta=meta,
folder_name=folder_name,
zip_file_name=zip_file_name,
fl_ctx=fl_ctx,
)
def _handle_submit_token_record_locked(
self,
conn: Connection,
job_def_manager: JobDefManagerSpec,
*,
study: str,
submitter: dict,
submit_token: str,
job_content_hash: str,
meta: dict,
folder_name: str,
zip_file_name: str,
fl_ctx,
):
meta = dict(meta)
record = self._get_submit_record(job_def_manager, study, submitter, submit_token, fl_ctx)
if record:
if self._submit_record_job_deleted(record):
self._append_submit_token_job_deleted(conn, record)
return None
if record.get(SubmitRecordKey.JOB_CONTENT_HASH.value) != job_content_hash:
self._append_submit_token_conflict(conn, record)
return None
job = self._job_for_submit_record(job_def_manager, record, fl_ctx)
existing_job_id = self._job_id_from_job(job)
if existing_job_id:
return existing_job_id
recorded_job_id = self._ensure_submit_record_job_id(
job_def_manager,
record,
study=study,
submitter=submitter,
submit_token=submit_token,
job_content_hash=job_content_hash,
meta=meta,
folder_name=folder_name,
fl_ctx=fl_ctx,
)
return self._create_job_for_submit_record(
job_def_manager,
record,
job_id=recorded_job_id,
meta=meta,
zip_file_name=zip_file_name,
fl_ctx=fl_ctx,
)
record = self._new_submit_record(
job_def_manager,
study=study,
submitter=submitter,
submit_token=submit_token,
job_content_hash=job_content_hash,
job_name=CommandUtil.get_job_name(meta),
job_folder_name=folder_name,
state=SubmitRecordState.CREATING.value,
)
job_id = record.get(SubmitRecordKey.JOB_ID.value)
if not job_id:
raise RuntimeError("submit record is missing job_id")
meta[JobMetaKey.JOB_ID.value] = job_id
try:
create_result = self._create_submit_record(job_def_manager, record, fl_ctx)
except Exception:
existing = self._get_submit_record(job_def_manager, study, submitter, submit_token, fl_ctx)
if not existing:
raise
if self._submit_record_job_deleted(existing):
self._append_submit_token_job_deleted(conn, existing)
return None
if existing.get(SubmitRecordKey.JOB_CONTENT_HASH.value) != job_content_hash:
self._append_submit_token_conflict(conn, existing)
return None
job = self._job_for_submit_record(job_def_manager, existing, fl_ctx)
existing_job_id = self._job_id_from_job(job)
if existing_job_id:
return existing_job_id
recovered_job_id = self._ensure_submit_record_job_id(
job_def_manager,
existing,
study=study,
submitter=submitter,
submit_token=submit_token,
job_content_hash=job_content_hash,
meta=meta,
folder_name=folder_name,
fl_ctx=fl_ctx,
)
return self._create_job_for_submit_record(
job_def_manager,
existing,
job_id=recovered_job_id,
meta=meta,
zip_file_name=zip_file_name,
fl_ctx=fl_ctx,
)
if create_result is False:
existing = self._get_submit_record(job_def_manager, study, submitter, submit_token, fl_ctx)
if not existing:
raise RuntimeError("submit record creation failed and no existing record was found")
if self._submit_record_job_deleted(existing):
self._append_submit_token_job_deleted(conn, existing)
return None
if existing.get(SubmitRecordKey.JOB_CONTENT_HASH.value) != job_content_hash:
self._append_submit_token_conflict(conn, existing)
return None
job = self._job_for_submit_record(job_def_manager, existing, fl_ctx)
resolved_job_id = self._job_id_from_job(job)
if resolved_job_id:
return resolved_job_id
existing_job_id = self._ensure_submit_record_job_id(
job_def_manager,
existing,
study=study,
submitter=submitter,
submit_token=submit_token,
job_content_hash=job_content_hash,
meta=meta,
folder_name=folder_name,
fl_ctx=fl_ctx,
)
return self._create_job_for_submit_record(
job_def_manager,
existing,
job_id=existing_job_id,
meta=meta,
zip_file_name=zip_file_name,
fl_ctx=fl_ctx,
)
created_meta = job_def_manager.create(meta, zip_file_name, fl_ctx)
record[SubmitRecordKey.STATE.value] = SubmitRecordState.CREATED.value
self._update_submit_record(job_def_manager, record, fl_ctx)
return created_meta.get(JobMetaKey.JOB_ID.value)
def _list_jobs_by_submit_token(
self,
job_def_manager: JobDefManagerSpec,
study: str,
submitter: dict,
submit_token: str,
fl_ctx,
) -> List[Job]:
record = self._get_submit_record(job_def_manager, study, submitter, submit_token, fl_ctx)
if self._submit_record_job_deleted(record):
raise _SubmitTokenJobDeleted(record)
get_job_by_submit_token = getattr(job_def_manager, "get_job_by_submit_token", None)
if callable(get_job_by_submit_token):
job = get_job_by_submit_token(study, submitter, submit_token, fl_ctx)
if not job:
return []
return job if isinstance(job, list) else [job]
job = self._job_for_submit_record(job_def_manager, record, fl_ctx)
return [job] if job else []
@staticmethod
def _send_detail_list(conn: Connection, jobs: List[Job]):
list_of_jobs = []
for job in jobs:
try:
JobCommandModule._set_duration(job)
except Exception:
pass
normalized_meta = dict(job.meta)
normalized_meta[JobMetaKey.STUDY.value] = get_job_meta_study(job.meta)
conn.append_string(json.dumps(normalized_meta, indent=4))
list_of_jobs.append(normalized_meta)
conn.append_string("", meta=make_meta(MetaStatusValue.OK, extra={MetaKey.JOBS: list_of_jobs}))
@staticmethod
def _send_summary_list(conn: Connection, jobs: List[Job]):
table = conn.append_table(["Job ID", "Name", "Status", "Submit Time", "Run Duration"], name=MetaKey.JOBS)
for job in jobs:
try:
JobCommandModule._set_duration(job)
except Exception:
pass
table_row = [
job.meta.get(JobMetaKey.JOB_ID.value, ""),
CommandUtil.get_job_name(job.meta),
job.meta.get(JobMetaKey.STATUS.value, ""),
job.meta.get(JobMetaKey.SUBMIT_TIME_ISO.value, ""),
str(job.meta.get(JobMetaKey.DURATION.value, "N/A")),
]
table.add_row(
table_row,
meta={
MetaKey.JOB_ID: job.meta.get(JobMetaKey.JOB_ID.value, ""),
MetaKey.JOB_NAME: CommandUtil.get_job_name(job.meta),
MetaKey.STATUS: job.meta.get(JobMetaKey.STATUS.value, ""),
MetaKey.SUBMIT_TIME: job.meta.get(JobMetaKey.SUBMIT_TIME_ISO.value, ""),
MetaKey.DURATION: str(job.meta.get(JobMetaKey.DURATION.value, "N/A")),
JobMetaKey.STUDY.value: get_job_meta_study(job.meta),
},
)
@staticmethod
def _set_duration(job):
if job.meta.get(JobMetaKey.STATUS) == RunStatus.RUNNING.value:
try:
start_time = datetime.datetime.strptime(
job.meta.get(JobMetaKey.START_TIME.value), "%Y-%m-%d %H:%M:%S.%f"
)
except (TypeError, ValueError):
return
duration = datetime.datetime.now() - start_time
job.meta[JobMetaKey.DURATION.value] = str(duration)
[docs]
def submit_job(self, conn: Connection, args: List[str]):
try:
parser = _create_submit_job_cmd_parser()
parsed_args = parser.parse_args(args[1:])
folder_name = parsed_args.folder_name
submit_token = validate_submit_token(parsed_args.submit_token)
except ValueError as e:
conn.append_error(str(e), meta=make_meta(MetaStatusValue.SYNTAX_ERROR, str(e)))
return
zip_file_name = conn.get_prop(ConnProps.FILE_LOCATION)
if not zip_file_name:
conn.append_error("missing upload file", meta=make_meta(MetaStatusValue.INTERNAL_ERROR))
return
engine = conn.app_ctx
try:
with engine.new_context() as fl_ctx:
job_validator = JobMetaValidator()
valid, error, meta = job_validator.validate(folder_name, zip_file_name)
if not valid:
conn.append_error(error, meta=make_meta(MetaStatusValue.INVALID_JOB_DEFINITION, error))
return
# Strip privileged meta keys that must only be set by internal server components.
# A user-submitted job is never "from hub site" — only HubAppDeployer sets this flag
# after verifying the job, and it operates on the server side.
meta.pop(JobMetaKey.FROM_HUB_SITE.value, None)
# Submit-token is server-owned submission metadata. User job metadata must not expose it.
meta.pop(SubmitRecordKey.SUBMIT_TOKEN.value, None)
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
meta[JobMetaKey.STUDY.value] = _active_study_from_conn(conn)
registry = StudyRegistryService.get_registry()
requested_study = meta[JobMetaKey.STUDY.value]
if registry:
enrolled_sites = registry.get_sites(requested_study)
if enrolled_sites is not None:
deploy_map = meta.get(JobMetaKey.DEPLOY_MAP.value, {})
invalid_sites = []
seen_invalid_sites = set()
for deployments in deploy_map.values():
for site_name in extract_participants(deployments):
if site_name == SERVER_SITE_NAME or site_name.upper() == ALL_SITES:
continue
if site_name not in enrolled_sites:
if site_name not in seen_invalid_sites:
invalid_sites.append(site_name)
seen_invalid_sites.add(site_name)
if invalid_sites:
if len(invalid_sites) == 1:
error = f"site '{invalid_sites[0]}' is not enrolled in study '{requested_study}'"
else:
quoted_names = ", ".join(f"'{name}'" for name in invalid_sites)
error = f"sites {quoted_names} are not enrolled in study '{requested_study}'"
conn.append_error(error, meta=make_meta(MetaStatusValue.INVALID_JOB_DEFINITION, error))
return
fl_ctx.set_prop(FLContextKey.JOB_META, meta, private=True, sticky=False)
engine.fire_event(EventType.SUBMIT_JOB, fl_ctx)
block_reason = fl_ctx.get_prop(FLContextKey.JOB_BLOCK_REASON)
if block_reason:
# submitted job blocked
self.logger.error(f"submitted job is blocked: {block_reason}")
conn.append_error(
block_reason, meta=make_meta(MetaStatusValue.INVALID_JOB_DEFINITION, block_reason)
)
return
# set submitter info
submitter = self._submitter_from_conn(conn)
meta[JobMetaKey.SUBMITTER_NAME.value] = submitter["name"]
meta[JobMetaKey.SUBMITTER_ORG.value] = submitter["org"]
meta[JobMetaKey.SUBMITTER_ROLE.value] = submitter["role"]
meta[JobMetaKey.JOB_FOLDER_NAME.value] = folder_name
custom_props = conn.get_prop(ConnProps.CUSTOM_PROPS)
if custom_props:
meta[JobMetaKey.CUSTOM_PROPS.value] = custom_props
if submit_token:
job_content_hash = self._canonical_job_content_hash(zip_file_name)
job_id = self._handle_submit_token_record(
conn,
job_def_manager,
study=requested_study,
submitter=submitter,
submit_token=submit_token,
job_content_hash=job_content_hash,
meta=meta,
folder_name=folder_name,
zip_file_name=zip_file_name,
fl_ctx=fl_ctx,
)
if job_id is None:
return
else:
meta = job_def_manager.create(meta, zip_file_name, fl_ctx)
job_id = meta.get(JobMetaKey.JOB_ID)
# os.remove(zip_file_name) # the file is no longer needed
conn.append_string(f"Submitted job: {job_id}")
conn.append_success("", meta=make_meta(MetaStatusValue.OK, extra={MetaKey.JOB_ID: job_id}))
except Exception as e:
conn.append_error(
f"Exception occurred trying to submit job: {secure_format_exception(e)}",
meta=make_meta(MetaStatusValue.INTERNAL_ERROR, f"exception {type(e)} occurred"),
)
return
def _clean_up_download(self, conn: Connection, tx_id: str):
"""
Remove the job download folder
"""
job_download_dir = self.tx_path(conn, tx_id)
shutil.rmtree(job_download_dir, ignore_errors=True)
def _download_job_comps(self, conn: Connection, args: List[str], get_comps_f, require_finished: bool = False):
"""
Job download uses binary protocol for more efficient download.
- Retrieve job data from job store. This puts job files (meta, data, and workspace) in a transfer folder
- Returns job file names, a TX ID, and a command name for downloading files to the admin client
- Admin client downloads received file names one by one. It signals the end of download in the last command.
"""
engine = conn.app_ctx
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
self.logger.error(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
conn.append_error("internal error", meta=make_meta(MetaStatusValue.INTERNAL_ERROR))
return
# It is possible that the same job is downloaded in multiple sessions at the same time.
# To allow this, we use a separate sub-folder in the download_dir for each download.
# This sub-folder is named with a transaction ID (tx_id), which is a UUID.
# The folder path for download the job is: <download_dir>/<tx_id>/<job_id>.
tx_id = str(uuid.uuid4()) # generate a new tx_id
job_download_dir = self.tx_path(conn, tx_id) # absolute path of the job download dir.
job_id = args[1]
with engine.new_context() as fl_ctx:
if require_finished:
job = job_def_manager.get_job(job_id, fl_ctx)
if not job:
_append_no_such_job_error(conn, job_id)
return
job_status = job.meta.get(JobMetaKey.STATUS)
if not job_status or not job_status.startswith("FINISHED:"):
conn.append_error(
f"job {job_id} is not done",
meta=make_meta(MetaStatusValue.JOB_RUNNING, job_id),
)
return
comps = get_comps_f(job_def_manager, job_id, fl_ctx)
if not comps:
conn.append_string(
"No components to download",
meta=make_meta(
MetaStatusValue.NO_JOB_COMPONENTS,
info="No components to download",
),
)
return
try:
for ct, file_name in comps:
job_def_manager.get_storage_for_download(job_id, job_download_dir, ct, file_name, fl_ctx)
self.download_folder(
conn,
tx_id=tx_id,
folder_name=job_id,
)
except Exception as e:
secure_log_traceback()
self.logger.error(f"exception downloading job {job_id}: {secure_format_exception(e)}")
self._clean_up_download(conn, tx_id)
conn.append_error("internal error", meta=make_meta(MetaStatusValue.INTERNAL_ERROR))
def _get_default_job_components(self, job_def_manager, job_id, fl_ctx):
return [(DATA, JOB_ZIP), (META, META_JSON), (WORKSPACE, WORKSPACE_ZIP)]
[docs]
def download_job(self, conn: Connection, args: List[str]):
"""
Job download uses binary protocol for more efficient download.
- Retrieve job data from job store. This puts job files (meta, data, and workspace) in a transfer folder
- Returns job file names, a TX ID, and a command name for downloading files to the admin client
- Admin client downloads received file names one by one. It signals the end of download in the last command.
"""
self._download_job_comps(conn, args, self._get_default_job_components, require_finished=True)
[docs]
def download_job_components(self, conn: Connection, args: List[str]):
"""Download additional job components (e.g., ERRORLOG_site-1) for a specified job.
Based on job download but downloads the additional components for a job that job download does
not download.
"""
self._download_job_comps(conn, args, self._get_extra_job_components)
def _get_extra_job_components(self, job_def_manager, job_id, fl_ctx):
all_components = job_def_manager.list_components(jid=job_id, fl_ctx=fl_ctx)
if all_components:
return [
(item, item)
for item in all_components
if item not in {WORKSPACE, META, "scheduled", DATA} and StorageSpec.is_valid_component(item)
]
else:
return None
[docs]
def do_app_command(self, conn: Connection, args: List[str]):
# cmd job_id topic
if len(args) != 3:
cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY)
conn.append_error(f"Usage: {cmd_entry.usage}", meta=make_meta(MetaStatusValue.SYNTAX_ERROR, ""))
return
engine = conn.app_ctx
if not isinstance(engine, ServerEngineInternalSpec):
raise TypeError(f"engine must be ServerEngineInternalSpec but got {type(engine)}")
job_id = conn.get_prop(self.JOB_ID)
topic = args[2]
cmd_data = conn.get_prop(ConnProps.CMD_PROPS)
if job_id not in engine.run_processes:
conn.append_error(
f"Job_id: {job_id} is not running.", meta=make_meta(MetaStatusValue.JOB_NOT_RUNNING, job_id)
)
return
timeout = conn.get_prop(ConnProps.CMD_TIMEOUT)
if timeout is None:
timeout = 5.0
result = engine.send_app_command(job_id, topic, cmd_data, timeout)
if result is None:
conn.append_error(
"command execution error: no result", meta=make_meta(MetaStatusValue.NO_REPLY, "no result")
)
return
if not isinstance(result, Shareable):
conn.append_error(
f"command execution internal error: invalid result type {type(result)}",
meta=make_meta(MetaStatusValue.INTERNAL_ERROR, f"invalid result type {type(result)}"),
)
return
rc = result.get_return_code()
if rc != ReturnCode.OK:
reason = result.get_header(ServerCommandKey.REASON)
conn.append_error(
f"command execution error: {rc=} {reason=}", meta=make_meta(MetaStatusValue.ERROR, f"{rc=} {reason=}")
)
return
reply = result.get(ServerCommandKey.DATA)
conn.append_dict(reply, meta=make_meta(MetaStatusValue.OK))