Source code for nvflare.app_common.resource_managers.gpu_resource_manager

# Copyright (c) 2022, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from typing import Optional, Union

from nvflare.app_common.resource_managers.auto_clean_resource_manager import AutoCleanResourceManager
from nvflare.fuel.utils.gpu_utils import get_host_gpu_ids, get_host_gpu_memory_total

CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"


def _get_cuda_visible_device_indices(host_gpu_ids: list[int]) -> Optional[list[int]]:
    """Parse CUDA_VISIBLE_DEVICES into an ordered list of integer GPU indices.

    Only integer indices are interpreted. NVIDIA CUDA also accepts GPU UUIDs and MIG identifiers in
    CUDA_VISIBLE_DEVICES, but GPUResourceManager uses nvidia-smi integer GPU indices as resource IDs
    (and POC -gpu sets integer IDs), so any non-integer entry makes this function give up.

    Args:
        host_gpu_ids: GPU indices known to exist on the host; used to validate each parsed entry.

    Returns:
        None if the variable is unset or contains an entry that is not an integer.
        An empty list if the variable is set but blank.
        Otherwise the de-duplicated indices in their original order, truncated at the first
        empty entry or the first index that is not in ``host_gpu_ids``.
    """
    raw = os.environ.get(CUDA_VISIBLE_DEVICES)
    if raw is None:
        return None

    raw = raw.strip()
    if raw == "":
        return []

    # A dict is used as an insertion-ordered set: keys keep first-seen order and drop repeats.
    visible = {}
    for token in map(str.strip, raw.split(",")):
        if token == "":
            # An empty entry ends parsing; anything after it is ignored.
            break
        try:
            index = int(token)
        except ValueError:
            # Not an integer index (for example a UUID): signal "cannot interpret" to the caller.
            return None
        if index not in host_gpu_ids:
            # An index the host does not have ends parsing; earlier entries are kept.
            break
        visible.setdefault(index, None)
    return list(visible)


def _get_managed_host_gpu_ids(host_gpu_ids: list[int]) -> list[int]:
    """Return the host GPU indices that this process should manage.

    Args:
        host_gpu_ids: GPU indices present on the host.

    Returns:
        The indices parsed from CUDA_VISIBLE_DEVICES when that parse yields a list (possibly empty);
        ``host_gpu_ids`` itself when the parse yields None.
    """
    restricted = _get_cuda_visible_device_indices(host_gpu_ids)
    return host_gpu_ids if restricted is None else restricted


class GPUResource:
    """A single GPU entry: its identifier and the amount of memory associated with it."""

    def __init__(self, gpu_id: int, gpu_memory: Union[int, float]):
        """Store the GPU identifier and its memory amount.

        Args:
            gpu_id: Identifier of the GPU; exposed as ``self.id``.
            gpu_memory: Memory amount for this GPU; exposed as ``self.memory``.
        """
        self.memory = gpu_memory
        self.id = gpu_id

    def to_dict(self):
        """Return the current state as ``{"gpu_id": <id>, "memory": <memory>}``."""
        return dict(gpu_id=self.id, memory=self.memory)
