# Source code for nvflare.fuel.hci.server.authz

# Copyright (c) 2021-2022, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
from typing import List

from nvflare.fuel.hci.conn import Connection
from nvflare.fuel.hci.reg import CommandEntry, CommandModule, CommandModuleSpec, CommandSpec
from nvflare.fuel.sec.authz import Authorizer, AuthzContext
from nvflare.fuel.sec.security_content_service import LoadResult, SecurityContentService
from nvflare.fuel.utils.time_utils import time_to_string

from .constants import ConnProps
from .reg import CommandFilter

# Default name of the authorization policy content; used as the default value of
# the ``policy_file`` argument of ``AuthorizationService.initialize``.
AUTHORIZATION_POLICY_FILE = "authorization.json"


class AuthorizationService(object):
    """Class-level holder of the single authorizer used for authorization.

    The authorizer is installed once, by either ``initialize`` or
    ``initialize_with_policy``; later calls return the already-installed
    instance instead of replacing it.
    """

    # The installed authorizer; None until one of the initialize methods succeeds.
    the_authorizer = None

    @staticmethod
    def initialize(authorizer: Authorizer, policy_file: str = AUTHORIZATION_POLICY_FILE) -> (Authorizer, str):
        """Install ``authorizer``, loading its policy through SecurityContentService.

        Args:
            authorizer: the Authorizer instance to install.
            policy_file: name of the policy content passed to
                ``SecurityContentService.load_json``.

        Returns:
            A tuple ``(authorizer, error)``. On success the first item is the
            installed authorizer and the error is ``""``; on failure the first
            item is None and the error describes the problem.
        """
        assert isinstance(authorizer, Authorizer), "authorizer must be Authorizer but got {}".format(type(authorizer))

        if not AuthorizationService.the_authorizer:
            # get secure content of the policy file
            policy_conf, result = SecurityContentService.load_json(policy_file)
            if result == LoadResult.NOT_MANAGED or result == LoadResult.NO_SUCH_CONTENT:
                # no authorization needed: install the authorizer without loading a policy
                AuthorizationService.the_authorizer = authorizer
            elif result == LoadResult.OK:
                err = authorizer.load_policy(policy_conf)
                if err:
                    return None, err
                AuthorizationService.the_authorizer = authorizer
            else:
                return None, "invalid policy file {}: {}".format(policy_file, result)

        return AuthorizationService.the_authorizer, ""

    @staticmethod
    def initialize_with_policy(authorizer: Authorizer, policy_file_path: str) -> (Authorizer, str):
        """Install ``authorizer``, loading its policy from a JSON file on disk.

        Args:
            authorizer: the Authorizer instance to install.
            policy_file_path: path of the JSON policy file to read.

        Returns:
            A tuple ``(authorizer, error)``. On success the first item is the
            installed authorizer and the error is ``""``; on failure the first
            item is None and the error describes the problem.
        """
        assert isinstance(authorizer, Authorizer), "authorizer must be Authorizer but got {}".format(type(authorizer))

        if AuthorizationService.the_authorizer:
            # Already initialized: return the same (authorizer, error) pair shape as
            # every other path, so callers can always unpack two values.
            return AuthorizationService.the_authorizer, ""

        if not os.path.exists(policy_file_path):
            return None, 'policy file "{}" does not exist'.format(policy_file_path)

        with open(policy_file_path) as file:
            try:
                policy_conf = json.load(file)
            except json.JSONDecodeError:
                return None, 'policy file "{}" is invalid'.format(policy_file_path)

        err = authorizer.load_policy(policy_conf)
        if err:
            return None, err

        AuthorizationService.the_authorizer = authorizer
        return AuthorizationService.the_authorizer, ""

    @staticmethod
    def get_authorizer():
        """Return the installed authorizer, or None if none has been installed."""
        return AuthorizationService.the_authorizer

    @staticmethod
    def authorize(ctx: AuthzContext):
        """Authorize ``ctx`` with the installed authorizer.

        Returns:
            ``(None, "no authorizer defined")`` when no authorizer is installed;
            otherwise whatever the installed authorizer's ``authorize(ctx)`` returns.
        """
        if not AuthorizationService.the_authorizer:
            return None, "no authorizer defined"
        return AuthorizationService.the_authorizer.authorize(ctx)


