nvflare.fuel.f3.sfm.conn_manager module

class ConnManager(local_endpoint: Endpoint)[source]

Bases: ConnMonitor

SFM connection manager. This class maintains the state of SFM connections and pumps data through them.

add_connector(driver: Driver, params: dict, mode: Mode) → str[source]
add_endpoint_monitor(monitor: EndpointMonitor)[source]
close_connection(connection: Connection)[source]
find_endpoint(name: str) → Endpoint | None[source]
get_connections(name: str) → List[SfmConnection] | None[source]
static get_dict_payload(prefix, frame)[source]
handle_new_connection(connection: Connection)[source]
loopback_message_task(endpoint: Endpoint, app_id: int, headers: dict | None, payload: bytes | bytearray | memoryview | list)[source]
notify_monitors(endpoint: Endpoint)[source]
process_frame(sfm_conn: SfmConnection, frame: bytes | bytearray | memoryview | list)[source]
process_frame_task(sfm_conn: SfmConnection, frame: bytes | bytearray | memoryview | list)[source]
register_message_receiver(app_id: int, receiver: MessageReceiver)[source]
remove_connector(handle: str)[source]
remove_endpoint(name: str)[source]
send_loopback_message(endpoint: Endpoint, app_id: int, headers: dict | None, payload: bytes | bytearray | memoryview | list)[source]

Send a message to itself (loopback).
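One way to picture loopback delivery is the sketch below. It does not use the real nvflare classes: `FakeLoopbackManager` is a hypothetical stand-in, the receiver is a plain callable rather than a `MessageReceiver`, and handing the message to a worker thread (and returning the future so the demo can wait on it) is an assumption made for illustration. Only the method names and parameter order come from the signatures listed above.

```python
from concurrent.futures import ThreadPoolExecutor


class FakeLoopbackManager:
    """Hypothetical manager that delivers loopback messages to local receivers."""

    def __init__(self):
        self.receivers = {}  # app_id -> callable (stand-in for MessageReceiver)
        self.executor = ThreadPoolExecutor(max_workers=1)

    def register_message_receiver(self, app_id, receiver):
        self.receivers[app_id] = receiver

    def send_loopback_message(self, endpoint, app_id, headers, payload):
        # Assumption: delivery happens on a worker thread, never on the network.
        # Returning the future is a convenience for this demo only.
        return self.executor.submit(
            self.loopback_message_task, endpoint, app_id, headers, payload
        )

    def loopback_message_task(self, endpoint, app_id, headers, payload):
        receiver = self.receivers.get(app_id)
        if receiver is not None:
            receiver(endpoint, app_id, headers, payload)


received = []
manager = FakeLoopbackManager()
manager.register_message_receiver(7, lambda ep, app, hdrs, body: received.append((ep, app, hdrs, body)))

future = manager.send_loopback_message("local", 7, {"topic": "ping"}, b"data")
future.result(timeout=2)  # wait until the receiver has run
manager.executor.shutdown()
```

In this sketch the message never leaves the process, which is the property the loopback path is meant to provide.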

send_message(endpoint: Endpoint, app_id: int, headers: dict | None, payload: bytes | bytearray | memoryview | list)[source]

Send a message to the given endpoint for the given application.

The message is sent asynchronously; no response is expected.

Parameters:
  • endpoint – An endpoint to send the message to

  • app_id – Application ID

  • headers – Message headers, optional

  • payload – Message payload, optional

Raises:

CommError – If any error happens while sending the data
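The call shape described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's implementation: `FakeConnManager`, `FakeEndpoint`, `try_send`, and the local `CommError` class are hypothetical placeholders. Only the parameter order (`endpoint`, `app_id`, `headers`, `payload`), the absence of a response, and the use of `CommError` to report failures are taken from the description.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple, Union

BytesAlike = Union[bytes, bytearray, memoryview, list]


class CommError(Exception):
    """Stand-in for the library's CommError (hypothetical, for illustration)."""


@dataclass
class FakeEndpoint:
    """Hypothetical endpoint that carries only a name."""
    name: str


@dataclass
class FakeConnManager:
    """Hypothetical stand-in mirroring the documented send_message signature."""
    known: set = field(default_factory=set)
    sent: List[Tuple[str, int, Optional[dict], BytesAlike]] = field(default_factory=list)

    def send_message(self, endpoint, app_id, headers, payload):
        # Fire-and-forget: nothing is returned; failures surface as CommError.
        if endpoint.name not in self.known:
            raise CommError(f"no connection to endpoint {endpoint.name}")
        self.sent.append((endpoint.name, app_id, headers, payload))


def try_send(manager, endpoint, app_id, headers, payload) -> bool:
    """Send one message and report whether it was accepted for sending."""
    try:
        manager.send_message(endpoint, app_id, headers, payload)
        return True
    except CommError:
        return False


manager = FakeConnManager(known={"server"})
ok = try_send(manager, FakeEndpoint("server"), 1, {"topic": "demo"}, b"hello")
missing = try_send(manager, FakeEndpoint("ghost"), 1, None, b"hello")
```

Because no response is expected, a `True` result here only means the message was handed off for sending, not that the peer processed it.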

start()[source]
start_connector(connector: ConnectorInfo)[source]

Start the connector in a new thread.

static start_connector_task(connector: ConnectorInfo)[source]

Start the connector in a new thread. This function loops as long as the connector is not stopped.
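The "loop as long as the connector is not stopped" behaviour can be pictured with the sketch below. It is not the library's code: `FakeConnector`, its `stopped` flag, its `connect()` method, and the retry delay are all assumptions chosen for illustration.

```python
import threading
import time


class FakeConnector:
    """Hypothetical connector: fails twice, then succeeds and marks itself stopped."""

    def __init__(self):
        self.stopped = False
        self.attempts = 0

    def connect(self):
        self.attempts += 1
        if self.attempts < 3:
            raise ConnectionError("simulated failure")
        self.stopped = True  # pretend the connector was shut down after one good run


def connector_task(connector, retry_delay=0.01):
    # Keep (re)starting the connector until it is flagged as stopped.
    while not connector.stopped:
        try:
            connector.connect()
        except ConnectionError:
            time.sleep(retry_delay)  # back off briefly before retrying


def start_connector(connector):
    # Run the loop in a daemon thread so it does not block the caller.
    thread = threading.Thread(target=connector_task, args=(connector,), daemon=True)
    thread.start()
    return thread


connector = FakeConnector()
worker = start_connector(connector)
worker.join(timeout=2)
```

Splitting the thread launch from the looping task mirrors the split between `start_connector` and `start_connector_task` in the signatures above.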

state_change(connection: Connection)[source]

Driver state change notification, including new connections

Parameters:

connection – The connection whose state has changed

Raises:

CommError – If any error happens while processing the frame

stop()[source]
update_endpoint(sfm_conn: SfmConnection, data: dict)[source]
class NullConnection[source]

Bases: Connection

A mock connection used for loopback messages

close()[source]

Close connection

Raises:

CommError – If any error occurs

get_conn_properties() → dict[source]

Get connection-specific properties, such as peer address, TLS certificate, etc.

Raises:

CommError – If any error occurs

send_frame(frame: bytes | bytearray | memoryview | list)[source]

Send an SFM frame through the connection to the remote endpoint.

Parameters:

frame – The frame to be sent

Raises:

CommError – If any error happens while sending the frame

class SfmFrameReceiver(conn_manager: ConnManager, conn: SfmConnection)[source]

Bases: FrameReceiver

process_frame(frame: bytes | bytearray | memoryview | list)[source]

Frame received callback

Parameters:

frame – The frame received

Raises:

CommError – If any error happens while processing the frame
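The constructor takes both a manager and a connection, which suggests a receiver that forwards each incoming frame to the manager together with the connection it arrived on. The sketch below illustrates that pattern under this assumption; `FakeManager` and `ForwardingReceiver` are hypothetical stand-ins, and the real class may do more than forward.

```python
class FakeManager:
    """Hypothetical manager that records the frames it is asked to process."""

    def __init__(self):
        self.frames = []

    def process_frame(self, sfm_conn, frame):
        self.frames.append((sfm_conn, frame))


class ForwardingReceiver:
    """Hypothetical receiver bound to one connection."""

    def __init__(self, conn_manager, conn):
        self.conn_manager = conn_manager
        self.conn = conn

    def process_frame(self, frame):
        # Frame-received callback: pass the frame on with its connection (assumed behaviour).
        self.conn_manager.process_frame(self.conn, frame)


manager = FakeManager()
receiver = ForwardingReceiver(manager, "conn-1")  # a string stands in for SfmConnection
receiver.process_frame(b"\x01\x02")
```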

get_handle()[source]