nvflare.private.fed.client.client_engine module

class ClientEngine(client: FederatedClient, args, rank, workers=5)[source]

Bases: ClientEngineInternalSpec, StreamableEngine

ClientEngine runs in the client parent process (CP).

To init the ClientEngine.

Parameters:
  • client – FL client object

  • args – command args

  • rank – local process rank

  • workers – number of workers

abort_app(job_id: str) str[source]

Aborts the app execution for the specified run.

Returns:

A string message.

abort_task(job_id: str) str[source]

Abort the client current executing task.

Returns:

A string message.

add_component(component_id: str, component)[source]

Add a component into the system.

Parameters:
  • component_id – component ID

  • component – component object

Returns:

configure_job_log(job_id, config)[source]
delete_run(job_id: str) str[source]

Deletes the specified run.

Parameters:

job_id – job_id

Returns:

A string message.

deploy_app(app_name: str, job_id: str, job_meta: dict, client_name: str, app_data) str[source]

Deploy the app to specified run.

Parameters:
  • app_name – FL_app name

  • job_id – job that the app is to be deployed to

  • job_meta – meta data of the job that the app belongs to

  • client_name – name of the client

  • app_data – zip data of the app

Returns:

A error message if any; empty str is okay.

fire_and_forget_aux_request(topic: str, request: Shareable, fl_ctx: FLContext, optional=False, secure=False) dict[source]
fire_event(event_type: str, fl_ctx: FLContext)[source]
get_all_job_ids()[source]

Get all the client job_id.

Returns: list of all the job_id

get_cell()[source]

Get the communication cell. This method must be implemented since AuxRunner calls to get cell.

Returns:

get_client_name()[source]

Get the ClientEngine client_name.

Returns: the client_name

get_component(component_id: str) object[source]

Retrieve the system component from the engine.

Parameters:

component_id – component ID

Returns:

component object

get_current_run_info(job_id) ClientRunInfo[source]
get_engine_status()[source]
get_errors(job_id)[source]
initialize_comm(cell: Cell)[source]

This is called when communication cell has been created. We will set up aux message handler here.

Parameters:

cell

Returns:

new_context() FLContext[source]
notify_job_status(job_id: str, job_status)[source]

Notify the engine what’s the client job’s new status.

Parameters:
  • job_id – job_id

  • job_status – Client job status

Returns:

register_aux_message_handler(topic: str, message_handle_func)[source]

Register aux message handling function with specified topics.

Exception is raised when:

a handler is already registered for the topic; bad topic - must be a non-empty string bad message_handle_func - must be callable

Implementation Note:

This method should simply call the ServerAuxRunner’s register_aux_message_handler method.

Parameters:
  • topic – the topic to be handled by the func

  • message_handle_func – the func to handle the message. Must follow aux_message_handle_func_signature.

register_stream_processing(channel: str, topic: str, factory: ConsumerFactory, stream_done_cb=None, consumed_cb=None, **cb_kwargs)[source]

Register a ConsumerFactory for specified app channel and topic. Once a new streaming request is received for the channel/topic, the registered factory will be used to create an ObjectConsumer object to handle the new stream.

Note: the factory should generate a new ObjectConsumer every time get_consumer() is called. This is because multiple streaming sessions could be going on at the same time. Each streaming session should have its own ObjectConsumer.

Parameters:
  • channel – app channel

  • topic – app topic

  • factory – the factory to be registered

  • stream_done_cb – the callback to be called when streaming is done on receiving side

  • consumed_cb – the CB to be called after a chunk is consumed

Returns: None

reset_errors(job_id)[source]
restart() str[source]

Restarts the FL client.

Returns:

A string message.

send_aux_request(topic: str, request: Shareable, timeout: float, fl_ctx: FLContext, optional=False, secure=False) Shareable[source]

Send a request to the Server via the aux channel.

Implementation: simply calls the AuxRunner’s send_aux_request method.

Parameters:
  • topic – topic of the request.

  • request – request to be sent

  • timeout – number of secs to wait for replies. 0 means fire-and-forget.

  • fl_ctx – FL context

  • optional – whether this message is optional

  • secure – send the aux request in a secure way

Returns: a dict of replies (client name => reply Shareable)

send_to_job(job_id, channel: str, topic: str, msg: Message, timeout: float, optional=False) Message[source]

Send a message to CJ

Parameters:
  • job_id – id of the job

  • channel – message channel

  • topic – message topic

  • msg – the message to be sent

  • timeout – how long to wait for reply

  • optional – whether the message is optional

Returns: reply from CJ

set_agent(admin_agent)[source]
shutdown() str[source]

Shuts down the FL client.

Returns:

A string message.

shutdown_streamer()[source]

Shutdown the engine’s streamer.

Returns: None

start_app(job_id: str, job_meta: dict, allocated_resource: dict | None = None, token: str | None = None, resource_manager=None) str[source]

Starts the app for the specified run.

Parameters:
  • job_id – job_id

  • allocated_resource – allocated resource

  • token – token

  • resource_manager – resource manager

Returns:

A string message.

stream_objects(channel: str, topic: str, stream_ctx: dict, targets: List[str], producer: ObjectProducer, fl_ctx: FLContext, optional=False, secure=False)[source]

Send a stream of Shareable objects to receivers.

Parameters:
  • channel – the channel for this stream

  • topic – topic of the stream

  • stream_ctx – context of the stream

  • targets – receiving sites

  • producer – the ObjectProducer that can produces the stream of Shareable objects

  • fl_ctx – the FLContext object

  • optional – whether the stream is optional

  • secure – whether to use P2P security

Returns: result from the generator’s reply processing

shutdown_client(federated_client, touch_file)[source]