Source code for nvflare.fuel.sec.authz

# Copyright (c) 2021, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
from abc import ABC, abstractmethod
from enum import Enum

_KEY_PERMISSIONS = "permissions"
_KEY_FORMAT_VERSION = "format_version"
_TARGET_SITE = "site"
_TARGET_SUBMITTER = "submitter"
_ANY_RIGHT = "*"


[docs]class FieldNames(str, Enum): USER_NAME = "User name" USER_ORG = "User org" USER_ROLE = "User role" EXP = "Expression" TARGET_TYPE = "Target type" TARGET_VALUE = "Target value" SITE_ORG = "Site org" ROLE_NAME = "Role name" RIGHT = "Right" CATEGORY_RIGHT = "Right for Category"
[docs]class Person(object): def __init__(self, name: str, org: str, role: str): self.name = _normalize_str(name, FieldNames.USER_NAME) self.org = _normalize_str(org, FieldNames.USER_ORG) self.role = _normalize_str(role, FieldNames.USER_ROLE) def __str__(self): name = self.name if self.name else "None" org = self.org if self.org else "None" role = self.role if self.role else "None" if (not name) and (not org) and (not role): return "None" else: return f"{name}:{org}:{role}"
[docs]class AuthzContext(object): def __init__(self, right: str, user: Person, submitter: Person = None): """Base class to contain context data for authorization.""" if not isinstance(user, Person): raise ValueError(f"user needs to be of type Person but got {type(user)}") if submitter and not isinstance(submitter, Person): raise ValueError(f"submitter needs to be of type Person but got {type(submitter)}") self.right = right self.user = user self.submitter = submitter self.attrs = {} if submitter is None: self.submitter = Person("", "", "")
[docs] def set_attr(self, key: str, value): self.attrs[key] = value
[docs] def get_attr(self, key: str, default=None): return self.attrs.get(key, default)
[docs]class ConditionEvaluator(ABC):
[docs] @abstractmethod def evaluate(self, site_org: str, ctx: AuthzContext) -> bool: pass
[docs]class UserOrgEvaluator(ConditionEvaluator): def __init__(self, target): self.target = target
[docs] def evaluate(self, site_org: str, ctx: AuthzContext): if self.target == _TARGET_SITE: return ctx.user.org == site_org elif self.target == _TARGET_SUBMITTER: return ctx.user.org == ctx.submitter.org else: return ctx.user.org == self.target
[docs]class UserNameEvaluator(ConditionEvaluator): def __init__(self, target: str): self.target = target
[docs] def evaluate(self, site_org: str, ctx: AuthzContext): if self.target == _TARGET_SUBMITTER: return ctx.user.name == ctx.submitter.name else: return ctx.user.name == self.target
[docs]class TrueEvaluator(ConditionEvaluator):
[docs] def evaluate(self, site_org: str, ctx: AuthzContext) -> bool: return True
[docs]class FalseEvaluator(ConditionEvaluator):
[docs] def evaluate(self, site_org: str, ctx: AuthzContext) -> bool: return False
class _RoleRightConditions(object): def __init__(self): self.allowed_conditions = [] self.blocked_conditions = [] self.exp = None def _any_condition_matched(self, conds: [ConditionEvaluator], site_org: str, ctx: AuthzContext): # if any condition is met, return True # only when all conditions fail to match, return False for e in conds: matched = e.evaluate(site_org, ctx) if matched: return True return False def evaluate(self, site_org: str, ctx: AuthzContext): # first evaluate blocked list if self.blocked_conditions: if self._any_condition_matched(self.blocked_conditions, site_org, ctx): # if any block condition is met, return False return False # evaluate allowed list if self.allowed_conditions: if self._any_condition_matched(self.allowed_conditions, site_org, ctx): return True else: # all allowed conditions failed return False # no allowed list specified - only blocked list specified # we got here since no blocked condition matched return True def _parse_one_expression(self, exp) -> str: v = _normalize_str(exp, FieldNames.EXP) blocked = False parts = v.split() if len(parts) == 2 and parts[0] == "not": blocked = True v = parts[1] if v in ["all", "any"]: ev = TrueEvaluator() elif v in ["none", "no"]: ev = FalseEvaluator() else: parts = v.split(":") if len(parts) == 2: target_type = _normalize_str(parts[0], FieldNames.TARGET_TYPE) target_value = _normalize_str(parts[1], FieldNames.TARGET_VALUE) if target_type in ["o", "org"]: ev = UserOrgEvaluator(target_value) elif target_type in ["n", "name"]: ev = UserNameEvaluator(target_value) else: return f'bad condition expression "{exp}": invalid type "{target_type}"' else: return f'bad condition expression "{exp}"' if blocked: self.blocked_conditions.append(ev) else: self.allowed_conditions.append(ev) return "" def parse_expression(self, exp): """Parses the value expression into a list of condition(s). Args: exp: expression to be parsed Returns: An error string if value is invalid. """ self.exp = exp if isinstance(exp, str): return self._parse_one_expression(exp) if isinstance(exp, list): # we expect the list contains str only if not exp: # empty list return "bad condition expression - no conditions specified" for ex in exp: err = self._parse_one_expression(ex) if err: # this is an error return err else: return f"bad condition expression type - expect str or list but got {type(exp)}" return ""
[docs]class Policy(object): def __init__(self, config: dict, role_right_map: dict, roles: list, rights: list, role_rights: dict): self.config = config self.role_right_map = role_right_map self.roles = roles self.rights = rights self.roles.sort() self.rights.sort() self.role_rights = role_rights
[docs] def get_rights(self): return self.rights
[docs] def get_roles(self): return self.roles
def _eval_for_role(self, role: str, site_org: str, ctx: AuthzContext): conds = self.role_right_map.get(_role_right_key(role, _ANY_RIGHT)) if not conds: conds = self.role_right_map.get(_role_right_key(role, ctx.right)) if not conds: return False return conds.evaluate(site_org, ctx)
[docs] def evaluate(self, site_org: str, ctx: AuthzContext) -> (bool, str): """ Args: site_org: ctx: Returns: A tuple of (result, error) """ site_org = _normalize_str(site_org, FieldNames.SITE_ORG) permitted = self._eval_for_role(role=ctx.user.role, site_org=site_org, ctx=ctx) if permitted: # permitted if any role is okay return True, "" return False, ""
def _normalize_str(s: str, field_name: FieldNames) -> str: if not isinstance(s, str): raise TypeError(f"{field_name.value} must be a str but got {type(s)}") return " ".join(s.lower().split()) def _role_right_key(role_name: str, right_name: str): return role_name + ":" + right_name def _add_role_right_conds(role, right, conds, rr_map: dict, rights, right_conds): right_conds[right] = conds.exp rr_map[_role_right_key(role, right)] = conds if right not in rights: rights.append(right)
[docs]def parse_policy_config(config: dict, right_categories: dict): """Validates that an authorization policy configuration has the right syntax. Args: config: configuration dictionary to validate right_categories: a dict of right => category mapping Returns: a Policy object if no error, a string describing the error encountered """ if not isinstance(config, dict): return None, f"policy definition must be a dict but got {type(config)}" if not config: # empty policy return None, "policy definition is empty" role_right_map = {} role_rights = {} roles = [] rights = [] # Compute category => right list cat_to_rights = {} if right_categories: for r, c in right_categories.items(): right_list = cat_to_rights.get(c) if not right_list: right_list = [] right_list.append(r) cat_to_rights[c] = right_list # check version format_version = config.get(_KEY_FORMAT_VERSION) if not format_version or format_version != "1.0": return None, "missing or invalid policy format_version: must be 1.0" permissions = config.get(_KEY_PERMISSIONS) if not permissions: return None, "missing permissions" if not isinstance(permissions, dict): return None, f"invalid permissions: expect a dict but got {type(permissions)}" # permissions is a dict of role => rights; for role_name, right_conf in permissions.items(): if not isinstance(role_name, str): return None, f"bad role name: expect a str but got {type(role_name)}" role_name = _normalize_str(role_name, FieldNames.ROLE_NAME) roles.append(role_name) right_conds = {} # rights of this role role_rights[role_name] = right_conds if isinstance(right_conf, str) or isinstance(right_conf, list): conds = _RoleRightConditions() err = conds.parse_expression(right_conf) if err: return None, err _add_role_right_conds(role_name, _ANY_RIGHT, conds, role_right_map, rights, right_conds) continue if not isinstance(right_conf, dict): return None, f"bad right config: expect a dict but got {type(right_conf)}" # process right categories for right, exp in right_conf.items(): if not isinstance(right, str): return None, f"bad right name: expect a str but got {type(right)}" right = _normalize_str(right, FieldNames.CATEGORY_RIGHT) # see whether this is a right category right_list = cat_to_rights.get(right) if not right_list: # this is a regular right - skip it continue conds = _RoleRightConditions() err = conds.parse_expression(exp) if err: return None, err # all rights in the category share the same conditions _add_role_right_conds(role_name, right, conds, role_right_map, rights, right_conds) for r in right_list: _add_role_right_conds(role_name, r, conds, role_right_map, rights, right_conds) # process regular rights, which may override the rights from categories for right, exp in right_conf.items(): right = _normalize_str(right, FieldNames.RIGHT) # see whether this is a right category right_list = cat_to_rights.get(right) if right_list: # this is category - already processed continue conds = _RoleRightConditions() err = conds.parse_expression(exp) if err: return None, err # this may cause the same right to be overwritten in the map _add_role_right_conds(role_name, right, conds, role_right_map, rights, right_conds) return Policy(config=config, role_right_map=role_right_map, role_rights=role_rights, roles=roles, rights=rights), ""
[docs]class Authorizer(object): def __init__(self, site_org: str, right_categories: dict = None): """Base class containing the authorization policy.""" self.site_org = _normalize_str(site_org, FieldNames.SITE_ORG) self.right_categories = right_categories self.policy = None self.last_load_time = None
[docs] def get_policy(self) -> Policy: return self.policy
[docs] def authorize(self, ctx: AuthzContext) -> (bool, str): if not ctx: return True, "" if not isinstance(ctx, AuthzContext): return False, f"ctx must be AuthzContext but got {type(ctx)}" if "super" == ctx.user.role: # use this for testing purpose return True, "" authorized, err = self.evaluate(ctx) if not authorized: if err: return False, err else: return ( False, f"user '{ctx.user.name}' is not authorized for '{ctx.right}'", ) return True, ""
[docs] def evaluate(self, ctx: AuthzContext) -> (bool, str): if not self.policy: return False, "policy not defined" return self.policy.evaluate(ctx=ctx, site_org=self.site_org)
[docs] def load_policy(self, policy_config: dict) -> str: policy, err = parse_policy_config(policy_config, self.right_categories) if err: return err self.policy = policy self.last_load_time = time.time() return ""
[docs]class AuthorizationService(object): the_authorizer = None
[docs] @staticmethod def initialize(authorizer: Authorizer) -> (Authorizer, str): if not isinstance(authorizer, Authorizer): raise ValueError(f"authorizer must be Authorizer but got {type(authorizer)}") if not AuthorizationService.the_authorizer: # authorizer is not loaded AuthorizationService.the_authorizer = authorizer return AuthorizationService.the_authorizer, ""
[docs] @staticmethod def get_authorizer(): return AuthorizationService.the_authorizer
[docs] @staticmethod def authorize(ctx: AuthzContext): if not AuthorizationService.the_authorizer: # no authorizer - assume that authorization is not required return True, "" return AuthorizationService.the_authorizer.authorize(ctx)