# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import threading
import time
from typing import Dict, List, Optional, Tuple
from nvflare.apis.event_type import EventType
from nvflare.apis.fl_component import FLComponent
from nvflare.apis.fl_constant import FLContextKey
from nvflare.apis.fl_context import FLContext
from nvflare.apis.job_def import ALL_SITES, SERVER_SITE_NAME, Job, JobMetaKey, RunStatus
from nvflare.apis.job_def_manager_spec import JobDefManagerSpec
from nvflare.apis.job_scheduler_spec import DispatchInfo, JobSchedulerSpec
from nvflare.apis.server_engine_spec import ServerEngineSpec
# Result codes returned by DefaultJobScheduler._try_job.
SCHEDULE_RESULT_OK = 0  # the job is scheduled
SCHEDULE_RESULT_NO_RESOURCE = 1  # job is not scheduled due to lack of resources; may be retried later
SCHEDULE_RESULT_BLOCK = 2  # job is to be blocked from being scheduled again due to fatal error
class DefaultJobScheduler(JobSchedulerSpec, FLComponent):
    def __init__(
        self,
        max_jobs: int = 1,
        max_schedule_count: int = 10,
        min_schedule_interval: float = 10.0,
        max_schedule_interval: float = 600.0,
    ):
        """Create a DefaultJobScheduler.

        Candidates are tried in submission-time order; a job that cannot get
        resources is retried with exponential backoff between attempts and is
        permanently blocked once it exceeds max_schedule_count attempts.

        Args:
            max_jobs: max number of concurrent jobs allowed
            max_schedule_count: max number of times to try to schedule a job
            min_schedule_interval: min interval (seconds) between two schedule attempts of the same job
            max_schedule_interval: max interval (seconds) between two schedule attempts of the same job
        """
        super().__init__()
        self.max_jobs = max_jobs
        self.max_schedule_count = max_schedule_count
        self.min_schedule_interval = min_schedule_interval
        self.max_schedule_interval = max_schedule_interval
        # ids of currently-running jobs; maintained by handle_event and guarded by self.lock
        self.scheduled_jobs = []
        self.lock = threading.Lock()
