Source code for nvflare.fuel.sec.authz

# Copyright (c) 2021-2022, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
from typing import List


def _validate_value(type_def: dict, value):
    value_type = type_def.get("type", "bool")
    if value_type == "bool":
        if value not in [True, False]:
            return False
    elif not isinstance(value, (int, float)):
        return False
    return True


[docs]def validate_policy_config(config: dict) -> str: """Validates that an authorization policy configuration has the right syntax. Args: config: configuration dictionary to validate Returns: empty string if there are no errors, else a string describing the error encountered """ if not isinstance(config, dict): return "policy definition must be a dict" # validate rules policy_rules = config.get("rules", None) if not policy_rules: return "missing rules in policy" if not isinstance(policy_rules, dict): return "rules must be dict" for rule_name, rule_def in policy_rules.items(): if not isinstance(rule_def, dict): return 'bad rule "{}": must be a dict'.format(rule_name) default_value = rule_def.get("default", None) if default_value is None: return 'bad rule "{}": missing default value'.format(rule_name) rule_type = rule_def.get("type", "bool") if rule_type not in ["bool", "int"]: return 'bad rule "{}": invalid type "{}"'.format(rule_name, rule_type) if not _validate_value(rule_def, default_value): return 'bad rule "{}": invalid default "{}"'.format(rule_name, default_value) # validate rights policy_rights = config.get("rights", None) if not policy_rights: return "missing rights in policy" if not isinstance(policy_rights, dict): return "rights must be dict" for right_name, right_def in policy_rights.items(): if not isinstance(right_def, dict): return 'bad right "{}": must be a dict'.format(right_name) precond = right_def.get("precond", None) if precond is not None and precond not in ["selfOrg"]: return 'bad right "{}": unknown precond "{}"'.format(right_name, precond) default_value = right_def.get("default", None) if default_value is None: return 'bad right "{}": missing default value'.format(right_name) right_type = right_def.get("type", "bool") if right_type not in ["bool", "int"]: return 'bad right "{}": invalid type "{}"'.format(right_name, right_type) if not _validate_value(right_def, default_value): return 'bad right "{}": invalid default "{}"'.format(right_name, default_value) # validate roles policy_roles = config.get("roles", {}) if policy_roles: if not isinstance(policy_roles, dict): return "roles must be dict" for role_name, role_desc in policy_roles.items(): if not isinstance(role_name, str): return 'bad role name "{}": must be str'.format(role_name) if not isinstance(role_desc, str): return 'bad role desc "{}": must be str'.format(role_desc) # validate groups policy_groups = config.get("groups", {}) if policy_groups: if not isinstance(policy_groups, dict): return "groups must be dict" for grp_name, grp_def in policy_groups.items(): if not isinstance(grp_def, dict): return 'bad group "{}": group def must be dict'.format(grp_name) rules = grp_def.get("rules", None) if rules: if not isinstance(rules, dict): return 'bad group "{}": rules must be dict'.format(grp_name) for rule_name, rule_value in rules.items(): rule_def = policy_rules.get(rule_name, None) if not rule_def: return 'bad group "{}": unknown rule "{}"'.format(grp_name, rule_name) if not _validate_value(rule_def, rule_value): return 'bad group "{}": invalid value "{}" for rule "{}"'.format( grp_name, rule_value, rule_name ) role_rights = grp_def.get("role_rights", None) if role_rights: if not isinstance(role_rights, dict): return 'bad group "{}": role_rights must be dict'.format(grp_name) for role_name, rights in role_rights.items(): if not isinstance(rights, dict): return 'bad group "{}": rights of role "{}" must be dict'.format(grp_name, role_name) for right_name, right_value in rights.items(): right_def = policy_rights.get(right_name, None) if not right_def: return 'bad group "{}": unknown right "{}" in role "{}" must be dict'.format( grp_name, right_name, role_name ) if not _validate_value(right_def, right_value): return 'bad group "{}": invalid value "{}" for right "{}" in role "{}" must be dict'.format( grp_name, right_value, right_name, role_name ) # validate orgs policy_orgs = config.get("orgs", None) if not policy_orgs: return "missing orgs in policy" if not isinstance(policy_orgs, dict): return "orgs must be a dict" for org_name, groups in policy_orgs.items(): if not isinstance(groups, list): return 'bad org "{}": groups must be a list'.format(org_name) if len(groups) <= 0: return 'bad org "{}": groups not defined'.format(org_name) for grp_name in groups: if not isinstance(grp_name, str): return 'bad org "{}": group name must be str'.format(org_name) grp_def = policy_groups.get(grp_name, None) if not grp_def: return 'bad org "{}": undefined group "{}"'.format(org_name, grp_name) # validate users policy_users = config.get("users", None) if not policy_users: return "missing users in policy" if not isinstance(policy_users, dict): return "users must be dict" for user_name, user_def in policy_users.items(): if not isinstance(user_def, dict): return 'bad user "{}": definition must be dict'.format(user_name) org_name = user_def.get("org", None) if not org_name: return 'bad user "{}": missing org'.format(user_name) org_def = policy_orgs.get(org_name) if not org_def: return 'bad user "{}": undefined org "{}"'.format(user_name, org_name) roles = user_def.get("roles", None) if not roles: return 'bad user "{}": missing roles'.format(user_name) if not isinstance(roles, list): return 'bad user "{}": roles must be list'.format(user_name) if len(roles) <= 0: return 'bad user "{}": no roles defined'.format(user_name) for role_name in roles: if not isinstance(role_name, str): return 'bad user "{}": role name must be str'.format(user_name) role_def = policy_roles.get(role_name, None) if not role_def: return 'bad user "{}": undefined role "{}" must be str'.format(user_name, role_name) # validate sites sites = config.get("sites", None) if not sites: return "missing sites in policy" if not isinstance(sites, dict): return "sites must be dict" for site_name, org_name in sites.items(): if org_name not in policy_orgs: return 'bad site "{}": undefined org "{}"'.format(site_name, org_name) return ""
def _group_rule_key(grp_name: str, rule_name: str): return grp_name + ":" + rule_name def _group_role_right_key(grp_name: str, role_name: str, right_name: str): return grp_name + ":" + role_name + ":" + right_name def _eval_bool(space: dict, keys: [str]): exit_value = None for k in keys: value = space.get(k, None) if value: return True if value is not None: exit_value = False return exit_value def _eval_int(space: dict, keys: [str]): exit_value = None for k in keys: value = space.get(k, None) if value is not None: if exit_value is None or exit_value < value: exit_value = value return exit_value
[docs]class Policy(object): def __init__(self, conf: dict): """The authorization policy definition. Authorization policy definition with methods to access information about the policy. Init creates the internal representation of the policy from a config dictionary. Policy evaluation result: For bool type of rules or rights: True - the rule is satisfied or the right is granted False - the rule is not satisfied; the right iis not granted None - the rule or right is not applicable (precondition not met) For int type or rules or rights: Number - the value of the evaluation None - the rule or right is not applicable (precondition not met) Args: conf (dict): the configuration dictionary with keys=groups, users, rights, rules, sites, orgs """ self.config = conf self.preconf_valuators = {"selfOrg": self._eval_precond_self_org} # compute the rule and right spaces self.rule_space = {} self.right_space = {} groups = conf["groups"] for grp_name, grp_def in groups.items(): rules = grp_def.get("rules", None) if rules: for rule_name, rule_value in rules.items(): key = _group_rule_key(grp_name, rule_name) self.rule_space[key] = rule_value role_rights = grp_def.get("role_rights", None) if role_rights: for role_name, rights in role_rights.items(): for right_name, right_value in rights.items(): key = _group_role_right_key(grp_name, role_name, right_name) self.right_space[key] = right_value
[docs] def get_config(self): return self.config
def _eval_precond(self, precond: str, user_name: str, org_name: str): evaluator = self.preconf_valuators.get(precond, None) if not evaluator: return None else: return evaluator(user_name, org_name) def _eval_precond_self_org(self, user_name: str, org_name: str): users = self.config["users"] user = users[user_name] return user["org"] == org_name def _get_org_groups(self, org_name: str): orgs = self.config["orgs"] return orgs.get(org_name, None)
[docs] def evaluate_rule_on_org(self, rule_name: str, org_name: str): rules = self.config["rules"] rule_def = rules.get(rule_name, None) if not rule_def: return None, 'undefined rule "{}"'.format(rule_name) rule_type = rule_def.get("type", "bool") groups = self._get_org_groups(org_name) if not groups: return None, 'unknown org "{}"'.format(org_name) keys = [] for grp_name in groups: keys.append(_group_rule_key(grp_name, rule_name)) if rule_type == "bool": result = _eval_bool(space=self.rule_space, keys=keys) else: result = _eval_bool(space=self.rule_space, keys=keys) if result is None: result = rule_def["default"] return result, ""
def _get_org_of_site(self, site_name): sites = self.config["sites"] return sites.get(site_name, None)
[docs] def evaluate_rule_on_site(self, rule_name: str, site_name: str): org_name = self._get_org_of_site(site_name) if not org_name: return None, 'unknown site "{}"'.format(site_name) return self.evaluate_rule_on_org(rule_name, org_name)
[docs] def evaluate_user_right_on_org(self, right_name: str, user_name: str, org_name: str): rights = self.config["rights"] right_def = rights.get(right_name, None) if not right_def: return None, 'undefined right "{}"'.format(right_name) right_type = right_def.get("type", "bool") groups = self._get_org_groups(org_name) if not groups: return None, 'unknown org "{}"'.format(org_name) users = self.config["users"] user = users.get(user_name, None) if not user: return None, 'unknown user "{}"'.format(user_name) precond = right_def.get("precond", None) if precond: matched = self._eval_precond(precond, user_name, org_name) if not matched: if right_type == "bool": return False, "" else: return 0, "" roles = user["roles"] keys = [] for grp_name in groups: for role_name in roles: keys.append(_group_role_right_key(grp_name, role_name, right_name)) if right_type == "bool": result = _eval_bool(self.right_space, keys) else: result = _eval_int(self.right_space, keys) if result is None: result = right_def["default"] return result, ""
[docs] def evaluate_user_right_on_site(self, right_name: str, user_name: str, site_name: str): org_name = self._get_org_of_site(site_name) if not org_name: return None, 'unknown site "{}"'.format(site_name) return self.evaluate_user_right_on_org(right_name, user_name, org_name)
[docs] def get_user(self, user_name: str): users = self.config["users"] return users.get(user_name, None)
[docs] def get_users(self): return self.config["users"]
[docs] def get_sites(self): return self.config["sites"]
[docs] def get_rights(self): return self.config["rights"]
[docs] def get_rules(self): return self.config["rules"]
[docs] def get_right_type(self, right_name: str): rights = self.config["rights"] right_def = rights.get(right_name, None) if not right_def: return None return right_def.get("type", "bool")
[docs]class AuthzContext(object): def __init__(self, user_name: str, site_names: List[str]): """Base class to contain context data for authorization. Args: user_name (str): user name to be checked site_names (List[str]): site names to be checked against """ self.user_name = user_name self.site_names = site_names self.attrs = {}
[docs] def set_attr(self, key: str, value): self.attrs[key] = value
[docs] def get_attr(self, key: str, default=None): return self.attrs.get(key, default)
[docs]class Authorizer(object): def __init__(self): """Base class containing the authorization policy.""" self.policy = None self.last_load_time = None
[docs] def get_policy(self) -> Policy: return self.policy
[docs] def authorize(self, ctx: AuthzContext) -> (object, str): return True, ""
[docs] def evaluate_user_right_on_site(self, right_name: str, user_name: str, site_name: str): if not self.policy: return None, "policy not defined" return self.policy.evaluate_user_right_on_site(right_name=right_name, user_name=user_name, site_name=site_name)
[docs] def evaluate_rule_on_site(self, rule_name: str, site_name: str): if not self.policy: return None, "policy not defined" return self.policy.evaluate_rule_on_site(rule_name=rule_name, site_name=site_name)
[docs] def load_policy(self, policy_config: dict) -> str: err = validate_policy_config(policy_config) if err: return err self.policy = Policy(policy_config) self.last_load_time = time.time() return ""