# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import hashlib
import logging
import os
import random
import resource
import threading
import time
from abc import ABC
from typing import List, Union
from nvflare.fuel.f3.cellnet.cell import Cell
from nvflare.fuel.f3.cellnet.connector_manager import ConnectorData
from nvflare.fuel.f3.cellnet.core_cell import Message
from nvflare.fuel.f3.cellnet.defs import MessageHeaderKey, ReturnCode
from nvflare.fuel.f3.cellnet.fqcn import FQCN
from nvflare.fuel.f3.cellnet.utils import make_reply
from nvflare.fuel.f3.stats_pool import StatsPoolManager
from nvflare.fuel.utils.config_service import ConfigService
# Channel on which every NetAgent request callback below is registered.
_CHANNEL = "_net_manager"

# Topics handled on _CHANNEL; each one is bound to a NetAgent._do_* callback
# in NetAgent.__init__.
_TOPIC_PEERS = "peers"
_TOPIC_CELLS = "cells"
_TOPIC_ROUTE = "route"
_TOPIC_START_ROUTE = "start_route"
_TOPIC_STOP = "stop"
_TOPIC_STOP_CELL = "stop_cell"
_TOPIC_URL_USE = "url_use"
_TOPIC_CONNS = "conns"
_TOPIC_SPEED = "speed"
_TOPIC_ECHO = "echo"
_TOPIC_STRESS = "stress"
_TOPIC_CHANGE_ROOT = "change_root"
_TOPIC_BULK_TEST = "bulk_test"
_TOPIC_BULK_ITEM = "bulk_item"
_TOPIC_MSG_STATS = "msg_stats"
_TOPIC_LIST_POOLS = "list_pools"
_TOPIC_SHOW_POOL = "show_pool"
_TOPIC_COMM_CONFIG = "comm_config"
_TOPIC_CONFIG_VARS = "config_vars"
_TOPIC_PROCESS_INFO = "process_info"
_TOPIC_HEARTBEAT = "heartbeat"

# 1024 bytes, each with value 1; the speed test multiplies this by the
# requested size to build its payload.
_ONE_K = bytes([1] * 1024)
class _Member:
    """Bookkeeping record for one member cell of a monitored subnet."""

    # Values taken by ``state``.
    STATE_UNKNOWN = 0
    STATE_ONLINE = 1
    STATE_OFFLINE = 2

    def __init__(self, fqcn):
        """Create a member in the unknown state, with its heartbeat time set to now.

        Args:
            fqcn: name of the member cell; stored unchanged.
        """
        self.fqcn = fqcn
        self.state = _Member.STATE_UNKNOWN
        # Initialised to creation time, so a member that never sends a
        # heartbeat is measured from the moment it was registered.
        self.last_heartbeat_time = time.time()
        self.lock = threading.Lock()
class SubnetMonitor(ABC):
    """Tracks online/offline state of the member cells of one subnet.

    Subclasses override ``member_online`` / ``member_offline`` to react to
    state transitions. ``agent`` is set by ``NetAgent.add_subnet_monitor``.
    """

    def __init__(self, subnet_id: str, member_cells: List[str], trouble_alert_threshold: float):
        """Create a monitor with one ``_Member`` record per member cell.

        Args:
            subnet_id: identifier of the subnet being monitored.
            member_cells: names of the member cells; must not be empty.
            trouble_alert_threshold: seconds without a heartbeat after which
                an online member is treated as offline.

        Raises:
            ValueError: if ``member_cells`` is empty.
        """
        if not member_cells:
            raise ValueError("member cells must not be empty")

        self.agent = None
        self.subnet_id = subnet_id
        self.trouble_alert_threshold = trouble_alert_threshold
        self.lock = threading.Lock()
        self.members = {fqcn: _Member(fqcn) for fqcn in member_cells}

    def member_online(self, member_cell_fqcn: str):
        """Hook called when a member goes from unknown/offline to online."""
        pass

    def member_offline(self, member_cell_fqcn: str):
        """Hook called when an online member is declared offline."""
        pass

    def put_member_online(self, member: _Member):
        """Record a heartbeat for ``member`` and fire ``member_online`` on a transition."""
        with self.lock:
            previous_state = member.state
            member.last_heartbeat_time = time.time()
            member.state = member.STATE_ONLINE
            if previous_state == member.STATE_UNKNOWN or previous_state == member.STATE_OFFLINE:
                self.member_online(member.fqcn)

    def put_member_offline(self, member: _Member):
        """Mark an online ``member`` offline once its heartbeat is older than the threshold."""
        with self.lock:
            silent_for = time.time() - member.last_heartbeat_time
            if silent_for <= self.trouble_alert_threshold:
                # heartbeat is still recent enough
                return

            if member.state == member.STATE_ONLINE:
                self.member_offline(member.fqcn)
                member.state = member.STATE_OFFLINE

    def stop_subnet(self):
        """Ask the owning agent to stop this subnet.

        Raises:
            RuntimeError: if the monitor has not been added to a NetAgent.
        """
        if self.agent:
            return self.agent.stop_subnet(self)
        raise RuntimeError("No NetAgent in this monitor. Make sure the monitor is added to a NetAgent.")