def _check_client_resources(
self, job: Job, resource_reqs: Dict[str, dict], fl_ctx: FLContext
) -> Dict[str, Tuple[bool, str]]:
"""Checks resources on each site.
Args:
resource_reqs (dict): {client_name: resource_requirements}
Returns:
A dict of {client_name: client_check_result}.
client_check_result is a tuple of (is_resource_enough, token);
is_resource_enough is a bool indicates whether there is enough resources;
token is for resource reservation / cancellation for this check request.
"""
engine = fl_ctx.get_engine()
if not isinstance(engine, ServerEngineSpec):
raise RuntimeError(f"engine inside fl_ctx should be of type ServerEngineSpec, but got {type(engine)}.")
result = engine.check_client_resources(job, resource_reqs, fl_ctx)
self.log_debug(fl_ctx, f"check client resources result: {result}")
return result
def _cancel_resources(
self, resource_reqs: Dict[str, dict], resource_check_results: Dict[str, Tuple[bool, str]], fl_ctx: FLContext
):
"""Cancels any reserved resources based on resource check results.
Args:
resource_reqs (dict): {client_name: resource_requirements}
resource_check_results: A dict of {client_name: client_check_result}
where client_check_result is a tuple of {is_resource_enough, resource reserve token if any}
fl_ctx: FL context
"""
engine = fl_ctx.get_engine()
if not isinstance(engine, ServerEngineSpec):
raise RuntimeError(f"engine inside fl_ctx should be of type ServerEngineSpec, but got {type(engine)}.")
engine.cancel_client_resources(resource_check_results, resource_reqs, fl_ctx)
self.log_debug(fl_ctx, f"cancel client resources using check results: {resource_check_results}")
return False, None
    def _try_job(self, job: Job, fl_ctx: FLContext) -> Tuple[int, Optional[Dict[str, DispatchInfo]], str]:
        """Try to schedule a single job against the currently connected sites.

        Returns:
            Tuple of (result code, per-site DispatchInfo dict or None, reason message).
            The result code is one of SCHEDULE_RESULT_OK / SCHEDULE_RESULT_NO_RESOURCE /
            SCHEDULE_RESULT_BLOCK; the dict is only populated when the result is OK.
        """
        engine = fl_ctx.get_engine()
        online_clients = engine.get_clients()
        online_site_names = [x.name for x in online_clients]
        # a job without a deploy map can never run - block it permanently
        if not job.deploy_map:
            self.log_error(fl_ctx, f"Job '{job.job_id}' does not have deploy_map, can't be scheduled.")
            return SCHEDULE_RESULT_BLOCK, None, "no deploy map"
        # determine which connected sites the job can be deployed to, and which app each site gets
        applicable_sites = []
        sites_to_app = {}
        for app_name in job.deploy_map:
            for site_name in job.deploy_map[app_name]:
                if site_name.upper() == ALL_SITES:
                    # deploy_map: {"app_name": ["ALL_SITES"]} will be treated as deploying to all online clients
                    applicable_sites = online_site_names
                    sites_to_app = {x: app_name for x in online_site_names}
                    sites_to_app[SERVER_SITE_NAME] = app_name
                elif site_name in online_site_names:
                    applicable_sites.append(site_name)
                    sites_to_app[site_name] = app_name
                elif site_name == SERVER_SITE_NAME:
                    sites_to_app[SERVER_SITE_NAME] = app_name
        self.log_debug(fl_ctx, f"Job {job.job_id} is checking against applicable sites: {applicable_sites}")
        # every required site must currently be connected and applicable
        required_sites = job.required_sites if job.required_sites else []
        if required_sites:
            for s in required_sites:
                if s not in applicable_sites:
                    self.log_debug(fl_ctx, f"Job {job.job_id} can't be scheduled: required site {s} is not connected.")
                    return SCHEDULE_RESULT_NO_RESOURCE, None, f"missing required site {s}"
        if job.min_sites and len(applicable_sites) < job.min_sites:
            self.log_debug(
                fl_ctx,
                f"Job {job.job_id} can't be scheduled: connected sites ({len(applicable_sites)}) "
                f"are less than min_sites ({job.min_sites}).",
            )
            return (
                SCHEDULE_RESULT_NO_RESOURCE,
                None,
                f"connected sites ({len(applicable_sites)}) < min_sites ({job.min_sites})",
            )
        # we are assuming server resource is sufficient
        resource_reqs = {}
        for site_name in applicable_sites:
            if site_name in job.resource_spec:
                resource_reqs[site_name] = job.resource_spec[site_name]
            else:
                resource_reqs[site_name] = {}
        job_participants = [fl_ctx.get_identity_name(default=SERVER_SITE_NAME)]
        job_participants.extend(applicable_sites)
        # publish job info into the context so event listeners can inspect (and veto) it
        fl_ctx.set_prop(FLContextKey.CURRENT_JOB_ID, job.job_id, private=True)
        fl_ctx.set_prop(FLContextKey.CLIENT_RESOURCE_SPECS, resource_reqs, private=True, sticky=False)
        fl_ctx.set_prop(FLContextKey.JOB_PARTICIPANTS, job_participants, private=True, sticky=False)
        fl_ctx.set_prop(FLContextKey.JOB_META, job.meta, private=True, sticky=False)
        self.fire_event(EventType.BEFORE_CHECK_CLIENT_RESOURCES, fl_ctx)
        # a listener may set JOB_BLOCK_REASON to stop this job from being scheduled this round
        block_reason = fl_ctx.get_prop(FLContextKey.JOB_BLOCK_REASON)
        if block_reason:
            # cannot schedule this job
            self.log_info(fl_ctx, f"Job {job.job_id} can't be scheduled: {block_reason}")
            return SCHEDULE_RESULT_NO_RESOURCE, None, block_reason
        resource_check_results = self._check_client_resources(job=job, resource_reqs=resource_reqs, fl_ctx=fl_ctx)
        fl_ctx.set_prop(FLContextKey.RESOURCE_CHECK_RESULT, resource_check_results, private=True, sticky=False)
        self.fire_event(EventType.AFTER_CHECK_CLIENT_RESOURCES, fl_ctx)
        if not resource_check_results:
            self.log_debug(fl_ctx, f"Job {job.job_id} can't be scheduled: resource check results is None or empty.")
            return SCHEDULE_RESULT_NO_RESOURCE, None, "error checking resources"
        # tally sites that have enough resources; track required sites that fell short
        required_sites_not_enough_resource = list(required_sites)
        num_sites_ok = 0
        sites_dispatch_info = {}
        no_resource_message = ""
        for site_name, check_result in resource_check_results.items():
            is_resource_enough, token = check_result
            if is_resource_enough:
                sites_dispatch_info[site_name] = DispatchInfo(
                    app_name=sites_to_app[site_name],
                    resource_requirements=resource_reqs[site_name],
                    token=token,
                )
                num_sites_ok += 1
                if site_name in required_sites:
                    required_sites_not_enough_resource.remove(site_name)
            else:
                if site_name in required_sites:
                    # NOTE(review): token appears to carry the failure reason string in this branch - confirm
                    no_resource_message += site_name + ":" + token + ";"
        # on failure, cancel whatever reservations the successful sites made
        if num_sites_ok < job.min_sites:
            self.log_debug(fl_ctx, f"Job {job.job_id} can't be scheduled: not enough sites have enough resources.")
            self._cancel_resources(
                resource_reqs=job.resource_spec, resource_check_results=resource_check_results, fl_ctx=fl_ctx
            )
            return (
                SCHEDULE_RESULT_NO_RESOURCE,
                None,
                f"not enough sites have enough resources (ok sites {num_sites_ok} < min sites {job.min_sites})",
            )
        if required_sites_not_enough_resource:
            self.log_debug(
                fl_ctx,
                f"Job {job.job_id} can't be scheduled: required sites: {required_sites_not_enough_resource}"
                f" don't have enough resources.",
            )
            self._cancel_resources(
                resource_reqs=job.resource_spec, resource_check_results=resource_check_results, fl_ctx=fl_ctx
            )
            return (
                SCHEDULE_RESULT_NO_RESOURCE,
                None,
                f"required sites: {required_sites_not_enough_resource} don't have enough resources. "
                f"Details: {no_resource_message}",
            )
        # add server dispatch info
        sites_dispatch_info[SERVER_SITE_NAME] = DispatchInfo(
            app_name=sites_to_app[SERVER_SITE_NAME], resource_requirements={}, token=None
        )
        return SCHEDULE_RESULT_OK, sites_dispatch_info, ""