class GPUResourceManager(AutoCleanResourceManager):
    """Resource manager that tracks a fixed set of GPUs and the memory left on each.

    Each managed GPU is represented by a ``GPUResource`` keyed by its GPU ID. Reserving subtracts the
    requested memory from a GPU's ``memory``; deallocating adds it back.

    NOTE(review): the reservation life cycle (tokens, expiry, ``self.resources`` and
    ``self.reserved_resources``) is presumably implemented by ``AutoCleanResourceManager``, which is
    not visible in this file - confirm against the base class.
    """

    def __init__(
        self,
        num_of_gpus: int,
        mem_per_gpu_in_GiB: Union[int, float],
        num_gpu_key: str = "num_of_gpus",
        gpu_mem_key: str = "mem_per_gpu_in_GiB",
        expiration_period: Union[int, float] = 30,
        ignore_host: bool = False,
    ):
        """Resource manager for GPUs.

        Args:
            num_of_gpus: Number of GPUs.
            mem_per_gpu_in_GiB: Memory for each GPU.
            num_gpu_key: The key in resource requirements that specifies the number of GPUs.
            gpu_mem_key: The key in resource requirements that specifies the memory per GPU.
            expiration_period: Number of seconds to hold the resources reserved.
                If check_resources is called but after "expiration_period" no allocate resource is called,
                then the reserved resources will be released.
            ignore_host: Whether to skip validation against GPUs present on the local host.
                Set to True in environments where the NVFlare process runs on a node without GPUs
                (for example, some Kubernetes deployments) but GPU resources are managed externally.

        Raises:
            ValueError: If an argument has the wrong type or is negative, if ``num_of_gpus`` exceeds the
                number of manageable host GPUs, or if ``mem_per_gpu_in_GiB`` exceeds the memory reported
                for one of the selected GPUs.
            RuntimeError: If no memory figure is available for one of the selected GPUs.
        """
        if not isinstance(num_of_gpus, int):
            raise ValueError(f"num_of_gpus should be of type int, but got {type(num_of_gpus)}.")
        if num_of_gpus < 0:
            raise ValueError("num_of_gpus should be greater than or equal to 0.")

        if not isinstance(mem_per_gpu_in_GiB, (float, int)):
            raise ValueError(f"mem_per_gpu_in_GiB should be of type int or float, but got {type(mem_per_gpu_in_GiB)}.")
        if mem_per_gpu_in_GiB < 0:
            raise ValueError("mem_per_gpu_in_GiB should be greater than or equal to 0.")

        if not isinstance(expiration_period, (float, int)):
            raise ValueError(f"expiration_period should be of type int or float, but got {type(expiration_period)}.")
        if expiration_period < 0:
            raise ValueError("expiration_period should be greater than or equal to 0.")

        if not isinstance(ignore_host, bool):
            raise ValueError(f"ignore_host should be of type bool, but got {type(ignore_host)}.")

        # Without host validation the resource IDs are simply 0..num_of_gpus-1.
        resource_gpu_ids = list(range(num_of_gpus))

        if not ignore_host and num_of_gpus > 0:
            host_gpu_ids = get_host_gpu_ids()
            # Honors CUDA_VISIBLE_DEVICES when it holds integer indices; otherwise all host GPUs.
            managed_host_gpu_ids = _get_managed_host_gpu_ids(host_gpu_ids)
            num_host_gpus = len(managed_host_gpu_ids)
            if num_of_gpus > num_host_gpus:
                raise ValueError(f"num_of_gpus specified ({num_of_gpus}) exceeds available GPUs: {num_host_gpus}.")

            # Pair each host GPU ID with its reported memory by position; zip stops at the shorter
            # list, so an ID without a memory entry is simply absent from the mapping.
            host_gpu_mem = get_host_gpu_memory_total()
            host_gpu_mem_by_id = dict(zip(host_gpu_ids, host_gpu_mem))

            # Use the actual host GPU IDs (not 0..n-1) as resource IDs.
            resource_gpu_ids = managed_host_gpu_ids[:num_of_gpus]
            for gpu_id in resource_gpu_ids:
                try:
                    available_gpu_mem = host_gpu_mem_by_id[gpu_id]
                except KeyError as e:
                    raise RuntimeError(f"Failed to determine GPU memory for GPU ID {gpu_id}.") from e
                # NOTE(review): the * 1024 suggests the host reports memory in MiB - confirm
                # against get_host_gpu_memory_total.
                if mem_per_gpu_in_GiB * 1024 > available_gpu_mem:
                    raise ValueError(
                        f"Memory per GPU specified ({mem_per_gpu_in_GiB * 1024}) exceeds available GPU memory "
                        f"for GPU ID {gpu_id}: {available_gpu_mem}."
                    )

        self.num_gpu_key = num_gpu_key
        self.gpu_mem_key = gpu_mem_key
        resources = {i: GPUResource(gpu_id=i, gpu_memory=mem_per_gpu_in_GiB) for i in resource_gpu_ids}
        super().__init__(resources=resources, expiration_period=expiration_period)

    def _deallocate(self, resources: dict):
        """Give previously reserved memory back to the GPUs it was taken from.

        Args:
            resources: Mapping of GPU ID to the memory amount to return to that GPU.
        """
        for gpu_id, released_mem in resources.items():
            self.resources[gpu_id].memory += released_mem

    def _check_required_resource_available(self, resource_requirement: dict) -> bool:
        """Report whether enough GPUs currently have the requested free memory.

        Args:
            resource_requirement: Requirement dict; read via ``self.num_gpu_key`` (mandatory when the
                dict is non-empty) and ``self.gpu_mem_key`` (optional, defaults to 0).

        Returns:
            True if the requirement is empty, asks for no GPUs, or at least the requested number of
            GPUs each have ``memory >= `` the requested per-GPU memory; False otherwise.

        Raises:
            ValueError: If the requirement is non-empty but lacks ``self.num_gpu_key``.
        """
        if not resource_requirement:
            return True

        if self.num_gpu_key not in resource_requirement:
            raise ValueError(f"resource_requirement is missing num_gpu_key {self.num_gpu_key}.")

        num_gpu = resource_requirement[self.num_gpu_key]
        gpu_mem = resource_requirement.get(self.gpu_mem_key, 0)

        # A request for no GPUs is always satisfiable, even when this manager holds no GPUs at all.
        if num_gpu <= 0:
            return True

        satisfied = 0
        for r in self.resources.values():
            if r.memory >= gpu_mem:
                satisfied += 1
                if satisfied >= num_gpu:
                    return True
        return False

    def _reserve_resource(self, resource_requirement: dict) -> dict:
        """Deduct the requested memory from GPUs that can supply it.

        GPUs are visited in the iteration order of ``self.resources``; each GPU with enough memory has
        the requested amount subtracted until the requested number of GPUs has been reserved. If
        fewer GPUs qualify, the partial reservation is still returned.

        Args:
            resource_requirement: Requirement dict; read via ``self.num_gpu_key`` (mandatory when the
                dict is non-empty) and ``self.gpu_mem_key`` (optional, defaults to 0).

        Returns:
            Mapping of GPU ID to the memory amount reserved on that GPU. Empty when the requirement
            is empty or asks for no GPUs.

        Raises:
            ValueError: If the requirement is non-empty but lacks ``self.num_gpu_key``.
        """
        if not resource_requirement:
            return {}

        if self.num_gpu_key not in resource_requirement:
            raise ValueError(f"resource_requirement is missing num_gpu_key {self.num_gpu_key}.")

        reserved_resources = {}
        num_gpu = resource_requirement[self.num_gpu_key]
        gpu_mem = resource_requirement.get(self.gpu_mem_key, 0)

        # Nothing to reserve for a zero (or negative) GPU count. Without this guard the loop would
        # never hit its stop condition and would take memory from every GPU.
        if num_gpu <= 0:
            return reserved_resources

        for gpu_id, r in self.resources.items():
            if r.memory >= gpu_mem:
                r.memory -= gpu_mem
                reserved_resources[gpu_id] = gpu_mem
                if len(reserved_resources) >= num_gpu:
                    break
        return reserved_resources

    def _resource_to_dict(self) -> dict:
        """Return a snapshot of all GPU entries plus the current reservations.

        Returns:
            A dict with ``"resources"`` (list of each GPU's ``to_dict()``) and
            ``"reserved_resources"`` (``self.reserved_resources`` as-is).
        """
        return {
            "resources": [r.to_dict() for r in self.resources.values()],
            "reserved_resources": self.reserved_resources,
        }