nvflare.apis.impl.controller module

class Controller(task_check_period=0.5)[source]

Bases: Responder, ControllerSpec, ABC

Manage life cycles of tasks and their destinations.

Parameters

task_check_period (float, optional) – interval for checking status of tasks. Defaults to 0.5.

abort_all_tasks(fl_ctx: FLContext)[source]

Ask clients to abort the execution of all tasks.

NOTE: the server should send a notification to all clients, regardless of whether the server has any standing tasks.

Parameters

fl_ctx (FLContext) – FLContext associated with this action

abort_task(task, fl_ctx: FLContext)[source]

Ask all clients to abort the execution of the specified task.

Parameters
  • task (str) – the task to be aborted

  • fl_ctx (FLContext) – FLContext associated with this action

broadcast(task: Task, fl_ctx: FLContext, targets: Optional[Union[List[Client], List[str]]] = None, min_responses: int = 1, wait_time_after_min_received: int = 0)[source]

Schedule a broadcast task. This is a non-blocking call.

The task is scheduled into a task list. Clients can request tasks and controller will dispatch the task to eligible clients.

Parameters
  • task (Task) – the task to be scheduled

  • fl_ctx (FLContext) – FLContext associated with this task

  • targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.

  • min_responses (int, optional) – the number of client submissions required before this task can be marked as completed. Defaults to 1.

  • wait_time_after_min_received (int, optional) – a grace period for late clients to contribute their submissions. 0 means no grace period. Submissions from late clients during the grace period are still collected as valid submissions. Defaults to 0.

Raises

ValueError – min_responses is greater than the length of targets since this condition will make the task, if allowed to be scheduled, never exit.
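The interplay between min_responses and wait_time_after_min_received can be sketched in plain Python. This is a stand-alone illustration of the rule described above, not NVFlare's implementation; `validate_min_responses` and `broadcast_done` are hypothetical helper names, and clients are represented by plain name strings.

```python
from typing import List, Optional


def validate_min_responses(targets: Optional[List[str]], min_responses: int) -> None:
    """Reject a broadcast that could never complete (hypothetical helper)."""
    # targets=None means "all clients", so there is no fixed length to compare.
    if targets is not None and min_responses > len(targets):
        raise ValueError(
            f"min_responses ({min_responses}) exceeds the number of targets ({len(targets)})"
        )


def broadcast_done(
    num_responses: int,
    min_responses: int,
    seconds_since_min_received: Optional[float],
    wait_time_after_min_received: int = 0,
) -> bool:
    """Return True once enough clients responded and the grace period has passed."""
    # seconds_since_min_received is None until min_responses has been reached.
    if num_responses < min_responses or seconds_since_min_received is None:
        return False
    return seconds_since_min_received >= wait_time_after_min_received
```

With wait_time_after_min_received left at 0, the sketch reports completion as soon as min_responses submissions have arrived; a positive value keeps the task open for late submissions.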

broadcast_and_wait(task: Task, fl_ctx: FLContext, targets: Optional[Union[List[Client], List[str]]] = None, min_responses: int = 1, wait_time_after_min_received: int = 0, abort_signal: Optional[Signal] = None)[source]

Schedule a broadcast task. This is a blocking call.

The task is scheduled into a task list. Clients can request tasks and controller will dispatch the task to eligible clients.

Parameters
  • task (Task) – the task to be scheduled

  • fl_ctx (FLContext) – FLContext associated with this task

  • targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.

  • min_responses (int, optional) – the number of client submissions required before this task can be marked as completed. Defaults to 1.

  • wait_time_after_min_received (int, optional) – a grace period for late clients to contribute their submissions. 0 means no grace period. Submissions from late clients during the grace period are still collected as valid submissions. Defaults to 0.

  • abort_signal (Optional[Signal], optional) – as this is a blocking call, this abort_signal informs this method to return. Defaults to None.
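The blocking variants return either when the task completes or when the abort signal fires. A minimal sketch of such a wait loop follows. `AbortFlag` is a hypothetical stand-in for the signal object (not the NVFlare Signal class), `wait_until_done` is a hypothetical helper, and the 0.5 second default simply mirrors the task_check_period default listed for the class.

```python
import time
from typing import Callable, Optional


class AbortFlag:
    """Hypothetical stand-in for an abort signal."""

    def __init__(self) -> None:
        self.triggered = False

    def trigger(self) -> None:
        self.triggered = True


def wait_until_done(
    is_done: Callable[[], bool],
    abort_flag: Optional[AbortFlag] = None,
    check_period: float = 0.5,
) -> bool:
    """Block until is_done() is true or the flag is triggered.

    Returns True if the task finished, False if the wait was aborted.
    """
    while not is_done():
        if abort_flag is not None and abort_flag.triggered:
            return False
        time.sleep(check_period)  # poll again after one check period
    return True
```

Passing no flag makes the loop wait until completion, which matches the abort_signal default of None.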

broadcast_forever(task: Task, fl_ctx: FLContext, targets: Optional[Union[List[Client], List[str]]] = None)[source]

Schedule a broadcast task. This is a non-blocking call.

The task is scheduled into a task list. Clients can request tasks and controller will dispatch the task to eligible clients. This broadcast will not end.

Parameters
  • task (Task) – the task to be scheduled

  • fl_ctx (FLContext) – FLContext associated with this task

  • targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.

cancel_all_tasks(completion_status=TaskCompletionStatus.CANCELLED, fl_ctx: Optional[FLContext] = None)[source]

Cancel all standing tasks in this controller.

Parameters
  • completion_status (str, optional) – the completion status for this cancellation. Defaults to TaskCompletionStatus.CANCELLED.

  • fl_ctx (Optional[FLContext], optional) – FLContext associated with this cancellation. Defaults to None.

cancel_task(task: Task, completion_status=TaskCompletionStatus.CANCELLED, fl_ctx: Optional[FLContext] = None)[source]

Cancel the specified task.

Change the task completion_status, which informs the task monitor to clean up this task.

NOTE: we only mark the task as completed and leave it to the task monitor to clean up. This avoids a potential deadlock on task_lock.

Parameters
  • task (Task) – the task to be cancelled

  • completion_status (str, optional) – the completion status for this cancellation. Defaults to TaskCompletionStatus.CANCELLED.

  • fl_ctx (Optional[FLContext], optional) – FLContext associated with this cancellation. Defaults to None.
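The "mark now, clean up later" pattern described for cancel_task can be illustrated with a toy model. Everything here (`MiniTask`, `MiniTaskBoard`, `monitor_once`) is hypothetical and only mimics the idea; it is not how NVFlare's task monitor is written. The string "CANCELLED" stands in for TaskCompletionStatus.CANCELLED.

```python
import threading
from typing import List, Optional


class MiniTask:
    """Toy task record with a completion status."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.completion_status: Optional[str] = None  # None means still standing


class MiniTaskBoard:
    """Toy model of a controller's standing-task list."""

    def __init__(self) -> None:
        self._lock = threading.Lock()  # plays the role of task_lock
        self._tasks: List[MiniTask] = []

    def schedule(self, name: str) -> MiniTask:
        task = MiniTask(name)
        with self._lock:
            self._tasks.append(task)
        return task

    def cancel(self, task: MiniTask, completion_status: str = "CANCELLED") -> None:
        # Only record the status. The lock is not taken and the list is not
        # modified, so this is safe to call from code that already holds the lock.
        task.completion_status = completion_status

    def monitor_once(self) -> List[str]:
        # The monitor is the only place that removes tasks, always under the lock.
        with self._lock:
            finished = [t.name for t in self._tasks if t.completion_status is not None]
            self._tasks = [t for t in self._tasks if t.completion_status is None]
        return finished

    def num_standing(self) -> int:
        with self._lock:
            return len(self._tasks)
```

In this sketch a cancelled task still counts as standing until the next monitor pass removes it, which is the trade-off of deferring cleanup.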

finalize_run(fl_ctx: FLContext)[source]

Do cleanup of the coordinator implementation.

NOTE: Controller subclasses should not override finalize_run.

Parameters

fl_ctx (FLContext) – FLContext associated with this action

get_num_standing_tasks() → int[source]

Get the number of tasks that are currently standing.

Returns

length of the list of standing tasks

Return type

int

handle_event(event_type: str, fl_ctx: FLContext)[source]

Called when events are fired.

Parameters
  • event_type (str) – all event types, including AppEventType and EventType

  • fl_ctx (FLContext) – FLContext information with current event type

handle_exception(task_id: str, fl_ctx: FLContext) → None[source]

Called to cancel a task because its client_task caused an exception at an upper level.

Parameters
  • task_id (str) – an id to the failing client_task

  • fl_ctx (FLContext) – FLContext associated with this client_task

initialize_run(fl_ctx: FLContext)[source]

Called by runners to initialize controller with information in fl_ctx.

Note: Controller subclasses must not override this method.

Parameters

fl_ctx (FLContext) – FLContext information

process_submission(client: Client, task_name: str, task_id: str, result: Shareable, fl_ctx: FLContext)[source]

Called to process a submission from one client.

Note: this method is called by a separate thread.

Parameters
  • client (Client) – the client that submitted this task

  • task_name (str) – the task name associated with this submission

  • task_id (str) – the id associated with the client_task

  • result (Shareable) – the actual submitted data from the client

  • fl_ctx (FLContext) – the FLContext associated with this submission

Raises
  • TypeError – when client is not an instance of Client

  • TypeError – when fl_ctx is not an instance of FLContext

  • TypeError – when result is not an instance of Shareable

  • ValueError – task_name is not found in the client_task

process_task_request(client: Client, fl_ctx: FLContext) → Tuple[str, str, Shareable][source]

Called by runner when a client asks for a task.

Note: this is called in a separate thread.

Parameters
  • client (Client) – The record of one client requesting tasks

  • fl_ctx (FLContext) – The FLContext associated with this request

Raises
  • TypeError – when client is not an instance of Client

  • TypeError – when fl_ctx is not an instance of FLContext

  • TypeError – when any standing task contains an invalid client_task

Returns

task_name, an id for the client_task, and the data for this request

Return type

Tuple[str, str, Shareable]

relay(task: Task, fl_ctx: FLContext, targets: Optional[Union[List[Client], List[str]]] = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, task_result_timeout: int = 0, dynamic_targets: bool = True)[source]

Schedule a single task to targets in one-after-another style. This is a non-blocking call.

The task is scheduled into a task list. Clients can request tasks and controller will dispatch the task to eligible clients based on the send_order.

Parameters
  • task (Task) – the task to be scheduled

  • fl_ctx (FLContext) – FLContext associated with this task

  • targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.

  • send_order (SendOrder, optional) – the order for clients to become eligible. SEQUENTIAL means the order in targets is enforced. ANY means any clients that are inside the targets and haven’t received the task are eligible. Defaults to SendOrder.SEQUENTIAL.

  • task_assignment_timeout (int, optional) – how long to wait for one client to pick the task. Defaults to 0.

  • task_result_timeout (int, optional) – how long to wait for the currently working client to return its result. Defaults to 0.

  • dynamic_targets (bool, optional) – allow clients not in targets to join at the end of targets list. Defaults to True.

Raises
  • ValueError – when task_assignment_timeout is greater than task’s timeout

  • ValueError – when task_result_timeout is greater than task’s timeout

  • TypeError – send_order is not defined in SendOrder

  • TypeError – when dynamic_targets is not a boolean variable

  • ValueError – targets is None or an empty list but dynamic_targets is False
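The difference between the two send_order values can be sketched as a small eligibility function. This is an illustration under stated assumptions rather than NVFlare code: `Order` is a hypothetical stand-in for SendOrder, `eligible_clients` is a hypothetical helper, clients are plain name strings, and timeouts and dynamic_targets are ignored.

```python
from enum import Enum
from typing import List, Set


class Order(Enum):
    """Hypothetical stand-in for SendOrder."""

    SEQUENTIAL = "sequential"
    ANY = "any"


def eligible_clients(targets: List[str], already_sent: Set[str], order: Order) -> List[str]:
    """Return the clients that may pick up the task next."""
    # Clients that already received the task are never eligible again.
    remaining = [name for name in targets if name not in already_sent]
    if order is Order.SEQUENTIAL:
        # The order in targets is enforced: only the next client in line qualifies.
        return remaining[:1]
    # ANY: every target that has not received the task yet qualifies.
    return remaining
```

For targets ["a", "b", "c"] with "a" already served, SEQUENTIAL yields only "b", while ANY yields both "b" and "c".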

relay_and_wait(task: Task, fl_ctx: FLContext, targets: Optional[Union[List[Client], List[str]]] = None, send_order=SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, task_result_timeout: int = 0, dynamic_targets: bool = True, abort_signal: Optional[Signal] = None)[source]

Schedule a single task to targets in one-after-another style. This is a blocking call.

The task is scheduled into a task list. Clients can request tasks and controller will dispatch the task to eligible clients based on the send_order.

Parameters
  • task (Task) – the task to be scheduled

  • fl_ctx (FLContext) – FLContext associated with this task

  • targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.

  • send_order (SendOrder, optional) – the order for clients to become eligible. SEQUENTIAL means the order in targets is enforced. ANY means clients in targets that haven't received the task are eligible for the task. Defaults to SendOrder.SEQUENTIAL.

  • task_assignment_timeout (int, optional) – how long to wait for one client to pick the task. Defaults to 0.

  • task_result_timeout (int, optional) – how long to wait for the currently working client to return its result. Defaults to 0.

  • dynamic_targets (bool, optional) – allow clients not in targets to join at the end of targets list. Defaults to True.

  • abort_signal (Optional[Signal], optional) – as this is a blocking call, this abort_signal informs this method to return. Defaults to None.

send(task: Task, fl_ctx: FLContext, targets: Optional[Union[List[Client], List[str]]] = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0)[source]

Schedule a single task to targets. This is a non-blocking call.

The task is scheduled into a task list. Clients can request tasks and controller will dispatch the task to eligible clients based on the send_order.

Parameters
  • task (Task) – the task to be scheduled

  • fl_ctx (FLContext) – FLContext associated with this task

  • targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.

  • send_order (SendOrder, optional) – the order for clients to become eligible. SEQUENTIAL means the order in targets is enforced. ANY means clients in targets that haven't received the task are eligible for the task. Defaults to SendOrder.SEQUENTIAL.

  • task_assignment_timeout (int, optional) – how long to wait for one client to pick the task. Defaults to 0.

Raises
  • ValueError – when task_assignment_timeout is greater than task’s timeout.

  • TypeError – send_order is not defined in SendOrder

  • ValueError – targets is None or an empty list
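The argument checks listed under Raises could look roughly like the following. `check_send_args` is a hypothetical helper, not part of NVFlare, and the treatment of a task timeout of 0 as "no limit" is an assumption that the text above does not state. The send_order type check is omitted for brevity.

```python
from typing import List, Optional


def check_send_args(
    targets: Optional[List[str]],
    task_timeout: int,
    task_assignment_timeout: int = 0,
) -> None:
    """Hypothetical pre-flight checks mirroring the Raises list above."""
    # Unlike broadcast, send requires at least one explicit target.
    if not targets:
        raise ValueError("targets must be a non-empty list")
    # Assumption: a task timeout of 0 means "no limit", so the comparison
    # only applies when the task actually has a timeout.
    if task_timeout and task_assignment_timeout > task_timeout:
        raise ValueError("task_assignment_timeout exceeds the task's timeout")
```

Running the checks before scheduling keeps an unsatisfiable task out of the task list.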

send_and_wait(task: Task, fl_ctx: FLContext, targets: Optional[Union[List[Client], List[str]]] = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, abort_signal: Optional[Signal] = None)[source]

Schedule a single task to targets. This is a blocking call.

The task is scheduled into a task list. Clients can request tasks and controller will dispatch the task to eligible clients based on the send_order.

Parameters
  • task (Task) – the task to be scheduled

  • fl_ctx (FLContext) – FLContext associated with this task

  • targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.

  • send_order (SendOrder, optional) – the order for clients to become eligible. SEQUENTIAL means the order in targets is enforced. ANY means clients in targets that haven't received the task are eligible for the task. Defaults to SendOrder.SEQUENTIAL.

  • task_assignment_timeout (int, optional) – how long to wait for one client to pick the task. Defaults to 0.

  • abort_signal (Optional[Signal], optional) – as this is a blocking call, this abort_signal informs this method to return. Defaults to None.