nvflare.apis.wf_comm_spec module

class WFCommSpec[source]

Bases: ABC

broadcast(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, min_responses: int = 0, wait_time_after_min_received: int = 0)[source]

Schedule to broadcast the task to specified targets.

This is a non-blocking call.

The task is standing until one of the following conditions comes true:
  • if timeout is specified (> 0), and the task has been standing for more than the specified time

  • the controller has received the specified min_responses results for this task, and all target clients are done.

  • the controller has received the specified min_responses results for this task, and has waited for wait_time_after_min_received.

While the task is standing:
  • Before sending the task to a client, the before_task_sent CB (if specified) is called;

  • When a result is received from a client, the result_received CB (if specified) is called;

After the task is done, the task_done CB (if specified) is called:
  • If result_received CB is specified, the ‘result’ in the ClientTask of each client is produced by the result_received CB;

  • Otherwise, the ‘result’ contains the original result submitted by the clients;

NOTE: if the targets is None, the actual broadcast target clients will be dynamic, because the clients could join/disconnect at any moment. While the task is standing, any client that joins automatically becomes a target for this broadcast.

Parameters:
  • task – the task to be sent

  • fl_ctx – the FL context

  • targets – list of destination clients. None means all clients are determined dynamically;

  • min_responses – the min number of responses expected. If == 0, must get responses from all clients that the task has been sent to;

  • wait_time_after_min_received – how long (secs) to wait after the min_responses is received. If == 0, end the task immediately after the min responses are received;

broadcast_and_wait(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, min_responses: int = 0, wait_time_after_min_received: int = 0, abort_signal: Signal | None = None)[source]

This is the blocking version of the ‘broadcast’ method.

First, the task is scheduled for broadcast (see the broadcast method); It then waits until the task is completed.

Parameters:
  • task – the task to be sent

  • fl_ctx – the FL context

  • targets – list of destination clients. None means all clients are determined dynamically.

  • min_responses – the min number of responses expected. If == 0, must get responses from all clients that the task has been sent to;

  • wait_time_after_min_received – how long (secs) to wait after the min_responses is received. If == 0, end the task immediately after the min responses are received;

  • abort_signal – the abort signal. If triggered, this method stops waiting and returns to the caller.

broadcast_forever(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None)[source]

Schedule a broadcast task that never ends until timeout or explicitly cancelled.

All clients will get the task every time it asks for a new task. This is a non-blocking call.

NOTE: you can change the content of the task in the before_task_sent function.

Parameters:
  • task – the task to be sent

  • fl_ctx – the FL context

  • targets – list of destination clients. None means all clients are determined dynamically.

cancel_all_tasks(completion_status=TaskCompletionStatus.CANCELLED, fl_ctx: FLContext | None = None)[source]

Cancels all standing tasks.

Parameters:
  • completion_status – the TaskCompletionStatus of the task

  • fl_ctx – the FL context

cancel_task(task: Task, completion_status: TaskCompletionStatus = TaskCompletionStatus.CANCELLED, fl_ctx: FLContext | None = None)[source]

Cancels the specified task.

If the task is standing, the task is cancelled immediately (and removed from job queue) and calls the task_done CB (if specified);

If the task is not standing, this method has no effect.

Parameters:
  • task – the task to be cancelled

  • completion_status – the TaskCompletionStatus of the task

  • fl_ctx – the FL context

check_tasks()[source]

Checks if tasks should be exited.

client_is_active(client_name: str, reason: str, fl_ctx: FLContext)[source]

Called by the Engine to notify us that the client is active .

Parameters:
  • client_name – name of the client that is active

  • reason – why client is considered active

  • fl_ctx – the FLContext

finalize_run(fl_ctx: FLContext)[source]

Called when a new RUN is finished.

Parameters:

fl_ctx – the FL context

get_client_disconnect_time(client_name)[source]

Get the time that the client is deemed disconnected.

