Source code for nvflare.fuel.f3.cellnet.net_agent

# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import hashlib
import os
import random
import threading
import time
from abc import ABC
from typing import List, Union

try:
    import resource
except ImportError:
    resource = None

from nvflare.fuel.f3.cellnet.cell import Cell
from nvflare.fuel.f3.cellnet.connector_manager import ConnectorData
from nvflare.fuel.f3.cellnet.core_cell import Message
from nvflare.fuel.f3.cellnet.defs import MessageHeaderKey, ReturnCode
from nvflare.fuel.f3.cellnet.fqcn import FQCN
from nvflare.fuel.f3.cellnet.utils import make_reply
from nvflare.fuel.f3.stats_pool import StatsPoolManager
from nvflare.fuel.utils.admin_name_utils import is_valid_admin_client_name
from nvflare.fuel.utils.config_service import ConfigService
from nvflare.fuel.utils.log_utils import get_obj_logger

_CHANNEL = "_net_manager"
_TOPIC_PEERS = "peers"
_TOPIC_CELLS = "cells"
_TOPIC_ROUTE = "route"
_TOPIC_START_ROUTE = "start_route"
_TOPIC_STOP = "stop"
_TOPIC_STOP_CELL = "stop_cell"
_TOPIC_URL_USE = "url_use"
_TOPIC_CONNS = "conns"
_TOPIC_SPEED = "speed"
_TOPIC_ECHO = "echo"
_TOPIC_STRESS = "stress"
_TOPIC_CHANGE_ROOT = "change_root"
_TOPIC_BULK_TEST = "bulk_test"
_TOPIC_BULK_ITEM = "bulk_item"
_TOPIC_MSG_STATS = "msg_stats"
_TOPIC_LIST_POOLS = "list_pools"
_TOPIC_SHOW_POOL = "show_pool"
_TOPIC_COMM_CONFIG = "comm_config"
_TOPIC_CONFIG_VARS = "config_vars"
_TOPIC_PROCESS_INFO = "process_info"
_TOPIC_HEARTBEAT = "heartbeat"

_ONE_K = bytes([1] * 1024)


class _Member:

    STATE_UNKNOWN = 0
    STATE_ONLINE = 1
    STATE_OFFLINE = 2

    def __init__(self, fqcn):
        self.fqcn = fqcn
        self.state = _Member.STATE_UNKNOWN
        self.last_heartbeat_time = time.time()
        self.lock = threading.Lock()


