Source code for nvflare.app_common.ccwf.eval_gen

# Copyright (c) 2025, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
from typing import List

from nvflare.fuel.utils.validation_utils import check_non_negative_int


[docs] class EvalInclusionRC(Enum): CAN_INCLUDE = 0 EVALUATOR_CONFLICT = 1 ENOUGH_ACTIONS_FOR_EVALUATEE = 2 ENOUGH_ACTIONS_FOR_EVALUATOR = 3
def _check_names(arg_name, names_to_check): if not names_to_check: raise ValueError(f"no {arg_name}") if not isinstance(names_to_check, list): raise ValueError(f"expect {arg_name} to be a list of str but got {type(names_to_check)}") if not all(isinstance(e, str) for e in names_to_check): raise ValueError(f"expect {arg_name} to be a list of str but some items are not str")
[docs] def parallel_eval_generator(evaluators: List[str], evaluatees: List[str], max_parallel_actions: int): """Generates parallel evaluations to be performed. Args: evaluators: names of evaluators evaluatees: names of evaluatees max_parallel_actions: max parallel actions per site (evaluator or evaluatee) Each time iterated, it generates a list of evaluations that can be performed in parallel. An evaluation is expressed as a tuple of (evaluator name, evaluatee name). """ _check_names("evaluators", evaluators) _check_names("evaluatees", evaluatees) check_non_negative_int("max_parallel_actions", max_parallel_actions) evaluatee_states = [(e, list(evaluators)) for e in evaluatees] while evaluatee_states: result = [] empty_evaluatees = [] for ee in evaluatee_states: e, evaluators = ee accepted_evaluators = [] for t in evaluators: target = (t, e) rc = _can_be_included(result, target, max_parallel_actions) if rc == EvalInclusionRC.CAN_INCLUDE: result.append(target) accepted_evaluators.append(t) elif rc == EvalInclusionRC.ENOUGH_ACTIONS_FOR_EVALUATEE: # no need to try other evaluators with this evaluatee break else: # this evaluator cannot be included into the result either because its inclusion # will conflict with another eval in the result, or because it already has enough actions. # we'll try next evaluator. continue if accepted_evaluators: for t in accepted_evaluators: evaluators.remove(t) if not evaluators: empty_evaluatees.append(ee) for ee in empty_evaluatees: evaluatee_states.remove(ee) yield result
def _can_be_included(evals, target, max_parallel_actions) -> EvalInclusionRC: """Determine whether the target evaluation can be included into the set of evals without violating parallel evaluation rules. Args: evals: the set of evaluations already included target: the evaluation in question, expressed as a tuple (evaluator name, evaluatee name) max_parallel_actions: max parallel actions allowed per actor (evaluator or evaluatee). Returns: an EvalInclusionRC """ evaluator_actions = 0 evaluatee_actions = 0 evaluator_t, evaluatee_t = target for p in evals: evaluator_p, evaluatee_p = p if evaluator_t == evaluator_p: # the evaluator is already in the evals - we allow only once for the same evaluator return EvalInclusionRC.EVALUATOR_CONFLICT if evaluator_t == evaluatee_p: # the evaluator of the target is already an evaluatee of another evaluation evaluator_actions += 1 if evaluatee_t == evaluator_p: # the evaluatee of the target is already an evaluator of another evaluation evaluatee_actions += 1 if evaluatee_t == evaluatee_p and evaluator_p != evaluatee_p: # the evaluatee of the target is already an evaluatee of another evaluation evaluatee_actions += 1 if evaluatee_actions > max_parallel_actions: # if the target is included, its evaluatee_actions would be too much return EvalInclusionRC.ENOUGH_ACTIONS_FOR_EVALUATEE if evaluator_actions > max_parallel_actions: # if the target is included, its evaluator_actions would be too much return EvalInclusionRC.ENOUGH_ACTIONS_FOR_EVALUATOR # the target can be included! return EvalInclusionRC.CAN_INCLUDE