nvflare.private.fed.client.client_engine module
- class ClientEngine(client: FederatedClient, args, rank, workers=5)[source]
Bases:
ClientEngineInternalSpec, StreamableEngine
ClientEngine runs in the client parent process (CP).
Initializes the ClientEngine.
- Parameters:
client – FL client object
args – command args
rank – local process rank
workers – number of workers
- abort_app(job_id: str) str[source]
Aborts the app execution for the specified run.
- Returns:
A string message.
- abort_task(job_id: str) str[source]
Aborts the task that the client is currently executing.
- Returns:
A string message.
- add_component(component_id: str, component)[source]
Add a component into the system.
- Parameters:
component_id – component ID
component – component object
Returns:
- delete_run(job_id: str) str[source]
Deletes the specified run.
- Parameters:
job_id – job_id
- Returns:
A string message.
- deploy_app(app_name: str, job_id: str, job_meta: dict, client_name: str, app_data) str[source]
Deploys the app to the specified run.
- Parameters:
app_name – FL_app name
job_id – job that the app is to be deployed to
job_meta – meta data of the job that the app belongs to
client_name – name of the client
app_data – zip data of the app
- Returns:
An error message, if any; an empty string indicates success.
- fire_and_forget_aux_request(topic: str, request: Shareable, fl_ctx: FLContext, optional=False, secure=False) dict[source]
- get_cell()[source]
Gets the communication cell. This method must be implemented because AuxRunner calls it to get the cell.
Returns:
- get_component(component_id: str) object[source]
Retrieve the system component from the engine.
- Parameters:
component_id – component ID
- Returns:
component object
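The add_component / get_component pair behaves like an ID-keyed registry. The following is a minimal sketch of that pattern, not the engine's implementation: `ComponentRegistry` is a hypothetical stand-in, and its handling of duplicate or unknown IDs (overwrite, return None) is an assumption, since this page does not state what ClientEngine does in those cases.

```python
from typing import Dict, Optional


class ComponentRegistry:
    """Hypothetical ID-keyed component table; not NVFlare code."""

    def __init__(self) -> None:
        self._components: Dict[str, object] = {}

    def add_component(self, component_id: str, component: object) -> None:
        # Assumption: a later registration silently replaces an earlier one.
        self._components[component_id] = component

    def get_component(self, component_id: str) -> Optional[object]:
        # Assumption: an unknown ID yields None rather than raising.
        return self._components.get(component_id)


registry = ComponentRegistry()
tracker = {"name": "metrics_tracker"}  # any object can serve as a component
registry.add_component("tracker", tracker)
found = registry.get_component("tracker")
missing = registry.get_component("no_such_id")
```

Code that looks up a component by ID would typically check the result before using it, because the ID may not have been registered.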
- get_current_run_info(job_id) ClientRunInfo[source]
- initialize_comm(cell: Cell)[source]
Called when the communication cell has been created. The aux message handler is set up here.
- Parameters:
cell
Returns:
- notify_job_status(job_id: str, job_status)[source]
Notifies the engine of the client job’s new status.
- Parameters:
job_id – job_id
job_status – Client job status
Returns:
- register_aux_message_handler(topic: str, message_handle_func)[source]
Registers an aux message handling function for the specified topic.
- Exception is raised when:
a handler is already registered for the topic;
bad topic - must be a non-empty string;
bad message_handle_func - must be callable
- Implementation Note:
This method should simply call the ServerAuxRunner’s register_aux_message_handler method.
- Parameters:
topic – the topic to be handled by the func
message_handle_func – the func to handle the message. Must follow aux_message_handle_func_signature.
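The three error conditions listed for register_aux_message_handler can be illustrated with a small topic-to-handler table. This is a sketch under stated assumptions: `AuxHandlerRegistry` and its `lookup` method are hypothetical, and the exception types (ValueError, TypeError) are chosen for illustration, since this page does not say which exceptions the engine raises.

```python
from typing import Callable, Dict


class AuxHandlerRegistry:
    """Hypothetical stand-in for a topic-to-handler table; not NVFlare code."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable] = {}

    def register(self, topic: str, message_handle_func: Callable) -> None:
        # Bad topic: must be a non-empty string.
        if not isinstance(topic, str) or not topic:
            raise ValueError("topic must be a non-empty string")
        # Bad handler: must be callable.
        if not callable(message_handle_func):
            raise TypeError("message_handle_func must be callable")
        # A second registration for the same topic is rejected.
        if topic in self._handlers:
            raise ValueError(f"a handler is already registered for topic {topic!r}")
        self._handlers[topic] = message_handle_func

    def lookup(self, topic: str) -> Callable:
        return self._handlers[topic]


def collect_errors(registry: AuxHandlerRegistry) -> list:
    """Try each invalid registration and record the exception type name."""
    errors = []
    for topic, func in [("", print), ("stats", "not callable"), ("hello", print)]:
        try:
            registry.register(topic, func)
        except (ValueError, TypeError) as e:
            errors.append(type(e).__name__)
    return errors


registry = AuxHandlerRegistry()
registry.register("hello", lambda *args, **kwargs: "handled")
errors = collect_errors(registry)
```

Validating the arguments before checking for duplicates keeps a malformed topic from ever being used as a key.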
- register_stream_processing(channel: str, topic: str, factory: ConsumerFactory, stream_done_cb=None, consumed_cb=None, **cb_kwargs)[source]
Register a ConsumerFactory for specified app channel and topic. Once a new streaming request is received for the channel/topic, the registered factory will be used to create an ObjectConsumer object to handle the new stream.
Note: the factory should generate a new ObjectConsumer every time get_consumer() is called. This is because multiple streaming sessions could be going on at the same time. Each streaming session should have its own ObjectConsumer.
- Parameters:
channel – app channel
topic – app topic
factory – the factory to be registered
stream_done_cb – the callback to be called when streaming is done on receiving side
consumed_cb – the CB to be called after a chunk is consumed
Returns: None
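The note about returning a new ObjectConsumer from every get_consumer() call can be sketched as follows. `SketchConsumer` and `SketchConsumerFactory` are hypothetical stand-ins with simplified signatures; they are not the real ObjectConsumer or ConsumerFactory interfaces, which this page does not show.

```python
from typing import Any, Dict, List


class SketchConsumer:
    """Hypothetical per-stream consumer that collects the chunks it receives."""

    def __init__(self) -> None:
        self.chunks: List[Any] = []

    def consume(self, chunk: Any) -> None:
        self.chunks.append(chunk)


class SketchConsumerFactory:
    """Hypothetical factory: every get_consumer() call returns a fresh object."""

    def get_consumer(self, stream_ctx: Dict[str, Any]) -> SketchConsumer:
        # Never cache or reuse a consumer: each streaming session needs its own state.
        return SketchConsumer()


factory = SketchConsumerFactory()
# Two streaming sessions in progress at the same time, with interleaved chunks.
session_a = factory.get_consumer({"session": "a"})
session_b = factory.get_consumer({"session": "b"})
session_a.consume("a-1")
session_b.consume("b-1")
session_a.consume("a-2")
```

If the factory handed back one shared consumer, the chunks from the two sessions would be mixed in a single list; separate instances keep each stream's state isolated.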
- send_aux_request(topic: str, request: Shareable, timeout: float, fl_ctx: FLContext, optional=False, secure=False) Shareable[source]
Send a request to the Server via the aux channel.
Implementation: simply calls the AuxRunner’s send_aux_request method.
- Parameters:
topic – topic of the request.
request – request to be sent
timeout – number of secs to wait for replies. 0 means fire-and-forget.
fl_ctx – FL context
optional – whether this message is optional
secure – send the aux request in a secure way
Returns: a dict of replies (client name => reply Shareable)
- send_to_job(job_id, channel: str, topic: str, msg: Message, timeout: float, optional=False) Message[source]
Sends a message to the client job (CJ) process.
- Parameters:
job_id – id of the job
channel – message channel
topic – message topic
msg – the message to be sent
timeout – how long to wait for reply
optional – whether the message is optional
Returns: reply from CJ
- start_app(job_id: str, job_meta: dict, allocated_resource: dict | None = None, token: str | None = None, resource_manager=None) str[source]
Starts the app for the specified run.
- Parameters:
job_id – job_id
job_meta – meta data of the job
allocated_resource – allocated resource
token – token
resource_manager – resource manager
- Returns:
A string message.
- stream_objects(channel: str, topic: str, stream_ctx: dict, targets: List[str], producer: ObjectProducer, fl_ctx: FLContext, optional=False, secure=False)[source]
Send a stream of Shareable objects to receivers.
- Parameters:
channel – the channel for this stream
topic – topic of the stream
stream_ctx – context of the stream
targets – receiving sites
producer – the ObjectProducer that produces the stream of Shareable objects
fl_ctx – the FLContext object
optional – whether the stream is optional
secure – whether to use P2P security
Returns: result from the generator’s reply processing