class NetAgent:
def __init__(self, cell, change_root_cb=None, agent_closed_cb=None):
    """Attach the agent to a cell and register all of its request callbacks.

    Args:
        cell: a ``Cell`` (its ``core_cell`` is used) or an object used directly as the core cell.
        change_root_cb: optional callable invoked with the new root URL on a change-root request.
        agent_closed_cb: optional callable invoked with no arguments at the end of ``close``.
    """
    if isinstance(cell, Cell):
        cell = cell.core_cell
    self.cell = cell
    self.change_root_cb = change_root_cb
    self.agent_closed_cb = agent_closed_cb
    self.logger = logging.getLogger(self.__class__.__name__)

    # (topic, handler) pairs, registered in this order on _CHANNEL.
    handlers = (
        (_TOPIC_CELLS, self._do_report_cells),
        (_TOPIC_ROUTE, self._do_route),
        (_TOPIC_START_ROUTE, self._do_start_route),
        (_TOPIC_STOP, self._do_stop),
        (_TOPIC_STOP_CELL, self._do_stop_cell),
        (_TOPIC_PEERS, self._do_peers),
        (_TOPIC_CONNS, self._do_connectors),
        (_TOPIC_URL_USE, self._do_url_use),
        (_TOPIC_SPEED, self._do_speed),
        (_TOPIC_ECHO, self._do_echo),
        (_TOPIC_STRESS, self._do_stress),
        (_TOPIC_CHANGE_ROOT, self._do_change_root),
        (_TOPIC_BULK_TEST, self._do_bulk_test),
        (_TOPIC_BULK_ITEM, self._do_bulk_item),
        (_TOPIC_MSG_STATS, self._do_msg_stats),
        (_TOPIC_LIST_POOLS, self._do_list_pools),
        (_TOPIC_SHOW_POOL, self._do_show_pool),
        (_TOPIC_COMM_CONFIG, self._do_comm_config),
        (_TOPIC_CONFIG_VARS, self._do_config_vars),
        (_TOPIC_PROCESS_INFO, self._do_process_info),
        (_TOPIC_HEARTBEAT, self._do_heartbeat),
    )
    for topic, handler in handlers:
        cell.register_request_cb(
            channel=_CHANNEL,
            topic=topic,
            cb=handler,
        )

    # Background threads are created lazily by add_to_subnet / add_subnet_monitor.
    self.heartbeat_thread = None
    self.monitor_thread = None
    self.asked_to_close = False
    self.subnets = {}  # subnet_id -> fqcn of the cell monitoring it
    self.monitors = {}  # subnet_id -> SubnetMonitor
    self.hb_lock = threading.Lock()
    self.monitor_lock = threading.Lock()
def add_to_subnet(self, subnet_id: str, monitor_fqcn: str = FQCN.ROOT_SERVER):
    """Join a subnet and make sure the heartbeat thread is running.

    Args:
        subnet_id: identifier of the subnet to join.
        monitor_fqcn: cell to which heartbeats for this subnet are sent.
    """
    with self.hb_lock:
        self.subnets[subnet_id] = monitor_fqcn
        if self.heartbeat_thread is not None:
            # heartbeat thread already started by an earlier call
            return
        self.heartbeat_thread = threading.Thread(target=self._subnet_heartbeat)
        self.heartbeat_thread.start()
def add_subnet_monitor(self, monitor: SubnetMonitor):
    """Register a subnet monitor and make sure the monitoring thread is running.

    The duplicate check and the insertion happen under ``monitor_lock`` so two
    concurrent calls for the same subnet cannot both succeed.

    Args:
        monitor: the monitor to register; its ``agent`` is set to this agent.

    Raises:
        ValueError: if ``monitor`` is not a SubnetMonitor, or a monitor for the
            same subnet is already registered.
    """
    if not isinstance(monitor, SubnetMonitor):
        raise ValueError(f"monitor must be SubnetMonitor but got {type(monitor)}")

    with self.monitor_lock:
        if monitor.subnet_id in self.monitors:
            raise ValueError(f"monitor for subnet {monitor.subnet_id} already exists")

        monitor.agent = self
        self.monitors[monitor.subnet_id] = monitor
        if self.monitor_thread is None:
            self.monitor_thread = threading.Thread(target=self._monitor_subnet)
            self.monitor_thread.start()
def stop_subnet(self, monitor: SubnetMonitor):
    """Send a stop-cell request to every member of ``monitor`` that is currently online.

    Returns:
        The result of ``broadcast_request``, or None when no member is online.
    """
    online = [fqcn for fqcn, member in monitor.members.items() if member.state == member.STATE_ONLINE]
    if not online:
        return None
    return self.cell.broadcast_request(
        channel=_CHANNEL, topic=_TOPIC_STOP_CELL, request=Message(), targets=online, timeout=1.0
    )
def delete_subnet_monitor(self, subnet_id: str):
    """Remove the monitor registered for ``subnet_id``; unknown ids are ignored."""
    with self.monitor_lock:
        if subnet_id in self.monitors:
            del self.monitors[subnet_id]
def close(self):
    """Stop the background threads and invoke ``agent_closed_cb``; later calls do nothing."""
    if self.asked_to_close:
        return

    # Both background loops poll this flag and exit when it is set.
    self.asked_to_close = True

    for attr_name in ("heartbeat_thread", "monitor_thread"):
        worker = getattr(self, attr_name)
        if worker and worker.is_alive():
            worker.join()

    if self.agent_closed_cb:
        self.agent_closed_cb()
