nvflare.apis.impl.controller module

class Controller(task_check_period=0.2)

Bases: FLComponent, ControllerSpec, ABC

Controller logic for tasks and their destinations.

set_communicator() must be called before the communication-related methods of this class can be used.

Parameters:

task_check_period (float, optional) – interval for checking status of tasks. Applicable for WFCommServer. Defaults to 0.2.

broadcast(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, min_responses: int = 1, wait_time_after_min_received: int = 0)

Schedule to broadcast the task to specified targets.

This is a non-blocking call.

The task remains standing until one of the following conditions becomes true:
  • a timeout is specified (> 0), and the task has been standing for longer than the specified time

  • the controller has received the specified min_responses results for this task, and all target clients are done.

  • the controller has received the specified min_responses results for this task, and has waited for wait_time_after_min_received.
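
The three conditions above can be read as a single predicate. The following is a minimal plain-Python sketch of that reading, not NVFlare code: `BroadcastState` and `is_done` are hypothetical names, and the handling of `min_responses == 0` follows the parameter description for this method.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BroadcastState:
    """Hypothetical snapshot of one standing broadcast task (not an NVFlare type)."""

    elapsed: float  # seconds the task has been standing
    timeout: float  # 0 means no timeout was specified
    num_responses: int  # results received so far
    min_responses: int  # 0 means "all clients the task was sent to"
    all_targets_done: bool  # every target client has finished
    secs_since_min_received: Optional[float]  # None until min_responses is reached
    wait_time_after_min_received: float


def is_done(s: BroadcastState) -> bool:
    # Condition 1: a timeout was specified and has been exceeded.
    if s.timeout > 0 and s.elapsed > s.timeout:
        return True
    # min_responses == 0: apart from a timeout, the task ends only once
    # every target client is done.
    if s.min_responses == 0:
        return s.all_targets_done
    # Conditions 2 and 3 both need the minimum number of results first.
    if s.num_responses < s.min_responses:
        return False
    # Condition 2: every target client is done.
    if s.all_targets_done:
        return True
    # Condition 3: the wait after reaching min_responses has elapsed.
    return (
        s.secs_since_min_received is not None
        and s.secs_since_min_received >= s.wait_time_after_min_received
    )
```

With `wait_time_after_min_received` set to 0, condition 3 holds as soon as `min_responses` results have arrived, which matches the "end the task immediately" behavior described for that parameter.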

While the task is standing:
  • Before sending the task to a client, the before_task_sent CB (if specified) is called;

  • When a result is received from a client, the result_received CB (if specified) is called;

After the task is done, the task_done CB (if specified) is called:
  • If result_received CB is specified, the ‘result’ in the ClientTask of each client is produced by the result_received CB;

  • Otherwise, the ‘result’ contains the original result submitted by the clients;
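
To make the callback order concrete, here is a small synchronous simulation. It is only an illustration under simplifying assumptions: `simulate_broadcast`, the record dictionaries, and `client_fn` are hypothetical stand-ins (a real controller works asynchronously with Task and ClientTask objects), and clients are processed one after another.

```python
from typing import Any, Callable, Dict, List, Optional

Record = Dict[str, Any]  # hypothetical stand-in for a ClientTask


def simulate_broadcast(
    clients: List[str],
    task_data: dict,
    client_fn: Callable[[str, dict], Any],
    before_task_sent: Optional[Callable[[Record], None]] = None,
    result_received: Optional[Callable[[Record], None]] = None,
    task_done: Optional[Callable[[List[Record]], None]] = None,
) -> List[Record]:
    records: List[Record] = []
    for name in clients:
        record: Record = {"client": name, "data": dict(task_data), "result": None}
        # Called before the task goes out to this client.
        if before_task_sent is not None:
            before_task_sent(record)
        # The client does its work and submits a result.
        record["result"] = client_fn(name, record["data"])
        # Called when the result arrives; it may replace record["result"].
        if result_received is not None:
            result_received(record)
        records.append(record)
    # Called once, after the whole task is done.
    if task_done is not None:
        task_done(records)
    return records
```

If `result_received` is omitted, each record keeps the value returned by `client_fn`, mirroring the "original result submitted by the clients" case.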

NOTE: if targets is None, the actual broadcast targets are dynamic, because clients can join or disconnect at any moment. While the task is standing, any client that joins automatically becomes a target for this broadcast.

Parameters:
  • task – the task to be sent

  • fl_ctx – the FL context

  • targets – list of destination clients. None means all clients, determined dynamically;

  • min_responses – the min number of responses expected. If == 0, must get responses from all clients that the task has been sent to;

  • wait_time_after_min_received – how long (secs) to wait after the min_responses is received. If == 0, end the task immediately after the min responses are received;

broadcast_and_wait(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, min_responses: int = 1, wait_time_after_min_received: int = 0, abort_signal: Signal | None = None)

This is the blocking version of the ‘broadcast’ method.

First, the task is scheduled for broadcast (see the broadcast method). The call then waits until the task is completed.

Parameters:
  • task – the task to be sent

  • fl_ctx – the FL context

  • targets – list of destination clients. None means all clients, determined dynamically.

  • min_responses – the min number of responses expected. If == 0, must get responses from all clients that the task has been sent to;

  • wait_time_after_min_received – how long (secs) to wait after the min_responses is received. If == 0, end the task immediately after the min responses are received;

  • abort_signal – the abort signal. If triggered, this method stops waiting and returns to the caller.
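
The blocking variant can be thought of as "schedule, then poll until done or aborted". The sketch below shows only the waiting part, under stated assumptions: `wait_until_done` and its boolean return value are hypothetical, `threading.Event` stands in for the abort Signal, and `check_period` plays a role similar to task_check_period.

```python
import threading
from typing import Callable


def wait_until_done(
    is_done: Callable[[], bool],
    abort_signal: threading.Event,
    check_period: float = 0.01,
) -> bool:
    """Return True if the task completed, False if the wait was aborted."""
    while not is_done():
        # Event.wait doubles as the sleep between checks; it returns True
        # as soon as the abort signal has been set.
        if abort_signal.wait(timeout=check_period):
            return False
    return True
```

Completion is checked before the abort signal, so a task that has already finished is reported as done even if the signal fires at the same moment. That ordering is a design choice of this sketch, not documented behavior.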

broadcast_forever(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None)

Schedule a broadcast task that does not end until it times out or is explicitly cancelled.

Every client gets the task each time it asks for a new task. This is a non-blocking call.

NOTE: you can change the content of the task in the before_task_sent function.

Parameters:
  • task – the task to be sent

  • fl_ctx – the FL context

  • targets – list of destination clients. None means all clients, determined dynamically.

cancel_all_tasks(completion_status=TaskCompletionStatus.CANCELLED, fl_ctx: FLContext | None = None)

Cancels all standing tasks.

Parameters:
  • completion_status – the TaskCompletionStatus of the task

  • fl_ctx – the FL context

cancel_task(task: Task, completion_status=TaskCompletionStatus.CANCELLED, fl_ctx: FLContext | None = None)

Cancels the specified task.

If the task is standing, it is cancelled immediately (and removed from the job queue), and the task_done CB (if specified) is called.

If the task is not standing, this method has no effect.

Parameters:
  • task – the task to be cancelled

  • completion_status – the TaskCompletionStatus of the task

  • fl_ctx – the FL context
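
The cancellation rules above (cancel a standing task and fire task_done, ignore a task that is not standing) can be modeled in a few lines. This is a hedged sketch, not the controller's implementation: `StandingTasks`, the task dictionaries, and the plain-string statuses are hypothetical stand-ins for the real bookkeeping and for TaskCompletionStatus.

```python
from typing import Callable, List, Optional


class StandingTasks:
    """Hypothetical bookkeeping of standing tasks (not an NVFlare class)."""

    def __init__(self) -> None:
        self._standing: List[dict] = []

    def schedule(
        self, name: str, task_done: Optional[Callable[[dict], None]] = None
    ) -> dict:
        task = {"name": name, "status": None, "task_done": task_done}
        self._standing.append(task)
        return task

    def cancel(self, task: dict, completion_status: str = "CANCELLED") -> None:
        if not any(t is task for t in self._standing):
            return  # not standing: cancelling has no effect
        # Remove the task from the queue, record its status, then notify.
        self._standing = [t for t in self._standing if t is not task]
        task["status"] = completion_status
        if task["task_done"] is not None:
            task["task_done"](task)

    def cancel_all(self, completion_status: str = "CANCELLED") -> None:
        # Iterate over a copy because cancel() rebinds the list.
        for task in list(self._standing):
            self.cancel(task, completion_status)

    def num_standing(self) -> int:
        return len(self._standing)
```

Cancelling the same task twice invokes `task_done` only once, because the second call finds the task no longer standing.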

get_client_disconnect_time(client_name)

Get the time when the client is deemed disconnected.

Parameters:

client_name – the name of the client

Returns: time at which the client was deemed disconnected; or None if the client is not disconnected.

get_num_standing_tasks() → int

Gets the number of tasks that are currently standing.

Returns: length of the list of standing tasks

initialize(fl_ctx: FLContext)

relay(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, task_result_timeout: int = 0, dynamic_targets: bool = True)

Schedules a task to be done sequentially by the clients in the targets list. This is a non-blocking call.

Parameters:
  • task – the task to be performed by each client

  • fl_ctx – the FL context for scheduling the task

  • targets – list of clients. If None, all clients.

  • send_order – how to choose the next client

  • task_assignment_timeout – how long to wait for the expected client to get assigned before assigning to the next client.

  • task_result_timeout – how long to wait for result from the assigned client before giving up.

  • dynamic_targets – whether to dynamically grow the target list. If True, the target list is expanded dynamically when a new client joins.
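
As a rough picture of sequential relaying with a growing target list, consider the simulation below. Everything in it is an assumption made for illustration: `simulate_relay`, `client_fn`, and the `joins_after` test harness are hypothetical, timeouts are ignored, and placing late joiners at the end of the queue is a choice of this sketch rather than documented behavior.

```python
from collections import deque
from typing import Any, Callable, Dict, List, Optional, Tuple


def simulate_relay(
    targets: List[str],
    client_fn: Callable[[str], Any],
    joins_after: Optional[Dict[str, List[str]]] = None,
    dynamic_targets: bool = True,
) -> List[Tuple[str, Any]]:
    """Run the task on one client at a time, in list order.

    joins_after maps a client name to the clients that join while that
    client is working (a hypothetical way to script late joiners).
    """
    queue = deque(targets)
    pending = dict(joins_after or {})
    results: List[Tuple[str, Any]] = []
    while queue:
        name = queue.popleft()
        results.append((name, client_fn(name)))
        # With dynamic targets, clients that joined meanwhile are added.
        if dynamic_targets:
            queue.extend(pending.get(name, []))
    return results
```

With `dynamic_targets=False`, late joiners are ignored and only the original list is visited.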

relay_and_wait(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order=SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, task_result_timeout: int = 0, dynamic_targets: bool = True, abort_signal: Signal | None = None)

This is the blocking version of ‘relay’.

send(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0)

Schedule to send the task to a single target client.

This is a non-blocking call.

In ANY order, the target client is the first target that asks for a task. In SEQUENTIAL order, the controller tries its best to send the task to the first client in the targets list. If it cannot, it tries the next target, and so on.

NOTE: if ‘targets’ is None, the actual target clients are dynamic, because clients can join or disconnect at any moment. While the task is standing, any client that joins automatically becomes a target for this task.

If the send_order is SEQUENTIAL, the targets must be a non-empty list of client names.

Parameters:
  • task – the task to be sent

  • fl_ctx – the FL context

  • targets – list of candidate target clients.

  • send_order – how to choose the client to send the task.

  • task_assignment_timeout – in SEQUENTIAL order, this is the wait time for trying a target client, before trying next target.
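
The difference between the two send orders can be sketched as a target-selection function. This is a simplified model, not the controller's logic: `Order` stands in for SendOrder, `pick_target` and `request_times` are hypothetical, the target list is assumed to be explicit, clients are assumed to keep asking once they have started, and a positive `task_assignment_timeout` is assumed.

```python
from enum import Enum
from typing import Dict, List, Optional


class Order(Enum):  # hypothetical stand-in for SendOrder
    ANY = "any"
    SEQUENTIAL = "sequential"


def pick_target(
    targets: List[str],
    request_times: Dict[str, float],
    order: Order,
    task_assignment_timeout: float,
) -> Optional[str]:
    """Choose the client that receives the task.

    request_times maps a client to the time (seconds after scheduling)
    at which it first asks for a task; absent clients never ask.
    """
    asking = {c: t for c, t in request_times.items() if c in targets}
    if order is Order.ANY:
        # The first target to ask gets the task.
        return min(asking, key=asking.get) if asking else None
    # SEQUENTIAL: give each target its own window, in list order.
    window_end = 0.0
    for client in targets:
        window_end += task_assignment_timeout
        asked_at = asking.get(client)
        if asked_at is not None and asked_at <= window_end:
            return client
    return None
```

For example, with targets `["a", "b", "c"]`, a 10-second window, and first requests at 50, 12, and 1 seconds respectively, ANY order picks `"c"` while SEQUENTIAL order skips `"a"` and picks `"b"`.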

send_and_wait(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, abort_signal: Signal | None = None)

This is the blocking version of the ‘send’ method.

First, the task is scheduled for sending (see the ‘send’ method). The call then waits until the task is completed and returns the task completion status and the collected result.

Parameters:
  • task – the task to be sent

  • fl_ctx – the FL context for scheduling the task

  • targets – list of clients. If None, all clients.

  • send_order – how to choose the next client

  • task_assignment_timeout – how long to wait for the expected client to get assigned before assigning to the next client.

  • abort_signal – the abort signal. If triggered, this method stops waiting and returns to the caller.

set_communicator(communicator: WFCommSpec)