nvflare.private.fed.client.client_run_manager module
- class ClientRunInfo(job_id)[source]
Bases:
object
Initializes the ClientRunInfo.
- Parameters:
job_id – job id
- class ClientRunManager(client_name: str, job_id: str, workspace: Workspace, client: FederatedClient, components: Dict[str, FLComponent], handlers: List[FLComponent] | None = None, conf: ClientJsonConfigurator | None = None)[source]
Bases:
ClientEngineExecutorSpec, StreamableEngine
ClientRunManager provides the ClientEngine API implementation that runs in the child process (CJ).
Initializes the ClientRunManager.
- Parameters:
client_name – client name
job_id – job id
workspace – workspace
client – FL client object
components – available FL components
handlers – available handlers.
conf – ClientJsonConfigurator object
- abort_app(job_id: str, fl_ctx: FLContext)[source]
Abort the running FL App on the client.
- Parameters:
job_id – current_job_id
fl_ctx – FLContext
- add_component(component_id: str, component)[source]
Add a component into the system.
- Parameters:
component_id – component ID
component – component object
Returns:
- add_handler(handler: FLComponent)[source]
- build_component(config_dict)[source]
Build a component from the config_dict.
- Parameters:
config_dict – config dict
- fire_and_forget_aux_request(topic: str, request: Shareable, fl_ctx: FLContext, optional=False, secure=False) → dict[source]
Send an async request to Server via the aux channel.
- Parameters:
topic – topic of the request.
request – request to be sent
fl_ctx – FL context
optional – whether the request is optional
secure – whether to send the message in P2P secure mode
Returns:
- get_component(component_id: str) → object[source]
Retrieve the system component from the engine.
- Parameters:
component_id – component ID
- Returns:
component object
- get_job_clients(fl_ctx: FLContext)[source]
Get the participating clients of the job. The client no longer sends a message to the Server to ask for this information. Instead, the job clients are included in the job's meta when the Server starts the job.
- Parameters:
fl_ctx – The FLContext object
Returns:
- get_run_info() → ClientRunInfo[source]
- get_task_assignment(fl_ctx: FLContext, timeout=None) → TaskAssignment[source]
- multicast_aux_requests(topic: str, target_requests: Dict[str, Shareable], timeout: float, fl_ctx: FLContext, optional: bool = False, secure: bool = False) → dict[source]
Send requests to specified targets (server or other clients) via the aux channel.
Implementation: simply calls the AuxRunner’s multicast_aux_requests method.
- Parameters:
topic – topic of the request
target_requests – requests of the targets. Different targets can have different requests.
timeout – amount of time to wait for responses. 0 means fire and forget.
fl_ctx – FL context
optional – whether this request is optional
secure – whether to send the aux request in P2P secure mode
Returns: a dict of replies (client name => reply Shareable)
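The shape of target_requests and of the returned reply dict can be illustrated with a small stand-alone sketch. It does not use NVFlare: multicast_sketch and fake_send are hypothetical names, plain dicts stand in for Shareable objects, and returning an empty dict when timeout is 0 is an assumption about fire-and-forget behavior, not something the documentation above states.

```python
from typing import Callable, Dict


def multicast_sketch(
    target_requests: Dict[str, dict],
    timeout: float,
    send: Callable[[str, dict], dict],
) -> Dict[str, dict]:
    """Hypothetical helper; not the AuxRunner implementation."""
    replies: Dict[str, dict] = {}
    for target, request in target_requests.items():
        # Each target receives its own request object.
        reply = send(target, request)
        if timeout > 0:
            # Collect replies only when the caller is willing to wait.
            replies[target] = reply
    return replies


def fake_send(target: str, request: dict) -> dict:
    # Stand-in transport: acknowledges the request on behalf of the target.
    return {"from": target, "ack": request["op"]}


requests = {"server": {"op": "report"}, "site-2": {"op": "sync"}}
replies = multicast_sketch(requests, timeout=5.0, send=fake_send)
no_wait = multicast_sketch(requests, timeout=0, send=fake_send)
```

Here replies is keyed by target name, mirroring the "client name => reply Shareable" format described above, while no_wait carries no replies.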
- register_aux_message_handler(topic: str, message_handle_func)[source]
Register aux message handling function with specified topics.
- Exception is raised when:
a handler is already registered for the topic;
bad topic - must be a non-empty string;
bad message_handle_func - must be callable.
Implementation Note: This method should simply call the ClientAuxRunner’s register_aux_message_handler method.
- Parameters:
topic – the topic to be handled by the func
message_handle_func – the func to handle the message. Must follow aux_message_handle_func_signature.
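The three failure conditions listed above can be sketched with a stand-alone registry. This is a minimal illustration, not NVFlare code: AuxHandlerRegistry, dispatch, and echo_handler are hypothetical names, the exception types (ValueError, TypeError) are assumptions, and the two-argument handler shown here is a simplification of aux_message_handle_func_signature.

```python
from typing import Callable, Dict


class AuxHandlerRegistry:
    """Hypothetical stand-in for a per-topic handler registry."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable] = {}

    def register_aux_message_handler(self, topic: str, message_handle_func: Callable) -> None:
        # Bad topic: must be a non-empty string.
        if not isinstance(topic, str) or not topic:
            raise ValueError("topic must be a non-empty string")
        # Bad handler: must be callable.
        if not callable(message_handle_func):
            raise TypeError("message_handle_func must be callable")
        # Only one handler may be registered per topic.
        if topic in self._handlers:
            raise ValueError(f"a handler is already registered for topic '{topic}'")
        self._handlers[topic] = message_handle_func

    def dispatch(self, topic: str, request: dict) -> dict:
        # Look up the handler registered for the topic and invoke it.
        return self._handlers[topic](topic, request)


def echo_handler(topic: str, request: dict) -> dict:
    # Hypothetical handler with a simplified signature.
    return {"topic": topic, "echo": request}


registry = AuxHandlerRegistry()
registry.register_aux_message_handler("app.status", echo_handler)
result = registry.dispatch("app.status", {"x": 1})
```

Registering "app.status" a second time, passing an empty topic, or passing a non-callable would each raise in this sketch.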
- register_stream_processing(channel: str, topic: str, factory: ConsumerFactory, stream_done_cb=None, consumed_cb=None, **cb_kwargs)[source]
Register a ConsumerFactory for specified app channel and topic. Once a new streaming request is received for the channel/topic, the registered factory will be used to create an ObjectConsumer object to handle the new stream.
Note: the factory should generate a new ObjectConsumer every time get_consumer() is called. This is because multiple streaming sessions could be going on at the same time. Each streaming session should have its own ObjectConsumer.
- Parameters:
channel – app channel
topic – app topic
factory – the factory to be registered
stream_done_cb – the callback to be called when streaming is done on receiving side
Returns: None
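The note about generating a new ObjectConsumer per call can be sketched as follows. The classes below are hypothetical stand-ins rather than the NVFlare ConsumerFactory and ObjectConsumer base classes, plain dicts stand in for streamed objects, and the real get_consumer() may take arguments that this sketch omits.

```python
from typing import List


class CollectingConsumer:
    """Hypothetical stand-in for an ObjectConsumer: holds the state of one stream."""

    def __init__(self) -> None:
        self.received: List[dict] = []

    def consume(self, obj: dict) -> None:
        # Record the object for this streaming session only.
        self.received.append(obj)


class CollectingConsumerFactory:
    """Hypothetical stand-in for a ConsumerFactory."""

    def get_consumer(self) -> CollectingConsumer:
        # Return a fresh consumer on every call so concurrent
        # streaming sessions never share state.
        return CollectingConsumer()


factory = CollectingConsumerFactory()
stream_a = factory.get_consumer()
stream_b = factory.get_consumer()
stream_a.consume({"chunk": 1})
stream_b.consume({"chunk": 2})
```

A factory that cached and returned a single consumer would mix the objects of both sessions, which is the situation the note warns against.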
- reset_errors() → ClientRunInfo[source]
- send_aux_request(targets: None | str | List[str], topic: str, request: Shareable, timeout: float, fl_ctx: FLContext, optional=False, secure=False) → dict[source]
Send a request to Server via the aux channel.
Implementation: simply calls the ClientAuxRunner’s send_aux_request method.
- Parameters:
targets – aux messages targets. None or empty list means the server.
topic – topic of the request
request – request to be sent
timeout – number of secs to wait for replies. 0 means fire-and-forget.
fl_ctx – FL context
optional – whether the request is optional
secure – whether the request should be sent in secure mode
- Returns:
a dict of reply Shareables in the format { site_name: reply_shareable }
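The rule that None or an empty list of targets means the server can be sketched with a small normalization helper. This is an illustration under stated assumptions, not the ClientAuxRunner logic: normalize_targets is a hypothetical function, and the "server" string is an assumed placeholder for however the server target is actually named.

```python
from typing import List, Union

SERVER_TARGET = "server"  # assumed placeholder name for the server target


def normalize_targets(targets: Union[None, str, List[str]]) -> List[str]:
    """Hypothetical helper: turn the accepted target forms into a list of names."""
    if targets is None:
        # None means the server.
        return [SERVER_TARGET]
    if isinstance(targets, str):
        # A single target name becomes a one-element list.
        return [targets]
    if isinstance(targets, list):
        if not targets:
            # An empty list also means the server.
            return [SERVER_TARGET]
        if not all(isinstance(t, str) for t in targets):
            raise TypeError("every target must be a str")
        return list(targets)
    raise TypeError("targets must be None, a str, or a list of str")


default_targets = normalize_targets(None)
empty_targets = normalize_targets([])
single_target = normalize_targets("site-1")
many_targets = normalize_targets(["site-1", "site-2"])
```

The reply dict described above would then have one entry per name in the normalized list.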
- show_errors() → ClientRunInfo[source]
- stream_objects(channel: str, topic: str, stream_ctx: dict, targets: List[str], producer: ObjectProducer, fl_ctx: FLContext, optional=False, secure=False)[source]
Send a stream of Shareable objects to receivers.
- Parameters:
channel – the channel for this stream
topic – topic of the stream
stream_ctx – context of the stream
targets – receiving sites
producer – the ObjectProducer that produces the stream of Shareable objects
fl_ctx – the FLContext object
optional – whether the stream is optional
secure – whether to use P2P security
Returns: result from the generator’s reply processing