def _subnet_heartbeat(self):
    """Thread body: periodically send a heartbeat for every joined subnet until closed."""
    interval = self.cell.comm_configurator.get_subnet_heartbeat_interval(5.0)
    if interval <= 0:
        # fall back to the default for a non-positive configured value
        interval = 5.0

    while True:
        with self.hb_lock:
            for subnet_id, monitor_fqcn in self.subnets.items():
                self.cell.fire_and_forget(
                    channel=_CHANNEL,
                    topic=_TOPIC_HEARTBEAT,
                    targets=monitor_fqcn,
                    message=Message(payload={"subnet_id": subnet_id}),
                )

        # Wait out the interval in 0.1-second steps so that a close request
        # ("asked_to_close") is noticed promptly.
        wait_began = time.time()
        waiting = True
        while waiting:
            time.sleep(0.1)
            if self.asked_to_close:
                return
            waiting = not (time.time() - wait_began >= interval)
@staticmethod
def _check_monitor(m: SubnetMonitor):
    """Give every member of ``m`` a chance to be declared offline."""
    for member in m.members.values():
        m.put_member_offline(member)
def _monitor_subnet(self):
    """Thread body: check all registered monitors every 0.5 seconds until closed."""
    while True:
        if self.asked_to_close:
            break

        # Snapshot under the lock so monitors can be added or removed while
        # the checks run.
        with self.monitor_lock:
            snapshot = copy.copy(self.monitors)

        for monitor in snapshot.values():
            self._check_monitor(monitor)
        time.sleep(0.5)
def _do_heartbeat(self, request: Message) -> Union[None, Message]:
    """Handle a subnet heartbeat: mark the sending member online.

    Bad requests (no monitors, non-dict payload, unknown subnet, non-member
    origin) are logged and dropped instead of raising, because the payload
    comes from another cell. No reply is ever sent.

    Args:
        request: heartbeat message; its payload is expected to be a dict
            with a "subnet_id" entry.

    Returns:
        Always None.
    """
    origin = request.get_header(MessageHeaderKey.ORIGIN, "?")
    if not self.monitors:
        self.logger.warning(f"got subnet heartbeat from {origin} but no monitors")
        return None

    payload = request.payload
    if not isinstance(payload, dict):
        # was an assert: it is stripped under -O and would raise in the handler otherwise
        self.logger.warning(f"got subnet heartbeat from {origin} with bad payload type {type(payload)}")
        return None

    subnet_id = payload.get("subnet_id", "")
    m = self.monitors.get(subnet_id)
    if not m:
        self.logger.warning(f"got subnet heartbeat from {origin} for subnet_id {subnet_id} but no monitor")
        return None

    member = m.members.get(origin)
    if not member:
        self.logger.warning(f"got subnet heartbeat from {origin} for subnet_id {subnet_id} but it's not a member")
        return None

    m.put_member_online(member)
    return None
def _do_stop(self, request: Message) -> Union[None, Message]:
    """Handle a stop request by stopping this agent; no reply is produced."""
    self.stop()
def _do_stop_cell(self, request: Message) -> Union[None, Message]:
    """Handle a stop-cell request: stop this agent, then reply with an empty message."""
    self.stop()
    ack = Message()
    return ack
def _do_route(self, request: Message) -> Union[None, Message]:
    """Reply with a copy of the request's headers as the payload."""
    received_headers = dict(request.headers)
    return Message(payload=received_headers)
def _do_start_route(self, request: Message) -> Union[None, Message]:
    """Probe the route from this cell to the FQCN given in the request payload.

    Returns:
        A message whose payload holds the "request" and "reply" headers of the
        probe, or a PROCESS_EXCEPTION reply when the target FQCN is invalid.
    """
    target_fqcn = request.payload
    problem = FQCN.validate(target_fqcn)
    if problem:
        return make_reply(ReturnCode.PROCESS_EXCEPTION, f"bad target fqcn {problem}")
    assert isinstance(target_fqcn, str)

    reply_headers, req_headers = self.get_route_info(target_fqcn)
    route = {"request": dict(req_headers), "reply": dict(reply_headers)}
    return Message(payload=route)
def _do_peers(self, request: Message) -> Union[None, Message]:
    """Reply with the list of keys of this cell's ``agents`` mapping."""
    peer_names = [*self.cell.agents.keys()]
    return Message(payload=peer_names)
def get_peers(self, target_fqcn: str) -> (Union[None, dict], List[str]):
    """Ask ``target_fqcn`` for its peers.

    Returns:
        ``(None, peers)`` on success, otherwise ``(error_info, None)`` where
        ``error_info`` holds an "error" text and the "reply" headers.
    """
    reply = self.cell.send_request(
        channel=_CHANNEL, topic=_TOPIC_PEERS, target=target_fqcn, timeout=1.0, request=Message()
    )
    rc = reply.get_header(MessageHeaderKey.RETURN_CODE)
    if rc != ReturnCode.OK:
        return {"error": f"return code: {rc}", "reply": reply.headers}, None

    peers = reply.payload
    if not isinstance(peers, list):
        problem = f"reply payload should be list but got {type(reply.payload)}"
        return {"error": problem, "reply": reply.headers}, None

    return None, peers
@staticmethod
def _connector_info(info: ConnectorData) -> dict:
    """Summarise a connector as a dict with "url", "handle" and "type" entries."""
    summary = {"url": info.connect_url, "handle": info.handle}
    # active connectors are reported as "connector", all others as "listener"
    summary["type"] = "connector" if info.active else "listener"
    return summary
