nvflare.apis.impl.wf_comm_server module

class WFCommServer(task_check_period=0.2)

Bases: FLComponent, WFCommSpec

Manage life cycles of tasks and their destinations.

Parameters:

task_check_period (float, optional) – the interval for checking the status of tasks. Defaults to 0.2.

broadcast(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, min_responses: int = 1, wait_time_after_min_received: int = 0)

Schedule a broadcast task. This is a non-blocking call.

The task is scheduled into a task list. Clients can request tasks, and the controller will dispatch the task to eligible clients.

Parameters:
  • task (Task) – the task to be scheduled

  • fl_ctx (FLContext) – FLContext associated with this task

  • targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.

  • min_responses (int, optional) – the minimum number of client submissions required to mark this task as completed. Defaults to 1.

  • wait_time_after_min_received (int, optional) – a grace period for late clients to contribute their submission. 0 means no grace period. Submissions from late clients within the grace period are still collected as valid submissions. Defaults to 0.

Raises:

ValueError – when min_responses is greater than the number of targets, because such a task, if allowed to be scheduled, could never complete.
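The completion rule described by min_responses and wait_time_after_min_received can be modeled roughly as follows. This is a minimal, self-contained sketch, not NVFlare code, and it rests on a few assumptions:

  • BroadcastProgress, submit, and is_done are hypothetical names.
  • Time is passed in explicitly to keep the sketch deterministic.
  • targets must be an explicit list.
  • Ending the task as soon as every target has responded is an assumption of the sketch.

```python
from typing import Any, Dict, List, Optional


class BroadcastProgress:
    """Hypothetical model of a broadcast's completion rule (not the NVFlare class)."""

    def __init__(self, targets: List[str], min_responses: int = 1,
                 wait_time_after_min_received: float = 0.0):
        # Mirrors the documented ValueError: a task that needs more responses
        # than it has targets could never complete.
        if min_responses > len(targets):
            raise ValueError(
                f"min_responses ({min_responses}) > number of targets ({len(targets)})"
            )
        self.targets = list(targets)
        self.min_responses = min_responses
        self.grace = wait_time_after_min_received
        self.submissions: Dict[str, Any] = {}
        self.min_reached_at: Optional[float] = None  # when min_responses was first met

    def submit(self, client: str, result: Any, now: float) -> None:
        # Late submissions are still stored as valid submissions.
        self.submissions[client] = result
        if self.min_reached_at is None and len(self.submissions) >= self.min_responses:
            self.min_reached_at = now

    def is_done(self, now: float) -> bool:
        # Assumption: once every target has responded there is nothing left to wait for.
        if len(self.submissions) == len(self.targets):
            return True
        if self.min_reached_at is None:
            return False
        # Otherwise the task ends once the grace period after min_responses has elapsed.
        return now - self.min_reached_at >= self.grace
```

For example, with three targets, min_responses=2, and a grace period of 10, the sketch reports completion 10 time units after the second submission arrives. It reports completion earlier if the third target responds first.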

broadcast_and_wait(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, min_responses: int = 1, wait_time_after_min_received: int = 0, abort_signal: Signal | None = None)

Schedule a broadcast task. This is a blocking call.

The task is scheduled into a task list. Clients can request tasks, and the controller will dispatch the task to eligible clients.

Parameters:
  • task (Task) – the task to be scheduled

  • fl_ctx (FLContext) – FLContext associated with this task

  • targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.

  • min_responses (int, optional) – the minimum number of client submissions required to mark this task as completed. Defaults to 1.

  • wait_time_after_min_received (int, optional) – a grace period for late clients to contribute their submission. 0 means no grace period. Submissions from late clients within the grace period are still collected as valid submissions. Defaults to 0.

  • abort_signal (Optional[Signal], optional) – because this is a blocking call, triggering this signal tells the method to return. Defaults to None.
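The blocking variants can be thought of as scheduling the task and then polling until it finishes or the abort signal fires. The sketch below shows that pattern, with a few assumptions:

  • threading.Event stands in for NVFlare's Signal.
  • wait_until_done and the is_done callback are hypothetical names.
  • The real broadcast_and_wait may be implemented differently.

```python
import threading
from typing import Callable


def wait_until_done(is_done: Callable[[], bool], abort: threading.Event,
                    check_period: float = 0.2) -> bool:
    """Block until is_done() is true (return True) or abort is set (return False).

    threading.Event is a stand-in for NVFlare's Signal in this sketch.
    """
    while not is_done():
        # Event.wait returns True as soon as the event is set, so an abort
        # interrupts the pause instead of waiting out the full check_period.
        if abort.wait(check_period):
            return False
    return True
```

For example, if another thread sets the event after 50 ms (say, via threading.Timer(0.05, abort.set).start()), a call with is_done=lambda: False returns False shortly afterwards, rather than blocking forever.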

broadcast_forever(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None)

Schedule a broadcast task. This is a non-blocking call.

The task is scheduled into a task list. Clients can request tasks, and the controller will dispatch the task to eligible clients.

This broadcast does not end based on client responses; unlike broadcast, it has no min_responses completion condition.

Parameters:
  • task (Task) – the task to be scheduled

  • fl_ctx (FLContext) – FLContext associated with this task

  • targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.

cancel_all_tasks(completion_status=TaskCompletionStatus.CANCELLED, fl_ctx: FLContext | None = None)

Cancel all standing tasks in this controller.

Parameters:
  • completion_status (str, optional) – the completion status for this cancellation. Defaults to TaskCompletionStatus.CANCELLED.

  • fl_ctx (Optional[FLContext], optional) – FLContext associated with this cancellation. Defaults to None.

cancel_task(task: Task, completion_status=TaskCompletionStatus.CANCELLED, fl_ctx: FLContext | None = None)

Cancel the specified task.

This method changes the task's completion_status, which signals the task monitor to clean up the task.

Note

This method only marks the task as completed and leaves the cleanup to the task monitor, to avoid a potential deadlock on task_lock.

Parameters:
  • task (Task) – the task to be cancelled

  • completion_status (str, optional) – the completion status for this cancellation. Defaults to TaskCompletionStatus.CANCELLED.

  • fl_ctx (Optional[FLContext], optional) – FLContext associated with this cancellation. Defaults to None.
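The note on cancel_task describes a "mark now, clean up later" pattern. Below is a minimal sketch of that pattern. It illustrates one way that marking, instead of deleting, avoids a deadlock; it is not NVFlare's exact code. Assumptions:

  • task_lock is a non-reentrant lock guarding a dictionary of standing tasks.
  • TaskTable, add, and num_standing are hypothetical names.
  • The string "cancelled" is a stand-in for TaskCompletionStatus.CANCELLED.

```python
import threading
from typing import Dict, Optional

CANCELLED = "cancelled"  # stand-in for TaskCompletionStatus.CANCELLED


class TaskTable:
    """Hypothetical sketch of the mark-then-clean-up pattern."""

    def __init__(self) -> None:
        self.task_lock = threading.Lock()  # non-reentrant: re-acquiring it blocks
        self.standing: Dict[str, Dict[str, Optional[str]]] = {}

    def add(self, task_id: str) -> None:
        with self.task_lock:
            self.standing[task_id] = {"completion_status": None}

    def cancel_task(self, task_id: str, completion_status: str = CANCELLED) -> None:
        # Only the status field is written and task_lock is NOT acquired, so this
        # is safe even when the caller already holds task_lock.
        task = self.standing.get(task_id)
        if task is not None and task["completion_status"] is None:
            task["completion_status"] = completion_status

    def check_tasks(self) -> None:
        # Intended to be called periodically by a monitor (e.g. every
        # task_check_period); removes every task that has a completion status.
        with self.task_lock:
            done = [tid for tid, info in self.standing.items()
                    if info["completion_status"] is not None]
            for tid in done:
                del self.standing[tid]

    def num_standing(self) -> int:
        with self.task_lock:
            return len(self.standing)
```

In this sketch, calling cancel_task while already holding task_lock returns immediately. A version that acquired task_lock inside cancel_task would block forever in that situation.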

check_tasks()

Check whether any standing tasks should exit.

client_is_active(client_name: str, reason: str, fl_ctx: FLContext)

Called by the Engine to notify the controller that a client is active.

Parameters:
  • client_name – name of the client that is active

  • reason – why the client is considered active

  • fl_ctx – the FLContext

finalize_run(fl_ctx: FLContext)

Clean up the coordinator implementation.

Attention

Subclass controllers should not override finalize_run.

Parameters:

fl_ctx (FLContext) – FLContext associated with this action

get_client_disconnect_time(client_name: str)

Get the time at which the client was deemed disconnected.

Parameters:

client_name – name of the client

Returns:

the time at which the client was deemed disconnected, or None if the client is not disconnected
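client_is_active, process_dead_client_report, and get_client_disconnect_time describe related bookkeeping. The sketch below shows one hypothetical way they could fit together. Assumptions:

  • ClientLiveness is not an NVFlare class.
  • The injectable clock exists only to make the sketch testable.
  • Keeping the time of the first dead-client report, rather than the latest, is an assumption.

```python
import time
from typing import Callable, Dict, Optional


class ClientLiveness:
    """Hypothetical bookkeeping for client disconnect times (not NVFlare code)."""

    def __init__(self, clock: Callable[[], float] = time.time) -> None:
        self._clock = clock
        self._disconnect_time: Dict[str, float] = {}

    def process_dead_client_report(self, client_name: str) -> None:
        # Assumption: keep the first report's time, so repeated reports do not
        # move the "deemed disconnected" timestamp forward.
        if client_name not in self._disconnect_time:
            self._disconnect_time[client_name] = self._clock()

    def client_is_active(self, client_name: str, reason: str) -> None:
        # Any sign of activity clears the disconnected state; `reason` is kept
        # only to match the documented signature.
        self._disconnect_time.pop(client_name, None)

    def get_client_disconnect_time(self, client_name: str) -> Optional[float]:
        # None means the client is not currently deemed disconnected.
        return self._disconnect_time.get(client_name)
```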

get_num_standing_tasks() → int

Get the number of tasks that are currently standing.

Returns:

length of the list of standing tasks

Return type:

int

handle_event(event_type: str, fl_ctx: FLContext)

Called when events are fired.

Parameters:
  • event_type (str) – all event types, including AppEventType and EventType

  • fl_ctx (FLContext) – FLContext information with current event type

handle_exception(task_id: str, fl_ctx: FLContext) → None

Called to cancel a task whose client_task is causing an exception at an upper level.

Parameters:
  • task_id (str) – the id of the failing client_task

  • fl_ctx (FLContext) – FLContext associated with this client_task

initialize_run(fl_ctx: FLContext)

Called by runners to initialize the controller with information in fl_ctx.

Attention

Controller subclasses must not override this method.

Parameters:

fl_ctx (FLContext) – FLContext information

process_dead_client_report(client_name: str, fl_ctx: FLContext)

Called by the Engine to process a dead-client report.

Parameters:
  • client_name – name of the client for which the dead-client report was received

  • fl_ctx – the FLContext

process_submission(client: Client, task_name: str, task_id: str, result: Shareable, fl_ctx: FLContext)

Called to process a submission from one client.

Note

This method is called in a separate thread.

Parameters:
  • client (Client) – the client that submitted this task

  • task_name (str) – the task name associated with this submission

  • task_id (str) – the id associated with the client_task

  • result (Shareable) – the actual submitted data from the client

  • fl_ctx (FLContext) – the FLContext associated with this submission

Raises:
  • TypeError – when client is not an instance of Client

  • TypeError – when fl_ctx is not an instance of FLContext

  • TypeError – when result is not an instance of Shareable

  • ValueError – when task_name is not found in the client_task
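The checks listed under Raises for process_submission can be expressed as a small validation function. This is a sketch with the following assumptions:

  • Client, FLContext, and Shareable below are minimal hypothetical stand-ins for the NVFlare classes.
  • check_submission is not part of the API.
  • The client_tasks mapping from task_id to task name is an assumed bookkeeping structure.
  • How a submission for an unknown task_id is handled is deliberately left out.

```python
from typing import Any, Dict


class Client:
    """Hypothetical stand-in for NVFlare's Client (only a name)."""

    def __init__(self, name: str) -> None:
        self.name = name


class FLContext:
    """Hypothetical stand-in for NVFlare's FLContext."""


class Shareable(dict):
    """Hypothetical stand-in for NVFlare's Shareable."""


def check_submission(client: Any, task_name: str, task_id: str, result: Any,
                     fl_ctx: Any, client_tasks: Dict[str, str]) -> None:
    """Raise the errors documented for process_submission; return None if valid."""
    if not isinstance(client, Client):
        raise TypeError(f"client must be a Client, got {type(client).__name__}")
    if not isinstance(fl_ctx, FLContext):
        raise TypeError(f"fl_ctx must be an FLContext, got {type(fl_ctx).__name__}")
    if not isinstance(result, Shareable):
        raise TypeError(f"result must be a Shareable, got {type(result).__name__}")
    # Only a known client_task whose name disagrees is rejected here.
    expected = client_tasks.get(task_id)
    if expected is not None and expected != task_name:
        raise ValueError(f"task_name {task_name!r} not found in client_task {task_id!r}")
```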

process_task_check(task_id: str, fl_ctx: FLContext)

Called by the Engine to check whether a specified task still exists.

Parameters:
  • task_id – the id of the task

  • fl_ctx – the FLContext

Returns:

the ClientTask object if it exists; None otherwise

process_task_request(client: Client, fl_ctx: FLContext) → Tuple[str, str, Shareable]

Called by the runner when a client asks for a task.

Note

This is called in a separate thread.

Parameters:
  • client (Client) – The record of one client requesting tasks

  • fl_ctx (FLContext) – The FLContext associated with this request

Raises:
  • TypeError – when client is not an instance of Client

  • TypeError – when fl_ctx is not an instance of FLContext

  • TypeError – when any standing task contains an invalid client_task

Returns:

task_name, an id for the client_task, and the data for this request

Return type:

Tuple[str, str, Shareable]
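The return shape of process_task_request (task name, client_task id, data) can be illustrated with a simplified dispatcher. Assumptions:

  • StandingTask and its is_eligible callback are hypothetical.
  • Using uuid4 strings as ids is a choice of the sketch.
  • Returning ("", "", None) when nothing is available is the sketch's own convention, not documented behavior.

```python
import uuid
from typing import Callable, List, NamedTuple, Optional, Tuple


class StandingTask(NamedTuple):
    name: str
    data: dict
    is_eligible: Callable[[str], bool]  # hypothetical per-task eligibility rule


def process_task_request(client_name: str, standing: List[StandingTask]
                         ) -> Tuple[str, str, Optional[dict]]:
    """Hand the first standing task the client is eligible for to that client."""
    for task in standing:
        if task.is_eligible(client_name):
            # Each hand-out gets its own id, so a later submission can be matched
            # to this specific client_task.
            return task.name, str(uuid.uuid4()), task.data
    return "", "", None  # nothing to do for this client (sketch's convention)
```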

relay(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, task_result_timeout: int = 0, dynamic_targets: bool = True)

Schedule a single task to targets, one client after another. This is a non-blocking call.

The task is scheduled into a task list. Clients can request tasks, and the controller will dispatch the task to eligible clients based on the send_order.

Parameters:
  • task (Task) – the task to be scheduled

  • fl_ctx (FLContext) – FLContext associated with this task

  • targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.

  • send_order (SendOrder, optional) – the order in which clients become eligible. SEQUENTIAL means the order in targets is enforced. ANY means any client in targets that has not yet received the task is eligible. Defaults to SendOrder.SEQUENTIAL.

  • task_assignment_timeout (int, optional) – how long to wait for one client to pick up the task. Defaults to 0.

  • task_result_timeout (int, optional) – how long to wait for the current working client to return its result. Defaults to 0.

  • dynamic_targets (bool, optional) – allow clients not in targets to join at the end of the targets list. Defaults to True.

Raises:
  • ValueError – when task_assignment_timeout is greater than task’s timeout

  • ValueError – when task_result_timeout is greater than task’s timeout

  • TypeError – when send_order is not defined in SendOrder

  • TypeError – when dynamic_targets is not a boolean variable

  • ValueError – when targets is None or an empty list but dynamic_targets is False
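The Raises list for relay amounts to a handful of argument checks. The sketch below restates them as a standalone function. Assumptions:

  • check_relay_args is hypothetical.
  • SendOrder is a local stand-in whose member values are assumptions.
  • The task timeout is passed as a plain number.
  • Treating a task timeout of 0 as "no timeout" (so the timeout comparisons are skipped) is not stated in this reference.

```python
from enum import Enum
from typing import List, Optional


class SendOrder(Enum):
    """Local stand-in for NVFlare's SendOrder; member values are assumptions."""
    SEQUENTIAL = "sequential"
    ANY = "any"


def check_relay_args(task_timeout: int, targets: Optional[List[str]], send_order,
                     task_assignment_timeout: int = 0, task_result_timeout: int = 0,
                     dynamic_targets: bool = True) -> None:
    """Raise the errors listed for relay(); return None if the arguments are valid."""
    # Assumption: task_timeout == 0 means the task never times out.
    if task_timeout > 0:
        if task_assignment_timeout > task_timeout:
            raise ValueError("task_assignment_timeout is greater than the task's timeout")
        if task_result_timeout > task_timeout:
            raise ValueError("task_result_timeout is greater than the task's timeout")
    if not isinstance(send_order, SendOrder):
        raise TypeError(f"send_order must be a SendOrder, got {send_order!r}")
    if not isinstance(dynamic_targets, bool):
        raise TypeError(f"dynamic_targets must be a bool, got {dynamic_targets!r}")
    if not targets and not dynamic_targets:
        # With no targets and no late joiners, no client could ever receive the task.
        raise ValueError("targets is None or empty but dynamic_targets is False")
```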

relay_and_wait(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order=SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, task_result_timeout: int = 0, dynamic_targets: bool = True, abort_signal: Signal | None = None)

Schedule a single task to targets, one client after another. This is a blocking call.

The task is scheduled into a task list. Clients can request tasks, and the controller will dispatch the task to eligible clients based on the send_order.

Parameters:
  • task (Task) – the task to be scheduled

  • fl_ctx (FLContext) – FLContext associated with this task

  • targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.

  • send_order (SendOrder, optional) – the order in which clients become eligible. SEQUENTIAL means the order in targets is enforced. ANY means any client in targets that has not yet received the task is eligible. Defaults to SendOrder.SEQUENTIAL.

  • task_assignment_timeout (int, optional) – how long to wait for one client to pick up the task. Defaults to 0.

  • task_result_timeout (int, optional) – how long to wait for the current working client to return its result. Defaults to 0.

  • dynamic_targets (bool, optional) – allow clients not in targets to join at the end of the targets list. Defaults to True.

  • abort_signal (Optional[Signal], optional) – because this is a blocking call, triggering this signal tells the method to return. Defaults to None.

send(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0)

Schedule a single task to targets. This is a non-blocking call.

The task is scheduled into a task list. Clients can request tasks, and the controller will dispatch the task to eligible clients based on the send_order.

Parameters:
  • task (Task) – the task to be scheduled

  • fl_ctx (FLContext) – FLContext associated with this task

  • targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.

  • send_order (SendOrder, optional) – the order in which clients become eligible. SEQUENTIAL means the order in targets is enforced. ANY means any client in targets that has not yet received the task is eligible. Defaults to SendOrder.SEQUENTIAL.

  • task_assignment_timeout (int, optional) – how long to wait for one client to pick up the task. Defaults to 0.

Raises:
  • ValueError – when task_assignment_timeout is greater than task’s timeout.

  • TypeError – when send_order is not defined in SendOrder

  • ValueError – when targets is None or an empty list
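The two send_order values differ only in which targets may pick up the task next. The sketch below models that rule. Assumptions:

  • is_eligible, already_sent, and the local SendOrder stand-in are hypothetical.
  • task_assignment_timeout is deliberately not modeled.

```python
from enum import Enum
from typing import List, Set


class SendOrder(Enum):
    """Local stand-in for NVFlare's SendOrder; member values are assumptions."""
    SEQUENTIAL = "sequential"
    ANY = "any"


def is_eligible(client: str, targets: List[str], already_sent: Set[str],
                send_order: SendOrder) -> bool:
    """Decide whether `client` may receive the task now (timeouts not modeled)."""
    if client not in targets or client in already_sent:
        return False
    if send_order is SendOrder.ANY:
        # Any target that has not received the task yet may take it.
        return True
    # SEQUENTIAL: only the first target, in list order, that has not received it.
    # `client` itself qualifies, so next() always finds a match here.
    next_target = next(t for t in targets if t not in already_sent)
    return client == next_target
```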

send_and_wait(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, abort_signal: Signal | None = None)

Schedule a single task to targets. This is a blocking call.

The task is scheduled into a task list. Clients can request tasks, and the controller will dispatch the task to eligible clients based on the send_order.

Parameters:
  • task (Task) – the task to be scheduled

  • fl_ctx (FLContext) – FLContext associated with this task

  • targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.

  • send_order (SendOrder, optional) – the order in which clients become eligible. SEQUENTIAL means the order in targets is enforced. ANY means any client in targets that has not yet received the task is eligible. Defaults to SendOrder.SEQUENTIAL.

  • task_assignment_timeout (int, optional) – how long to wait for one client to pick up the task. Defaults to 0.

  • abort_signal (Optional[Signal], optional) – because this is a blocking call, triggering this signal tells the method to return. Defaults to None.

wait_for_task(task: Task, abort_signal: Signal)