nvflare.private.fed.server.server_engine module
- class ServerEngine(server, args, client_manager: ClientManager, snapshot_persistor, workers=3)
Bases: ServerEngineInternalSpec
Server engine.
- Parameters
server – server
args – arguments
client_manager (ClientManager) – client manager.
workers – number of worker threads.
- abort_app_on_clients(clients: List[str]) → str
Abort the application on the specified clients.
- aux_send(targets: [], topic: str, request: Shareable, timeout: float, fl_ctx: FLContext) → dict
Send a request to client(s) via the auxiliary channel.
- Parameters
targets – list of Client or client names
topic – topic of the request
request – request to be sent
timeout – number of secs to wait for replies
fl_ctx – FL context
- Returns
A dict of replies (client_name => Shareable)
NOTE: when a reply is received, the peer_ctx props must be set into the PEER_PROPS header of the reply Shareable.
If a reply is not received from a client, do not put it into the reply dict.
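The reply contract above can be illustrated with a minimal sketch. It does not use NVFlare: `collect_replies` is a hypothetical helper, and plain dicts stand in for the reply Shareable objects. It only shows that clients without a reply are left out of the result rather than mapped to a placeholder.

```python
from typing import Dict, List, Optional


def collect_replies(targets: List[str], raw_replies: Dict[str, Optional[dict]]) -> Dict[str, dict]:
    """Build a reply dict keyed by client name, skipping clients that did not reply.

    Hypothetical helper for illustration; not part of NVFlare.
    """
    replies = {}
    for name in targets:
        reply = raw_replies.get(name)
        if reply is None:
            # No reply arrived for this client: leave it out of the result.
            continue
        replies[name] = reply
    return replies


# site-2 timed out (None) and site-3 never answered at all.
replies = collect_replies(
    ["site-1", "site-2", "site-3"],
    {"site-1": {"status": "ok"}, "site-2": None},
)
```

A caller can then treat any target that is missing from the dict as not having replied.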
- build_component(config_dict)
Build a component from the config_dict.
- Parameters
config_dict – configuration.
- cancel_client_resources(resource_check_results: Dict[str, Tuple[bool, str]], resource_reqs: Dict[str, dict])
Cancel the requested resources for the job.
- Parameters
resource_check_results – A dict of {client_name: client_check_result} where client_check_result is a tuple of (client check OK, resource reserve token if any)
resource_reqs – A dict of {client_name: resource requirements dict}
- check_app_start_readiness(job_id: str) → str
Check whether the app is ready to start.
- Returns
An error message. An empty string if successful.
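Many methods on this page share the same return convention: an error message string, empty on success. A minimal sketch of how a caller might handle it follows. `FakeEngine`, `ensure_ready`, and the message text are hypothetical stand-ins for illustration and are not taken from NVFlare.

```python
class FakeEngine:
    """Hypothetical stand-in that mimics only the return convention."""

    def __init__(self, ready_jobs):
        self._ready_jobs = set(ready_jobs)

    def check_app_start_readiness(self, job_id: str) -> str:
        if job_id in self._ready_jobs:
            return ""  # empty string means success
        return f"job {job_id} is not ready"  # message text invented for this sketch


def ensure_ready(engine, job_id: str) -> None:
    """Turn the error-string convention into an exception for the caller."""
    err = engine.check_app_start_readiness(job_id)
    if err:
        raise RuntimeError(err)


engine = FakeEngine(ready_jobs=["job-1"])
ensure_ready(engine, "job-1")  # returns quietly because the message is empty
```

Because an empty string is falsy in Python, `if err:` is enough to detect a failure.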
- check_client_resources(resource_reqs) → Dict[str, Tuple[bool, str]]
Sends the check_client_resources requests to the clients.
- Parameters
resource_reqs – A dict of {client_name: resource requirements dict}
- Returns
A dict of {client_name: client_check_result} where client_check_result is a tuple of (client check OK, resource reserve token if any)
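To make the shape of the result concrete, here is a small sketch that works on a hand-written dict of the documented form. `clients_passing_check` and the sample tokens are hypothetical and only illustrate how the (check OK, token) tuples can be unpacked.

```python
from typing import Dict, List, Tuple


def clients_passing_check(results: Dict[str, Tuple[bool, str]]) -> List[str]:
    """Return the names of clients whose resource check succeeded (hypothetical helper)."""
    return sorted(name for name, (ok, _token) in results.items() if ok)


# Sample data in the documented shape; the token values are made up.
example_results = {
    "site-1": (True, "token-abc"),
    "site-2": (False, ""),
}
passing = clients_passing_check(example_results)
```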
- delete_job_id(num)
Delete the specified RUN.
The engine must do a status check before the run can be deleted.
- Parameters
num – job id (the docstring names this parameter job_id)
- Returns
An error message. An empty string if successful.
- deploy_app_to_server(run_destination: str, app_name: str, app_staging_path: str) → str
Deploy the specified app to the server.
Copy the app folder tree from the staging area to the server’s RUN area.
- Parameters
run_destination – job id of the app to be deployed (the docstring names this parameter job_id)
app_name – name of the app to be deployed
app_staging_path – the full path to the app folder in staging area
- Returns
An error message. An empty string if successful.
- get_app_data(app_name: str) → Tuple[str, object]
Get data for deploying the app.
- Parameters
app_name – name of the app
- Returns
A tuple of (error message, app data). The error message is an empty string if successful.
- get_app_run_info(job_id) → RunInfo
Get the app RunInfo from the child process.
- Returns
App RunInfo
- get_client_from_name(client_name)
Get the registered client token from client_name.
- Parameters
client_name – client name
- Returns
The registered client
- get_client_name_from_token(token: str) → str
Get the registered client name from communication token.
- Parameters
token – communication token
- Returns
Client name
- get_engine_info() → EngineInfo
Get general info of the engine.
- get_errors(job_id)
Get the errors of the server components.
- Parameters
job_id – current job_id
- Returns
Server components errors.
- get_job_clients(client_sites)
Get the participating clients for the job.
- Parameters
client_sites – clients with the dispatching info
- get_staging_path_of_app(app_name: str) → str
Get the staging path of the app waiting to be deployed.
- Parameters
app_name (str) – application name
- Returns
The app’s folder path or empty string if the app doesn’t exist
- get_widget(widget_id: str) → Widget
Get the widget with the specified ID.
- Parameters
widget_id – ID of the widget
- Returns
The widget, or None if not found
- parent_aux_send(targets: [], topic: str, request: Shareable, timeout: float, fl_ctx: FLContext) → dict
- persist_components(fl_ctx: FLContext, completed: bool)
Persist the running FL components.
- Parameters
fl_ctx – FLContext
completed – flag indicating whether the run is complete
- register_aux_message_handler(topic: str, message_handle_func)
Register an aux message handling function for the specified topic.
- An exception is raised when:
a handler is already registered for the topic;
the topic is bad (it must be a non-empty string);
message_handle_func is bad (it must be callable).
- Implementation Note:
This method should simply call the ServerAuxRunner’s register_aux_message_handler method.
- Parameters
topic – the topic to be handled by the func
message_handle_func – the func to handle the message. Must follow aux_message_handle_func_signature.
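The validation rules for registration can be sketched without NVFlare. `AuxHandlerRegistry` below is a hypothetical stand-in for whatever object owns the handlers. The exception types (ValueError, TypeError) and the handler's `(topic, request, fl_ctx)` parameters are assumptions made for this sketch, since the page does not specify them.

```python
from typing import Callable, Dict


class AuxHandlerRegistry:
    """Hypothetical stand-in; shows only the three documented failure cases."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable] = {}

    def register_aux_message_handler(self, topic: str, message_handle_func: Callable) -> None:
        # Exception types are assumptions; the page only says an exception is raised.
        if not isinstance(topic, str) or not topic:
            raise ValueError("topic must be a non-empty string")
        if not callable(message_handle_func):
            raise TypeError("message_handle_func must be callable")
        if topic in self._handlers:
            raise ValueError(f"a handler is already registered for topic '{topic}'")
        self._handlers[topic] = message_handle_func

    def dispatch(self, topic: str, request, fl_ctx=None):
        """Call the handler registered for the topic (illustrative only)."""
        return self._handlers[topic](topic, request, fl_ctx)


def echo_handler(topic, request, fl_ctx):
    # Assumed handler shape for this sketch.
    return {"topic": topic, "echo": request}


registry = AuxHandlerRegistry()
registry.register_aux_message_handler("demo.echo", echo_handler)
result = registry.dispatch("demo.echo", {"x": 1})
```

Registering a second handler for "demo.echo" in this sketch raises, which matches the first failure case listed above.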
- remove_clients(clients: List[str]) → str
Remove specified clients.
- Parameters
clients – clients to be removed
- Returns
An error message. An empty string if successful.
- restart_server() → str
Restart the server.
The engine should not exit right away. See shutdown_server.
- Returns
An error message. An empty string if successful.
- restore_components(snapshot: RunSnapshot, fl_ctx: FLContext)
Restore the FL components from the saved snapshot.
- Parameters
snapshot – RunSnapshot
fl_ctx – FLContext
- send_aux_request(targets: [], topic: str, request: Shareable, timeout: float, fl_ctx: FLContext) → dict
Send a request to specified clients via the aux channel.
Implementation: simply calls the ServerAuxRunner’s send_aux_request method.
- Parameters
targets – target clients. None or empty list means all clients
topic – topic of the request
request – request to be sent
timeout – number of secs to wait for replies. 0 means fire-and-forget.
fl_ctx – FL context
- Returns
A dict of replies (client name => reply Shareable)
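Two of the parameter rules above are easy to misread, so here is a small sketch of them in isolation. `resolve_targets` and `expects_replies` are hypothetical helpers, not NVFlare functions. They only encode the documented meanings of an empty `targets` and a zero `timeout`.

```python
from typing import List, Optional


def resolve_targets(targets: Optional[List[str]], all_clients: List[str]) -> List[str]:
    """None or an empty list means every client (hypothetical helper)."""
    if not targets:
        return list(all_clients)
    return list(targets)


def expects_replies(timeout: float) -> bool:
    """A timeout of 0 means fire-and-forget, so no replies are awaited."""
    return timeout > 0


all_clients = ["site-1", "site-2"]
broadcast = resolve_targets(None, all_clients)
single = resolve_targets(["site-2"], all_clients)
```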
- set_configurator(conf: ServerJsonConfigurator)
Set the configurator for server.
- Parameters
conf – A ServerJsonConfigurator object
- set_job_runner(job_runner: JobRunner, job_manager: JobDefManagerSpec)
Set the JobRunner for server.
- Parameters
job_runner – A JobRunner object
job_manager – A JobDefManagerSpec object
- set_run_manager(run_manager: RunManager)
Set the RunManager for server.
- Parameters
run_manager – A RunManager object
- show_stats(job_id)
Show stats of the server.
- Parameters
job_id – current job_id
- Returns
Component stats of the server
- shutdown_server() → str
Shut down the server.
The engine should not exit right away. It should set its status to STOPPING, set up a timer (in a different thread), and return from this call right away (if other restart conditions are met). When the timer fires, the engine exits. This gives the caller time to process the feedback or clean up (e.g. the admin command response).
- Returns
An error message. An empty string if successful.
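The delayed-exit pattern described for shutdown_server can be sketched with the standard library's `threading.Timer`. `StoppableEngine`, its string status values, the delay, and the `exit_func` callback are hypothetical. The real engine's status type and exit mechanism are not shown on this page.

```python
import threading


class StoppableEngine:
    """Hypothetical stand-in that shows only the set-status, start-timer, return pattern."""

    def __init__(self, exit_func, delay: float = 0.05) -> None:
        self.status = "RUNNING"
        self._exit_func = exit_func  # called when the timer fires
        self._delay = delay
        self._timer = None

    def shutdown_server(self) -> str:
        if self.status == "STOPPING":
            return "server is already stopping"  # message invented for this sketch
        self.status = "STOPPING"
        # The timer runs exit_func in a separate thread, so this call returns at once.
        self._timer = threading.Timer(self._delay, self._exit_func)
        self._timer.daemon = True
        self._timer.start()
        return ""


exited = threading.Event()
engine = StoppableEngine(exit_func=exited.set)
err = engine.shutdown_server()       # returns immediately with an empty message
fired = exited.wait(timeout=2.0)     # the "exit" happens later, on the timer thread
```

Returning before the exit happens is what lets the caller finish its own work, such as sending the admin command response.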
- start_app_on_server(run_number: str, job_id: Optional[str] = None, job_clients=None, snapshot=None) → str
Start the FL app on Server.
- Returns
An error message. An empty string if successful.
- start_client_job(job_id, client_sites)
Send the start client run commands to the clients.
- Parameters
job_id – job_id
client_sites – client sites
- sync_clients_from_main_process()
Fetch the participating clients from the main parent process.
- Returns
The clients