def _get_connectors(self) -> dict:
    """Describe every listener and connector currently set on this cell.

    Returns:
        A dict containing only the kinds that are present: "int_listener",
        "ext_listeners" (list), "bb_ext_connector", "bb_int_connector" and
        "adhoc_connectors" (dict keyed like the cell's mapping).
    """
    cell = self.cell
    describe = self._connector_info
    report = {}

    if cell.int_listener:
        report["int_listener"] = describe(cell.int_listener)

    if cell.ext_listeners:
        report["ext_listeners"] = [describe(listener) for listener in cell.ext_listeners.values()]

    for attr_name in ("bb_ext_connector", "bb_int_connector"):
        connector = getattr(cell, attr_name)
        if connector:
            report[attr_name] = describe(connector)

    if cell.adhoc_connectors:
        report["adhoc_connectors"] = {key: describe(conn) for key, conn in cell.adhoc_connectors.items()}

    return report
def _do_connectors(self, request: Message) -> Union[None, Message]:
    """Reply with the description of this cell's connectors."""
    connectors = self._get_connectors()
    return Message(payload=connectors)
def get_connectors(self, target_fqcn: str) -> (dict, dict):
    """Ask ``target_fqcn`` for its connectors.

    Returns:
        ``(error_info, connectors)``; ``error_info`` is empty on success and
        ``connectors`` is empty on failure.
    """
    reply = self.cell.send_request(
        channel=_CHANNEL, topic=_TOPIC_CONNS, target=target_fqcn, timeout=1.0, request=Message()
    )
    rc = reply.get_header(MessageHeaderKey.RETURN_CODE)
    if rc != ReturnCode.OK:
        return {"error": "processing error", "reply": reply.headers}, {}

    connectors = reply.payload
    if not isinstance(connectors, dict):
        bad_type = {
            "error": f"reply payload should be dict but got {type(reply.payload)}",
            "reply": reply.headers,
        }
        return bad_type, {}

    return {}, connectors or {}
def request_cells_info(self) -> (str, List[str]):
    """Collect the names of this cell and of all cells reported by its sub cells.

    A reply whose payload is not a list is reported as an error entry instead
    of being extended into the result (which would raise for None and split a
    string into characters).

    Returns:
        ``(err, result)``: ``result`` starts with this cell's FQCN, followed by
        the entries reported by each sub cell or an error text for a sub cell
        that failed; ``err`` is the last error text, or "" if there was none.
    """
    result = [self.cell.get_fqcn()]
    err = ""
    replies = self._broadcast_to_subs(topic=_TOPIC_CELLS)
    for t, r in replies.items():
        assert isinstance(r, Message)
        rc = r.get_header(MessageHeaderKey.RETURN_CODE)
        if rc != ReturnCode.OK:
            err = f"no reply from {t}: {rc}"
            result.append(err)
            continue

        sub_result = r.payload
        if not isinstance(sub_result, list):
            err = f"bad reply type from {t}: {type(sub_result)}"
            result.append(err)
            continue

        result.extend(sub_result)
    return err, result
def _get_url_use_of_cell(self, url: str):
    """Report how this cell uses ``url``.

    Returns:
        "int_listen", "ext_listen", "bb_ext_connect", "int_connect" or
        "adhoc_connect" for the first matching use (checked in that order),
        or "none" when the cell does not use the URL.
    """
    cell = self.cell

    internal = cell.int_listener
    if internal and internal.connect_url == url:
        return "int_listen"

    # external listeners are matched on the keys of the mapping
    if cell.ext_listeners and any(key == url for key in cell.ext_listeners.keys()):
        return "ext_listen"

    backbone_ext = cell.bb_ext_connector
    if backbone_ext and backbone_ext.connect_url == url:
        return "bb_ext_connect"

    backbone_int = cell.bb_int_connector
    if backbone_int and backbone_int.connect_url == url:
        return "int_connect"

    if cell.adhoc_connectors and any(c.connect_url == url for c in cell.adhoc_connectors.values()):
        return "adhoc_connect"

    return "none"
def get_url_use(self, url) -> dict:
    """Report how this cell and all cells below it use ``url``.

    Returns:
        A dict keyed by cell name; a sub cell that failed or sent a non-dict
        reply is mapped to an error text.
    """
    usage = {self.cell.get_fqcn(): self._get_url_use_of_cell(url)}
    replies = self._broadcast_to_subs(topic=_TOPIC_URL_USE, message=Message(payload=url))
    for target, reply in replies.items():
        assert isinstance(reply, Message)
        rc = reply.get_header(MessageHeaderKey.RETURN_CODE)
        if rc != ReturnCode.OK:
            usage[target] = f"error {rc}"
        elif isinstance(reply.payload, dict):
            # the sub cell already aggregated its own subtree
            usage.update(reply.payload)
        else:
            usage[target] = f"bad reply type {type(reply.payload)}"
    return usage
def _do_url_use(self, request: Message) -> Union[None, Message]:
    """Reply with the URL usage of this cell and its sub cells for the URL in the payload."""
    return Message(payload=self.get_url_use(request.payload))