[docs] class SubnetMonitor(ABC): def __init__(self, subnet_id: str, member_cells: List[str], trouble_alert_threshold: float): if not member_cells: raise ValueError("member cells must not be empty") self.agent = None self.subnet_id = subnet_id self.trouble_alert_threshold = trouble_alert_threshold self.lock = threading.Lock() self.members = {} for m in member_cells: self.members[m] = _Member(m)
[docs] def member_online(self, member_cell_fqcn: str): pass
[docs] def member_offline(self, member_cell_fqcn: str): pass
[docs] def put_member_online(self, member: _Member): with self.lock: member.last_heartbeat_time = time.time() current_state = member.state member.state = member.STATE_ONLINE if current_state in [member.STATE_UNKNOWN, member.STATE_OFFLINE]: self.member_online(member.fqcn)
[docs] def put_member_offline(self, member: _Member): with self.lock: if time.time() - member.last_heartbeat_time <= self.trouble_alert_threshold: return if member.state in [member.STATE_ONLINE]: self.member_offline(member.fqcn) member.state = member.STATE_OFFLINE
[docs] def stop_subnet(self): if not self.agent: raise RuntimeError("No NetAgent in this monitor. Make sure the monitor is added to a NetAgent.") return self.agent.stop_subnet(self)
[docs] class NetAgent: def __init__(self, cell, change_root_cb=None, agent_closed_cb=None): if isinstance(cell, Cell): cell = cell.core_cell self.cell = cell self.change_root_cb = change_root_cb self.agent_closed_cb = agent_closed_cb self.logger = get_obj_logger(self) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_CELLS, cb=self._do_report_cells, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_ROUTE, cb=self._do_route, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_START_ROUTE, cb=self._do_start_route, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_STOP, cb=self._do_stop, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_STOP_CELL, cb=self._do_stop_cell, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_PEERS, cb=self._do_peers, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_CONNS, cb=self._do_connectors, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_URL_USE, cb=self._do_url_use, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_SPEED, cb=self._do_speed, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_ECHO, cb=self._do_echo, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_STRESS, cb=self._do_stress, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_CHANGE_ROOT, cb=self._do_change_root, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_BULK_TEST, cb=self._do_bulk_test, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_BULK_ITEM, cb=self._do_bulk_item, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_MSG_STATS, cb=self._do_msg_stats, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_LIST_POOLS, cb=self._do_list_pools, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_SHOW_POOL, cb=self._do_show_pool, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_COMM_CONFIG, cb=self._do_comm_config, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_CONFIG_VARS, cb=self._do_config_vars, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_PROCESS_INFO, cb=self._do_process_info, ) cell.register_request_cb( channel=_CHANNEL, topic=_TOPIC_HEARTBEAT, cb=self._do_heartbeat, ) self.heartbeat_thread = None self.monitor_thread = None self.asked_to_close = False self.subnets = {} self.monitors = {} self.hb_lock = threading.Lock() self.monitor_lock = threading.Lock()
[docs] def add_to_subnet(self, subnet_id: str, monitor_fqcn: str = FQCN.ROOT_SERVER): with self.hb_lock: self.subnets[subnet_id] = monitor_fqcn if self.heartbeat_thread is None: self.heartbeat_thread = threading.Thread(target=self._subnet_heartbeat) self.heartbeat_thread.start()
[docs] def add_subnet_monitor(self, monitor: SubnetMonitor): if not isinstance(monitor, SubnetMonitor): raise ValueError(f"monitor must be SubnetMonitor but got {type(monitor)}") if monitor.subnet_id in self.monitors: raise ValueError(f"monitor for subnet {monitor.subnet_id} already exists") monitor.agent = self with self.monitor_lock: self.monitors[monitor.subnet_id] = monitor if self.monitor_thread is None: self.monitor_thread = threading.Thread(target=self._monitor_subnet) self.monitor_thread.start()
[docs] def stop_subnet(self, monitor: SubnetMonitor): cells_to_stop = [] for member_fqcn, member in monitor.members.items(): if member.state == member.STATE_ONLINE: cells_to_stop.append(member_fqcn) if cells_to_stop: return self.cell.broadcast_request( channel=_CHANNEL, topic=_TOPIC_STOP_CELL, request=Message(), targets=cells_to_stop, timeout=1.0 ) else: return None
[docs] def delete_subnet_monitor(self, subnet_id: str): with self.monitor_lock: self.monitors.pop(subnet_id, None)
[docs] def close(self): if self.asked_to_close: return self.asked_to_close = True if self.heartbeat_thread and self.heartbeat_thread.is_alive(): self.heartbeat_thread.join() if self.monitor_thread and self.monitor_thread.is_alive(): self.monitor_thread.join() if self.agent_closed_cb: self.agent_closed_cb()
def _subnet_heartbeat(self): cc = self.cell.comm_configurator interval = cc.get_subnet_heartbeat_interval(5.0) if interval <= 0: interval = 5.0 while True: with self.hb_lock: for subnet_id, target in self.subnets.items(): self.cell.fire_and_forget( channel=_CHANNEL, topic=_TOPIC_HEARTBEAT, targets=target, message=Message(payload={"subnet_id": subnet_id}), ) # wait for interval time, but watch for "asked_to_stop" every 0.1 secs start = time.time() while True: time.sleep(0.1) if self.asked_to_close: return if time.time() - start >= interval: break @staticmethod def _check_monitor(m: SubnetMonitor): for member_fqcn, member in m.members.items(): m.put_member_offline(member) def _monitor_subnet(self): while not self.asked_to_close: with self.monitor_lock: monitors = copy.copy(self.monitors) for _, m in monitors.items(): self._check_monitor(m) time.sleep(0.5) def _do_heartbeat(self, request: Message) -> Union[None, Message]: origin = request.get_header(MessageHeaderKey.ORIGIN, "?") if not self.monitors: self.logger.warning(f"got subnet heartbeat from {origin} but no monitors") return payload = request.payload assert isinstance(payload, dict) subnet_id = payload.get("subnet_id", "") m = self.monitors.get(subnet_id) if not m: self.logger.warning(f"got subnet heartbeat from {origin} for subnet_id {subnet_id} but no monitor") return assert isinstance(m, SubnetMonitor) member = m.members.get(origin) if not member: self.logger.warning(f"got subnet heartbeat from {origin} for subnet_id {subnet_id} but it's not a member") return m.put_member_online(member) def _do_stop(self, request: Message) -> Union[None, Message]: self.stop() return None def _do_stop_cell(self, request: Message) -> Union[None, Message]: self.stop() return Message() def _do_route(self, request: Message) -> Union[None, Message]: return Message(payload=dict(request.headers)) def _do_start_route(self, request: Message) -> Union[None, Message]: target_fqcn = request.payload err = FQCN.validate(target_fqcn) if err: return make_reply(ReturnCode.PROCESS_EXCEPTION, f"bad target fqcn {err}") assert isinstance(target_fqcn, str) reply_headers, req_headers = self.get_route_info(target_fqcn) return Message(payload={"request": dict(req_headers), "reply": dict(reply_headers)}) def _do_peers(self, request: Message) -> Union[None, Message]: return Message(payload=list(self.cell.agents.keys()))
[docs] def get_peers(self, target_fqcn: str) -> (Union[None, dict], List[str]): reply = self.cell.send_request( channel=_CHANNEL, topic=_TOPIC_PEERS, target=target_fqcn, timeout=1.0, request=Message() ) err = "" rc = reply.get_header(MessageHeaderKey.RETURN_CODE) if rc == ReturnCode.OK: result = reply.payload if not isinstance(result, list): err = f"reply payload should be list but got {type(reply.payload)}" result = None else: result = None err = f"return code: {rc}" if err: return {"error": err, "reply": reply.headers}, None else: return None, result
@staticmethod def _connector_info(info: ConnectorData) -> dict: return { "url": info.connect_url, "handle": info.handle, "type": "connector" if info.active else "listener", "params": info.params, } def _get_connectors(self) -> dict: cell = self.cell result = {} if cell.int_listener: result["int_listener"] = self._connector_info(cell.int_listener) if cell.ext_listeners: listeners = [self._connector_info(x) for _, x in cell.ext_listeners.items()] result["ext_listeners"] = listeners if cell.bb_ext_connector: result["bb_ext_connector"] = self._connector_info(cell.bb_ext_connector) if cell.bb_int_connector: result["bb_int_connector"] = self._connector_info(cell.bb_int_connector) if cell.adhoc_connectors: conns = {} for k, v in cell.adhoc_connectors.items(): conns[k] = self._connector_info(v) result["adhoc_connectors"] = conns return result def _do_connectors(self, request: Message) -> Union[None, Message]: return Message(payload=self._get_connectors())
[docs] def get_connectors(self, target_fqcn: str) -> (dict, dict): reply = self.cell.send_request( channel=_CHANNEL, topic=_TOPIC_CONNS, target=target_fqcn, timeout=1.0, request=Message() ) rc = reply.get_header(MessageHeaderKey.RETURN_CODE) if rc == ReturnCode.OK: result = reply.payload if not isinstance(result, dict): return { "error": f"reply payload should be dict but got {type(reply.payload)}", "reply": reply.headers, }, {} if not result: return {}, {} else: return {}, result else: return {"error": "processing error", "reply": reply.headers}, {}
[docs] def request_cells_info(self) -> (str, List[str]): result = [self.cell.get_fqcn()] err = "" replies = self._broadcast_to_subs(topic=_TOPIC_CELLS) for t, r in replies.items(): assert isinstance(r, Message) rc = r.get_header(MessageHeaderKey.RETURN_CODE) if rc == ReturnCode.OK: sub_result = r.payload result.extend(sub_result) else: err = f"no reply from {t}: {rc}" result.append(err) return err, result
def _get_url_use_of_cell(self, url: str): cell = self.cell if cell.int_listener and cell.int_listener.connect_url == url: return "int_listen" if cell.ext_listeners: for k in cell.ext_listeners.keys(): if k == url: return "ext_listen" if cell.bb_ext_connector and cell.bb_ext_connector.connect_url == url: return "bb_ext_connect" if cell.bb_int_connector and cell.bb_int_connector.connect_url == url: return "int_connect" if cell.adhoc_connectors: for _, h in cell.adhoc_connectors.items(): if h.connect_url == url: return "adhoc_connect" return "none"
[docs] def get_url_use(self, url) -> dict: result = {self.cell.get_fqcn(): self._get_url_use_of_cell(url)} replies = self._broadcast_to_subs(topic=_TOPIC_URL_USE, message=Message(payload=url)) for t, r in replies.items(): assert isinstance(r, Message) rc = r.get_header(MessageHeaderKey.RETURN_CODE) if rc == ReturnCode.OK: if not isinstance(r.payload, dict): result[t] = f"bad reply type {type(r.payload)}" else: result.update(r.payload) else: result[t] = f"error {rc}" return result
def _do_url_use(self, request: Message) -> Union[None, Message]: results = self.get_url_use(request.payload) return Message(payload=results)
[docs] def get_route_info(self, target_fqcn: str) -> (dict, dict): reply = self.cell.send_request( channel=_CHANNEL, topic=_TOPIC_ROUTE, target=target_fqcn, timeout=1.0, request=Message() ) reply_headers = reply.headers rc = reply.get_header(MessageHeaderKey.RETURN_CODE, ReturnCode.OK) if rc == ReturnCode.OK: if not isinstance(reply.payload, dict): return reply_headers, {"error": f"reply payload got {type(reply.payload)}"} return reply_headers, reply.payload else: return reply_headers, {"error": f"Reply ReturnCode: {rc}"}
[docs] def start_route(self, from_fqcn: str, target_fqcn: str) -> (str, dict, dict): err = "" reply_headers = {} req_headers = {} reply = self.cell.send_request( channel=_CHANNEL, topic=_TOPIC_START_ROUTE, target=from_fqcn, timeout=1.0, request=Message(payload=target_fqcn), ) rc = reply.get_header(MessageHeaderKey.RETURN_CODE) if rc == ReturnCode.OK: result = reply.payload if not isinstance(result, dict): err = f"reply payload should be dict but got {type(reply.payload)}" else: reply_headers = result.get("reply") req_headers = result.get("request") else: err = f"error in reply {rc}" reply_headers = reply.headers return err, reply_headers, req_headers
def _do_report_cells(self, request: Message) -> Union[None, Message]: _, results = self.request_cells_info() return Message(payload=results)
[docs] def stop(self): # ask all children to stop self._broadcast_to_subs(topic=_TOPIC_STOP, timeout=0.0) self.close()
[docs] def stop_cell(self, target: str) -> str: reply = self.cell.send_request( channel=_CHANNEL, topic=_TOPIC_STOP_CELL, request=Message(), target=target, timeout=1.0 ) rc = reply.get_header(MessageHeaderKey.RETURN_CODE) return rc
def _request_speed_test(self, target_fqcn: str, num, size) -> Message: start = time.perf_counter() payload = bytes(_ONE_K * size) payload_size = len(payload) h = hashlib.md5(payload) dig1 = h.digest() end = time.perf_counter() payload_prep_time = end - start errs = 0 timeouts = 0 comm_errs = 0 proc_errs = 0 size_errs = 0 start = time.perf_counter() for i in range(num): r = self.cell.send_request( channel=_CHANNEL, topic=_TOPIC_ECHO, target=target_fqcn, request=Message(payload=payload), timeout=10.0, ) rc = r.get_header(MessageHeaderKey.RETURN_CODE, ReturnCode.OK) if rc == ReturnCode.OK: if len(r.payload) != payload_size: self.cell.logger.error( f"{self.cell.get_fqcn()}: expect {payload_size} bytes but received {len(r.payload)}" ) proc_errs += 1 else: h = hashlib.md5(r.payload) dig2 = h.digest() if dig1 != dig2: self.cell.logger.error(f"{self.cell.get_fqcn()}: digest mismatch!") proc_errs += 1 elif rc == ReturnCode.TIMEOUT: timeouts += 1 elif rc == ReturnCode.COMM_ERROR: comm_errs += 1 elif rc == ReturnCode.MSG_TOO_BIG: size_errs += 1 else: errs += 1 end = time.perf_counter() total = end - start avg = total / num return Message( payload={ "test": f"{size:,}KB {num} rounds between {self.cell.get_fqcn()} and {target_fqcn}", "prep": payload_prep_time, "timeouts": timeouts, "comm_errors": comm_errs, "size_errors": size_errs, "proc_errors": proc_errs, "other_errors": errs, "total": total, "average": avg, } ) def _do_speed(self, request: Message) -> Union[None, Message]: params = request.payload if not isinstance(params, dict): return make_reply(ReturnCode.INVALID_REQUEST, f"request body must be dict but got {type(params)}") to_fqcn = params.get("to") if not to_fqcn: return make_reply(ReturnCode.INVALID_REQUEST, "missing 'to' param in request") err = FQCN.validate(to_fqcn) if err: return make_reply(ReturnCode.INVALID_REQUEST, f"bad target FQCN: {err}") num = params.get("num", 100) size = params.get("size", 1000) if size <= 0: size = 1000 if num <= 0: num = 100 return self._request_speed_test(to_fqcn, num, size) def _do_echo(self, request: Message) -> Union[None, Message]: return Message(payload=request.payload) def _do_stress_test(self, params): if not isinstance(params, dict): return {"error": f"bad params - expect dict but got {type(params)}"} targets = params.get("targets") if not targets: return {"error": "no targets specified"} num_rounds = params.get("num") if not num_rounds: return {"error": "missing num of rounds"} my_fqcn = self.cell.get_fqcn() if my_fqcn in targets: targets.remove(my_fqcn) if not targets: return {"error": "no targets to try"} counts = {} errors = {} start = time.perf_counter() for i in range(num_rounds): payload = os.urandom(1024) h = hashlib.md5(payload) d1 = h.digest() target = targets[random.randrange(len(targets))] req = Message(payload=payload) reply = self.cell.send_request(channel=_CHANNEL, topic=_TOPIC_ECHO, target=target, request=req, timeout=1.0) if target not in counts: counts[target] = 0 counts[target] += 1 rc = reply.get_header(MessageHeaderKey.RETURN_CODE, ReturnCode.OK) if rc != ReturnCode.OK: self.cell.logger.error(f"{self.cell.get_fqcn()}: return code from {target}: {rc}") if target not in errors: errors[target] = 0 errors[target] += 1 else: h = hashlib.md5(reply.payload) d2 = h.digest() if d1 != d2: self.cell.logger.error(f"{self.cell.get_fqcn()}: digest mismatch from {target}") if target not in errors: errors[target] = 0 errors[target] += 1 end = time.perf_counter() return {"counts": counts, "errors": errors, "time": end - start} def _do_stress(self, request: Message) -> Union[None, Message]: params = request.payload result = self._do_stress_test(params) return Message(payload=result)
[docs] def start_stress_test(self, targets: list, num_rounds=10, timeout=5.0): self.cell.logger.info(f"{self.cell.get_fqcn()}: starting stress test on {targets}") result = {} payload = {"targets": targets, "num": num_rounds} msg_targets = [x for x in targets] my_fqcn = self.cell.get_fqcn() if my_fqcn in msg_targets: msg_targets.remove(my_fqcn) if not msg_targets: return {"error": "no targets for stress test"} replies = self.cell.broadcast_request( channel=_CHANNEL, topic=_TOPIC_STRESS, targets=msg_targets, request=Message(payload=payload), timeout=timeout, ) for t, r in replies.items(): rc = r.get_header(MessageHeaderKey.RETURN_CODE, ReturnCode.OK) if rc != ReturnCode.OK: result[t] = f"RC={rc}" else: result[t] = r.payload return result
[docs] def speed_test(self, from_fqcn: str, to_fqcn: str, num_tries, payload_size) -> dict: err = FQCN.validate(from_fqcn) if err: return {"error": f"invalid from_fqcn {from_fqcn}: {err}"} err = FQCN.validate(to_fqcn) if err: return {"error": f"invalid to_fqcn {to_fqcn}: {err}"} result = {} start = time.perf_counter() reply = self.cell.send_request( channel=_CHANNEL, topic=_TOPIC_SPEED, request=Message(payload={"to": to_fqcn, "num": num_tries, "size": payload_size}), target=from_fqcn, timeout=100.0, ) end = time.perf_counter() result["test_time"] = end - start rc = reply.get_header(MessageHeaderKey.RETURN_CODE, ReturnCode.OK) if rc != ReturnCode.OK: result.update({"error": f"return code {rc}"}) elif not isinstance(reply.payload, dict): result.update({"error": f"bad reply: expect dict but got {type(reply.payload)}"}) else: result.update(reply.payload) return result
[docs] def change_root(self, new_root_url: str): self._broadcast_to_subs(topic=_TOPIC_CHANGE_ROOT, message=Message(payload=new_root_url), timeout=0.0)
def _do_change_root(self, request: Message) -> Union[None, Message]: new_root_url = request.payload assert isinstance(new_root_url, str) self.change_root(new_root_url) if self.change_root_cb is not None: self.change_root_cb(new_root_url) return None
[docs] def start_bulk_test(self, targets: list, size: int): self.cell.logger.info(f"{self.cell.get_fqcn()}: starting bulk test on {targets}") msg_targets = [x for x in targets] my_fqcn = self.cell.get_fqcn() if my_fqcn in msg_targets: msg_targets.remove(my_fqcn) if not msg_targets: return {"error": "no targets for bulk test"} result = {} replies = self.cell.broadcast_request( channel=_CHANNEL, topic=_TOPIC_BULK_TEST, targets=msg_targets, request=Message(payload=size), timeout=1.0, ) for t, r in replies.items(): rc = r.get_header(MessageHeaderKey.RETURN_CODE, ReturnCode.OK) if rc != ReturnCode.OK: result[t] = f"RC={rc}" else: result[t] = r.payload return result
def _do_bulk_test(self, request: Message) -> Union[None, Message]: size = request.payload assert isinstance(size, int) nums = [] for _ in range(size): num = random.randint(0, 100) nums.append(num) msg = Message(payload=num) self.cell.queue_message( channel=_CHANNEL, topic=_TOPIC_BULK_ITEM, targets=FQCN.ROOT_SERVER, message=msg, ) return Message(payload=f"queued: {nums}") def _do_bulk_item(self, request: Message) -> Union[None, Message]: num = request.payload origin = request.get_header(MessageHeaderKey.ORIGIN) self.cell.logger.info(f"{self.cell.get_fqcn()}: got {num} from {origin}") return None
[docs] def get_msg_stats_table(self, target: str, mode: str): reply = self.cell.send_request( channel=_CHANNEL, topic=_TOPIC_MSG_STATS, request=Message(payload={"mode": mode}), timeout=1.0, target=target, ) rc = reply.get_header(MessageHeaderKey.RETURN_CODE) if rc != ReturnCode.OK: return f"error: {rc}" return reply.payload
def _do_msg_stats(self, request: Message) -> Union[None, Message]: p = request.payload assert isinstance(p, dict) mode = p.get("mode") headers, rows = self.cell.msg_stats_pool.get_table(mode) reply = {"headers": headers, "rows": rows} return Message(payload=reply)
[docs] def get_pool_list(self, target: str): reply = self.cell.send_request( channel=_CHANNEL, topic=_TOPIC_LIST_POOLS, request=Message(), timeout=1.0, target=target ) rc = reply.get_header(MessageHeaderKey.RETURN_CODE) err = reply.get_header(MessageHeaderKey.ERROR, "") if rc != ReturnCode.OK: return f"{rc}: {err}" return reply.payload
def _do_list_pools(self, request: Message) -> Union[None, Message]: headers, rows = StatsPoolManager.get_table() reply = {"headers": headers, "rows": rows} return Message(payload=reply)
[docs] def show_pool(self, target: str, pool_name: str, mode: str): reply = self.cell.send_request( channel=_CHANNEL, topic=_TOPIC_SHOW_POOL, request=Message(payload={"mode": mode, "pool": pool_name}), timeout=1.0, target=target, ) rc = reply.get_header(MessageHeaderKey.RETURN_CODE) if rc != ReturnCode.OK: err = reply.get_header(MessageHeaderKey.ERROR, "") return f"{rc}: {err}" return reply.payload
def _do_show_pool(self, request: Message) -> Union[None, Message]: p = request.payload assert isinstance(p, dict) pool_name = p.get("pool", "") mode = p.get("mode", "") pool = StatsPoolManager.get_pool(pool_name) if not pool: return Message( headers={ MessageHeaderKey.RETURN_CODE: ReturnCode.INVALID_REQUEST, MessageHeaderKey.ERROR: f"unknown pool '{pool_name}'", } ) headers, rows = pool.get_table(mode) reply = {"headers": headers, "rows": rows} return Message(payload=reply)
[docs] def get_comm_config(self, target: str): reply = self.cell.send_request( channel=_CHANNEL, topic=_TOPIC_COMM_CONFIG, request=Message(), timeout=1.0, target=target ) rc = reply.get_header(MessageHeaderKey.RETURN_CODE) if rc != ReturnCode.OK: err = reply.get_header(MessageHeaderKey.ERROR, "") return f"{rc}: {err}" return reply.payload
[docs] def get_config_vars(self, target: str): reply = self.cell.send_request( channel=_CHANNEL, topic=_TOPIC_CONFIG_VARS, request=Message(), timeout=1.0, target=target ) rc = reply.get_header(MessageHeaderKey.RETURN_CODE) if rc != ReturnCode.OK: err = reply.get_header(MessageHeaderKey.ERROR, "") return f"{rc}: {err}" return reply.payload
[docs] def get_process_info(self, target: str): reply = self.cell.send_request( channel=_CHANNEL, topic=_TOPIC_PROCESS_INFO, request=Message(), timeout=1.0, target=target ) rc = reply.get_header(MessageHeaderKey.RETURN_CODE) if rc != ReturnCode.OK: err = reply.get_header(MessageHeaderKey.ERROR, "") return f"{rc}: {err}" return reply.payload
def _do_comm_config(self, request: Message) -> Union[None, Message]: info = self.cell.connector_manager.get_config_info() return Message(payload=info) def _do_config_vars(self, request: Message) -> Union[None, Message]: info = ConfigService.get_var_values() return Message(payload=info) def _do_process_info(self, request: Message) -> Union[None, Message]: if resource is not None: usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss else: usage = "N/A (not supported on this platform)" rows = [ ["Process ID", str(os.getpid())], ["Memory Usage", str(usage)], ["Thread Count", str(threading.active_count())], ] for thread in threading.enumerate(): rows.append([f"Thread:{thread.ident}", thread.name]) return Message(payload={"headers": ["Resource", "Value"], "rows": rows}) def _broadcast_to_subs(self, topic: str, message=None, timeout=1.0): if not message: message = Message() children, clients = self.cell.get_sub_cell_names() targets = [] for c in children: if not is_valid_admin_client_name(c): targets.append(c) for c in clients: if not is_valid_admin_client_name(c): targets.append(c) if targets: if timeout > 0.0: if self.cell.my_info.is_root and self.cell.my_info.is_on_server: timeout = timeout + 0.1 else: timeout = timeout / self.cell.my_info.gen return self.cell.broadcast_request( channel=_CHANNEL, topic=topic, targets=targets, request=message, timeout=timeout ) else: self.cell.fire_and_forget(channel=_CHANNEL, topic=topic, targets=targets, message=message) return {}