Parameters:

client_name – the name of the client

Returns: time at which the client was deemed disconnected; or None if the client is not disconnected.

get_num_standing_tasks() int[source]

Gets tasks that are currently standing.

Returns: length of the list of standing tasks

handle_exception(task_id: str, fl_ctx: FLContext)[source]

Called after process_task_request returns, but exception occurs before task is sent out.

initialize_run(fl_ctx: FLContext)[source]

Called when a new RUN is about to start.

Parameters:

fl_ctx – FL context. It must contain ‘job_id’ that is to be initialized

process_dead_client_report(client_name: str, fl_ctx: FLContext)[source]

Called by the Engine to process dead client report.

Parameters:
  • client_name – name of the client that dead report is received

  • fl_ctx – the FLContext

process_submission(client: Client, task_name: str, task_id: str, result: Shareable, fl_ctx: FLContext)[source]

Called by the Engine to process the submitted result from a client.

Parameters:
  • client – the Client that the submitted result is from

  • task_name – the name of the task

  • task_id – the id of the task

  • result – the Shareable result from the Client

  • fl_ctx – the FLContext

process_task_check(task_id: str, fl_ctx: FLContext)[source]

Called by the Engine to check whether a specified task still exists. :param task_id: the id of the task :param fl_ctx: the FLContext

Returns: the ClientTask object if exists; None otherwise

process_task_request(client: Client, fl_ctx: FLContext) Tuple[str, str, Shareable][source]

Called by the Engine when a task request is received from a client.

Parameters:
  • client – the Client that the task request is from

  • fl_ctx – the FLContext

Returns: task name, task id, and task data

relay(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, task_result_timeout: int = 0, dynamic_targets: bool = True)[source]

Schedules a task to be done sequentially by the clients in the targets list. This is a non-blocking call.

Parameters:
  • task – the task to be performed by each client

  • fl_ctx – the FL context for scheduling the task

  • targets – list of clients. If None, all clients.

  • send_order – how to choose the next client

  • task_assignment_timeout – how long to wait for the expected client to get assigned

  • client. (before assigning to next)

  • task_result_timeout – how long to wait for result from the assigned client before giving up.

  • dynamic_targets – whether to dynamically grow the target list. If True, then the target list is

  • joins. (expanded dynamically when a new client)

relay_and_wait(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order=SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, task_result_timeout: int = 0, dynamic_targets: bool = True, abort_signal: Signal | None = None)[source]

This is the blocking version of ‘relay’.

send(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0)[source]

Schedule to send the task to a single target client.

This is a non-blocking call.

In ANY order, the target client is the first target that asks for task. In SEQUENTIAL order, the controller will try its best to send the task to the first client in the targets list. If can’t, it will try the next target, and so on.

NOTE: if the ‘targets’ is None, the actual target clients will be dynamic, because the clients could join/disconnect at any moment. While the task is standing, any client that joins automatically becomes a target for this task.

If the send_order is SEQUENTIAL, the targets must be a non-empty list of client names.

Parameters:
  • task – the task to be sent

  • fl_ctx – the FL context

  • targets – list of candidate target clients.

  • send_order – how to choose the client to send the task.

  • task_assignment_timeout – in SEQUENTIAL order, this is the wait time for trying a target client, before trying next target.

send_and_wait(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, abort_signal: Signal | None = None)[source]

This is the blocking version of the ‘send’ method.

First, the task is scheduled for send (see the ‘send’ method); It then waits until the task is completed and returns the task completion status and collected result.

Parameters:
  • task – the task to be performed by each client

  • fl_ctx – the FL context for scheduling the task

  • targets – list of clients. If None, all clients.

  • send_order – how to choose the next client

  • task_assignment_timeout – how long to wait for the expected client to get assigned

  • client. (before assigning to next)

  • abort_signal – the abort signal. If triggered, this method stops waiting and returns to the caller.