Source code for nvflare.app_common.resource_managers.gpu_resource_manager

# Copyright (c) 2022, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Union

from nvflare.app_common.resource_managers.auto_clean_resource_manager import AutoCleanResourceManager
from nvflare.fuel.utils.gpu_utils import get_host_gpu_ids, get_host_gpu_memory_total


[docs] class GPUResource: def __init__(self, gpu_id: int, gpu_memory: Union[int, float]): self.id = gpu_id self.memory = gpu_memory
[docs] def to_dict(self): return {"gpu_id": self.id, "memory": self.memory}
[docs] class GPUResourceManager(AutoCleanResourceManager): def __init__( self, num_of_gpus: int, mem_per_gpu_in_GiB: Union[int, float], num_gpu_key: str = "num_of_gpus", gpu_mem_key: str = "mem_per_gpu_in_GiB", expiration_period: Union[int, float] = 30, ): """Resource manager for GPUs. Args: num_of_gpus: Number of GPUs. mem_per_gpu_in_GiB: Memory for each GPU. num_gpu_key: The key in resource requirements that specify the number of GPUs. gpu_mem_key: The key in resource requirements that specify the memory per GPU. expiration_period: Number of seconds to hold the resources reserved. If check_resources is called but after "expiration_period" no allocate resource is called, then the reserved resources will be released. """ if not isinstance(num_of_gpus, int): raise ValueError(f"num_of_gpus should be of type int, but got {type(num_of_gpus)}.") if num_of_gpus < 0: raise ValueError("num_of_gpus should be greater than or equal to 0.") if not isinstance(mem_per_gpu_in_GiB, (float, int)): raise ValueError(f"mem_per_gpu_in_GiB should be of type int or float, but got {type(mem_per_gpu_in_GiB)}.") if mem_per_gpu_in_GiB < 0: raise ValueError("mem_per_gpu_in_GiB should be greater than or equal to 0.") if not isinstance(expiration_period, (float, int)): raise ValueError(f"expiration_period should be of type int or float, but got {type(expiration_period)}.") if expiration_period < 0: raise ValueError("expiration_period should be greater than or equal to 0.") if num_of_gpus > 0: num_host_gpus = len(get_host_gpu_ids()) if num_of_gpus > num_host_gpus: raise ValueError(f"num_of_gpus specified ({num_of_gpus}) exceeds available GPUs: {num_host_gpus}.") host_gpu_mem = get_host_gpu_memory_total() for i in host_gpu_mem: if mem_per_gpu_in_GiB * 1024 > i: raise ValueError( f"Memory per GPU specified ({mem_per_gpu_in_GiB * 1024}) exceeds available GPU memory: {i}." ) self.num_gpu_key = num_gpu_key self.gpu_mem_key = gpu_mem_key resources = {i: GPUResource(gpu_id=i, gpu_memory=mem_per_gpu_in_GiB) for i in range(num_of_gpus)} super().__init__(resources=resources, expiration_period=expiration_period) def _deallocate(self, resources: dict): for k, v in resources.items(): self.resources[k].memory += v def _check_required_resource_available(self, resource_requirement: dict) -> bool: if not resource_requirement: return True if self.num_gpu_key not in resource_requirement: raise ValueError(f"resource_requirement is missing num_gpu_key {self.num_gpu_key}.") is_resource_enough = False num_gpu = resource_requirement[self.num_gpu_key] gpu_mem = resource_requirement.get(self.gpu_mem_key, 0) satisfied = 0 for k in self.resources: r: GPUResource = self.resources[k] if r.memory >= gpu_mem: satisfied += 1 if satisfied >= num_gpu: is_resource_enough = True break return is_resource_enough def _reserve_resource(self, resource_requirement: dict) -> dict: if not resource_requirement: return {} if self.num_gpu_key not in resource_requirement: raise ValueError(f"resource_requirement is missing num_gpu_key {self.num_gpu_key}.") reserved_resources = {} num_gpu = resource_requirement[self.num_gpu_key] gpu_mem = resource_requirement.get(self.gpu_mem_key, 0) reserved = 0 for k in self.resources: r: GPUResource = self.resources[k] if r.memory >= gpu_mem: r.memory -= gpu_mem reserved_resources[k] = gpu_mem reserved += 1 if reserved == num_gpu: break return reserved_resources def _resource_to_dict(self) -> dict: return { "resources": [self.resources[k].to_dict() for k in self.resources], "reserved_resources": self.reserved_resources, }