Source code for nvflare.fuel.f3.cellnet.connector_manager

# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import time
from typing import Union

from nvflare.fuel.common.excepts import ConfigError
from nvflare.fuel.f3.cellnet.defs import ConnectorRequirementKey
from nvflare.fuel.f3.cellnet.fqcn import FqcnInfo
from nvflare.fuel.f3.comm_config import CommConfigurator
from nvflare.fuel.f3.communicator import CommError, Communicator, Mode
from nvflare.security.logging import secure_format_exception, secure_format_traceback

_KEY_RESOURCES = "resources"
_KEY_INT = "internal"
_KEY_ADHOC = "adhoc"
_KEY_SCHEME = "scheme"
_KEY_HOST = "host"
_KEY_PORTS = "ports"


class _Defaults:

    ALLOW_ADHOC_CONNECTIONS = False
    SCHEME_FOR_INTERNAL_CONNECTIONS = "tcp"
    SCHEME_FOR_ADHOC_CONNECTIONS = "tcp"


class ConnectorData:
    """Record describing one connector or listener created by the connector manager."""

    def __init__(self, handle, connect_url: str, active: bool):
        """Store the connector's handle, URL and direction.

        Args:
            handle: handle returned by the communicator for this connector/listener
            connect_url: URL associated with the connector/listener
            active: True for an active connector, False for a passive one/listener
        """
        self.handle = handle
        self.connect_url = connect_url
        self.active = active

    def get_connection_url(self):
        """Return the URL stored for this connector."""
        return self.connect_url