def get_route_info(self, target_fqcn: str) -> (dict, dict):
    """Send a route probe to ``target_fqcn``.

    Returns:
        ``(reply_headers, request_headers)``, where ``request_headers`` is the
        payload returned by the target, or an {"error": ...} dict when the
        reply is not OK or its payload is not a dict.
    """
    reply = self.cell.send_request(
        channel=_CHANNEL, topic=_TOPIC_ROUTE, target=target_fqcn, timeout=1.0, request=Message()
    )
    reply_headers = reply.headers
    rc = reply.get_header(MessageHeaderKey.RETURN_CODE, ReturnCode.OK)
    if rc != ReturnCode.OK:
        return reply_headers, {"error": f"Reply ReturnCode: {rc}"}
    if isinstance(reply.payload, dict):
        return reply_headers, reply.payload
    return reply_headers, {"error": f"reply payload got {type(reply.payload)}"}
def start_route(self, from_fqcn: str, target_fqcn: str) -> (str, dict, dict):
    """Ask ``from_fqcn`` to probe the route to ``target_fqcn``.

    Returns:
        ``(err, reply_headers, req_headers)``; ``err`` is "" on success. On a
        non-OK reply, ``reply_headers`` holds the headers of that reply.
    """
    reply = self.cell.send_request(
        channel=_CHANNEL,
        topic=_TOPIC_START_ROUTE,
        target=from_fqcn,
        timeout=1.0,
        request=Message(payload=target_fqcn),
    )
    rc = reply.get_header(MessageHeaderKey.RETURN_CODE)
    if rc != ReturnCode.OK:
        return f"error in reply {rc}", reply.headers, {}

    route = reply.payload
    if not isinstance(route, dict):
        return f"reply payload should be dict but got {type(reply.payload)}", {}, {}

    return "", route.get("reply"), route.get("request")
def _do_report_cells(self, request: Message) -> Union[None, Message]:
    """Reply with the cell list gathered by ``request_cells_info`` (its error text is dropped)."""
    cells = self.request_cells_info()[1]
    return Message(payload=cells)
def stop(self):
    """Tell all sub cells to stop, then close this agent."""
    # timeout=0.0: the stop request is sent without waiting for replies
    self._broadcast_to_subs(timeout=0.0, topic=_TOPIC_STOP)
    self.close()
def stop_cell(self, target: str) -> str:
    """Send a stop-cell request to ``target`` and return the reply's return code.

    The request is always sent, even when ``target`` is this cell's own name.
    """
    reply = self.cell.send_request(
        channel=_CHANNEL, topic=_TOPIC_STOP_CELL, request=Message(), target=target, timeout=1.0
    )
    return reply.get_header(MessageHeaderKey.RETURN_CODE)
def _request_speed_test(self, target_fqcn: str, num, size) -> Message:
    """Echo a ``size``-KB payload off ``target_fqcn`` ``num`` times and report timings.

    Args:
        target_fqcn: cell that echoes the payload back.
        num: number of round trips; must be non-zero (the average divides by it).
        size: payload size as a multiple of 1024 bytes.

    Returns:
        A message whose payload holds the test description, payload
        preparation time, per-category error counts, total and average time.
    """
    prep_began = time.perf_counter()
    payload = bytes(_ONE_K * size)
    payload_size = len(payload)
    expected_digest = hashlib.md5(payload).digest()
    payload_prep_time = time.perf_counter() - prep_began

    tally = {"timeouts": 0, "comm_errors": 0, "size_errors": 0, "proc_errors": 0, "other_errors": 0}

    run_began = time.perf_counter()
    for _ in range(num):
        reply = self.cell.send_request(
            channel=_CHANNEL,
            topic=_TOPIC_ECHO,
            target=target_fqcn,
            request=Message(payload=payload),
            timeout=10.0,
        )
        rc = reply.get_header(MessageHeaderKey.RETURN_CODE, ReturnCode.OK)
        if rc == ReturnCode.OK:
            # a wrong-sized or corrupted echo counts as a processing error
            if len(reply.payload) != payload_size:
                self.cell.logger.error(
                    f"{self.cell.get_fqcn()}: expect {payload_size} bytes but received {len(reply.payload)}"
                )
                tally["proc_errors"] += 1
            elif hashlib.md5(reply.payload).digest() != expected_digest:
                self.cell.logger.error(f"{self.cell.get_fqcn()}: digest mismatch!")
                tally["proc_errors"] += 1
        elif rc == ReturnCode.TIMEOUT:
            tally["timeouts"] += 1
        elif rc == ReturnCode.COMM_ERROR:
            tally["comm_errors"] += 1
        elif rc == ReturnCode.MSG_TOO_BIG:
            tally["size_errors"] += 1
        else:
            tally["other_errors"] += 1
    total = time.perf_counter() - run_began

    report = {
        "test": f"{size:,}KB {num} rounds between {self.cell.get_fqcn()} and {target_fqcn}",
        "prep": payload_prep_time,
    }
    report.update(tally)
    report["total"] = total
    report["average"] = total / num
    return Message(payload=report)