class AuthzFilter(CommandFilter):
    """Command filter that runs authorization before an admin command executes."""

    def __init__(self, authorizer: Authorizer):
        """Filter for authorization of admin commands.

        Args:
            authorizer: instance of Authorizer
        """
        CommandFilter.__init__(self)
        assert isinstance(authorizer, Authorizer), "authorizer must be Authorizer but got {}".format(type(authorizer))
        self.authorizer = authorizer

    def pre_command(self, conn: Connection, args: List[str]):
        """Decide whether the command attached to ``conn`` may proceed.

        The command entry is read from the connection's ``ConnProps.CMD_ENTRY``
        property. Its ``authz_func`` (if any) is called with ``(conn, args)`` and
        returns ``(valid, authz_ctx)``, where ``authz_ctx`` may be a single
        context, a tuple of contexts, or a falsy value meaning no check is needed.

        Returns:
            True if the command may proceed, False otherwise.
        """
        entry = conn.get_prop(ConnProps.CMD_ENTRY, None)
        if not entry:
            return True
        assert isinstance(entry, CommandEntry)

        prepare = entry.authz_func
        if not prepare:
            return True

        is_valid, context = prepare(conn, args)
        if not is_valid:
            return False

        if not context:
            # no authz needed
            return True

        if not isinstance(context, tuple):
            return self.check_authz(context, conn)

        if not any(context):
            # a tuple holding only falsy entries also means no authz needed
            return True

        # every context must pass; stop at the first one that fails
        return all(self.check_authz(item, conn) for item in context)

    def check_authz(self, authz_ctx, conn: Connection):
        """Authorize one context, reporting any failure on the connection.

        The context's ``user_name`` is filled from the connection's
        ``ConnProps.USER_NAME`` property and the context is stored on the
        connection under ``ConnProps.AUTHZ_CTX`` before the authorizer is called.

        Returns:
            True if authorized; False (with an error appended to ``conn``) otherwise.
        """
        assert isinstance(authz_ctx, AuthzContext), "authz_ctx must be AuthzContext but got {}".format(type(authz_ctx))

        authz_ctx.user_name = conn.get_prop(ConnProps.USER_NAME, "")
        conn.set_prop(ConnProps.AUTHZ_CTX, authz_ctx)

        authorized, err = self.authorizer.authorize(ctx=authz_ctx)
        if err:
            failure = "Authorization Error: {}".format(err)
        elif not authorized:
            failure = "This action is not authorized"
        else:
            return True

        conn.append_error(failure)
        return False


