# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from nvflare.apis.fl_constant import ReturnCode, SystemComponents
from nvflare.apis.resource_manager_spec import ResourceConsumerSpec, ResourceManagerSpec
from nvflare.apis.shareable import Shareable
from nvflare.fuel.utils import fobs
from nvflare.private.admin_defs import Message
from nvflare.private.defs import ERROR_MSG_PREFIX, RequestHeader, TrainingTopic
from nvflare.private.fed.client.admin import RequestProcessor
from nvflare.private.fed.client.client_engine_internal_spec import ClientEngineInternalSpec
from nvflare.private.scheduler_constants import ShareableHeader
[docs]class CheckResourceProcessor(RequestProcessor):
[docs] def get_topics(self) -> List[str]:
return [TrainingTopic.CHECK_RESOURCE]
[docs] def process(self, req: Message, app_ctx) -> Message:
engine = app_ctx
if not isinstance(engine, ClientEngineInternalSpec):
raise TypeError("engine must be ClientEngineInternalSpec, but got {}".format(type(engine)))
resource_manager = engine.get_component(SystemComponents.RESOURCE_MANAGER)
if not isinstance(resource_manager, ResourceManagerSpec):
raise RuntimeError(
f"resource_manager should be of type ResourceManagerSpec, but got {type(resource_manager)}."
)
with engine.new_context() as fl_ctx:
result = Shareable()
try:
resource_spec = fobs.loads(req.body)
check_result, token = resource_manager.check_resources(
resource_requirement=resource_spec, fl_ctx=fl_ctx
)
result.set_header(ShareableHeader.CHECK_RESOURCE_RESULT, check_result)
result.set_header(ShareableHeader.RESOURCE_RESERVE_TOKEN, token)
except Exception:
result.set_return_code(ReturnCode.EXECUTION_EXCEPTION)
return Message(topic="reply_" + req.topic, body=fobs.dumps(result))
[docs]class StartJobProcessor(RequestProcessor):
[docs] def get_topics(self) -> List[str]:
return [TrainingTopic.START_JOB]
[docs] def process(self, req: Message, app_ctx) -> Message:
engine = app_ctx
if not isinstance(engine, ClientEngineInternalSpec):
raise TypeError("engine must be ClientEngineInternalSpec, but got {}".format(type(engine)))
resource_manager = engine.get_component(SystemComponents.RESOURCE_MANAGER)
if not isinstance(resource_manager, ResourceManagerSpec):
raise RuntimeError(
f"resource_manager should be of type ResourceManagerSpec, but got {type(resource_manager)}."
)
resource_consumer = engine.get_component(SystemComponents.RESOURCE_CONSUMER)
if not isinstance(resource_consumer, ResourceConsumerSpec):
raise RuntimeError(
f"resource_consumer should be of type ResourceConsumerSpec, but got {type(resource_consumer)}."
)
try:
with engine.new_context() as fl_ctx:
resource_spec = fobs.loads(req.body)
job_id = req.get_header(RequestHeader.JOB_ID)
token = req.get_header(ShareableHeader.RESOURCE_RESERVE_TOKEN)
allocated_resources = resource_manager.allocate_resources(
resource_requirement=resource_spec, token=token, fl_ctx=fl_ctx
)
result = engine.start_app(
job_id,
allocated_resource=allocated_resources,
token=token,
resource_consumer=resource_consumer,
resource_manager=resource_manager,
)
except Exception as e:
result = f"{ERROR_MSG_PREFIX}: Execution exception: {e}."
if not result:
result = "OK"
return Message(topic="reply_" + req.topic, body=result)
[docs]class CancelResourceProcessor(RequestProcessor):
[docs] def get_topics(self) -> List[str]:
return [TrainingTopic.CANCEL_RESOURCE]
[docs] def process(self, req: Message, app_ctx) -> Message:
engine = app_ctx
if not isinstance(engine, ClientEngineInternalSpec):
raise TypeError("engine must be ClientEngineInternalSpec, but got {}".format(type(engine)))
resource_manager = engine.get_component(SystemComponents.RESOURCE_MANAGER)
if not isinstance(resource_manager, ResourceManagerSpec):
raise RuntimeError(
f"resource_manager should be of type ResourceManagerSpec, but got {type(resource_manager)}."
)
with engine.new_context() as fl_ctx:
result = Shareable()
try:
# resource_spec = req.get_header(ShareableHeader.RESOURCE_SPEC)
resource_spec = fobs.loads(req.body)
token = req.get_header(ShareableHeader.RESOURCE_RESERVE_TOKEN)
resource_manager.cancel_resources(resource_requirement=resource_spec, token=token, fl_ctx=fl_ctx)
except Exception:
result.set_return_code(ReturnCode.EXECUTION_EXCEPTION)
return Message(topic="reply_" + req.topic, body=fobs.dumps(result))