def _do_speed(self, request: Message) -> Union[None, Message]:
    """Handle a speed-test request: run an echo test from this cell to the "to" cell.

    The payload must be a dict with a "to" FQCN and optional integer "num"
    (rounds, default 100) and "size" (KB, default 1000). Missing, None or
    non-positive values fall back to the defaults; non-integer values are
    rejected with INVALID_REQUEST instead of raising inside the handler.

    Returns:
        The speed test result message, or an INVALID_REQUEST reply.
    """
    params = request.payload
    if not isinstance(params, dict):
        return make_reply(ReturnCode.INVALID_REQUEST, f"request body must be dict but got {type(params)}")

    to_fqcn = params.get("to")
    if not to_fqcn:
        return make_reply(ReturnCode.INVALID_REQUEST, "missing 'to' param in request")

    err = FQCN.validate(to_fqcn)
    if err:
        return make_reply(ReturnCode.INVALID_REQUEST, f"bad target FQCN: {err}")

    defaults = {"num": 100, "size": 1000}
    values = {}
    for name, default in defaults.items():
        value = params.get(name, default)
        if value is None:
            value = default
        if not isinstance(value, int):
            return make_reply(ReturnCode.INVALID_REQUEST, f"'{name}' must be int but got {type(value)}")
        if value <= 0:
            value = default
        values[name] = value

    return self._request_speed_test(to_fqcn, values["num"], values["size"])
def _do_echo(self, request: Message) -> Union[None, Message]:
    """Reply with the request's payload unchanged."""
    echoed = request.payload
    return Message(payload=echoed)
def _do_stress_test(self, params):
    """Send random 1 KB echo requests to randomly chosen targets and tally the results.

    Args:
        params: dict with "targets" (list or tuple of cell names) and "num"
            (int number of rounds). This cell's own name is excluded from
            the targets; the caller's list is not modified.

    Returns:
        ``{"counts": ..., "errors": ..., "time": ...}`` with per-target request
        and error counts, or ``{"error": ...}`` when the params are unusable.
    """
    if not isinstance(params, dict):
        return {"error": f"bad params - expect dict but got {type(params)}"}

    targets = params.get("targets")
    if not targets:
        return {"error": "no targets specified"}
    if not isinstance(targets, (list, tuple)):
        return {"error": f"bad targets - expect list but got {type(targets)}"}

    num_rounds = params.get("num")
    if not num_rounds:
        return {"error": "missing num of rounds"}
    if not isinstance(num_rounds, int):
        return {"error": f"bad num of rounds - expect int but got {type(num_rounds)}"}

    # Build a filtered copy: the request payload must not be mutated, and
    # every occurrence of our own name has to go, not just the first.
    my_fqcn = self.cell.get_fqcn()
    candidates = [t for t in targets if t != my_fqcn]
    if not candidates:
        return {"error": "no targets to try"}

    counts = {}
    errors = {}
    start = time.perf_counter()
    for _ in range(num_rounds):
        payload = os.urandom(1024)
        sent_digest = hashlib.md5(payload).digest()
        target = random.choice(candidates)
        reply = self.cell.send_request(
            channel=_CHANNEL, topic=_TOPIC_ECHO, target=target, request=Message(payload=payload), timeout=1.0
        )
        counts[target] = counts.get(target, 0) + 1

        rc = reply.get_header(MessageHeaderKey.RETURN_CODE, ReturnCode.OK)
        if rc != ReturnCode.OK:
            self.cell.logger.error(f"{self.cell.get_fqcn()}: return code from {target}: {rc}")
        elif hashlib.md5(reply.payload).digest() != sent_digest:
            self.cell.logger.error(f"{self.cell.get_fqcn()}: digest mismatch from {target}")
        else:
            continue
        errors[target] = errors.get(target, 0) + 1

    end = time.perf_counter()
    return {"counts": counts, "errors": errors, "time": end - start}
def _do_stress(self, request: Message) -> Union[None, Message]:
    """Run the stress test described by the request payload and reply with its result."""
    outcome = self._do_stress_test(request.payload)
    return Message(payload=outcome)
def start_stress_test(self, targets: list, num_rounds=10, timeout=5.0):
    """Ask every target except this cell to run a stress test against ``targets``.

    Args:
        targets: names of the cells taking part in the test.
        num_rounds: number of rounds each cell is asked to run.
        timeout: timeout passed to the broadcast.

    Returns:
        A dict keyed by target holding its reply payload or an "RC=..." text,
        or ``{"error": ...}`` when no target other than this cell remains.
    """
    cell = self.cell
    cell.logger.info(f"{cell.get_fqcn()}: starting stress test on {targets}")

    # Each cell receives the full target list; only the request itself is
    # not sent to ourselves.
    msg_targets = list(targets)
    try:
        msg_targets.remove(cell.get_fqcn())
    except ValueError:
        pass
    if not msg_targets:
        return {"error": "no targets for stress test"}

    replies = cell.broadcast_request(
        channel=_CHANNEL,
        topic=_TOPIC_STRESS,
        targets=msg_targets,
        request=Message(payload={"targets": targets, "num": num_rounds}),
        timeout=timeout,
    )
    result = {}
    for target, reply in replies.items():
        rc = reply.get_header(MessageHeaderKey.RETURN_CODE, ReturnCode.OK)
        result[target] = reply.payload if rc == ReturnCode.OK else f"RC={rc}"
    return result