class AuthzCommandModule(CommandModule):
    """Admin command module exposing read-only views of the authorization policy."""

    def __init__(self, authorizer: Authorizer):
        """Authorization command module.

        Args:
            authorizer: instance of Authorizer
        """
        assert isinstance(authorizer, Authorizer), "authorizer must be Authorizer but got {}".format(type(authorizer))
        self.authorizer = authorizer

    def get_spec(self):
        """Build the spec of the "authz" module and its commands."""
        # (name, description, usage, handler) for each command, in registration order
        commands = (
            ("show_info", "show general info of authorization policy", "show_info", self.show_info),
            ("show_users", "show users configured for authorization", "show_users", self.show_users),
            ("show_sites", "show sites configured for authorization", "show_sites", self.show_sites),
            ("show_rules", "show rules configured for authorization", "show_rules", self.show_rules),
            ("show_rights", "show rights configured for authorization", "show_rights", self.show_rights),
            ("show_config", "show authorization config", "show_config", self.show_config),
            (
                "eval_right",
                "check if a user has a right on a site",
                "eval_right user_name right_name [site_name...]",
                self.eval_right,
            ),
            ("eval_rule", "evaluate a site against a rule", "eval_rule site_name rule_name", self.eval_rule),
        )
        return CommandModuleSpec(
            name="authz",
            cmd_specs=[
                CommandSpec(
                    name=cmd_name,
                    description=cmd_desc,
                    usage=cmd_usage,
                    handler_func=cmd_handler,
                    authz_func=self._authorize_cmd,
                    visible=True,
                )
                for cmd_name, cmd_desc, cmd_usage, cmd_handler in commands
            ],
        )

    def _authorize_cmd(self, conn: Connection, args: List[str]):
        """Pre-process a command by stashing the authorizer and policy on ``conn``.

        Handlers read both from the connection, so they never need to know where
        the authorizer or policy come from; this is the only place to change if
        that ever differs.

        Returns:
            ``(True, None)`` when a policy is available (the None context means
            the command itself needs no authorization); ``(False, None)``, with
            an error appended to ``conn``, when there is no policy.
        """
        current_policy = self.authorizer.get_policy()
        if current_policy:
            conn.set_prop("authorizer", self.authorizer)
            conn.set_prop("policy", current_policy)
            return True, None

        conn.append_error("no authorization policy defined")
        return False, None

    @staticmethod
    def _append_definition_table(conn: Connection, definitions):
        """Append a name/description/default table built from ``definitions``."""
        table = conn.append_table(["name", "description", "default"])
        for def_name, definition in definitions.items():
            description = definition.get("desc", "")
            default_value = definition.get("default", "")
            table.add_row([def_name, description, "{}".format(default_value)])

    def show_info(self, conn: Connection, args: List[str]):
        """Report when the authorizer stored on ``conn`` last loaded its policy."""
        authorizer = conn.get_prop("authorizer")
        loaded_at = time_to_string(authorizer.last_load_time)
        conn.append_string("Last Loaded: {}".format(loaded_at))

    def show_users(self, conn: Connection, args: List[str]):
        """List each policy user with its org and comma-joined roles."""
        users = conn.get_prop("policy").get_users()
        table = conn.append_table(["user", "org", "roles"])
        for name, definition in users.items():
            org = definition["org"]
            roles = ",".join(definition["roles"])
            table.add_row([name, org, roles])

    def show_sites(self, conn: Connection, args: List[str]):
        """List each policy site with its org."""
        policy = conn.get_prop("policy")
        if not policy:
            conn.append_error("no authorization policy")
            return

        sites = policy.get_sites()
        table = conn.append_table(["site", "org"])
        for name, org in sites.items():
            table.add_row([name, org])

    def show_rights(self, conn: Connection, args: List[str]):
        """List each policy right with its description and default."""
        policy = conn.get_prop("policy")
        if not policy:
            conn.append_error("no authorization policy")
            return

        self._append_definition_table(conn, policy.get_rights())

    def show_rules(self, conn: Connection, args: List[str]):
        """List each policy rule with its description and default."""
        policy = conn.get_prop("policy")
        if not policy:
            conn.append_error("no authorization policy")
            return

        self._append_definition_table(conn, policy.get_rules())

    def show_config(self, conn: Connection, args: List[str]):
        """Dump the policy config as indented JSON."""
        config = conn.get_prop("policy").get_config()
        conn.append_string(json.dumps(config, indent=1))

    def eval_right(self, conn: Connection, args: List[str]):
        """Evaluate a user's right on the named sites, or on all policy sites.

        Expects ``args`` as ``[cmd, user_name, right_name, site_name...]``; when
        no site names are given, every site of the policy is evaluated.
        """
        if len(args) < 3:
            conn.append_error("Usage: {} user_name right_name [site_name...]".format(args[0]))
            return

        authorizer = conn.get_prop("authorizer")
        user_name, right_name = args[1], args[2]
        targets = args[3:]

        if not targets:
            # all sites
            policy = conn.get_prop("policy")
            if not policy:
                conn.append_error("no authorization policy")
                return
            targets = [site for site, _ in policy.get_sites().items()]

        table = conn.append_table(["Site", "Result"])
        for site in targets:
            outcome, err = authorizer.evaluate_user_right_on_site(
                user_name=user_name, site_name=site, right_name=right_name
            )
            # show the error text in place of the result when evaluation failed
            cell = err if err else str(outcome)
            table.add_row([site, cell])

    def eval_rule(self, conn: Connection, args: List[str]):
        """Evaluate one rule against one site; ``args`` is ``[cmd, site_name, rule_name]``."""
        if len(args) != 3:
            conn.append_error("Usage: {} site_name rule_name".format(args[0]))
            return

        authorizer = conn.get_prop("authorizer")
        outcome, err = authorizer.evaluate_rule_on_site(site_name=args[1], rule_name=args[2])
        if err:
            conn.append_error(err)
            return
        conn.append_string("{}".format(outcome))