Source code for nvflare.fuel.hci.server.authz

# Copyright (c) 2021, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import logging
from typing import List

from nvflare.fuel.hci.conn import Connection
from nvflare.fuel.hci.proto import MetaStatusValue, make_meta
from nvflare.fuel.hci.reg import CommandEntry
from nvflare.fuel.sec.authz import AuthorizationService, AuthzContext, Person

from .constants import ConnProps
from .reg import CommandFilter

log = logging.getLogger(__name__)


[docs]class PreAuthzReturnCode(enum.Enum): OK = 0 # command preprocessed successfully, and no authz needed ERROR = 1 # error occurred in command processing REQUIRE_AUTHZ = 2 # command preprocessed successfully, further authz required
[docs]def command_handler_func_signature(conn: Connection, args: List[str]): pass
[docs]def command_authz_func_signature(conn: Connection, args: List[str]) -> PreAuthzReturnCode: pass
[docs]class AuthzFilter(CommandFilter): def __init__(self): """Filter for authorization of admin commands.""" CommandFilter.__init__(self)
[docs] def pre_command(self, conn: Connection, args: List[str]): cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY, None) if not cmd_entry: return True assert isinstance(cmd_entry, CommandEntry) authz_func = cmd_entry.authz_func if not authz_func: return True return_code = authz_func(conn, args) if return_code == PreAuthzReturnCode.OK: return True if return_code == PreAuthzReturnCode.ERROR: return False # authz required - the command name is the name of the right to be checked! user = Person( name=conn.get_prop(ConnProps.USER_NAME, ""), org=conn.get_prop(ConnProps.USER_ORG, ""), role=conn.get_prop(ConnProps.USER_ROLE, ""), ) submitter = Person( name=conn.get_prop(ConnProps.SUBMITTER_NAME, ""), org=conn.get_prop(ConnProps.SUBMITTER_ORG, ""), role=conn.get_prop(ConnProps.SUBMITTER_ORG, ""), ) ctx = AuthzContext(user=user, submitter=submitter, right=cmd_entry.name) log.debug("User: {} Submitter: {} Right: {}".format(user, submitter, cmd_entry.name)) authorized, err = AuthorizationService.authorize(ctx) if err: conn.append_error(f"Authorization Error: {err}", meta=make_meta(MetaStatusValue.NOT_AUTHORIZED, err)) return False if not authorized: conn.append_error( "This action is not authorized", meta=make_meta(MetaStatusValue.NOT_AUTHORIZED, "not authorized") ) return False return True