def speed_test(self, from_fqcn: str, to_fqcn: str, num_tries, payload_size) -> dict:
    """Ask ``from_fqcn`` to run a speed test against ``to_fqcn``.

    Returns:
        A dict with "test_time" plus either the remote result entries or an
        "error" text; only "error" when one of the FQCNs is invalid.
    """
    for label, fqcn in (("from_fqcn", from_fqcn), ("to_fqcn", to_fqcn)):
        err = FQCN.validate(fqcn)
        if err:
            return {"error": f"invalid {label} {fqcn}: {err}"}

    began = time.perf_counter()
    reply = self.cell.send_request(
        channel=_CHANNEL,
        topic=_TOPIC_SPEED,
        request=Message(payload={"to": to_fqcn, "num": num_tries, "size": payload_size}),
        target=from_fqcn,
        timeout=100.0,
    )
    result = {"test_time": time.perf_counter() - began}

    rc = reply.get_header(MessageHeaderKey.RETURN_CODE, ReturnCode.OK)
    if rc != ReturnCode.OK:
        result["error"] = f"return code {rc}"
    elif isinstance(reply.payload, dict):
        result.update(reply.payload)
    else:
        result["error"] = f"bad reply: expect dict but got {type(reply.payload)}"
    return result
def change_root(self, new_root_url: str):
    """Send the new root URL to all sub cells without waiting for replies."""
    notice = Message(payload=new_root_url)
    self._broadcast_to_subs(topic=_TOPIC_CHANGE_ROOT, message=notice, timeout=0.0)
def _do_change_root(self, request: Message) -> Union[None, Message]:
    """Handle a change-root request: forward it to sub cells, then call ``change_root_cb``.

    No reply is produced.
    """
    new_root_url = request.payload
    assert isinstance(new_root_url, str)

    # forward first, then notify the local callback
    self.change_root(new_root_url)
    notify = self.change_root_cb
    if notify is not None:
        notify(new_root_url)
def start_bulk_test(self, targets: list, size: int):
    """Ask every target except this cell to run a bulk test with ``size`` items.

    Returns:
        A dict keyed by target holding its reply payload or an "RC=..." text,
        or ``{"error": ...}`` when no target other than this cell remains.
    """
    cell = self.cell
    cell.logger.info(f"{cell.get_fqcn()}: starting bulk test on {targets}")

    msg_targets = list(targets)
    try:
        msg_targets.remove(cell.get_fqcn())
    except ValueError:
        pass
    if not msg_targets:
        return {"error": "no targets for bulk test"}

    replies = cell.broadcast_request(
        channel=_CHANNEL,
        topic=_TOPIC_BULK_TEST,
        targets=msg_targets,
        request=Message(payload=size),
        timeout=1.0,
    )
    result = {}
    for target, reply in replies.items():
        rc = reply.get_header(MessageHeaderKey.RETURN_CODE, ReturnCode.OK)
        result[target] = reply.payload if rc == ReturnCode.OK else f"RC={rc}"
    return result
def _do_bulk_test(self, request: Message) -> Union[None, Message]:
    """Queue ``size`` random numbers (0..100) as bulk items to the root server.

    ``size`` is the integer payload of the request.

    Returns:
        A message whose payload is a text listing the queued numbers.
    """
    size = request.payload
    assert isinstance(size, int)

    queued = []
    for _ in range(size):
        value = random.randint(0, 100)
        queued.append(value)
        self.cell.queue_message(
            channel=_CHANNEL,
            topic=_TOPIC_BULK_ITEM,
            targets=FQCN.ROOT_SERVER,
            message=Message(payload=value),
        )
    return Message(payload=f"queued: {queued}")
def _do_bulk_item(self, request: Message) -> Union[None, Message]:
    """Log a received bulk item together with its origin; no reply is produced."""
    sender = request.get_header(MessageHeaderKey.ORIGIN)
    self.cell.logger.info(f"{self.cell.get_fqcn()}: got {request.payload} from {sender}")
def get_msg_stats_table(self, target: str, mode: str):
    """Fetch the message statistics table of ``target`` for the given ``mode``.

    Returns:
        The reply payload on success, otherwise an "error: <rc>" text.
    """
    reply = self.cell.send_request(
        channel=_CHANNEL,
        topic=_TOPIC_MSG_STATS,
        request=Message(payload={"mode": mode}),
        timeout=1.0,
        target=target,
    )
    rc = reply.get_header(MessageHeaderKey.RETURN_CODE)
    if rc == ReturnCode.OK:
        return reply.payload
    return f"error: {rc}"
def _do_msg_stats(self, request: Message) -> Union[None, Message]:
    """Reply with the headers and rows of this cell's message stats table.

    The request payload must be a dict; its "mode" entry is passed to the pool.
    """
    params = request.payload
    assert isinstance(params, dict)
    headers, rows = self.cell.msg_stats_pool.get_table(params.get("mode"))
    return Message(payload={"headers": headers, "rows": rows})
def get_pool_list(self, target: str):
    """Fetch the list of stats pools from ``target``.

    Returns:
        The reply payload on success, otherwise a "<rc>: <error>" text.
    """
    reply = self.cell.send_request(
        channel=_CHANNEL, topic=_TOPIC_LIST_POOLS, request=Message(), timeout=1.0, target=target
    )
    rc = reply.get_header(MessageHeaderKey.RETURN_CODE)
    detail = reply.get_header(MessageHeaderKey.ERROR, "")
    if rc == ReturnCode.OK:
        return reply.payload
    return f"{rc}: {detail}"
def _do_list_pools(self, request: Message) -> Union[None, Message]:
    """Reply with the headers and rows of the StatsPoolManager table."""
    headers, rows = StatsPoolManager.get_table()
    return Message(payload={"headers": headers, "rows": rows})
