nvflare.private.fed.client.client_run_manager module

class ClientRunInfo(job_id)[source]

Bases: object

To init the ClientRunInfo.

Parameters:

job_id – job id

class ClientRunManager(client_name: str, job_id: str, workspace: Workspace, client: FederatedClient, components: Dict[str, FLComponent], handlers: List[FLComponent] | None = None, conf: ClientJsonConfigurator | None = None)[source]

Bases: ClientEngineExecutorSpec, StreamableEngine

ClientRunManager provides the ClientEngine APIs implementation running in the child process (CJ).

To init the ClientRunManager.

Parameters:
  • client_name – client name

  • job_id – job id

  • workspace – workspace

  • client – FL client object

  • components – available FL components

  • handlers – available handlers.

  • conf – ClientJsonConfigurator object

abort_app(job_id: str, fl_ctx: FLContext)[source]

Abort the running FL App on the client.

Parameters:
  • job_id – current_job_id

  • fl_ctx – FLContext

add_component(component_id: str, component)[source]

Add a component into the system.

Parameters:
  • component_id – component ID

  • component – component object

Returns:

add_handler(handler: FLComponent)[source]
build_component(config_dict)[source]

Build a component from the config_dict.

Parameters:

config_dict – config dict

create_job_processing_context_properties(workspace, job_id)[source]
dispatch(topic: str, request: Shareable, fl_ctx: FLContext) Shareable[source]
fire_and_forget_aux_request(topic: str, request: Shareable, fl_ctx: FLContext, optional=False, secure=False) dict[source]

Send an async request to Server via the aux channel.

Parameters:
  • topic – topic of the request.

  • request – request to be sent

  • fl_ctx – FL context

  • optional – whether the request is optional

  • secure – whether to send the message in P2P secure mode

Returns:

fire_event(event_type: str, fl_ctx: FLContext)[source]
get_all_components() dict[source]
get_cell()[source]

Get communication cell

Returns:

get_client_from_name(client_name)[source]
get_clients()[source]
get_component(component_id: str) object[source]

Retrieve the system component from the engine.

Parameters:

component_id – component ID

Returns:

component object

get_job_clients(fl_ctx: FLContext)[source]

Get participating clients of the job. We no longer send message to the Server to ask this info. Instead, job clients are included in the meta of the job when Server started the job!

Parameters:

fl_ctx – The FLContext object

Returns:

get_run_info() ClientRunInfo[source]
get_task_assignment(fl_ctx: FLContext, timeout=None) TaskAssignment[source]
get_widget(widget_id: str) Widget[source]
get_workspace() Workspace[source]
multicast_aux_requests(topic: str, target_requests: Dict[str, Shareable], timeout: float, fl_ctx: FLContext, optional: bool = False, secure: bool = False) dict[source]

Send requests to specified targets (server or other clients) via the aux channel.

Implementation: simply calls the AuxRunner’s multicast_aux_requests method.

Parameters:
  • topic – topic of the request

  • target_requests – requests of the targets. Different target can have different request.

  • timeout – amount of time to wait for responses. 0 means fire and forget.

  • fl_ctx – FL context

  • optional – whether this request is optional

  • secure – whether to send the aux request in P2P secure

Returns: a dict of replies (client name => reply Shareable)

new_context() FLContext[source]
persist_components(fl_ctx: FLContext, completed: bool)[source]
register_aux_message_handler(topic: str, message_handle_func)[source]

Register aux message handling function with specified topics.

Exception is raised when:

a handler is already registered for the topic; bad topic - must be a non-empty string bad message_handle_func - must be callable

Implementation Note: This method should simply call the ClientAuxRunner’s register_aux_message_handler method.

Parameters:
  • topic – the topic to be handled by the func

  • message_handle_func – the func to handle the message. Must follow aux_message_handle_func_signature.

register_stream_processing(channel: str, topic: str, factory: ConsumerFactory, stream_done_cb=None, consumed_cb=None, **cb_kwargs)[source]

Register a ConsumerFactory for specified app channel and topic. Once a new streaming request is received for the channel/topic, the registered factory will be used to create an ObjectConsumer object to handle the new stream.

Note: the factory should generate a new ObjectConsumer every time get_consumer() is called. This is because multiple streaming sessions could be going on at the same time. Each streaming session should have its own ObjectConsumer.

Parameters:
  • channel – app channel

  • topic – app topic

  • factory – the factory to be registered

  • stream_done_cb – the callback to be called when streaming is done on receiving side

Returns: None

reset_errors() ClientRunInfo[source]
send_aux_request(targets: None | str | List[str], topic: str, request: Shareable, timeout: float, fl_ctx: FLContext, optional=False, secure=False) dict[source]

Send a request to Server via the aux channel.

Implementation: simply calls the ClientAuxRunner’s send_aux_request method.

Parameters:
  • targets – aux messages targets. None or empty list means the server.

  • topic – topic of the request

  • request – request to be sent

  • timeout – number of secs to wait for replies. 0 means fire-and-forget.

  • fl_ctx – FL context

  • optional – whether the request is optional

  • secure – should the request sent in the secure way

Returns:

{ site_name: reply_shareable }

Return type:

a dict of reply Shareable in the format of

send_task_result(result: Shareable, fl_ctx: FLContext, timeout=None) bool[source]
show_errors() ClientRunInfo[source]
shutdown_streamer()[source]

Shutdown the engine’s streamer.

Returns: None

stream_objects(channel: str, topic: str, stream_ctx: dict, targets: List[str], producer: ObjectProducer, fl_ctx: FLContext, optional=False, secure=False)[source]

Send a stream of Shareable objects to receivers.

Parameters:
  • channel – the channel for this stream

  • topic – topic of the stream

  • stream_ctx – context of the stream

  • targets – receiving sites

  • producer – the ObjectProducer that can produces the stream of Shareable objects

  • fl_ctx – the FLContext object

  • optional – whether the stream is optional

  • secure – whether to use P2P security

Returns: result from the generator’s reply processing

validate_targets(inputs)[source]

Validate specified target names.

Parameters:

target_names – list of names to be validated

Returns: a list of validate targets and a list of invalid target names