class ConnectorManager:
    """
    Manages creation of connectors
    """

    def __init__(self, communicator: Communicator, secure: bool, comm_configurator: CommConfigurator):
        """Set up default schemes/resources, then override them from the comm config if present.

        Args:
            communicator: communicator used to add connectors and start listeners
            secure: whether external backbone connections require SSL
            comm_configurator: source of connection settings and the optional comm config dict

        Raises:
            ConfigError: if the "internal" or "adhoc" section of the comm config is malformed
        """
        self._name = self.__class__.__name__
        self.logger = logging.getLogger(self._name)

        self.communicator = communicator
        self.secure = secure

        # 2 is the value passed to the configurator (presumably the fallback generation - confirm)
        self.bb_conn_gen = comm_configurator.get_backbone_connection_generation(2)

        # set up default drivers
        self.int_scheme = comm_configurator.get_internal_connection_scheme(_Defaults.SCHEME_FOR_INTERNAL_CONNECTIONS)
        self.int_resources = {
            _KEY_HOST: "localhost",
        }
        self.adhoc_allowed = comm_configurator.allow_adhoc_connections(_Defaults.ALLOW_ADHOC_CONNECTIONS)
        self.adhoc_scheme = comm_configurator.get_adhoc_connection_scheme(_Defaults.SCHEME_FOR_ADHOC_CONNECTIONS)
        self.adhoc_resources = {}

        # load config if any
        comm_config = comm_configurator.get_config()
        if comm_config:
            int_conf = self._validate_conn_config(comm_config, _KEY_INT)
            if int_conf:
                self.int_scheme = int_conf.get(_KEY_SCHEME)
                # may be None when the section has no resources; _get_connector tolerates that
                self.int_resources = int_conf.get(_KEY_RESOURCES)

            adhoc_conf = self._validate_conn_config(comm_config, _KEY_ADHOC)
            if adhoc_conf:
                self.adhoc_scheme = adhoc_conf.get(_KEY_SCHEME)
                # may be None when the section has no resources; _get_connector tolerates that
                self.adhoc_resources = adhoc_conf.get(_KEY_RESOURCES)

        self.logger.debug(f"internal scheme={self.int_scheme}, resources={self.int_resources}")
        self.logger.debug(f"adhoc scheme={self.adhoc_scheme}, resources={self.adhoc_resources}")
        self.comm_config = comm_config

    def get_config_info(self):
        """Return a dict summarizing the effective connection settings.

        Returns: dict with the ad-hoc and internal schemes/resources, the ad-hoc permission flag,
            and the raw comm config (or the string "none" when there is no config)
        """
        return {
            "allow_adhoc": self.adhoc_allowed,
            "adhoc_scheme": self.adhoc_scheme,
            "adhoc_resources": self.adhoc_resources,
            "internal_scheme": self.int_scheme,
            "internal_resources": self.int_resources,
            "config": self.comm_config if self.comm_config else "none",
        }

    def should_connect_to_server(self, fqcn_info: FqcnInfo) -> bool:
        """Decide whether the cell described by fqcn_info should connect to the server.

        Generation 1 always connects. Otherwise a non-empty "connect_generation" entry in the
        "backbone" section of the comm config decides (either a list of generations or a single
        value); without it, the cell connects if its generation does not exceed bb_conn_gen.

        Args:
            fqcn_info: FQCN info of the cell

        Returns: whether the cell should connect to the server
        """
        if fqcn_info.gen == 1:
            return True

        if self.comm_config:
            bb_config = self.comm_config.get("backbone")
            if bb_config:
                gens = bb_config.get("connect_generation")
                if gens:
                    if isinstance(gens, list):
                        return fqcn_info.gen in gens
                    return fqcn_info.gen == gens

        # use default policy
        return fqcn_info.gen <= self.bb_conn_gen

    def is_adhoc_allowed(self, c1: FqcnInfo, c2: FqcnInfo) -> bool:
        """
        Is ad-hoc connection allowed between the two cells?

        Args:
            c1: FQCN info of cell one
            c2: FQCN info of cell two. c2 will offer listener if ad-hoc is allowed.

        Returns: whether ad-hoc connection is allowed between the two cells

        """
        if not self.adhoc_allowed:
            return False

        if c1.root == c2.root:
            # same family
            return False

        return True

    @staticmethod
    def _validate_conn_config(config: dict, key: str) -> Union[None, dict]:
        """Fetch and validate one connection section of the comm config.

        Args:
            config: the comm config dict
            key: name of the section to validate (e.g. "internal" or "adhoc")

        Returns: the section as stored in config; a missing or empty section is returned as is
            (None when the key is absent)

        Raises:
            ConfigError: if the section is not a dict, has no scheme, or has non-dict resources
        """
        conn_config = config.get(key)
        if not conn_config:
            # nothing configured for this section
            return conn_config

        if not isinstance(conn_config, dict):
            raise ConfigError(f"'{key}' must be dict but got {type(conn_config)}")

        scheme = conn_config.get(_KEY_SCHEME)
        if not scheme:
            raise ConfigError(f"missing '{_KEY_SCHEME}' in {key} config")

        # resources are optional, but must be a dict when given
        resources = conn_config.get(_KEY_RESOURCES)
        if resources and not isinstance(resources, dict):
            raise ConfigError(f"'{_KEY_RESOURCES}' in {key} must be dict but got {type(resources)}")

        return conn_config

    def _get_connector(
        self, url: str, active: bool, internal: bool, adhoc: bool, secure: bool
    ) -> Union[None, ConnectorData]:
        """Create a connector or listener of the requested kind.

        Args:
            url: URL to connect to or listen on; may be empty only for internal and ad-hoc listeners
            active: True to create an active connector, False for a passive one/listener
            internal: True for an internal connection, False for an external one
            adhoc: True for an ad-hoc connection, False for a backbone connection
            secure: whether SSL is required; only applied to external backbone connections

        Returns: a ConnectorData on success; None if an ad-hoc listener is requested while ad-hoc
            is not allowed, or if the communicator fails (the failure is logged)

        Raises:
            RuntimeError: if a required url is missing, or an internal ad-hoc connector is requested
        """
        if active and not url:
            raise RuntimeError("url is required but not provided for active connector!")

        ssl_required = False
        if adhoc:
            # ad-hoc - must be external
            if internal:
                raise RuntimeError("internal ad-hoc connector not supported")
            scheme = self.adhoc_scheme
            resources = self.adhoc_resources
            self.logger.debug(
                f"{os.getpid()}: creating ad-hoc external listener: "
                f"active={active} scheme={scheme}, resources={resources}"
            )
            if not active and not self.adhoc_allowed:
                # ad-hoc listener is not allowed!
                return None
        elif internal:
            # internal backbone
            scheme = self.int_scheme
            resources = self.int_resources
        else:
            # external backbone
            if not url:
                raise RuntimeError("url is required but not provided for external backbone connector/listener!")
            # scheme is never used on this path: it is only read when url is empty
            scheme = self.adhoc_scheme
            resources = {}
            ssl_required = secure

        reqs = {ConnectorRequirementKey.SECURE: ssl_required}
        if url:
            reqs[ConnectorRequirementKey.URL] = url

        # resources is None when a config section defines a scheme without resources;
        # dict.update(None) would raise TypeError, so only merge when there is something to merge
        if resources:
            reqs.update(resources)

        try:
            if active:
                handle = self.communicator.add_connector(url, Mode.ACTIVE, ssl_required)
                connect_url = url
            elif url:
                handle = self.communicator.add_connector(url, Mode.PASSIVE, ssl_required)
                connect_url = url
            else:
                self.logger.info(f"{os.getpid()}: Try start_listener Listener resources: {reqs}")
                handle, connect_url = self.communicator.start_listener(scheme, reqs)
                self.logger.debug(f"{os.getpid()}: ############ dynamic listener at {connect_url}")
                # Kludge: to wait for listener ready and avoid race
                time.sleep(0.5)

            return ConnectorData(handle, connect_url, active)
        except CommError as ex:
            self.logger.error(f"Failed to get connector: {secure_format_exception(ex)}")
            return None
        except Exception as ex:
            # best effort: report the failure to the caller as None instead of propagating
            self.logger.error(f"Unexpected exception: {secure_format_exception(ex)}")
            self.logger.error(secure_format_traceback())
            return None

    def get_external_listener(self, url: str, adhoc: bool) -> Union[None, ConnectorData]:
        """
        Try to get an external listener.

        Args:
            url: URL to listen on; required for a backbone listener, may be empty for ad-hoc
            adhoc: True for an ad-hoc listener, False for a backbone listener

        Returns: the listener's ConnectorData, or None if it could not be created
        """
        return self._get_connector(url=url, active=False, internal=False, adhoc=adhoc, secure=self.secure)

    def get_external_connector(self, url: str, adhoc: bool) -> Union[None, ConnectorData]:
        """
        Try to get an external connector.

        Args:
            url: URL to connect to; required
            adhoc: True for an ad-hoc connector, False for a backbone connector

        Returns: the connector's ConnectorData, or None if it could not be created
        """
        return self._get_connector(url=url, active=True, internal=False, adhoc=adhoc, secure=self.secure)

    def get_internal_listener(self) -> Union[None, ConnectorData]:
        """
        Try to get an internal listener.

        Returns: the listener's ConnectorData, or None if it could not be created
        """
        return self._get_connector(url="", active=False, internal=True, adhoc=False, secure=False)

    def get_internal_connector(self, url: str) -> Union[None, ConnectorData]:
        """
        Try to get an internal connector.

        Args:
            url: URL to connect to; required

        Returns: the connector's ConnectorData, or None if it could not be created
        """
        return self._get_connector(url=url, active=True, internal=True, adhoc=False, secure=False)