nvflare.private.fed.server.server_engine module

class ServerEngine(server, args, client_manager: ClientManager, snapshot_persistor, workers=3)[source]

Bases: ServerEngineInternalSpec, StreamableEngine

Server engine: manages jobs, clients, components, and messaging on the FL server.

Parameters:
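  • server – the FL server instance

  • args – command-line arguments

  • client_manager (ClientManager) – client manager.

  • snapshot_persistor – persistor used to save and restore run snapshots.

  • workers – number of worker threads (default 3).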

abort_app_on_clients(clients: List[str]) str[source]

Abort the application on the specified clients.

abort_app_on_server(job_id: str, turn_to_cold: bool = False) str[source]

Abort the application on the server.

add_component(component_id: str, component)[source]

Add a component into the system.

Parameters:
  • component_id – component ID

  • component – component object

ask_to_stop()[source]

Ask the engine to stop the current run.

build_component(config_dict)[source]

Build a component from the config_dict.

Parameters:

config_dict – configuration.

cancel_client_resources(resource_check_results: Dict[str, Tuple[bool, str]], resource_reqs: Dict[str, dict], fl_ctx: FLContext)[source]

Cancels the requested resources for the job.

Parameters:
  • resource_check_results – A dict of {client_name: client_check_result} where client_check_result is a tuple of (is_resource_enough, resource reservation token if any)

  • resource_reqs – A dict of {client_name: resource requirements dict}

  • fl_ctx – FLContext

check_app_start_readiness(job_id: str) str[source]

Check whether the app is ready to start.

Returns:

An error message. An empty string if successful.

check_client_resources(job: Job, resource_reqs, fl_ctx: FLContext) Dict[str, Tuple[bool, str]][source]

Sends the check_client_resources requests to the clients.

Parameters:
  • job – job object

  • resource_reqs – A dict of {client_name: resource requirements dict}

  • fl_ctx – FLContext

Returns:

A dict of {client_name: client_check_result}.

client_check_result is a tuple of (is_resource_enough, token); is_resource_enough is a bool that indicates whether there are enough resources; token is used to reserve or cancel the resources for this check request.

Return type:

Dict[str, Tuple[bool, str]]
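A minimal sketch of how a caller might consume the dict returned by check_client_resources, assuming made-up client names and token strings. The helper split_resource_results is hypothetical and not part of NVFlare; it separates the clients that lack resources from the entries that hold a reservation token.

```python
from typing import Dict, List, Tuple

# Shape documented above: {client_name: (is_resource_enough, token)}
CheckResults = Dict[str, Tuple[bool, str]]


def split_resource_results(results: CheckResults) -> Tuple[List[str], CheckResults]:
    """Hypothetical helper: return (clients lacking resources, entries holding a token)."""
    insufficient = sorted(name for name, (ok, _token) in results.items() if not ok)
    # Only entries that actually reserved something carry a non-empty token;
    # these are the ones a caller would hand back for cancellation.
    reserved = {name: (ok, token) for name, (ok, token) in results.items() if token}
    return insufficient, reserved


if __name__ == "__main__":
    sample = {
        "site-1": (True, "tok-a"),  # made-up data for illustration
        "site-2": (False, ""),
        "site-3": (True, "tok-c"),
    }
    insufficient, to_cancel = split_resource_results(sample)
    print(insufficient)       # ['site-2']
    print(sorted(to_cancel))  # ['site-1', 'site-3']
```

In this sketch, the reserved entries are the ones a caller would pass back to cancel_client_resources as resource_check_results, for example when the job cannot be started.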

close()[source]
configure_job_log(job_id, data) str[source]
delete_job_id(num)[source]

Delete the specified RUN.

The engine must check the run's status before the run can be deleted.

Parameters:

num – job id

Returns:

An error message. An empty string if successful.

deploy_app(job_id, src, dst)[source]
deploy_app_to_server(run_destination: str, app_name: str, app_staging_path: str) str[source]

Deploy the specified app to the server.

Copy the app folder tree from the staging area to the server’s RUN area.

Parameters:
  • run_destination – job id of the app to be deployed (identifies the destination RUN area)

  • app_name – name of the app to be deployed

  • app_staging_path – the full path to the app folder in staging area

Returns:

An error message. An empty string if successful.

disable_clients(client_names: List[str]) dict[source]

Disable specified clients so they cannot rejoin until enabled.

Parameters:

client_names – client names to disable

Returns:

A result dictionary describing disabled clients.

dispatch(topic: str, request: Shareable, fl_ctx: FLContext) Shareable[source]
enable_clients(client_names: List[str]) dict[source]

Enable specified disabled clients so they can rejoin.

Parameters:

client_names – client names to enable

Returns:

A result dictionary describing enabled clients.
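The enable/disable pair acts as a gate on rejoining. A minimal sketch under stated assumptions: the ClientGate class is hypothetical, and its result dictionaries (with "disabled" and "enabled" keys) only mimic the idea of a result dictionary describing the affected clients; NVFlare's actual result keys are not shown here.

```python
from typing import Dict, List, Set


class ClientGate:
    """Hypothetical stand-in for the engine's disabled-client bookkeeping."""

    def __init__(self) -> None:
        self._disabled: Set[str] = set()

    def disable_clients(self, client_names: List[str]) -> Dict[str, List[str]]:
        # dict.fromkeys de-duplicates while keeping the caller's order.
        newly = [n for n in dict.fromkeys(client_names) if n not in self._disabled]
        self._disabled.update(client_names)
        return {"disabled": newly}  # assumed result shape, for illustration only

    def enable_clients(self, client_names: List[str]) -> Dict[str, List[str]]:
        enabled = [n for n in dict.fromkeys(client_names) if n in self._disabled]
        self._disabled.difference_update(client_names)
        return {"enabled": enabled}

    def may_rejoin(self, client_name: str) -> bool:
        # A disabled client is refused until it is enabled again.
        return client_name not in self._disabled


if __name__ == "__main__":
    gate = ClientGate()
    gate.disable_clients(["site-1"])
    print(gate.may_rejoin("site-1"))  # False
    gate.enable_clients(["site-1"])
    print(gate.may_rejoin("site-1"))  # True
```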

fire_event(event_type: str, fl_ctx: FLContext)[source]
get_app_data(app_name: str) Tuple[str, object][source]

Get data for deploying the app.

Parameters:

app_name – name of the app

Returns:

A tuple of (error message, app data); the error message is an empty string if successful.

get_app_run_info(job_id) RunInfo | None[source]

Gets the app RunInfo from the child process.

get_cell()[source]
get_client_from_name(client_name)[source]

Get the registered client from client_name.

Parameters:

client_name – client name

Returns: registered client

get_client_name_from_token(token: str) str[source]

Gets the client name from the client login token.

Parameters:

token – client login token

Returns:

Client name

get_clients() List[Client][source]
get_component(component_id: str) object[source]

Retrieve the system component from the engine.

Parameters:

component_id – component ID

Returns:

component object
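add_component and get_component together behave like a keyed registry. A minimal sketch, assuming a plain dict store; the ComponentRegistry class, the rule that re-registering an existing ID is an error, and returning None for an unknown ID are all assumptions of this sketch, not documented NVFlare behavior.

```python
from typing import Any, Dict, Optional


class ComponentRegistry:
    """Hypothetical keyed store mirroring add_component/get_component."""

    def __init__(self) -> None:
        self._components: Dict[str, Any] = {}

    def add_component(self, component_id: str, component: Any) -> None:
        if not isinstance(component_id, str) or not component_id:
            raise ValueError("component_id must be a non-empty string")
        if component_id in self._components:
            # Assumed policy for this sketch: component IDs are unique.
            raise KeyError(f"component {component_id!r} already exists")
        self._components[component_id] = component

    def get_component(self, component_id: str) -> Optional[Any]:
        # Assumed: an unknown ID yields None instead of raising.
        return self._components.get(component_id)
```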

get_engine_info() EngineInfo[source]

Get general info of the engine.

get_errors(job_id) dict[source]

Get the errors of the server components.

Parameters:

job_id – current job_id

Returns:

Server components errors.

get_job_clients(client_sites)[source]

Get the participating clients for the job.

Parameters:

client_sites – clients with the dispatching info

get_participating_clients()[source]
get_run_info() RunInfo | None[source]
get_staging_path_of_app(app_name: str) str[source]

Get the staging path of the app waiting to be deployed.

Parameters:

app_name (str) – application name

Returns:

The app’s folder path or empty string if the app doesn’t exist

get_widget(widget_id: str) Widget[source]

Get the widget with the specified ID.

Parameters:

widget_id – ID of the widget

Returns: the widget or None if not found

get_workspace() Workspace[source]
has_relays()[source]
initialize_comm(cell: Cell)[source]

Called when the communication cell has been created; sets up the aux message handler.

Parameters:

cell – the communication cell

multicast_aux_requests(topic: str, target_requests: Dict[str, Shareable], timeout: float, fl_ctx: FLContext, optional: bool = False, secure: bool = False) dict[source]

Send requests to specified clients via the aux channel.

Implementation: simply calls the AuxRunner’s multicast_aux_requests method.

Parameters:
  • topic – topic of the request

  • target_requests – requests for the target clients. Different targets can have different requests.

  • timeout – amount of time to wait for responses. 0 means fire and forget.

  • fl_ctx – FL context

  • optional – whether this request is optional

  • secure – whether to send the aux request with P2P security

Returns: a dict of replies (client name => reply Shareable)
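multicast_aux_requests differs from send_aux_request in that each target gets its own request. A minimal sketch of that fan-out using concurrent.futures, with plain dicts standing in for Shareable and a hypothetical send_one callable standing in for the aux channel. The real method delegates to the AuxRunner; this sketch only illustrates the documented contract (per-target requests, replies keyed by client name, timeout 0 meaning fire and forget).

```python
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Callable, Dict

Request = dict  # stand-in for Shareable in this sketch


def multicast(
    target_requests: Dict[str, Request],
    send_one: Callable[[str, Request], Request],
    timeout: float,
) -> Dict[str, Request]:
    """Send a distinct request to each target and collect replies by client name."""
    pool = ThreadPoolExecutor(max_workers=max(1, len(target_requests)))
    futures = {name: pool.submit(send_one, name, req) for name, req in target_requests.items()}
    if timeout <= 0:
        # Fire-and-forget: hand the sends to the pool and return without waiting.
        pool.shutdown(wait=False)
        return {}
    done, _ = wait(list(futures.values()), timeout=timeout)
    pool.shutdown(wait=False)
    # Targets that timed out or raised are simply absent from the result.
    return {
        name: fut.result()
        for name, fut in futures.items()
        if fut in done and fut.exception() is None
    }
```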

new_context() FLContext[source]
notify_dead_job(job_id: str, client_name: str, reason: str)[source]
pause_server_jobs()[source]
persist_components(fl_ctx: FLContext, completed: bool)[source]

Persist the running FL components.

Parameters:
  • fl_ctx – FLContext

  • completed – flag to indicate whether the run is complete

register_app_command(topic: str, cmd_func, *args, **kwargs)[source]

Register app command handler.

Parameters:
  • topic – topic of the command to be handled

  • cmd_func – the function to handle the app command

  • *args – optional args to be passed to the cmd_func

  • **kwargs – optional kwargs to be passed to the cmd_func

Returns: None
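register_app_command stores a handler together with extra positional and keyword arguments to be passed to cmd_func later. A minimal sketch of that binding; the AppCommandTable class, its handle method, and the call order (topic, command data, then the registered extras) are assumptions made for illustration, not NVFlare's documented calling convention.

```python
from typing import Any, Callable, Dict, Tuple


class AppCommandTable:
    """Hypothetical registry: topic -> (handler, extra args, extra kwargs)."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Tuple[Callable[..., Any], tuple, dict]] = {}

    def register_app_command(self, topic: str, cmd_func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        if not callable(cmd_func):
            raise TypeError("cmd_func must be callable")
        self._handlers[topic] = (cmd_func, args, kwargs)

    def handle(self, topic: str, cmd_data: Any) -> Any:
        cmd_func, args, kwargs = self._handlers[topic]
        # Assumed convention: topic and data first, then the registered extras.
        return cmd_func(topic, cmd_data, *args, **kwargs)
```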

register_aux_message_handler(topic: str, message_handle_func)[source]

Register an aux message handling function for the specified topic.

An exception is raised when:

  • a handler is already registered for the topic;

  • the topic is bad: it must be a non-empty string;

  • message_handle_func is bad: it must be callable.

Implementation Note:

This method should simply call the ServerAuxRunner’s register_aux_message_handler method.

Parameters:
  • topic – the topic to be handled by the func

  • message_handle_func – the func to handle the message. Must follow aux_message_handle_func_signature.
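The three error conditions listed above map directly onto input checks. A minimal sketch of a handler table that enforces them; the AuxHandlerTable class and the exception types chosen (TypeError, RuntimeError) are assumptions, since the reference only says that an exception is raised.

```python
from typing import Callable, Dict


class AuxHandlerTable:
    """Hypothetical registry enforcing the documented registration rules."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable] = {}

    def register_aux_message_handler(self, topic: str, message_handle_func: Callable) -> None:
        # Rule: the topic must be a non-empty string.
        if not isinstance(topic, str) or not topic:
            raise TypeError("topic must be a non-empty string")
        # Rule: the handler must be callable.
        if not callable(message_handle_func):
            raise TypeError("message_handle_func must be callable")
        # Rule: only one handler per topic.
        if topic in self._handlers:
            raise RuntimeError(f"a handler is already registered for topic {topic!r}")
        self._handlers[topic] = message_handle_func
```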

register_stream_processing(channel: str, topic: str, factory: ConsumerFactory, stream_done_cb=None, consumed_cb=None, **cb_kwargs)[source]

Register a ConsumerFactory for specified app channel and topic. Once a new streaming request is received for the channel/topic, the registered factory will be used to create an ObjectConsumer object to handle the new stream.

Note: the factory should generate a new ObjectConsumer every time get_consumer() is called. This is because multiple streaming sessions could be going on at the same time. Each streaming session should have its own ObjectConsumer.

Parameters:
  • channel – app channel

  • topic – app topic

  • factory – the factory to be registered

  • stream_done_cb – the callback to be called when streaming is done on receiving side

Returns: None
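The note about get_consumer() is the key design point: each concurrent stream needs its own consumer state. A minimal sketch with hypothetical classes (ChunkCollector, CollectorFactory) that mimic the ConsumerFactory/ObjectConsumer roles using plain Python objects rather than NVFlare's base classes; the consume method and the argument-free get_consumer() are simplifications that may not match NVFlare's signatures.

```python
from typing import Any, List


class ChunkCollector:
    """Hypothetical per-stream consumer: accumulates the objects of one stream."""

    def __init__(self) -> None:
        self.items: List[Any] = []

    def consume(self, obj: Any) -> None:
        self.items.append(obj)


class CollectorFactory:
    """Hypothetical factory: returns a fresh consumer on every call."""

    def get_consumer(self) -> ChunkCollector:
        # Never return a shared instance: two streams on the same channel/topic
        # would otherwise interleave their objects in one list.
        return ChunkCollector()


if __name__ == "__main__":
    factory = CollectorFactory()
    stream_a, stream_b = factory.get_consumer(), factory.get_consumer()
    stream_a.consume("a1")
    stream_b.consume("b1")
    stream_a.consume("a2")
    print(stream_a.items, stream_b.items)  # ['a1', 'a2'] ['b1']
```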

remove_clients(clients: List[str]) str[source]

Remove active client-token entries so those clients can register again.

remove_custom_path()[source]
remove_exception_process(job_id)[source]
reset_errors(job_id) str[source]

Reset the errors of the server components.

Parameters:

job_id – current job_id

Returns:

A message describing the outcome of the reset.

restart_server() str[source]

Restart the server.

The engine should not exit right away. See shutdown_server.

Returns:

An error message. An empty string if successful.

restore_components(snapshot: RunSnapshot, fl_ctx: FLContext)[source]

Restore the FL components from the saved snapshot.

Parameters:
  • snapshot – RunSnapshot

  • fl_ctx – FLContext

send_app_command(job_id: str, topic: str, cmd_data, timeout: float) Shareable[source]
send_aux_request(targets: [], topic: str, request: Shareable, timeout: float, fl_ctx: FLContext, optional=False, secure=False) dict[source]

Send a request to specified clients via the aux channel.

Implementation: simply calls the AuxRunner’s send_aux_request method.

Parameters:
  • targets – target clients. None or empty list means all clients.

  • topic – topic of the request.

  • request – request to be sent

  • timeout – number of seconds to wait for replies. 0 means fire-and-forget.

  • fl_ctx – FL context

  • optional – whether this message is optional

  • secure – whether to send the aux request with P2P security

Returns: a dict of replies (client name => reply Shareable)
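send_aux_request sends one request to many targets, where None or an empty list means all clients. A minimal sketch of that target resolution, expressed as building the per-target dict that multicast_aux_requests accepts; resolve_targets and to_target_requests are hypothetical helpers, and this reference does not state that NVFlare implements send_aux_request this way.

```python
import copy
from typing import Dict, List, Optional


def resolve_targets(targets: Optional[List[str]], all_clients: List[str]) -> List[str]:
    """None or [] selects every client, per the parameter description above."""
    return list(all_clients) if not targets else list(targets)


def to_target_requests(
    targets: Optional[List[str]], all_clients: List[str], request: dict
) -> Dict[str, dict]:
    # Give each target its own copy so per-target mutation cannot leak across clients.
    return {name: copy.deepcopy(request) for name in resolve_targets(targets, all_clients)}
```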

send_aux_to_targets(targets, topic, request, timeout, fl_ctx, optional, secure)[source]
send_command_to_child_runner_process(job_id: str, command_name: str, command_data, timeout=5.0, optional=False)[source]
set_configurator(conf: ServerJsonConfigurator)[source]

Set the configurator for server.

Parameters:

conf – A ServerJsonConfigurator object

set_job_runner(job_runner: JobRunner, job_manager: JobDefManagerSpec)[source]

Set the JobRunner for server.

Parameters:
  • job_runner – A JobRunner object

  • job_manager – A JobDefManagerSpec object

set_run_manager(run_manager: RunManager)[source]

Set the RunManager for server.

Parameters:

run_manager – A RunManager object

show_stats(job_id) dict[source]

Show the stats of the server.

Parameters:

job_id – current job_id

Returns:

Component stats of the server

shutdown_server() str[source]

Shutdown the server.

The engine should not exit right away. It should set its status to STOPPING, set up a timer (in a different thread), and return from this call right away (if other restart conditions are met). When the timer fires, the engine exits. This gives the caller time to process the feedback or clean up (e.g. send the admin command response).

Returns:

An error message. An empty string if successful.
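The shutdown contract (set status to STOPPING, arm a timer on another thread, return immediately) can be sketched with threading.Timer. This is a minimal illustration, assuming a hypothetical DemoEngine with a string status field and an on_exit callback in place of actually exiting the process; the grace period and the "already stopping" check are made-up stand-ins for the unspecified restart conditions.

```python
import threading
from typing import Callable


class DemoEngine:
    """Hypothetical engine showing deferred shutdown, not NVFlare's implementation."""

    def __init__(self, on_exit: Callable[[], None], grace_seconds: float = 2.0) -> None:
        self.status = "RUNNING"
        self._on_exit = on_exit
        self._grace = grace_seconds

    def shutdown_server(self) -> str:
        if self.status == "STOPPING":
            return "server is already stopping"  # non-empty string = error message
        self.status = "STOPPING"
        # The timer runs on its own thread; this call returns before it fires,
        # so the caller can still send its admin response or clean up.
        timer = threading.Timer(self._grace, self._on_exit)
        timer.daemon = True
        timer.start()
        return ""  # empty string = success, per the documented convention
```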

shutdown_streamer()[source]

Shutdown the engine’s streamer.

Returns: None

start_app_on_server(fl_ctx: FLContext, job: Job | None = None, job_clients=None, snapshot=None) str[source]

Start the FL app on the server.

Returns:

An error message. An empty string if successful.

start_client_job(job, client_sites, fl_ctx: FLContext)[source]

Send the start-client-run commands to the clients.

Parameters:
  • job – job object

  • client_sites – client sites

  • fl_ctx – FLContext

stop_all_jobs()[source]
stream_objects(channel: str, topic: str, stream_ctx: dict, targets: List[str], producer: ObjectProducer, fl_ctx: FLContext, optional=False, secure=False)[source]

Send a stream of Shareable objects to receivers.

Parameters:
  • channel – the channel for this stream

  • topic – topic of the stream

  • stream_ctx – context of the stream

  • targets – receiving sites

  • producer – the ObjectProducer that produces the stream of Shareable objects

  • fl_ctx – the FLContext object

  • optional – whether the stream is optional

  • secure – whether to use P2P security

Returns: result from the generator’s reply processing
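stream_objects pulls Shareable objects from a producer until the stream ends and returns the result of processing the replies. A minimal sketch of that loop with hypothetical stand-ins (ListProducer, run_stream) and plain dicts instead of Shareable; NVFlare's ObjectProducer has its own method names and arguments, which are not reproduced here, and the "count the acks" result is purely illustrative.

```python
from typing import Any, Callable, Dict, List, Optional


class ListProducer:
    """Hypothetical producer: yields one object per call, None when exhausted."""

    def __init__(self, objects: List[dict]) -> None:
        self._objects = list(objects)
        self._index = 0

    def next_object(self) -> Optional[dict]:
        if self._index >= len(self._objects):
            return None  # end of stream
        obj = self._objects[self._index]
        self._index += 1
        return obj

    def summarize_replies(self, replies: Dict[str, List[Any]]) -> int:
        # Reply processing decides the stream's final result; here, total replies.
        return sum(len(r) for r in replies.values())


def run_stream(producer: ListProducer, targets: List[str], send: Callable[[str, dict], Any]) -> int:
    """Send every produced object to every target, then let the producer digest replies."""
    replies: Dict[str, List[Any]] = {t: [] for t in targets}
    while (obj := producer.next_object()) is not None:
        for target in targets:
            replies[target].append(send(target, obj))
    return producer.summarize_replies(replies)
```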

sync_clients_from_main_process()[source]

Fetch the participating clients from the main (parent) process.

Returns: clients

update_job_run_status()[source]

Report the job run status to the parent process.

validate_targets(client_names: List[str]) Tuple[List[Client], List[str]][source]

Validate the specified target names.

Parameters:

client_names – list of names to be validated

Returns: a list of valid targets and a list of invalid target names
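validate_targets partitions the requested names into known clients and unknown names. A minimal sketch, assuming a plain dict of registered clients and a hypothetical Client dataclass in place of nvflare.apis.client.Client; de-duplicating the input is an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class Client:
    """Hypothetical stand-in for nvflare.apis.client.Client."""

    name: str


def validate_targets(client_names: List[str], registered: Dict[str, Client]) -> Tuple[List[Client], List[str]]:
    valid: List[Client] = []
    invalid: List[str] = []
    for name in dict.fromkeys(client_names):  # assumed: de-duplicate, keep order
        if name in registered:
            valid.append(registered[name])
        else:
            invalid.append(name)
    return valid, invalid
```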

wait_for_complete(workspace, job_id, process)[source]
server_shutdown(server, touch_file)[source]