Source code for nvflare.private.fed.server.comp_caller_cmd

# Copyright (c) 2021-2022, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from nvflare.fuel.hci.conn import Connection
from nvflare.fuel.hci.reg import CommandModule, CommandModuleSpec, CommandSpec
from nvflare.private.fed.server.server_engine_internal_spec import ServerEngineInternalSpec
from nvflare.security.security import Action, FLAuthzContext
from nvflare.widgets.comp_caller import ComponentCaller
from nvflare.widgets.widget import WidgetID

from .cmd_utils import CommandUtil


[docs]class ComponentCallerCommandModule(CommandModule, CommandUtil): _CONN_KEY_CALLER = "caller" _CONN_KEY_TARGET = "comp_target" _CONN_KEY_ACTION = "action" _CONN_KEY_PARAMS = "params"
[docs] def get_spec(self): return CommandModuleSpec( name="admin", cmd_specs=[ CommandSpec( name="call", description="issue a call to components", usage="call comp_target action params", handler_func=self.call_component, authz_func=self.authorize_call_component, visible=True, ) ], )
[docs] def authorize_call_component(self, conn: Connection, args: [str]): engine = conn.app_ctx if not isinstance(engine, ServerEngineInternalSpec): raise TypeError("engine must be ServerEngineInternalSpec but got {}".format(type(engine))) caller = engine.get_widget(WidgetID.COMPONENT_CALLER) if not caller: conn.append_error("component caller not available") return False, None if not isinstance(caller, ComponentCaller): conn.append_error("system error: component caller not right object") return False, None conn.set_prop(self._CONN_KEY_CALLER, caller) run_info = engine.get_app_run_info() if not run_info or run_info.job_id < 0: conn.append_string("App is not running") return False, None # validate the command if len(args) < 3: conn.append_error("Syntax error. Usage: {} comp_target action [params...]".format(args[0])) return False, None comp_target = args[1] action = args[2] param_args = args[3:] # parse params params_dict = {} if param_args: for a in param_args: parts = a.split() if len(parts) <= 0: conn.append_error("missing params") return False, None # each param part must be: key=value for p in parts: kvs = p.split("=") if len(kvs) != 2: conn.append_error("Syntax error: params must be key/value pairs separated by =") return False, None params_dict[kvs[0]] = kvs[1] conn.set_prop(self._CONN_KEY_ACTION, action) conn.set_prop(self._CONN_KEY_TARGET, comp_target) conn.set_prop(self._CONN_KEY_PARAMS, params_dict) return True, FLAuthzContext.new_authz_context(site_names=["server"], actions=[Action.TRAIN])
[docs] def call_component(self, conn: Connection, args: [str]): # only support server side for now caller = conn.get_prop(self._CONN_KEY_CALLER) if not isinstance(caller, ComponentCaller): raise TypeError("caller must be ComponentCaller but got {}".format(type(caller))) action = conn.get_prop(self._CONN_KEY_ACTION) comp_target = conn.get_prop(self._CONN_KEY_TARGET) call_params = conn.get_prop(self._CONN_KEY_PARAMS) result = caller.call_components(target=comp_target, action=action, params=call_params) if not result: conn.append_string("No result: no component responded to the call") return # the result is a dict of: target => response table = conn.append_table(["Component", "Response"]) for comp_name, resp in result.items(): table.add_row([comp_name, resp])