def _exceed_max_jobs(self, fl_ctx: FLContext) -> bool:
exceed_limit = False
with self.lock:
if len(self.scheduled_jobs) >= self.max_jobs:
self.log_debug(
fl_ctx,
f"Skipping schedule job because scheduled_jobs ({len(self.scheduled_jobs)}) "
f"is greater than max_jobs ({self.max_jobs})",
)
exceed_limit = True
return exceed_limit
[docs] def handle_event(self, event_type: str, fl_ctx: FLContext):
if event_type == EventType.JOB_STARTED:
with self.lock:
job_id = fl_ctx.get_prop(FLContextKey.CURRENT_JOB_ID)
if job_id not in self.scheduled_jobs:
self.scheduled_jobs.append(job_id)
elif event_type == EventType.JOB_COMPLETED or event_type == EventType.JOB_ABORTED:
with self.lock:
job_id = fl_ctx.get_prop(FLContextKey.CURRENT_JOB_ID)
if job_id in self.scheduled_jobs:
self.scheduled_jobs.remove(job_id)
[docs] def schedule_job(
self, job_manager: JobDefManagerSpec, job_candidates: List[Job], fl_ctx: FLContext
) -> (Optional[Job], Optional[Dict[str, DispatchInfo]]):
failed_jobs = []
blocked_jobs = []
try:
ready_job, dispatch_info = self._do_schedule_job(job_candidates, fl_ctx, failed_jobs, blocked_jobs)
except:
self.log_exception(fl_ctx, "error scheduling job")
ready_job, dispatch_info = None, None
# process failed and blocked jobs
try:
if failed_jobs:
# set the try count
for job in failed_jobs:
job_manager.refresh_meta(job, self._get_update_meta_keys(), fl_ctx)
if blocked_jobs:
for job in blocked_jobs:
job_manager.refresh_meta(job, self._get_update_meta_keys(), fl_ctx)
job_manager.set_status(job.job_id, RunStatus.FINISHED_CANT_SCHEDULE, fl_ctx)
except:
self.log_exception(fl_ctx, "error updating scheduling info in job store")
return ready_job, dispatch_info
def _get_update_meta_keys(self):
return [
JobMetaKey.SCHEDULE_COUNT.value,
JobMetaKey.LAST_SCHEDULE_TIME.value,
JobMetaKey.SCHEDULE_HISTORY.value,
]
def _update_schedule_history(self, job: Job, result: str, fl_ctx: FLContext):
history = job.meta.get(JobMetaKey.SCHEDULE_HISTORY.value, None)
if not history:
history = []
job.meta[JobMetaKey.SCHEDULE_HISTORY.value] = history
now = datetime.datetime.now()
cur_time = now.strftime("%Y-%m-%d %H:%M:%S")
history.append(f"{cur_time}: {result}")
self.log_info(fl_ctx, f"Try to schedule job {job.job_id}, get result: ({result}).")
schedule_count = job.meta.get(JobMetaKey.SCHEDULE_COUNT.value, 0)
schedule_count += 1
job.meta[JobMetaKey.SCHEDULE_COUNT.value] = schedule_count
job.meta[JobMetaKey.LAST_SCHEDULE_TIME.value] = time.time()
    def _do_schedule_job(
        self, job_candidates: List[Job], fl_ctx: FLContext, failed_jobs: list, blocked_jobs: list
    ) -> (Optional[Job], Optional[Dict[str, DispatchInfo]]):
        """Scan candidates in submission order and return the first schedulable job.

        Side effects: appends to failed_jobs (could not be scheduled this round) and
        blocked_jobs (must never be scheduled again), and updates each tried job's
        schedule meta via _update_schedule_history.

        Returns:
            (job, sites_dispatch_info) for the first job that can run, else (None, None).
        """
        self.log_debug(fl_ctx, f"Current scheduled_jobs is {self.scheduled_jobs}")
        if self._exceed_max_jobs(fl_ctx=fl_ctx):
            self.log_debug(fl_ctx, f"skipped scheduling since there are {self.max_jobs} concurrent job(s) already")
            return None, None
        # sort by submitted time
        job_candidates.sort(key=lambda j: j.meta.get(JobMetaKey.SUBMIT_TIME.value, 0.0))
        engine = fl_ctx.get_engine()
        for job in job_candidates:
            schedule_count = job.meta.get(JobMetaKey.SCHEDULE_COUNT.value, 0)
            # too many failed attempts: block permanently
            if schedule_count >= self.max_schedule_count:
                self.log_info(
                    fl_ctx, f"skipped job {job.job_id} since it exceeded max schedule count {self.max_schedule_count}"
                )
                blocked_jobs.append(job)
                self._update_schedule_history(job, f"exceeded max schedule count {self.max_schedule_count}", fl_ctx)
                continue
            last_schedule_time = job.meta.get(JobMetaKey.LAST_SCHEDULE_TIME.value, 0.0)
            time_since_last_schedule = time.time() - last_schedule_time
            # exponential backoff: required interval doubles per prior attempt, capped at max_schedule_interval
            n = 0 if schedule_count == 0 else schedule_count - 1
            required_interval = min(self.max_schedule_interval, (2**n) * self.min_schedule_interval)
            if time_since_last_schedule < required_interval:
                # do not schedule again too soon
                continue
            # try this job in a fresh context so its props don't leak into fl_ctx
            with engine.new_context() as ctx:
                rc, sites_dispatch_info, result = self._try_job(job, ctx)
                self.log_debug(ctx, f"Try to schedule job {job.job_id}, get result: {rc}, {sites_dispatch_info}.")
                if not result:
                    result = "scheduled"
                self._update_schedule_history(job, result, ctx)
                if rc == SCHEDULE_RESULT_OK:
                    return job, sites_dispatch_info
                elif rc == SCHEDULE_RESULT_NO_RESOURCE:
                    failed_jobs.append(job)
                else:
                    blocked_jobs.append(job)
        self.log_debug(fl_ctx, "No job is scheduled.")
        return None, None
[docs] def restore_scheduled_job(self, job_id: str):
with self.lock:
if job_id not in self.scheduled_jobs:
self.scheduled_jobs.append(job_id)
[docs] def remove_scheduled_job(self, job_id: str):
with self.lock:
if job_id in self.scheduled_jobs:
self.scheduled_jobs.remove(job_id)