Source code for nvflare.fuel.f3.cellnet.net_manager

# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from nvflare.fuel.f3.cellnet.fqcn import FQCN
from nvflare.fuel.f3.cellnet.net_agent import NetAgent
from nvflare.fuel.f3.stats_pool import VALID_HIST_MODES, parse_hist_mode
from nvflare.fuel.hci.conn import Connection
from nvflare.fuel.hci.reg import CommandModule, CommandModuleSpec, CommandSpec
from nvflare.fuel.hci.server.constants import ConnProps
from nvflare.security.logging import secure_format_exception


def _to_int(s: str):
    try:
        return int(s)
    except Exception as ex:
        return f"'{s}' is not a valid number: {secure_format_exception(ex)}"


[docs]class NetManager(CommandModule): def __init__(self, agent: NetAgent, diagnose=False): self.agent = agent self.diagnose = diagnose
[docs] def get_spec(self) -> CommandModuleSpec: return CommandModuleSpec( name="cellnet", cmd_specs=[ CommandSpec( name="cells", description="get system cells info", usage="cells", handler_func=self._cmd_cells, visible=self.diagnose, ), CommandSpec( name="route", description="send message to a cell and show route", usage="route to_cell [from_cell]", handler_func=self._cmd_route, visible=self.diagnose, ), CommandSpec( name="peers", description="show connected peers of a cell", usage="peers target_cell", handler_func=self._cmd_peers, visible=self.diagnose, ), CommandSpec( name="conns", description="show connectors of a cell", usage="conns target_cell", handler_func=self._cmd_connectors, visible=self.diagnose, ), CommandSpec( name="url_use", description="show use of a url", usage="url_use url", handler_func=self._cmd_url_use, visible=self.diagnose, ), CommandSpec( name="speed", description="test communication speed between cells", usage="speed from_fqcn to_fqcn [num_tries] [payload_size]", handler_func=self._cmd_speed_test, visible=self.diagnose, ), CommandSpec( name="stress", description="stress test communication among cells", usage="stress [num_tries] [timeout]", handler_func=self._cmd_stress_test, visible=self.diagnose, ), CommandSpec( name="bulk", description="test bulk messaging - each client sends a bulk to server", usage="bulk [bulk_size]", handler_func=self._cmd_bulk_test, visible=self.diagnose, enabled=self.diagnose, ), CommandSpec( name="change_root", description="change to a new root server", usage="change_root url", handler_func=self._cmd_change_root, visible=self.diagnose, enabled=self.diagnose, ), CommandSpec( name="msg_stats", description="show request stats", usage="msg_stats target [mode]", handler_func=self._cmd_msg_stats, visible=self.diagnose, ), CommandSpec( name="list_pools", description="list stats pools", usage="list_pools target", handler_func=self._cmd_list_pools, visible=self.diagnose, ), CommandSpec( name="show_pool", description="show stats pool detail", usage="show_pool target pool_name [mode]", handler_func=self._cmd_show_pool, visible=self.diagnose, ), CommandSpec( name="show_comm_config", description="show communication config", usage="show_comm_config target", handler_func=self._cmd_show_comm_config, visible=self.diagnose, ), CommandSpec( name="show_config_vars", description="show all defined config var values", usage="show_config_vars target", handler_func=self._cmd_show_config_vars, visible=self.diagnose, ), CommandSpec( name="process_info", description="show process information", usage="process_info target", handler_func=self._cmd_process_info, visible=self.diagnose, ), CommandSpec( name="stop_cell", description="stop a cell and its children", usage="stop_cell target", handler_func=self._cmd_stop_cell, visible=self.diagnose, enabled=self.diagnose, ), CommandSpec( name="stop_net", description="stop the whole cellnet", usage="stop_net", handler_func=self._cmd_stop_net, visible=self.diagnose, enabled=self.diagnose, ), ], )
def _cmd_cells(self, conn: Connection, args: [str]): err, cell_fqcns = self.agent.request_cells_info() if err: conn.append_error(err) total_cells = 0 if cell_fqcns: for c in cell_fqcns: conn.append_string(c) err = FQCN.validate(c) if not err: total_cells += 1 conn.append_string(f"Total Cells: {total_cells}") def _cmd_url_use(self, conn: Connection, args: [str]): if len(args) != 2: cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY) conn.append_string(f"Usage: {cmd_entry.usage}") return url = args[1] results = self.agent.get_url_use(url) useless_cells = [] for k, v in results.items(): if v == "none": useless_cells.append(k) for k in useless_cells: results.pop(k) if not results: conn.append_string(f"No cell uses {url}") else: conn.append_dict(results) def _cmd_route(self, conn: Connection, args: [str]): if len(args) < 2: cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY) conn.append_string(f"Usage: {cmd_entry.usage}") return target_fqcn = args[1] from_fqcn = "server" if len(args) > 2: from_fqcn = args[2] err, reply_headers, req_headers = self.agent.start_route(from_fqcn, target_fqcn) conn.append_string(f"Route Info from {from_fqcn} to {target_fqcn}") if err: conn.append_error(err) if req_headers: conn.append_string("Request Headers:") conn.append_dict(req_headers) if reply_headers: conn.append_string("Reply Headers:") conn.append_dict(reply_headers) def _cmd_peers(self, conn: Connection, args: [str]): if len(args) != 2: cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY) conn.append_string(f"Usage: {cmd_entry.usage}") return target_fqcn = args[1] err_dict, agents = self.agent.get_peers(target_fqcn) if err_dict: conn.append_dict(err_dict) if agents: for a in agents: conn.append_string(a) conn.append_string(f"Total Agents: {len(agents)}") else: conn.append_string("No peers") def _cmd_connectors(self, conn: Connection, args: [str]): if len(args) != 2: cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY) conn.append_string(f"Usage: {cmd_entry.usage}") return target_fqcn = args[1] err_dict, result = self.agent.get_connectors(target_fqcn) if err_dict: conn.append_dict(err_dict) if result: conn.append_dict(result) def _cmd_speed_test(self, conn: Connection, args: [str]): if len(args) < 3: cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY) conn.append_string(f"Usage: {cmd_entry.usage}") return from_fqcn = args[1] to_fqcn = args[2] num_tries = 100 payload_size = 1000 if len(args) > 3: num_tries = _to_int(args[3]) if not isinstance(num_tries, int): conn.append_error(num_tries) return if len(args) > 4: payload_size = _to_int(args[4]) if not isinstance(payload_size, int): conn.append_error(payload_size) return result = self.agent.speed_test( from_fqcn=from_fqcn, to_fqcn=to_fqcn, num_tries=num_tries, payload_size=payload_size ) conn.append_dict(result) def _cmd_stress_test(self, conn: Connection, args: [str]): num_tries = 10 timeout = 5.0 if len(args) > 1: num_tries = _to_int(args[1]) if not isinstance(num_tries, int): conn.append_error(num_tries) return if len(args) > 2: timeout = _to_int(args[2]) if not isinstance(timeout, int): conn.append_error(timeout) return err, targets = self.agent.request_cells_info() if err: conn.append_error(err) if not targets: conn.append_error("no targets to test") conn.append_string(f"starting stress test on {targets}", flush=True) result = self.agent.start_stress_test(targets=targets, num_rounds=num_tries, timeout=timeout) total_errors = 0 for t, v in result.items(): if not isinstance(v, dict): continue err_dict = v.get("errors") cell_errs = 0 for _, c in err_dict.items(): cell_errs += c total_errors += cell_errs if cell_errs == 0: v.pop("errors") conn.append_dict(result) conn.append_string(f"total errors: {total_errors}") def _cmd_bulk_test(self, conn: Connection, args: [str]): bulk_size = 1 if len(args) > 1: bulk_size = _to_int(args[1]) if not isinstance(bulk_size, int): conn.append_error(bulk_size) return err, targets = self.agent.request_cells_info() if err: conn.append_error(err) if not targets: conn.append_error("no targets to test") conn.append_string(f"starting bulk test on {targets}", flush=True) result = self.agent.start_bulk_test(targets, bulk_size) conn.append_dict(result) @staticmethod def _show_table_dict(conn: Connection, d: dict): t = conn.append_table(d.get("headers")) rows = d.get("rows") for r in rows: t.add_row(r) def _cmd_msg_stats(self, conn: Connection, args: [str]): if len(args) < 2: cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY) conn.append_string(f"Usage: {cmd_entry.usage}") return target = args[1] mode = "" if len(args) > 2: mode = args[2] mode = parse_hist_mode(mode) if not mode: conn.append_error(f"invalid mode '{mode}': must be one of {VALID_HIST_MODES}") return reply = self.agent.get_msg_stats_table(target, mode) if isinstance(reply, str): conn.append_error(reply) return if not isinstance(reply, dict): conn.append_error(f"expect dict bt got {type(reply)}") return self._show_table_dict(conn, reply) def _cmd_show_pool(self, conn: Connection, args: [str]): if len(args) < 3: cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY) conn.append_string(f"Usage: {cmd_entry.usage}") return target = args[1] pool_name = args[2] mode = "" if len(args) > 3: mode = args[3] mode = parse_hist_mode(mode) if not mode: conn.append_error(f"invalid mode '{mode}': must be one of {VALID_HIST_MODES}") return reply = self.agent.show_pool(target, pool_name, mode) if isinstance(reply, str): conn.append_error(reply) return if not isinstance(reply, dict): conn.append_error(f"expect dict bt got {type(reply)}") return self._show_table_dict(conn, reply) def _cmd_list_pools(self, conn: Connection, args: [str]): if len(args) < 2: cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY) conn.append_string(f"Usage: {cmd_entry.usage}") return target = args[1] reply = self.agent.get_pool_list(target) if isinstance(reply, str): conn.append_error(reply) return if not isinstance(reply, dict): conn.append_error(f"expect dict bt got {type(reply)}") return self._show_table_dict(conn, reply) def _cmd_show_comm_config(self, conn: Connection, args: [str]): if len(args) < 2: cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY) conn.append_string(f"Usage: {cmd_entry.usage}") return target = args[1] reply = self.agent.get_comm_config(target) if isinstance(reply, str): conn.append_error(reply) return if not isinstance(reply, dict): conn.append_error(f"expect dict bt got {type(reply)}") return conn.append_dict(reply) def _cmd_show_config_vars(self, conn: Connection, args: [str]): if len(args) < 2: cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY) conn.append_string(f"Usage: {cmd_entry.usage}") return target = args[1] reply = self.agent.get_config_vars(target) if isinstance(reply, str): conn.append_error(reply) return if not isinstance(reply, dict): conn.append_error(f"expect dict bt got {type(reply)}") return conn.append_dict(reply) def _cmd_process_info(self, conn: Connection, args: [str]): if len(args) < 2: cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY) conn.append_string(f"Usage: {cmd_entry.usage}") return target = args[1] reply = self.agent.get_process_info(target) if isinstance(reply, str): conn.append_error(reply) return if not isinstance(reply, dict): conn.append_error(f"expect dict bt got {type(reply)}") return self._show_table_dict(conn, reply) def _cmd_change_root(self, conn: Connection, args: [str]): if len(args) < 2: cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY) conn.append_string(f"Usage: {cmd_entry.usage}") return url = args[1] self.agent.change_root(url) conn.append_string("Asked to change root") def _cmd_stop_net(self, conn: Connection, args: [str]): self.agent.stop() conn.append_shutdown("Cellnet Stopped") def _cmd_stop_cell(self, conn: Connection, args: [str]): if len(args) < 2: cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY) conn.append_string(f"Usage: {cmd_entry.usage}") return target = args[1] reply = self.agent.stop_cell(target) conn.append_string(f"Asked {target} to stop: {reply}")