nvflare.fuel.f3.cellnet.core_cell module

class CellAgent(fqcn: str, endpoint: Endpoint)

Bases: object

A CellAgent represents a cell in another cell.

Parameters:

fqcn – FQCN of the cell represented

get_fqcn()
class CertificateExchanger(core_cell, credential_manager: CredentialManager)

Bases: object

This class handles certificate-exchange messages.

exchange_certificate(target: str) → bytes
get_certificate(target: str) → bytes
class CoreCell(fqcn: str, root_url: str, secure: bool, credentials: dict, create_internal_listener: bool = False, parent_url: str | None = None, max_timeout=3600, bulk_check_interval=0.5, bulk_process_interval=0.5, max_bulk_size=100)

Bases: MessageReceiver, EndpointMonitor

Parameters:
  • fqcn – the Cell’s FQCN (Fully Qualified Cell Name)

  • credentials – credentials for secure connections

  • root_url – the URL for backbone external connection

  • secure – secure mode or not

  • max_timeout – default timeout for send_and_receive

  • create_internal_listener – whether to create an internal listener for child cells

  • parent_url – url for connecting to parent cell

The FQCN consists of the names of all ancestor cells followed by the cell's own name, concatenated with dots.

Note

Internal listener is automatically created for root cells.

Example:
    server.J12345       (the cell for job J12345 on the server)
    server              (the root cell of server)
    client_1.J12345     (the cell for job J12345 on client_1's site)
    client_1.J12345.R0  (the cell for rank R0 of J12345 on client_1's site)
    client_1            (the root cell of client_1)
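
The naming rule above can be illustrated with a few string helpers. This is a minimal sketch: join_fqcn, parent_fqcn and root_fqcn are hypothetical names, not functions of this module, and they assume an FQCN is nothing more than dot-separated cell names.

```python
from typing import List, Optional

SEPARATOR = "."  # FQCN parts are joined with dots


def join_fqcn(names: List[str]) -> str:
    """Build an FQCN from the ancestor names followed by the cell's own name (hypothetical helper)."""
    return SEPARATOR.join(names)


def parent_fqcn(fqcn: str) -> Optional[str]:
    """Return the parent cell's FQCN, or None for a root cell (hypothetical helper)."""
    parts = fqcn.split(SEPARATOR)
    return SEPARATOR.join(parts[:-1]) if len(parts) > 1 else None


def root_fqcn(fqcn: str) -> str:
    """Return the FQCN of the root cell, i.e. the first name (hypothetical helper)."""
    return fqcn.split(SEPARATOR)[0]


if __name__ == "__main__":
    print(join_fqcn(["client_1", "J12345", "R0"]))  # client_1.J12345.R0
    print(parent_fqcn("client_1.J12345.R0"))        # client_1.J12345
    print(root_fqcn("server.J12345"))               # server
```
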
ALL_CELLS = {}
APP_ID = 1
ERR_TYPE_COMM = 'CommErr'
ERR_TYPE_MSG_TOO_BIG = 'MsgTooBig'
SUB_TYPE_CHILD = 1
SUB_TYPE_CLIENT = 2
SUB_TYPE_NONE = 0
add_error_handler(channel: str, topic: str, cb, *args, **kwargs)
add_incoming_reply_filter(channel: str, topic: str, cb, *args, **kwargs)
add_incoming_request_filter(channel: str, topic: str, cb, *args, **kwargs)
add_outgoing_reply_filter(channel: str, topic: str, cb, *args, **kwargs)
add_outgoing_request_filter(channel: str, topic: str, cb, *args, **kwargs)
broadcast_multi_requests(target_msgs: Dict[str, TargetMessage], timeout=None, secure=False, optional=False) → Dict[str, Message]

This is the core of the request/response handling. Be extremely careful when making any changes! To maximize communication efficiency, we avoid using locks. Instead, we use a waiter implemented as a Python threading.Event object: we create the waiter, send out the messages, set up default responses, and then wait for the replies. Once the waiter is triggered from a reply-receiving thread, we process the received results.

HOWEVER, if the network is extremely fast, the response may arrive even before this thread has finished setting up the waiter!

We once had a very mysterious bug that caused a request to be treated as timed out even though the reply had been received. It happened because both threads set values in “waiter.replies”. On an extremely fast network, the reply-processing thread stored the reply in “waiter.replies”, and this thread then overwrote it with a default timeout reply.

To avoid this kind of problem, the waiter object now holds two separate sets of values. One set is written by this thread: targets. The other set is written by the reply-processing thread: received_replies and reply_time.
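
The two-set arrangement can be sketched in isolation. Waiter, deliver, collect and broadcast are hypothetical stand-ins rather than this module's classes, and each reply is a plain string instead of a Message. The point of the sketch is that the requesting thread never pre-fills received_replies with timeout defaults; it substitutes a default only while building the result after the wait, so an early reply cannot be overwritten.

```python
import threading
import time
from typing import Dict, List


class Waiter:
    """Sketch of a waiter whose fields are each written by only one thread."""

    def __init__(self, targets: List[str]):
        # Set once by the requesting thread, before any message is sent.
        self.targets = list(targets)
        # Written only by the reply-processing thread.
        self.received_replies: Dict[str, str] = {}
        self.reply_time: Dict[str, float] = {}
        self.event = threading.Event()

    def deliver(self, target: str, reply: str) -> None:
        """Called on the reply-processing thread when a reply arrives."""
        self.received_replies[target] = reply
        self.reply_time[target] = time.time()
        # Wake the requester once every expected target has replied.
        if all(t in self.received_replies for t in self.targets):
            self.event.set()


def collect(waiter: Waiter, timeout: float) -> Dict[str, str]:
    """Called on the requesting thread. Defaults go into a new dict,
    so a reply that arrived early is never overwritten."""
    waiter.event.wait(timeout)
    return {t: waiter.received_replies.get(t, "TIMEOUT") for t in waiter.targets}


def broadcast(targets: List[str], timeout: float = 1.0) -> Dict[str, str]:
    waiter = Waiter(targets)
    for t in targets:
        # Simulate an extremely fast network: the reply may be delivered
        # on another thread before this thread even starts waiting.
        threading.Thread(target=waiter.deliver, args=(t, f"ok from {t}")).start()
    return collect(waiter, timeout)


if __name__ == "__main__":
    print(broadcast(["site-1", "site-2"]))
```

Because each field has exactly one writer thread, the sketch relies only on CPython's atomic dict assignment and the Event for signalling, with no explicit lock.
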

Parameters:
  • target_msgs – messages to be sent

  • timeout – timeout value

  • secure – end-to-end encryption

  • optional – whether the message is optional

Returns: a dict of: target name => reply message

broadcast_request(channel: str, topic: str, targets: str | List[str], request: Message, timeout=None, secure=False, optional=False) → Dict[str, Message]

Send a message over a channel to the specified destination cell(s), and wait for the replies.

Parameters:
  • channel – channel for the message

  • topic – topic of the message

  • targets – FQCN of the destination cell(s)

  • request – message to be sent

  • timeout – how long to wait for replies

  • secure – end-to-end encryption

  • optional – whether the message is optional

Returns: a dict of: cell_id => reply message
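
Since broadcast_request sends the same request to every target, it can be viewed as building one TargetMessage per target, the shape that broadcast_multi_requests accepts. The sketch below shows only that fan-out step. The TargetMessage dataclass merely mirrors the documented constructor, build_target_msgs is a hypothetical helper, and the request can be any Python object rather than a real Message.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class TargetMessage:
    # Mirrors the documented constructor TargetMessage(target, channel, topic, message).
    target: str
    channel: str
    topic: str
    message: Any


def build_target_msgs(
    channel: str, topic: str, targets: Union[str, List[str]], request: Any
) -> Dict[str, TargetMessage]:
    """Hypothetical helper: fan one request out into per-target messages."""
    # A single FQCN is treated as a one-element list.
    if isinstance(targets, str):
        targets = [targets]
    # The same request goes to every target; the dict is keyed by target FQCN.
    return {t: TargetMessage(t, channel, topic, request) for t in targets}


if __name__ == "__main__":
    for fqcn, tm in build_target_msgs("task", "train", ["site-1", "site-2"], "req").items():
        print(fqcn, tm)
```
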

change_server_root(to_url: str)

Change to a different server URL.

Parameters:

to_url – the new url of the server root

decrypt_payload(message: Message)
drop_agents()
drop_connectors()
encrypt_payload(message: Message)
fire_and_forget(channel: str, topic: str, targets: str | List[str], message: Message, secure=False, optional=False) → Dict[str, str]

Send a message over a channel to specified destination cell(s), and do not wait for replies.

Parameters:
  • channel – channel for the message

  • topic – topic of the message

  • targets – one or more destination cell IDs. None means all.

  • message – message to be sent

  • secure – end-to-end encryption of the message

  • optional – whether the message is optional

Returns: a dict of: target name => send status

fire_multi_requests_and_forget(target_msgs: Dict[str, TargetMessage], optional=False) → Dict[str, str]
get_fqcn() → str
get_internal_listener_url() → None | str

Get the cell’s internal listener url.

This method should only be used for cells that need to have child cells. The returned URL is passed to the child cells of this cell so that they can connect to it.

Returns: url for child cells to connect

get_root_url_for_child()
get_sub_cell_names() → Tuple[List[str], List[str]]

Get the FQCNs of all sub-cells: the child cells and, if this cell is the server, the top-level client cells.

Returns: a tuple of (FQCNs of child cells, FQCNs of top-level client cells)

is_backbone_ready()

Check if backbone is ready.

The backbone consists of the preconfigured network connections, such as all the connections from clients to the server. Ad-hoc connections are not part of the backbone.

is_cell_connected(target_fqcn: str) → bool
is_cell_reachable(target_fqcn: str, for_msg=None) → bool
is_secure()
log_error(log_text: str, msg: None | Message, log_except=False)
log_warning(log_text: str, msg: None | Message, log_except=False)
make_internal_listener()

Create the internal listener for child cells of this cell to connect to.

process_message(endpoint: Endpoint, connection: Connection, app_id: int, message: Message)
queue_message(channel: str, topic: str, targets: str | List[str], message: Message, optional=False)
register_request_cb(channel: str, topic: str, cb, *args, **kwargs)

Register a callback for handling requests. The CB must follow request_cb_signature.

Parameters:
  • channel – the channel of the request

  • topic – topic of the request

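  • cb – the callback function. It must follow the signature of request_cb_signature.

  • *args – args to be passed to the cb.

  • **kwargs – kwargs to be passed to the cb.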
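
A minimal sketch of a request-callback registry keyed by (channel, topic), assuming the callback is invoked with the request first, followed by the *args and **kwargs supplied at registration. RequestRegistry and dispatch are hypothetical names, not part of this module; the actual request_cb_signature should be checked before relying on this argument order.

```python
from typing import Any, Callable, Dict, Tuple

# (callback, extra positional args, extra keyword args)
Handler = Tuple[Callable[..., Any], tuple, dict]


class RequestRegistry:
    """Hypothetical dispatch table keyed by (channel, topic)."""

    def __init__(self) -> None:
        self._handlers: Dict[Tuple[str, str], Handler] = {}

    def register_request_cb(self, channel: str, topic: str, cb: Callable[..., Any], *args, **kwargs) -> None:
        if not callable(cb):
            raise ValueError(f"specified cb {type(cb)} is not callable")
        # Remember the extra args so they are passed on every call.
        self._handlers[(channel, topic)] = (cb, args, kwargs)

    def dispatch(self, channel: str, topic: str, request: Any) -> Any:
        entry = self._handlers.get((channel, topic))
        if entry is None:
            return None  # no handler registered for this channel/topic
        cb, args, kwargs = entry
        # The request comes first, followed by the args given at registration.
        return cb(request, *args, **kwargs)


if __name__ == "__main__":
    reg = RequestRegistry()
    reg.register_request_cb("admin", "ping", lambda req, prefix: prefix + req, "pong: ")
    print(reg.dispatch("admin", "ping", "hello"))  # pong: hello
```
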

send_reply(reply: Message, to_cell: str, for_req_ids: List[str], secure=False, optional=False) → str

Send a reply to respond to one or more requests.

This is useful if the request receiver needs to delay its reply as follows:
  • When a request is received, if the receiver is not ready to reply (e.g. it is waiting for additional requests from other cells), it simply remembers the REQ_ID and returns None;

  • The receiver may queue up multiple such requests

  • When ready, call this method to send the reply for all the queued requests

Parameters:
  • reply – the reply message

  • to_cell – the target cell

  • for_req_ids – the list of req IDs that the reply is for

  • secure – end-to-end encryption

  • optional – whether the message is optional

Returns: an error message if any
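
The deferred-reply pattern described above can be sketched with a small helper that holds REQ_IDs until a quorum of requests has arrived. BatchReplier and quorum are hypothetical, and send_reply is injected as a plain callable standing in for CoreCell.send_reply(reply, to_cell, for_req_ids), so the sketch runs without a network. Replies are strings here rather than Message objects.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Optional

# Stand-in for CoreCell.send_reply(reply, to_cell, for_req_ids) -> error message or None
SendReply = Callable[[str, str, List[str]], Optional[str]]


class BatchReplier:
    """Hypothetical helper: queue requests until `quorum` have arrived,
    then answer all of them with one reply per requesting cell."""

    def __init__(self, quorum: int, send_reply: SendReply):
        self.quorum = quorum
        self.send_reply = send_reply
        self.pending: Dict[str, List[str]] = defaultdict(list)
        self.count = 0

    def on_request(self, origin: str, req_id: str) -> Optional[str]:
        # Remember the REQ_ID instead of replying immediately.
        self.pending[origin].append(req_id)
        self.count += 1
        if self.count < self.quorum:
            return None  # not ready yet: no immediate reply
        # Ready: one reply per cell covers all of that cell's queued requests.
        for cell, req_ids in self.pending.items():
            self.send_reply("aggregated", cell, req_ids)
        self.pending = defaultdict(list)
        self.count = 0
        return None


if __name__ == "__main__":
    r = BatchReplier(2, lambda reply, cell, ids: print(reply, cell, ids))
    r.on_request("site-1", "r1")
    r.on_request("site-2", "r2")  # prints one line per cell
```

Grouping REQ_IDs per requesting cell lets a single send_reply call answer every queued request from that cell, which is what the for_req_ids list allows.
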

send_request(channel: str, topic: str, target: str, request: Message, timeout=None, secure=False, optional=False) → Message
set_cell_connected_cb(cb, *args, **kwargs)

Set a callback that is called when an external cell is connected.

Parameters:
  • cb – the callback function. It must follow the signature of cell_connected_cb_signature.

  • *args – args to be passed to the cb.

  • **kwargs – kwargs to be passed to the cb

Returns: None

set_cell_disconnected_cb(cb, *args, **kwargs)

Set a callback that is called when an external cell is disconnected.

Parameters:
  • cb – the callback function. It must follow the signature of cell_disconnected_cb_signature.

  • *args – args to be passed to the cb.

  • **kwargs – kwargs to be passed to the cb

Returns: None

set_message_interceptor(cb, *args, **kwargs)

Set a callback that is called when a message is received or forwarded.

Parameters:
  • cb – the callback function. It must follow the signature of message_interceptor_signature.

  • *args – args to be passed to the cb.

  • **kwargs – kwargs to be passed to the cb

Returns: None

start()

Start the cell after it is fully set up (connectors and listeners are added, and CBs are set up).

state_change(endpoint: Endpoint)
stop()

Clean up the cell. Once the cell is stopped, it won’t be able to send or receive messages.

class TargetMessage(target: str, channel: str, topic: str, message: Message)

Bases: object

static from_dict(d: dict)
to_dict()
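
A round-trip sketch for to_dict and from_dict, assuming one dictionary key per constructor argument. The key names are an assumption, and the real implementation may encode the Message payload differently; here message is any plain Python value.

```python
from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class TargetMessage:
    # Fields mirror the documented constructor arguments.
    target: str
    channel: str
    topic: str
    message: Any

    def to_dict(self) -> dict:
        # Assumed layout: one key per constructor argument.
        return asdict(self)

    @staticmethod
    def from_dict(d: dict) -> "TargetMessage":
        return TargetMessage(
            target=d["target"], channel=d["channel"], topic=d["topic"], message=d["message"]
        )


if __name__ == "__main__":
    tm = TargetMessage("site-1", "task", "train", {"round": 3})
    print(TargetMessage.from_dict(tm.to_dict()) == tm)  # True
```

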
log_messaging_error(logger, log_text: str, cell, msg: Message | None, log_except=False, log_level=40)