nvflare.private.fed.server.server_engine module

class ClientConnection(client)[source]

Bases: object

recv()[source]
send(data)[source]
class ServerEngine(server, args, client_manager: ClientManager, snapshot_persistor, workers=3)[source]

Bases: ServerEngineInternalSpec

Server engine.

Parameters
  • server – server

  • args – arguments

  • client_manager (ClientManager) – client manager.

  • workers – number of worker threads.

abort_app_on_clients(clients: List[str]) str[source]

Abort the application on the specified clients.

abort_app_on_server(job_id: str) str[source]

Abort the application on the server.

ask_to_stop()[source]

Ask the engine to stop the current run.

aux_send(targets: [], topic: str, request: Shareable, timeout: float, fl_ctx: FLContext) dict[source]

Send a request to client(s) via the auxiliary channel.

Parameters
  • targets – list of Client or client names

  • topic – topic of the request

  • request – request to be sent

  • timeout – number of secs to wait for replies

  • fl_ctx – FL context

Returns

A dict of replies (client_name => Shareable)

Return type

dict

NOTE: when a reply is received, the peer_ctx props must be set into the PEER_PROPS header of the reply Shareable.

If a reply is not received from a client, do not put it into the reply dict.
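The reply-collection rule above can be sketched as follows. This is a minimal illustration and not NVFlare's implementation: `collect_replies` is a hypothetical helper, plain dicts stand in for Shareable, and the "PEER_PROPS" header is modeled as a literal string key.

```python
from typing import Dict, List, Optional

# Hypothetical stand-in: a reply is a plain dict with "headers" and "body".
Reply = Dict[str, dict]


def collect_replies(targets: List[str], raw: Dict[str, Optional[dict]]) -> Dict[str, Reply]:
    """Build the reply dict (client_name => reply) from per-client results.

    raw maps a client name to {"body": ..., "peer_ctx_props": ...},
    or to None when that client did not answer in time.
    """
    replies: Dict[str, Reply] = {}
    for name in targets:
        result = raw.get(name)
        if result is None:
            # No reply from this client: leave it out of the dict entirely.
            continue
        replies[name] = {
            # The peer context props travel in the PEER_PROPS header of the reply.
            "headers": {"PEER_PROPS": result["peer_ctx_props"]},
            "body": result["body"],
        }
    return replies
```

A caller can then tell which clients failed to answer by comparing the requested targets with the keys of the returned dict.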

build_component(config_dict)[source]

Build a component from the config_dict.

Parameters

config_dict – configuration.

cancel_client_resources(resource_check_results: Dict[str, Tuple[bool, str]], resource_reqs: Dict[str, dict])[source]

Cancels the requested resources for the job.

Parameters
  • resource_check_results – A dict of {client_name: client_check_result}, where client_check_result is a tuple of (client check OK, resource reserve token if any)

  • resource_reqs – A dict of {client_name: resource requirements dict}

check_app_start_readiness(job_id: str) str[source]

Check whether the app is ready to start.

Returns

An error message. An empty string if successful.
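Several methods in this class report failure by returning an error message and success by returning an empty string. A caller might handle that convention as sketched below. `run_step` and the two stand-in functions are hypothetical and are not part of NVFlare.

```python
def run_step(step, *args) -> None:
    """Call an engine-style method and raise if it reports an error."""
    err = step(*args)
    if err:
        # A non-empty string is an error message.
        raise RuntimeError(f"{step.__name__} failed: {err}")


def ready_check(job_id: str) -> str:
    """Stand-in for a method that succeeds (empty string)."""
    return ""


def failing_check(job_id: str) -> str:
    """Stand-in for a method that reports an error."""
    return f"job {job_id} is not ready"
```

With this wrapper, `run_step(ready_check, "job-1")` returns normally, while `run_step(failing_check, "job-1")` raises a RuntimeError carrying the message.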

check_client_resources(resource_reqs) Dict[str, Tuple[bool, str]][source]

Sends the check_client_resources requests to the clients.

Parameters

resource_reqs – A dict of {client_name: resource requirements dict}

Returns

A dict of {client_name: client_check_result}, where client_check_result is a tuple of (client check OK, resource reserve token if any)

Return type

Dict[str, Tuple[bool, str]]
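The shape of the check result can be illustrated with a small example. The sketch below assumes that only clients that passed the check and received a token hold a reservation worth cancelling. `tokens_to_cancel` is a hypothetical helper, and the site names and token values are made up.

```python
from typing import Dict, Tuple

# {client_name: (client check OK, resource reserve token if any)}
CheckResults = Dict[str, Tuple[bool, str]]


def tokens_to_cancel(results: CheckResults) -> Dict[str, str]:
    """Return {client_name: token} for clients that passed and hold a token."""
    return {name: token for name, (ok, token) in results.items() if ok and token}


results: CheckResults = {
    "site-1": (True, "token-abc"),  # check passed, resources reserved
    "site-2": (False, ""),          # check failed, nothing reserved
}
```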

close()[source]
create_parent_connection(port)[source]
delete_job_id(num)[source]

Delete the specified run.

The engine must do a status check before the run can be deleted.

Parameters

job_id – job id

Returns

An error message. An empty string if successful.

deploy_app(job_id, src, dst)[source]
deploy_app_to_server(run_destination: str, app_name: str, app_staging_path: str) str[source]

Deploy the specified app to the server.

Copy the app folder tree from the staging area to the server’s RUN area.

Parameters
  • job_id – job id of the app to be deployed

  • app_name – name of the app to be deployed

  • app_staging_path – the full path to the app folder in staging area

Returns

An error message. An empty string if successful.
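The copy step described above could look roughly like the sketch below. It is an assumption-laden illustration rather than the engine's code: `deploy_app_sketch` is a hypothetical function, and placing the app under `run_destination/app_name` is an assumed layout.

```python
import os
import shutil


def deploy_app_sketch(run_destination: str, app_name: str, app_staging_path: str) -> str:
    """Copy the staged app folder into the run area; return "" on success."""
    if not os.path.isdir(app_staging_path):
        return f"staging path does not exist: {app_staging_path}"
    # Assumed layout: the app lands in <run_destination>/<app_name>.
    target = os.path.join(run_destination, app_name)
    try:
        # dirs_exist_ok requires Python 3.8 or later.
        shutil.copytree(app_staging_path, target, dirs_exist_ok=True)
    except OSError as e:
        return f"failed to deploy {app_name}: {e}"
    return ""
```

Returning a message instead of raising keeps the sketch in line with the error-string convention documented for this method.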

dispatch(topic: str, request: Shareable, fl_ctx: FLContext) Shareable[source]
fire_event(event_type: str, fl_ctx: FLContext)[source]
get_all_clients()[source]
get_app_data(app_name: str) Tuple[str, object][source]

Get data for deploying the app.

Parameters

app_name – name of the app

Returns

A tuple of (error message, app data). The error message is an empty string if successful.

get_app_run_info(job_id) RunInfo[source]

Get the app RunInfo from the child process.

Returns

App RunInfo

get_client_from_name(client_name)[source]

Get the registered client token from client_name.

Parameters

client_name – client name

Returns

Registered client

get_client_name_from_token(token: str) str[source]

Get the registered client name from communication token.

Parameters

token – communication token

Returns

Client name

get_clients() [Client][source]
get_command_conn(job_id)[source]
get_component(component_id: str) object[source]
get_engine_info() EngineInfo[source]

Get general info of the engine.

get_errors(job_id)[source]

Get the errors of the server components.

Parameters

job_id – current job_id

Returns

Server components errors.

get_job_clients(client_sites)[source]

Get the participating clients for the job.

Parameters

client_sites – clients with the dispatching info


get_run_info() RunInfo[source]
get_staging_path_of_app(app_name: str) str[source]

Get the staging path of the app waiting to be deployed.

Parameters

app_name (str) – application name

Returns

The app’s folder path, or an empty string if the app doesn’t exist.

get_widget(widget_id: str) Widget[source]

Get the widget with the specified ID.

Parameters

widget_id – ID of the widget

Returns

The widget, or None if not found

get_workspace() Workspace[source]
heartbeat_to_parent()[source]
new_context() FLContext[source]
parent_aux_send(targets: [], topic: str, request: Shareable, timeout: float, fl_ctx: FLContext) dict[source]
persist_components(fl_ctx: FLContext, completed: bool)[source]

Persist the running FL components.

Parameters
  • fl_ctx – FLContext

  • completed – flag to indicate whether the run is complete


register_aux_message_handler(topic: str, message_handle_func)[source]

Register an aux message handling function for the specified topic.

An exception is raised when:

  • a handler is already registered for the topic

  • the topic is bad (it must be a non-empty string)

  • message_handle_func is bad (it must be callable)

Implementation Note:

This method should simply call the ServerAuxRunner’s register_aux_message_handler method.

Parameters
  • topic – the topic to be handled by the func

  • message_handle_func – the func to handle the message. Must follow aux_message_handle_func_signature.
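The three failure cases listed for this method can be sketched with a small registry. `AuxHandlerRegistry` is a hypothetical class written for illustration, and the exception types (ValueError, TypeError) are assumptions, since the documentation only says that an exception is raised.

```python
from typing import Callable, Dict


class AuxHandlerRegistry:
    """Hypothetical registry mirroring the three documented failure cases."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable] = {}

    def register(self, topic: str, message_handle_func: Callable) -> None:
        if not isinstance(topic, str) or not topic:
            raise ValueError("topic must be a non-empty string")
        if not callable(message_handle_func):
            raise TypeError("message_handle_func must be callable")
        if topic in self._handlers:
            raise ValueError(f"a handler is already registered for topic {topic!r}")
        self._handlers[topic] = message_handle_func

    def lookup(self, topic: str) -> Callable:
        """Return the handler registered for the topic."""
        return self._handlers[topic]
```

Rejecting a second registration for the same topic means a topic always maps to exactly one handler, so dispatch stays unambiguous.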

remove_clients(clients: List[str]) str[source]

Remove specified clients.

Parameters

clients – clients to be removed

Returns

An error message. An empty string if successful.

remove_custom_path()[source]
restart_server() str[source]

Restart the server.

The engine should not exit right away. See shutdown_server.

Returns

An error message. An empty string if successful.

restore_components(snapshot: RunSnapshot, fl_ctx: FLContext)[source]

Restore the FL components from the saved snapshot.

Parameters
  • snapshot – RunSnapshot

  • fl_ctx – FLContext


send_aux_request(targets: [], topic: str, request: Shareable, timeout: float, fl_ctx: FLContext) dict[source]

Send a request to specified clients via the aux channel.

Implementation: simply calls the ServerAuxRunner’s send_aux_request method.

Parameters
  • targets – target clients. None or empty list means all clients

  • topic – topic of the request

  • request – request to be sent

  • timeout – number of secs to wait for replies. 0 means fire-and-forget.

  • fl_ctx – FL context

Returns

A dict of replies (client name => reply Shareable)
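The two special parameter values documented above (None or an empty list for targets, and 0 for timeout) can be expressed as two small helpers. Both function names are hypothetical and only restate the documented semantics.

```python
from typing import List, Optional


def resolve_targets(targets: Optional[List[str]], all_clients: List[str]) -> List[str]:
    """None or an empty list selects every known client."""
    return list(targets) if targets else list(all_clients)


def waits_for_replies(timeout: float) -> bool:
    """A timeout of 0 means fire-and-forget: send and do not wait."""
    return timeout > 0
```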

set_configurator(conf: ServerJsonConfigurator)[source]

Set the configurator for server.

Parameters

conf – A ServerJsonConfigurator object

set_job_runner(job_runner: JobRunner, job_manager: JobDefManagerSpec)[source]

Set the JobRunner for server.

Parameters
  • job_runner – A JobRunner object

  • job_manager – A JobDefManagerSpec object

set_run_manager(run_manager: RunManager)[source]

Set the RunManager for server.

Parameters

run_manager – A RunManager object

show_stats(job_id)[source]

Show the stats of the server.

Parameters

job_id – current job_id

Returns

Component stats of the server

shutdown_server() str[source]

Shut down the server.

The engine should not exit right away. It should set its status to STOPPING, set up a timer (in a different thread), and return from this call right away (if other restart conditions are met). When the timer fires, it exits. This gives the caller time to process the feedback or clean up (e.g. admin cmd response).

Returns

An error message. An empty string if successful.
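The deferred-exit pattern described for this method can be sketched with `threading.Timer` from the standard library. `EngineSketch`, its status strings other than STOPPING, and the delay value are hypothetical. A real engine would terminate the process in `_exit`, whereas this sketch only records that the timer fired.

```python
import threading


class EngineSketch:
    """Hypothetical engine showing the deferred-exit pattern only."""

    def __init__(self, delay: float = 0.2) -> None:
        self.status = "STARTED"
        self.exited = threading.Event()
        self._delay = delay

    def shutdown_server(self) -> str:
        self.status = "STOPPING"
        # threading.Timer runs the callback in its own thread after the delay.
        timer = threading.Timer(self._delay, self._exit)
        timer.daemon = True
        timer.start()
        return ""  # return right away; an empty string means success

    def _exit(self) -> None:
        # A real engine would exit here; the sketch only records the event.
        self.status = "STOPPED"
        self.exited.set()
```

Because the call returns before the timer fires, the caller still has a running engine while it sends its own response.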

start_app_on_server(run_number: str, job_id: Optional[str] = None, job_clients=None, snapshot=None) str[source]

Start the FL app on Server.

Returns

An error message. An empty string if successful.

start_client_job(job_id, client_sites)[source]

Send the start client run commands to the clients.

Parameters
  • client_sites – client sites

  • job_id – job_id


stop_all_jobs()[source]
sync_clients_from_main_process()[source]

Fetch the participating clients from the main parent process.

Returns

Clients

validate_clients(client_names: List[str]) Tuple[List[Client], List[str]][source]

Validate specified client names.

Parameters

client_names – list of names to be validated

Returns

A list of valid clients and a list of invalid client names
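The two-list return value can be illustrated by partitioning names against a registry. `partition_clients` and the `registered` mapping are hypothetical stand-ins. The real method returns Client objects, for which plain objects are used here.

```python
from typing import Dict, List, Tuple


def partition_clients(
    client_names: List[str], registered: Dict[str, object]
) -> Tuple[List[object], List[str]]:
    """Split names into (known client objects, unknown names)."""
    valid: List[object] = []
    invalid: List[str] = []
    for name in client_names:
        client = registered.get(name)
        if client is None:
            invalid.append(name)
        else:
            valid.append(client)
    return valid, invalid
```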

wait_for_complete(job_id)[source]
copy_new_server_properties(server, new_server)[source]
server_shutdown(server, touch_file)[source]
set_up_run_config(server, conf)[source]