def show_pool(self, target: str, pool_name: str, mode: str):
    """Fetch the table of stats pool ``pool_name`` from ``target``.

    Returns:
        The reply payload on success, otherwise a "<rc>: <error>" text.
    """
    reply = self.cell.send_request(
        channel=_CHANNEL,
        topic=_TOPIC_SHOW_POOL,
        request=Message(payload={"mode": mode, "pool": pool_name}),
        timeout=1.0,
        target=target,
    )
    rc = reply.get_header(MessageHeaderKey.RETURN_CODE)
    if rc == ReturnCode.OK:
        return reply.payload

    detail = reply.get_header(MessageHeaderKey.ERROR, "")
    return f"{rc}: {detail}"
def _do_show_pool(self, request: Message) -> Union[None, Message]:
    """Reply with the table of the stats pool named in the request.

    The request payload must be a dict with optional "pool" and "mode"
    entries (both default to ""). An unknown pool yields an INVALID_REQUEST
    reply carrying an error header.
    """
    params = request.payload
    assert isinstance(params, dict)
    pool_name = params.get("pool", "")
    mode = params.get("mode", "")

    pool = StatsPoolManager.get_pool(pool_name)
    if pool:
        headers, rows = pool.get_table(mode)
        return Message(payload={"headers": headers, "rows": rows})

    return Message(
        headers={
            MessageHeaderKey.RETURN_CODE: ReturnCode.INVALID_REQUEST,
            MessageHeaderKey.ERROR: f"unknown pool '{pool_name}'",
        }
    )
def get_comm_config(self, target: str):
    """Fetch the communication config info of ``target``.

    Returns:
        The reply payload on success, otherwise a "<rc>: <error>" text.
    """
    reply = self.cell.send_request(
        channel=_CHANNEL, topic=_TOPIC_COMM_CONFIG, request=Message(), timeout=1.0, target=target
    )
    rc = reply.get_header(MessageHeaderKey.RETURN_CODE)
    if rc == ReturnCode.OK:
        return reply.payload

    detail = reply.get_header(MessageHeaderKey.ERROR, "")
    return f"{rc}: {detail}"
def get_config_vars(self, target: str):
    """Fetch the config variable values of ``target``.

    Returns:
        The reply payload on success, otherwise a "<rc>: <error>" text.
    """
    reply = self.cell.send_request(
        channel=_CHANNEL, topic=_TOPIC_CONFIG_VARS, request=Message(), timeout=1.0, target=target
    )
    rc = reply.get_header(MessageHeaderKey.RETURN_CODE)
    if rc == ReturnCode.OK:
        return reply.payload

    detail = reply.get_header(MessageHeaderKey.ERROR, "")
    return f"{rc}: {detail}"
def get_process_info(self, target: str):
    """Fetch the process info table of ``target``.

    Returns:
        The reply payload on success, otherwise a "<rc>: <error>" text.
    """
    reply = self.cell.send_request(
        channel=_CHANNEL, topic=_TOPIC_PROCESS_INFO, request=Message(), timeout=1.0, target=target
    )
    rc = reply.get_header(MessageHeaderKey.RETURN_CODE)
    if rc == ReturnCode.OK:
        return reply.payload

    detail = reply.get_header(MessageHeaderKey.ERROR, "")
    return f"{rc}: {detail}"
def _do_comm_config(self, request: Message) -> Union[None, Message]:
    """Reply with the config info reported by this cell's connector manager."""
    return Message(payload=self.cell.connector_manager.get_config_info())
def _do_config_vars(self, request: Message) -> Union[None, Message]:
    """Reply with the variable values reported by ConfigService."""
    return Message(payload=ConfigService.get_var_values())
def _do_process_info(self, request: Message) -> Union[None, Message]:
    """Reply with a two-column table describing this process.

    Rows: process id, ``ru_maxrss`` from ``resource.getrusage`` (reported as
    "Memory Usage"), active thread count, then one row per live thread.
    """
    max_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    rows = [
        ["Process ID", str(os.getpid())],
        ["Memory Usage", str(max_rss)],
        ["Thread Count", str(threading.active_count())],
    ]
    rows.extend([f"Thread:{t.ident}", t.name] for t in threading.enumerate())
    table = {"headers": ["Resource", "Value"], "rows": rows}
    return Message(payload=table)
def _broadcast_to_subs(self, topic: str, message=None, timeout=1.0):
    """Send ``message`` on ``topic`` to all sub cells (children and clients).

    Args:
        topic: topic to send on.
        message: message to send; an empty Message is used when falsy.
        timeout: with a positive value replies are awaited; otherwise the
            message is sent fire-and-forget.

    Returns:
        The replies from ``broadcast_request`` when waiting, otherwise ``{}``
        (also when there are no sub cells).
    """
    if not message:
        message = Message()

    children, clients = self.cell.get_sub_cell_names()
    targets = [*children, *clients]
    if not targets:
        return {}

    if not timeout > 0.0:
        self.cell.fire_and_forget(channel=_CHANNEL, topic=topic, targets=targets, message=message)
        return {}

    # The root server waits slightly longer than requested; every other cell
    # gets the timeout divided by its generation number.
    info = self.cell.my_info
    if info.is_root and info.is_on_server:
        timeout = timeout + 0.1
    else:
        timeout = timeout / info.gen

    return self.cell.broadcast_request(
        channel=_CHANNEL, topic=topic, targets=targets, request=message, timeout=timeout
    )